实现互客订单数据ETL转换与写入轻易云平台

  • 轻易云集成顾问-黄宏棵
### 查询互客销售订单——网易互客数据集成到轻易云集成平台案例分享 在本技术案例中,我们将探讨如何高效、可靠地将网易互客的销售订单数据集成到轻易云数据集成平台。具体实施方案命名为"查询互客销售订单"。 针对业务需求,我们首先需要通过调用网易互客的API接口`openapi/trade/searchTrades`获取最新的销售订单数据。此过程面临诸多挑战,包括避免遗漏单据、大量数据快速写入以及处理分页和限流问题。在成功获取并处理这些数据之后,下一步是利用轻易云的数据API进行批量写入操作,确保所有关键信息无缝迁移至目标平台。 为了实现这一整合过程,我们采用了定时任务调度系统来定期触发对网易互客接口的数据抓取。这不仅确保了在每个时间点都能获取最新的数据,还提供了故障重试机制,以防API调用失败。此外,为了解决两者之间可能存在的数据格式差异,我们使用了轻易云支持的定制化数据映射功能,使得从源系统导出的JSON结构能够顺利转换为目标数据库要求的格式。 实时监控也是本次项目的重要组成部分。通过详细记录每一笔交易及其状态变化,并及时捕捉任何异常情况,这样我们可以迅速响应并修复潜在的问题,从而保障整个集成流程的平稳运行。 综上所述,本案例不仅展示了如何有效利用各类API接口完成复杂的数据对接任务,也突显出运用现代工具提升业务透明度和效率的重要性。在下文中,将详细介绍具体实现步骤及相关代码示例。 ![用友与SCM系统接口开发配置](https://pic.qeasy.cloud/D22.png~tplv-syqr462i7n-qeasy.image) ### 调用网易互客接口openapi/trade/searchTrades获取并加工数据 在轻易云数据集成平台的生命周期中,调用源系统接口是数据请求与清洗阶段的关键步骤。本文将详细探讨如何通过调用网易互客接口`openapi/trade/searchTrades`来获取销售订单数据,并进行必要的数据加工。 #### 接口调用配置 首先,我们需要配置接口调用的元数据。以下是具体的配置项: - **API地址**:`openapi/trade/searchTrades` - **请求方法**:`POST` - **主键字段**:`tid` - **ID检查**:启用(`idCheck: true`) 请求参数配置如下: 1. **订单来源**(source):固定值为`2`,表示销售创建。 2. **查询页码**(page):固定值为`1`。 3. **每页展示的订单数量**(pageSize):固定值为`10`。 4. **时间范围限制**(timeRanges):包含以下子参数: - 时间类型(timeType):固定值为`1`,表示下单时间。 - 开始时间(startTime):当前时间减去72000秒。 - 结束时间(endTime):当前时间。 ```json { "source": "2", "page": "1", "pageSize": "10", "timeRanges": [ { "timeType": "1", "startTime": "_function REPLACE(unix_timestamp(current_timestamp(3))-72000,'.','')", "endTime": "_function REPLACE(unix_timestamp(current_timestamp(3)),'.','')" } ] } ``` #### 数据过滤条件 为了确保获取的数据符合业务需求,我们设置了以下过滤条件: - 排除交易状态为4的订单: ```json { "field": "showTradeStatus", "logic": "neq", "value": "4" } ``` #### 数据格式化 在获取到原始数据后,需要对部分字段进行格式化处理。具体包括: - 将字段 `dealTime` 格式化为 `dealTime_new` - 将字段 `createTime` 格式化为 `createTime_new` 格式化规则均为日期时间格式转换。 ```json [ { "old": "dealTime", "new": "dealTime_new", "format": "dateTime" }, { "old": "createTime", "new": "createTime_new", "format": "dateTime" } ] ``` #### 实际操作步骤 1. **构建请求体**:根据上述配置构建请求体,并发送POST请求至网易互客API。 2. **接收响应数据**:解析响应数据,并根据过滤条件筛选出符合要求的记录。 3. **格式化字段**:对筛选后的记录进行字段格式化处理,将 `dealTime` 和 `createTime` 转换为新的日期时间格式字段。 4. **存储与后续处理**:将处理后的数据写入目标系统或用于后续的数据转换与写入阶段。 通过上述步骤,我们可以高效地从网易互客系统中获取并加工销售订单数据,为后续的数据集成工作打下坚实基础。这一过程不仅确保了数据的一致性和准确性,还提升了整体业务流程的透明度和效率。 ![金蝶与MES系统接口开发配置](https://pic.qeasy.cloud/S20.png~tplv-syqr462i7n-qeasy.image) ### 数据请求与清洗 在数据集成的生命周期中,首先需要从源平台获取数据并进行清洗。假设我们已经完成了这一步,现在需要将清洗后的数据进行ETL转换,并写入目标平台。 ### 数据转换与写入 为了将数据转换为目标平台所能接收的格式,我们需要遵循轻易云集成平台API接口的要求。以下是一个具体的技术案例,展示如何使用元数据配置将互客销售订单的数据写入轻易云集成平台。 #### API接口配置 根据提供的元数据配置,我们需要使用`POST`方法调用轻易云集成平台的API接口,并且在写入之前进行ID检查。以下是具体的API接口配置: ```json { "api": "写入空操作", "method": "POST", "idCheck": true } ``` #### 数据转换 在将数据写入目标平台之前,需要确保数据格式符合API接口的要求。以下是一个示例代码,展示如何进行数据转换: ```python import requests import json # 假设我们已经从源平台获取并清洗了以下销售订单数据 source_data = { "order_id": "12345", "customer_name": "张三", "product_id": "67890", "quantity": 2, "price": 100.0 } # 转换为目标平台所需的格式 target_data = { "orderId": source_data["order_id"], "customerName": source_data["customer_name"], "productId": source_data["product_id"], "quantity": source_data["quantity"], "totalPrice": source_data["quantity"] * source_data["price"] } # 将转换后的数据转换为JSON格式 json_data = json.dumps(target_data) # 定义API接口URL和Headers api_url = 'https://api.qingyiyun.com/write' headers = {'Content-Type': 'application/json'} # 发送POST请求,将数据写入目标平台 response = requests.post(api_url, headers=headers, data=json_data) # 检查响应状态码以确认是否成功写入 if response.status_code == 200: print("数据成功写入目标平台") else: print(f"写入失败,状态码: {response.status_code}, 响应内容: {response.text}") ``` #### ID检查 在进行数据写入操作之前,我们需要确保ID检查功能已启用。这可以防止重复的数据写入或ID冲突。以下是一个示例代码,展示如何实现ID检查: ```python def check_id_exists(order_id): # 假设有一个API接口用于检查ID是否存在 check_url = f'https://api.qingyiyun.com/check_id/{order_id}' response = requests.get(check_url) if response.status_code == 200: return response.json().get('exists', False) else: raise Exception(f"ID检查失败,状态码: {response.status_code}") # 在写入之前进行ID检查 if not check_id_exists(source_data["order_id"]): # 如果ID不存在,则执行写入操作 response = requests.post(api_url, headers=headers, data=json_data) if response.status_code == 200: print("数据成功写入目标平台") else: print(f"写入失败,状态码: {response.status_code}, 响应内容: {response.text}") else: print("订单ID已存在,跳过写入操作") ``` 通过上述步骤,我们可以确保从源平台获取的数据经过清洗和转换后,以正确的格式和方式写入目标平台。这不仅提高了数据处理的效率,还保证了数据的一致性和完整性。 ![系统集成平台API接口配置](https://pic.qeasy.cloud/T26.png~tplv-syqr462i7n-qeasy.image)