ETL与数据写入技术:从旺店通到金蝶云星空的实践

  • 轻易云集成顾问-曾平安
### 旺店通·企业奇门数据集成到金蝶云星空的解决方案分享:销售出库_copy 在现代化企业管理中,实现不同业务系统的数据无缝对接已成为提升运营效率和决策质量的关键环节。在本文中,我们将聚焦于一个实际运行的技术案例——如何通过轻易云数据集成平台,将旺店通·企业奇门的数据成功集成到金蝶云星空,并实现销售出库数据的高效传输与处理。 此次项目涉及的主要任务是利用API接口wdt.stockout.order.query.trade抓取旺店通中的销售出库订单数据,再通过批量写入API batchSave导入至金蝶云星空。整个过程中,确保了数据不漏单,大量订单能够快速且准确地写入目标系统,同时有效处理分页、限流及两种系统间的数据格式差异问题,并能做到实时监控和异常处理。 1. **确保集成旺店通·企业奇门数据不漏单** 为保证从旺店通获取的数据完整无遗漏,我们设计了一套定时可靠的数据抓取机制。每日定时调用wdt.stockout.order.query.trade接口,通过增量拉取方式获取新增或变更的订单记录。定期校验与对比,确保任何变动都能被及时捕获并同步至金蝶云星空。 2. **大量数据快速写入到金蝶云星空** 考虑到业务场景下可能存在的大规模订单需求,本方案采用分块批量写入的方法,通过batchSave API将从旺店通提取出的数据信息以较大吞吐量推送到金蝶云。这种方法不仅提高了整体处理速度,还减少了因网络波动造成失败重试频次,从而优化了性能表现。 3. **批量化与格式适配** 在实现批量操作同时,还必须应对两系统之间潜在的数据格式差异。例如,某些字段在定义上并不完全一致,为此需进行自定义映射转换,以保障每条记录都符合目标系统要求。此外,在调试过程中对于接口返回的信息亦进行细致分析,对可能出现的不符情况提供动态调整选项,提高兼容性和稳定性。 4. **分页、限流及异常处理机制** 在调用API时遇到了常见分页与限流限制,为应对此类挑战,方案引入自动分页读取并结合速率控制策略。在发生服务端响应错误情况下,可以启动错误重试逻辑,对特例事件作针对性修复,同时还设置有日志记录模式,方便后续审计以及问题诊断追踪工作开展。 本案例展示的是基于实际生产环境中的一系列具体挑战 ![钉钉与MES系统接口开发配置](https://pic.qeasy.cloud/D35.png~tplv-syqr462i7n-qeasy.image) ### 调用旺店通·企业奇门接口获取并加工数据的技术案例 在数据集成的生命周期中,调用源系统接口是关键的第一步。本文将详细探讨如何通过轻易云数据集成平台调用旺店通·企业奇门接口`wdt.stockout.order.query.trade`来获取并加工销售出库数据。 #### 接口调用配置 首先,我们需要配置元数据以便正确调用接口。以下是该接口的元数据配置: ```json { "api": "wdt.stockout.order.query.trade", "method": "POST", "number": "order_no", "id": "stockout_id", "pagination": { "pageSize": 100 }, "idCheck": true, "request": [ { "field": "start_time", "label": "开始时间", "type": "datetime", "describe": "增量获取数据,start_time作为开始时间,格式:yyyy-MM-dd HH:mm:ss", "value": "{{LAST_SYNC_TIME|datetime}}" }, { "field": "end_time", "label": "结束时间", "type": "datetime", "describe": "增量获取数据,end_time作为结束时间,格式:yyyy-MM-dd HH:mm:ss", "value": "{{CURRENT_TIME|datetime}}" }, { "field": "status", "label": "状态", "type": "string", "describe": { 5: '已取消', 55: '已审核', 95: '已发货', 105: '部分打款', 110: '已完成', 113: '异常发货' } }, { ... } ], ... } ``` #### 请求参数解析 1. **时间参数**: - `start_time` 和 `end_time` 用于指定增量获取数据的时间范围。通过模板变量 `{{LAST_SYNC_TIME|datetime}}` 和 `{{CURRENT_TIME|datetime}}` 动态生成。 2. **状态参数**: - `status` 字段用于过滤订单状态,如已取消、已审核等。根据业务需求选择合适的状态值。 3. **分页参数**: - `page_size` 和 `page_no` 用于控制分页请求,每次请求最多返回100条记录。 #### 数据请求与清洗 在调用接口后,我们会得到一个包含销售出库订单信息的数据集。接下来需要对这些数据进行清洗和转换,以便后续处理。 1. **清洗步骤**: - 去除无效或重复的数据。 - 格式化日期和数值字段,确保一致性。 2. **转换步骤**: - 将原始字段名映射到目标系统所需的字段名。 - 根据业务逻辑计算衍生字段,例如总金额、折扣等。 #### 示例代码 以下是一个示例代码片段,用于调用接口并处理返回的数据: ```python import requests import json from datetime import datetime # 配置请求头和URL url = 'https://api.wangdian.cn/openapi2/stockout_order_query_trade.php' headers = {'Content-Type': 'application/json'} # 设置请求参数 params = { 'start_time': datetime.strftime(LAST_SYNC_TIME, '%Y-%m-%d %H:%M:%S'), 'end_time': datetime.strftime(CURRENT_TIME, '%Y-%m-%d %H:%M:%S'), 'status': '55', # 已审核 'page_size': 100, 'page_no': 0 } # 发起POST请求 response = requests.post(url, headers=headers, data=json.dumps(params)) data = response.json() # 数据清洗与转换 cleaned_data = [] for record in data['orders']: cleaned_record = { 'order_id': record['stockout_id'], 'order_number': record['order_no'], ... } cleaned_data.append(cleaned_record) # 输出清洗后的数据 print(cleaned_data) ``` #### 注意事项 - **错误处理**:在实际应用中,需要添加错误处理机制,例如捕获网络异常、API返回错误等。 - **性能优化**:对于大批量数据,可以采用多线程或异步请求方式提高效率。 - **安全性**:确保API密钥和其他敏感信息不被泄露,使用加密传输。 通过上述步骤,我们可以高效地从旺店通·企业奇门接口获取销售出库数据,并进行必要的清洗和转换,为后续的数据处理奠定基础。 ![钉钉与ERP系统接口开发配置](https://pic.qeasy.cloud/S2.png~tplv-syqr462i7n-qeasy.image) ### 使用轻易云数据集成平台进行ETL转换并写入金蝶云星空API接口的技术案例 在数据集成的生命周期中,第二步是将已集成的源平台数据进行ETL转换,并将其转为目标平台——金蝶云星空API接口所能接收的格式。本文将详细探讨如何利用轻易云数据集成平台配置元数据,实现这一过程。 #### API接口配置 我们使用金蝶云星空的`batchSave` API接口,通过POST方法提交数据。为了确保数据准确性,我们启用了ID检查功能(`idCheck: true`),并通过批量保存方法(`batchArraySave`)处理每次10条记录。 ```json { "api": "batchSave", "method": "POST", "idCheck": true, "operation": { "rowsKey": "array", "rows": 10, "method": "batchArraySave" } } ``` #### 请求参数配置 请求参数包含多个字段,每个字段都有特定的类型、描述和解析器。以下是关键字段及其配置: 1. **单据类型(FBillTypeID)**: - 类型:字符串 - 值:固定值 `XSCKD07_SYS` - 解析器:`ConvertObjectParser`,参数为 `FNumber` 2. **单据编号(FBillNo)**: - 类型:字符串 - 值:动态值 `{order_no}` 3. **日期(FDate)**: - 类型:字符串 - 值:通过模板引擎转换 `{{consign_time|date}}` 4. **发货组织(FStockOrgId)** 和 **销售组织(FSaleOrgId)**: - 类型:字符串 - 解析器:`ConvertObjectParser` - 动态值 `_findCollection find email from ... where shop_no={shop_no}` 5. **客户(FCustomerID)**: - 类型:字符串 - 动态值 `{shop_no}` - 解析器:`ConvertObjectParser` 6. **运输单号(FCarriageNO)**、**联系电话(FLinkPhone)**、**收货人姓名(FLinkMan)**、**备注(FNote)** 等字段均为文本类型,直接从源数据中获取相应值。 ```json [ { "field": "FBillTypeID", "label": "单据类型", "type": "string", "describe": "单据类型", "parser": { "name": "ConvertObjectParser", "params": "FNumber" }, "value": "XSCKD07_SYS" }, { ... } ] ``` #### 子实体和嵌套对象处理 对于复杂的数据结构,如财务信息和明细信息,我们使用嵌套对象和数组来表示: 1. **财务信息(SubHeadEntity)**: 包含结算组织和结算币别两个子字段。 2. **明细信息(FEntity)**: 包含物料编码、实发数量、含税单价等多个子字段。 这些子字段同样需要通过解析器进行转换,以确保符合金蝶云星空的要求。 ```json { ... { "field": "SubHeadEntity", "label": "财务信息", "type": "object", ... }, { ... } } ``` #### 特殊字段处理 某些字段需要特殊处理,例如: 1. **是否赠品(FIsFree)**: 使用条件判断 `_function case '{{details_list.paid}}' when '0.0000' then true else false end` 2. **原始订单号截取(F_JSJT_Text_YSDDH)**: 使用字符串截取函数 `_function left("{src_trade_no}",50)` ```json { ... { ... { "field": "FIsFree", ... "_function case '{{details_list.paid}}' when '0.0000' then true else false end" }, ... } } ``` #### 最终请求示例 最终生成的请求示例如下: ```json { ... { ... { ... { ... "_function left(\"{src_trade_no}\",50)" }, ... } ... [ { ... "_function round((({{details_list.paid}}+{{details_list.share_post}})/{{details_list.goods_count}}),6)" }, ... ] ... "_findCollection find email from ... where shop_no={shop_no}" ... } ``` 通过上述配置,我们能够将源平台的数据成功转换为金蝶云星空API接口所能接收的格式,并实现数据的无缝写入。这不仅提高了数据处理效率,还确保了数据的一致性和准确性。 ![电商OMS与WMS系统接口开发配置](https://pic.qeasy.cloud/T27.png~tplv-syqr462i7n-qeasy.image)