ETL转换技术在特殊食品追溯平台中的应用

  • 轻易云集成顾问-钟家寿

产品包装信息同步--自制上报流程3:MySQL数据集成到广东省特殊食品电子追溯平台

在进行产品包装信息的同步时,确保各个环节的数据准确无误地上传至广东省特殊食品电子追溯平台,是一项技术性极高的任务。本文主要分享如何利用轻易云数据集成平台高效、安全地实现这一目标。

本案例所采用的解决方案为“产品包装信息同步--自制上报流程3”,核心步骤包括从MySQL数据库中通过select接口抓取最新的产品包装信息,并将这些数据快速写入到广东省特殊食品电子追溯平台的ProductPackInfo接口中。整个过程需要处理大量数据,因此系统需具备高吞吐量的数据写入能力,以保证及时完成业务需求。

为了保证每条记录都能被正确且及时地传输,我们充分利用了轻易云提供的一系列高级特性:

  1. 定时可靠的数据抓取
    系统设置了自动化定时任务,从MySQL数据库接口按指定时间间隔拉取最新更新的数据,确保不会漏单和遗漏任何关键信息。

  2. 批量处理与分页限流
    大量数据需分批次、分页抓取和处理,同时通过限流机制避免对源数据库和目标API造成过大压力,保证系统稳定运行。

  3. 实时监控与告警系统
    利用集中化监控工具,对每个数据集成任务进行实时跟踪。如果出现异常情况或性能问题,通过告警系统可以第一时间发现并解决,有效减少因故障导致的信息延迟或丢失风险。

  4. 自定义转换逻辑及格式映射
    由于MySQL数据库中的原始数据格式可能与广东省特殊食品电子追溯平台要求存在差异,通过可视化设计工具,自定义相应的数据转换逻辑,实现两个系统之间精准、高效的数据对接。例如,可针对具体字段进行必要调整,使其符合最终API规范要求。

  5. 错误重试机制 在网络波动或服务器响应延迟等情况下,为防止部分请求失败而导致数据缺失,实现了完善的错误重试机制。这不仅提升整体成功率,也提高了操作稳定性和可靠性。

这只是一个简要介绍。在后续内容中,我们将更详细探讨此技术方案中涉及到的方法调用、参数配置以及常遇问题与解决策略等具体细节。

金蝶与外部系统打通接口

调用MySQL接口select获取并加工数据

在轻易云数据集成平台中,生命周期的第一步是调用源系统MySQL接口select获取并加工数据。本文将深入探讨如何利用元数据配置来实现这一过程。

API接口配置

首先,我们需要理解元数据配置中的各个字段及其作用。以下是关键字段的详细解释:

  • api: 表示调用的API类型,这里为select
  • effect: 定义了API的作用,这里为QUERY,即查询操作。
  • method: HTTP请求方法,这里为POST
  • numberid: 用于标识请求中的特定字段,分别为fbill_nofentry_id

请求参数配置

请求参数主要包括主参数和其他请求参数。主参数定义了SQL语句中的动态字段,而其他请求参数则包含具体的SQL查询语句。

主参数

主参数定义了SQL语句中需要动态赋值的部分,包括:

  1. limit: 限制结果集返回的行数,使用占位符 {PAGINATION_PAGE_SIZE}
  2. offset: 指定查询结果的起始位置,使用占位符 {PAGINATION_START_ROW}
  3. last_time: 上次同步时间,使用占位符 {{LAST_SYNC_TIME|datetime}}

这些参数确保了查询结果可以分页返回,并且只获取自上次同步以来的新数据。

{
  "field": "main_params",
  "label": "主参数",
  "type": "object",
  "describe": "对应其它请求字段内SQL语句的主参数,必须一一对应。",
  "value": "1",
  "children": [
    {
      "field": "limit",
      "label": "限制结果集返回的行数",
      "type": "int",
      "describe": "必要的参数!LIMIT 子句用于限制查询结果返回的行数。",
      "value": "{PAGINATION_PAGE_SIZE}"
    },
    {
      "field": "offset",
      "label": "偏移量",
      "type": "int",
      "describe": "OFFSET 子句用于指定查询结果的起始位置或偏移量。",
      "value": "{PAGINATION_START_ROW}"
    },
    {
      "field": "last_time",
      "label": "上次同步时间",
      "type": "string",
      "value": "{{LAST_SYNC_TIME|datetime}}"
    }
  ]
}
主SQL语句

主SQL语句中使用了动态字段(如:limit, :offset, :last_time),这些字段将在执行查询时被实际值替换。以下是具体的SQL语句:

{
  "field": "main_sql",
  "label": "主SQL语句",
  "type": "string",
  "describe": 
    """
    主SQL查询语句中使用 :limit 这种动态语法字段的赋值,以确保字段与请求参数一一对应,我们可以采用参数绑定的方式。
    下面是具体的优化步骤:
    1. 将主SQL查询语句中的动态字段 :limit 替换为占位符(例如 ?),表示参数的位置。
    2. 在执行查询之前,使用参数绑定的方法,将请求参数的值与占位符进行对应绑定。

    通过这种优化方式,我们能够提高查询语句的可读性和维护性,并确保动态语法字段与请求参数的正确对应关系。这样可以更好地保证查询的准确性和安全性。
    """,
  value: 
    """
    SELECT scrk.* 
    FROM scrk 
    LEFT JOIN xsck_and_fbsdc xf 
        ON scrk.fmaterialid_fnumber = xf.fmaterialid_fnumber 
        AND scrk.flot = xf.flot 
    WHERE xf.fsend_flag = '发送' 
        AND scrk.fmaterialid_fnsb_sccj LIKE '%纽斯葆广赛%' 
        AND xf.created_at >= :last_time 
    LIMIT :limit OFFSET :offset
    """
}

数据处理流程

在实际操作中,轻易云数据集成平台会根据上述配置生成相应的数据请求,并通过HTTP POST方法将请求发送到MySQL数据库。数据库会根据接收到的动态参数执行查询,并返回符合条件的数据集。

  1. 解析元数据配置:平台首先解析元数据配置文件,提取出主参数和主SQL语句。
  2. 生成实际SQL:将主SQL语句中的动态字段替换为实际值。例如,将:limit, :offset, :last_time替换为具体数值或时间戳。
  3. 执行查询:生成最终SQL后,通过POST方法发送到MySQL数据库执行查询操作。
  4. 处理返回结果:接收并处理数据库返回的数据集,根据需求进行进一步的数据清洗或转换。

通过这种方式,可以高效地从MySQL数据库中获取所需的数据,并确保每次同步时只获取最新的数据,从而提高数据处理效率和准确性。

以上就是调用MySQL接口select获取并加工数据的方法,通过合理配置元数据,可以实现灵活、高效的数据集成。 企业微信与ERP系统接口开发配置

产品包装信息同步至广东省特殊食品电子追溯平台的ETL转换技术案例

在数据集成生命周期的第二步,我们需要将已经集成的源平台数据进行ETL(Extract, Transform, Load)转换,使其符合目标平台——广东省特殊食品电子追溯平台API接口所能够接收的格式,并最终写入目标平台。本文将详细探讨这一过程中的技术细节和实现方法。

API接口配置与元数据解析

在本案例中,我们使用的目标API为ProductPackInfo,请求方法为POST。以下是具体的元数据配置:

{
  "api": "ProductPackInfo",
  "method": "POST",
  "idCheck": true,
  "request": [
    {
      "field": "DOCUMENTID",
      "label": "文档唯一标识号",
      "type": "string",
      "value": "_function CONCAT('{fbill_no}-{flot}-{fentry_id}-{id}-PACK', FLOOR(RAND() * 10001))"
    },
    {
      "field": "dataset",
      "label": "数据集",
      "type": "array",
      "children": [
        {
          "field": "productBarCode",
          "label": "条形码{fbarcode}",
          "type": "string",
          "value": "_mongoQuery 534f876d-5a7a-329b-a79c-16785898efcb findField=content.FBARCODE where={\"content.FNumber\":{\"$eq\":\"{fmaterialid_fnumber}\"}}",
          "parent": "dataset"
        },
        {
          "field": "batch",
          "label": "批号",
          "type": "string",
          "value": "{flot}",
          "parent": "dataset"
        },
        {
          "field": "boxCode",
          "label": "箱码-21E13ADS04",
          "type": "string",
          "parent": "dataset"
        },
        {
          "field": "traceCode",
          "label": "追溯码",
          "type": "string",
          "$value":"{flot}",
          "$parent":"dataset"
        }
      ]
    }
  ]
}

数据提取与转换

  1. 文档唯一标识号生成

    • DOCUMENTID字段通过一个自定义函数生成,其格式为{fbill_no}-{flot}-{fentry_id}-{id}-PACK,并附加一个随机数。这确保了每个文档都有唯一的标识。
  2. 条形码查询

    • productBarCode字段通过MongoDB查询获取。具体查询方式为:
      _mongoQuery 534f876d-5a7a-329b-a79c-16785898efcb findField=content.FBARCODE where={"content.FNumber":{"$eq":"{fmaterialid_fnumber}"}}
      • 此查询从MongoDB集合中找到与{fmaterialid_fnumber}匹配的记录,并提取其FBARCODE字段值。
  3. 批号、箱码和追溯码

    • batch字段直接从源数据中的{flot}字段获取。
    • boxCode字段固定为"21E13ADS04"。
    • traceCode字段同样使用源数据中的{flot}字段。

数据加载

将转换后的数据按照目标API的格式进行组装,并通过HTTP POST请求发送到广东省特殊食品电子追溯平台。以下是一个示例请求体:

{
  DOCUMENTID: '12345-67890-00001-PACK1234',
  dataset: [
    {
      productBarCode: '9876543210123',
      batch: '20230101',
      boxCode: '21E13ADS04',
      traceCode: '20230101'
    }
  ]
}

实现代码示例

以下是一个基于Node.js和MongoDB的实现示例:

const axios = require('axios');
const { MongoClient } = require('mongodb');

async function transformAndLoad(data) {
  const client = new MongoClient('mongodb://localhost:27017');

  try {
    await client.connect();
    const database = client.db('yourDatabase');
    const collection = database.collection('yourCollection');

    // 提取条形码
    const barcodeResult = await collection.findOne({ 'content.FNumber': data.fmaterialid_fnumber });
    const productBarCode = barcodeResult ? barcodeResult.content.FBARCODE : null;

    // 构建请求体
    const requestBody = {
      DOCUMENTID: `${data.fbill_no}-${data.flot}-${data.fentry_id}-${data.id}-PACK${Math.floor(Math.random() * 10001)}`,
      dataset: [
        {
          productBarCode,
          batch: data.flot,
          boxCode: '21E13ADS04',
          traceCode: data.flot
        }
      ]
    };

    // 发起POST请求
    const response = await axios.post('https://api.gdspecialfoodtrace.com/ProductPackInfo', requestBody);

    console.log(response.data);

  } finally {
    await client.close();
  }
}

// 示例调用
transformAndLoad({
  fbill_no: '12345',
  flot: '20230101',
  fentry_id: '00001',
  id: '1234',
  fmaterialid_fnumber: 'A001'
});

上述代码展示了如何从MongoDB中提取所需的数据,并将其转换为目标API所需的格式,最后通过HTTP POST请求发送到目标平台。

通过这种方式,我们实现了从源平台到广东省特殊食品电子追溯平台的数据无缝对接,确保数据在整个生命周期内的准确性和一致性。 打通金蝶云星空数据接口