使用轻易云平台进行ETL转换并写入食品追溯平台指南

  • 轻易云集成顾问-吕修远

MySQL数据集成到广东省特殊食品电子追溯平台的技术分享

在本案例中,我们将探讨如何通过MySQL接口select获取检验报告,并利用ProductFinishedCheck接口将这些数据高效、可靠地写入广东省特殊食品电子追溯平台。这一过程涉及从MySQL数据库到目标平台的数据对接,强调了高吞吐量的数据写入能力和实时监控机制,以确保每条数据都能准确无误地完成同步上报。

数据源与目标系统选择

首先,MySQL作为源数据库,通过其select API接口定时抓取检验报告。这些数据不仅需要批量快速读取,同时还要兼顾分页和限流问题,以提高查询效率并保护源系统。随后,这些数据信息被准备传输至广东省特殊食品电子追溯平台,该平台提供了ProductFinishedCheck API用于接收批量数据。

数据转换与映射

为了确保数据格式的一致性及业务逻辑的匹配,需要进行自定义的数据转换操作。例如,将MySQL数据库中的字段名映射为追溯平台要求的字段格式,以及处理可能出现的数据结构差异。此外,为提升整体流程的可视化管理,我们使用了直观便利的数据流设计工具,使得整个集成过程更加清晰易懂。

实时监控与错误处理机制

在实际运行过程中,通过集中化监控和告警系统,实时跟踪各个数据集成任务的状态和性能。在遇到异常情况时,实现自动重试机制,从而保证整个上报流程具备较高的鲁棒性。对于可能发生的问题,如网络抖动或API响应失败等,都能够及时检测并采取相应措施,有效减少因单次错误导致的大规模数据丢失风险。

这个案例展示了一整套完整、精细且效果显著的数据集成解决方案,在保障高吞吐量、高质量以及高可靠性的基础上,实现了业务上的需求闭环。 如何对接钉钉API接口

调用源系统MySQL接口select获取并加工数据

在数据集成的生命周期中,调用源系统接口获取数据是关键的第一步。本文将详细探讨如何通过轻易云数据集成平台配置元数据,从MySQL数据库中调用select接口获取并加工数据。

元数据配置解析

首先,我们需要理解元数据配置中的各个字段及其作用:

  • api: select 表示我们将使用SELECT语句从MySQL数据库中查询数据。
  • effect: QUERY 表示这是一个查询操作。
  • method: POST 指定了HTTP请求的方法。
  • numberid: 分别对应业务单据号和条目ID,用于唯一标识记录。
  • idCheck: true 表示需要进行ID检查。

接下来,我们重点关注请求参数和主SQL语句的配置。

请求参数配置

请求参数部分定义了查询所需的主要参数:

"request": [
  {
    "field": "main_params",
    "label": "主参数",
    "type": "object",
    "describe": "对应其它请求字段内SQL语句的主参数,必须一一对应。",
    "value": "1",
    "children": [
      {
        "field": "limit",
        "label": "限制结果集返回的行数",
        "type": "int",
        "describe": "必要的参数!LIMIT 子句用于限制查询结果返回的行数。它指定了查询应该返回的最大行数。例如,LIMIT 10 表示查询结果只包含最多 10 行数据。这对于分页查询非常有用,可以在每次查询中返回一定数量的结果。",
        "value": "{PAGINATION_PAGE_SIZE}"
      },
      {
        "field": "offset",
        "label": "偏移量",
        "type": "int",
        "describe": "OFFSET 子句用于指定查询结果的起始位置或偏移量。它表示查询应该从结果集的哪一行开始返回数据。例如,OFFSET 20 表示查询应该从结果集的第 21 行开始返回数据。结合 LIMIT 子句使用时,OFFSET 指定了查询结果的起始行数。",
        "value": "{PAGINATION_START_ROW}"
      },
      {
        "field": "last_time",
        "label": "上次同步时间",
        "type": "string",
        "value": "{{LAST_SYNC_TIME|datetime}}"
      }
    ]
  }
]

这些参数包括:

  • limit: 限制返回的数据行数,用于分页。
  • offset: 数据起始位置,用于分页。
  • last_time: 上次同步时间,用于增量同步。

主SQL语句配置

主SQL语句部分定义了实际执行的SQL查询:

"otherRequest":[
  {
    "field":"main_sql",
    "label":"主SQL语句",
    "type":"string",
    "describe":"主SQL查询语句中使用 :limit 这种动态语法字段的赋值,以确保字段与请求参数一一对应,我们可以采用参数绑定的方式。下面是具体的优化步骤:\n1.将主SQL查询语句中的动态字段 :limit 替换为占位符(例如 ?),表示参数的位置。\n2.在执行查询之前,使用参数绑定的方法,将请求参数的值与占位符进行对应绑定。\n通过这种优化方式,我们能够提高查询语句的可读性和维护性,并确保动态语法字段与请求参数的正确对应关系。这样可以更好地保证查询的准确性和安全性。",
    "value":"SELECT scrk.* FROM scrk LEFT JOIN xsck_and_fbsdc xf ON scrk.fmaterialid_fnumber = xf.fmaterialid_fnumber AND scrk.flot = xf.flot WHERE xf.fsend_flag = '发送' AND xf.created_at >= :last_time limit :limit offset :offset"
  }
]

该SQL语句实现了以下功能:

  1. scrk表中选择所有列。
  2. 使用LEFT JOIN连接xsck_and_fbsdc表,根据物料编号和批次号匹配记录。
  3. 筛选出发送标志为“发送”的记录,并且创建时间大于等于上次同步时间。
  4. 使用LIMITOFFSET子句实现分页。

参数绑定与执行

为了确保动态字段与请求参数正确对应,我们采用参数绑定的方法。在执行查询之前,将请求参数值与占位符进行绑定:

SELECT scrk.* 
FROM scrk 
LEFT JOIN xsck_and_fbsdc xf 
ON scrk.fmaterialid_fnumber = xf.fmaterialid_fnumber 
AND scrk.flot = xf.flot 
WHERE xf.fsend_flag = '发送' 
AND xf.created_at >= ? 
LIMIT ? OFFSET ?

在实际执行时,将:last_time, :limit, 和 :offset替换为具体值。这种方式不仅提高了可读性,还增强了安全性,防止SQL注入攻击。

自动填充响应

最后,通过设置autoFillResponse: true,平台会自动填充响应结果,使得后续处理更加便捷。

综上所述,通过合理配置元数据,我们可以高效地从MySQL数据库中调用接口获取并加工数据,为后续的数据转换与写入奠定坚实基础。 金蝶与MES系统接口开发配置

使用轻易云数据集成平台进行ETL转换并写入广东省特殊食品电子追溯平台API接口

在数据集成生命周期的第二步中,我们需要将已经集成的源平台数据进行ETL转换,转为目标平台广东省特殊食品电子追溯平台API接口所能够接收的格式,并最终写入目标平台。本文将详细介绍如何使用轻易云数据集成平台完成这一过程。

配置元数据

首先,我们需要根据提供的元数据配置,定义请求参数和字段映射。以下是元数据配置的详细内容:

{
  "api": "ProductFinishedCheck",
  "method": "POST",
  "idCheck": true,
  "request": [
    {
      "field": "DOCUMENTID",
      "label": "文档唯一标识号",
      "type": "string",
      "value": "_function CONCAT('{fbill_no}-{flot}-{fentry_id}-fileType', FLOOR(RAND() * 10001))"
    },
    {
      "field": "dataset",
      "label": "数据集",
      "type": "array",
      "children": [
        {
          "field": "productBarCode",
          "label": "条形码",
          "type": "string",
          "value": "{fbarcode}",
          "parent": "dataset"
        },
        {
          "field": "batch",
          "label": "批号",
          "type": "string",
          "value": "{flot}",
          "parent": "dataset"
        },
        {
          "field": "fileName",
          "label": "文件名称",
          "describe": "上传文件名称(界面显示用)",
          "type": "string",
          "value": "{fmaterialid_fnumber}-{flot}fileType",
          ...
        },
        ...
      ]
    }
  ]
}

数据请求与清洗

在数据请求与清洗阶段,我们从源系统获取原始数据,并对其进行预处理。假设源系统返回的数据格式如下:

{
  ...
  {
    fbarcode: '1234567890',
    flot: '20231001',
    fmaterialid_fnumber: 'MAT001',
    fdate: '2023-10-01',
    fbill_no: 'BILL123',
    fentry_id: 'ENTRY456'
  }
}

我们需要将这些原始字段映射到目标API所需的字段上。

数据转换与写入

接下来,我们使用轻易云的数据转换功能,将预处理后的数据转换为目标API所需的格式,并通过POST请求将其写入广东省特殊食品电子追溯平台。

  1. 生成文档唯一标识号

    DOCUMENTID = CONCAT(fbill_no, '-', flot, '-', fentry_id, '-fileType', FLOOR(RAND() * 10001));
  2. 构建数据集

    dataset = [
     {
       productBarCode: fbarcode,
       batch: flot,
       fileName: CONCAT(fmaterialid_fnumber, '-', flot, 'fileType'),
       checkType: 'ProductReport',
       creator: '纽斯葆广赛(广东)生物科技股份有限公司',
       createDate: fdate,
       FILE_CONTENT: null
     }
    ];
  3. 构建最终请求体

    requestBody = {
     DOCUMENTID: DOCUMENTID,
     dataset: dataset
    };
  4. 发送POST请求: 使用轻易云提供的HTTP请求功能,将上述构建好的requestBody通过POST方法发送至ProductFinishedCheck API接口。

import requests

url = 'https://api.gdspecialfoodtrace.com/ProductFinishedCheck'
headers = {'Content-Type': 'application/json'}
response = requests.post(url, json=requestBody, headers=headers)

if response.status_code == 200:
    print('Data successfully written to the target platform.')
else:
    print('Failed to write data:', response.text)

实时监控与错误处理

在整个过程中,轻易云的数据集成平台提供了实时监控功能,可以随时查看数据流动和处理状态。如果出现错误,系统会记录详细日志,方便我们进行排查和修正。

通过上述步骤,我们成功地将源平台的数据经过ETL转换后写入了广东省特殊食品电子追溯平台,实现了不同系统间的数据无缝对接。这不仅提高了业务透明度和效率,也确保了数据的一致性和准确性。 如何对接用友BIP接口