ETL处理:将旺店通数据写入金蝶云星空

  • 轻易云集成顾问-卢剑航
### 旺店通销售出库单集成案例:从企业奇门到金蝶云星空 在电商行业日益激烈的竞争环境下,数据集成的重要性不言而喻。本文将详细介绍通过轻易云数据集成平台如何实现旺店通·企业奇门数据高效、稳定地对接至金蝶云星空,从而确保订单处理过程中的全透明化和实时追踪。 #### 接口源码解析与API调用 在本次系统对接项目中,我们重点使用了旺店通·企业奇门的`wdt.stockout.order.query.trade` API接口来获取销售出库单的数据,并利用金蝶云星空的`batchSave` API接口进行批量数据写入。这一过程涉及多个技术环节,包括数据采集、格式转换、分页限流处理以及异常重试机制等。 ##### 数据抓取与分页处理 首先,在调用`wdt.stockout.order.query.trade`时,我们需要特别注意其分页和限流机制,以避免因请求频繁导致API访问受限。我们设计了一套定时调度任务,周期性抓取出库单信息并根据页面大小分块处理,从而大幅提升了数据获取的效率和可靠性。 ```python def fetch_data(page, page_size): response = wdt_api_call('wdt.stockout.order.query.trade', {'page': page, 'page_size': page_size}) return response.json() ``` ##### 数据转换及映射 获得原始数据后,需要进行适配性的格式转换以符合金蝶云星空的数据标准。在这一步中,通过自定义映射规则,将不同字段之间建立对应关系,同时兼顾日期时间格式、数字精度要求等方面细节,实现平滑过渡。 ```python def transform_data(raw_data): transformed_data = [] for item in raw_data: transformed_item = { 'sourceOrderNo': item['order_no'], 'dateTime': convert_date(item['order_time']), # 其他字段映射... } transformed_data.append(transformed_item) return transformed_data ``` ##### 批量写入与异常处理 为了应对大量销售出库单的数据写入需求,采用了批量操作方式,即通过调用金蝶云星空的`batchSave`接口,一次提交多条记录。在这一过程中,还需构建完善的异常检测和重试机制,以确保每一次写入操作都能成功落地,并提供详细日志记录,用于问题溯源和分析。 ```python try: result = kingdee_batch_save(target_url, payload) except Exception as e: log_error(e) retry_logic(payload) # 实现错误重试逻辑 ``` 通过上述流程,不仅提高了系统 ![如何开发金蝶云星空API接口](https://pic.qeasy.cloud/D14.png~tplv-syqr462i7n-qeasy.image) ### 调用源系统旺店通·企业奇门接口wdt.stockout.order.query.trade获取并加工数据 在数据集成生命周期的第一步中,我们需要调用源系统的API接口以获取原始数据,并对其进行初步加工。本文将详细探讨如何通过轻易云数据集成平台调用旺店通·企业奇门接口`wdt.stockout.order.query.trade`,并对返回的数据进行处理。 #### 接口调用配置 首先,我们需要配置接口调用的元数据。根据提供的元数据配置,我们可以看到该接口使用POST方法,主要用于查询销售出库单信息。以下是具体的请求参数: - `start_time` 和 `end_time`:用于增量获取数据,分别表示开始时间和结束时间,格式为`yyyy-MM-dd HH:mm:ss`。 - `status`:表示订单状态,取值范围包括已取消(5)、已审核(55)、已发货(95)、部分打款(105)、已完成(110)和异常发货(113)。在本案例中,我们关注的是已发货和已完成的订单,即状态值为95和110。 - 其他参数如 `src_order_no`, `src_tid`, `stockout_no`, `shop_no`, 和 `warehouse_no` 用于进一步筛选特定订单。 此外,还有分页参数`page_size`和`page_no`,用于控制每次请求返回的数据条数及分页。 #### 请求参数示例 ```json { "start_time": "{{LAST_SYNC_TIME|datetime}}", "end_time": "{{CURRENT_TIME|datetime}}", "status": "95,110", "page_size": "{PAGINATION_PAGE_SIZE}", "page_no": "{PAGINATION_START_PAGE}" } ``` #### 数据清洗与转换 在获取到原始数据后,需要对其进行清洗与转换,以便后续写入目标系统。以下是一些关键步骤: 1. **字段映射**:将源系统中的字段映射到目标系统所需的字段。例如,将源系统中的订单编号映射到目标系统中的相应字段。 2. **数据类型转换**:确保所有字段的数据类型符合目标系统的要求。例如,将日期字符串转换为日期对象。 3. **条件过滤**:根据业务需求过滤不必要的数据。例如,只保留状态为已发货和已完成的订单。 #### 示例代码 以下是一个简化的Python示例代码,用于调用接口并处理返回的数据: ```python import requests import json from datetime import datetime # 配置请求参数 params = { "start_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "end_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "status": "95,110", "page_size": 40, "page_no": 0 } # 调用API接口 response = requests.post("https://api.wangdiantong.com/wdt.stockout.order.query.trade", data=json.dumps(params)) data = response.json() # 数据清洗与转换 processed_data = [] for order in data['orders']: processed_order = { "order_id": order["stockout_id"], "order_number": order["order_no"], # 添加更多字段映射和转换逻辑 } processed_data.append(processed_order) # 输出处理后的数据 print(json.dumps(processed_data, indent=4)) ``` #### 自动填充响应 在轻易云平台中,可以启用自动填充响应功能,这将极大简化数据处理过程。平台会根据预定义的规则自动将API响应中的字段填充到目标系统所需的位置,从而减少手动干预。 #### 条件过滤 元数据配置中还包含了条件过滤逻辑,例如: ```json "condition":[[{"field":"trade_type","logic":"in","value":"1,4"},{"field":"shop_name","logic":"neqv2","value":"系统分销店铺ae89e42"}]] ``` 这意味着我们只会处理交易类型为1或4,并且店铺名称不等于“系统分销店铺ae89e42”的订单。这些条件可以帮助我们进一步精确地筛选出需要的数据。 通过上述步骤,我们能够高效地从旺店通·企业奇门接口获取销售出库单信息,并对其进行初步加工,为后续的数据写入和集成奠定基础。 ![打通金蝶云星空数据接口](https://pic.qeasy.cloud/S21.png~tplv-syqr462i7n-qeasy.image) ### 使用轻易云数据集成平台将旺店通销售出库单转换并写入金蝶云星空API接口 在数据集成过程中,ETL(Extract, Transform, Load)是一个关键环节。本文将详细探讨如何使用轻易云数据集成平台将已经集成的源平台数据(旺店通销售出库单)进行转换,并通过金蝶云星空API接口写入目标平台。 #### 元数据配置解析 在本案例中,我们主要依赖以下元数据配置来完成数据的转换和写入: ```json { "api": "batchSave", "effect": "EXECUTE", "method": "POST", "idCheck": true, "operation": { "method": "batchArraySave", "rows": 1, "rowsKey": "array" }, "request": [ { "field": "FBillTypeID", "label": "单据类型", ... }, ... ], ... } ``` #### 数据转换与写入步骤 1. **单据类型 (FBillTypeID)** - 根据`flag_name`字段的值来决定单据类型。如果`flag_name`为“刷单”,则单据类型为“ZYD-106”,否则为“ZYD-017”。 ```json { "field": "FBillTypeID", "label": "单据类型", ... "value": "_function CASE '{flag_name}' WHEN '刷单' THEN 'ZYD-106' ELSE 'ZYD-017' END" } ``` 2. **单据编号 (FBillNo)** - 直接映射自源平台的`order_no`字段。 ```json { "field": "FBillNo", ... "value": "{order_no}" } ``` 3. **日期 (FDate)** - 映射自源平台的`consign_time`字段。 ```json { "field": "FDate", ... "value": "{consign_time}" } ``` 4. **销售组织 (FSaleOrgId) 和发货组织 (FStockOrgId)** - 根据`shop_no`字段的值来决定组织ID。具体规则如下: - 如果`shop_no`为'E0029', 'E0030', 'E0031', 'E0036', 或'E0037',则组织ID为‘3000’; - 否则,组织ID为‘7000’。 ```json { ... "value": "_function CASE '{shop_no}' WHEN 'E0029' THEN '3000' WHEN 'E0030' THEN '3000' WHEN 'E0031' THEN '3000' WHEN 'E0036' THEN '3000' WHEN 'E0037' THEN '3000' ELSE '7000' END" } ``` 5. **客户 (FCustomerID)** - 映射自源平台的`shop_no`字段,并通过ConvertObjectParser进行转换。 ```json { ... "value": "{shop_no}", ... "parser": {"name":"ConvertObjectParser","params":"FNumber"} } ``` 6. **销售部门 (FSaleDeptID)** - 映射自源平台的`shop_no`字段,并通过ConvertObjectParser进行转换。 ```json { ... "value":"{shop_no}", ... "parser":{"name":"ConvertObjectParser","params":"FNumber"} } ``` 7. **明细行处理 (FEntity)** 每个明细行包含以下几个关键字段: - **物料编码 (FMaterialID)**: 映射自源平台的`spec_no`字段,并通过ConvertObjectParser进行转换。 ```json { ... "value":"{spec_no}", ... "parser":{"name":"ConvertObjectParser","params":"FNumber"} } ``` - **仓库 (FStockID)**: 根据仓库名称从预定义集合中查找对应的仓库编号。 ```json { ... "_findCollection find FNumber from a70e7dd4-902d-33a7-a5a3-db76d81a6b73 where FName={warehouse_name}", ... {"name":"ConvertObjectParser","params":"FNumber"} } ``` - **计价数量 (FPriceUnitQty)**: 映射自源平台的`goods_count`字段。 ```json { ... "{goods_count}" } ``` - **价税合计 (FAllAmount)**: 映射自源平台的`total_amount`字段。 ```json { ... "{total_amount}" } ``` - **总毛重 (F_TZES_Qty1)**: 映射自源平台的`weight`字段。 ```json { ... "{weight}" } ``` - **批号 (FLot)**: 根据物料编码决定批号,如果物料编码以‘8’开头,则批号为‘0’,否则为空。 ```json { ... "_function CASE WHEN {spec_no} LIKE '8%' THEN '0' ELSE '' END", {"name":"ConvertObjectParser","params":"FNumber"} } ``` #### API调用及参数配置 最后,通过配置API调用参数,将处理后的数据写入金蝶云星空: ```json { ..., {"field":"FormId",...,"value":"SAL_OUTSTOCK"}, {"field":"Operation",...,"value":"Save"}, {"field":"IsAutoSubmitAndAudit",...,"value":"true"}, {"field":"IsVerifyBaseDataField",...,"value":"true"}, {"field":"SubSystemId",...,"value":"21"}, {"field":"IgnoreInterationFlag",...,"value":"true"}, {"field":"InterationFlags",...,"value":"STK_InvCheckResult"} } ``` 通过以上步骤,我们成功地将旺店通销售出库单的数据进行ETL转换,并通过金蝶云星空API接口完成了数据写入。这不仅实现了不同系统间的数据无缝对接,还确保了数据在整个生命周期中的透明和高效管理。 ![如何开发钉钉API接口](https://pic.qeasy.cloud/T30.png~tplv-syqr462i7n-qeasy.image)