ETL实践:从旺店通到MySQL的数据转换与加载

  • 轻易云集成顾问-张妍琪
### 系统对接集成案例:旺店通·旗舰奇门数据集成到MySQL 在构建高效的数据处理和管理系统中,数据的快速、高效、可靠传输是关键。本文将详细介绍如何利用轻易云数据集成平台,将“旺店通·旗舰版-采购入库单”通过API接口(wdt.wms.stockin.purchase.querywithdetail)解决方案,实现从原始查询到BI泰海系统的MySQL数据库表的无缝对接。 **技术实现概述** 1. **接口调用与分页处理** 使用`wdt.wms.stockin.purchase.querywithdetail` API获取采购入库单,我们在设计时需要特别注意分页和限流问题,以确保每页的数据都被完整、准确地抓取而不漏单。在实际操作中,通过添加批量任务并调节接口请求频率,有效降低了API调用异常风险。 2. **实时监控与性能优化** 为保障整个数据传输过程的稳定性,我们引入了集中监控和告警系统,实时跟踪各个节点的数据流动状态。同时,对采集端进行性能调优,提高吞吐量,使大量数据能以最快速度写入MySQL数据库,从而提升整体工作效率。 3. **自定义转换逻辑与格式差异处理** 面临不同系统之间的数据格式差异,通过轻易云平台灵活强大的自定义转换功能,可以将源数据结构调整为目标数据库所需格式。这一步骤不仅简化了后续处理操作,也为进一步分析提供了可靠基础。 4. **异常检测与重试机制** 在多线程、大批量操作情况下,不可避免会出现一些异常情况。我们设计了一套错误重试机制,如遇网络或API响应问题,可以自动重新尝试相关任务,确保所有有效记录最终成功传递至MySQL。此外,还设定报警阈值,当错误次数过高时及时通知管理员干预。 5. **日志记录和透明化管理** 数据处理过程中,引入全流程日志记录,帮助定位及解决可能存在的问题,并且通过可视化工具展示每一个环节的进度,让业务方明确了解当前汇总结果及其背后的运行细节,大大提高业务透明度及决策准确性。 通过这次配置实施,我们成功实现了从“旺店通·旗舰奇门”到“MySQL”的高效、安全、有序地同步,为企业全面掌握采购入库单信息及更好发挥BI报表功能奠定坚实基础。下一步,我们将分步骤描述具体部署过程,包括环境准备、参数设置以及代码实例等内容。 ![如何开发企业微信API接口](https://pic.qeasy.cloud/D1.png~tplv-syqr462i7n-qeasy.image) ### 调用旺店通·旗舰奇门接口wdt.wms.stockin.purchase.querywithdetail获取并加工数据 在数据集成生命周期的第一步,我们需要调用源系统的API接口以获取原始数据,并对其进行初步加工。本文将详细探讨如何使用轻易云数据集成平台调用旺店通·旗舰奇门接口`wdt.wms.stockin.purchase.querywithdetail`,并对返回的数据进行处理。 #### 接口调用配置 首先,我们需要配置接口的元数据。以下是该接口的元数据配置: ```json { "api": "wdt.wms.stockin.purchase.querywithdetail", "effect": "QUERY", "method": "POST", "number": "order_no", "id": "stockin_id", "name": "tid", "request": [ { "field": "pager", "label": "分页参数", "type": "object", "describe": "分页参数", "children": [ { "field": "page_size", "label": "分页大小", "type": "string", "describe": "分页大小", "value": "50" }, { "field": "page_no", "label": "页号", "type": "string", "describe": "页号", "value": "1" } ] }, { "field": "params", "label": "业务参数", "type": "object", "describe": "业务参数", "children": [ { "field": "start_time", "label": "开始时间", ``type``: ``string``, ``describe``: ``开始时间``, ``value``: ``{{LAST_SYNC_TIME|datetime}}`` }, { ``field``: ``end_time``, ``label``: ``结束时间``, ``type``: ``string``, ``describe``: ``结束时间``, ``value``: ``{{CURRENT_TIME|datetime}}`` } ] } ], ``autoFillResponse``: true, ``beatFlat``: [``details_list``] } ``` #### 请求参数解析 在请求参数中,我们主要关注两个部分:分页参数和业务参数。 1. **分页参数**: - `page_size`: 每页返回的数据条数,默认设置为50。 - `page_no`: 当前请求的页码,默认设置为1。 2. **业务参数**: - `start_time`: 数据查询的起始时间,使用占位符`{{LAST_SYNC_TIME|datetime}}`动态填充。 - `end_time`: 数据查询的结束时间,使用占位符`{{CURRENT_TIME|datetime}}`动态填充。 这些参数确保了我们可以灵活地控制数据查询的范围和分页,从而高效地获取所需数据。 #### 数据获取与初步加工 在成功调用接口并获取到原始数据后,我们需要对返回的数据进行初步加工。由于接口返回的数据可能包含嵌套结构,因此我们需要将其扁平化处理,以便后续的数据转换和写入操作。 例如,接口返回的数据可能包含一个名为`details_list`的嵌套字段。通过配置中的`beatFlat`属性,我们可以自动将该字段扁平化处理,使得每条记录都能独立存在于最终的数据集中。 #### 实践案例 假设我们需要从2024年1月1日起至今的采购入库单数据,可以通过以下步骤实现: 1. **设置请求参数**: ```json { ``pager``: { ``page_size``: ``50``, ``page_no``: ``1`` }, ``params``: { ``start_time``: ``2024-01-01T00:00:00Z``, ``end_time``: `CURRENT_TIMESTAMP` } } ``` 2. **调用API**: 使用POST方法发送上述请求至接口地址,并获取响应数据。 3. **处理响应数据**: 对返回的数据进行扁平化处理,将嵌套字段如`details_list`展开为独立记录。 通过上述步骤,我们能够高效地从旺店通·旗舰奇门系统中获取采购入库单数据,并为后续的数据转换和写入做好准备。这一过程展示了如何利用轻易云数据集成平台的强大功能,实现复杂系统间的数据无缝对接。 ![金蝶与WMS系统接口开发配置](https://pic.qeasy.cloud/S16.png~tplv-syqr462i7n-qeasy.image) ### 使用轻易云数据集成平台进行ETL转换并写入MySQL 在数据集成生命周期的第二步,我们将重点探讨如何使用轻易云数据集成平台将源平台的数据进行ETL(提取、转换、加载)转换,并最终写入目标平台MySQL。本文将深入探讨系统接口和数据集成的技术细节。 #### 数据提取与清洗 首先,从源平台(旺店通旗舰版)提取采购入库单的数据。这一步骤涉及到对原始数据的获取和初步清洗,以确保数据质量和一致性。由于本文重点在于ETL转换,因此不再详细描述数据提取过程。 #### 数据转换与写入 接下来,进入ETL生命周期中的关键步骤:数据转换与写入。我们需要将已经清洗好的数据转化为目标平台MySQL能够接收的格式,并通过API接口将其写入数据库。 ##### 元数据配置解析 以下是元数据配置,用于定义如何将源平台的数据字段映射到目标MySQL数据库表中: ```json { "api": "batchexecute", "effect": "EXECUTE", "method": "SQL", "number": "id", "id": "id", "name": "id", "idCheck": true, "request": [ {"field":"stockin_id","label":"入库单id","type":"string","value":"{stockin_id}"}, {"field":"order_no","label":"入库单号","type":"string","value":"{order_no}"}, {"field":"warehouse_no","label":"仓库编号","type":"string","value":"{warehouse_no}"}, {"field":"status","label":"状态","type":"string","value":"{status}"}, {"field":"modified","label":"修改时间","type":"string","value":"{{modified|datetime}}"}, {"field":"created_time","label":"制单时间","type":"string","value":"{{created_time|datetime}}"}, {"field":"remark","label":"备注","type":"string","value":"{remark}"}, {"field":"logistics_type_name","label":"入库单物流类型","type":"string","value":"{logistics_type_name}"}, {"field":"check_time","label":"审核时间","type":"string","value":"{{check_time|datetime}}"}, {"field":"purchase_id","label":"采购单id","type":"string","value":"{purchase_id}"}, {"field":"purchase_no","label":"采购单号","type":"string","value":"{purchase_no}"}, {"field":"goods_count","label":"货品数量","type":"string","value":"{goods_count}"}, {"field":"provider_no","label":"供应商编号","type\":\"string\",\"value\":\"{provider_no}\"}, {"field\":\"provider_name\",\"label\":\"供应商名称\",\"type\":\"string\",\"value\":\"{provider_name}\"}, {\"field\":\"logistics_no\",\"label\":\"物流单号\",\"type\":\"string\",\"value\":\"{logistics_no}\"}, {\"field\":\"logistics_name\",\"label\":\"物流公司\",\"type\":\"string\",\"value\":\"{logistics_name}\"}, {\"field\":\"goods_amount\",\"label\":\"货品总价格,不包含优惠\",\"type\":\"string\",\"value\":\"{goods_amount}\"}, {\"field\":\"total_price\",\"label\":\"税前总货款(折后)\",\"type\":\"string\",\"value\":\"{total_price}\"}, {\"field\":\"tax_amount\",\"label\":\"税后总额(折后)\",\"type\":\"string\",\"value\":\"{tax_amount}\"}, {\"field\":\"total_stockin_price\",\"label\":\"入库总金额\",\"type\":\”string\”,\”value\”:\”{total_stockin_price}\”}, {\"field\":\”flag_name\”,\”label\”: \”标记名称\”, \”type\”: \”string\”, \”value\”: \”{flag_name}\”}, {\"field\": \”operator_name\", \”label\": \”经办人\", \”type\": \”string\", \”value\": \"{operator_name}\"}, {\"field\": \"details_list_rec_id\", \"label\": \"入库单明细id\", \"type\": \"string\", \"value\": \"{details_list_rec_id}\"}, {\"field\": \"details_list_num\", \"label\": \"数量\", \"type\": \"string\", \"value\": \"{details_list_num}\"}, {\"field\": \"details_list_discount\", \"label\": \"折扣\", \"type\": \"string\", \"value\": \"{goods_amount}\"}, {\"field\": \"details_list_cost_price\", \"label\": \"税前单价\", \"type\": “ string”, “ value”: “ {details_list_cost_price}\ ” }, {\" field”: “ details_list_src_price”, “ label”: “ 税前折后单价”, “ type”: “ string”, “ value”: “ {details_list_src_price}\ ” }, {\" field”: “ details_list_tax_price”, “ label”: “ 税后单价”, “ type”: “ string”, “ value”: “ {details_list_tax_price}\ ” }, {\" field”: “ details_list_tax_amount”, “ label”: “ 税后金额(税后总价)”,“ type”:" string "," value ":" {details_list_tax_amount}" }, {\" field ":“ details_list_tax ",“ label ":“ 税率 ",“ type ":“ string ",“ value ":“ {details_list_tax}" }, {\" field ":“ details_list_total_cost ",“ label ":“ 税前金额(税前总价)",“ type ":" string "," value ":" {details_list_total_cost}" }, {\" field ":“ details_list_remark ",“ label ":“ 入库单明细备注 ",“ type ":" string "," value ":" {details_list_remark}" }, {\" field ":“ details_list_goods_name ",“ label ":“ 货品名称 ",“ type ":" string "," value ":" {details_list_goods_name}" }, {\" field ":“ details_list_goods_no ",“ label ":“ 货品编号 ",“ type ":" string "," value ":" {details_list_goods_no}" }, {\" field ":“ details_list_spec_no ",“ label ":“ 商家编码 ",“ type ":" string "," value ":" {details_list_spec_no}" }, {\" field ":“ details_list_spec_code ",“ label ":‘规格码’,‘类型’:‘字符串’,‘值’:‘{详细列表规格代码}’} {\" field ': ‘详细列表道具1’, ‘标签’:‘采购明细自定义属性1’, ‘类型’:‘字符串’, ‘值’:‘{详细列表道具1}’} {\" 字段': ‘详细列表道具2’, ‘标签’:‘采购明细自定义属性2’, ‘类型’:‘字符串’, ‘值’:‘{详细列表道具2}’} {\" 字段': ‘详细列表道具3’, ‘标签’:‘采购明细自定义属性3’, ‘类型’:‘字符串’, ‘值’:‘{详细列表道具3}’} {\" 字段': ‘详细列表道具4’, ‘标签’:‘采购明细自定义属性4’, ‘类型’:‘字符串’, ‘值’:‘{详细列表道具4}’} {\" 字段': '详情_列表_规格_名称', '标签':'规格名称', '类型':'字符串', '值':'{详情_列表_规格_名称}'} {\" 字段': '详情_列表_品牌_名称', '标签':'品牌名称', '类型':'字符串', '值':'{详情_列表_品牌_名称}'} {\" 字段': '详情_列表_单位_名称', '标签':'单位', '类型':'字符串', '值':'{详情_列表_单位_名称}'} {\" 字段': '详情_列表批次号', '标签':'批次', '类型':'字符串', '值':'{详情批次号}'} {\" 字段': '详情批次有效期', '标签':'有效期', '类型':'字符串', '值':'{{有效期|日期时间}}'} {\" 字段': '生产日期', '标签':'生产日期', '类型':'字符串', '值':'{{生产日期|日期时间}}'} {\" 字段': '位置编号', '标签':'货位', '类型':'字符串', '值':'位置编号'} {\" 字段': 是否缺陷, 标签: 是否缺陷, 类型: 字符串, 值: 是否缺陷 } ], "otherRequest":[ { "字段":主sql, 标签: 主语句, 类型: 字符串, 描述: SQL首次执行的语句,将会返回:lastInsertId, 值: REPLACE INTO wdt_wms_stockin_purchase_querywithdetail (stockin_id,order_no,warehouse_no,status,modified,created_time,remark,logistics_type_name,check_time,purchase_id,purchase_no,goods_count,provider_no,provider_name,logistics_no,logistics_name,goods_amount,total_price,tax_amount,total_stockin_price,flag_name,operator_name,details_rec_id,num_discount,cost_price_src_tax_total_cost_remark_goods_goods_spec_code_prop1_prop2_prop3_prop4_spec_brand_unit_batch_expire_production_position_defect_unit_ratio_purchase_unit_stockin) VALUES }, { 字段: limit, 标签: limit, 类型: 字符串, 值:100 } ] } ``` 在这个配置中,每个`request`字段都定义了一个从源平台到目标MySQL表的映射关系。例如: - `{"字段":"stockin_id","标签":"入库单id","类型":"字符串","值":"stockin_id"}` 将源平台的`stockin_id`映射到目标表中的`stockin_id`字段。 - `{"字段":"modified","标签":"修改时间","类型":"字符串","值":"{{modified|datetime}}"}` 将源平台的`modified`字段经过日期时间格式化后映射到目标表中的`modified`字段。 ##### SQL主语句配置 主语句用于执行实际的数据插入操作: ```json { "字段":主sql, 标签: 主语句, 类型: 字符串, 描述: SQL首次执行的语句,将会返回:lastInsertId, 值: REPLACE INTO wdt_wms_stockin_purchase_querywithdetail (stockin_id,order_no,warehouse_no,status,modified,created_time,remark,logistics_type_name,check_time,purchase_id,purchase_no,goods_count,provider_no,provider_name,logistics_no,logistics_name,goods_amount,total_price,tax_amount,total_stockin_price) VALUES } ``` 该主语句使用了REPLACE INTO语法,将所有映射后的字段插入到目标表中。如果记录已经存在,则替换现有记录。 ##### 数据写入过程 通过调用API接口 `batchexecute` 并传递上述配置,我们可以实现批量执行SQL插入操作。API请求示例如下: ```json { api:"batchexecute", effect:"EXECUTE", method:"SQL", number:"id", id:"id", name:"id", idCheck:true, request:[ // 映射后的具体请求内容... ], otherRequest:[ { field:"main_sql", label:"主语句", type:"string", describe:"SQL首次执行的语句,将会返回:lastInsertId", value:"REPLACE INTO wdt_wms_stockin_purchase_querywithdetail (stockin_id,..." }, { field:"limit", label:"limit", type:"string", value:"100" } ] } ``` 通过上述配置和API调用,可以高效地将源平台的数据转换并写入到目标MySQL数据库中,实现不同系统间的数据无缝对接。 #### 总结 本文深入探讨了如何使用轻易云数据集成平台进行ETL转换,并通过API接口将处理后的数据写入目标MySQL数据库。在实际操作中,通过精确配置元数据和合理设计API请求,可以高效完成复杂的数据集成任务,提高业务透明度和效率。 ![用友与SCM系统接口开发配置](https://pic.qeasy.cloud/T13.png~tplv-syqr462i7n-qeasy.image)