ETL流程优化:实施MySQL数据集成的实用指南

  • 轻易云集成顾问-吴伟

案例分享:吉客云数据集成到MySQL

在本案例中,我们探讨如何通过轻易云数据集成平台将吉客云的数据高效地对接到MySQL数据库。整个方案的核心任务是实现从"cgth-1吉克云查询采购退货接口"(erp.storage.goodsdocout.v2)定时抓取数据,并批量写入MySQL数据库中。我们将聚焦于具体技术细节和实际操作步骤,确保每一个环节清晰可见且高效运行。

数据获取与处理

首先,为了保证数据不漏单,我们需要设定可靠的定时任务,通过调用吉客云的API接口erp.storage.goodsdocout.v2来获取最新的采购退货信息。为了应对分页和限流的问题,系统设计了智能调度机制,确保每次请求都能最大化获取有效数据,同时避免触发API限流策略。

def fetch_data_from_jikeyun(api_url, params):
    response = requests.get(api_url, params=params)
    if response.status_code == 200:
        return response.json()
    else:
        handle_error(response)

# 定时器设置,每10分钟拉取一次
schedule.every(10).minutes.do(fetch_data_from_jikeyun, api_url="https://api.jikecloud.com/erp/storage/goodsdocout/v2", params={"page": 1})

数据转换与映射

由于吉客云返回的数据格式与最终写入MySQL的数据结构可能有所不同,这就要求我们定义自适应的数据转换逻辑。这一部分可以使用轻易云平台提供的自定义转换工具,将原始JSON格式的数据解析并映射为预期的表结构。

def convert_and_map_data(raw_data):
    mapped_data = []
    for entry in raw_data['data']:
        mapped_entry = {
            'order_id': entry['orderID'],
            'product_name': entry['productName'],
            'quantity': entry['quantity'],
            # 更多字段映射...
        }
        mapped_data.append(mapped_entry)
    return mapped_data

数据写入及性能优化

大量数据写入到MySQL是另一个关键挑战。在这里,我们利用轻易云平台支持高吞吐量的数据写入能力,实现大规模、高频次、低延迟的数据存储。同时,通过批量处理和事务管理机制,可以进一步提高效率,并保障数据一致性。

-- 批量插入模板 SQL
INSERT INTO purchase_returns (order_id, product_name, quantity) VALUES (%s, %s, %s);

def batch_insert_to_mysql(database_connection, data_list):
    cursor = database_connection.cursor()
    insert_query = "INSERT INTO purchase_returns (order_id, product_name, quantity) VALUES
![如何开发用友BIP接口](https://pic.qeasy.cloud/D11.png~tplv-syqr462i7n-qeasy.image)
### 调用吉客云接口获取并加工数据的技术实现

在数据集成生命周期的第一步,我们需要调用源系统吉客云的接口 `erp.storage.goodsdocout.v2` 获取采购退货数据,并对其进行初步加工处理。本文将详细探讨如何通过轻易云数据集成平台配置元数据,完成这一过程。

#### 接口调用与请求配置

首先,我们需要配置API请求参数,以便正确调用吉客云的接口。以下是元数据配置中的关键字段及其含义:

```json
{
  "api": "erp.storage.goodsdocout.v2",
  "method": "POST",
  "number": "goodsdocNo",
  "id": "recId",
  "pagination": {
    "pageSize": 50
  },
  "idCheck": true,
  "formatResponse": [
    {"old": "inOutDate", "new": "datetime_new", "format": "date"},
    {"old": "goodsdocNo", "new": "order_no_new", "format": "string"}
  ],
  ...
}
  • api: 指定要调用的API接口。
  • method: HTTP请求方法,这里使用POST。
  • numberid: 用于标识记录的唯一字段。
  • pagination: 分页配置,指定每页的数据量。
  • idCheck: 启用ID检查,确保数据唯一性。
  • formatResponse: 定义响应数据格式转换规则。

请求参数详解

接下来,我们需要详细配置请求参数,以确保获取到所需的数据:

"request": [
  {"field":"pageIndex","label":"分页页码","type":"string"},
  {"field":"pageSize","label":"分页页数","type":"string","value":"100"},
  {"field":"goodsDocNo","label":"出库单号","type":"string"},
  {"field":"startDate","label":"创建时间的起始时间","type":"string","value":"_function DATE_FORMAT(DATE_ADD(NOW(),INTERVAL -30 DAY),'%Y-%m-%d 00:00:00')"},
  {"field":"endDate","label":"创建时间的结束时间","type":"string","value":"{{CURRENT_TIME|datetime}}"},
  {"field":"inouttype","label":"类型","type":"string","describe":"201-销售出库 ... 205采购退货 ... ","value":"205"},
  ...
]
  • pageIndexpageSize: 分页控制参数,确保大数据量时分批次获取。
  • goodsDocNo: 出库单号,可用于精确查询特定记录。
  • startDateendDate: 时间范围过滤,确保只获取最近30天的数据。
  • inouttype: 出库类型,这里固定为“205”表示采购退货。

数据格式转换与清洗

在获取到原始数据后,需要对其进行格式转换和清洗。根据元数据配置中的formatResponse字段,我们将对部分字段进行重命名和格式化:

"formatResponse": [
    {"old": "inOutDate", "new": "datetime_new", "format": "date"},
    {"old": "goodsdocNo", "new": "order_no_new", "format": "string"}
]
  • inOutDate字段重命名为datetime_new并格式化为日期类型。
  • goodsdocNo字段重命名为order_no_new并格式化为字符串类型。

数据请求示例

以下是一个完整的数据请求示例,通过POST方法向吉客云接口发送请求:

{
    "pageIndex": 1,
    "pageSize": 100,
    "startDate": "_function DATE_FORMAT(DATE_ADD(NOW(),INTERVAL -30 DAY),'%Y-%m-%d 00:00:00')",
    "endDate": "{{CURRENT_TIME|datetime}}",
    "inouttype": "205",
    ...
}

该请求将返回符合条件的采购退货记录,并按照我们定义的格式进行初步处理。

数据清洗与转换

在接收到响应后,我们需要进一步处理数据,例如去除嵌套结构、提取必要字段等。根据元数据配置中的beatFlat字段,可以将嵌套结构展平:

"beatFlat":["goodsDocDetailList"]

这一步骤可以简化后续的数据处理流程,使得每条记录都包含所有必要的信息。

通过上述步骤,我们成功实现了从吉客云获取采购退货数据并进行初步加工,为后续的数据转换与写入奠定了基础。在实际应用中,可以根据具体需求进一步优化和调整这些配置,以满足不同业务场景下的数据集成需求。 用友与CRM系统接口开发配置

轻易云数据集成平台ETL转换与MySQLAPI接口写入技术案例

在数据集成生命周期的第二阶段,我们重点关注如何将已经集成的源平台数据进行ETL转换,并最终写入目标平台MySQL。以下将详细探讨这一过程,特别是如何通过API接口实现高效的数据写入。

元数据配置解析

元数据配置是ETL过程中的关键部分,它定义了数据从源到目标的映射关系和处理逻辑。在本案例中,我们使用如下元数据配置:

{
  "api": "execute",
  "effect": "EXECUTE",
  "method": "POST",
  "idCheck": true,
  "request": [
    {
      "field": "main_params",
      "label": "main_params",
      "type": "object",
      "describe": "111",
      "children": [
        {"field": "recId", "label": "明细id", "type": "string", "value": "{goodsDocDetailList_recId}"},
        {"field": "order_no_new", "label": "单号", "type": "string", "value": "{order_no_new}"},
        {"field": "datetime_new", "label": "时间", "type": "date", "value": "{datetime_new}"},
        {"field": "sales_count", "label": "金额", "type": "string", "value": "{goodsDocDetailList_estCost}"},
        {"field": "qty_count", "label": "数量", "type":"string","value":"{goodsDocDetailList_quantity}"},
        {"field":"status","label":"状态","type":"string","value":"{qeasystatus}"},
        {"field":"Document_Type","label":"单据类型","type":"string","value":"采购退货"}
      ]
    }
  ],
  ...
}

数据请求与清洗

首先,通过request字段定义了需要从源系统提取的数据字段及其对应的目标字段。每个字段都有明确的类型和描述,例如:

  • recId(明细id)对应goodsDocDetailList_recId
  • order_no_new(单号)对应order_no_new
  • datetime_new(时间)对应datetime_new
  • sales_count(金额)对应goodsDocDetailList_estCost
  • qty_count(数量)对应goodsDocDetailList_quantity
  • status(状态)对应qeasystatus
  • Document_Type(单据类型)固定为“采购退货”

这些字段的信息将被提取并清洗,确保其格式和内容符合目标系统的要求。

数据转换与写入

接下来,通过定义SQL语句实现数据的转换和写入。元数据中的otherRequest字段包含了具体的SQL插入语句:

{
  ...
  {
    “field”: “main_sql”,
    “label”: “main_sql”,
    “type”: “string”,
    “describe”: “111”,
    “value”: “INSERT INTO `jky_cgth`(`recId`,\n
              `order_no_new`,\n
              `datetime_new`,\n
              `sales_count`,\n
              `qty_count`,\n
              `status`,\n
              `Document_Type`\n) VALUES (:recId,\n
              :order_no_new,\n
              :datetime_new,\n
              :sales_count,\n
              :qty_count,\n
              :status,\n
              :Document_Type\n)”
  }
}

该SQL语句定义了如何将清洗后的数据插入到目标表jky_cgth中。每个占位符如:recId, :order_no_new, 等等,将被相应的数据值替换。

API接口调用

通过API接口调用实现数据写入是整个流程的最后一步。在本案例中,使用POST方法调用API:

{
  ...
  “method”: “POST”,
  ...
}

确保在调用时,所有必要的数据都已准备好并且符合目标系统MySQL API接口的要求。

技术细节总结

  1. 元数据配置:通过详细定义每个字段及其映射关系,确保源数据能够准确提取。
  2. SQL语句:通过预定义的SQL语句,实现数据从源到目标表的无缝插入。
  3. API接口调用:使用POST方法调用API,将清洗和转换后的数据写入MySQL数据库。

这种方法不仅提高了数据处理效率,还确保了每一步操作透明可追踪,为业务提供了可靠的数据支持。 金蝶与外部系统打通接口