轻易云平台数据ETL转换与集成实操案例解析

  • 轻易云集成顾问-谢楷斌
### 案例分享:查询旺店通采购订单数据集成到轻易云平台 在本文中,我们将详细解析如何将旺店通·企业奇门的数据无缝集成到轻易云数据集成平台,确保业务操作的高效与准确。本次案例的核心目标是实现对旺店通采购订单(通过接口`wdt.purchase.order.query`)的定时抓取、批量处理和可靠写入,保证各类交易数据不漏单、不丢失,并能够在轻易云平台上进行实时监控。 首先,对于确保采购订单数据不漏单这一关键需求,我们采用了分步执行设计。使用接口`wdt.purchase.order.query`可以定时拉取最新的采购订单信息,这样即便系统存在瞬间网络延迟,也能通过重试机制确保所有数据都能最终写入,不会造成遗漏。同时,通过分页处理机制,有效解决了大量数据拉取时可能碰到的限流问题,每次请求只获取一定数量的数据,从而避免超出API接口调用频率限制。 其次,在处理从旺店通获取的大规模数据信息并快速写入到轻易云平台方面,我们设计了一套优化方案。这包括在接收到原始JSON格式的数据后,利用轻易云提供的数据映射功能,将其转换为适合目标库表结构的一致性格式。面对不同系统之间潜在的数据模式差异,通过自定义字段映射功能,可以灵活应对实际业务中的复杂要求,比如日期格式、数字精度等问题。 最后,为确保整个流程从启动至结束保持一致性和透明度,我们加入了完善的日志记录及异常处理模块。在每一步操作完成后都会生成相应日志,以便未来追踪分析,并针对可能出现的错误情况引入自动重试机制,提高系统稳健性。此外,通过轻易云内置的实时监控工具,可以直观地观察每一个环节,从而及时发现并纠正任何异常情况,极大提升整体作业效率与准确率。 以上就是本篇技术文章开头部分内容。接下来我们将具体描述如何配置相关步骤,实现上述目标功能,包括API调用细节、参数设置以及注意事项等等。 ![用友与外部系统接口集成开发](https://pic.qeasy.cloud/D26.png~tplv-syqr462i7n-qeasy.image) ### 调用旺店通·企业奇门接口wdt.purchase.order.query获取并加工数据 在数据集成的生命周期中,调用源系统接口是至关重要的一步。本文将详细探讨如何使用轻易云数据集成平台调用旺店通·企业奇门接口`wdt.purchase.order.query`,获取并加工采购订单数据。 #### 接口调用配置 首先,我们需要配置元数据,以便正确调用`wdt.purchase.order.query`接口。以下是关键的元数据配置项: - **API**: `wdt.purchase.order.query` - **Method**: `POST` - **Pagination**: 每页返回100条记录 - **ID Check**: 启用ID检查 - **Response Format**: 将返回的字段`check_time`格式化为`check_time_new` #### 请求参数配置 为了确保接口请求的准确性,我们需要配置请求参数。以下是主要的请求参数及其描述: 1. **start_time**(开始时间): 用于指定查询的起始时间,通常使用上次同步时间。 ```json {"field":"start_time","label":"开始时间","type":"string","value":"{{LAST_SYNC_TIME|datetime}}"} ``` 2. **outer_no**(API单号): 外部创建采购单推送的单号,可以不传开始时间和结束时间。 ```json {"field":"outer_no","label":"API单号","type":"string"} ``` 3. **purchase_no**(采购单号): ERP系统采购单编号,可以不传开始时间和结束时间。 ```json {"field":"purchase_no","label":"采购单号","type":"string"} ``` 4. **status**(采购单状态): 采购单状态,如10已取消、20编辑中等,不传默认查询全部状态。 ```json {"field":"status","label":"采购单状态","type":"string"} ``` 5. **warehouse_no**(仓库编码): 仓库编码,用于指定查询特定仓库的采购订单。 ```json {"field":"warehouse_no","label":"仓库编码","type":"string"} ``` 6. **end_time**(结束时间): 查询的结束时间,通常使用当前时间。 ```json {"field":"end_time","label":"结束时间","type":"string","value":"{{CURRENT_TIME|datetime}}"} ``` 7. **page_size**(分页大小): 每页返回的数据条数,默认值为40。 ```json {"field":"page_size","label":"分页大小","type":"string","value":"{PAGINATION_PAGE_SIZE}"} ``` 8. **page_no**(页号): 页码,不传值默认从0页开始。 ```json {"field":"page_no","label":"页号","type":"string","value":"{PAGINATION_START_PAGE}"} ``` #### 数据处理与格式化 在获取到原始数据后,需要对数据进行清洗和格式化。根据元数据配置,我们需要将返回结果中的`check_time`字段格式化为`check_time_new`。 ```json {"old": "check_time", "new": "check_time_new", "format": "date"} ``` 这一步骤确保了数据的一致性和可读性,为后续的数据转换与写入奠定基础。 #### 实际案例应用 假设我们需要查询从上次同步时间到当前时间之间所有状态为“已审核”的采购订单,并且每页返回100条记录。具体的请求配置如下: ```json { "api": "wdt.purchase.order.query", "method": "POST", "request": [ {"field": "start_time", "value": "{{LAST_SYNC_TIME|datetime}}"}, {"field": "end_time", "value": "{{CURRENT_TIME|datetime}}"}, {"field": "status", "value": "40"}, {"field": "page_size", "value": 100}, {"field": "page_no", "value": 0} ] } ``` 通过上述配置,我们可以高效地获取所需的采购订单数据,并进行必要的数据清洗和格式化处理。这一过程不仅提高了数据处理效率,还确保了数据的一致性和准确性,为后续的数据转换与写入提供了可靠保障。 综上所述,通过合理配置元数据和请求参数,我们能够高效地调用旺店通·企业奇门接口获取并加工采购订单数据。这一过程展示了轻易云数据集成平台在处理异构系统间数据集成时的强大能力。 ![如何对接用友BIP接口](https://pic.qeasy.cloud/S18.png~tplv-syqr462i7n-qeasy.image) ### ETL转换与写入目标平台:轻易云集成平台API接口技术案例 在数据集成的生命周期中,ETL(Extract, Transform, Load)过程是至关重要的一环。本文将详细探讨如何将已经集成的源平台数据通过ETL转换为轻易云集成平台API接口所能够接收的格式,并最终写入目标平台。 #### 数据提取与清洗 首先,我们需要从源系统(如旺店通)提取采购订单数据。假设我们已经完成了数据请求与清洗步骤,获得了结构化的采购订单数据。此时的数据可能包含以下字段: - 订单编号(order_number) - 订单ID(order_id) - 供应商名称(supplier_name) #### 数据转换 接下来,我们需要将这些数据转换为轻易云集成平台API接口所能接受的格式。根据元数据配置,我们需要将字段映射到目标平台所需的字段: ```json { "api": "写入空操作", "effect": "EXECUTE", "method": "POST", "number": "number", "id": "id", "name": "编码", "idCheck": true } ``` 从上述元数据配置中可以看出,目标平台要求的数据格式如下: - `number` 对应 `order_number` - `id` 对应 `order_id` - `name` 对应 `supplier_name` 因此,我们需要编写一个转换函数,将源数据映射到目标格式: ```python def transform_data(source_data): transformed_data = [] for record in source_data: transformed_record = { "number": record["order_number"], "id": record["order_id"], "name": record["supplier_name"] } transformed_data.append(transformed_record) return transformed_data ``` #### 数据写入 在完成数据转换后,我们需要将转换后的数据通过轻易云集成平台API接口写入目标系统。根据元数据配置,API调用的方法为POST,且需要进行ID校验。 以下是一个示例代码片段,展示如何使用Python进行API调用: ```python import requests def write_to_target_platform(transformed_data): api_url = 'https://api.qingyiyun.com/write_empty_operation' headers = {'Content-Type': 'application/json'} for record in transformed_data: response = requests.post(api_url, json=record, headers=headers) if response.status_code == 200: print(f"Record {record['id']} written successfully.") else: print(f"Failed to write record {record['id']}. Status code: {response.status_code}") # 示例源数据 source_data = [ {"order_number": "12345", "order_id": "1", "supplier_name": "供应商A"}, {"order_number": "67890", "order_id": "2", "supplier_name": "供应商B"} ] # 转换并写入目标平台 transformed_data = transform_data(source_data) write_to_target_platform(transformed_data) ``` 在这个示例中,`transform_data` 函数将源数据转换为目标格式,而 `write_to_target_platform` 函数则负责通过API接口将这些数据写入目标系统。 #### 注意事项 1. **ID校验**:根据元数据配置中的 `idCheck: true`,我们需要确保每条记录的ID是唯一且有效的。这可以通过在写入前检查ID是否已存在来实现。 2. **错误处理**:在实际操作中,需要对API调用失败的情况进行详细处理,例如重试机制、错误日志记录等,以确保数据完整性和一致性。 3. **性能优化**:对于大规模的数据传输,可以考虑批量处理和并行化,以提高效率。 通过以上步骤,我们成功地将源平台的数据经过ETL转换,并通过轻易云集成平台API接口写入了目标系统。这一过程不仅确保了数据的一致性和完整性,也极大地提升了业务流程的自动化和透明度。 ![如何对接用友BIP接口](https://pic.qeasy.cloud/T10.png~tplv-syqr462i7n-qeasy.image)