数据集成生命周期实践:从数据获取到MySQL批量写入

  • 轻易云集成顾问-彭亮
### 旺店通·旗舰奇门数据集成到MySQL:从货品档案查询到BI系统的高效迁移 在技术团队处理电商平台和业务分析系统的数据对接过程中,高效稳定的API调用及数据传输是至关重要的一环。本文将详细介绍如何使用轻易云数据集成平台,实现旺店通·旗舰版货品档案接口(`wdt.goods.goods.querywithspec`)与MySQL数据库之间的数据集成,通过批量写入API(`batchexecute`)实现高吞吐量的数据处理。 本案例聚焦于“旺店通旗舰版-货品档案查询-->BI泰海-货品档案表(奇门)”方案,旨在解决以下几个关键技术问题: 1. **定时可靠地抓取旺店通·旗舰奇门接口数据**: - 结合分页及限流机制,确保大规模数据获取的真实性和完整性。 2. **自定义转换逻辑适应不同的数据结构**: - 配置映射规则,将原始JSON格式转换为适配MySQL存储标准的数据模型。 3. **批量写入与吞吐量优化**: - 利用MySQL API提供的批量执行功能,实现大量数据快速、高效地写入操作。 4. **实时监控与异常处理机制**: - 集成全程启用集中化监控和告警系统,对任务状态进行追踪;并采用错误重试策略,提高任务成功率。 5. **日志记录与质量监控**: - 全面记录操作日志,以便日后检索与审查;通过自动化工具实现对异常情况的及时捕捉和响应。 除了这些核心内容,我们还将简要讨论如何利用轻易云平台提供的可视化设计工具构建更加直观、易管理的数据流,并着重提及在实际配置过程中的一些实用技巧。希望此文能为各位技术同仁提供有价值的参考,为类似业务场景下的数据集成工作贡献一份力量。 ![金蝶与外部系统打通接口](https://pic.qeasy.cloud/D3.png~tplv-syqr462i7n-qeasy.image) ### 调用源系统旺店通·旗舰奇门接口wdt.goods.goods.querywithspec获取并加工数据 在数据集成的生命周期中,调用源系统接口是至关重要的一步。本文将详细探讨如何使用旺店通·旗舰奇门接口`wdt.goods.goods.querywithspec`获取并加工数据,以实现高效的数据集成。 #### 接口概述 接口`wdt.goods.goods.querywithspec`用于查询货品档案信息,支持通过时间范围和分页参数进行查询。该接口采用POST请求方式,返回的数据包含货品的详细信息,包括规格列表等。 #### 元数据配置解析 根据提供的元数据配置,我们可以看到该接口的具体配置如下: - **API**: `wdt.goods.goods.querywithspec` - **请求方法**: `POST` - **主要字段**: - `number`: `goods_no` - `id`: `goods_no` - `name`: `brand_name` 请求参数分为两个主要部分:查询参数和分页参数。 1. **查询参数**: - `start_time`: 查询开始时间,使用占位符`{{LAST_SYNC_TIME|datetime}}`表示上次同步时间。 - `end_time`: 查询结束时间,使用占位符`{{CURRENT_TIME|datetime}}`表示当前时间。 2. **分页参数**: - `page_size`: 每页记录数,默认值为200。 - `page_no`: 页号,默认值为1。 此外,元数据配置中还包含一个重要属性`autoFillResponse`,设置为true表示自动填充响应数据,以及一个数组属性`beatFlat`用于平铺嵌套字段,这里指定了`spec_list`。 #### 数据请求与清洗 在实际操作中,我们首先需要构建请求体。根据元数据配置,请求体应包含查询参数和分页参数: ```json { "params": { "start_time": "{{LAST_SYNC_TIME|datetime}}", "end_time": "{{CURRENT_TIME|datetime}}" }, "pager": { "page_size": "200", "page_no": "1" } } ``` 发送POST请求后,我们会收到包含货品信息的响应数据。由于设置了`autoFillResponse: true`,响应中的嵌套字段(如规格列表)会被自动展开。这一步骤极大简化了后续的数据处理工作。 #### 数据转换与写入 接下来,我们需要对获取到的数据进行转换,以符合目标系统(如BI泰海)的要求。假设目标系统需要以下字段: - 货品编号(goods_no) - 品牌名称(brand_name) - 规格列表(spec_list) 我们可以通过编写脚本或使用轻易云平台的内置工具,对响应数据进行清洗和转换。例如,将规格列表中的每个规格项单独提取出来,并与对应的货品信息关联。 ```python def transform_data(response): transformed_data = [] for item in response['data']: goods_no = item['goods_no'] brand_name = item['brand_name'] for spec in item['spec_list']: transformed_data.append({ 'goods_no': goods_no, 'brand_name': brand_name, 'spec': spec }) return transformed_data ``` 上述函数将原始响应数据转换为目标格式,每个规格项都独立记录,并保留其所属货品的信息。 #### 实践案例 假设我们在一次实际操作中,需要从旺店通系统中获取最近24小时内更新的货品档案,并将其导入到BI泰海系统中。首先,我们构建请求体: ```json { "params": { "start_time": "2023-10-01T00:00:00", "end_time": "2023-10-02T00:00:00" }, "pager": { "page_size": "200", "page_no": "1" } } ``` 发送请求后,我们得到如下响应(简化示例): ```json { "data": [ { "goods_no": "G12345", "brand_name": "BrandA", "spec_list": [ {"spec_id": "S1", "spec_name": "Size M"}, {"spec_id": "S2", "spec_name": "Size L"} ] }, { "goods_no": "G67890", "brand_name": "BrandB", "spec_list": [ {"spec_id": "S3", "spec_name": "Size S"} ] } ] } ``` 通过上述转换函数处理后,我们得到以下格式的数据: ```json [ {"goods_no": "G12345", "brand_name": "BrandA", "spec": {"spec_id":"S1", "spec_name":"Size M"}}, {"goods_no": "G12345", "brand_name":"BrandA", "spec":{"spec_id":"S2"," spec_name":"Size L"}}, {"goods_no":"G67890","brand_name":"BrandB"," spec":{" spec_id":"S3"," spec_name":"Size S"}} ] ``` 这些数据即可直接写入到BI泰海系统中的货品档案表,实现无缝对接。 通过以上步骤,我们展示了如何利用旺店通·旗舰奇门接口获取并加工数据,为后续的数据集成奠定基础。这一过程不仅提高了数据处理效率,也确保了数据的一致性和准确性。 ![电商OMS与ERP系统接口开发配置](https://pic.qeasy.cloud/S5.png~tplv-syqr462i7n-qeasy.image) ### 数据集成生命周期中的ETL转换:将源平台数据写入MySQL API接口 在数据集成的生命周期中,ETL(提取、转换、加载)是至关重要的一步。本文将详细探讨如何使用轻易云数据集成平台,将源平台的数据进行转换,并通过MySQL API接口写入目标平台。我们将以旺店通旗舰版的货品档案查询数据为例,展示具体的实现步骤和技术细节。 #### 元数据配置解析 首先,我们需要理解元数据配置中的各个字段及其对应关系。这些字段定义了从源平台提取的数据如何映射到目标平台的数据库表中。 ```json { "api": "batchexecute", "effect": "EXECUTE", "method": "POST", "idCheck": true, "request": [ {"field":"goods_id","label":"货品id","type":"string","value":"{goods_id}"}, {"field":"goods_no","label":"货品编号","type":"string","value":"{goods_no}"}, {"field":"goods_name","label":"货品名称","type":"string","value":"{goods_name}"}, // ...省略其他字段... {"field":"spec_list_tax_code","label":"税务编码","type":"string","value":"{spec_list_tax_code}"} ], "otherRequest": [ { "field": "main_sql", "label": "主语句", "type": "string", "describe": "111", "value": "REPLACE INTO goods_querywithspec (goods_id, goods_no, goods_name, short_name, alias, goods_type, spec_count, brand_name, brand_id, remark, prop1, prop2, prop3, prop4, prop5, prop6, origin, class_name, class_id, unit_name, aux_unit_name, flag_name, deleted, goods_modified, goods_created, modified,spec_list_goods_id,spec_list_spec_id,spec_list_spec_no,spec_list_spec_code,spec_list_barcode,spec_list_spec_name,spec_list_lowest_price,spec_list_retail_price,spec_list_wholesale_price,spec_list_member_price,spec_list_market_price,spec_list_validity_days,spec_list_sales_days,spec_list_receive_days,spec_list_weight,spec_list_length,spec_list_width,spec_list_height,spec_list_sn_type,spec_list_is_lower_cost,spec_list_is_not_use_air,spec_list_wms_process_mask,spec_list_tax_rate,spec_list_large_type,spec_list_goods_label,spec_list_deleted,spec_list_remark,spec_list_spec_modified,spec_list_spec_created,spec_list_prop1,spec_list_prop2,spec_list_prop3,spec_list_prop4,spec_list_prop5,spec_list_prop6,spec_list_custom_price1,spec_list_custom_price2,spec_list_img_url ,spec_list_spec_unit_name ,spec list_spec_aux_unit_name ,spec list_tax_code) VALUES" }, { "field": "limit", "label": "limit", "type": "string", "describe": "111", "value": "1000" } ] } ``` #### 数据请求与清洗 在ETL过程的第一步,我们需要从源平台请求数据并进行清洗。假设我们已经完成了这一步,并且得到了一个结构化的数据集合。接下来,我们需要将这些数据转换为目标平台能够接受的格式。 #### 数据转换与写入 轻易云数据集成平台提供了灵活的配置方式,使得我们可以通过简单的配置完成复杂的数据转换和写入操作。在上述元数据配置中,我们定义了一个`main_sql`语句,用于将清洗后的数据插入到MySQL数据库中的`goods_querywithspec`表中。 ##### 配置API接口 1. **API Endpoint**: `batchexecute` - 这个API用于批量执行SQL语句,将大批量的数据一次性写入数据库。 2. **HTTP Method**: `POST` - 使用POST方法发送请求,确保数据安全传输。 3. **ID Check**: `true` - 在写入之前检查记录是否已经存在,以避免重复插入。 ##### 映射字段 在`request`部分,我们列出了所有需要映射的字段。例如: - `{"field":"goods_id", ...}` 映射到 MySQL 表中的 `goods_id` 字段。 - `{"field":"goods_no", ...}` 映射到 MySQL 表中的 `goods_no` 字段。 - ... 这些映射确保了从源平台提取的数据能够正确地转换为目标平台所需的格式。 ##### SQL语句 在`otherRequest`部分,我们定义了一个关键字段: - `main_sql`: ```sql REPLACE INTO goods_querywithspec (goods_id,...spec list_tax_code) VALUES ``` 这个SQL语句使用了`REPLACE INTO`,确保如果记录已经存在则更新,否则插入新记录。这对于保持数据库的一致性和完整性非常重要。 #### 执行ETL任务 一旦配置完成,我们可以通过轻易云数据集成平台执行这个ETL任务。以下是执行流程: 1. **发送请求**: 使用配置好的API和HTTP方法发送请求。 2. **检查响应**: 确保响应状态码为200,表示成功。 3. **日志记录**: 实时监控日志,确保每条记录都正确处理。 通过这种方式,我们可以高效地将源平台的数据转换并写入到目标MySQL数据库中,实现不同系统间的数据无缝对接。这不仅提升了业务效率,也确保了数据的一致性和可靠性。 ![用友与WMS系统接口开发配置](https://pic.qeasy.cloud/T17.png~tplv-syqr462i7n-qeasy.image)