数据ETL:使用轻易云平台写入MySQL的技术方法

  • 轻易云集成顾问-张妍琪
### 案例分享:聚水潭·奇门数据集成到MySQL 在本案例中,我们探讨了如何通过轻易云数据集成平台高效地将聚水潭·奇门系统中的销售出库单历史数据(23年)集成至BI彩度的MySQL数据库中。任务目标是确保大量历史数据能够快速、安全、不漏单地从源系统顺利传输并写入目标数据库。 首先,我们需要调用聚水潭·奇门API接口`jushuitan.saleout.list.query`,以定期抓取其销售出库单信息。为了应对API分页和限流问题,在设计初期我们就明确引入了批量处理机制,以便在可接受的时间范围内完成大规模数据的获取。与此同时,考虑到可能发生的数据异常与网络波动情况,还实现了重试机制和错计日志记录,以保证稳定性和可靠性。 接着,通过自定义的数据转换逻辑,将聚水潭·奇门返回的数据格式整理为适合MySQL字段结构的形式。这一步不仅涉及常规的数据类型匹配,还包括一些业务规则上的映射处理,例如日期格式、字符串长度限制等。 在写入阶段,我们利用轻易云强大的批量数据写入能力,实现了高吞吐量的数据导入操作。在整个过程之中,实时监控工具提供了全面透明化的操控视图,使得每个步骤都能被清晰跟踪,并即时响应任何潜在的问题或告警。例如,通过设置集中监控与告警系统,可以及时发现并解决性能瓶颈,提高整体效率。 这里特别值得一提的是,为确保不漏单,我们采用了一种双重校验机制,即在每次批量操作完成后,会自动进行一次完整性验证。如果检测到数量差异或者其他异常状况,则立即触发重新拉取和补偿策略,以避免任何关键数据信息丢失或错误转载。同时,借助于定制化的数据映射表格,对接过程中所有细节均有详细记录,使后续维护变得更加简便直观。 这一整套解决方案,不仅有效保障了大型销售出库单历史数据的不间断迁移,同时凭借实时性能监测及灵活配置能力,让企业可以更安心地管理其API资产及相关资源,从而极大提升运营效率。 ![数据集成平台可视化配置API接口](https://pic.qeasy.cloud/D16.png~tplv-syqr462i7n-qeasy.image) ### 调用聚水潭·奇门接口获取并加工数据的技术案例 在数据集成过程中,调用源系统接口是至关重要的一步。本文将详细探讨如何通过轻易云数据集成平台调用聚水潭·奇门接口`jushuitan.saleout.list.query`,获取销售出库单数据并进行初步加工。 #### 接口配置与请求参数 首先,我们需要了解`jushuitan.saleout.list.query`接口的基本配置和请求参数。根据元数据配置,接口的主要参数如下: - **api**: `jushuitan.saleout.list.query` - **method**: `POST` - **number**: `io_id` - **id**: `{o_id}{modified}` - **name**: `name` - **idCheck**: `true` 请求参数包括: 1. **page_index**(页数):从第一页开始,默认值为1。 2. **page_size**(每页行数):默认25条,最大50条。 3. **start_time**(修改开始时间):格式为YYYY-MM-DD,必须与结束时间同时存在。 4. **end_time**(修改结束时间):格式为YYYY-MM-DD,必须与开始时间同时存在。 5. **status**(单据状态):可选值包括WaitConfirm(待出库)、Confirmed(已出库)、Cancelled(作废)。 6. **date_type**:固定值为2。 #### 请求参数动态生成 为了确保请求参数的动态性和准确性,我们使用以下配置: ```json [ {"field":"page_index","label":"页数","type":"string","describe":"第几页,从第一页开始,默认1","value":"{PAGINATION_START_PAGE}"}, {"field":"page_size","label":"每页行数","type":"string","describe":"每页多少条,默认25,最大50","value":"{PAGINATION_PAGE_SIZE}"}, {"field":"start_time","label":"修改开始时间","type":"string","describe":"修改起始时间,和结束时间必须同时存在,时间间隔不能超过七天,与线上单号不能同时为空","value":"_function LEFT( '{{DAYS_AGO_1|datetime}}' , 10)"}, {"field":"end_time","label":"修改结束时间","type":"string","describe":"修改结束时间,和起始时间必须同时存在,时间间隔不能超过七天,与线上单号不能同时为空","value":"_function LEFT( '{{CURRENT_TIME|datetime}}' , 10)"}, {"field":"status","label":"单据状态","type":"string","describe":"单据状态: WaitConfirm=待出库; Confirmed=已出库; Cancelled=作废"}, {"field":"date_type","label":"时间类型","type":"string","value":"2"} ] ``` 这里使用了动态函数 `_function LEFT` 来截取日期字符串,以确保日期格式正确。 #### 数据请求与清洗 在轻易云平台上,我们可以通过可视化界面配置上述请求参数,并发起API调用以获取销售出库单数据。以下是一个示例请求: ```json { "api": "jushuitan.saleout.list.query", "method": "POST", "params": { "page_index": "1", "page_size": "25", "start_time": "2023-01-01", "end_time": "2023-01-07", "status": "Confirmed", "date_type": "2" } } ``` 响应的数据通常包含多个字段,如订单ID、商品信息、数量、价格等。我们需要对这些数据进行初步清洗,以便后续处理。例如,可以使用轻易云提供的自动填充功能 `autoFillResponse` 和扁平化处理 `beatFlat` 来简化数据结构: ```json { "autoFillResponse": true, "beatFlat": ["items"] } ``` #### 数据转换与写入 在完成数据清洗后,我们可以将数据转换为目标系统所需的格式,并写入BI彩度的销售出库表中。这一步通常涉及字段映射、类型转换等操作。例如,将聚水潭中的订单ID映射到BI彩度中的相应字段。 通过轻易云平台的全生命周期管理功能,我们可以实时监控数据流动和处理状态,确保每个环节都透明可见,从而提升业务效率。 总结来说,通过轻易云平台调用聚水潭·奇门接口`jushuitan.saleout.list.query`并进行初步加工,是实现不同系统间无缝对接的重要步骤。本文详细介绍了接口配置、请求参数动态生成、数据清洗及转换写入的技术细节,为实际应用提供了参考。 ![打通钉钉数据接口](https://pic.qeasy.cloud/S19.png~tplv-syqr462i7n-qeasy.image) ### 使用轻易云数据集成平台进行ETL转换并写入MySQL API接口的技术案例 在数据集成过程中,ETL(提取、转换、加载)是一个至关重要的环节。本文将详细探讨如何利用轻易云数据集成平台,将已经集成的源平台数据进行ETL转换,并转为目标平台MySQL API接口所能够接收的格式,最终写入目标平台。 #### 数据请求与清洗 首先,我们需要从源系统提取数据,并进行必要的清洗和预处理。假设我们已经完成了这一步,接下来我们将重点放在如何将这些清洗后的数据转换为目标系统所需的格式,并通过API接口写入MySQL数据库。 #### 数据转换与写入 为了实现这一目标,我们需要配置元数据。以下是一个完整的元数据配置示例: ```json { "api": "execute", "effect": "EXECUTE", "method": "SQL", "number": "id", "id": "id", "name": "id", "idCheck": true, "request": [ { "field": "main_params", "label": "主参数", "type": "object", "describe": "对应主语句内的动态参数", "children": [ {"field":"id","label":"主键","type":"string","value":"{o_id}-{items_ioi_id}-{modified}"}, {"field":"co_id","label":"公司编号","type":"string","value":"{co_id}"}, {"field":"shop_id","label":"店铺编号","type":"string","value":"{shop_id}"}, {"field":"io_id","label":"出库单号","type":"string","value":"{io_id}"}, {"field":"o_id","label":"内部订单号","type":"string","value":"{o_id}"}, {"field":"so_id","label":"线上订单号","type":"string","value":"{so_id}"}, {"field":"created","label":"登记时间","type":"string","value":"{created}"}, {"field":"modified","label":"修改时间","type":"string","value":"{modified}"}, {"field":"status","label":"出库单状态","type":"string","value":"{status}"}, {"field":"invoice_title","label":"发票抬头","type":"string","value":"{invoice_title}"}, {"field":"shop_buyer_id","label":"买家帐号","type":"string","value":"{shop_buyer_id}"}, {"field":"receiver_country","label":"国家","type":"","value":"","describe":""}, // 省略部分字段... ] } ], "otherRequest":[ { "field": "main_sql", "label": "主语句", "type": "string", "describe": "SQL首次执行的语句,将会返回:lastInsertId", "value": `REPLACE INTO saleout_list_query( id, co_id, shop_id, io_id, o_id, so_id, created, modified, status, invoice_title, shop_buyer_id, receiver_country, receiver_state, receiver_city, receiver_district, buyer_message, remark, is_cod, pay_amount, l_id, io_date, lc_id, stock_enabled, labels, paid_amount, free_amount, freight, weight, warehouse, drp_co_id_from,f_weight ,order_type ,open_id ,is_print_express ,is_print , drp_info ,buyer_tax_no ,logistics_company ,sns_sku_id ,sns_sn ,merge_so_id , wms_co_id ,items_i_id ,items_sale_base_price ,items_is_gift ,items_oi_id , items_outer_oi_id ,items_raw_so_id ,items_pay_amount ,items_combine_sku_id , items_ioi_id ,items_sku_id ,items_qty ,items_name , items_properties_value ,items_sale_price , items_sale_amount ,shop_name,f_freight,business_staff,currency,node,pay_date,seller_flag,wave_id, order_staff_name) VALUES ( :id,:co_id,:shop_name,:io_date,:o_date,:so_date,:created,:modified,:status,:invoice_title, :shop_buyer_name,:receiver_country,:receiver_state,:receiver_city,:receiver_district, :buyer_message,:remark,:is_cod,:pay_amount,:l_date,:io_date,:lc_date, :stock_enabled,:labels,:paid_amount,:free_amount, :freight_weight.:warehouse_weight.:drp_co_weight.:f_weight.:order_type.:open_weight.:is_print_express :is_print.:drp_info.:buyer_tax_no.:logistics_company.:sns_sku_no.:sns_sn_no.: merge_so_no.:wms_co_no.:item_i_no.item_sale_base_price:item_is_gift:item_oi_no:item_outer_oi_no: item_raw_so_no:item_pay_amount:item_combine_sku_no:item_ioi_no:item_sku_no:item_qty: item_name:item_properties_value:item_sale_price:item_sale_amount:shop_name:f_freight:business_staff: currency:node:pay_date:seller_flag:wave_i:order_staff_name)` } ] } ``` #### 解析元数据配置 1. **API调用方式**:`"api"`字段指定了调用类型为`execute`,表示执行SQL语句。 2. **方法**:`"method"`字段为`SQL`,表示使用SQL语句进行操作。 3. **主参数**:`"request"`字段定义了一个对象类型的主参数,其中包含多个子字段,每个子字段对应一列数据库表中的字段。例如: - `"id"`:由多个源数据字段拼接而成,用于唯一标识记录。 - `"co_id"`、`"shop_id"`等字段直接映射到源数据中的相应字段。 4. **主语句**:`"otherRequest"`中的`"main_sql"`定义了实际执行的SQL语句。这里使用的是`REPLACE INTO`,确保如果记录已存在则更新,不存在则插入。 #### 数据写入过程 1. **准备动态参数**:根据元数据配置,从源系统的数据中提取并组装动态参数。 2. **执行SQL语句**:通过API调用,将组装好的参数传递给定义好的SQL语句,并执行该语句。 3. **处理返回结果**:通常情况下,执行成功后会返回一个`lastInsertId`,用于后续操作或日志记录。 #### 实践案例 假设我们从源系统获取了一条销售出库单记录,其部分字段如下: ```json { "o_id": 12345, "co_id": 67890, // 其他字段... } ``` 根据元数据配置,我们将其转换为目标格式: ```json { ":id": "{12345}-{67890}-{2023-01-01}", ":co_id": 67890, ":shop_name": "...", // 其他映射后的字段... } ``` 然后,通过API调用执行以下SQL语句: ```sql REPLACE INTO saleout_list_query(id,...) VALUES (:id,...) ``` 这样,就完成了从源系统到目标系统的数据转换和写入过程。 通过上述步骤,我们可以高效地利用轻易云数据集成平台,实现复杂的数据ETL转换,并确保最终的数据准确无误地写入到目标平台MySQL数据库中。 ![用友与CRM系统接口开发配置](https://pic.qeasy.cloud/T3.png~tplv-syqr462i7n-qeasy.image)