使用轻易云平台进行ETL转换并写入旺店通企业奇门

  • 轻易云集成顾问-何语琴
### 金蝶云星空数据集成至旺店通·企业奇门的解决方案 在企业信息化系统中,实现不同平台的数据对接是一项复杂且关键的任务。本文将分享一个实际运行的案例——如何高效地将金蝶云星空中的仓库数据集成到旺店通·企业奇门系统中。 此次集成项目命名为“金蝶仓库”,核心目标是通过调用金蝶云星空API接口ExecuteBillQuery,以批量抓取和可靠写入大量库存数据至旺店通·企业奇门,满足业务实时同步需求。本案重点解决了如下技术难点: 1. **高吞吐量的数据写入能力**:确保大规模、频繁的数据变动能够及时更新到旺店通·企业奇门系统中,避免因延迟或性能瓶颈造成订单处理滞后。 2. **集中监控和告警系统**:利用轻易云平台提供的集中监控和告警功能,实时跟踪每个数据集成任务的状态与性能。当出现异常时,立即触发报警并启动自动处理机制。 3. **自定义数据转换逻辑**:面对两大平台之间不同的数据结构,通过灵活配置自定义转换规则,使得各类复杂格式的数据都能顺利适配,实现无缝对接。 4. **分页与限流处理**:根据金蝶云星空API接口规范,对ExecuteBillQuery进行分页查询,并结合速率限制策略,有效规避可能存在的服务端过载问题。 5. **错误重试机制及异常处理**:设计健壮的错误重试机制,当遇到网络波动或服务不稳定导致调用失败时,可多次尝试直至成功。此外,还实现了详细日志记录,加快故障定位及排除效率。 从技术架构层面来看,本案例充分利用了可视化的数据流设计工具,将整个流程模块化呈现。在此框架下,各环节既独立执行,又无缝衔接,为确保最终实现准确、高效、稳定的数据同步奠定了坚实基础。 ![如何开发用友BIP接口](https://pic.qeasy.cloud/D8.png~tplv-syqr462i7n-qeasy.image) ### 调用金蝶云星空接口ExecuteBillQuery获取并加工数据 在数据集成的生命周期中,调用源系统接口获取数据是至关重要的一步。本文将深入探讨如何通过轻易云数据集成平台调用金蝶云星空的`ExecuteBillQuery`接口来获取并加工数据。 #### 接口配置与元数据解析 首先,我们需要理解和配置元数据,以便正确调用`ExecuteBillQuery`接口。以下是元数据配置的详细解析: ```json { "api": "ExecuteBillQuery", "method": "POST", "number": "FNumber", "id": "FId", "pagination": { "pageSize": 100 }, "request": [ { "field": "Limit", "label": "最大行数", "type": "string", "describe": "金蝶的查询分页参数", "value": "{PAGINATION_PAGE_SIZE}" }, { "field": "StartRow", "label": "开始行索引", "type": "string", "describe": "金蝶的查询分页参数", "value": "{PAGINATION_START_ROW}" }, { "field": "TopRowCount", "label": "返回总行数", "type": "int", "describe": "金蝶的查询分页参数" }, { "field": "FilterString", "label": "过滤条件", "type": "string", "describe":"示例写法 FSupplierId.FNumber = 'VEN00010' and FApproveDate>=", "value":"FAuditDate>='{{LAST_SYNC_TIME|datetime}}' and F_UOMS_CHECKBOX = 1 and FUseOrgId.FNumber <> '100'" }, { "field":"FieldKeys", "label":"需查询的字段key集合", "type":"array", ... ``` #### 配置请求参数 在实际操作中,我们需要根据业务需求配置请求参数。以下是关键字段及其配置说明: 1. **Limit**: 设置每次请求返回的数据行数,使用分页参数`{PAGINATION_PAGE_SIZE}`。 2. **StartRow**: 设置起始行索引,使用分页参数`{PAGINATION_START_ROW}`。 3. **TopRowCount**: 返回总行数,用于控制查询结果的大小。 4. **FilterString**: 设置过滤条件,例如:`FAuditDate>='{{LAST_SYNC_TIME|datetime}}' and F_UOMS_CHECKBOX = 1 and FUseOrgId.FNumber <> '100'`,确保只获取符合条件的数据。 5. **FieldKeys**: 指定需要查询的字段集合,通过数组转字符串处理。 #### 调用接口示例 以下是一个调用`ExecuteBillQuery`接口的示例代码: ```python import requests import json url = 'https://api.kingdee.com/ExecuteBillQuery' headers = {'Content-Type': 'application/json'} payload = { 'Limit': '100', 'StartRow': '0', 'TopRowCount': 0, 'FilterString':"FAuditDate>='2023-01-01' and F_UOMS_CHECKBOX = 1 and FUseOrgId.FNumber <> '100'", 'FieldKeys': ['FPOOrderEntry_FEntryId', 'FPurchaseOrgId.FNumber'], 'FormId': 'BD_STOCK' } response = requests.post(url, headers=headers, data=json.dumps(payload)) if response.status_code == 200: data = response.json() # 数据处理逻辑 else: print(f"Error: {response.status_code}") ``` #### 数据清洗与转换 获取到原始数据后,需要进行清洗和转换,以满足目标系统的数据格式要求。例如: 1. **日期格式转换**:将日期字段从字符串格式转换为标准日期格式。 2. **字段重命名**:根据目标系统要求重命名字段。 3. **数据过滤**:进一步过滤不必要的数据。 以下是一个简单的数据清洗示例: ```python def clean_data(raw_data): cleaned_data = [] for record in raw_data: cleaned_record = {} cleaned_record['EntryID'] = record['FPOOrderEntry_FEntryId'] cleaned_record['PurchaseOrg'] = record['FPurchaseOrgId.FNumber'] # 日期格式转换 cleaned_record['AuditDate'] = convert_date(record['FAuditDate']) cleaned_data.append(cleaned_record) return cleaned_data def convert_date(date_str): from datetime import datetime return datetime.strptime(date_str, '%Y-%m-%d').date() ``` 通过上述步骤,我们可以有效地调用金蝶云星空接口获取所需数据,并进行必要的数据清洗和转换,为后续的数据写入和处理打下坚实基础。 ![如何对接企业微信API接口](https://pic.qeasy.cloud/S14.png~tplv-syqr462i7n-qeasy.image) ### 使用轻易云数据集成平台进行ETL转换并写入旺店通·企业奇门API接口 在轻易云数据集成平台的生命周期中,数据转换与写入是关键的一步。本文将详细探讨如何将已经集成的源平台数据(如金蝶仓库的数据)进行ETL转换,并转为旺店通·企业奇门API接口所能够接收的格式,最终写入目标平台。 #### 数据请求与清洗 首先,我们需要从金蝶仓库提取原始数据。这一步通常包括数据请求和初步清洗,以确保数据的完整性和准确性。假设我们已经完成了这一步,接下来进入ETL转换阶段。 #### ETL转换过程 ETL(Extract, Transform, Load)过程中的Transform部分是关键所在。在这个阶段,我们需要根据旺店通·企业奇门API接口的要求,对源数据进行格式化和转换。以下是具体的元数据配置: ```json { "api": "wdt.stock.transfer.push", "method": "POST", "idCheck": true, "request": [ {"field": "outer_no", "label": "外部单号", "type": "string", "describe": "外部单据唯一标识,用于避免同一数据重复推送"}, {"field": "from_warehouse_no", "label": "源仓库", "type": "string", "describe": "货品被调出的仓库"}, {"field": "to_warehouse_no", "label": "目标仓库", "type": "string", "describe": "货品被调入的仓库"}, {"field": "address", "label": "地址", "type": "string", "describe": "地址"}, {"field": "contact", "label": "联系人", "type": "string", "describe": ""}, {"field": "telno", "label":"联系电话","type":"string","describe":"联系电话"}, {"field":"transfer_type","label":"调拨类型","type":"string","describe":"0:分步调拨,1:快速调拨,默认是1","value":"1"}, {"field":"mode","label":"调拨方式","type":"string","describe":"0:单品调拨,1:货位调拨;默认是0"}, {"field":"autocheck","label":"是否审核","type":"string","describe":"1:审核,0:不审核,默认为0","value":"1"}, { "field":"skus", "label":"货品列表节点", "type":"array", "describe":"调拨单货品列表节点", "children":[ {"field":"remark","label":"备注","type":"string","describe":"货品明细备注"}, {"field":"spec_no","label":"商家编码","type":"string","describe":"ERP内单品唯一编码(SKU),代表单品(SKU)所有属性,并且为库存量出入计算最小单元。SKU概念介绍,单击这里,"}, {"field":"from_position_no","label":"调出货位","type":"string","describe":"调出货位(当mode为1时,必传),不传值读ERP配置【入库开单货位优先级配置】,取默认货位、上一次入库货位、ZANCUN货位其中一个货位。"}, {"field":"to_position_no","label":"调出货位","type":"string","describe":"","parent":"","parent":"","parent":"","parent":"","parent":"","parent":"","parent":"","parent":"","parent":"","parent":"","parent":"","parent":"","parent":"","parent":"","parent":"","parent":"","parent":"","parent":"","parent":"", {"field": } ] } ], otherRequest:[{"field": } ``` #### 数据映射与转换 在此步骤中,我们将金蝶仓库的数据字段映射到旺店通·企业奇门API接口所需的字段。例如: - `outer_no` 映射到金蝶仓库中的订单编号。 - `from_warehouse_no` 映射到金蝶仓库中的源仓库编号。 - `to_warehouse_no` 映射到金蝶仓库中的目标仓库编号。 - `address`, `contact`, `telno` 等字段需要从金蝶仓库中提取相应的信息。 对于复杂的数据结构,如`skus`数组,需要特别注意每个子字段的映射和转换。例如: ```json { skus: [ { remark: "<来自金蝶仓库的备注>", spec_no: "<来自金蝶仓库的SKU编码>", from_position_no: "<来自金蝶仓库的调出货位>", to_position_no: "<来自金蝶仓库的调入货位>", num: "<来自金蝶仓库的调拨数量>" } ] } ``` #### 数据写入 完成数据转换后,将其封装成JSON格式,通过HTTP POST请求发送到旺店通·企业奇门API接口: ```json { api: 'wdt.stock.transfer.push', method: 'POST', data: { outer_no: '<外部单号>', from_warehouse_no: '<源仓库>', to_warehouse_no: '<目标仓库>', address: '<地址>', contact: '<联系人>', telno: '<联系电话>', transfer_type: '1', // 默认快速调拨 mode: '0', // 默认单品调拨 autocheck: '1', // 默认审核 skus: [ { remark: '<备注>', spec_no: '<商家编码>', from_position_no: '<调出货位>', to_position_no: '<调入货位>', num: '<调拨数量>' } ] } } ``` 通过这种方式,可以确保从源平台(金蝶仓库)提取的数据经过ETL转换后,符合目标平台(旺店通·企业奇门)的API接口要求,从而实现无缝的数据集成和写入。 #### 实时监控与错误处理 在整个过程中,实时监控和错误处理至关重要。轻易云数据集成平台提供了丰富的监控工具,可以实时跟踪每个数据包的状态。一旦出现错误,可以迅速定位并解决问题,以确保数据流动顺畅。 通过上述步骤,我们成功地实现了从源平台到目标平台的数据ETL转换和写入。这不仅提高了业务效率,也确保了数据的一致性和准确性。 ![金蝶与SCM系统接口开发配置](https://pic.qeasy.cloud/T1.png~tplv-syqr462i7n-qeasy.image)