数据集成中的ETL转换与写入详解:聚水潭到BI勤威

  • 轻易云集成顾问-潘裕

聚水潭数据集成到MySQL的实施案例分析

在现代企业的数据管理过程中,通过高效、可靠的工具进行系统对接和数据集成,已成为提升业务运行效率的重要手段。本文将分享一个具体的技术案例:如何通过轻易云平台实现聚水潭采购入库单数据向MySQL数据库的集成。

在本次案例中,我们主要关注以下几个核心技术点:

  1. 调用聚水潭接口获取采购入库单数据:使用API /open/purchasein/query 定时抓取最新入库单信息,确保不漏单。
  2. 处理聚水潭接口分页与限流问题:为应对大数据量下的信息拉取挑战,实现分页抓取,并配以合理的限流策略。
  3. 批量写入MySQL数据库:针对获取的大量采购入库数据信息,使用MySQL API batchexecute 实现高速、大吞吐量的数据写入能力。
  4. 解决格式差异与自定义转换逻辑:由于原始数据结构与目标表格不完全一致,需要进行必要的数据清洗、映射和转换,以满足业务需求。
  5. 异常处理与错误重试机制:构建健全的数据质量监控体系,在碰到网络延迟或API故障等情况时能够及时发现并触发重试逻辑。

这些步骤通过轻易云平台提供的一系列功能,包括实时监控、可视化操作界面以及灵活配置选项,使得整个流程变得更加透明、高效。 接下来我们详细介绍每个环节中的实现细节及遇到的问题和对应的解决方案。 金蝶与MES系统接口开发配置

调用聚水潭接口获取并加工数据的技术案例

在数据集成过程中,调用源系统接口是至关重要的一步。本文将详细探讨如何通过轻易云数据集成平台调用聚水潭接口/open/purchasein/query,获取采购入库单数据并进行初步加工。

接口概述

聚水潭接口/open/purchasein/query用于查询采购入库单信息。该接口采用POST请求方式,支持分页查询,并允许根据修改时间、采购单号、采购入库单号等条件进行过滤。以下是该接口的元数据配置:

{
  "api": "/open/purchasein/query",
  "effect": "QUERY",
  "method": "POST",
  "number": "io_id",
  "id": "io_id",
  "name": "io_id",
  "idCheck": true,
  "request": [
    {"field":"page_index","label":"第几页","type":"int","describe":"从1开始","value":"1"},
    {"field":"page_size","label":"每页数量","type":"int","describe":"最大不超过50","value":"30"},
    {"field":"modified_begin","label":"修改起始时间","type":"string","describe":"起始时间和结束时间必须同时存在,时间间隔不能超过七天,与采购单号不能同时为空","value":"{{LAST_SYNC_TIME|datetime}}"},
    {"field":"modified_end","label":"修改结束时间","type":"string","describe":"起始时间和结束时间必须同时存在,时间间隔不能超过七天,与采购单号不能同时为空","value":"{{CURRENT_TIME|datetime}}"},
    {"field":"po_ids","label":"采购单号列表","type":"string","describe":"与修改时间不能同时为空.采购单号最大不能超过30条"},
    {"field":"io_ids","label":"采购入库单号列表","type":"string","describe":"与修改时间不能同时为空.采购入库单号最大不能超过30条"},
    {"field":"so_ids","label":"线上单号","type":"string","describe":"与修改时间不能同时为空"}
  ],
  "autoFillResponse": true,
  "beatFlat": ["items"]
}

请求参数配置

在调用该接口时,需要配置以下请求参数:

  • page_index: 页码,从1开始。
  • page_size: 每页记录数,最大不超过50。
  • modified_begin: 修改起始时间。
  • modified_end: 修改结束时间。
  • po_ids: 采购单号列表。
  • io_ids: 采购入库单号列表。
  • so_ids: 线上单号。

其中,modified_beginmodified_end必须同时存在,并且两者之间的时间间隔不能超过七天。如果提供了这些参数,则可以根据指定的时间范围来查询数据;否则,可以通过提供具体的订单编号来查询。

数据请求与清洗

在实际操作中,我们通常会按以下步骤进行数据请求与清洗:

  1. 初始化请求参数:设置初始的分页参数和时间范围。例如:

    {
     "page_index": 1,
     "page_size": 30,
     "modified_begin": "{{LAST_SYNC_TIME|datetime}}",
     "modified_end": "{{CURRENT_TIME|datetime}}"
    }
  2. 发送请求:使用POST方法发送请求到聚水潭接口,并获取响应数据。响应数据通常包含多个字段,其中最重要的是包含在items数组中的具体记录。

  3. 解析响应:解析响应中的items字段,将其展开为平面结构,以便后续处理。例如:

    {
     "items": [
       {
         "io_id": "12345",
         ...
       },
       ...
     ]
    }
  4. 处理分页:如果返回的数据量较大,需要处理分页逻辑,即循环发送请求直到所有数据都被获取完毕。

  5. 数据清洗:对获取的数据进行初步清洗,包括字段映射、类型转换、去重等操作。例如,将字符串类型的日期字段转换为标准日期格式。

示例代码

以下是一个简单的示例代码片段,用于调用聚水潭接口并处理返回的数据:

import requests
import json

# 初始化请求参数
params = {
    "page_index": 1,
    "page_size": 30,
    "modified_begin": last_sync_time, # 替换为实际值
    "modified_end": current_time      # 替换为实际值
}

# 定义API URL
api_url = 'https://api.jushuitan.com/open/purchasein/query'

# 循环处理分页
while True:
    response = requests.post(api_url, json=params)
    data = response.json()

    # 检查是否有更多数据需要处理
    if not data['items']:
        break

    # 数据清洗和处理逻辑
    for item in data['items']:
        io_id = item['io_id']
        # 更多字段处理...

    # 更新分页参数
    params['page_index'] += 1

print("数据处理完成")

通过上述步骤,我们可以高效地从聚水潭系统中获取所需的采购入库单数据,并进行初步加工,为后续的数据转换与写入做好准备。 企业微信与ERP系统接口开发配置

数据集成中的ETL转换与写入:从聚水潭到BI勤威

在数据集成过程中,将源平台的数据转换为目标平台可接受的格式,并最终写入目标平台,是一个至关重要的步骤。本文将详细探讨如何使用轻易云数据集成平台将聚水潭的采购入库单数据转换为BI勤威的采购入库表数据,并通过MySQL API接口写入目标平台。

元数据配置解析

在进行ETL(Extract, Transform, Load)操作时,元数据配置是关键。以下是我们将使用的元数据配置:

{
  "api": "batchexecute",
  "effect": "EXECUTE",
  "method": "SQL",
  "number": "id",
  "id": "id",
  "name": "id",
  "idCheck": true,
  "request": [
    {"field": "id", "label": "主键", "type": "string", "value": "{io_id}-{items_ioi_id}"},
    {"field": "io_id", "label": "入库单号", "type": "string", "value": "{io_id}"},
    {"field": "ts", "label": "数据库行版本号", "type": "string", 
        "describe":"https://docs.microsoft.com/zh-cn/sql/t-sql/data-types/rowversion-transact-sql?view=sql-server-ver16","value":"{ts}"},
    {"field": "warehouse", "label": "仓库名称", "type": "string", 
        "value":"{warehouse}"},
    {"field":"po_id","label":"采购单号","type":"string","value":"{po_id}"},
    {"field":"supplier_id","label":"供应商编号","type":"string","value":"{supplier_id}"},
    {"field":"supplier_name","label":"供应商名称","type":"string","value":"{supplier_name}"},
    {"field":"modified","label":"修改时间","type":"string","value":"{modified}"},
    {"field":"so_id","label":"线上单号","type":"string",
        "describe":"对应采购入库页面的线上单号(且对应采购入库上传的external_id)","value":"{so_id}"},
    {"field":"out_io_id","label":"外部单号","type":"string","value":"{out_io_id}"},
    {"field":"status","label":"状态","type":"string",
        "describe":"状态(WaitConfirm待入库;Confirmed已入库;Cancelled取消;Archive归档;OuterConfirming外部确认中)","value":
        "{status}"},
    {"field":"io_date","label":"入库日期","type":"string",
        "value":
        "{io_date}"},
    {"field":
        "wh_id",
        "label":
        "仓库编号",
        "type":
        "string",
        "describe":
        "主仓=1,销退仓=2,进货仓=3,次品仓=4,自定义1仓=6,自定义2仓=7,自定义3仓=8",
        "value":
        "{wh_id}"
    },
    {
      ...
      // 其他字段省略
      ...
    }
  ],
  ...
}

数据转换与写入流程

  1. 提取数据(Extract): 从聚水潭系统中提取采购入库单的数据。这一步通常通过API调用或数据库查询实现。

  2. 数据清洗与转换(Transform): 根据元数据配置,将提取到的数据进行清洗和转换。例如,将io_iditems_ioi_id组合生成新的主键id,确保每个字段的数据类型和格式符合目标平台要求。

    REPLACE INTO purchasein_query(
       id, io_id, ts, warehouse, po_id, supplier_id, supplier_name, modified,
       so_id, out_io_id, status, io_date, wh_id, wms_co_id, remark,
       tax_rate, labels, archived, merge_so_id, type,
       creator_name, f_status, l_id,
       items_ioi_id, items_sku_id, items_i_id,
       items_unit, items_name, items_qty,
       items_ioi_d, items_cost_price,
       items_cost_amount,
       items_remark,
       items_batch_no,
       items_tax_rate,
       sns_sku_id,
       sns_sn
    ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ? ,?, ? ,?, ? ,?, ? ,?, ? ,?, ? ,?, ? ,?, ? ,?, ? ,?)
  3. 加载数据(Load): 使用MySQL API接口将转换后的数据批量写入BI勤威的数据库。通过执行上述SQL语句,将处理好的数据插入到目标表中。

API接口调用示例

以下是一个简化的API调用示例,用于将转换后的数据批量插入到MySQL数据库:

import requests

url = 'http://your-mysql-api-endpoint/batchexecute'
headers = {'Content-Type': 'application/json'}
data = {
  'main_sql': 'REPLACE INTO purchasein_query(...) VALUES (...)',
  'limit': '1000',
  'request': [
      {
          'id': '123-456',
          'io_id': '123',
          ...
          # 其他字段值
          ...
      },
      # 更多记录
  ]
}

response = requests.post(url, headers=headers, json=data)
if response.status_code == 200:
    print('Data loaded successfully')
else:
    print('Failed to load data:', response.text)

以上代码展示了如何通过HTTP POST请求,将准备好的数据发送到MySQL API接口进行批量插入操作。

注意事项

  • 字段映射:确保源平台和目标平台之间字段的一一对应关系正确无误。
  • 数据类型:在转换过程中,要特别注意不同系统之间的数据类型差异,例如字符串、整数、浮点数等。
  • 错误处理:在实际操作中,需要加入错误处理机制,以便及时发现并解决问题。

通过上述步骤,我们可以高效地将聚水潭系统中的采购入库单数据转换并写入到BI勤威系统中,实现不同系统间的数据无缝对接。 如何对接金蝶云星空API接口