ETL技术在旺店通和金蝶云星空数据同步中的应用实例

  • 轻易云集成顾问-钟家寿
### 旺店通·旗舰版数据集成到金蝶云星空:04-生产出库-其他出库同步案例分享 在本篇技术文章中,我们将聚焦于一个真实的系统对接集成案例——如何高效地将旺店通·旗舰版的数据无缝集成到金蝶云星空。具体来说,本文重点介绍了采用轻易云数据集成平台执行的“04-生产出库-其他出库同步”方案。 #### 确保数据不漏单与高效写入 首先,通过调用旺店通·旗舰版的API接口`wms.stockout.Process.queryWithDetail`来抓取各项详细的出库数据,这是确保订单信息完备而不遗漏的重要步骤。得益于平台具备定时可靠抓取机制,我们能够通过设定合理抓取频率,及时获取业务所需数据,从根本上杜绝漏单现象。 当海量的数据成功采集后,就需要快速并准确地写入到金蝶云星空。因此,高效处理批量数据成为关键环节之一。在这一过程中,使用金蝶云星空提供的`batchSave` API,可以显著提升大规模数据写入效率。该接口支持一次性提交多条记录,有助于减少网络传输次数和服务器压力,大幅缩短整体处理时间。 #### 数据格式差异和分页限流问题解决方案 在实际操作中,不同系统间的数据格式往往存在差异,需要进行合适的数据转换和映射。例如,将旺店通返回的一组JSON格式转化为符合金蝶云标准的结构。此外,还面临分页和限流的问题,由于旺店通API具有请求频次限制,在实现连续大规模查询时需要妥善设计分页策略,并设置合理延迟避免触发限流机制。 对于上述问题,利用轻易云平台提供的自定义映射及转换工具,以及其内置异常处理与错误重试功能,可以极大简化复杂操作过程。当发生异常状况或API调用失败时,可自动触发重新尝试机制,以保证最终结果达到预期一致性要求。 --- 汇总这些技术要点,为后续更深入解析整个“生产出库 – 其他出库同步”的执行方案奠定基础。在下文中,我们会进一步探讨具体实施细节,包括实际代码示例、配置参数以及注意事项等等。 ![如何对接企业微信API接口](https://pic.qeasy.cloud/D36.png~tplv-syqr462i7n-qeasy.image) ### 调用旺店通·旗舰版接口wms.stockout.Process.queryWithDetail获取并加工数据 在数据集成生命周期的第一步中,调用源系统的API接口以获取原始数据是至关重要的一环。本文将详细探讨如何通过轻易云数据集成平台调用旺店通·旗舰版的`wms.stockout.Process.queryWithDetail`接口,并对获取的数据进行初步加工。 #### 接口概述 `wms.stockout.Process.queryWithDetail`接口用于查询出库单的详细信息。该接口采用POST请求方式,支持多种查询参数,以便灵活地获取所需数据。 #### 元数据配置解析 根据提供的元数据配置,我们可以看到该接口的请求参数主要分为两个部分:查询参数和分页参数。 1. **查询参数(params)**: - `start_time` 和 `end_time`:用于指定查询的时间范围。这两个字段是必填项,尤其是在没有提供出库单编号或生产单编号时。 - `time_type`:时间判断类型,默认为建单时间(1),在本案例中设置为出库时间(2)。 - `warehouse_no`、`stockout_no`、`process_no`:分别对应仓库编号、出库单编号和生产单编号,用于更精确地筛选数据。 - `status`:状态字段,用逗号分隔开数字的分隔符,在本案例中设置为“110”。 2. **分页参数(pager)**: - `page_size`:每页记录数,默认设置为100。 - `page_no`:页码,用于分页查询。 #### 数据请求与清洗 在实际操作中,我们需要按照以下步骤进行数据请求与清洗: 1. **构建请求体**: 根据元数据配置,构建POST请求体。示例如下: ```json { "params": { "start_time": "{{LAST_SYNC_TIME|datetime}}", "end_time": "{{CURRENT_TIME|datetime}}", "time_type": "2", "warehouse_no": "", "stockout_no": "", "process_no": "", "status": "110" }, "pager": { "page_size": "100", "page_no": "1" } } ``` 2. **发送请求**: 使用轻易云平台提供的HTTP客户端功能发送POST请求至旺店通·旗舰版API端点,并接收响应结果。 3. **处理响应数据**: 对返回的数据进行初步清洗,包括但不限于: - 过滤无效或不完整的数据记录。 - 转换时间格式以符合目标系统要求。 - 合并多页结果集(如果有分页)。 4. **错误处理与重试机制**: 在实际操作中,可能会遇到网络波动或API限流等问题。需要设计合理的错误处理与重试机制,以确保数据请求的稳定性和完整性。 #### 数据转换与写入 在完成初步清洗后,需要将数据转换为目标系统所需的格式,并写入目标数据库或系统。在此过程中,可以利用轻易云平台提供的数据转换工具,实现字段映射、格式转换等操作。 #### 实践案例 以下是一个实际调用并处理该接口的示例代码片段: ```python import requests import json from datetime import datetime, timedelta # 设置时间范围 last_sync_time = (datetime.now() - timedelta(days=1)).strftime('%Y-%m-%d %H:%M:%S') current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S') # 构建请求体 payload = { "params": { "start_time": last_sync_time, "end_time": current_time, "time_type": "2", "warehouse_no": "", "stockout_no": "", "process_no": "", "status": "110" }, "pager": { "page_size": 100, "page_no": 1 } } # 发送POST请求 response = requests.post( url="https://api.wangdiantong.com/wms.stockout.Process.queryWithDetail", headers={"Content-Type": "application/json"}, data=json.dumps(payload) ) # 处理响应结果 if response.status_code == 200: data = response.json() # 清洗和转换数据 processed_data = [] for record in data.get('data', []): # 示例:仅保留必要字段并转换时间格式 processed_record = { 'stockout_no': record['stockout_no'], 'warehouse_no': record['warehouse_no'], 'create_time': datetime.strptime(record['create_time'], '%Y-%m-%d %H:%M:%S').isoformat(), 'status': record['status'] } processed_data.append(processed_record) # 将处理后的数据写入目标系统(示例) # write_to_target_system(processed_data) else: print(f"Error: {response.status_code}, {response.text}") ``` 通过以上步骤,我们可以高效地调用旺店通·旗舰版接口获取出库单详细信息,并对其进行初步清洗和加工,为后续的数据转换与写入奠定基础。 ![钉钉与CRM系统接口开发配置](https://pic.qeasy.cloud/S7.png~tplv-syqr462i7n-qeasy.image) ### 使用轻易云数据集成平台进行ETL转换与写入金蝶云星空API接口的技术案例 在数据集成生命周期的第二步中,我们重点关注如何将已经集成的源平台数据进行ETL(提取、转换、加载)转换,最终转为目标平台金蝶云星空API接口所能够接收的格式,并写入目标平台。以下是详细的技术实现过程。 #### 元数据配置解析 元数据配置是实现数据转换和写入的重要基础。通过对元数据配置的解析,我们可以清晰地了解每个字段的映射关系和处理逻辑。 ```json { "api": "batchSave", "method": "POST", "idCheck": true, "operation": { "rowsKey": "array", "rows": 20, "method": "batchArraySave" }, "request": [ { "field": "FBillNo", "label": "单据编号", "type": "string", "describe": "单据编号", "value": "{stockout_no}" }, { ... } ], ... } ``` #### 数据请求与清洗 首先,我们需要从源平台提取原始数据,并对其进行清洗,以确保数据的准确性和一致性。例如,从源平台提取出库单据编号、库存组织等信息,并根据业务规则进行必要的数据校验和清洗。 ```json { "stockout_no": "SO123456", ... } ``` #### 数据转换 在数据转换阶段,我们需要将清洗后的数据按照金蝶云星空API接口要求的格式进行转换。具体来说,需要将各个字段映射到目标平台所需的字段,并进行必要的数据类型转换和格式化处理。 1. **单据编号** (`FBillNo`) - 源数据字段:`stockout_no` - 转换后字段:`FBillNo` - 示例值:`SO123456` 2. **单据类型** (`FBillTypeID`) - 固定值:`QTCKD01_SYS` 3. **库存组织** (`FStockOrgId`) - 固定值:`100` 4. **日期** (`FDate`) - 源数据字段:`consign_time` - 示例值:`2023-10-01` 5. **领料部门** (`FDeptId`) - 固定值:`BM000032` 6. **明细信息** (`FEntity`) - 包含多个子字段,如物料编码、实发数量、发货仓库等。 - 每个子字段需要从源数据中的对应字段进行映射和转换。 示例: ```json { "FBillNo": "{stockout_no}", ... "FEntity": [ { "FMaterialId": "{{detail_list.spec_no}}", ... "FQty": "{{detail_list.num}}", ... } ] } ``` #### 数据写入 完成数据转换后,使用金蝶云星空提供的API接口将处理后的数据批量写入目标系统。根据元数据配置,使用`batchSave` API接口,通过HTTP POST请求发送转换后的JSON数据。 ```json { "FormId": "STK_MisDelivery", ... } ``` 具体实现步骤如下: 1. 构建HTTP请求: - URL: `https://api.kingdee.com/batchSave` - Method: `POST` - Headers: 包含认证信息和内容类型,如`Authorization: Bearer <token>`,`Content-Type: application/json` 2. 请求体包含所有经过ETL处理的数据: ```json { "FormId": "STK_MisDelivery", ... } ``` 3. 发起HTTP请求并处理响应: ```python import requests url = 'https://api.kingdee.com/batchSave' headers = { 'Authorization': 'Bearer <token>', 'Content-Type': 'application/json' } data = { # 包含所有经过ETL处理的数据 } response = requests.post(url, headers=headers, json=data) if response.status_code == 200: print("Data successfully written to Kingdee Cloud") else: print(f"Failed to write data: {response.text}") ``` 通过以上步骤,我们完成了从源平台提取、清洗、转换并最终写入金蝶云星空API接口的数据集成全过程。这一过程确保了不同系统间的数据无缝对接,提高了业务流程的自动化程度和效率。 ![如何开发企业微信API接口](https://pic.qeasy.cloud/T27.png~tplv-syqr462i7n-qeasy.image)