ETL在轻易云与用友BIP之间的数据转换与写入

  • 轻易云集成顾问-钟家寿
### 用友BIP数据集成到轻易云平台的技术实践:〇YS现存量查询 在企业信息化系统中,确保各个子系统之间的数据无缝对接是保障业务流程畅通的关键环节。本案例分享的是如何通过轻易云数据集成平台,实现用友BIP中的现存量查询(〇YS现存量查询)数据与其他业务系统进行高效、可靠地对接。 首先,我们需要从用友BIP接口获取到指定条件下的库存数据。API路径为`/yonbip/scm/stock/QueryCurrentStocksByCondition`,这个接口允许我们根据特定的条件筛选出当前库存情况。处理这个API时,需要特别注意分页和限流的问题。在实际应用中,一个合理的分页机制不仅能提升性能,还可以避免单次请求返回过多数据导致超时或失败。同时,为了确保每轮抓取操作不漏单,我们设置了详细的数据抓取日志,并结合错误重试机制来处理网络波动或系统异常带来的问题。 为了将这些大量的数据快速、安全地写入到轻易云集成平台,我们采用批量处理的方法,通过分割大批次的数据包减少单次提交体积,并执行定制化的数据映射策略,以解决两者间可能存在的数据格式差异。这不仅提高了传输效率,还减小了错误率。当然,在整个过程中实时监控和记录日志也不可缺少,这为我们提供及时反馈,以便更快响应任何潜在问题。 最后,将经过清洗和转换后的数据,通过轻易云提出的一些创新方法,如自动错误重试等机制保证最终写入成功。通过这种方式,不仅实现用友BIP与轻易云集成平台之间有效、高效且透明的数据流转,也为后续更多复杂场景下的对接打下坚实基础。 ![用友与SCM系统接口开发配置](https://pic.qeasy.cloud/D3.png~tplv-syqr462i7n-qeasy.image) ### 调用用友BIP接口获取并加工数据的技术案例 在轻易云数据集成平台中,调用源系统用友BIP接口`/yonbip/scm/stock/QueryCurrentStocksByCondition`是实现数据集成生命周期的第一步。本文将详细探讨如何配置和使用该接口获取并加工数据。 #### 接口调用配置 首先,我们需要配置API接口的元数据,以便正确调用和处理返回的数据。以下是关键的元数据配置: - **API路径**: `/yonbip/scm/stock/QueryCurrentStocksByCondition` - **请求方法**: `POST` - **请求字段**: - `org`: 组织id,例如:`1891127153005312` - `warehouse`: 仓库ID,例如:`1875184118731008` - `productsku`: 商品SKUid,例如:`1921567767616256` - `batchno`: 批次号,例如:`QTRK0803B200` - `product`: 商品id,例如:`1921567765125888` - `product.oUnitId`: 主计量,例如:`1921567765125888` - `stockStatusDoc`: 库存状态,例如:`2006555827382257` - `store`: 门店id,例如:`1861030949130496` - `bStockStatusDocNotNull`: 是否库存状态非空,值为`true`时,查询结果里`stockStatusDoc`值都是大于0 - `billnum`: 可用量规则分配中的单据类型,例如销售订单:`voucher_order` #### 请求示例 为了更好地理解请求过程,以下是一个具体的请求示例: ```json { "org": "1891127153005312", "warehouse": "1875184118731008", "productsku": "1921567767616256", "batchno": "QTRK0803B200", "product": "1921567765125888", "product.oUnitId": "1921567765125888", "stockStatusDoc": "2006555827382257", "store": "1861030949130496", "bStockStatusDocNotNull": true, "billnum": "voucher_order" } ``` #### 数据格式化与转换 在接收到API响应后,需要对数据进行格式化和转换。根据元数据配置中的formatResponse字段,我们需要将原始字段名转换为新的字段名,并确保其格式正确。 - **原始字段**: - `product` - `productsku` - `warehouse` - **新字段**: - `new_product` - `new_productsku` - `new_warehouse` - **格式**: 字符串(string) #### 响应处理示例 假设我们从API得到了如下响应: ```json { "data": [ { "product": "产品A", "productsku": "SKU12345", "warehouse": "仓库1" }, { "product": "产品B", "productsku": "SKU67890", "warehouse": "仓库2" } ] } ``` 我们需要将其转换为以下格式: ```json { "data": [ { "new_product": "产品A", "new_productsku": "SKU12345", "new_warehouse": "仓库1" }, { "new_product": "产品B", "new_productsku": "SKU67890", "new_warehouse": "仓库2" } ] } ``` #### 实现步骤 1. **发送请求**: 使用POST方法向API发送请求,携带必要的参数。 2. **接收响应**: 获取API返回的数据。 3. **格式化数据**: 根据formatResponse字段,将原始字段名转换为新的字段名。 4. **输出结果**: 将格式化后的数据输出到下一个处理环节或系统。 通过上述步骤,我们能够高效地调用用友BIP接口获取所需的数据,并进行必要的加工和转换,为后续的数据处理和写入打下坚实基础。这一过程不仅提高了数据集成的透明度和效率,也确保了各个系统间的数据一致性和准确性。 ![金蝶云星空API接口配置](https://pic.qeasy.cloud/S1.png~tplv-syqr462i7n-qeasy.image) ### 数据转换与写入轻易云集成平台API接口的技术案例 在数据集成生命周期中,数据转换与写入是关键步骤之一。本文将详细探讨如何将已经集成的源平台数据进行ETL转换,转为目标平台轻易云集成平台API接口所能够接收的格式,并最终写入目标平台。 #### 数据请求与清洗 在开始ETL转换之前,首先需要完成数据请求与清洗。这一步骤确保从源系统获取的数据是准确且符合预期的。假设我们已经完成了这一阶段,并且手头有清洗后的数据。 #### 数据转换 在数据转换阶段,我们需要将源平台的数据格式转化为目标平台所能接受的格式。以下是一个示例代码片段,用于将源数据转换为目标API接口所需的JSON格式: ```python import json # 假设这是从源系统获取并清洗后的数据 source_data = { "id": "12345", "name": "Sample Data", "value": 100 } # 转换为目标API接口所需的格式 transformed_data = { "operation": "write", "data": { "record_id": source_data["id"], "record_name": source_data["name"], "record_value": source_data["value"] } } # 转换为JSON字符串 json_payload = json.dumps(transformed_data) ``` 在这个示例中,我们将源数据中的`id`、`name`和`value`字段映射到目标API接口所需的`record_id`、`record_name`和`record_value`字段。 #### 数据写入 接下来,我们需要将转换后的数据通过API接口写入到目标平台。根据元数据配置,我们使用POST方法,并且需要进行ID检查。以下是一个示例代码片段,用于执行这个操作: ```python import requests # API配置 api_url = "https://api.qingyiyun.com/write" headers = { "Content-Type": "application/json" } # 检查ID是否存在(假设有一个检查ID存在性的API) def check_id_exists(record_id): check_url = f"https://api.qingyiyun.com/check/{record_id}" response = requests.get(check_url) return response.status_code == 200 # 写入操作 def write_data(payload): response = requests.post(api_url, headers=headers, data=payload) if response.status_code == 200: print("Data written successfully") else: print(f"Failed to write data: {response.text}") # 执行ID检查和写入操作 if check_id_exists(transformed_data["data"]["record_id"]): write_data(json_payload) else: print("Record ID does not exist") ``` 在这个示例中,我们首先定义了一个函数`check_id_exists`来检查记录ID是否存在。如果存在,则调用`write_data`函数将转换后的JSON数据写入到目标平台。 #### 元数据配置应用 根据提供的元数据配置: ```json { "api": "写入空操作", "method": "POST", "idCheck": true } ``` 我们可以看到,API方法是POST,并且需要进行ID检查。这正好与我们的示例代码相匹配。在实际应用中,可以根据具体需求调整API URL和其他细节。 #### 总结 通过以上步骤,我们成功地将源平台的数据进行了ETL转换,并通过轻易云集成平台的API接口实现了数据写入。关键点在于正确理解和应用元数据配置,确保每个环节都按照预期进行。这种方法不仅提高了数据处理效率,还保证了业务流程的透明度和可追溯性。 ![用友与WMS系统接口开发配置](https://pic.qeasy.cloud/T5.png~tplv-syqr462i7n-qeasy.image)