ETL流程详解:将源数据转换为金蝶云星辰V2API格式并写入

  • 轻易云集成顾问-叶威宏
### 旺店通·企业奇门到金蝶云星辰V2的数据集成案例分享:其他入库(委外)V2.0 在企业信息化建设中,数据的准确性和实时性至关重要。本文将详细分享如何通过系统集成技术,实现旺店通·企业奇门平台与金蝶云星辰V2之间的数据高效对接。具体方案名为“其他入库(委外)V2.0”,旨在确保各类业务数据能够快速、精准地从旺店通·企业奇门导入到金蝶云星辰V2。 为了实现这一目标,我们采用了轻易云数据集成平台,其提供了一系列强大的功能,包括支持大量数据的高吞吐量写入能力,以及定时可靠地抓取旺店通·企业奇门接口(wdt.stockin.order.query)的数据。这些特性不仅显著提升了数据处理的效率,还能确保业务流程中的每个节点都透明可控。 该方案的一大亮点是使用分布式架构来应对批量数据处理需求。在实际操作中,通过调用wdt.stockin.order.query接口,可以获得需要同步的原始订单数据,再经过一系列自定义转换逻辑,将其映射并适配至金蝶云星辰V2所需格式,然后利用API (/jdy/v2/scm/inv_other_in)完成最终的数据写入。 此外,为保证整个过程中不出现漏单情况,我们配置了详尽的监控和告警机制。一旦监测到异常或错误,会自动启动重试及修复流程,极大减少潜在风险。同时,针对旺店通·企业奇门与金蝶云星辰V2之间可能存在的数据结构差异,也进行了专有解决,以保持两端系统间的一致性和兼容性。 ![钉钉与WMS系统接口开发配置](https://pic.qeasy.cloud/D30.png~tplv-syqr462i7n-qeasy.image) ### 调用旺店通·企业奇门接口wdt.stockin.order.query获取并加工数据的技术案例 在数据集成的生命周期中,调用源系统接口是至关重要的一步。本文将详细探讨如何通过轻易云数据集成平台调用旺店通·企业奇门接口`wdt.stockin.order.query`,并对获取的数据进行加工处理。 #### 接口调用配置 首先,我们需要配置接口调用的元数据。根据提供的元数据配置,可以看到我们需要通过POST方法来调用`wdt.stockin.order.query`接口,并且需要传递多个参数以满足业务需求。 ```json { "api": "wdt.stockin.order.query", "method": "POST", "number": "order_no", "id": "stockin_id", "pagination": { "pageSize": 50 }, "condition": [ [ { "field": "warehouse_no", "logic": "neq", "value": "WH2024052601" } ] ], "idCheck": true, "request": [ { "field": "start_time", "label": "开始时间", "type": "string", "describe": "按最后修改时间增量获取数据,start_time作为开始时间,格式:yyyy-MM-dd HH:mm:ss", "value": "{{LAST_SYNC_TIME|datetime}}" }, { "field": "end_time", "label": "结束时间", "type": "string", "describe": "按最后修改时间增量获取数据,end_time作为结束时间,格式:yyyy-MM-dd HH:mm:ss", "value": "{{CURRENT_TIME|datetime}}" }, { "field": "order_type", "label": "源单据类别", "type": "string", "describe": "源单据类别 1采购入库, 2调拨入库, 4盘盈入库, 5生产入库, 6其他入库, 7保修入库, 8纠错入库, 9初始化入库, 10预入库, 11JIT退货入库, 12委外入库", " value ":"12" }, { "field":"status","label":"入库单状态","type":"string","describe":"入库单状态10已取消20编辑中25待价格确认30待审核32待推送33推送失败35委外待入库60待结算80已完成(按照状态查询时必须传原单据类别,如果未传status则默认查询80已完成单据)","value":"60,80"}, {"field":"warehouse_no","label":"仓库编号","type":"string","describe":"代表仓库所有属性的唯一编码,用于仓库区分,ERP内支持自定义(ERP仓库界面设置),用于获取指定仓库单据信息(不支持一次推送多个仓库编号)"}, {"field":"src_order_no","label":"上层单据编号","type":"string","describe":"上层单据编号,传该字段可以不传开始时间和结束时间"}, {"field":"stockin_no","label":"入库单号","type":"string","describe":"入库单号,传该字段可以不传开始时间和结束时间"} ], "otherRequest":[{"field":"page_size","label":"分页大小","type":"string","describe":"每页返回的数据条数,输入值范围1~100,不传本参数,输入值默认为40,使用举例单击这里","value":"{PAGINATION_PAGE_SIZE}"},{"field":"page_no","label":"页号","type":"string","describe":"不传值默认从0页开始","value":"{PAGINATION_START_PAGE}"}] } ``` #### 参数详解 - **start_time** 和 **end_time**:这两个参数用于指定数据的时间范围,通过增量方式获取最新的数据。 - **order_type**:固定为“12”,表示委外入库。 - **status**:查询状态为“60”和“80”的订单,即待结算和已完成的订单。 - **warehouse_no**:用于过滤特定仓库的数据。 - **pagination**:分页参数,每次请求返回50条记录。 #### 数据请求与清洗 在调用接口后,我们会得到一个包含多条记录的JSON响应。为了确保数据质量,需要对这些记录进行清洗和过滤。以下是一个简单的数据清洗示例: ```python import requests import json # 配置请求参数 url = 'https://api.wangdian.cn/openapi2/wdt.stockin.order.query' headers = {'Content-Type': 'application/json'} payload = { 'start_time': '2023-01-01 00:00:00', 'end_time': '2023-01-31 23:59:59', 'order_type': '12', 'status': '60,80', 'warehouse_no': '', 'page_size': '50', 'page_no': '0' } # 发起请求 response = requests.post(url, headers=headers, data=json.dumps(payload)) data = response.json() # 数据清洗 cleaned_data = [] for record in data['orders']: if record['warehouse_no'] != 'WH2024052601': cleaned_data.append(record) print(cleaned_data) ``` 在这个示例中,我们首先配置了请求参数,然后发起POST请求获取数据。接着,我们对返回的数据进行过滤,只保留`warehouse_no`不等于`WH2024052601`的记录。 #### 数据转换与写入 经过清洗后的数据,需要进一步转换为目标系统所需的格式,并写入目标数据库或系统。这一步通常涉及到字段映射、类型转换等操作。以下是一个简单的数据转换示例: ```python # 数据转换 transformed_data = [] for record in cleaned_data: transformed_record = { '库存ID': record['stockin_id'], '订单号': record['order_no'], '仓库编号': record['warehouse_no'], # 更多字段映射... } transformed_data.append(transformed_record) # 写入目标系统(示例) def write_to_target_system(data): # 假设目标系统提供了一个API接口用于接收数据 target_url = 'https://target-system/api/receive' response = requests.post(target_url, headers=headers, data=json.dumps(data)) return response.status_code write_status = write_to_target_system(transformed_data) print(f'写入状态: {write_status}') ``` 在这个示例中,我们将清洗后的数据转换为目标系统所需的格式,并通过API接口将其写入目标系统。 通过以上步骤,我们实现了从旺店通·企业奇门接口获取、清洗、转换并写入数据的完整流程。这一过程不仅提高了数据处理效率,还确保了数据的一致性和准确性。 ![打通金蝶云星空数据接口](https://pic.qeasy.cloud/S6.png~tplv-syqr462i7n-qeasy.image) ### 将源平台数据转换为金蝶云星辰V2API接口格式并写入目标平台 在数据集成过程中,ETL(Extract, Transform, Load)是至关重要的一环。本文将详细介绍如何使用轻易云数据集成平台将已经集成的源平台数据进行ETL转换,转为金蝶云星辰V2API接口所能够接收的格式,并最终写入目标平台。 #### 1. 数据请求与清洗 首先,我们需要从源平台获取原始数据,并对其进行必要的清洗和预处理。这一步通常包括过滤无效数据、处理缺失值以及标准化数据格式等。假设我们已经完成了这一步,接下来我们将重点放在如何将清洗后的数据转换为金蝶云星辰V2API接口所需的格式。 #### 2. 数据转换与写入 在这一阶段,我们需要根据金蝶云星辰V2API接口的要求,将清洗后的数据进行转换。以下是元数据配置文件,它定义了如何将源数据映射到目标API所需的字段: ```json { "api": "/jdy/v2/scm/inv_other_in", "effect": "EXECUTE", "method": "POST", "number": "id", "id": "id", "name": "id", "idCheck": true, "request": [ {"field": "bill_date", "label": "单据日期", "type": "string", "value": "{stockin_time}"}, {"field": "bill_no", "label": "单据编码", "type": "string", "value": "{order_no}"}, {"field": "trans_type_id", "label": "业务类型id", "type": "string", "value":"12"}, {"field": "operation_key", "label":"操作类型","type":"string","value":"audit"}, { "field":"material_entity", "label":"商品分录", "type":"array", "value":"details_list", ![打通用友BIP数据接口](https://pic.qeasy.cloud/T9.png~tplv-syqr462i7n-qeasy.image)