ETL技术在旺店通到MySQL数据集成中的应用

  • 轻易云集成顾问-贺强
### 旺店通旗舰版-其他出库单-->BI泰海-其他出库单表_原始查询(2024年起)数据集成案例分享 在本技术案例中,我们将深入探讨如何利用轻易云数据集成平台,将旺店通·旗舰奇门的数据无缝对接至MySQL数据库。具体的实现方案是:通过调用旺店通·旗舰奇门接口`wdt.wms.stockout.otherquery.querywithdetail`获取"其他出库单"数据,并将其批量写入MySQL数据库,确保高效且稳定的数据存储和处理。 为了实现这一目标,我们需要克服多个技术难点,包括: 1. **接口调用与分页限流问题**:由于某些API接口存在请求频率和返回记录数限制,需要设计合理的分页抓取策略,以保证所有有效数据都能被完整获取,不漏单。 2. **实时监控与异常处理机制**:在整个数据集成过程中,通过轻易云平台提供的集中式监控和告警系统,实时跟踪任务状态。同时配置错误重试机制,以应对网络波动或API服务异常等情况,从而提升系统可靠性。 3. **自定义数据显示转换逻辑**:考虑到旺店通·旗舰奇门与MySQL之间可能存在的数据格式差异,对获取到的数据进行必要的转换映射。例如,将日期时间字符串转化为标准的时间戳格式,以适应下游业务需求。 4. **高吞吐量快速写入能力**:利用优化后的批量操作API `batchexecute`,显著提升大量出库单记录向MySQL数据库写入的效率,在最短时间内完成大规模历史数据同步,实现性能上的突破。 5. **定时抓取调度任务**:通过定制化调度器,设置固定周期自动执行接口数据抓取任务,为后续分析和报表生成提供及时、准确的数据支撑。此外,还需要考虑在触发过于频繁导致系统负载增加时采取限流措施,以平衡性能与资源使用风险。 上述这些要点不仅仅是一个理论框架,而是在实际工程项目中经过验证行之有效的方法。我们将在之后详细解析每个环节中的具体实施细节及代码实现,对于开发人员而言,这些经验能够为您类似项目中的挑战提供有力支持。 ![如何对接企业微信API接口](https://pic.qeasy.cloud/D8.png~tplv-syqr462i7n-qeasy.image) ### 调用旺店通·旗舰奇门接口wdt.wms.stockout.otherquery.querywithdetail获取并加工数据 在数据集成的生命周期中,调用源系统接口是至关重要的一步。本文将深入探讨如何通过轻易云数据集成平台调用旺店通·旗舰奇门接口`wdt.wms.stockout.otherquery.querywithdetail`,获取并加工其他出库单数据。 #### 接口概述 接口`wdt.wms.stockout.otherquery.querywithdetail`用于查询其他出库单的详细信息。该接口采用POST请求方式,主要参数包括分页参数和业务参数。 #### 元数据配置解析 根据提供的元数据配置,以下是对各字段的详细解析: - **api**: `wdt.wms.stockout.otherquery.querywithdetail` - 接口名称,用于标识具体的API。 - **effect**: `QUERY` - 操作类型,这里表示查询操作。 - **method**: `POST` - 请求方法,使用POST方式提交请求。 - **number**: `order_no` - 表示订单编号字段。 - **id**: `stockout_id` - 表示出库单ID字段。 - **name**: `tid` - 表示交易ID字段。 #### 请求参数配置 请求参数分为分页参数和业务参数两部分: 1. **分页参数(pager)** - **page_size**: 分页大小,默认值为50。 - **page_no**: 页号,默认值为1。 2. **业务参数(params)** - **start_time**: 开始时间,通过模板变量`{{LAST_SYNC_TIME|datetime}}`动态获取上次同步时间。 - **end_time**: 结束时间,通过模板变量`{{CURRENT_TIME|datetime}}`动态获取当前时间。 - **time_type**: 时间类型,默认值为3(最后修改时间)。 #### 请求示例 以下是一个完整的请求示例: ```json { "pager": { "page_size": "50", "page_no": "1" }, "params": { "start_time": "{{LAST_SYNC_TIME|datetime}}", "end_time": "{{CURRENT_TIME|datetime}}", "time_type": "3" } } ``` #### 数据处理与清洗 在接收到接口返回的数据后,需要进行数据处理与清洗。根据元数据配置中的`autoFillResponse`和`beatFlat`属性,可以自动填充响应并将嵌套结构扁平化处理。例如,将响应中的`detail_list`字段展开为平铺结构,以便后续的数据转换与写入操作。 #### 实践案例 假设我们需要将2024年起的其他出库单数据从旺店通导入到BI泰海系统中。首先,我们通过上述接口获取原始数据,并进行必要的清洗和转换。以下是具体步骤: 1. 配置请求参数,确保分页和时间范围设置正确。 2. 调用接口获取原始数据,并检查响应结果是否符合预期。 3. 对响应中的嵌套字段进行扁平化处理,例如将`detail_list`展开。 4. 将清洗后的数据转换为目标系统所需格式,并写入BI泰海系统。 通过这种方式,我们可以高效地实现不同系统间的数据无缝对接,确保数据集成过程的准确性和完整性。 ![用友与WMS系统接口开发配置](https://pic.qeasy.cloud/S27.png~tplv-syqr462i7n-qeasy.image) ### 利用轻易云数据集成平台实现ETL转换并写入MySQLAPI接口 在数据集成过程中,ETL(Extract, Transform, Load)是至关重要的一步。本文将深入探讨如何利用轻易云数据集成平台,将源平台的数据进行ETL转换,并最终写入目标平台MySQL API接口。 #### 数据请求与清洗 在数据请求与清洗阶段,我们已经从源平台(如旺店通旗舰版)获取了相关的出库单数据。这些数据包含多个字段,如出库单ID、出库单编号、业务单号等。在此基础上,我们需要对这些数据进行进一步的转换,以便能够被目标平台MySQL API所接受。 #### 数据转换与写入 为了将清洗后的数据成功写入MySQL数据库,需要进行以下几个关键步骤: 1. **定义API接口元数据配置**: 元数据配置定义了如何将源数据映射到目标数据库的字段中。以下是一个详细的元数据配置示例: ```json { "api": "batchexecute", "effect": "EXECUTE", "method": "SQL", "number": "id", "id": "id", "name": "id", "idCheck": true, "request": [ {"field":"stockout_id","label":"出库单id","type":"string","value":"{stockout_id}"}, {"field":"order_no","label":"出库单编号","type":"string","value":"{order_no}"}, {"field":"src_order_no","label":"业务单号","type":"string","value":"{src_order_no}"}, {"field":"warehouse_no","label":"出库的仓库编号","type":"string","value":"{warehouse_no}"}, {"field":"consign_time","label":"出库时间","type":"string","value":"{{consign_time|datetime}}"}, {"field":"order_type","label":"源单据类别","type":"string","value":"{order_type}"}, {"field":"status","label":"状态","type":"string","value":"{status}"}, {"field":"goods_count","label":"出库数量","type":"string","value":"{goods_count}"}, {"field":"post_fee","label":"邮费","type":"string","value":"{post_fee}"}, {"field":"logistics_no","label":"物流单号","type":"string","value":"{logistics_no}"}, {"field":"receiver_name","label":"收件人姓名","type":"string","value":"{receiver_name}"}, {"field":"receiver_province","label":"省","type":"string","value":"{receiver_province}"}, {"field\":\"receiver_city\",\"label\":\"城市\",\"type\":\"string\",\"value\":\"{receiver_city}\"}, {\"field\":\"receiver_district\",\"label\":\"地区\",\"type\":\"string\",\"value\":\"{receiver_district}\"}, {\"field\":\"receiver_address\",\"label\":\"收件地址\",\"type\":\"string\",\"value\":\"{receiver_address}\"}, {\"field\":\"receiver_mobile\",\"label\":\"收件人手机号\",\"type\":\"string\",\"value\":\"{receiver_mobile}\"}, {\"field\":\"remark\",\"label\":\"出库单备注\",\"type\":\"string\",\"value\":\"{remark}\"}, {\"field\":\"weight\",\"label\":\"实际称重重量(Kg)\",\"type\":\"string\",\"value\":\"{weight}\"}, {\"field\":\"operator_name\",\"label\":\"制单人\",\"type=\"string\",\"value=\"{operator_name}\"}, {\"field=\"goods_total_cost\",\"label=\"总成本\",\"type=\"string\",\"value=\"{goods_total_cost}\"}, {\"field=\"goods_total_amount\",\"label=\"总货款\",\” type \” : \” string \” , \” value \” : \” {goods_total_amount}\” }, { \" field \" : \" modified \", \" label \" : \" 最后修改时间 \", \" type \" : \" string \", \" value \" : \" {{modified|datetime}}\" }, { \" field \" : \" reason \", \" label \" : \" 出库原因 \", \" type \":\” string\”,\” value\”:“\u003creason\u003e\” }, { “ field ” : “ checked_goods_total_cost ” , “ label ” : “ 瞬时成本总额 ” , “ type ” : “ string ” , “ value ” : “ {checked_goods_total_cost}\u003c/td\u003e\u003c/tr\u003e\u003ctr\u003e\u003ctd class=\u0027td-1\u0027 style=\u0027padding-left: 10px;\u0027 valign=\u0027top\u0027 width=\u002725%\u0027\u003e物流公司编号:\u003c/td\u003e\u003ctd class=\u0027td-2\u0027 style=\u0027padding-left: 10px;\u0027 valign=\u0027top\u0027 width=\u002725%\u0027>\n{\" field\":\“ logistics_company_no\”,\“ label\":\“ 物流公司编号\”,“ type”:“ string”,“ value”:“\n{\" field\":\“ detail_list_rec_id\",“ label\":“ 出库单详情id\",“ type\":“ string\",“ value\":“\n{\" field\":\“ detail_list_stockout_id\",“ label\":“ 出库单id\",“ type\":“ string\",“ value\":“\n{\" field\":\n{\" field\":\n{\" field\": ``` 2. **构建SQL语句**: 在元数据配置中,`main_sql`字段定义了主要的SQL语句,用于将数据插入到目标表中。示例如下: ```sql REPLACE INTO wdt_wms_stockout_otherquery_querywithdetail ( stockout_id, order_no, src_order_no, warehouse_no, consign_time, order_type, status, goods_count, post_fee, logistics_no, receiver_name, receiver_province, receiver_city, receiver_district, receiver_address, receiver_mobile, remark, weight, operator_name, goods_total_cost, goods_total_amount, modified, reason, checked_goods_total_cost, logistics_company_no, detail_list_rec_id, detail_list_stockout_id, detail_list_goods_count, detail_list_total_amount, detail_list_expire_date, detail_list_remark, detail_list_brand_no, detail_list_brand_name, detail_list_goods_name, detail_list_goods_no, detail_list_spec_no, detail_list_spec_name, detail_list_spec_code, detail_list_defect, detail_list_cost_price, detail_list_weight, detail_list_goods_type,\n{\" field\":detail_list_unit_name,\n{\" field\":{\"detail_list_base_unit_id\",\n{\" field\":{\"detail_list_batch_no\",\n{\" field\":{\"detail_list_position_id\",\n{\" field\":{\"detail_list_position_no\"\n) VALUES ``` 3. **执行批量操作**: 利用`batchexecute` API接口,可以批量执行上述SQL语句,实现高效的数据插入操作。这个过程确保了大批量的数据能够快速且准确地写入到目标数据库中。 4. **实时监控与日志记录**: 在整个ETL过程中,实时监控和日志记录是确保数据准确性和及时发现问题的重要手段。通过轻易云提供的可视化界面,可以实时查看每一步操作的执行情况,并在出现异常时及时处理。 #### 总结 通过以上步骤,我们成功地将源平台的数据进行了ETL转换,并最终写入到了目标平台MySQL数据库中。这一过程不仅提高了数据处理的效率,还确保了数据的一致性和准确性。在实际应用中,灵活运用元数据配置和API接口,可以实现更加复杂的数据集成需求。 ![如何开发金蝶云星空API接口](https://pic.qeasy.cloud/T3.png~tplv-syqr462i7n-qeasy.image)