ETL实践:从旺店通到MySQL的数据转换与批量写入

  • 轻易云集成顾问-黄宏棵

案例分享:旺店通·旗舰奇门数据集成到MySQL

在构建高效的数据集成方案中,如何确保数据的准确与实时传输是每个技术专家必须面对的挑战。本文将分享一个具体案例:如何通过轻易云数据集成平台,将旺店通·旗舰奇门系统中的采购退货单——wdt.wms.stockout.purchasereturn.querywithdetail接口获取的数据,可靠地写入到MySQL数据库。

此方案对接了名为“旺店通旗舰版-采购退货单-->BI泰海-采购退货单表_原始查询(2024年起)”的实际运行环境,通过发挥平台强大的批量处理能力和定制化数据映射功能,实现了大规模、高吞吐量的数据同步。以下将详细描述这个过程中关键技术要点及操作流程。

首先,本次集成任务需解决几个核心问题:

  1. API调用与分页控制: 采用wdt.wms.stockout.purchasereturn.querywithdetail接口,需处理返回结果的分页,以确保全量数据不漏单。
  2. 高并发写入支持: MySQL API的batchexecute提供了批量执行能力,使大量记录能够迅速且稳定地存储进数据库。
  3. 实时监控与告警机制: 配置集中监控和告警系统,及时捕获并应对可能出现的问题。
  4. 异常处理与错误重试: 针对网络波动或服务端响应延迟等情况,实现完善的异常捕捉和自动重试策略。

通过以上措施,我们不仅保障了数据传输过程中的完整性,还优化了不同系统间的数据格式差异,并实现双向联动,在提高业务效率同时,大幅降低人力投入成本。在接下来的部分,我们将深入探讨具体实施细节,包括API参数设置、流控管理、以及Error Handling等关键操作步骤。 电商OMS与WMS系统接口开发配置

调用旺店通·旗舰奇门接口wdt.wms.stockout.purchasereturn.querywithdetail获取并加工数据

在数据集成生命周期的第一步,我们需要从源系统中调用API接口获取原始数据,并对其进行初步处理和清洗。本文将详细探讨如何通过轻易云数据集成平台调用旺店通·旗舰奇门接口wdt.wms.stockout.purchasereturn.querywithdetail,并对返回的数据进行加工。

接口调用配置

首先,我们需要配置API接口的调用参数。根据提供的元数据配置,接口使用POST方法,通过分页参数和业务参数来控制查询范围和结果集。

{
  "api": "wdt.wms.stockout.purchasereturn.querywithdetail",
  "effect": "QUERY",
  "method": "POST",
  "number": "order_no",
  "id": "stockout_id",
  "name": "tid",
  "request": [
    {
      "field": "pager",
      "label": "分页参数",
      "type": "object",
      "describe": "分页参数",
      "children": [
        {
          "field": "page_size",
          "label": "分页大小",
          "type": "string",
          "describe": "分页大小",
          "value": "50"
        },
        {
          "field": "page_no",
          "label": "页号",
          "type": "string",
          "describe": "页号",
          "value": "1"
        }
      ]
    },
    {
      "field": "params",
      "label": "业务参数",
      "type": "object",
      "describe": "业务参数",
      ...

分页参数

分页参数用于控制每次请求返回的数据量和页码,以避免一次性获取过多数据导致性能问题。默认配置为每页50条记录,从第一页开始:

{
  ...
  {
    field: 'pager',
    label: '分页参数',
    type: 'object',
    describe: '分页参数',
    children: [
      {
        field: 'page_size',
        label: '分页大小',
        type: 'string',
        describe: '分页大小',
        value: '50'
      },
      {
        field: 'page_no',
        label: '页号',
        type: 'string',
        describe: '页号',
        value: '1'
      }
    ]
  }
}

业务参数

业务参数主要包括时间范围和时间类型,用于指定查询条件。时间范围通过start_timeend_time字段定义,分别对应上次同步时间和当前时间:

{
  ...
  {
    field: 'params',
    label: '业务参数',
    type: 'object',
    describe: '业务参数',
    children: [
      {
        field: 'start_time',
        label: '开始时间',
        type: 'string',
        describe: '开始时间',
        value: '{{LAST_SYNC_TIME|datetime}}'
      },
      {
        field: 'end_time',
        label: '结束时间',
        type: 'string',
        describe: '结束时间',
        value: '{{CURRENT_TIME|datetime}}'
      },
      {
        field: 'time_type',
        label: '时间类型\n\n1:出库时间\n\n2:创建时间\n\n3:最后修改时间\n\n默认1',
        type: 'string',
        describe': ‘时间类型’,
       value': ‘3’
     }
   ]
 }
}

数据清洗与转换

在获取到原始数据后,需要对数据进行清洗与转换,以便后续处理。轻易云平台提供了自动填充响应(autoFillResponse)功能,可以简化这一过程。

例如,对于返回的采购退货单详情列表(details_list),可以通过beatFlat选项将其平铺展开,方便后续的数据处理:

{
  ...
  autoFillResponse': true,
 beatFlat': ['details_list']
}

实际应用案例

假设我们需要从2024年起的采购退货单数据,并将其写入BI泰海的采购退货单表。首先,我们会设置合适的开始和结束时间,然后通过上述配置调用API接口获取数据。

{
"api":"wdt.wms.stockout.purchasereturn.querywithdetail","effect":"QUERY","method":"POST","number":"order_no","id":"stockout_id","name":"tid","request":[{"field":"pager","label":"分页参数","type":"object","describe":"分页参数","children":[{"field":"page_size","label":"分页大小","type":"string","describe":"分页大小","value":"50"},{"field":"page_no","label":"页号","type":"string","describe":"页号","value":"1"}]},{"field":"params","label":"业务参数","type":"object","describe":"业务参数","children":[{"field":"start_time","label":"开始时间","type":"string","describe":"开始时间","value":"'2024-01-01T00:00:00Z'"},{"field':'end_time','label':'结束时间','type':'string','describe':'结束时间','value':'{{CURRENT_TIME|datetime}}'},{'field':'time_type','label':'时间类型\n\n1:出库时间\n\n2:创建时间\n\n3:最后修改时间\n\n默认1','type':'string','describe':'时间类型','value':'3'}]}],'autoFillResponse':true,'beatFlat':['details_list']}
}

通过上述步骤,我们成功地从旺店通·旗舰奇门系统中获取了所需的采购退货单数据,并进行了初步清洗与转换,为后续的数据写入奠定了基础。这一过程展示了如何利用轻易云平台实现高效、透明的数据集成操作。 钉钉与ERP系统接口开发配置

数据集成生命周期中的ETL转换与写入MySQL

在数据集成的生命周期中,ETL(提取、转换、加载)是关键的一步。本文将详细探讨如何将源平台的数据经过ETL转换后,写入目标平台MySQL,通过API接口实现数据的无缝对接。

数据请求与清洗

在数据集成过程中,首先要从源平台获取原始数据,并进行必要的清洗操作。这一步确保数据的准确性和一致性,为后续的转换和写入奠定基础。

数据转换与写入

接下来是数据转换与写入阶段。我们需要将清洗后的数据转化为目标平台MySQL能够接受的格式,并通过API接口进行写入。以下是具体的技术步骤和配置。

元数据配置解析

元数据配置是整个ETL过程中的核心部分,它定义了如何将源数据字段映射到目标数据库字段。以下是一个典型的元数据配置示例:

{
  "api": "batchexecute",
  "effect": "EXECUTE",
  "method": "SQL",
  "number": "id",
  "id": "id",
  "name": "id",
  "idCheck": true,
  "request": [
    {"field": "stockout_id", "label": "出库单id", "type": "string", "value": "{stockout_id}"},
    {"field": "order_no", "label": "出库单号", "type": "string", "value": "{order_no}"},
    {"field": "src_order_no", "label": "采购退货单号", "type": "string", "value": "{src_order_no}"},
    {"field": "warehouse_no", "label": "仓库编号", "type": "string", "value": "{warehouse_no}"},
    {"field": "consign_time", "label": "发货时间", "type": string, value: "{{consign_time|datetime}}"},
    // 更多字段...
  ],
  // SQL主语句
  {
    field: 'main_sql',
    label: '主语句',
    type: 'string',
    describe: 'SQL首次执行的语句,将会返回:lastInsertId',
    value: 'REPLACE INTO wdt_wms_stockout_purchasereturn_querywithdetail (stockout_id, order_no, src_order_no, warehouse_no, consign_time, status, goods_count, logistics_no, post_fee, receiver_name, receiver_province, receiver_city, receiver_district, receiver_address, receiver_telno, remark, weight, provider_no, provider_name, last_load_purchase_no, goods_type_count, create_time, operator_name, goods_total_cost, goods_total_amount, checked_goods_total_cost, modified) VALUES'
  },
  {
    field: 'limit',
    label: 'limit',
    type: 'string',
    value: '100'
  }
}

API接口调用

在轻易云平台中,我们使用batchexecute API来批量执行SQL语句,实现数据写入。以下是一个示例代码片段,用于调用API并执行SQL插入操作:

import requests
import json

url = 'https://api.example.com/batchexecute'
headers = {'Content-Type': 'application/json'}
payload = {
  # 填充元数据配置中的各个字段值
}

response = requests.post(url, headers=headers, data=json.dumps(payload))

if response.status_code == 200:
    print('Data inserted successfully')
else:
    print('Failed to insert data:', response.text)

数据字段映射与格式转换

在实际操作中,需要特别注意字段类型和格式的转换。例如,时间字段通常需要转换为标准的日期时间格式,这可以通过元数据配置中的datetime函数实现:

{"field":"consign_time","label":"发货时间","type":"string","value":"{{consign_time|datetime}}"}

类似地,其他字段也需要根据需求进行相应的格式化处理,以确保符合目标数据库的要求。

执行SQL语句

最终,我们通过构建完整的SQL插入语句,将处理后的数据写入MySQL数据库。以下是一个完整的SQL插入示例:

REPLACE INTO wdt_wms_stockout_purchasereturn_querywithdetail (
  stockout_id,
  order_no,
  src_order_no,
  warehouse_no,
  consign_time,
  status,
  goods_count,
  logistics_no,
  post_fee,
  receiver_name,
  receiver_province,
  receiver_city,
  receiver_district,
  receiver_address,
  receiver_telno,
  remark,
  weight,
  provider_no,
  provider_name,
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)

每个问号代表一个占位符,对应于元数据配置中的字段值。在执行时,这些占位符将被实际的数据替换,从而完成插入操作。

总结

通过上述步骤,我们实现了从源平台到目标平台MySQL的数据ETL过程。利用轻易云提供的全异步、多系统支持特性,我们能够高效地完成不同系统间的数据集成任务。关键在于正确配置元数据,并通过API接口实现自动化的数据处理和写入,从而提升业务效率和透明度。 系统集成平台API接口配置