ETL转换在数据集成中的应用:从吉客云到班牛

  • 轻易云集成顾问-彭亮
### 吉客云·奇门数据集成到班牛:Qeasy1查询吉客云销售单状态回写班牛技术案例 在本次技术案例中,我们将探讨如何实现吉客云·奇门系统与班牛之间的数据集成,具体方案命名为“Qeasy1查询吉客云销售单状态回写班牛”。该集成过程通过轻易云数据集成平台的强大功能进行配置与实施,助力企业高效管理和利用API资产,实现业务流程的顺畅衔接。 我们的主要任务是从吉客云·奇门获取销售单状态,并及时、准确地回写至班牛系统。为了达到这一目标,我们采用了以下核心步骤和关键技术: - **调用吉客云接口**: - 我们使用`jackyun.tradenotsensitiveinfos.list.get`接口来定时抓取订单相关信息。在处理过程中,需要注意分页和限流问题,以保障数据抓取的稳定性和连续性。 - **数据转化和映射**: - 由于两者系统的数据格式存在差异,我们自定义了一套转换逻辑,将获取到的数据适配至班牛所需格式。这一步骤不仅提高了数据处理效率,还确保了信息的准确传递。 - **批量写入到班牛**: - 使用`task.update` API,在批量操作中支持高吞吐量的数据快速写入能力,使得大量订单状态能够迅速同步至班牛,提高整体业务响应速度。 - **监控与异常处理**: - 数据质量监控及异常检测是此方案中的重要环节。通过实时日志记录以及告警机制,保证每一条信息都能正确传输,即使遇到错误也能够自动重试,从而不漏掉任何一笔订单。 结合以上这些关键点,本方案展示了如何利用轻易云平台实现复杂但可靠的数据对接,为企业构建一个透明、高效且可追溯的业务流程提供坚实基础。接下来我们将深入探究每个步骤中的实现细节及技术要点,以便读者能更好地理解并复用这一解决方案。 ![用友与外部系统接口集成开发](https://pic.qeasy.cloud/D36.png~tplv-syqr462i7n-qeasy.image) ### 使用轻易云数据集成平台调用吉客云·奇门接口获取并加工数据 在数据集成的生命周期中,第一步是从源系统获取数据。本文将详细探讨如何使用轻易云数据集成平台调用吉客云·奇门接口`jackyun.tradenotsensitiveinfos.list.get`来获取销售单状态,并对数据进行初步加工。 #### 接口概述 吉客云·奇门接口`jackyun.tradenotsensitiveinfos.list.get`主要用于查询销售单的非敏感信息。该接口采用POST请求方式,支持多种查询条件和分页功能,能够返回指定字段的数据列表。 #### 元数据配置解析 根据提供的元数据配置,我们需要设置以下请求参数: - `modified_begin` 和 `modified_end`: 修改起始和结束时间,必须同时存在且时间间隔不能超过七天。 - `startModified` 和 `endModified`: 最后修改时间的起始和截止。 - `tradeNo`: 销售单号,多个用半角逗号分隔。 - `pageSize`: 每页记录数,默认50,最大1000。 - `pageIndex`: 页码,0为第1页。 - `hasTotal`: 默认返回,首次调用时可以传1获取总记录数。 - `startCreated` 和 `endCreated`: 创建时间的起始和截止。 - `startAuditTime` 和 `endAuditTime`: 审核时间的起始和截止。 - `startConsignTime` 和 `endConsignTime`: 发货时间的起始和截止。这里使用了函数 `_function from_unixtime(({LAST_SYNC_TIME}-86400),'%Y-%m-%d %H:%i:%s')` 和 `_function from_unixtime(({CURRENT_TIME}-86400),'%Y-%m-%d %H:%i:%s')` 来动态计算时间范围。 - `tradeStatus`: 订单状态。 - `tradeType`: 订单类型,默认值为92。 - `sourceTradeNos`: 网店订单号。 - `fields`: 需要返回字段列表,用逗号分隔。 #### 请求示例 以下是一个典型的请求示例: ```json { "api": "jackyun.tradenotsensitiveinfos.list.get", "method": "POST", "params": { "modified_begin": "2023-09-01 00:00:00", "modified_end": "2023-09-07 23:59:59", "tradeNo": "123456,789012", "pageSize": "100", "pageIndex": "0", "hasTotal": "1", "startCreated": "", "endCreated": "", "startAuditTime": "", "endAuditTime": "", "startConsignTime": "_function from_unixtime(({LAST_SYNC_TIME}-86400),'%Y-%m-%d %H:%i:%s')", "endConsignTime": "_function from_unixtime(({CURRENT_TIME}-86400),'%Y-%m-%d %H:%i:%s')", "tradeStatus": "", "tradeType": "92", "sourceTradeNos": "", "fields": "" } } ``` #### 数据清洗与转换 在获取到原始数据后,需要对其进行清洗与转换,以便后续处理。常见的数据清洗操作包括: 1. **去重**:确保没有重复记录。 2. **格式化**:将日期、金额等字段格式化为统一标准。 3. **过滤**:根据业务需求过滤掉不必要的数据。例如,只保留特定状态或类型的订单。 以下是一个简单的数据清洗示例: ```python import pandas as pd # 假设我们已经通过API获取了数据,并存储在data变量中 data = [ {"tradeNo": "123456", "status": 6000, "amount": 100.5, "date_modified": "2023-09-01"}, {"tradeNo": "789012", "status": 9090, "amount": 200.75, "date_modified": ""}, ] # 转换为DataFrame df = pd.DataFrame(data) # 去重 df.drop_duplicates(subset=['tradeNo'], inplace=True) # 格式化日期 df['date_modified'] = pd.to_datetime(df['date_modified'], errors='coerce') # 过滤状态为6000或9090的订单 df = df[df['status'].isin([6000, 9090])] print(df) ``` #### 数据写入 经过清洗与转换后的数据,可以写入目标系统。在轻易云平台上,这一步通常通过配置相应的写入接口来实现。具体操作步骤包括: 1. 配置目标系统的API接口参数。 2. 映射源数据字段到目标系统字段。 3. 执行写入操作,并监控写入结果。 通过以上步骤,我们完成了从吉客云·奇门接口获取销售单状态并进行初步加工的全过程。这不仅提高了数据处理效率,也确保了数据的一致性和准确性。 ![电商OMS与ERP系统接口开发配置](https://pic.qeasy.cloud/S20.png~tplv-syqr462i7n-qeasy.image) ### 数据集成生命周期的ETL转换:从源平台到班牛API接口 在数据集成生命周期中,ETL(Extract, Transform, Load)转换是一个关键步骤。本文将详细探讨如何使用轻易云数据集成平台,将已经集成的源平台数据进行ETL转换,转为目标平台班牛API接口所能够接收的格式,并最终写入目标平台。 #### API接口配置与元数据解析 首先,我们需要理解元数据配置中的各个字段及其含义。以下是我们要处理的元数据配置: ```json { "api": "task.update", "effect": "EXECUTE", "method": "POST", "idCheck": true, "request": [ { "field": "app_id", "label": "小程序id", "type": "int", "value": "21151" }, { "field": "project_id", "label": "群组ID", "type": "int", "value": "77206" }, { "field": "task_id", "label": "工单id", "type": "int", "value": "_mongoQuery 73f95f22-03a2-3f8a-aa21-4c08c541daf4 findField=id where={\"content.77213\":{\"$eq\":\"{onlineTradeNo}\"}}" }, { "field": "contents", "label": "contents", "type": "object", "children": [ { "field": "78538", "label": "销售出库", "type": "int", "value": 78534 } ] } ] } ``` #### 数据请求与清洗 在进行ETL转换之前,首先需要从源系统中提取相关数据并进行清洗。假设我们已经通过轻易云平台完成了这一阶段,并获得了如下结构的数据: ```json { "_id":"5f8d0d55b54764421b7156e8", "_class":"com.qeasy.model.SalesOrderStatusUpdateRequest", "_mongoQuery":{ "_id":"73f95f22-03a2-3f8a-aa21-4c08c541daf4" }, ... } ``` #### 数据转换 接下来,我们需要将这些数据转换为班牛API所需的格式。根据元数据配置,我们需要构造一个POST请求,具体字段如下: 1. `app_id`:固定值21151。 2. `project_id`:固定值77206。 3. `task_id`:通过MongoDB查询获取,其中`onlineTradeNo`作为查询条件。 4. `contents`:包含一个子字段`78538`,其值固定为78534。 具体实现代码示例如下: ```python import requests import json # 定义API URL和Headers api_url = 'https://api.banniu.com/task.update' headers = {'Content-Type': 'application/json'} # 构造请求体 payload = { 'app_id': 21151, 'project_id': 77206, 'task_id': get_task_id(onlineTradeNo), # 假设get_task_id是一个函数,用于根据onlineTradeNo查询task_id 'contents': { '78538': 78534 } } # 将请求体转为JSON格式 payload_json = json.dumps(payload) # 发起POST请求 response = requests.post(api_url, headers=headers, data=payload_json) # 检查响应状态码和内容 if response.status_code == 200: print('Data successfully written to Banniu') else: print(f'Failed to write data: {response.text}') ``` #### 数据写入 上述代码示例展示了如何构造并发送HTTP POST请求,将转换后的数据写入班牛系统。特别注意的是,`task_id`字段需要通过MongoDB查询来动态获取,这里可以利用轻易云提供的查询功能。 例如,通过以下伪代码实现对MongoDB的查询: ```python def get_task_id(onlineTradeNo): query = {"content.77213":{"$eq":"{onlineTradeNo}"}} result = mongo_collection.find_one(query) return result['id'] if result else None ``` 通过这种方式,我们可以确保在每次调用API时都能获取到最新的、正确的`task_id`。 #### 实践中的注意事项 1. **错误处理**:在实际操作中,需要对可能出现的各种错误情况进行处理,例如网络异常、API返回错误等。 2. **日志记录**:建议在每次调用API时记录日志,以便后续追踪和排查问题。 3. **性能优化**:对于大批量的数据处理,可以考虑使用批量操作或异步处理方式,以提高效率。 通过以上步骤,我们成功地将源平台的数据经过ETL转换后,写入到了目标平台班牛系统。这一过程不仅提升了数据处理效率,也确保了数据的一致性和准确性。 ![打通钉钉数据接口](https://pic.qeasy.cloud/T21.png~tplv-syqr462i7n-qeasy.image)