轻易云平台与旺店通API的数据集成与ETL

  • 轻易云集成顾问-叶威宏
### 用友BIP数据集成到旺店通·企业奇门:物料集成同步-v案例分享 在系统对接和数据集成的实际应用中,透明、高效的数据流转至关重要。本文将聚焦于一个具体的技术案例,即用友BIP的数据集成到旺店通·企业奇门,实现“物料集成同步-v”方案。 首先,我们需要从用友BIP获取产品信息。这一步通过调用`/yonbip/digitalModel/product/list` API接口来完成,该接口能够高效地提取所需的数据。然而,在这一过程中,我们面临着如何处理分页、限流等问题,以确保稳定性与可靠性的挑战。 针对这些技术难点,本次方案特别设计了定时抓取机制,通过批量请求方式避免漏单情况,同时支持自定义数据转换逻辑,以适应不同系统之间的数据结构差异。此外,为了更好地监控与管理整个流程,还引入了集中监控和告警系统,实时跟踪数据集成任务的状态,并提供异常检测和错误重试机制,有效提升操作透明度和及时性。 下一步是将从用友BIP获取的大量产品信息快速写入到旺店通·企业奇门。我们使用的是`wdt.goods.push` API接口,这一过程得益于平台强大的高吞吐量支持,使得大规模数据能够迅速、准确地进入目标系统。同时,为保障最终的数据一致性,与业务需求紧密结合进行定制化映射,也是关键的一环。 本文将在下文详细介绍每个步骤中的技术实现细节及遇到的问题解决方法,希望能为您的项目提供实质性的参考与借鉴。如果您有类似的需求或面临相似的问题,不妨继续阅读以了解更多细节。 ![用友与外部系统接口集成开发](https://pic.qeasy.cloud/D24.png~tplv-syqr462i7n-qeasy.image) ### 用友BIP接口调用与数据加工技术案例 在数据集成生命周期的第一步,我们需要从用友BIP系统中获取物料数据,并对其进行初步加工。本文将详细介绍如何通过调用`/yonbip/digitalModel/product/list`接口获取数据,并进行必要的处理和转换。 #### 接口调用配置 首先,我们需要配置API接口的调用参数。根据元数据配置,以下是我们需要发送的POST请求参数: ```json { "pageIndex": "1", "pageSize": "500", "code": "", "name": "", "modelDescription": "", "manageClass": "", "productClass": "", "productTemplate": "", "model": "", "simple": { "pubts": "{{LAST_SYNC_TIME|datetime}}" } } ``` 这些参数中,`pageIndex`和`pageSize`用于分页控制,默认值分别为1和500。其他字段如`code`、`name`等可以根据具体需求进行过滤查询。 #### 数据请求与清洗 在成功获取到原始数据后,我们需要对数据进行清洗和格式化。根据元数据配置中的`formatResponse`部分,我们需要将返回的数据字段`unit`重命名为`unit_new`,并确保其类型为字符串。 假设我们收到的原始响应数据如下: ```json { "data": [ { "id": "123", "code": "A001", "name": "物料A", "unit": 10, // ...其他字段 }, { "id": "124", "code": "A002", "name": "物料B", "unit": 20, // ...其他字段 } ] } ``` 我们需要对其进行处理,使其符合目标格式: ```python import json # 假设response是从API获取的原始响应 response = ''' { "data": [ {"id":"123", "code":"A001", "name":"物料A", "unit":"10"}, {"id":"124", "code":"A002", "name":"物料B", "unit":"20"} ] } ''' # 将字符串转换为字典 response_dict = json.loads(response) # 遍历并处理每个物料条目 for item in response_dict['data']: # 重命名字段 item['unit_new'] = str(item.pop('unit')) # 转换后的结果 print(json.dumps(response_dict, indent=2, ensure_ascii=False)) ``` 输出结果将会是: ```json { "data": [ { "id": "123", "code": "A001", "name": "物料A", "unit_new": "10" }, { "id": "124", "code": "A002", "name": "物料B", "unit_new": "20" } ] } ``` #### 数据转换与写入 在完成数据清洗后,下一步是将这些数据转换为目标系统所需的格式,并写入到目标数据库或系统中。这一步通常包括以下操作: 1. **字段映射**:将源系统中的字段映射到目标系统中的相应字段。 2. **类型转换**:确保所有字段的数据类型符合目标系统的要求。 3. **批量写入**:使用批量操作提高写入效率。 例如,如果目标系统要求的数据格式如下: ```json { "_id": "", "_number": "", "_name":"", "_unit_new":"" } ``` 我们可以通过以下代码实现映射和写入: ```python # 假设目标系统的数据结构如下: target_data = [] for item in response_dict['data']: target_item = { "_id": item["id"], "_number": item["code"], "_name": item["name"], "_unit_new": item["unit_new"] } target_data.append(target_item) # 输出结果以供验证或进一步处理 print(json.dumps(target_data, indent=2, ensure_ascii=False)) ``` 输出结果将会是: ```json [ { "_id": '123', "_number': 'A001', "_name': '物料A', "_unit_new': '10' }, { "_id': '124', "_number': 'A002', "_name': '物料B', "_unit_new': '20' } ] ``` 通过上述步骤,我们完成了从用友BIP接口获取数据并进行初步加工的全过程。这些技术细节确保了数据在集成过程中的准确性和一致性,为后续的数据转换与写入奠定了基础。 ![系统集成平台API接口配置](https://pic.qeasy.cloud/S23.png~tplv-syqr462i7n-qeasy.image) ### 使用轻易云数据集成平台进行ETL转换并写入旺店通·企业奇门API接口 在数据集成的生命周期中,第二步是将已经集成的源平台数据进行ETL(提取、转换、加载)转换,并最终写入目标平台。在本文中,我们将深入探讨如何使用轻易云数据集成平台,将源数据转换为旺店通·企业奇门API接口所能够接收的格式,并成功写入目标平台。 #### 1. 数据提取与清洗 在数据集成过程中,首先需要从源系统中提取原始数据。假设我们已经完成了这一阶段,获得了结构化的原始数据。接下来,需要对这些数据进行清洗,以确保其质量和一致性。这一步通常包括去除重复项、处理缺失值以及标准化字段格式等操作。 ```json { "source_data": [ {"item_id": "123", "name": "产品A", "price": 100, "quantity": 10}, {"item_id": "124", "name": "产品B", "price": 150, "quantity": 5} ] } ``` #### 2. 数据转换 在清洗完毕后,下一步是将这些数据转换为目标平台所需的格式。根据元数据配置,我们需要将数据转换为旺店通·企业奇门API接口`wdt.goods.push`能够接收的格式,并通过POST方法发送请求。 ##### 配置元数据 根据提供的元数据配置,我们需要确保以下几点: - API接口名称:`wdt.goods.push` - 请求方法:`POST` - ID检查:`true` ##### 转换逻辑 我们需要编写一个转换逻辑,将源数据字段映射到目标API接口所需的字段。例如: ```json { "api_name": "wdt.goods.push", "method": "POST", "data": [ { "goods_no": "123", "goods_name": "产品A", "price": 100, "stock_qty": 10 }, { "goods_no": "124", "goods_name": "产品B", "price": 150, "stock_qty": 5 } ] } ``` 在这个例子中,`item_id`被映射为`goods_no`,`name`被映射为`goods_name`,而价格和数量字段则保持不变。 #### 3. 数据写入 完成数据转换后,我们需要将这些数据通过POST请求发送到旺店通·企业奇门API接口。以下是一个示例代码片段,用于发送HTTP POST请求: ```python import requests import json url = 'https://api.wangdian.cn/openapi2/goods_push.php' headers = {'Content-Type': 'application/json'} payload = { 'api_name': 'wdt.goods.push', 'method': 'POST', 'data': [ { 'goods_no': '123', 'goods_name': '产品A', 'price': 100, 'stock_qty': 10 }, { 'goods_no': '124', 'goods_name': '产品B', 'price': 150, 'stock_qty': 5 } ] } response = requests.post(url, headers=headers, data=json.dumps(payload)) print(response.json()) ``` 在这个代码片段中,我们使用Python的requests库来发送HTTP POST请求。请求头部设置为JSON格式,并且将转换后的payload作为请求体发送到指定URL。 #### 实时监控与错误处理 在实际操作过程中,实时监控和错误处理是确保数据准确性的关键步骤。轻易云平台提供了实时监控功能,可以随时查看每个环节的数据流动和处理状态。如果出现错误,可以及时捕获并进行相应处理,例如重新发送请求或记录日志以便后续分析。 ```python if response.status_code == 200: print("Data successfully pushed to Wangdian API") else: print(f"Failed to push data: {response.status_code}, {response.text}") ``` 通过以上技术步骤,我们成功地将源平台的数据进行了ETL转换,并最终写入到了旺店通·企业奇门API接口。这不仅提高了业务流程的自动化程度,还确保了数据的一致性和准确性。 ![如何对接用友BIP接口](https://pic.qeasy.cloud/T8.png~tplv-syqr462i7n-qeasy.image)