高效的数据流处理:易仓到金蝶云星空的系统集成方法

  • 轻易云集成顾问-贺强
### 案例分享:易仓数据集成到金蝶云星空 在复杂多变的业务环境中,确保数据无缝、准确地流动对任何一家企业都是至关重要的。今天,我们将聚焦一个实际运行的系统对接案例——“Done-易仓-获取FBA头程数据--->金蝶-分步式调出单(更新)”,解析如何通过有效的数据集成方案,实现从易仓到金蝶云星空的数据传输与处理。 #### 1. 系统背景与需求 该项目主要目标是通过调用易仓API接口 `getFbaShipment` 获取头程运输过程中的FBA数据,并将这些信息批量写入至金蝶云星空系统中。这不仅要求高吞吐量的数据写入能力和实时监控,还必须处理两套系统在接口格式和结构上的差异,以确保不漏单、不重单。 #### 2. 技术挑战与应对策略 ##### 数据快速写入 为满足大量数据快速且稳定地写入金蝶云星空,通过其 `batchSave` API 接口进行批量操作。轻易云平台支持高吞吐量的数据流处理,有效提升了这一过程的时效性。 ##### 分页及限流问题 考虑到易仓API存在分页和限流的问题,我们设定了可靠抓取机制,在每次请求后及时检查返回结果并控制下一次请求的触发时机。这种方式既保证了页面完整性的抓取,又避免因频繁调用导致限流问题而影响进度。 ##### 数据质量监控 为了确保所有集成步骤正常执行,包括针对不同步骤提供异常检测及告警功能。一旦发现数据丢失或网络故障等异常情况,即可触发相应机制实施纠正,保障整体流程顺利进行。此外,自定义转换规则适用于调整特定业务逻辑下的数据结构,从而解决两套系统间属性/格式的不一致性问题。 #### 3. 实施细节概览 我们采用的是分步式实现策略,每一步完成特定任务: 1. 定期调用 getFbaShipment 接口抓取运输数据信息。 2. 实现数据结构转换以匹配金蝶云星空所需格式。 3. 利用 batchSave API 完成批量上传,同时记录日志以便后续审计。 4. 集中监控整个流程,设置告警阈值及时反馈潜在问题,保证全局透明度与控制力。 此技术方案不但提高了业务效率,也增加了系统之间协同工作的可靠性,为未来扩展更多交互模块打下坚实基础。在接下来的内容里,我们会进一步详细介绍具体步骤中的关键技术点及代码实现逻辑 ![如何开发用友BIP接口](https://pic.qeasy.cloud/D29.png~tplv-syqr462i7n-qeasy.image) ### 调用源系统易仓接口getFbaShipment获取并加工数据 在数据集成的生命周期中,调用源系统接口是至关重要的一步。本文将深入探讨如何通过轻易云数据集成平台调用易仓接口`getFbaShipment`来获取FBA头程数据,并对其进行初步加工。 #### 接口配置与调用 首先,我们需要配置和调用易仓的`getFbaShipment`接口。该接口采用POST方法进行数据请求,主要用于查询FBA头程货件信息。以下是元数据配置的详细说明: ```json { "api": "getFbaShipment", "effect": "QUERY", "method": "POST", "number": "{shipment_id}{updated_at}", "id": "shipment_id", "idCheck": true, "request": [ {"field": "page", "label": "page", "type": "int", "value": "1"}, {"field": "page_size", "label": "page_size", "type": "int", "value": "50"}, {"field": "updateFor", "label": "开始出库时间", "type": "string", "value":"{{LAST_SYNC_TIME|datetime}}"}, {"field": "updateTo", "label":"结束出库时间","type":"string", "value":"{{CURRENT_TIME|datetime}}"}, {"field":"platformStatus","label":"货件状态","type":"string", "value":"CLOSED,RECEIVING","parser":{"name":"StringToArray","params":","}}, {"field":"createFor","label":"创建开始时间","type":"string", "value":"2024-04-01 00:00:00"} ], "autoFillResponse": true, "condition_bk":[[]] } ``` #### 请求参数详解 1. **分页参数**: - `page`: 当前页码,默认值为1。 - `page_size`: 每页返回的数据条数,默认值为50。 2. **时间范围参数**: - `updateFor`: 开始出库时间,使用模板变量`{{LAST_SYNC_TIME|datetime}}`动态填充。 - `updateTo`: 结束出库时间,使用模板变量`{{CURRENT_TIME|datetime}}`动态填充。 3. **货件状态**: - `platformStatus`: 查询的货件状态,包括`CLOSED`和`RECEIVING`。通过解析器将逗号分隔的字符串转换为数组形式。 4. **创建时间**: - `createFor`: 创建开始时间,固定值为"2024-04-01 00:00:00"。 #### 数据获取与初步加工 在完成接口配置后,通过轻易云平台发起POST请求获取数据。由于配置了`autoFillResponse: true`,平台会自动处理响应数据并填充到指定的数据结构中。 ##### 数据清洗与转换 获取到原始数据后,需要对其进行清洗和转换,以便后续处理和写入目标系统。以下是常见的数据清洗步骤: 1. **字段映射**:将原始数据字段映射到目标系统所需的字段。例如,将易仓返回的`shipment_id`映射为金蝶系统中的相应字段。 2. **格式转换**:将日期、数值等字段格式转换为目标系统要求的格式。例如,将日期格式从"YYYY-MM-DD HH:mm:ss"转换为"YYYYMMDD"。 3. **状态过滤**:根据业务需求过滤掉不需要的记录。例如,只保留状态为"CLOSED"或"RECEIVING"的记录。 ##### 示例代码片段 以下是一个示例代码片段,用于演示如何处理从易仓接口获取的数据: ```python import requests import json from datetime import datetime # 配置请求参数 params = { 'page': 1, 'page_size': 50, 'updateFor': datetime.now().strftime('%Y-%m-%d %H:%M:%S'), 'updateTo': datetime.now().strftime('%Y-%m-%d %H:%M:%S'), 'platformStatus': ['CLOSED', 'RECEIVING'], 'createFor': '2024-04-01 00:00:00' } # 发起POST请求 response = requests.post('https://api.yicang.com/getFbaShipment', json=params) data = response.json() # 数据清洗与转换 cleaned_data = [] for item in data['results']: cleaned_item = { 'shipment_id': item['shipment_id'], 'updated_at': item['updated_at'].replace('-', '').replace(':', '').replace(' ', ''), # 更多字段映射和转换... } cleaned_data.append(cleaned_item) # 输出清洗后的数据 print(json.dumps(cleaned_data, indent=2)) ``` 通过上述步骤,我们可以成功调用易仓接口获取FBA头程数据,并对其进行初步加工,为后续的数据写入和进一步处理打下坚实基础。在实际应用中,还可以根据具体业务需求进行更复杂的数据处理和逻辑实现。 ![金蝶与WMS系统接口开发配置](https://pic.qeasy.cloud/S16.png~tplv-syqr462i7n-qeasy.image) ### 数据转换与写入金蝶云星空API接口 在数据集成生命周期的第二步,我们需要将已经从源平台获取并清洗过的数据进行ETL转换,使其符合目标平台金蝶云星空API接口的要求,并最终写入目标平台。本文将深入探讨这一过程中的技术细节和实现方法。 #### API接口配置与元数据解析 首先,我们需要理解并配置金蝶云星空API接口的元数据。以下是一个典型的元数据配置示例: ```json { "api": "batchSave", "effect": "EXECUTE", "method": "POST", "idCheck": true, "request": [ { "field": "FID", "label": "单据编号", "type": "string", "describe": "单据编号", "value": "_findCollection find FID from dce1240e-9a5b-38ea-971a-1a29ee6d6713 where F_TLQG_TextHJBH={shipment_id}" }, { "field": "FSTKTRSOUTENTRY", "label": "明细信息", "type": "array", "describe": "明细信息", "value": "products", "children": [ { "field": "FEntryID", "label": "明细行ID", "type": "string", "value": "_findCollection find FSTKTRSOUTENTRY_FEntryID from dce1240e-9a5b-38ea-971a-1a29ee6d6713 where F_TLQG_TextHJBH={shipment_id} FCustMatNo={{products.warehouse_sku}}" }, { "field": "F_TLQG_QtyYCSH", "label": "易仓收货数量", "type": "string", "describe": "物料编码", ... } ] } ], ... } ``` #### 数据请求与清洗 在数据请求与清洗阶段,我们从源平台(如易仓)获取原始数据,并对其进行初步处理和清洗。假设我们已经完成了这一步,现在需要将这些数据转换为金蝶云星空API所能接收的格式。 #### 数据转换逻辑 1. **单据编号 (FID)**: - 使用 `_findCollection` 方法查找对应 `shipment_id` 的单据编号。 - 示例:`_findCollection find FID from dce1240e-9a5b-38ea-971a-1a29ee6d6713 where F_TLQG_TextHJBH={shipment_id}` 2. **明细信息 (FSTKTRSOUTENTRY)**: - 包含多个子字段,如 `FEntryID`, `F_TLQG_QtyYCSH`, `F_TLQG_ComboYWZT`, `F_TLQG_QtyTBSHCY` 等。 - 每个子字段根据不同的逻辑进行处理和转换。 3. **业务状态 (F_TLQG_ComboYWZT)**: - 使用 `_function CASE` 方法根据 `platform_status` 字段值进行条件判断。 - 示例:`_function CASE '{platform_status}' WHEN 'CLOSED' THEN '1' ELSE '3' END` 4. **同步收货差异 (F_TLQG_QtyTBSHCY)**: - 使用 `_function CASE WHEN` 方法计算收货差异并赋值。 - 示例:`_function CASE WHEN '{{products.quantity}}-{{products.had_received_amount}}=0' THEN '3' ELSE '1' END` #### 数据写入操作 配置完成后,通过调用 API 接口将转换后的数据写入到金蝶云星空系统中。以下是关键的 API 调用参数: ```json { ... // Other parameters { field: 'FormId', label: '业务对象表单Id', type: 'string', describe: '必须填写金蝶的表单ID如:PUR_PurchaseOrder', value: 'STK_TRANSFEROUT' }, { field: 'Operation', label: '执行的操作', type: 'string', describe: '执行的操作', value: 'BatchSave' }, ... } ``` 通过 POST 请求方式,将上述配置和处理后的数据发送到金蝶云星空 API 接口,确保每个字段都符合目标平台的要求。 #### 实际应用案例 假设我们有一批从易仓获取的数据,需要将其转换并写入到金蝶云星空系统中。具体步骤如下: 1. **获取原始数据**: ```json { shipment_id: '12345', products: [ { warehouse_sku: 'SKU001', had_received_amount: 100, quantity: 100 }, { warehouse_sku: 'SKU002', had_received_amount: 50, quantity: 60 } ], platform_status: 'CLOSED' } ``` 2. **数据转换**: - 根据上述逻辑,将原始数据转换为符合金蝶云星空 API 要求的格式。 3. **发送请求**: - 将转换后的数据通过 POST 请求发送到金蝶云星空系统,确保成功写入。 通过以上步骤,我们可以高效地完成从源平台到目标平台的数据集成,实现不同系统间的数据无缝对接。 ![金蝶与WMS系统接口开发配置](https://pic.qeasy.cloud/T21.png~tplv-syqr462i7n-qeasy.image)