轻易云平台下的数据清洗与转换实战

  • 轻易云集成顾问-陈洁琳
### 案例分享:金蝶云星空数据集成到旺店通·企业奇门 在本次技术案例中,将详细介绍如何通过轻易云数据集成平台,实现金蝶直接调拨单数据对接到旺店通的其他入库,并进行后补批号操作。具体方案名称为“金蝶直接调拨单对接旺店通其他入库_后补批号ZJDB”。 #### 金蝶云星空API——executeBillQuery的使用 首先,利用金蝶提供的`executeBillQuery` API接口抓取调拨单的数据。为了确保数据不漏单,我们采取了定时可靠的抓取机制,通过配置周期性任务,在每个预设时间点自动调用该接口,获取最新的数据。同时,为解决分页和限流问题,每次请求都带有适当的分页参数和速率限制控制,以避免因超量请求导致接口响应失败。 #### 数据格式转换与映射 由于金蝶云星空和旺店通·企业奇门之间存在明显的数据格式差异,我们使用了自定义映射规则来转换数据。例如,从金蝶调拨单导出的字段需要经过特定处理,才能适配到旺店通所需的字段规范。在实际操作中,这部分工作主要是通过平台内置的数据清洗与转化工具完成,使得上游系统输出的数据能够无缝对接至下游系统。 #### 批量快速写入至旺店通·企业奇门 成功获取并正确转换后的数据,需要快速、稳定地写入至目标系统——即调用`wdt.vip.wms.stockinout.order.push` API。为此,我们设计了高效、批量处理机制,大幅提升了大规模数据传输过程中的效率。这不仅保障了整体流程的一致性,还有效缩短了从源头到终端应用所需的时间。 #### 异常处理及重试机制 在整个对接过程中,不可避免会遇到各种异常状况,比如网络故障或临时性API失效等。因此,本方案中特别强调错误捕获与重试机制。一旦某一批次操作出现问题,系统能迅速记录日志并触发相应策略重新执行,使得最终结果达到预期,而不会遗漏任何重要信息。 以上章节简要描述了从基础概念到关键步骤的一整套实现逻辑,下文将进一步探讨具体实现细节和优化措施。 ![数据集成平台API接口配置](https://pic.qeasy.cloud/D6.png~tplv-syqr462i7n-qeasy.image) ### 调用金蝶云星空接口executeBillQuery获取并加工数据的技术实现 在数据集成的生命周期中,调用源系统接口获取数据是至关重要的一步。本文将详细探讨如何通过轻易云数据集成平台调用金蝶云星空的`executeBillQuery`接口,以获取并加工调拨单数据。 #### 接口配置与请求参数 首先,我们需要配置调用金蝶云星空接口的元数据。以下是关键的元数据配置项: ```json { "api": "executeBillQuery", "method": "POST", "number": "FBillNo", "id": "FBillEntry_FEntryID", "pagination": { "pageSize": 100 }, "idCheck": true, "request": [ {"field":"FBillEntry_FEntryID","label":"FEntryID","type":"string","value":"FBillEntry_FEntryID"}, {"field":"FID","label":"实体主键","type":"string","value":"FID"}, {"field":"FBillNo","label":"单据编号","type":"string","value":"FBillNo"}, {"field":"FDocumentStatus","label":"单据状态","type":"string","value":"FDocumentStatus"}, {"field":"FStockOrgId_FNumber","label":"调入库存组织","type":"string","value":"FStockOrgId.FNumber"}, {"field":"FDate","label":"日期","type":"string","value":"FDate"}, {"field":"FBillTypeID_FNumber","label":"单据类型","type":"string","value":"FBillTypeID.FNumber"}, {"field":"FTransferBizType","label":"调拨类型","type":"string","value":"FTransferBizType"}, {"field":"FStockOutOrgId_FNumber","label":"调出库存组织","type":"string","value":"FStockOutOrgId.FNumber"}, {"field":"FTransferDirect","label":"调拨方向","type":"string","value":"FTransferDirect"}, {"field":"FNote","label":"备注","type":"","value":""}, // 更多字段配置... ], "otherRequest": [ {"field": "Limit", "label": "最大行数", "type": "string", "describe": "金蝶的查询分页参数", "value": "{PAGINATION_PAGE_SIZE}"}, {"field": "FieldKeys", "label": "需查询的字段key集合", "type": "array", "describe": "", "parser":{"name": "ArrayToString", "params": ","}}, {"field": "TopRowCount", "label": "", "type": "", "describe":"","value":"", "parser":{"name":"","params":[]}}, // 更多其他请求配置... ] } ``` #### 请求构建与发送 根据上述元数据配置,我们需要构建一个POST请求来调用`executeBillQuery`接口。以下是一个示例请求体: ```json { "FormId": "STK_TransferDirect", "FieldKeys": [ // 字段列表 ... ], // 分页参数 ... } ``` 在实际操作中,我们会利用轻易云平台提供的可视化界面来填充这些参数,并自动生成请求体。 #### 数据清洗与转换 获取到原始数据后,下一步是对数据进行清洗和转换。这一步骤包括但不限于: 1. **字段映射**:将金蝶返回的数据字段映射到目标系统所需的字段。 2. **格式转换**:例如,将日期格式从YYYY-MM-DD转换为目标系统所需的格式。 3. **数据过滤**:根据业务需求过滤掉不必要的数据记录。 以下是一个简单的数据清洗示例: ```python def clean_data(raw_data): cleaned_data = [] for record in raw_data: cleaned_record = { 'entry_id': record['FBillEntry_FEntryID'], 'bill_no': record['FBillNo'], 'status': record['FDocumentStatus'], 'stock_in_org': record['FStockOrgId_FNumber'], 'date': convert_date_format(record['FDate']), # 更多字段处理... } cleaned_data.append(cleaned_record) return cleaned_data def convert_date_format(date_str): # 假设原始日期格式为 YYYY-MM-DD,需要转换为 DD/MM/YYYY parts = date_str.split('-') return f"{parts[2]}/{parts[1]}/{parts[0]}" ``` #### 数据写入目标系统 最后一步是将清洗后的数据写入目标系统。这通常涉及调用目标系统的API接口,确保数据能够无缝对接。例如: ```python def write_to_target_system(cleaned_data): for record in cleaned_data: response = requests.post('TARGET_SYSTEM_API_URL', json=record) if response.status_code != 200: log_error(response.text) ``` 通过上述步骤,我们可以高效地实现从金蝶云星空获取调拨单数据,并将其加工后写入目标系统。这不仅提高了数据处理效率,还确保了数据的一致性和准确性。 ![系统集成平台API接口配置](https://pic.qeasy.cloud/S24.png~tplv-syqr462i7n-qeasy.image) ### 使用轻易云数据集成平台实现金蝶直接调拨单对接旺店通其他入库_后补批号ZJDB 在数据集成生命周期的第二步,我们将已经集成的源平台数据进行ETL转换,转为目标平台旺店通·企业奇门API接口所能够接收的格式,并最终写入目标平台。以下是详细的技术实现过程。 #### 元数据配置解析 在本案例中,我们使用了如下元数据配置: ```json { "api": "wdt.vip.wms.stockinout.order.push", "method": "POST", "idCheck": true, "operation": { "method": "merge", "field": "FBillNo,FMaterialId_FNumber,FDestStockId_FNumber,FDestLot", "bodyName": "spec_list", "bodySum": ["FQty"], "header": ["FBillNo"], "body": ["FMaterialId_FNumber", "FDestStockId_FNumber", "FQty", "FDestLot"] }, "request": [ {"field":"api_outer_no","label":"接口外部单号","type":"string","describe":"调用本接口时推送的唯一单据编号,避免重复推送数据","value":"{FBillNo}-{{spec_list.FMaterialId_FNumber}}-{{spec_list.FDestStockId_FNumber}}-{{spec_list.FDestLot}}"}, {"field":"warehouse_no","label":"仓库编号","type":"string","describe":"代表仓库所有属性的唯一编码,用于仓库区分,ERP内支持自定义(ERP仓库界面设置);本接口仓库编号对应的仓库类型必须为委外仓类型。【例如: 3京东仓储 4科捷 5百世物流 6 SKU360 7通天晓 8中联网仓 9顺丰仓储 10网仓2号 11奇门仓储 12旺店通仓储 13心怡仓储 14力威仓储 15京东沧海 16云集仓储 )】","value":"{{spec_list.FDestStockId_FNumber}}"}, {"field":"order_type","label":"出入类型","type":"string","describe":"可选值:1(出库);2(入库)","value":"2"}, {"field":"logistics_fee","label":"邮资","type":"string","describe":"物流或者快递运输货物产生的费用,默认为0"}, {"field":"other_fee","label":"其他费用","type":"string","describe":"其他费用,默认为0"}, {"field":"province","label":"省","type":"string","describe":"省份名称,直辖市注意输入值为“北京”不是“北京市”,“北京市”在city参数内输入"}, {"field":"city","label":"市","type":"string","describe":"市名称"}, {"field":"district","label":"县(区)","type":"string","describe":"区县名称"}, {"field":"address","label":"详细地址","type":"string","describe":"地址详情描述 ,例如 xx街道XX小区xx号楼xx单元401"}, {"field":"contact","label":"联系人","type":"","describe":"","value":"","required":true}, {"field":""}, {"field":"","label":"","type":"","describe":"","value":"","required":true}, {"field":"","label":"","type":"","describe":"","value":"","required":true}, ``` #### 数据转换与写入步骤 1. **数据合并与清洗**: - 首先,我们需要将金蝶直接调拨单的数据进行合并和清洗。根据元数据配置中的`operation`字段,我们将`FBillNo`, `FMaterialId_FNumber`, `FDestStockId_FNumber`, `FDestLot`四个字段作为合并依据,并对`FQty`字段进行汇总。 - 合并后的数据结构体以`spec_list`命名,用于后续的数据转换。 2. **构建请求体**: - 根据元数据配置中的`request`字段,我们需要构建请求体。 - 请求体包含了多个字段,如`api_outer_no`, `warehouse_no`, `order_type`, 等等。这些字段通过模板字符串从合并后的数据中提取相应的值。 3. **货品明细节点处理**: - 在请求体中,有一个重要的节点是`goods_list`,它表示入库单货品列表节点。 - 每个货品明细包含多个字段,如`spec_no`, `num`, `position_no`, `price`, `batch_no`, 等等。这些字段同样通过模板字符串从合并后的数据中提取相应的值。 4. **发送请求**: - 最后,将构建好的请求体通过HTTP POST方法发送到旺店通·企业奇门API接口。 - API接口路径为:`wdt.vip.wms.stockinout.order.push` #### 示例代码 以下是一个示例代码片段,用于展示如何实现上述步骤: ```python import requests import json # 合并和清洗后的数据 merged_data = { 'FBillNo': '12345', 'spec_list': [ { 'FMaterialId_FNumber': 'MAT001', 'FDestStockId_FNumber': 'WH001', 'FQty': '100', 'FDestLot': 'LOT001' } # 更多记录... ] } # 构建请求体 request_body = { 'api_outer_no': f"{merged_data['FBillNo']}-{merged_data['spec_list'][0]['FMaterialId_FNumber']}-{merged_data['spec_list'][0]['FDestStockId_FNumber']}-{merged_data['spec_list'][0]['FDestLot']}", 'warehouse_no': merged_data['spec_list'][0]['FDestStockId_FNumber'], 'order_type': '2', 'logistics_fee': '0', 'other_fee': '0', # 更多字段... } # 构建goods_list节点 goods_list = [] for item in merged_data['spec_list']: goods_item = { 'spec_no': item['FMaterialId_FNumber'], 'num': item['FQty'], # 更多字段... 'batch_no': item['FDestLot'] } goods_list.append(goods_item) request_body['goods_list'] = goods_list # 发起HTTP POST请求 response = requests.post( url='https://api.wangdiantong.com/wdt.vip.wms.stockinout.order.push', headers={'Content-Type': 'application/json'}, data=json.dumps(request_body) ) # 打印响应结果 print(response.json()) ``` 通过以上步骤和代码示例,我们可以将金蝶直接调拨单的数据成功转换并写入到旺店通·企业奇门API接口,实现系统间的数据无缝对接。 ![泛微OA与ERP系统接口开发配置](https://pic.qeasy.cloud/T21.png~tplv-syqr462i7n-qeasy.image)