高性能数据集成实践:通过轻易云平台实现MySQL数据批量更新

  • 轻易云集成顾问-李国敏

ZZ刷新生产用料清单四化库存-高速写入测试(写入接口不同)-勿删

在本案例中,我们对MySQL数据进行了高效集成,主要任务是将生产用料清单的相关数据,通过轻易云数据集成平台,从一个MySQL数据库快速、准确地写入到另一个MySQL数据库。当处理如此大规模的数据时,高吞吐量的数据写入能力和实时监控及告警系统显得尤为重要。

我们采用了以下几个关键技术要点来确保数据集成过程的顺利进行:

  1. 高吞吐量实现:利用平台提供的batchupdate API实现大量数据的批量插入。这样不仅提升了效率,还有效减少了网络通信开销和服务器负载。

  2. 定时抓取与可靠性保障:通过定时任务调度,对源MySQL中的select API进行可靠抓取,并及时将这些数据更新到目标数据库。这种机制确保在任何时间点都能获取最新的数据,同时避免遗漏或重复。

  3. 异常处理与重试机制:在实际整合过程中,不可避免会遇到一些异常情况。我们设置了完善的错误捕获和重试机制,保证即使发生错误,也能在短时间内恢复并继续任务执行。

  4. 自定义转换逻辑支持:针对业务需求,配置灵活的数据转换规则,使导入的数据结构能够精准匹配目标端要求,避免格式差异引发的问题。

  5. 集中监控与告警系统:借助统一视图,对所有操作步骤进行实时跟踪,一旦出现任何问题,即刻触发告警并生成详细日志记录,有效提高运维人员对整个流程的掌控力。

接下来,我们将详细探讨这些技术点如何具体实施,以及每个环节可能遇到的问题和解决方法。 如何对接用友BIP接口

调用MySQL接口获取并加工数据的技术实现

在数据集成的生命周期中,调用源系统接口获取数据是至关重要的一步。本文将详细探讨如何通过轻易云数据集成平台配置元数据,从MySQL数据库中调用select接口获取并加工数据。

元数据配置解析

首先,我们需要理解元数据配置的各个部分,以便准确地进行API调用和数据处理。以下是关键的元数据配置项:

  • api: select,表示我们要执行的是查询操作。
  • effect: QUERY,表明这是一个查询操作。
  • method: POST,指定了HTTP请求的方法。
  • id: 生产用料清单明细内码,用于标识查询结果中的某个字段。
请求参数

请求参数定义了如何构建查询语句以及如何传递参数:

  • main_params: 这是一个对象类型的主参数,用于包含SQL语句中的动态字段。
    • limit: 限制结果集返回的行数,这是一个必要参数,用于分页查询。
    • offset: 偏移量,指定从结果集的哪一行开始返回数据。
主SQL语句

主SQL语句定义了实际要执行的查询:

select a.fmtono as 计划跟踪号,
b.FEntity_FEntryID as 生产用料清单明细内码,
b.fmaterialid2 as 物料编号,
b.FMustQty as 需求数量,
b.FPickedQty as 已发数量,
c.stock_numb as 四化库存
from mbs_assemble_detail a
left join mbs_assemble_material_detail b on b.fmoentryid=a.fentryid
left join wms_warehouse_store_info c on c.part_no=b.fmaterialid2 and c.mode_no=a.fmtono
where a.fstatus not in ('5','6','7') and a.fpickmtrlStatus<>'3' and left(b.fmaterialid2,4)='0501' and c.matterial_type='4'
and (b.FMustQty-b.FPickedQty)>0
limit :limit offset :offset

在这段SQL语句中,:limit:offset是动态字段,需要在执行查询时绑定实际值。

实现步骤

  1. 配置请求参数

    在轻易云平台上,我们需要配置请求参数,以便在执行查询时传递正确的值。具体来说,我们需要设置limitoffset两个参数。例如:

    {
     "main_params": {
       "limit": 1000,
       "offset": 0
     }
    }
  2. 绑定参数

    在执行查询之前,我们需要将主SQL语句中的动态字段替换为占位符,并使用参数绑定的方法将请求参数的值与占位符进行对应绑定。这可以通过以下伪代码实现:

    main_sql = """
    select a.fmtono as 计划跟踪号,
    b.FEntity_FEntryID as 生产用料清单明细内码,
    b.fmaterialid2 as 物料编号,
    b.FMustQty as 需求数量,
    b.FPickedQty as 已发数量,
    c.stock_numb as 四化库存
    from mbs_assemble_detail a
    left join mbs_assemble_material_detail b on b.fmoentryid=a.fentryid
    left join wms_warehouse_store_info c on c.part_no=b.fmaterialid2 and c.mode_no=a.fmtono
    where a.fstatus not in ('5','6','7') and a.fpickmtrlStatus<>'3' and left(b.fmaterialid2,4)='0501' and c.matterial_type='4'
    and (b.FMustQty-b.FPickedQty)>0
    limit ? offset ?
    """
    
    params = (1000, 0)
    
    cursor.execute(main_sql, params)
  3. 执行查询并处理结果

    执行上述绑定后的SQL语句,并处理返回的数据。例如,将结果转换为JSON格式以便后续处理或写入目标系统:

    results = cursor.fetchall()
    
    # 将结果转换为JSON格式(假设使用Python)
    import json
    
    json_results = json.dumps(results)
    
    # 打印或返回JSON结果以供后续处理
    print(json_results)

技术要点总结

通过以上步骤,我们可以高效地从MySQL数据库中调用接口获取并加工数据。关键在于合理配置元数据、正确绑定动态参数以及高效处理查询结果。这些技术要点确保了数据集成过程的准确性和可靠性,为后续的数据转换与写入奠定了坚实基础。 用友与SCM系统接口开发配置

将源平台数据转换为目标平台 MySQLAPI 接口格式并写入

在轻易云数据集成平台的生命周期中,第二步是将已经集成的源平台数据进行ETL转换,并转为目标平台 MySQLAPI 接口所能够接收的格式,最终写入目标平台。以下将详细探讨这一过程的技术细节和实现方法。

元数据配置解析

我们使用的元数据配置如下:

{
  "api": "batchupdate",
  "effect": "EXECUTE",
  "method": "SQL",
  "number": "id",
  "id": "id",
  "name": "id",
  "idCheck": true,
  "request": [
    {
      "field": "FEntity_FEntryID",
      "label": "FEntity_FEntryID",
      "type": "string",
      "value": "{{生产用料清单明细内码}}"
    },
    {
      "field": "stock_numb",
      "label": "stock_numb",
      "type": "string",
      "value": "{{四化库存}}"
    }
  ],
  "otherRequest": [
    {
      "field": "main_sql",
      "label": "主语句",
      "type": "string",
      "describe": "SQL首次执行的语句,将会返回:lastInsertId",
      "value": "UPDATE mbs_assemble_material_detail SET stock_numb= CASE FEntity_FEntryID"
    },
    {
      "field": "limit",
      "label": "limit",
      "type": "string",
      "value": “10”
    }
  ]
}

数据请求与清洗

在ETL流程中,首先需要从源系统请求数据并进行清洗。根据元数据配置中的request字段,我们需要获取两个字段的数据:生产用料清单明细内码四化库存。这些字段分别对应于目标表中的FEntity_FEntryIDstock_numb

"request":[
  {"field":"FEntity_FEntryID","label":"FEntity_FEntryID","type":"string","value":"{{生产用料清单明细内码}}"},
  {"field":"stock_numb","label":"stock_numb","type":"string","value":"{{四化库存}}"}
]

数据转换与写入

在获取并清洗完数据后,需要将其转换为目标平台 MySQLAPI 接口所能接受的格式,并通过 SQL 批量更新操作写入数据库。

SQL 构建

根据元数据配置中的 main_sql 字段,我们可以构建批量更新 SQL 语句。该语句通过 CASE WHEN THEN END 的方式来批量更新 mbs_assemble_material_detail 表中的 stock_numb 字段。

UPDATE mbs_assemble_material_detail 
SET stock_numb = CASE FEntity_FEntryID 
WHEN 'entry_id_1' THEN 'stock_value_1' 
WHEN 'entry_id_2' THEN 'stock_value_2' 
... 
END 
WHERE FEntity_FEntryID IN ('entry_id_1', 'entry_id_2', ...)
数据绑定与执行

在执行上述 SQL 时,需要将从源系统获取到的数据绑定到相应的占位符上。假设我们从源系统获取到了以下数据:

[
  {"生产用料清单明细内码":"entry_id_1", “四化库存”:”stock_value_1”},
  {"生产用料清单明细内码":"entry_id_2", “四化库存”:”stock_value_2”}
]

则最终生成的 SQL 为:

UPDATE mbs_assemble_material_detail 
SET stock_numb = CASE FEntity_FEntryID 
WHEN 'entry_id_1' THEN 'stock_value_1' 
WHEN 'entry_id_2' THEN 'stock_value_2' 
END 
WHERE FEntity_FEntryID IN ('entry_id_1', 'entry_id_2')

通过 API 调用 batchupdate 方法,将上述 SQL 提交至 MySQLAPI 接口进行执行:

{
  “api”: “batchupdate”,
  “effect”: “EXECUTE”,
  “method”: “SQL”,
  “number”: “id”,
  “idCheck”: true,
  “request”: [
    {
      “field”: “main_sql”,
      “value”: “UPDATE mbs_assemble_material_detail SET stock_numb= CASE FEntity_FEntryID WHEN ‘entry_id_1’ THEN ‘stock_value_1’ WHEN ‘entry_id_2’ THEN ‘stock_value_2’ END WHERE FEntity_FEntryID IN (‘entry_id_1’, ‘entry_id_2’)”
    }
  ]
}

性能优化与限制处理

为了确保批量更新操作不会因为数据量过大而导致性能问题,可以利用元数据配置中的 limit 字段来限制每次更新操作的数据条数。

"otherRequest":[
  {"field":"limit","label":"limit","type":"string","value":"10"}
]

在实际操作中,可以通过分页处理方式,将大批量的数据分批次进行更新,从而提高效率并降低系统负载。

综上所述,通过对元数据配置的深入理解和应用,可以高效地将源平台的数据转换为目标平台 MySQLAPI 接口所能接受的格式,并通过批量更新操作实现快速写入。这一过程不仅提升了数据处理效率,也确保了业务系统间的数据一致性和完整性。 电商OMS与WMS系统接口开发配置