ETL与数据转换:聚水潭到MySQL的数据集成完整流程

  • 轻易云集成顾问-潘裕
### 案例分享:聚水潭·奇门数据集成到MySQL 在本次技术案例中,我们将探讨如何通过轻易云数据集成平台,将聚水潭·奇门的销售订单数据高效、可靠地集成到内部BI系统中的MySQL数据库中。这一方案被命名为“聚水潭-销售订单-->BI事在人为--销售订单表”,旨在实现业务需求的快速响应与决策支持。 #### 任务背景 为了满足业务部门对实时库存和销售情况分析的需求,必须定时从聚水潭·奇门接口(API: jushuitan.order.list.query)抓取最新的销售订单数据,并批量写入到MySQL数据库(API: batchexecute)。整个过程需要确保以下几个关键技术点: 1. **高吞吐量的数据写入**:大规模数据能够迅速且有效地导入至MySQL,以保证系统处理时效性。 2. **异常处理与错误重试机制**:包括分页和限流问题处理,确保无漏单现象发生。 3. **自定义数据转换逻辑**:适应不同业务需求,实现精确的数据映射和格式转换。 4. **集中监控与告警系统**:实时跟踪并监控整个集成过程中的任务状态及性能,及时发现并解决潜在问题。 #### 技术实现 首先,通过配置定期执行任务,从聚水潭·奇门接口获取原始销售订单数据。每个请求需考虑接口返回结果分页管理的问题,以防止大量未能一次性读回的数据丢失。然后在轻易云平台上设定相应的数据转换规则,将获取到的数据按照目标数据库所需格式进行调整。 接着,在写入流程设计中,我们将采用批量操作方式,通过batchexecute API将整理好的大批量记录快速写入至MySQL。同时,为了避免因网络或服务器问题导致写入失败情况,可以设计多种重试机制来保证最终一致性。 通过这些步骤,不仅可以高效完成从源头至目的端的全流程自动化,还能够借助轻易云提供的可视化工具,对整个流水线进行动态管理及优化,进一步提高整体运作效率。在后续部分内容中,我会详细解构具体实施细节及代码实例。 ![打通用友BIP数据接口](https://pic.qeasy.cloud/D3.png~tplv-syqr462i7n-qeasy.image) ### 调用聚水潭·奇门接口jushuitan.order.list.query获取并加工数据 在轻易云数据集成平台的生命周期中,调用源系统接口是数据处理的第一步。本文将深入探讨如何通过调用聚水潭·奇门接口`jushuitan.order.list.query`来获取销售订单数据,并对其进行初步加工。 #### 接口配置与调用 首先,我们需要配置并调用聚水潭·奇门接口`jushuitan.order.list.query`。该接口采用POST方法,主要用于查询销售订单列表。以下是元数据配置的详细说明: - **API**: `jushuitan.order.list.query` - **方法**: POST - **主键字段**: `o_id` - **请求参数**: - `page_index`: 页数,从第一页开始,默认值为1。 - `page_size`: 每页行数,默认25,最大25。 - `start_time`: 修改开始时间,与结束时间必须同时存在,时间间隔不能超过七天。 - `end_time`: 修改结束时间,与起始时间必须同时存在,时间间隔不能超过七天。 - `status`: 单据状态,可选值包括WaitConfirm(待出库)、Confirmed(已出库)、Cancelled(作废)。 - `date_type`: 时间类型,默认0(修改时间),可选值包括0(修改时间)、1(制单日期)、2(出库时间)。 - `so_ids`: 线上单号列表,多个线上单号以逗号分开。 #### 请求参数设置 在实际操作中,我们需要根据业务需求动态设置请求参数。例如,可以使用模板变量来自动填充时间参数: ```json { "start_time": "{{LAST_SYNC_TIME|datetime}}", "end_time": "{{CURRENT_TIME|datetime}}" } ``` 这些变量会在请求时被替换为实际的同步时间和当前时间,以确保查询的数据范围准确。 #### 数据过滤与条件设置 为了确保获取的数据符合业务需求,可以在元数据配置中添加条件过滤。例如,我们可以设置以下条件来排除特定标签和指定店铺: ```json "condition_bk": [ [ {"field": "labels", "logic": "notlike", "value": "线上发货,虚拟发货"}, {"field": "shop_site", "logic": "eqv2", "value": "头条放心购"} ] ] ``` 上述配置表示排除标签包含“线上发货”或“虚拟发货”的订单,并且只获取店铺为“头条放心购”的订单。 #### 数据清洗与加工 在获取到原始数据后,需要对其进行清洗和初步加工。轻易云平台支持自动填充响应数据,并提供了多种工具来处理复杂的数据结构。例如,可以使用`beatFlat`字段将嵌套的数组展开: ```json "beatFlat": ["items"] ``` 这将把嵌套在`items`字段中的子项展开为平级结构,方便后续的数据处理和分析。 #### 异常处理与补偿机制 为了确保数据处理的可靠性,需要考虑异常情况和补偿机制。轻易云平台提供了定时任务和接管请求功能,以应对可能的异常情况: ```json "omissionRemedy": { "crontab": "2 2 * * *", "takeOverRequest": [ { "field": "start_time", "value": "{{DAYS_AGO_1|datetime}}", "type": "datetime", "label": "接管字段" } ] } ``` 上述配置表示每天凌晨2点执行一次补偿任务,以确保前一天的数据能够被完整获取。 通过以上步骤,我们可以高效地调用聚水潭·奇门接口获取销售订单数据,并对其进行初步加工,为后续的数据转换与写入奠定基础。 ![用友与WMS系统接口开发配置](https://pic.qeasy.cloud/S30.png~tplv-syqr462i7n-qeasy.image) ### 使用轻易云数据集成平台将源平台数据转换并写入MySQL API接口 在数据集成的生命周期中,第二步是将已经集成的源平台数据进行ETL(Extract, Transform, Load)转换,并最终写入目标平台。本案例中,我们将聚水潭的销售订单数据转换为BI事在人为系统所能接收的格式,并通过MySQL API接口写入目标数据库。 #### 数据转换与写入配置 在轻易云数据集成平台中,我们使用元数据配置来定义数据转换和写入过程。以下是具体的配置细节: ```json { "api": "batchexecute", "effect": "EXECUTE", "method": "SQL", "number": "id", "id": "id", "name": "id", "idCheck": true, "request": [ {"field":"id","label":"主键","type":"string","value":"{o_id}-{items_oi_id}"}, {"field":"order_date","label":"下单时间","type":"string","value":"{order_date}"}, {"field":"shop_status","label":"线上订单状态","type":"string","value":"{shop_status}"}, {"field":"question_type","label":"异常类型","type":"string","value":"{question_type}"}, {"field":"shop_id","label":"店铺编号","type":"string","value":"{shop_id}"}, {"field":"question_desc","label":"异常描述","type":"string","value":"{question_desc}"}, {"field":"so_id","label":"线上单号","type":"string","value":"{so_id}"}, {"field":"status","label":"ERP订单状态","type":"string","value":"{status}"}, {"field":"receiver_state","label":"省","type":"string","value":"{receiver_state}"}, {"field":"receiver_city","label":"市","type":"string","value":"{receiver_city}"}, {"field":"receiver_district","label":"区","type":"string","value":"{receiver_district}"}, {"field":"send_date","label":"发货时间","type":"string","value":"{send_date}"}, {"field":... ``` #### 配置解析 1. **API调用方式**:`batchexecute` 表示批量执行SQL语句。 2. **执行效果**:`EXECUTE` 表示执行操作。 3. **方法**:`SQL` 表示使用SQL语句进行操作。 4. **主键设置**:`number`, `id`, `name` 均为 `id`,用于唯一标识每条记录。 5. **请求字段映射**: - 每个字段都有对应的标签、类型和值。例如: - `{"field": "order_date", "label": "下单时间", "type": "string", "value": "{order_date}"}` 表示将源数据中的 `order_date` 字段映射到目标表中的 `order_date` 字段。 - 特殊处理字段如 `items_item_ext_data` 使用 `_function LEFT( '{items_item_ext_data}' , 20)` 截取前20个字符。 #### 主SQL语句 主SQL语句用于插入或更新目标表的数据。这里使用了 `REPLACE INTO` 来确保如果记录存在则更新,不存在则插入: ```sql REPLACE INTO order_list_query( id, order_date, shop_status, question_type, shop_id, question_desc, so_id, status, receiver_state, receiver_city, receiver_district, send_date, plan_delivery_date, creator_name, buyer_tax_no, invoice_type, pay_amount, freight, buyer_message, remark, invoice_title, is_cod, type, paid_amount, pay_date, modified, order_from, l_id, shop_name, wms_co_id, logistics_company, free_amount, co_id, drp_co_id_to, end_time, referrer_id, invoice_data, drp_info, shop_buyer_id,seller_flag, invoice_amount, oaid, open_id,node, referrer_name, shop_site, drp_co_id_from, un_lid, receiver_zip, receiver_email,f_freight, created, receiver_country, skus, ... ) VALUES ``` #### 批量处理限制 为了避免一次性处理过多数据导致性能问题,配置了批量处理限制: ```json {"field": "limit", "label": "limit", "type": "string", "value": "1000"} ``` 这意味着每次最多处理1000条记录。 #### 实际应用 在实际应用中,这些配置会被轻易云数据集成平台自动解析并执行。通过这种方式,可以高效地将源平台的数据转换并写入目标MySQL数据库,实现不同系统之间的数据无缝对接。 这种全异步、多异构系统支持的集成方式,极大提升了业务的透明度和效率,使得复杂的数据处理变得简单可控。 ![打通金蝶云星空数据接口](https://pic.qeasy.cloud/T20.png~tplv-syqr462i7n-qeasy.image)