使用轻易云平台的ETL转换与数据写入:从旺店通到金蝶云星空

  • 轻易云集成顾问-蔡威
### 案例分享:旺店通·企业奇门数据集成到金蝶云星空 在现代商业环境中,高效的数据集成是业务成功的关键。今天,我们将探讨一个实际运行的系统对接案例:通过轻易云数据集成平台,将旺店通·企业奇门的数据无缝同步到金蝶云星空。本次方案实施名称为“旺店通其他入库委外同步”,旨在实现两大系统间顺畅而高效的数据流动。 #### 确保不漏单与快速写入 首先,确保数据从旺店通·企业奇门迁移至金蝶云星空过程中没有遗漏,是本次集成的重要目标之一。我们利用wdt.stockin.order.query接口定时抓取订单数据,并通过精确的调度和监控机制确保每一笔订单都能被准确获取。 紧接着,在大量数据需要批量写入到金蝶云星空时,我们调用了其batchSave API。这使得大量交易记录能够以极高的效率导入目标系统,不仅提高了写入速度,还极大减少了可能出现的网络延迟以及请求超时时间。 #### 技术挑战及解决方案 一个显著的问题是在处理分页和限流问题上。由于接口限制,每次查询只能返回固定数量的数据。当面对海量订单信息时,通过迭代分页拉取所有待处理记录成为必然选项。此外,为避免过多请求导致限流问题,我们采用适当的策略进行速率控制,从容应对巨大数据量带来的压力。 其次,在实际操作过程中,常会碰见源系统与目标系统之间格式差异较大的情况。在这个项目中,我们设立了一套完善的数据映射规则,保证了从获取、转换直到最终写入,每个步骤都有条不紊地执行下去。而这背后的核心正是灵活可配置且透明化强大的映射管理后台,让我们可以精准定义并转换每一个字段,以符合业务需求。 #### 异常处理与实时监控 不可忽视的是异常处理能力。在整个迁移过程中,如果遇到失败或错误情况,本次方案中的重试机制即时启动,保障任务自动恢复继续执行。同时,通过全生命周期管理平台提供详细日志记录和实时监控界面,我们不仅可以清楚掌握当前进展状态,还能迅速定位并解决潜在问题。这种全面而可靠的日志功能无疑为整体流程保驾护航。 以上便是此次技术案例文章开头部分简要介绍,希望这些详细说明能为您提供有效且有用的信息。随后的章节将深入剖析具体技术细节以及实际操作步骤,请持续关注后续内容更新。 ![钉钉与WMS系统接口开发配置](https://pic.qeasy.cloud/D34.png~tplv-syqr462i7n-qeasy.image) ### 调用源系统旺店通·企业奇门接口wdt.stockin.order.query获取并加工数据 在数据集成的生命周期中,调用源系统接口是至关重要的一步。本文将深入探讨如何通过轻易云数据集成平台调用旺店通·企业奇门接口`wdt.stockin.order.query`来获取并加工数据。 #### 接口配置与请求参数 首先,我们需要配置元数据,以确保能够正确地调用API并获取所需的数据。以下是元数据配置的详细内容: ```json { "api": "wdt.stockin.order.query", "method": "POST", "number": "order_no", "id": "stockin_id", "pagination": { "pageSize": 50 }, "idCheck": true, "request": [ {"field":"start_time","label":"开始时间","type":"string","value":"{{LAST_SYNC_TIME|datetime}}"}, {"field":"end_time","label":"结束时间","type":"string","value":"{{CURRENT_TIME|datetime}}"}, {"field":"order_type","label":"源单据类别","type":"string","value":"12"}, {"field":"status","label":"入库单状态","type":"string"}, {"field":"warehouse_no","label":"仓库编号","type":"string"}, {"field":"src_order_no","label":"上层单据编号","type":"string"}, {"field":"stockin_no","label":"入库单号","type":"string"} ], "otherRequest": [ {"field":"page_size","label":"分页大小","type":"string","value":"{PAGINATION_PAGE_SIZE}"}, {"field":"page_no","label":"页号","type":"string","value":"{PAGINATION_START_PAGE}"} ], "condition_bk": [ [{"field": "operator_name", "logic": "neq", "value": "外部接口"}, {"field": "remark", "logic": "like", "value": "106"}] ], "condition": [ [{"field": "operator_name", "logic": "neq", "value": "外部接口"}, {"field": "remark", "logic": "like", "value": "106"}] ] } ``` #### 请求参数解析 1. **时间参数**:`start_time`和`end_time`分别表示查询的起始和结束时间,使用动态变量`{{LAST_SYNC_TIME|datetime}}`和`{{CURRENT_TIME|datetime}}`来填充。 2. **单据类别**:通过设置`order_type`为"12",我们指定了要查询的源单据类别。 3. **其他过滤条件**:包括入库单状态、仓库编号、上层单据编号和入库单号等,这些字段可以根据实际需求进行调整。 4. **分页参数**:为了处理大批量数据,设置了分页大小`page_size`为50,并通过动态变量控制页号。 #### 数据请求与清洗 在发送请求时,轻易云平台会根据上述配置生成相应的HTTP POST请求,并将响应的数据进行初步清洗。以下是一个示例请求体: ```json { "start_time": "{{LAST_SYNC_TIME|datetime}}", "end_time": "{{CURRENT_TIME|datetime}}", "order_type": "12", ... } ``` 响应的数据将包含多个入库订单的信息,我们需要对这些数据进行进一步处理。例如,可以通过过滤条件排除不必要的数据: ```json { "$and":[ {"operator_name":{"$ne" : "外部接口"}}, {"remark":{"$regex" : /106/}} ] } ``` #### 数据转换与写入 在完成初步清洗后,下一步是将数据转换为目标系统所需的格式,并写入目标数据库或系统。在这个过程中,可以使用轻易云平台提供的各种转换工具,如字段映射、数据类型转换等。 例如,将响应中的字段映射到目标系统所需的字段: ```json { "_id" : "$stockin_id", ... } ``` #### 实时监控与调试 轻易云平台提供了实时监控功能,可以随时查看数据流动和处理状态。如果在调用API或处理数据时遇到问题,可以利用平台提供的日志和调试工具进行排查和解决。 通过以上步骤,我们实现了从旺店通·企业奇门接口获取并加工数据的全过程。这不仅提高了数据处理效率,还确保了数据的一致性和准确性。 ![数据集成平台可视化配置API接口](https://pic.qeasy.cloud/S10.png~tplv-syqr462i7n-qeasy.image) ### 使用轻易云数据集成平台进行ETL转换并写入金蝶云星空API接口 在数据集成的生命周期中,第二步是将已经集成的源平台数据进行ETL转换,并将其转为目标平台所能够接收的格式。本文将详细探讨如何使用轻易云数据集成平台,将旺店通的其他入库委外同步数据转换为金蝶云星空API接口所需的格式,并最终写入目标平台。 #### 元数据配置解析 首先,我们需要理解元数据配置,以便正确地进行ETL转换。以下是我们要处理的元数据配置: ```json { "api": "batchSave", "method": "POST", "idCheck": true, "operation": { "rowsKey": "array", "rows": 1, "method": "batchArraySave" }, "request": [ {"field": "FBillNo", "label": "单据编号", "type": "string", "value": "{stockin_no}-TC"}, {"field": "FBillTypeID", "label": "单据类型", "type": "string", "parser": {"name": "ConvertObjectParser", "params":"FNumber"}, "value":"QTRKD01_SYS"}, {"field": "FStockOrgId", "label":"库存组织", "type":"string", "parser":{"name":"ConvertObjectParser","params":"FNumber"}, "value":"106"}, {"field":"FDate","label":"日期","type":"string","value":"{stockin_time}"}, {"field":"FSUPPLIERID","label":"供应商","type":"string", "parser":{"name":"ConvertObjectParser","params":"FNumber"}}, {"field":"FDEPTID","label":"部门","type":"string", "parser":{"name":"ConvertObjectParser","params":"FNumber"}, "value":"BM000002"}, {"field":"FNOTE","label":"备注","type":"string","value":"{remark}"}, {"field":"FEntity","label":"明细信息","type":"array", "children":[ {"field":"FMATERIALID","label":"物料编码","type":"string", "parser":{"name":"ConvertObjectParser","params":"FNumber"}, "value":"{{details_list.spec_no}}", "parent":""}, {"field":"FCMKBarCode","label":"零售条形码","type":"","parent":""}, {"field":"","label":"","type":"","parser":{"name":"","params":""}, "" : ""}, {"field":"","label":"","type":"","parser":{"name":"","params":""}, "" : ""} ],"value" : ""} ], ... } ``` #### 数据请求与清洗 在这个阶段,我们需要从源平台(旺店通)获取原始数据,并对其进行清洗。清洗过程包括去除无效或重复的数据,确保所有字段都符合目标平台(金蝶云星空)的要求。 #### 数据转换与写入 接下来,我们进入ETL生命周期的第二步:数据转换与写入。以下是具体步骤: 1. **字段映射和转换**: - `FBillNo`:从`{stockin_no}`字段获取,并附加后缀`-TC`。 - `FBillTypeID`:固定值`QTRKD01_SYS`,通过`ConvertObjectParser`解析为目标系统所需格式。 - `FStockOrgId`:固定值`106`,同样通过`ConvertObjectParser`解析。 - `FDate`:直接从源字段`{stockin_time}`获取。 - `FSUPPLIERID`、`FDEPTID`等字段需要通过解析器进行格式转换。 2. **嵌套数组处理**: - `FEntity`字段包含明细信息,是一个数组结构。每个子项如物料编码(FMATERIALID)、零售条形码(FCMKBarCode)、收货仓库(FSTOCKID)等都需要分别处理和映射。 3. **API请求构建**: - 根据元数据配置构建POST请求,确保所有字段都符合金蝶云星空API接口的要求。 4. **发送请求并处理响应**: - 使用HTTP POST方法将构建好的请求发送到金蝶云星空API接口。如果配置中启用了自动提交和审核功能(IsAutoSubmitAndAudit),则会自动完成这些操作。 以下是一个示例代码片段,用于构建和发送API请求: ```python import requests import json # 构建请求头 headers = { 'Content-Type': 'application/json', } # 构建请求体 payload = { 'FormId': 'STK_MISCELLANEOUS', 'IsVerifyBaseDataField': True, 'Operation': 'Save', 'IsAutoSubmitAndAudit': True, 'Model': { 'FBillNo': f"{source_data['stockin_no']}-TC", 'FBillTypeID': {'FNumber': 'QTRKD01_SYS'}, 'FStockOrgId': {'FNumber': '106'}, 'FDate': source_data['stockin_time'], # 其他字段依次添加... 'FEntity': [ { 'FMATERIALID': {'FNumber': detail['spec_no']}, # 其他子项依次添加... } for detail in source_data['details_list'] ] } } # 发送POST请求 response = requests.post('https://api.kingdee.com/batchSave', headers=headers, data=json.dumps(payload)) # 检查响应状态 if response.status_code == 200: print('Data successfully written to Kingdee Cloud.') else: print(f'Failed to write data: {response.text}') ``` 通过以上步骤,我们可以确保源平台的数据被成功转换并写入到金蝶云星空系统中。这种方法不仅提高了数据处理效率,还保证了不同系统间的数据一致性和完整性。 ![钉钉与MES系统接口开发配置](https://pic.qeasy.cloud/T4.png~tplv-syqr462i7n-qeasy.image)