使用ETL技术实现数据从旺店通到金蝶云的转换

  • 轻易云集成顾问-孙传友
### 旺店通销售出库单同步至金蝶销售出库单【分销】案例解析 在企业信息化系统集成过程中,如何高效、准确地进行数据对接是一个关键问题。本篇文章将聚焦于旺店通·企业奇门的数据集成到金蝶云星空的实际案例,通过具体的技术细节分享,实现“旺店通销售出库单同步至金蝶销售出库单【分销】”这一操作。 首先,我们需要从旺店通获取相关的数据。这里使用了 `wdt.stockout.order.query.trade` API 来抓取最新的销售出库单数据。为了确保不漏单,并实现定时可靠的数据抓取,我们设置了合理的触发时间和轮询机制。同时,处理接口调用中的分页和限流问题也是必要的一环,以保证每次请求能够顺利返回完整数据。 接着是数据格式转换的问题。在将旺店通·企业奇门的数据写入到金蝶云星空之前,需进行字段映射和格式转换,这里我们实现了一套定制化的数据映射方案,使得两边系统间无缝对接。此外,为保证大量数据能快速而稳定地写入金蝶云星空,我们采用了批量提交的方法来提高效率。而对应的API则为 `batchSave`,该API支持多条记录同时插入,大大提升了运行性能。 为了应对可能出现的数据异常情况,本方案中还引入了错误重试机制。一旦某个批次提交失败,可以自动重新尝试提交,从而避免因为网络波动或临时故障导致的大规模数据丢失。同时,在整个处理过程中,我们通过实时监控与日志记录机制,对每一步操作进行了详细追踪,一旦发现异常,可迅速定位并解决问题。 最后,实施中涉及到了若干精细化配置,不仅确保了高效传输,还兼顾到了业务逻辑复杂性及扩展性需求,例如动态调整限流参数、优化分页策略等。当所有这些技术要点有机结合后,即可实现高度可靠且灵活扩展的系统对接集成。 此案展示的不仅是一种工具应用,更是多种技术手段综合运用后的最佳实践,希望能为类似复杂业务场景下的信息系统整合提供有益借鉴。 ![用友与外部系统接口集成开发](https://pic.qeasy.cloud/D1.png~tplv-syqr462i7n-qeasy.image) ### 调用源系统旺店通·企业奇门接口wdt.stockout.order.query.trade获取并加工数据 在数据集成的生命周期中,调用源系统接口是至关重要的一步。本文将详细探讨如何通过轻易云数据集成平台调用旺店通·企业奇门接口`wdt.stockout.order.query.trade`来获取销售出库单数据,并进行初步的数据清洗和加工。 #### 接口调用配置 首先,我们需要配置API接口的基本信息。根据提供的元数据配置,接口的基本信息如下: - **API名称**: `wdt.stockout.order.query.trade` - **请求方法**: `POST` - **分页大小**: 100条/页 - **唯一标识字段**: `order_no` #### 请求参数设置 为了实现增量数据获取,我们需要设置时间范围参数`start_time`和`end_time`。这些参数将帮助我们获取特定时间段内的数据: ```json { "start_time": "{{HOURE_AGO_3|datetime}}", "end_time": "{{CURRENT_TIME|datetime}}" } ``` 此外,我们还可以根据业务需求设置其他查询条件,例如订单状态、系统订单编号、原始单号等: ```json { "status": "55", // 已审核 "src_order_no": "", "src_tid": "", "stockout_no": "", "shop_no": "", "warehouse_no": "" } ``` #### 条件过滤 为了确保数据的准确性和有效性,我们需要对数据进行条件过滤。根据元数据配置,以下是我们需要应用的过滤条件: 1. 仓库名称包含“七遇”或“百媚”或“江苏淮安” 2. 店铺名称不包含“OEM” 3. 分销昵称不等于某个特定值 这些条件可以通过以下JSON格式来表示: ```json [ { "field": "warehouse_name", "logic": "like", "value": "七遇" }, { "field": "shop_name", "logic": "notlike", "value": "OEM" }, { "field": "fenxiao_nick", "logic": "neqv2" } ], [ { "field": "warehouse_name", "logic": "like", "value": "百媚" }, { "field": "shop_name", "logic": "notlike", "value": "OEM" }, { "field": "fenxiao_nick", "logic": "neqv2" } ], [ { "field": "warehouse_name", "logic": "like", { value: “江苏淮安” } }, { field: “shop_name”, logic: “notlike”, value: “OEM” }, { field: “fenxiao_nick”, logic: “neqv2” } ] ``` #### 数据请求与分页处理 由于每次请求返回的数据条数有限(最多100条),我们需要实现分页处理,以确保能够获取所有符合条件的数据。分页处理可以通过设置`page_size`和`page_no`参数来实现: ```json { “page_size”: “100”, “page_no”: “0” } ``` 在实际操作中,需要循环增加`page_no`直到没有更多数据返回为止。 #### 数据清洗与转换 在获取到原始数据后,需要对其进行清洗和转换,以便后续写入目标系统。在这个过程中,可以执行以下操作: 1. **字段映射**:将源系统中的字段映射到目标系统中的对应字段。 2. **数据格式转换**:例如,将日期格式从“yyyy-MM-dd HH:mm:ss”转换为目标系统所需的格式。 3. **异常数据处理**:过滤掉不符合业务规则的数据,或者标记为异常以便后续处理。 #### 实际案例应用 假设我们已经成功调用了接口并获取了销售出库单数据,接下来我们需要对这些数据进行清洗和转换。例如,将原始单号(src_tid)映射到目标系统中的订单编号(order_id),并将日期格式进行转换: ```json { order_id: src_tid, order_date: convertDateFormat(order_date, ‘yyyy-MM-dd HH:mm:ss’, ‘MM/dd/yyyy’) } ``` 通过以上步骤,我们可以确保从旺店通·企业奇门接口获取的数据经过清洗和转换后,能够无缝对接到目标系统中,从而实现高效的数据集成。 总结来说,通过合理配置API接口、设置请求参数、应用条件过滤以及执行分页处理,我们能够高效地从源系统获取并加工所需的数据,为后续的数据写入和业务决策提供坚实的基础。 ![钉钉与WMS系统接口开发配置](https://pic.qeasy.cloud/S17.png~tplv-syqr462i7n-qeasy.image) ### 使用轻易云数据集成平台实现旺店通销售出库单同步至金蝶云星空 在数据集成生命周期的第二步,我们需要将已经集成的源平台数据进行ETL转换,转为目标平台金蝶云星空API接口所能够接收的格式,并最终写入目标平台。以下是详细的技术实现过程。 #### 1. API接口配置 首先,我们需要了解金蝶云星空API接口的配置。根据元数据配置文件,目标API为`batchSave`,请求方法为`POST`。在请求中,我们需要传递多个字段,这些字段包括单据类型、单据编号、日期、销售组织、客户、发货组织等。 #### 2. 数据转换与映射 我们需要将源平台的数据转换为目标平台所需的格式。以下是主要字段的映射和转换规则: - **FBillTypeID(单据类型)**: 固定值`XSCKD01_SYS`。 - **FBillNo(单据编号)**: 从源数据中提取订单编号`{order_no}`。 - **FDate(日期)**: 使用模板引擎将源数据中的发货时间`{consign_time}`转换为日期格式。 - **FSaleOrgId(销售组织)**: 通过MongoDB查询,根据分销商昵称`{fenxiao_nick}`获取对应的销售组织编码。 - **FCustomerID(客户)**: 同样通过MongoDB查询,根据分销商昵称获取客户名称。 - **FStockOrgId(发货组织)**: 从源数据中提取仓库编号`{warehouse_no}`并进行映射。 #### 3. 明细信息处理 对于明细信息,我们需要处理多个子字段: - **FMaterialID(物料编码)**: 根据物料编码规范,将源数据中的规格编号`{{details_list.spec_no}}`进行转换。 - **FRealQty(实发数量)**: 提取源数据中的商品数量`{{details_list.goods_count}}`。 - **FTaxPrice(含税单价)**: 提取源数据中的销售价格`{{details_list.sell_price}}`。 - **FOwnerTypeId(货主类型)**: 固定值`BD_OwnerOrg`。 - **FOwnerId(货主)**: 固定值100,通过解析器进行转换。 - **FStockID(仓库)**: 提取并转换仓库编号。 - **FIsFree(是否赠品)**: 判断商品是否为赠品,进行相应标记。 - **FEntrynote(备注)**: 提取并转换备注信息。 - **F_POIH_Text(原始单号)**: 提取原始订单号。 - **FEntryTaxRate(税率)**: 提取并设置税率。 #### 4. 财务信息处理 财务信息包含结算组织和结算币别: - **FSettleOrgID(结算组织)**: 同样通过MongoDB查询,根据分销商昵称获取结算组织编码。 - **FSETTLECURRID(结算币别)**: 固定值`PRE001`,表示人民币。 #### 5. 其他请求参数 除了上述主要字段外,还需要传递一些其他参数以确保请求成功: - **FormId(业务对象表单Id)**: 固定值`SAL_OUTSTOCK`,表示销售出库单表单ID。 - **Operation(执行的操作)**: 固定值`BatchSave`,表示批量保存操作。 - **IsAutoSubmitAndAudit(提交并审核)**: 设置为true,表示自动提交并审核单据。 - **IsVerifyBaseDataField(验证基础资料)**: 设置为true,表示验证基础资料是否正确。 - **SubSystemId(系统模块)**: 固定值21,表示具体系统模块ID。 - **InterationFlags(允许负库存)**: 固定值`STK_InvCheckResult`, 表示允许负库存。 #### 6. 实现代码示例 以下是一个伪代码示例,用于展示如何实现上述ETL过程: ```python import requests import json from datetime import datetime def transform_data(source_data): transformed_data = { "FBillTypeID": {"FNumber": "XSCKD01_SYS"}, "FBillNo": source_data["order_no"], "FDate": datetime.strptime(source_data["consign_time"], "%Y-%m-%d %H:%M:%S").strftime("%Y-%m-%d"), "FSaleOrgId": get_sale_org_id(source_data["fenxiao_nick"]), "FCustomerID": get_customer_id(source_data["fenxiao_nick"]), "FStockOrgId": {"FNumber": source_data["warehouse_no"]}, "FNote": source_data["cs_remark"], "FEntity": [ { "FMaterialID": {"FNumber": detail["spec_no"]}, "FRealQty": detail["goods_count"], "FTaxPrice": detail["sell_price"], "FOwnerTypeId": "BD_OwnerOrg", "FOwnerId": {"FNumber": "100"}, "FStockID": {"FNumber": source_data["warehouse_no"]}, "FIsFree": detail.get("is_free", ""), "FEntrynote": detail.get("remark", ""), "F_POIH_Text": source_data["src_tids"], "FEntryTaxRate": source_data["tax_rate"] } for detail in source_data["details_list"] ], "SubHeadEntity": { "FSettleOrgID": get_settle_org_id(source_data["fenxiao_nick"]), "FSETTLECURRID": {"FNumber":"PRE001"} }, # Other request parameters } return transformed_data def get_sale_org_id(fenxiao_nick): # Implement MongoDB query to get sale org id pass def get_customer_id(fenxiao_nick): # Implement MongoDB query to get customer id pass def get_settle_org_id(fenxiao_nick): # Implement MongoDB query to get settle org id pass def send_request(transformed_data): url = 'https://api.kingdee.com/batchSave' headers = {'Content-Type': 'application/json'} response = requests.post(url, headers=headers, data=json.dumps(transformed_data)) return response.json() source_data = { # Source data from 旺店通 } transformed_data = transform_data(source_data) response = send_request(transformed_data) print(response) ``` 以上代码展示了如何从源数据中提取必要的信息,并通过ETL过程将其转换为金蝶云星空API所需的格式。最终,通过HTTP POST请求将转换后的数据发送到金蝶云星空,实现数据的无缝对接和写入。 ![金蝶与MES系统接口开发配置](https://pic.qeasy.cloud/T16.png~tplv-syqr462i7n-qeasy.image)