ETL与数据转换:聚水潭到MySQL的数据集成完整流程

  • 轻易云集成顾问-潘裕

案例分享:聚水潭·奇门数据集成到MySQL

在本次技术案例中,我们将探讨如何通过轻易云数据集成平台,将聚水潭·奇门的销售订单数据高效、可靠地集成到内部BI系统中的MySQL数据库中。这一方案被命名为“聚水潭-销售订单-->BI事在人为--销售订单表”,旨在实现业务需求的快速响应与决策支持。

任务背景

为了满足业务部门对实时库存和销售情况分析的需求,必须定时从聚水潭·奇门接口(API: jushuitan.order.list.query)抓取最新的销售订单数据,并批量写入到MySQL数据库(API: batchexecute)。整个过程需要确保以下几个关键技术点:

  1. 高吞吐量的数据写入:大规模数据能够迅速且有效地导入至MySQL,以保证系统处理时效性。
  2. 异常处理与错误重试机制:包括分页和限流问题处理,确保无漏单现象发生。
  3. 自定义数据转换逻辑:适应不同业务需求,实现精确的数据映射和格式转换。
  4. 集中监控与告警系统:实时跟踪并监控整个集成过程中的任务状态及性能,及时发现并解决潜在问题。

技术实现

首先,通过配置定期执行任务,从聚水潭·奇门接口获取原始销售订单数据。每个请求需考虑接口返回结果分页管理的问题,以防止大量未能一次性读回的数据丢失。然后在轻易云平台上设定相应的数据转换规则,将获取到的数据按照目标数据库所需格式进行调整。

接着,在写入流程设计中,我们将采用批量操作方式,通过batchexecute API将整理好的大批量记录快速写入至MySQL。同时,为了避免因网络或服务器问题导致写入失败情况,可以设计多种重试机制来保证最终一致性。

通过这些步骤,不仅可以高效完成从源头至目的端的全流程自动化,还能够借助轻易云提供的可视化工具,对整个流水线进行动态管理及优化,进一步提高整体运作效率。在后续部分内容中,我会详细解构具体实施细节及代码实例。 打通用友BIP数据接口

调用聚水潭·奇门接口jushuitan.order.list.query获取并加工数据

在轻易云数据集成平台的生命周期中,调用源系统接口是数据处理的第一步。本文将深入探讨如何通过调用聚水潭·奇门接口jushuitan.order.list.query来获取销售订单数据,并对其进行初步加工。

接口配置与调用

首先,我们需要配置并调用聚水潭·奇门接口jushuitan.order.list.query。该接口采用POST方法,主要用于查询销售订单列表。以下是元数据配置的详细说明:

  • API: jushuitan.order.list.query
  • 方法: POST
  • 主键字段: o_id
  • 请求参数:
    • page_index: 页数,从第一页开始,默认值为1。
    • page_size: 每页行数,默认25,最大25。
    • start_time: 修改开始时间,与结束时间必须同时存在,时间间隔不能超过七天。
    • end_time: 修改结束时间,与起始时间必须同时存在,时间间隔不能超过七天。
    • status: 单据状态,可选值包括WaitConfirm(待出库)、Confirmed(已出库)、Cancelled(作废)。
    • date_type: 时间类型,默认0(修改时间),可选值包括0(修改时间)、1(制单日期)、2(出库时间)。
    • so_ids: 线上单号列表,多个线上单号以逗号分开。

请求参数设置

在实际操作中,我们需要根据业务需求动态设置请求参数。例如,可以使用模板变量来自动填充时间参数:

{
  "start_time": "{{LAST_SYNC_TIME|datetime}}",
  "end_time": "{{CURRENT_TIME|datetime}}"
}

这些变量会在请求时被替换为实际的同步时间和当前时间,以确保查询的数据范围准确。

数据过滤与条件设置

为了确保获取的数据符合业务需求,可以在元数据配置中添加条件过滤。例如,我们可以设置以下条件来排除特定标签和指定店铺:

"condition_bk": [
  [
    {"field": "labels", "logic": "notlike", "value": "线上发货,虚拟发货"},
    {"field": "shop_site", "logic": "eqv2", "value": "头条放心购"}
  ]
]

上述配置表示排除标签包含“线上发货”或“虚拟发货”的订单,并且只获取店铺为“头条放心购”的订单。

数据清洗与加工

在获取到原始数据后,需要对其进行清洗和初步加工。轻易云平台支持自动填充响应数据,并提供了多种工具来处理复杂的数据结构。例如,可以使用beatFlat字段将嵌套的数组展开:

"beatFlat": ["items"]

这将把嵌套在items字段中的子项展开为平级结构,方便后续的数据处理和分析。

异常处理与补偿机制

为了确保数据处理的可靠性,需要考虑异常情况和补偿机制。轻易云平台提供了定时任务和接管请求功能,以应对可能的异常情况:

"omissionRemedy": {
  "crontab": "2 2 * * *",
  "takeOverRequest": [
    {
      "field": "start_time",
      "value": "{{DAYS_AGO_1|datetime}}",
      "type": "datetime",
      "label": "接管字段"
    }
  ]
}

上述配置表示每天凌晨2点执行一次补偿任务,以确保前一天的数据能够被完整获取。

通过以上步骤,我们可以高效地调用聚水潭·奇门接口获取销售订单数据,并对其进行初步加工,为后续的数据转换与写入奠定基础。 用友与WMS系统接口开发配置

使用轻易云数据集成平台将源平台数据转换并写入MySQL API接口

在数据集成的生命周期中,第二步是将已经集成的源平台数据进行ETL(Extract, Transform, Load)转换,并最终写入目标平台。本案例中,我们将聚水潭的销售订单数据转换为BI事在人为系统所能接收的格式,并通过MySQL API接口写入目标数据库。

数据转换与写入配置

在轻易云数据集成平台中,我们使用元数据配置来定义数据转换和写入过程。以下是具体的配置细节:

{
  "api": "batchexecute",
  "effect": "EXECUTE",
  "method": "SQL",
  "number": "id",
  "id": "id",
  "name": "id",
  "idCheck": true,
  "request": [
    {"field":"id","label":"主键","type":"string","value":"{o_id}-{items_oi_id}"},
    {"field":"order_date","label":"下单时间","type":"string","value":"{order_date}"},
    {"field":"shop_status","label":"线上订单状态","type":"string","value":"{shop_status}"},
    {"field":"question_type","label":"异常类型","type":"string","value":"{question_type}"},
    {"field":"shop_id","label":"店铺编号","type":"string","value":"{shop_id}"},
    {"field":"question_desc","label":"异常描述","type":"string","value":"{question_desc}"},
    {"field":"so_id","label":"线上单号","type":"string","value":"{so_id}"},
    {"field":"status","label":"ERP订单状态","type":"string","value":"{status}"},
    {"field":"receiver_state","label":"省","type":"string","value":"{receiver_state}"},
    {"field":"receiver_city","label":"市","type":"string","value":"{receiver_city}"},
    {"field":"receiver_district","label":"区","type":"string","value":"{receiver_district}"},
    {"field":"send_date","label":"发货时间","type":"string","value":"{send_date}"},
    {"field":...

配置解析

  1. API调用方式batchexecute 表示批量执行SQL语句。
  2. 执行效果EXECUTE 表示执行操作。
  3. 方法SQL 表示使用SQL语句进行操作。
  4. 主键设置number, id, name 均为 id,用于唯一标识每条记录。
  5. 请求字段映射
    • 每个字段都有对应的标签、类型和值。例如:
      • {"field": "order_date", "label": "下单时间", "type": "string", "value": "{order_date}"} 表示将源数据中的 order_date 字段映射到目标表中的 order_date 字段。
      • 特殊处理字段如 items_item_ext_data 使用 _function LEFT( '{items_item_ext_data}' , 20) 截取前20个字符。

主SQL语句

主SQL语句用于插入或更新目标表的数据。这里使用了 REPLACE INTO 来确保如果记录存在则更新,不存在则插入:

REPLACE INTO order_list_query(
  id, order_date, shop_status, question_type, shop_id, question_desc, so_id, status,
  receiver_state, receiver_city, receiver_district, send_date, plan_delivery_date,
  creator_name, buyer_tax_no, invoice_type, pay_amount, freight, buyer_message,
  remark, invoice_title, is_cod, type, paid_amount, pay_date, modified,
  order_from, l_id, shop_name, wms_co_id, logistics_company,
  free_amount, co_id, drp_co_id_to, end_time,
  referrer_id, invoice_data, drp_info,
  shop_buyer_id,seller_flag,
  invoice_amount,
  oaid,
  open_id,node,
  referrer_name,
  shop_site,
  drp_co_id_from,
  un_lid,
  receiver_zip,
  receiver_email,f_freight,
  created,
  receiver_country,
  skus,
...
) VALUES

批量处理限制

为了避免一次性处理过多数据导致性能问题,配置了批量处理限制:

{"field": "limit", "label": "limit", "type": "string", "value": "1000"}

这意味着每次最多处理1000条记录。

实际应用

在实际应用中,这些配置会被轻易云数据集成平台自动解析并执行。通过这种方式,可以高效地将源平台的数据转换并写入目标MySQL数据库,实现不同系统之间的数据无缝对接。

这种全异步、多异构系统支持的集成方式,极大提升了业务的透明度和效率,使得复杂的数据处理变得简单可控。 打通金蝶云星空数据接口