利用轻易云平台实现采购数据的ETL与MySQL写入

  • 轻易云集成顾问-钟家寿
### 聚水潭-采购入库单到BI斯莱蒙-采购入库表的数据集成案例 在企业信息系统之间实现数据同步和对接是日常业务运营中至关重要的一部分。此次技术分享聚焦于如何将聚水潭系统中的采购入库单数据成功集成到MySQL数据库的实施方案,具体方案名称为“聚水潭-采购入库单-->BI斯莱蒙-采购入库表”。我们将在轻易云数据集成平台上通过配置一系列关键节点,确保高效、可靠的数据传输。 首先,需要调用聚水潭提供的API接口 `/open/purchasein/query` 以定时抓取最新的采购入库单数据。这一步骤不仅要求考虑分页和限流问题,还需要保证所获取的数据完整且不漏单。针对这些挑战,我们使用了一些自定义脚本来处理分页,并结合了数据质量监控机制,以实时检测并通知任何异常情况。 在获取到足够量的原始数据后,下一步则是将这些数据快速写入到MySQL数据库中。在这一过程中,通过高吞吐量的数据写能力以及批量操作手段,使得大量数据信息能够高效地传输和存储。同时,为进一步优化资源利用,采用了统一视图下的API资产管理功能,对所有涉及的API调用进行全面跟踪与控制。 此外,在数据转换阶段,由于聚水潭返回的数据格式与MySQL目标表结构存在一定差异,因此通过轻易云平台所提供的可视化工具设计相应的数据流,将原有结构映射至符合目标要求的新结构。在实际运作中,即便遇到了异常或错误,也可以凭借重试机制及时修复,从而确保整个过程平稳运行。 最后,通过集中化监控及告警系统,实现了对所有任务状态和性能指标的一站式查看,使团队能够迅速应对任何突发状况,大幅提升整体效率。在整个流程完成之后,各项日志记录也能帮助回溯分析,提高未来项目开展中的准确性和一致性。 ![金蝶与WMS系统接口开发配置](https://pic.qeasy.cloud/D38.png~tplv-syqr462i7n-qeasy.image) ### 调用聚水潭接口获取并加工数据的技术案例 在轻易云数据集成平台中,调用源系统聚水潭接口`/open/purchasein/query`是数据集成生命周期的第一步。本文将详细探讨如何通过该接口获取采购入库单数据,并进行必要的数据加工。 #### 接口概述 聚水潭接口`/open/purchasein/query`用于查询采购入库单信息。该接口采用POST方法请求,支持分页查询,并且需要提供时间范围或特定的单号列表作为查询条件。以下是元数据配置的详细信息: ```json { "api": "/open/purchasein/query", "effect": "QUERY", "method": "POST", "number": "io_id", "id": "io_id", "name": "io_id", "idCheck": true, "request": [ {"field":"page_index","label":"第几页","type":"int","describe":"从1开始","value":"1"}, {"field":"page_size","label":"每页数量","type":"int","describe":"最大不超过50","value":"30"}, {"field":"modified_begin","label":"修改起始时间","type":"string","describe":"起始时间和结束时间必须同时存在,时间间隔不能超过七天,与采购单号不能同时为空","value":"{{LAST_SYNC_TIME|datetime}}"}, {"field":"modified_end","label":"修改结束时间","type":"string","describe":"起始时间和结束时间必须同时存在,时间间隔不能超过七天,与采购单号不能同时为空","value":"{{CURRENT_TIME|datetime}}"}, {"field":"po_ids","label":"采购单号列表","type":"string","describe":"与修改时间不能同时为空.采购单号最大不能超过30条"}, {"field":"io_ids","label":"采购入库单号列表","type":"string","describe":"与修改时间不能同时为空.采购入库单号最大不能超过30条"}, {"field":"so_ids","label":"线上单号","type":"string","describe":"与修改时间不能同时为空"} ], "autoFillResponse": true, "beatFlat": ["items"], "delay": 900 } ``` #### 请求参数详解 1. **page_index**: 分页查询的页码,从1开始。 2. **page_size**: 每页返回的数据条数,最大不超过50。 3. **modified_begin**: 修改起始时间,格式为字符串。起始和结束时间必须同时存在,且间隔不超过七天。 4. **modified_end**: 修改结束时间,格式为字符串。 5. **po_ids**: 可选参数,指定要查询的采购单号列表。 6. **io_ids**: 可选参数,指定要查询的采购入库单号列表。 7. **so_ids**: 可选参数,指定要查询的线上单号。 #### 数据请求与清洗 在实际操作中,我们通常会根据上次同步的时间(`LAST_SYNC_TIME`)和当前时间(`CURRENT_TIME`)来设置`modified_begin`和`modified_end`参数,以确保获取到最新的数据。 ```json { "page_index": 1, "page_size": 30, "modified_begin": "{{LAST_SYNC_TIME|datetime}}", "modified_end": "{{CURRENT_TIME|datetime}}" } ``` 请求发送后,我们会接收到一个包含多个采购入库单信息的响应。由于配置了`autoFillResponse`和`beatFlat`属性,响应中的数据会自动平铺并填充到预定义的数据结构中。这一步骤极大地简化了数据清洗过程,使得我们可以直接对平铺后的数据进行处理。 #### 数据转换与写入 在完成数据请求与清洗后,我们需要对数据进行转换以符合目标系统(如BI斯莱蒙)的要求。通常包括以下步骤: 1. **字段映射**:将源系统中的字段映射到目标系统中的相应字段。例如,将聚水潭中的`io_id`映射到BI斯莱蒙中的对应字段。 2. **数据类型转换**:确保所有字段的数据类型符合目标系统的要求。例如,将日期字符串转换为目标系统所需的日期格式。 3. **数据过滤与校验**:根据业务规则过滤无效或重复的数据,并进行必要的校验。 #### 实际案例 假设我们需要将获取到的采购入库单数据写入BI斯莱蒙系统,我们可以使用以下伪代码来实现: ```python # 获取数据 response = post_request("/open/purchasein/query", { "page_index": 1, "page_size": 30, "modified_begin": last_sync_time, "modified_end": current_time }) # 平铺后的响应数据 data = response["items"] # 数据转换 transformed_data = [] for item in data: transformed_item = { "purchase_in_id": item["io_id"], # 更多字段映射... } transformed_data.append(transformed_item) # 写入目标系统 write_to_bi_slimon(transformed_data) ``` 通过上述步骤,我们成功实现了从聚水潭获取并加工采购入库单数据,并将其写入BI斯莱蒙系统。这一过程充分利用了轻易云平台的数据集成能力,实现了不同系统间的数据无缝对接。 ![钉钉与WMS系统接口开发配置](https://pic.qeasy.cloud/S17.png~tplv-syqr462i7n-qeasy.image) ### 使用轻易云数据集成平台进行ETL转换并写入MySQLAPI接口 在数据集成的生命周期中,ETL(Extract, Transform, Load)过程是关键的一环。本文将详细探讨如何使用轻易云数据集成平台,将聚水潭的采购入库单数据转换为BI斯莱蒙采购入库表所需的格式,并通过MySQLAPI接口写入目标平台。 #### 数据请求与清洗 在数据请求与清洗阶段,我们从源平台聚水潭获取采购入库单数据。该阶段主要包括数据提取和初步清洗,以确保数据的完整性和一致性。 #### 数据转换与写入 在完成初步的数据请求与清洗后,进入ETL流程的第二步:数据转换与写入。这一步骤至关重要,因为它决定了目标平台接收到的数据质量和结构是否符合预期。 ##### 元数据配置解析 以下是用于配置MySQLAPI接口的元数据: ```json { "api": "batchexecute", "effect": "EXECUTE", "method": "SQL", "number": "id", "id": "id", "name": "id", "idCheck": true, "request": [ {"field":"id","label":"主键","type":"string","value":"{io_id}-{items_ioi_id}"}, {"field":"io_id","label":"入库单号","type":"string","value":"{io_id}"}, {"field":"ts","label":"数据库行版本号","type":"string","describe":"https://docs.microsoft.com/zh-cn/sql/t-sql/data-types/rowversion-transact-sql?view=sql-server-ver16","value":"{ts}"}, {"field":"warehouse","label":"仓库名称","type":"string","value":"{warehouse}"}, {"field":"po_id","label":"采购单号","type":"string","value":"{po_id}"}, {"field":"supplier_id","label":"供应商编号","type":"string","value":"{supplier_id}"}, {"field":"supplier_name","label":"供应商名称","type":"string","value":"{supplier_name}"}, {"field":"modified","label":"修改时间","type":"string","value":"{modified}"}, {"field":"so_id","label":"线上单号","type":"string","describe":"对应采购入库页面的线上单号(且对应采购入库上传的external_id)","value":"{so_id}"}, {"field":"out_io_id","label":"外部单号","type":"string","value":"{out_io_id}"}, {"field":"status","label":"状态","type":"string","describe":""}, {"field":""}, ... ], "otherRequest":[ { "field": "main_sql", "label": "主语句", "type": "string", "describe": "", "value": "" }, { "" ... ] } ``` ##### 数据字段映射 在配置过程中,需要注意每个字段的映射关系。例如: - `{"field": "id", "label": "主键", "type": "string", "value": "{io_id}-{items_ioi_id}"}`:将`io_id`和`items_ioi_id`组合生成唯一主键。 - `{"field": "warehouse", "label": "仓库名称", "type": "string", "value": "{warehouse}"}`:直接映射仓库名称。 - `{"field": "", ...}`:其他字段类似处理。 ##### SQL语句配置 通过元数据中的`main_sql`字段,我们可以定义SQL执行语句: ```sql REPLACE INTO purchasein_query( id, io_id, ts, warehouse, po_id, supplier_id, supplier_name, modified, so_id, out_io_id, status, io_date, wh_id, wms_co_id, remark, tax_rate, labels, archived, merge_so_id, type, creator_name, f_status, l_id, items_ioi_id, items_sku_id, items_i_id, items_unit, items_name, items_qty, items_io_id, items_cost_price, items_cost_amount, items_remark, items_batch_no, items_tax_rate,sns_sku_id,sns_sn ) VALUES ``` 该语句使用了`REPLACE INTO`,确保在插入新记录时,如果存在相同主键则替换旧记录。这对于保持数据的一致性非常重要。 ##### 执行批量操作 元数据中的`limit`字段设置为1000,表示每次批量操作最多处理1000条记录。这有助于提高效率并避免一次性处理大量数据带来的性能问题。 ```json { ... { ... {"field": "", ...}, { ... } } ``` ### 总结 通过轻易云数据集成平台,我们能够高效地将源平台的数据进行ETL转换,并通过MySQLAPI接口写入目标平台。本文详细解析了元数据配置和SQL语句执行的关键步骤,确保读者能够深入理解并应用这些技术细节。 ![如何对接钉钉API接口](https://pic.qeasy.cloud/T27.png~tplv-syqr462i7n-qeasy.image)