ETL转换与数据写入:将采购退货单数据导入MySQL的最佳实践

  • 轻易云集成顾问-曹润
### 技术案例分享:聚水潭数据集成到MySQL 在本技术案例中,我们将展示如何成功实现从聚水潭的采购退货单数据,到BI初本中的MySQL采购退货表的高效集成。具体方案名称为“聚水潭-采购退货单-->BI初本-采购退货表_copy”,此方案通过轻易云数据集成平台得以顺利实施,实现了多个关键技术点。 首先,针对如何调用聚水潭接口`/open/purchaseout/query`进行定时可靠的数据抓取,是整个项目中的重要环节之一。我们运用了平台自带的调度工具,确保能够按照预设时间间隔准确无误地获取最新的采购退货单数据。同时,通过对分页和限流问题进行解决,以避免因大批量请求而导致API性能下降或请求失败。 其次,为实现大量数据快速写入到MySQL数据库,我们借助了平台支持高吞吐量的数据写入特性。在经过处理后的数据,会通过一个名为`batchexecute`的API接口,以批量模式高效插入至MySQL。这不仅提升了整体操作速度,还减少了网络传输过程中的开销。 此外,在实时监控和日志记录方面,提供了一套集中化的监控告警系统,使得每个任务执行状态都能被及时追踪。如遇异常情况,可迅速响应并采取补救措施,包括利用错误重试机制重新尝试未成功的数据导入,大幅提高集成任务的可靠性。 当然,对于不同系统之间的数据格式差异也进行了专门处理,通过可视化的数据流设计工具,自定义转换逻辑,使得原始数据与目标表结构完全匹配。不仅如此,还设置了严格的数据质量监控和异常检测机制,以保证最终进入MySQL数据库的信息完整且准确无误。 接下来部分将详细描述各个步骤中涉及到的方法、代码实例以及注意事项等内容。 ![金蝶与外部系统打通接口](https://pic.qeasy.cloud/D11.png~tplv-syqr462i7n-qeasy.image) ### 调用聚水潭接口/open/purchaseout/query获取并加工数据的技术案例 在数据集成过程中,调用源系统的API接口是至关重要的一步。本文将详细探讨如何通过轻易云数据集成平台调用聚水潭的采购退货单查询接口`/open/purchaseout/query`,并对获取的数据进行初步加工。 #### API接口配置与调用 首先,我们需要配置和调用聚水潭的API接口。根据提供的元数据配置,API接口的基本信息如下: - **API路径**: `/open/purchaseout/query` - **请求方法**: `POST` - **主要字段**: - `page_index`: 第几页,从第一页开始,默认1 - `page_size`: 每页多少条,默认30,最大50 - `modified_begin`: 修改起始时间 - `modified_end`: 修改结束时间 - `so_ids`: 指定线上订单号 - `status`: 单据状态(Confirmed=生效,WaitConfirm待审核,Creating=草拟,Archive=归档,Cancelled=作废) - `io_ids`: 采购退货单号列表(最大30) #### 请求参数设置 在实际调用中,我们需要根据业务需求设置请求参数。以下是一个典型的请求参数示例: ```json { "page_index": "1", "page_size": "30", "modified_begin": "{{LAST_SYNC_TIME|datetime}}", "modified_end": "{{CURRENT_TIME|datetime}}", "status": "Confirmed" } ``` 其中,`modified_begin`和`modified_end`使用了模板变量来动态填充同步时间,这样可以确保每次请求的数据都是最新的。 #### 数据清洗与转换 获取到数据后,需要对数据进行清洗和转换,以便后续处理。以下是一个简单的数据清洗和转换示例: 1. **数据去重**: 根据`io_id`字段去重,确保每条记录唯一。 2. **字段平铺**: 将嵌套结构中的`items`字段平铺到主记录中,以便于后续处理。 示例代码如下: ```python import requests import json # 配置API请求参数 url = "https://api.jushuitan.com/open/purchaseout/query" headers = {"Content-Type": "application/json"} payload = { "page_index": "1", "page_size": "30", "modified_begin": "{{LAST_SYNC_TIME|datetime}}", "modified_end": "{{CURRENT_TIME|datetime}}", "status": "Confirmed" } # 发起API请求 response = requests.post(url, headers=headers, data=json.dumps(payload)) data = response.json() # 数据去重 unique_data = {item['io_id']: item for item in data['items']}.values() # 字段平铺 flattened_data = [] for item in unique_data: for sub_item in item.get('items', []): flat_record = {**item, **sub_item} flattened_data.append(flat_record) # 输出处理后的数据 print(json.dumps(flattened_data, indent=2)) ``` #### 自动填充响应与实时监控 轻易云平台支持自动填充响应和实时监控功能,这使得我们可以在平台上直接查看每个环节的数据流动和处理状态。通过配置`autoFillResponse: true`和`beatFlat: ["items"]`选项,可以自动将嵌套结构中的子项平铺到主记录中,大大简化了数据处理流程。 #### 总结 通过以上步骤,我们成功实现了从聚水潭系统获取采购退货单数据,并对其进行初步加工。这一过程展示了如何利用轻易云数据集成平台高效地完成数据请求、清洗与转换,为后续的数据分析和业务决策提供了可靠的数据基础。 ![金蝶云星空API接口配置](https://pic.qeasy.cloud/S14.png~tplv-syqr462i7n-qeasy.image) ### 数据集成与ETL转换:实现采购退货单数据写入MySQL 在数据集成生命周期的第二阶段,我们将聚水潭平台的采购退货单数据转换为BI初本-采购退货表_copy格式,并通过MySQL API接口将其写入目标平台。本文将详细探讨如何利用轻易云数据集成平台进行ETL(提取、转换、加载)操作,确保数据准确无误地传输到目标数据库。 #### 配置元数据 首先,我们需要配置元数据,以便正确映射源平台的数据字段到目标平台的字段。以下是我们使用的元数据配置: ```json { "api": "batchexecute", "effect": "EXECUTE", "method": "SQL", "number": "id", "id": "id", "name": "id", "idCheck": true, "request": [ {"field":"id","label":"主键","type":"string","value":"{io_id}-{items_ioi_id}"}, {"field":"io_id","label":"退货单号","type":"string","value":"{io_id}"}, {"field":"io_date","label":"退货日期","type":"string","value":"{io_date}"}, {"field":"status","label":"状态","type":"string","describe":"Confirmed:生效,WaitConfirm:待审核,Creating:草拟,Cancelled:作废,OuterConfirming:外部确认中,Delete:取消","value":"{status}"}, {"field":"so_id","label":"线上单号","type":"string","value":"{so_id}"}, {"field":"f_status","label":"财务状态","type":"string","describe":"WaitConfirm=待审核,Confirmed=待审核","value":"{f_status}"}, {"field":"warehouse","label":"仓库名","type":"string","value":"{warehouse}"}, {"field":"receiver_name","label":"收货人/供应商名称","type":"string","value":"{receiver_name}"}, {"field":"receiver_mobile","label":"收货电话","type":"string","value":"{receiver_mobile}"}, {"field":"receiver_state","label":"收件人省","type":"string","value":"{receiver_state}"}, {"field":"receiver_city","label":"收件人市","type":""}, // ... (其他字段略) ], "otherRequest": [ { "field": "main_sql", "label": "主语句", "type": "string", "describe": "SQL首次执行的语句,将会返回:lastInsertId", "value": `REPLACE INTO purchaseout_query( id, io_id, io_date, status, so_id, f_status, warehouse, receiver_name, receiver_mobile, receiver_state, receiver_city, // ... (其他字段略) ) VALUES` }, { "field": "limit", "label": "limit", "type": "string", "value": "1000" } ] } ``` #### 数据提取与清洗 在ETL过程中,首先要从源系统提取原始数据。这一步通常涉及调用源系统API或读取文件等操作。提取的数据可能包含冗余或不一致的信息,因此需要进行清洗。例如: ```sql SELECT io_id AS '退货单号', io_date AS '退货日期', status AS '状态', so_id AS '线上单号', // ... (其他字段映射) FROM source_table WHERE status IN ('Confirmed', 'WaitConfirm') ``` #### 数据转换 接下来是数据转换阶段,根据目标平台的要求,对提取的数据进行格式化和处理。例如,将日期格式统一为`YYYY-MM-DD`,将状态值从英文转为中文等。 ```python def transform_data(row): row['io_date'] = format_date(row['io_date']) row['status'] = translate_status(row['status']) # ... (其他转换逻辑) return row ``` #### 数据写入 最后一步是将转换后的数据写入目标平台。在这里,我们使用MySQL API接口,通过执行SQL语句完成数据插入操作。 ```sql REPLACE INTO purchaseout_query ( id, io_id, io_date, status, so_id, f_status, warehouse, receiver_name, receiver_mobile, receiver_state, receiver_city, // ... (其他字段) ) VALUES ( '{id}', '{io_id}', '{io_date}', '{status}', '{so_id}', '{f_status}', '{warehouse}', '{receiver_name}', '{receiver_mobile}', '{receiver_state}', '{receiver_city}', // ... (其他字段值) ); ``` #### 批量执行与性能优化 为了提高效率,我们可以使用批量执行的方法,每次处理一定数量的数据记录,并设置合理的批量大小(如1000条记录)。 ```python batch_size = metadata['otherRequest'][1]['value'] for batch in get_batches(data_source, batch_size): transformed_batch = [transform_data(row) for row in batch] execute_sql(transformed_batch) ``` 通过以上步骤,我们成功地将聚水潭平台的采购退货单数据转换并写入到BI初本-采购退货表_copy中,实现了不同系统间的数据无缝对接。 ![钉钉与WMS系统接口开发配置](https://pic.qeasy.cloud/T18.png~tplv-syqr462i7n-qeasy.image)