使用轻易云完成数据ETL转换并写入金蝶云星空API接口

  • 轻易云集成顾问-曾平安
### 旺店通·企业奇门数据集成到金蝶云星空的技术实现 在实际业务场景中,实现旺店通·企业奇门至金蝶云星空的数据无缝对接,确保数据精准、高效传输是一个高难度的技术挑战。本次分享将聚焦于如何通过轻易云实现这一目标,尤其关注“其他入库同步6_批号联查a”任务。 首先,我们需要解决多个关键问题: 一是**确保集成过程中不漏单**。为此,通过调用旺店通·企业奇门接口`wdt.stockin.order.query`来获取所有待处理的入库订单,并设计了一套严密的校验机制,以防止数据丢失。例如,当API调用返回分页结果时,我们会逐页抓取并存储到临时缓存中,直到确认所有记录均已成功读取。 二是**大量数据快速写入到金蝶云星空**。我们采用了分批次、异步写入的方法,通过调用金蝶云星空提供的`batchSave`接口,将整理好的数据以高效方式导入系统。每次写入前先进行合法性和关联性校验,以确保输入的数据格式与目标系统一致。 然后,在面对**接口限流和分页问题处理上**,我们采取了多线程并发策略以及适当的请求间隔控制。一旦检测到请求频率达到接口设定上限,会自动调整请求速率,并重试未完成部分,从而避免因瞬时流量过大导致的问题。 最后,对于两者之间复杂且多变的数据格式差异,我们通过自定义映射规则进行转换。在这过程中,不仅考虑字段类型的一致性,还要规范化日期格式、数值精度等细节内容,同时保证所需附加信息完整准确地传递给目标端。 在整个流程中,为实时监控和日志记录提供支持,这样可以迅速定位数据异常点并实施错误重试机制。结合这些措施,使得本案例中的“其他入库同步6_批号联查a”任务能稳定运行,并具备较高执行效率和安全可靠性。 以上即是该实战案例初步解析,下文将展开具体实施步骤及产出效果。 ![系统集成平台API接口配置](https://pic.qeasy.cloud/D9.png~tplv-syqr462i7n-qeasy.image) ### 调用旺店通·企业奇门接口wdt.stockin.order.query获取并加工数据的技术案例 在数据集成生命周期的第一步,调用源系统接口获取数据是至关重要的一环。本文将深入探讨如何通过轻易云数据集成平台调用旺店通·企业奇门接口`wdt.stockin.order.query`来获取并加工数据。 #### 接口调用配置 首先,我们需要配置API接口的元数据,以便正确地发起请求。以下是该接口的元数据配置: ```json { "api": "wdt.stockin.order.query", "method": "POST", "number": "order_no", "id": "stockin_id", "pagination": { "pageSize": 50 }, "condition_bk": [[]], "request": [ {"field":"start_time","label":"开始时间","type":"string","value":"{{LAST_SYNC_TIME|datetime}}"}, {"field":"end_time","label":"结束时间","type":"string","value":"{{CURRENT_TIME|datetime}}"}, {"field":"order_type","label":"源单据类别","type":"string","value":"6"}, {"field":"status","label":"入库单状态","type":"string"}, {"field":"warehouse_no","label":"仓库编号","type":"string"}, {"field":"src_order_no","label":"上层单据编号","type":"string"}, {"field":"stockin_no","label":"入库单号","type":"string"} ], "otherRequest": [ {"field":"page_size","label":"分页大小","type":"string","value":"{PAGINATION_PAGE_SIZE}"}, {"field":"page_no","label":"页号","type":"string","value":"{PAGINATION_START_PAGE}"} ] } ``` #### 请求参数解析 1. **时间参数**: - `start_time` 和 `end_time` 分别表示查询的起始和结束时间,使用模板变量`{{LAST_SYNC_TIME|datetime}}`和`{{CURRENT_TIME|datetime}}`来动态生成。 2. **单据类别**: - `order_type` 固定为"6",表示特定类型的入库单。 3. **其他可选参数**: - `status`, `warehouse_no`, `src_order_no`, `stockin_no`等字段用于进一步过滤查询结果。 4. **分页参数**: - `page_size` 和 `page_no` 用于控制分页查询,每次请求返回50条记录。 #### 数据请求与清洗 在发起API请求后,接收到的数据通常需要进行清洗和预处理,以确保其符合目标系统的要求。以下是一个简单的数据清洗流程: 1. **字段映射与转换**: - 将源系统返回的数据字段映射到目标系统所需的字段。例如,将源系统的`order_no`映射为目标系统中的订单编号。 2. **数据格式转换**: - 确保日期、数值等字段格式符合目标系统要求。例如,将日期字符串转换为标准的ISO格式。 3. **异常处理**: - 对于缺失或异常值进行处理,确保数据完整性。例如,对于缺失的仓库编号,可以设置默认值或标记为待处理。 #### 示例代码 以下是一个示例代码片段,展示如何通过轻易云平台调用该接口并进行简单的数据清洗: ```python import requests import json from datetime import datetime # 配置API请求参数 url = 'https://api.wangdian.cn/openapi2/wdt.stockin.order.query' headers = {'Content-Type': 'application/json'} params = { 'start_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S'), 'end_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S'), 'order_type': '6', 'page_size': 50, 'page_no': 1 } # 发起POST请求 response = requests.post(url, headers=headers, data=json.dumps(params)) data = response.json() # 数据清洗与转换 cleaned_data = [] for record in data['orders']: cleaned_record = { '订单编号': record['order_no'], '入库单ID': record['stockin_id'], # 添加更多字段映射与转换逻辑 } cleaned_data.append(cleaned_record) # 输出清洗后的数据 print(json.dumps(cleaned_data, ensure_ascii=False, indent=4)) ``` 通过上述步骤,我们可以成功地从旺店通·企业奇门接口获取并加工所需的数据,为后续的数据转换与写入做好准备。这一过程不仅提高了数据集成的效率,也确保了数据质量和一致性。 ![金蝶与MES系统接口开发配置](https://pic.qeasy.cloud/S4.png~tplv-syqr462i7n-qeasy.image) ### 使用轻易云数据集成平台进行ETL转换并写入金蝶云星空API接口 在数据集成的生命周期中,第二步是将已经集成的源平台数据进行ETL转换,并转为目标平台能够接收的格式。本文将详细探讨如何使用轻易云数据集成平台,将源数据转换为金蝶云星空API接口所需的格式,并最终写入目标平台。 #### API接口配置与元数据解析 在本案例中,我们使用了金蝶云星空的`batchSave` API接口,该接口采用POST方法进行数据传输。以下是具体的元数据配置: ```json { "api": "batchSave", "method": "POST", "idCheck": true, "operation": { "rowsKey": "array", "rows": 1, "method": "batchArraySave" }, "request": [ {"field":"FBillNo","label":"单据编号","type":"string","value":"{stockin_no}"}, {"field":"FBillTypeID","label":"单据类型","type":"string","parser":{"name":"ConvertObjectParser","params":"FNumber"},"value":"QTRKD01_SYS"}, {"field":"FStockOrgId","label":"库存组织","type":"string","parser":{"name":"ConvertObjectParser","params":"FNumber"},"value":"100"}, {"field":"FDate","label":"日期","type":"string","value":"{stockin_time}"}, {"field":"FSUPPLIERID","label":"供应商","type":"string","parser":{"name":"ConvertObjectParser","params":"FNumber"}}, {"field":"FDEPTID","label":"部门","type":"string","parser":{"name":"ConvertObjectParser","params":"FNumber"},"value":"BM003"}, {"field":"FNOTE","label":"备注","type":"string","value":"{stockin_no} {remark}"}, { "field": "FEntity", "label": "明细信息", "type": "array", "children": [ {"field": "FMATERIALID", "label": "物料编码", "type": "string", "parser":{"name": "ConvertObjectParser", "params": "FNumber"}, "value": "{{details_list.goods_no}}", "parent": "FEntity"}, {"field": "FCMKBarCode", "label": "零售条形码", "type": "string", "parent": "FEntity"}, {"field": "FSTOCKID", "label": "收货仓库", "type": "string", "parser":{"name": "ConvertObjectParser", "params": "FNumber"}, "value": "{warehouse_no}", "parent": "FEntity"}, {"field": "FQty", "label": "实收数量", "type": "string", "value": "{{details_list.goods_count}}", "parent": "FEntity"}, { "field": "FLOT", "label": "批号", "type": "string", "_function CASE '_findCollection find FIsBatchManage from 4ae4b66a-79ab-3a6d-8f33-723f326a42d0 where FNumber={{details_list.goods_no}} _endFind' WHEN 'false' THEN '' ELSE '{{details_list.batch_no}}' END", "parent": { parser: { name: ConvertObjectParser, params: FNumber } } }, { field: FEntryNote, label: 备注, type: string, parent: FEntity }, { field: FPrice, label: 成本价, type: string, parent: FEntity } ], value: details_list } ], otherRequest:[ { field: FormId, label: 业务对象表单Id, type: string, value: STK_MISCELLANEOUS }, { field: IsVerifyBaseDataField, label: 验证基础资料, type: bool, value: true }, { field: Operation, label: 执行的操作, type: string, value: Save }, { field:IsAutoSubmitAndAudit,label:"提交并审核",type:"bool",value:true} ] } ``` #### 数据请求与清洗 在ETL过程中,首先需要从源系统请求原始数据,并对其进行清洗。清洗过程包括去除无效数据、标准化字段格式等操作。例如,从源系统获取到的库存入库单号、日期、供应商信息等都需要经过相应的处理和验证,以确保其符合目标系统的要求。 #### 数据转换与写入 1. **字段映射与转换**: - `FBillNo`:将源系统中的库存入库单号映射到目标系统中的单据编号。 - `FBillTypeID`:固定值为`QTRKD01_SYS`,通过`ConvertObjectParser`解析器进行对象转换。 - `FStockOrgId`:固定值为`100`,同样通过解析器进行对象转换。 - `FSUPPLIERID`:供应商信息通过解析器进行对象转换。 - `FDEPTID`:固定值为`BM003`,通过解析器进行对象转换。 2. **数组字段处理**: - `details_list`:包含多个物料明细信息,需要逐条处理。每条明细包括物料编码、零售条形码、收货仓库、实收数量、批号等字段。 - 批号字段(`FLOT`)需要根据条件判断是否填写,如果物料不需要批次管理,则为空,否则填写对应批号。 3. **其他请求参数**: - `FormId`: 固定值为`STK_MISCELLANEOUS`, 表示业务对象表单Id。 - `IsVerifyBaseDataField`: 设置为true,表示验证基础资料。 - `Operation`: 固定值为`Save`, 表示执行保存操作。 - `IsAutoSubmitAndAudit`: 设置为true,表示自动提交并审核。 #### 实际应用案例 在实际应用中,通过轻易云数据集成平台,我们可以轻松配置上述元数据,并将其应用于具体的数据集成任务中。例如,在处理某一批次库存入库时,我们可以通过以下步骤实现: 1. **配置元数据**:根据上述配置文件,在轻易云平台上创建相应的数据集成任务。 2. **请求原始数据**:从源系统请求库存入库相关的数据,并进行初步清洗和验证。 3. **执行ETL转换**:利用配置好的元数据,将原始数据转换为金蝶云星空API接口所需的格式。 4. **写入目标系统**:调用金蝶云星空API接口,将转换后的数据写入目标系统,实现无缝对接。 通过这种方式,可以大大简化复杂的数据集成过程,提高工作效率和准确性。 ![钉钉与MES系统接口开发配置](https://pic.qeasy.cloud/T5.png~tplv-syqr462i7n-qeasy.image)