ETL转换与数据清洗:从SQLServer到MySQL的高效实现

  • 轻易云集成顾问-卢剑航

SQL Server数据集成到MySQL案例分享:6--巨益OMS-退款单表-->Mysql-退款单表-refundorder_z

在实际的业务场景中,跨系统之间的数据集成常常面临诸多挑战,如如何确保高吞吐量的数据传输、处理不同数据库间的数据格式差异等。本文将详细分享一个具体的集成案例,即将SQL Server中的退款单表数据集成到MySQL数据库,以供技术人员借鉴和参考。

案例背景

本次操作涉及的核心任务是将名为“巨益OMS”的退款单表从SQL Server迁移至MySQL,目标数据表名称为refundorder_z。该方案旨在通过定时抓取及大批量写入,实现数据的快速同步,同时保障数据质量和系统稳定性。

技术实现要点

  1. API调用细节

    • 获取SQL Server 数据

      select * from refund_order where updated_at > :last_update_time;

      通过上述查询语句,从Refund_Order表中提取需要更新或新增的记录,并结合时间戳(:last_update_time)参数进行增量同步。

    • 写入MySQL 数据

      batchexecute("INSERT INTO refundorder_z (column1, column2, ...) VALUES (?, ?, ...)");

      使用batchexecute API,将提取出来的大批量数据写入目标MySQL数据库。

  2. 分页与限流策略 为避免大量数据请求对源系统造成冲击,在执行select操作时,需要设置合理的分页机制。例如每次请求限制返回1000条记录。如果总记录数超过此限制,多次请求以分批读取所有需要同步的数据。

  3. 自定义转换逻辑 在某些情况下,两套系统间字段类型或命名规则可能不一致。这种情况下,可以利用自定义转换逻辑来进行适配,使得两端的数据结构保持一致。此外,还需根据业务需求处理一些特定字段如日期格式转换、安全脱敏等。

  4. 监控与异常处理机制 集中的监控系统实时跟踪每个集成任务状态,包括当前进度、成功/失败次数等。同时,通过告警机制及时通知潜在问题。在发生异常情况,如网络断连或资源不足时,可触发重试逻辑,确保最终任务完成且不漏单、不丢失重要信息.

  5. 可靠性与性能优化措施 采用事务控制以及批量提交方式,提高写入效率并减少锁等待。同时,根据实际吞吐需求调整连接池大小和并发线程数,以平衡性能和资源利用率。

文章接下部分,将会深入探讨各技术环节 打通钉钉数据接口

使用轻易云数据集成平台调用SQL Server接口获取并加工数据

在数据集成的生命周期中,调用源系统接口获取数据是至关重要的一步。本文将详细探讨如何使用轻易云数据集成平台,通过SQL Server接口select方法获取并加工退款单表的数据。

元数据配置解析

首先,我们需要理解元数据配置metadata中的各个字段及其作用:

{
  "api": "select",
  "effect": "QUERY",
  "method": "SQL",
  "number": "Id",
  "id": "Id",
  "request": [
    {
      "field": "main_params",
      "label": "主参数",
      "type": "object",
      "describe": "111",
      "children": [
        {"field": "offset", "label": "offset", "type": "int"},
        {"field": "fetch", "label": "fetch", "type": "int", "value":"5000"},
        {"field": "ApproveDateBegin", 
         "label":"审核日期(开始时间)", 
         "type":"string", 
         "value":"{{LAST_SYNC_TIME|datetime}}"
        },
        {"field":"ApproveDateEnd", 
         "label":"审核日期(结束时间)", 
         "type":"string", 
         "value":"{{CURRENT_TIME|datetime}}"
        }
      ]
    }
  ],
  ...
}
  • api: 指定API类型为select,表示执行查询操作。
  • effect: 表示操作的效果,这里是查询(QUERY)。
  • method: 指定方法类型为SQL。
  • numberid: 用于标识记录的唯一字段,这里是Id
  • request: 包含请求参数的定义,其中包括主参数对象main_params,它包含了分页和时间范围等子参数。

SQL 查询语句

元数据配置中定义了主查询语句:

{
  ...
  ,"otherRequest":[
    {
      ...
      ,"value":"select Id,Code,CreateDate,CreateUserName,IsLocked,LockedUserName,ReturnOrderId,ReturnOrderCode,ApproveUser,ApproveDate,ActualAmount,OffsetAmount,CustomerCode,CustomerName,StoreId,StoreName,Status,SalesOrderCode,TradeId,ReturnType,RefundType,Mobile,Consignee,MessageString,Tag,IsCod,IsQuickRefund,IsRefund,ObsoleteUser,ObsoleteDate,ExpressNo,ExpressName,Note,
AliPayNo,
SalesOrderId,
AuditDate,
AuditUser,
BeneficiaryAccount,
BeneficiaryName,
AlipayStatus,
RefundWay,
Account,
ReturnResion,
AccountName,
IsAccounted,
DocType,
AlipayOrderNo,
Version,
ModifyDate from RefundOrder where ApproveDate>=:ApproveDateBegin and ApproveDate<=:ApproveDateEnd order by Id offset :offset rows fetch next :fetch rows only"
    }
  ]
}

该SQL语句从退款单表(RefundOrder)中选择所需字段,并根据审核日期范围进行过滤,同时实现分页功能。

实际应用案例

  1. 设置请求参数: 在实际应用中,我们需要设置请求参数,包括分页信息和时间范围。假设我们需要从上次同步时间到当前时间内的数据,每次获取5000条记录:

    {
     main_params: {
       offset: 0,
       fetch: 5000,
       ApproveDateBegin: '2023-01-01T00:00:00',
       ApproveDateEnd: '2023-01-31T23:59:59'
     }
    }
  2. 执行查询: 使用上述请求参数执行查询,轻易云平台会自动将这些参数替换到SQL语句中的占位符位置:

    select Id,... from RefundOrder where ApproveDate>='2023-01-01T00:00:00' and ApproveDate<='2023-01-31T23:59:59' order by Id offset 0 rows fetch next 5000 rows only
  3. 处理结果: 查询结果返回后,可以进行进一步的数据清洗和转换操作,如格式化日期、处理空值等。

  4. 迭代获取: 如果需要获取更多数据,可以通过调整offset值来实现分页迭代。例如,第二次请求可以设置offset=5000,依此类推,直到所有数据被获取完毕。

技术要点总结

在使用轻易云数据集成平台调用SQL Server接口获取并加工数据时,需要注意以下技术要点:

  1. 元数据配置:准确配置元数据,包括API类型、方法、请求参数等。
  2. SQL语句构建:确保SQL语句能够正确处理分页和时间范围过滤。
  3. 参数替换:平台会自动替换SQL语句中的占位符,因此请求参数必须与元数据配置一致。
  4. 结果处理:对查询结果进行必要的清洗和转换,以满足业务需求。

通过上述步骤,可以高效地从SQL Server中获取所需的数据,并为后续的数据转换与写入奠定基础。 电商OMS与ERP系统接口开发配置

使用轻易云数据集成平台进行ETL转换并写入MySQL

在轻易云数据集成平台的生命周期管理中,第二步是将已经集成的源平台数据进行ETL(提取、转换、加载)转换,并将其转为目标平台 MySQL API 接口所能够接收的格式,最终写入目标平台。本文将详细探讨如何利用轻易云数据集成平台实现这一过程。

数据提取与清洗

在进行数据转换之前,首先需要从源系统中提取数据,并对其进行清洗。清洗过程包括去除冗余数据、修正错误数据以及确保数据的一致性和完整性。这一步骤通常通过 SQL 查询或其他数据提取工具完成。

数据转换

在完成数据提取和清洗之后,下一步是将这些数据转换为目标平台所能接受的格式。在本案例中,我们需要将退款单表的数据从巨益OMS系统转换为MySQL数据库中的格式。以下是元数据配置的详细说明:

{
  "api": "batchexecute",
  "effect": "EXECUTE",
  "method": "SQL",
  "idCheck": true,
  "request": [
    {"field": "Id", "label": "Id", "type": "int", "value": "{Id}"},
    {"field": "Code", "label": "Code", "type": "string", "value": "{Code}"},
    {"field": "CreateDate", "label": "CreateDate", "type": "datetime", "value": "{CreateDate}", "default":"1970-01-01 00:00:00"},
    {"field": "CreateUserName", "label": "CreateUserName", "type": "string", "value": "{CreateUserName}"},
    {"field": "IsLocked", "label": "IsLocked", "type": "int", "value": "{IsLocked}"},
    {"field":"LockedUserName","label":"LockedUserName","type":"string","value":"{LockedUserName}"},
    {"field":"ReturnOrderId","label":"ReturnOrderId","type":"int","value":"{ReturnOrderId}"},
    {"field":"ReturnOrderCode","label":"ReturnOrderCode","type":"string","value":"{ReturnOrderCode}"},
    {"field":"ApproveUser","label":"ApproveUser","type":"string","value":"{ApproveUser}"},
    {"field":"ApproveDate","label":"ApproveDate","type":"datetime","value":"{ApproveDate}","default":"1970-01-01 00:00:00"},
    {"field":"ActualAmount","label":"ActualAmount","type":"float","value":"{ActualAmount}"},
    {"field":"OffsetAmount","label":"OffsetAmount","type":"float","value":"{OffsetAmount}"},
    {"field":"CustomerCode","label":"CustomerCode","type":"string","value":"{CustomerCode}"},
    {"field":"CustomerName","label":"CustomerName","type":"string","value":"{CustomerName}"},
    {"field":"StoreId","label":"StoreId","type":"string","value":"{StoreId}"},
    {"field":"'StoreName'',''StoreName'',''StoreName'',''StoreName'',''StoreName'',''StoreName'',''StoreName''},

该配置定义了如何将源系统中的字段映射到目标系统中的字段。例如,将 Id 字段映射到目标系统中的 Id 字段,将 CreateDate 字段映射到目标系统中的 CreateDate 字段,并指定默认值为 1970-01-01 00:00:00

数据写入

完成数据转换后,最后一步是将转换后的数据写入目标平台。在本案例中,我们使用 MySQL API 接口来实现这一过程。以下是用于写入数据的 SQL 语句:

REPLACE INTO refundorder_z (
  Id, Code, CreateDate, CreateUserName, IsLocked, LockedUserName, ReturnOrderId, ReturnOrderCode, ApproveUser, ApproveDate,
  ActualAmount, OffsetAmount, CustomerCode, CustomerName, StoreId, StoreName, Status, SalesOrderCode, TradeId, ReturnType,
  RefundType, Mobile, Consignee, MessageString, Tag, IsCod, IsQuickRefund, IsRefund, ObsoleteUser, ObsoleteDate,
  ExpressNo, ExpressName, Note, AliPayNo, SalesOrderId, AuditDate, AuditUser,
  BeneficiaryAccount,BeneficiaryName ,AlipayStatus ,RefundWay ,Account ,ReturnResion ,AccountName ,IsAccounted ,
DocType ,AlipayOrderNo ,Version ,ModifyDate 
) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)

该 SQL 语句使用了 REPLACE INTO 命令,以确保如果记录已存在,则更新记录;如果不存在,则插入新记录。每个字段都对应于元数据配置中的一个字段。

实践案例

假设我们从巨益OMS系统中提取了一条退款单记录,其内容如下:

{
  Id: 1,
  Code: 'R12345',
  CreateDate: '2023-10-01T12:34:56',
  CreateUserName: 'admin',
  IsLocked: 0,
  LockedUserName: '',
  ReturnOrderId: 1001,
  ReturnOrderCode: 'RO12345',
  ApproveUser: 'manager',
  ApproveDate: '2023-10-02T12:34:56',
}

根据上述元数据配置和 SQL 插入语句,该记录将被转换并插入到 MySQL 数据库中的 refundorder_z 表中。

通过上述步骤,我们成功地实现了从巨益OMS系统到 MySQL 数据库的 ETL 转换和写入。这一过程不仅保证了数据的一致性和完整性,还提高了业务流程的透明度和效率。 如何对接钉钉API接口