利用轻易云完成吉客云数据到MySQL的ETL过程

  • 轻易云集成顾问-彭亮

吉客云数据集成到MySQL:pyd-吉客云查询盘盈单-->mysql 案例分享

在当今复杂多变的业务环境中,高效的数据对接与集成是企业成功的重要基础。本文将聚焦于一个具体案例:如何通过轻易云数据集成平台,将吉客云的盘盈单数据无缝对接到MySQL数据库,实现高效、稳定的数据流动和管理。

一、任务概述

本次案例的目标是实现吉客云API接口wms.stocktake.get获取的盘盈单数据,通过精准且高效的数据转换,定时可靠地写入到MySQL数据库中。这个过程不仅需要解决多种技术挑战,还需保证数据的一致性和完整性。

二、关键技术要点

  1. 大量数据快速写入

    • 数据量大的情况下,需要确保MySQL具备高吞吐量的数据写入能力。本方案采用批量处理方式,提升了整体效率。
  2. 实时监控与告警系统

    • 集中的监控与告警系统能够实时跟踪每个集成任务的状态和性能,及时发现并处理异常情况,保障系统稳定运行。
  3. 自定义数据转换逻辑

    • 为适应特定业务需求,在从吉客云读取原始数据后,对其进行必要的数据格式转换,以符合MySQL表结构要求,并记录操作日志以供追溯。
  4. 分页与限流控制

    • 由于吉客云接口可能存在分页限制或请求频率限制,需要合理设置分页参数及调用频次,使得整个抓取过程既满足业务需求又不超出服务端接口限制。

三、具体实施步骤简述

  1. 调用吉客云API wms.stocktake.get 通过HTTP请求周期性地访问该API,以获取最新盘盈单信息。
  2. 解析返回结果并进行转换 使用自定义逻辑将JSON格式的原始响应转化为适合存储于MySQL中的结构化记录。
  3. 批量写入至MySQL数据库(execute API) 将经过处理后的数据一次性交由execute API执行Insert操作,提高插入效率并减少事务开销。
  4. 验证结果及异步错误重试机制实施 确保每条插入操作都有反馈,并在失败时启动错误重试机制,保证最终一致性。同时,通过日志记载所有操作过程,为调试提供依据。

稍后我们会进一步详细介绍每一步骤中的实现细节,包括代码示例、注意事项以及遇到的问题如何解决等。 如何对接用友BIP接口

调用吉客云接口wms.stocktake.get获取并加工数据

在数据集成生命周期的第一步,我们需要调用源系统吉客云的接口wms.stocktake.get,获取盘盈单数据并进行初步加工。以下是详细的技术实现过程。

1. 配置API请求参数

首先,根据元数据配置,我们需要设置API请求参数。这些参数包括仓库编号、条码、条目、页码以及盘点时间范围。具体配置如下:

{
  "api": "wms.stocktake.get",
  "method": "POST",
  "request": {
    "warehouseCode": "123456",
    "skuBarcode": "",
    "pageSize": "20",
    "pageIndex": "",
    "startPdDate": "{{LAST_SYNC_TIME|datetime}}",
    "endPdDate": "{{CURRENT_TIME|datetime}}"
  }
}

2. 数据请求与清洗

通过上述配置,我们向吉客云发送POST请求,获取盘盈单数据。为了确保数据的一致性和准确性,我们需要对返回的数据进行清洗和格式化处理。

数据清洗与格式化

根据元数据配置中的formatResponse字段,我们需要对返回的数据进行字段重命名和格式转换。例如:

  • stocktakeDate字段重命名为datetime_new,并将其格式化为日期类型。
  • stocktakeId字段重命名为order_no_new,并将其格式化为字符串类型。

具体实现如下:

def format_response(data):
    formatted_data = []
    for item in data:
        formatted_item = {
            "datetime_new": format_date(item["stocktakeDate"]),
            "order_no_new": str(item["stocktakeId"]),
            # 其他字段保持不变
        }
        formatted_data.append(formatted_item)
    return formatted_data

def format_date(date_str):
    # 实现日期格式转换逻辑
    pass

3. 数据转换与写入

在完成数据清洗和格式化后,我们需要将处理后的数据转换为目标系统所需的格式,并写入到MySQL数据库中。

数据转换

根据元数据配置中的beatFlat字段,我们需要将某些字段展平。例如,将stockCountGain字段展平为多个独立的记录。

def flatten_data(data):
    flattened_data = []
    for item in data:
        stock_count_gain = item.pop("stockCountGain", [])
        for gain in stock_count_gain:
            flattened_item = {**item, **gain}
            flattened_data.append(flattened_item)
    return flattened_data
数据写入

最后,将处理后的数据写入到MySQL数据库中。我们可以使用Python的MySQL连接库来实现这一过程。

import mysql.connector

def write_to_mysql(data):
    conn = mysql.connector.connect(
        host="your_host",
        user="your_user",
        password="your_password",
        database="your_database"
    )

    cursor = conn.cursor()

    for item in data:
        sql = """
        INSERT INTO your_table (datetime_new, order_no_new, ...)
        VALUES (%s, %s, ...)
        """
        values = (item["datetime_new"], item["order_no_new"], ...)
        cursor.execute(sql, values)

    conn.commit()
    cursor.close()
    conn.close()

通过上述步骤,我们完成了从吉客云获取盘盈单数据并进行初步加工的全过程。这一步是整个数据集成生命周期中的关键环节,为后续的数据处理和分析奠定了基础。 金蝶云星空API接口配置

利用轻易云数据集成平台进行ETL转换并写入MySQL

在数据集成生命周期的第二步中,我们需要将已经集成的源平台数据进行ETL(Extract, Transform, Load)转换,并最终写入目标平台MySQL。本文将深入探讨如何通过API接口实现这一过程,特别是利用元数据配置来简化和自动化这一过程。

数据请求与清洗

首先,我们从源平台获取原始数据。在这个案例中,我们从吉客云查询盘盈单的数据。假设我们已经完成了数据请求与清洗阶段,接下来进入数据转换与写入阶段。

数据转换与写入

在这个阶段,我们需要将清洗后的数据转换为目标平台MySQL所能接受的格式,并通过API接口写入MySQL数据库。

元数据配置解析

以下是我们需要使用的元数据配置:

{
  "api": "execute",
  "effect": "EXECUTE",
  "method": "POST",
  "idCheck": true,
  "request": [
    {
      "field": "main_params",
      "label": "main_params",
      "type": "object",
      "describe": "111",
      "children": [
        {"field": "stockCountGain_id", "label": "明细id", "type": "string", "value": "{stockCountGain_id}"},
        {"field": "order_no_new", "label": "单号", "type": "string", "value": "{order_no_new}"},
        {"field": "datetime_new", "label": "时间", "type": "date", "value": "{datetime_new}"},
        {"field": "qty_count", "label": "数量", "type": "string", "value": "{stockCountGain_count}"},
        {"field": "sales_count", "label": "",type":"string"},
        {"field":"status","label":"状态","type":"string","value":"{qeasystatus}"},
        {"field":"Document_Type","label":"单据类型","type":"string","value":"盘盈单"}
      ]
    }
  ],
  ``otherRequest``: [
    {
      ``field``: ``main_sql``,
      ``label``: ``main_sql``,
      ``type``: ``string``,
      ``describe``: ``111``,
      ``value``: 
          ```INSERT INTO `jky_pyd`
          ( `stockCountGain_id`,`order_no_new`,`datetime_new`,`qty_count`,`sales_count`,`status`,`Document_Type` ) 
          VALUES (:stockCountGain_id,:order_no_new,:datetime_new,:qty_count,:sales_count,:status,:Document_Type)```
    }
  ]
}
API接口调用
  1. API定义:我们使用POST方法,通过API execute 来执行SQL插入操作。
  2. 参数定义:在 request 部分,我们定义了所需的参数,包括 stockCountGain_id, order_no_new, datetime_new, qty_count, sales_count, status, 和 Document_Type
  3. SQL语句:在 otherRequest 部分,我们定义了具体的SQL插入语句,该语句将上述参数插入到目标表 jky_pyd 中。
参数映射
  • stockCountGain_id: 映射到源数据中的 {stockCountGain_id}
  • order_no_new: 映射到源数据中的 {order_no_new}
  • datetime_new: 映射到源数据中的 {datetime_new}
  • qty_count: 映射到源数据中的 {stockCountGain_count}
  • sales_count: 暂无映射值,可以为空或默认值。
  • status: 映射到源数据中的 {qeasystatus}
  • Document_Type: 固定值“盘盈单”。
执行步骤
  1. 准备请求体:根据元数据配置生成请求体,填充对应的字段值。
  2. 发送请求:通过HTTP POST方法,将请求体发送至API接口地址。
  3. 处理响应:接收并处理API响应,确保数据成功写入MySQL数据库。

示例代码(Python伪代码):

import requests
import json

# 准备请求体
payload = {
    'main_params': {
        'stockCountGain_id': '12345',
        'order_no_new': 'ORD67890',
        'datetime_new': '2023-10-01',
        'qty_count': '100',
        'sales_count': '',
        'status': 'active',
        'Document_Type': '盘盈单'
    },
    'main_sql': """
    INSERT INTO jky_pyd (stockCountGain_id, order_no_new, datetime_new, qty_count, sales_count, status, Document_Type)
    VALUES (:stockCountGain_id, :order_no_new, :datetime_new, :qty_count, :sales_count, :status, :Document_Type)
    """
}

# 发送POST请求
response = requests.post('http://api.example.com/execute', json=payload)

# 检查响应状态
if response.status_code == 200:
    print("Data successfully inserted into MySQL")
else:
    print(f"Failed to insert data: {response.text}")

通过上述步骤和代码示例,我们可以实现从吉客云查询盘盈单的数据转换,并成功写入目标平台MySQL。这一过程充分利用了元数据配置,使得整个ETL过程更加高效和自动化。 钉钉与CRM系统接口开发配置