实现自动化ETL:从聚水潭到BI智选的售后单数据同步

  • 轻易云集成顾问-彭亮
### 聚水潭-售后单-->BI智选-售后表:轻易云数据集成的技术实现 在现代企业的数据管理中,确保高效、准确地对接各系统之间的数据是至关重要的任务。本文将聚焦于一个具体案例:如何通过轻易云数据集成平台,将聚水潭·奇门的售后单数据高效地集成到MySQL数据库中的BI智选售后表。本方案得名为“聚水潭-售后单-->BI智选-售后表”,具体内容如下。 首先,我们需要调用聚水潭·奇门提供的API接口`jushuitan.refund.list.query`来获取待处理的售后单数据。该接口支持批量返回满足条件的退款列表,并可根据业务需求设定查询参数,如时间范围、状态等,以便精确抓取所需数据。 为了确保不漏掉任何一条记录,需要特别注意分页和限流问题。在对接过程中,通过分段抓取和合理设置请求间隔,可以有效避免因超出接口调用限制而导致的数据丢失。此外,为了减少重复读取带来的性能损耗,同时保证每次抓取都是最新数据,我们会利用增量更新机制,即仅提取自上次成功写入后的新增或变更记录。 然后,对从聚水潭·奇门获取到的数据进行必要的数据转换。由于两套系统可能存在字段格式与结构上的差异,通过自定义转换逻辑,可以将源端数据调整为目标端可以接受并正确解析的格式,从而保证最终存储在MySQL数据库中的信息完整且无误。 完成初步清洗与转化之后,采用高吞吐量写入能力,将大批量处理好的曲面镇.别错个股好以合适 t)****b****z**B*DDAF配继续有**版本)。7******* SYSTEM BY** 至此,这样***shift3(4点+1*40)+WER)*ZRt-mysql_BI().最终保障了海量**AP@验)(mysql_39允汇voL'])==k{0这是一个极其复杂.txt(-data))©BATCHEXECUTE()}指 ``` ![如何开发钉钉API接口](https://pic.qeasy.cloud/D38.png~tplv-syqr462i7n-qeasy.image) ### 调用聚水潭·奇门接口获取并加工数据 在数据集成的生命周期中,调用源系统接口是至关重要的一步。本文将详细探讨如何通过轻易云数据集成平台调用聚水潭·奇门接口`jushuitan.refund.list.query`,并对获取的数据进行初步加工处理。 #### 接口调用配置 首先,我们需要配置元数据以便正确调用`jushuitan.refund.list.query`接口。以下是元数据配置的详细说明: ```json { "api": "jushuitan.refund.list.query", "effect": "QUERY", "method": "POST", "number": "as_id", "id": "as_id", "name": "as_id", "request": [ {"field": "page_index", "label": "页码", "type": "int", "describe": "页码", "value": "1"}, {"field": "page_size", "label": "页数", "type": "int", "describe": "页数", "value": "50"}, {"field": "start_time", "label": "修改起始时间", "type": "datetime", "describe": "开始时间", "value":"{{LAST_SYNC_TIME|datetime}}"}, {"field": "end_time", "label":"修改结束时间","type":"datetime","describe":"结束时间","value":"{{CURRENT_TIME|datetime}}"}, {"field":"so_ids","label":"线上单号列表","type":"string","describe":"线上单号列表"}, {"field":"date_type","label":"时间类型","type":"string","describe":"时间类型"}, {"field":"status","label":"售后单状态","type":"string","describe":"售后单状态"}, {"field":"good_status","label":"货物状态","type":"string","describe": "BUYER_NOT_RECEIVED:买家未收到货,BUYER_RECEIVED:买家已收到货,BUYER_RETURNED_GOODS:买家已退货,SELLER_RECEIVED:卖家已收到退货"}, {"field":"type","label":"售后类型","type":"string","describe": "普通退货,其它,拒收退货,仅退款,投诉,补发,换货,维修"} ], ... } ``` #### 请求参数详解 - `page_index` 和 `page_size`:用于分页请求,默认值分别为1和50。 - `start_time` 和 `end_time`:用于指定查询的时间范围,这两个参数使用动态变量分别代表上次同步时间和当前时间。 - `so_ids`、`date_type`、`status`、`good_status` 和 `type`:这些字段用于进一步过滤查询结果,例如根据售后单状态、货物状态等进行筛选。 #### 数据请求与清洗 在完成元数据配置后,我们通过轻易云平台发起POST请求,从聚水潭·奇门系统获取售后单列表。以下是一个示例请求: ```json { "page_index": 1, "page_size": 50, ... } ``` 请求返回的数据通常包含多个字段,为了便于后续处理,我们需要对这些数据进行清洗和转换。例如,可以通过以下步骤进行初步清洗: 1. **过滤无效数据**:移除空值或不符合业务逻辑的数据。 2. **字段映射**:将原始字段名映射为目标系统所需的字段名。 3. **格式转换**:例如,将日期字符串转换为标准日期格式。 #### 数据转换与写入 经过清洗后的数据需要进一步转换,以适应目标系统(如BI智选)的需求。可以使用轻易云平台提供的可视化工具进行字段映射和格式转换。例如,将原始JSON结构中的某些嵌套字段提取出来,并重组为新的结构。 最后,将处理好的数据写入目标系统。在此过程中,可以设置定时任务(如每天凌晨1点执行),确保数据同步的及时性和准确性。 ```json { ... // 定时任务配置 { crontab: '2 1 * * *', takeOverRequest: [ { field: 'start_time', value: '{{DAYS_AGO_1|datetime}}', type: 'datetime', label: '接管字段', formModel: { enable: false }, tableModel: { enable: false }, physicalModel: { enable: false } } ] } } ``` 通过上述步骤,我们实现了从聚水潭·奇门系统到BI智选系统的数据集成。这不仅提高了数据处理的效率,还确保了数据的一致性和准确性。 ![钉钉与ERP系统接口开发配置](https://pic.qeasy.cloud/S14.png~tplv-syqr462i7n-qeasy.image) ### 数据集成生命周期中的ETL转换与写入 在数据集成的生命周期中,将源平台的数据转换为目标平台所能接收的格式并写入,是一个关键步骤。本文将深入探讨如何使用轻易云数据集成平台,将聚水潭售后单数据转换并写入到BI智选的MySQL数据库中。 #### 元数据配置解析 首先,我们需要理解元数据配置,这些配置定义了如何将源数据字段映射到目标数据库字段。以下是元数据配置的详细解析: ```json { "api": "batchexecute", "effect": "EXECUTE", "method": "SQL", "number": "id", "id": "id", "name": "id", "idCheck": true, "request": [ {"field":"id","label":"主键","type":"string","value":"{as_id}-{items_asi_id}"}, {"field":"as_id","label":"售后单号","type":"string","value":"{as_id}"}, {"field":"as_date","label":"申请时间","type":"string","value":"{as_date}"}, {"field":"outer_as_id","label":"外部售后单号","type":"string","value":"{outer_as_id}"}, {"field":"so_id","label":"原始线上单号","type":"string","value":"{so_id}"}, {"field":"type","label":"售后类型","type":"string","describe":"普通退货,其它,拒收退货,仅退款,投诉,补发,换货","value":"{type}"}, {"field":"modified","label":"最后更新时间","type":"string","value":"{modified}"}, {"field":"status","label":"状态","type":"string","describe":"待确认:WaitConfirm;已确认:Confirmed;已取消:Cancelled;","value":"{status}"}, {"field":"remark","label":"备注","type":"string","value":"{remark}"}, {"field":"question_type","label":"问题类型","type":"string","value":"{question_type}"}, {"field":"warehouse","label":"仓库","type":"string","value":"{warehouse}"}, {"field":...} ], "otherRequest": [ { "field": "main_sql", "label": "主语句", "type": "string", "describe": "SQL首次执行的语句,将会返回:lastInsertId", "value": "REPLACE INTO refund_list_query(id, as_id, as_date, outer_as_id, so_id, type, modified, status, remark, question_type, warehouse, refund, payment, good_status, shop_buyer_id, shop_id, logistics_company, l_id, o_id, order_status, drp_co_id_to, wh_id, drp_co_id_from, node, wms_co_id, shop_status,..." }, { "field": "limit", "label": "limit", "type": "string", "value": "1000" } ] } ``` #### 数据请求与清洗 在数据请求阶段,我们从聚水潭系统获取售后单数据。该阶段主要涉及API调用和初步的数据清洗,以确保数据的完整性和准确性。 #### 数据转换与写入 接下来是关键的ETL(Extract-Transform-Load)过程: 1. **提取(Extract)**:从聚水潭系统提取原始售后单数据。 2. **转换(Transform)**:根据元数据配置,将提取的数据进行格式化和转换。例如: - 将`as_id`和`items_asi_id`组合生成主键`id`。 - 将日期字段格式化为目标平台所需的标准格式。 - 根据描述信息,将状态码转换为相应的文本描述。 3. **加载(Load)**:将转换后的数据通过SQL语句写入到BI智选的MySQL数据库中。 以下是具体实现步骤: ##### 提取与初步清洗 ```python import requests import json # 从聚水潭系统提取售后单数据 response = requests.get('https://api.jushuitan.com/after_sales') data = response.json() # 初步清洗 cleaned_data = [] for record in data: cleaned_record = { 'id': f"{record['as_id']}-{record['items_asi_id']}", 'as_id': record['as_id'], 'as_date': record['as_date'], 'outer_as_id': record['outer_as_id'], 'so_id': record['so_id'], # ...其他字段 } cleaned_data.append(cleaned_record) ``` ##### 数据转换与写入 ```python import mysql.connector # MySQL数据库连接配置 config = { 'user': 'username', 'password': 'password', 'host': '127.0.0.1', 'database': 'bi_database' } # 建立数据库连接 cnx = mysql.connector.connect(**config) cursor = cnx.cursor() # SQL插入语句模板 insert_stmt = ( """REPLACE INTO refund_list_query (id, as_id, as_date, outer_as_id, so_id, type, modified,status,...) VALUES (%s,%s,%s,%s,%s,%s,%s,%s,...) """ ) # 执行插入操作 for record in cleaned_data: cursor.execute(insert_stmt, (record['id'], record['as_id'], record['as_date'], record['outer_as_id'], record['so_id'], # ...其他字段值 )) # 提交事务 cnx.commit() # 关闭连接 cursor.close() cnx.close() ``` #### 总结 通过上述步骤,我们成功地将聚水潭售后单的数据提取、转换并写入到BI智选的MySQL数据库中。这一过程不仅确保了数据的一致性和准确性,还提升了业务处理效率。使用轻易云数据集成平台,可以简化这一复杂过程,使得不同系统间的数据无缝对接成为可能。 ![打通用友BIP数据接口](https://pic.qeasy.cloud/T2.png~tplv-syqr462i7n-qeasy.image)