ETL实践:从旺店通到MySQL的数据转换与批量写入

  • 轻易云集成顾问-黄宏棵
### 案例分享:旺店通·旗舰奇门数据集成到MySQL 在构建高效的数据集成方案中,如何确保数据的准确与实时传输是每个技术专家必须面对的挑战。本文将分享一个具体案例:如何通过轻易云数据集成平台,将旺店通·旗舰奇门系统中的采购退货单——wdt.wms.stockout.purchasereturn.querywithdetail接口获取的数据,可靠地写入到MySQL数据库。 此方案对接了名为“旺店通旗舰版-采购退货单-->BI泰海-采购退货单表_原始查询(2024年起)”的实际运行环境,通过发挥平台强大的批量处理能力和定制化数据映射功能,实现了大规模、高吞吐量的数据同步。以下将详细描述这个过程中关键技术要点及操作流程。 首先,本次集成任务需解决几个核心问题: 1. **API调用与分页控制**: 采用`wdt.wms.stockout.purchasereturn.querywithdetail`接口,需处理返回结果的分页,以确保全量数据不漏单。 2. **高并发写入支持**: MySQL API的`batchexecute`提供了批量执行能力,使大量记录能够迅速且稳定地存储进数据库。 3. **实时监控与告警机制**: 配置集中监控和告警系统,及时捕获并应对可能出现的问题。 4. **异常处理与错误重试**: 针对网络波动或服务端响应延迟等情况,实现完善的异常捕捉和自动重试策略。 通过以上措施,我们不仅保障了数据传输过程中的完整性,还优化了不同系统间的数据格式差异,并实现双向联动,在提高业务效率同时,大幅降低人力投入成本。在接下来的部分,我们将深入探讨具体实施细节,包括API参数设置、流控管理、以及Error Handling等关键操作步骤。 ![电商OMS与WMS系统接口开发配置](https://pic.qeasy.cloud/D20.png~tplv-syqr462i7n-qeasy.image) ### 调用旺店通·旗舰奇门接口wdt.wms.stockout.purchasereturn.querywithdetail获取并加工数据 在数据集成生命周期的第一步,我们需要从源系统中调用API接口获取原始数据,并对其进行初步处理和清洗。本文将详细探讨如何通过轻易云数据集成平台调用旺店通·旗舰奇门接口`wdt.wms.stockout.purchasereturn.querywithdetail`,并对返回的数据进行加工。 #### 接口调用配置 首先,我们需要配置API接口的调用参数。根据提供的元数据配置,接口使用POST方法,通过分页参数和业务参数来控制查询范围和结果集。 ```json { "api": "wdt.wms.stockout.purchasereturn.querywithdetail", "effect": "QUERY", "method": "POST", "number": "order_no", "id": "stockout_id", "name": "tid", "request": [ { "field": "pager", "label": "分页参数", "type": "object", "describe": "分页参数", "children": [ { "field": "page_size", "label": "分页大小", "type": "string", "describe": "分页大小", "value": "50" }, { "field": "page_no", "label": "页号", "type": "string", "describe": "页号", "value": "1" } ] }, { "field": "params", "label": "业务参数", "type": "object", "describe": "业务参数", ... ``` #### 分页参数 分页参数用于控制每次请求返回的数据量和页码,以避免一次性获取过多数据导致性能问题。默认配置为每页50条记录,从第一页开始: ```json { ... { field: 'pager', label: '分页参数', type: 'object', describe: '分页参数', children: [ { field: 'page_size', label: '分页大小', type: 'string', describe: '分页大小', value: '50' }, { field: 'page_no', label: '页号', type: 'string', describe: '页号', value: '1' } ] } } ``` #### 业务参数 业务参数主要包括时间范围和时间类型,用于指定查询条件。时间范围通过`start_time`和`end_time`字段定义,分别对应上次同步时间和当前时间: ```json { ... { field: 'params', label: '业务参数', type: 'object', describe: '业务参数', children: [ { field: 'start_time', label: '开始时间', type: 'string', describe: '开始时间', value: '{{LAST_SYNC_TIME|datetime}}' }, { field: 'end_time', label: '结束时间', type: 'string', describe: '结束时间', value: '{{CURRENT_TIME|datetime}}' }, { field: 'time_type', label: '时间类型\n\n1:出库时间\n\n2:创建时间\n\n3:最后修改时间\n\n默认1', type: 'string', describe': ‘时间类型’, value': ‘3’ } ] } } ``` #### 数据清洗与转换 在获取到原始数据后,需要对数据进行清洗与转换,以便后续处理。轻易云平台提供了自动填充响应(autoFillResponse)功能,可以简化这一过程。 例如,对于返回的采购退货单详情列表(details_list),可以通过beatFlat选项将其平铺展开,方便后续的数据处理: ```json { ... autoFillResponse': true, beatFlat': ['details_list'] } ``` #### 实际应用案例 假设我们需要从2024年起的采购退货单数据,并将其写入BI泰海的采购退货单表。首先,我们会设置合适的开始和结束时间,然后通过上述配置调用API接口获取数据。 ```json { "api":"wdt.wms.stockout.purchasereturn.querywithdetail","effect":"QUERY","method":"POST","number":"order_no","id":"stockout_id","name":"tid","request":[{"field":"pager","label":"分页参数","type":"object","describe":"分页参数","children":[{"field":"page_size","label":"分页大小","type":"string","describe":"分页大小","value":"50"},{"field":"page_no","label":"页号","type":"string","describe":"页号","value":"1"}]},{"field":"params","label":"业务参数","type":"object","describe":"业务参数","children":[{"field":"start_time","label":"开始时间","type":"string","describe":"开始时间","value":"'2024-01-01T00:00:00Z'"},{"field':'end_time','label':'结束时间','type':'string','describe':'结束时间','value':'{{CURRENT_TIME|datetime}}'},{'field':'time_type','label':'时间类型\n\n1:出库时间\n\n2:创建时间\n\n3:最后修改时间\n\n默认1','type':'string','describe':'时间类型','value':'3'}]}],'autoFillResponse':true,'beatFlat':['details_list']} } ``` 通过上述步骤,我们成功地从旺店通·旗舰奇门系统中获取了所需的采购退货单数据,并进行了初步清洗与转换,为后续的数据写入奠定了基础。这一过程展示了如何利用轻易云平台实现高效、透明的数据集成操作。 ![钉钉与ERP系统接口开发配置](https://pic.qeasy.cloud/S5.png~tplv-syqr462i7n-qeasy.image) ### 数据集成生命周期中的ETL转换与写入MySQL 在数据集成的生命周期中,ETL(提取、转换、加载)是关键的一步。本文将详细探讨如何将源平台的数据经过ETL转换后,写入目标平台MySQL,通过API接口实现数据的无缝对接。 #### 数据请求与清洗 在数据集成过程中,首先要从源平台获取原始数据,并进行必要的清洗操作。这一步确保数据的准确性和一致性,为后续的转换和写入奠定基础。 #### 数据转换与写入 接下来是数据转换与写入阶段。我们需要将清洗后的数据转化为目标平台MySQL能够接受的格式,并通过API接口进行写入。以下是具体的技术步骤和配置。 #### 元数据配置解析 元数据配置是整个ETL过程中的核心部分,它定义了如何将源数据字段映射到目标数据库字段。以下是一个典型的元数据配置示例: ```json { "api": "batchexecute", "effect": "EXECUTE", "method": "SQL", "number": "id", "id": "id", "name": "id", "idCheck": true, "request": [ {"field": "stockout_id", "label": "出库单id", "type": "string", "value": "{stockout_id}"}, {"field": "order_no", "label": "出库单号", "type": "string", "value": "{order_no}"}, {"field": "src_order_no", "label": "采购退货单号", "type": "string", "value": "{src_order_no}"}, {"field": "warehouse_no", "label": "仓库编号", "type": "string", "value": "{warehouse_no}"}, {"field": "consign_time", "label": "发货时间", "type": string, value: "{{consign_time|datetime}}"}, // 更多字段... ], // SQL主语句 { field: 'main_sql', label: '主语句', type: 'string', describe: 'SQL首次执行的语句,将会返回:lastInsertId', value: 'REPLACE INTO wdt_wms_stockout_purchasereturn_querywithdetail (stockout_id, order_no, src_order_no, warehouse_no, consign_time, status, goods_count, logistics_no, post_fee, receiver_name, receiver_province, receiver_city, receiver_district, receiver_address, receiver_telno, remark, weight, provider_no, provider_name, last_load_purchase_no, goods_type_count, create_time, operator_name, goods_total_cost, goods_total_amount, checked_goods_total_cost, modified) VALUES' }, { field: 'limit', label: 'limit', type: 'string', value: '100' } } ``` #### API接口调用 在轻易云平台中,我们使用`batchexecute` API来批量执行SQL语句,实现数据写入。以下是一个示例代码片段,用于调用API并执行SQL插入操作: ```python import requests import json url = 'https://api.example.com/batchexecute' headers = {'Content-Type': 'application/json'} payload = { # 填充元数据配置中的各个字段值 } response = requests.post(url, headers=headers, data=json.dumps(payload)) if response.status_code == 200: print('Data inserted successfully') else: print('Failed to insert data:', response.text) ``` #### 数据字段映射与格式转换 在实际操作中,需要特别注意字段类型和格式的转换。例如,时间字段通常需要转换为标准的日期时间格式,这可以通过元数据配置中的`datetime`函数实现: ```json {"field":"consign_time","label":"发货时间","type":"string","value":"{{consign_time|datetime}}"} ``` 类似地,其他字段也需要根据需求进行相应的格式化处理,以确保符合目标数据库的要求。 #### 执行SQL语句 最终,我们通过构建完整的SQL插入语句,将处理后的数据写入MySQL数据库。以下是一个完整的SQL插入示例: ```sql REPLACE INTO wdt_wms_stockout_purchasereturn_querywithdetail ( stockout_id, order_no, src_order_no, warehouse_no, consign_time, status, goods_count, logistics_no, post_fee, receiver_name, receiver_province, receiver_city, receiver_district, receiver_address, receiver_telno, remark, weight, provider_no, provider_name, ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ``` 每个问号代表一个占位符,对应于元数据配置中的字段值。在执行时,这些占位符将被实际的数据替换,从而完成插入操作。 #### 总结 通过上述步骤,我们实现了从源平台到目标平台MySQL的数据ETL过程。利用轻易云提供的全异步、多系统支持特性,我们能够高效地完成不同系统间的数据集成任务。关键在于正确配置元数据,并通过API接口实现自动化的数据处理和写入,从而提升业务效率和透明度。 ![系统集成平台API接口配置](https://pic.qeasy.cloud/T27.png~tplv-syqr462i7n-qeasy.image)