从获取数据到写入MySQL:企业级ETL转换的最佳实践

  • 轻易云集成顾问-曾平安

案例分享:BDS对账吉客云采购入库明细集成MySQL

在处理企业级数据时,高效、准确且稳定的数据集成是至关重要的。近期,我们成功完成了BDS对账吉客云采购入库明细数据到MySQL的集成,实现了两者间的数据互通和同步。本次项目采用了轻易云数据集成平台,通过其强大的可视化配置和全生命周期管理功能,使得整个过程更加直观和透明。

技术方案概述

为了实现将吉客云采购入库明细(API接口:erp.storage.goodsdocin)定期、安全地批量写入到MySQL数据库(API接口:executeReturn),我们设计了一套详细的技术解决方案:

  1. 高吞吐量的数据写入
    我们利用平台支持的大规模数据吞吐能力,将大量的采购入库明细快速、高效地写入到目标MySQL数据库中。这确保了即使在业务峰值期间,也能保持系统性能稳定。

  2. 集中监控与告警
    数据流动过程中,我们借助集中式监控和告警系统实时跟踪任务状态及性能表现。一旦发生异常或预设条件触发,即刻产生报警,并提供详细日志记录,便于及时定位问题并采取相应措施。

  3. 分页与限流处理
    吉客云API对于返回结果存在分页限制,同时也有调用次数限额。因此,在抓取数据时我们特别注意设置适当的分页参数,并优化请求频率以避免因超出调用次数而被封禁的问题。

  4. 自定义数据转换逻辑
    在实际应用中,我们发现吉客云与MySQL之间的数据格式存在一定差异。为此,通过自定义的数据转换规则,对获取到的JSON格式原始数据进行必要的清洗、转化,以符合目标数据库表结构及业务需求。

  5. 可靠性保障机制
    为确保每笔交易都能完整无误地记录下来,我们实现了基于事物控制和错误重试机制。在出现网络抖动或短暂不可用情况时,可自动重新尝试,最大程度上减少漏单现象。

接下来部分将深入探讨这一技术框架下具体实施步骤,包括如何通过轻易云平台完成从初始配置、任务调度,到最终落盘验证等环节,进一步解析每个核心操作点。 如何开发金蝶云星空API接口

调用源系统吉客云接口erp.storage.goodsdocin获取并加工数据

在数据集成生命周期的第一步,我们需要从源系统吉客云调用接口erp.storage.goodsdocin来获取采购入库明细数据,并对其进行初步加工。以下是具体的技术实现和配置细节。

接口调用配置

为了从吉客云获取采购入库明细数据,我们需要配置API请求参数。根据提供的元数据配置,API调用采用POST方法,以下是具体的请求参数:

  • 分页页码(pageIndex):用于控制分页请求的当前页码。
  • 分页页数(pageSize):每页返回的数据条数,默认设置为50。
  • 入库单号(goodsDocNo):指定要查询的入库单号。
  • 创建时间的起始时间(startDate):用于过滤创建时间在此时间之后的数据,动态值为上次同步时间。
  • 创建时间的结束时间(endDate):用于过滤创建时间在此时间之前的数据,动态值为当前时间。
  • 入库类型(inouttype):固定值为101,表示采购入库。
  • 仓库ID(warehouseId)仓库编号(warehouseCode)供应商ID(vendId)供应商编号(vendCode)等字段可根据需求进行填充。

示例请求体如下:

{
  "pageIndex": "1",
  "pageSize": "50",
  "goodsDocNo": "",
  "startDate": "{{LAST_SYNC_TIME|datetime}}",
  "endDate": "{{CURRENT_TIME|datetime}}",
  "inouttype": "101",
  "warehouseId": "",
  "warehouseCode": "",
  "vendId": "",
  "vendCode": "",
  "billNo": "",
  "userName": "",
  "gmtModifiedStart": "",
  "gmtModifiedEnd": ""
}

数据清洗与加工

在获取到原始数据后,需要对数据进行清洗和初步加工,以便后续的数据转换与写入。以下是几个关键步骤:

  1. 字段映射与转换

    • 将原始数据中的字段映射到目标系统所需的字段。例如,将recId映射为目标系统中的唯一标识符。
    • 对日期格式进行标准化处理,将所有日期字段统一转换为目标系统所需的格式。
  2. 数据过滤与校验

    • 根据业务规则过滤掉不符合条件的数据。例如,只保留状态为“已审核”的入库单据。
    • 对关键字段进行校验,如检查goodsDocNo是否为空,确保数据完整性。
  3. 增量更新处理

    • 利用startDateendDate参数实现增量更新,只获取自上次同步以来的新数据。
    • 在处理过程中记录本次同步的最大更新时间,以便下次同步时使用。

示例代码片段如下:

import requests
import datetime

# 配置API请求参数
api_url = 'https://api.jikecloud.com/erp.storage.goodsdocin'
headers = {'Content-Type': 'application/json'}
payload = {
    'pageIndex': '1',
    'pageSize': '50',
    'startDate': last_sync_time.strftime('%Y-%m-%d %H:%M:%S'),
    'endDate': datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
    'inouttype': '101'
}

# 发起API请求
response = requests.post(api_url, headers=headers, json=payload)
data = response.json()

# 数据清洗与加工
cleaned_data = []
for record in data['records']:
    if record['status'] == '已审核':
        cleaned_record = {
            'id': record['recId'],
            'goodsDocNo': record['goodsDocNo'],
            # 更多字段映射...
        }
        cleaned_data.append(cleaned_record)

# 返回清洗后的数据
return cleaned_data

通过上述步骤,我们成功调用了吉客云接口erp.storage.goodsdocin获取采购入库明细,并对其进行了必要的数据清洗和加工,为后续的数据转换与写入做好了准备。 钉钉与MES系统接口开发配置

数据集成生命周期中的ETL转换与写入MySQL

在数据集成生命周期的第二步,我们需要将已经集成的源平台数据进行ETL(Extract, Transform, Load)转换,转为目标平台 MySQL API 接口所能够接收的格式,并最终写入目标平台。这一过程不仅仅是简单的数据传输,而是涉及到复杂的数据清洗、转换和加载步骤。本文将详细探讨如何利用元数据配置完成这一任务。

元数据配置解析

元数据配置是实现ETL过程的核心,通过定义字段映射和转换规则,确保源数据能够正确地转换为目标平台所需的格式。以下是我们使用的元数据配置:

{
  "api": "executeReturn",
  "method": "POST",
  "idCheck": true,
  "request": [
    {
      "field": "main_params",
      "label": "主参数",
      "type": "object",
      "children": [
        {"field": "bill_no", "label": "出入库单号", "type": "string", "value":"{goodsdocNo}"},
        {"field": "in_out_type", "label": "出入库类型", "type": "string", "describe":"出入库类型(101-采购入库,205-采购退货)", "value":"101"},
        {"field": "in_out_date", "label": "出入库时间", "type": "datetime", 
            "value":"_function FROM_UNIXTIME( ( {inOutDate} / 1000 ) ,'%Y-%m-%d %T' )"},
        {"field": "in_out_reason", "label": "出入库原因", 
            "type":"string","value":"{inOutReason}"},
        {"field":...}
      ]
    },
    {
      ...
    }
  ],
  ...
}

数据清洗与转换

在这一阶段,我们需要对源数据进行清洗和转换,以确保其符合目标平台MySQL API接口的要求。例如,in_out_date字段需要将Unix时间戳转换为标准日期时间格式,这可以通过如下函数实现:

_function FROM_UNIXTIME( ( {inOutDate} / 1000 ), '%Y-%m-%d %T' )

类似地,其他字段也需要进行相应的处理,如将状态码、对账类型等字段映射到目标系统所需的值。

构建SQL语句

根据元数据配置,我们需要构建相应的SQL插入语句,以便将清洗和转换后的数据写入MySQL数据库。以下是主表和子表的插入语句示例:

主表插入语句:

INSERT INTO `lhhy_srm`.`supplier_purchase_in_out`
(`bill_no`, `in_out_type`, `in_out_date`, `in_out_reason`, `source_bill_no`, 
 `status`, `send_type`, `create_type`, `suppiler_code`, `suppiler_name`,
 `warehouse_code`, `warehouse_name`, `purchase_org_code`, `purchase_org_name`,
 `purchase_dept`, `purchaser`, `delivery_bill`, `take_delivery_bill`,
 `purchase_memo`, `remark`, `create_time`, `create_by`)
VALUES
(<{bill_no: }>, <{in_out_type: }>, <{in_out_date: CURRENT_TIMESTAMP}>, 
 <{in_out_reason: }>, <{source_bill_no: }>, <{status: }>, <{send_type: }>,
 <{create_type: 1}>, <{suppiler_code: }>, <{suppiler_name: }>,
 <{warehouse_code: }>, <{warehouse_name: }>, <{purchase_org_code: }>,
 <{purchase_org_name: }>, <{purchase_dept: }>, <{purchaser: }>,
 <{delivery_bill: }>, <{take_delivery_bill: }>, <{purchase_memo: }>,
 <{remark: }>, <{create_time: }>, <{create_by: }> );

子表插入语句:

INSERT INTO `lhhy_srm`.`supplier_purchase_in_out_detail`
(`order_id`, `goods_no`, `goods_name`, `bar_code`, 
`spec_name`, `cate_name`, `brand_name`,
`price`,`unit`,`quantity`,`tax_price`,
`tax_rate`,`tax_amount`,`purchaser_tax_price`,
`supplier_tax_price`,`confirm_tax_price`,`goods_memo`,
`remark`)
VALUES
(<{lastInsertId: }>,<{goods_no: }>,<{goods_name: }>
,< {bar_code :}> ,< {spec_name :}> ,< {cate_name :}> ,
< {brand_name :}> ,< {price :}> ,< {unit :}> ,< {quantity :}>
,< {tax_price :}> ,< {tax_rate :0 }> ,< {tax_amount :0 }> ,
< {purchaser_tax_price :0 }> ,< {supplier_tax_price :0 }>
,< {confirm_tax_price :0 }> ,< {goods_memo :}> ,< {remark :}> );

数据加载

最后一步是将构建好的SQL语句通过API接口发送到MySQL数据库,实现数据的最终写入。这里我们使用POST方法,通过API接口executeReturn执行上述SQL语句。

通过以上步骤,我们实现了从源平台到目标平台的数据ETL转换与写入,确保了数据在不同系统间的无缝对接。这一过程不仅提升了业务透明度和效率,也确保了数据的一致性和准确性。 如何开发金蝶云星空API接口