ETL转换与写入MySQL的技术实现:吉客云数据集成案例

  • 轻易云集成顾问-何语琴

吉客云数据集成到MySQL的技术案例:采购单数据处理

在企业日常运营中,采购单数据的高效管理至关重要。针对这一需求,我们选择使用轻易云的数据集成平台,将吉客云(Gikoo Cloud)的采购单数据无缝对接到MySQL数据库。本次分享将深入探讨如何通过调用吉客云API接口erp.purch.get来获取实时采购单信息,并利用MySQL的写入API execute进行高吞吐量的数据存储,确保每一条业务记录都精准落地。

方案概述

本案件是基于"吉客云采购单-BDSBI_pro"方案实施。我们充分利用平台提供的可视化操作界面和中心化监控系统,实现了全流程自动化管理和实时状态跟踪。同时,通过自定义的数据转换逻辑和质量监控机制,有效应对复杂多变的数据格式与业务规则,确保整个集成过程中的稳定性和可靠性。

实现步骤

首先,通过定时任务触发器周期性调用erp.purch.get接口,从吉客云中抓取最新的采购订单信息。在此过程中,为了解决分页与限流问题,我们设计了一套稳健的请求控制策略,以避免因过度请求导致API服务异常。此外,我们还配置了异常检测及重试机制,不仅能够及时发现并处理潜在的问题,还能保证每一次数据抓取任务都顺利完成。

其次,在获取到完整且准确的原始数据后,我们会对其进行必要的预处理、过滤及格式转换。这一步骤主要解决的是两端系统之间可能存在的数据结构差异。目前采用自定义逻辑为主,使得这些差异得以最优方式呈现。值得注意的是,每一次转换步骤都会被详细记录,以便后续审计和追踪。

最后,将整理后的有效数据通过MySQL API execute批量写入目标数据库。在这一环节中特别注重提高写入效率,同时保障事务的一致性。借助分片并行处理技术,大幅提升了多表联合操作下的大规模插入性能表现。不仅如此,该流程同样配备完善的日志记录功能,以便在发生错误时快速定位原因并实施补救措施。

真人真事胜雄辩,这一系列严谨、精巧构建的方法,让我们成功实现了从吉客云到MySQL全链路无缝集成,并解决实际应用中的各种挑战。具体执行细节将在后续章节中详细展开,包括不同场景下具体参数调整以及优化技巧等内容,希望能够为更多项目提供宝贵借鉴。 用友BIP接口开发配置

调用吉客云接口erp.purch.get获取并加工数据

在轻易云数据集成平台中,调用源系统接口是数据处理生命周期的第一步。本文将深入探讨如何通过调用吉客云接口erp.purch.get来获取采购单数据,并进行必要的加工处理。

接口配置与调用

首先,我们需要根据元数据配置来设置接口调用参数。以下是元数据配置的详细信息:

{
  "api": "erp.purch.get",
  "effect": "QUERY",
  "method": "POST",
  "number": "orderNum",
  "id": "id",
  "idCheck": true,
  "request": [
    {"field":"pageIndex","label":"分页页码","type":"string","describe":"111"},
    {"field":"pageSize","label":"分页页数","type":"string","describe":"111","value":"50"},
    {"field":"warehouseCode","label":"仓库编号(多个用\",\"号隔开)","type":"string"},
    {"field":"startDate","label":"订单创建的起始时间","type":"string","value":"{{LAST_SYNC_TIME|datetime}}"},
    {"field":"endDate","label":"订单创建的结束时间","type":"string","value":"{{CURRENT_TIME|datetime}}"},
    {"field":"outSkuCode","label":"与外部货品关联的code","type":"string"},
    {"field":"revwStatus","label":"订单状态","type":"string","describe":"订单状态,多个用\",\"隔开(0:待递交;1:待审核;2:执行中;3:执行中止;10:审核中 ;20:订单完成 )(2,20均属于已审核)", "value": "2"},
    {"field": "startModifyDate", "label": "修改的开始时间", "type": "string"},
    {"field": "endModifyDate", "label": "修改的结束时间", "type": "string"},
    {"field": "startProvDate", "label": "审核的开始时间", "type": "string"},
    {"field": "endProvDate", "label": "审核的结束时间", "type": "string"},
    {"field": "auditUserId", "label": "审核人id", "type": : string},
    {"field" : vendOrderIds, label: 外部单号, type: string}
  ],
  autoFillResponse: true
}

参数设置

在实际调用过程中,需要特别注意以下几个关键参数:

  1. 分页参数

    • pageIndex:当前页码。
    • pageSize:每页记录数,默认值为50。
  2. 时间参数

    • startDateendDate:用于限定订单创建时间范围。startDate使用上次同步时间({{LAST_SYNC_TIME|datetime}}),而endDate则使用当前时间({{CURRENT_TIME|datetime}})。
    • startModifyDate, endModifyDate, startProvDate, endProvDate: 用于限定修改和审核时间范围。
  3. 状态参数

    • revwStatus: 用于过滤订单状态,默认值为2(已审核)。
  4. 其他参数

    • warehouseCode: 仓库编号,可以指定多个,用逗号分隔。
    • outSkuCode: 与外部货品关联的code。
    • auditUserId: 审核人ID。
    • vendOrderIds: 外部单号。

数据请求与清洗

在完成参数设置后,通过POST方法调用吉客云接口获取采购单数据。返回的数据需要进行清洗和转换,以便后续处理和存储。以下是一个示例代码片段,展示了如何进行数据请求和初步清洗:

import requests
import datetime

# 设置请求URL和头信息
url = 'https://api.jikecloud.com/erp.purch.get'
headers = {'Content-Type': 'application/json'}

# 设置请求参数
params = {
    'pageIndex': '1',
    'pageSize': '50',
    'warehouseCode': '',
    'startDate': (datetime.datetime.now() - datetime.timedelta(days=1)).strftime('%Y-%m-%d %H:%M:%S'),
    'endDate': datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
    'revwStatus': '2'
}

# 发起POST请求
response = requests.post(url, json=params, headers=headers)

# 检查响应状态并处理返回的数据
if response.status_code == 200:
    data = response.json()

    # 数据清洗与转换逻辑
    cleaned_data = []

    for record in data['data']:
        cleaned_record = {
            'orderNum': record['orderNum'],
            'id': record['id'],
            # 添加更多字段转换逻辑...
        }
        cleaned_data.append(cleaned_record)

else:
    print(f"Error: {response.status_code}")

# 输出清洗后的数据
print(cleaned_data)

自动填充响应

在元数据配置中,我们可以看到一个重要属性——autoFillResponse: true。该属性表示系统会自动填充响应结果,这极大简化了开发者的数据处理工作。在实际应用中,我们只需关注核心业务逻辑,而不必担心复杂的数据映射和转换过程。

通过上述步骤,我们成功实现了从吉客云获取采购单数据并进行初步清洗,为后续的数据转换与写入奠定了基础。这一过程展示了轻易云平台强大的数据集成能力,使得异构系统间的数据对接变得高效且透明。 用友BIP接口开发配置

数据集成中的ETL转换与写入目标平台MySQL的技术实现

在数据集成生命周期中,将源平台的数据转换为目标平台所能接收的格式,并最终写入目标平台,是一个至关重要的步骤。本文将详细探讨如何通过轻易云数据集成平台,将吉客云采购单数据转换并写入MySQL数据库。

元数据配置解析

元数据配置是ETL过程的核心,它定义了从源数据到目标数据的映射关系和转换规则。以下是关键元数据配置项的解析:

  1. API接口配置

    {"api":"execute","effect":"EXECUTE","method":"POST","idCheck":true}

    该配置定义了API接口的基本信息,包括接口名称、执行效果、HTTP方法以及是否进行ID检查。

  2. 请求参数映射

    "request":[{"field":"main_params","label":"主参数","type":"object","children":[...]}]

    main_params对象包含了所有需要传递给目标平台的数据字段及其映射关系。每个字段都包含以下属性:

    • field: 字段名
    • label: 字段标签
    • type: 数据类型(如string, int, float等)
    • value: 映射值或转换函数
  3. SQL语句生成

    "otherRequest":[{"field":"main_sql","label":"主语句","type":"string","value":"INSERT INTO ..."}]

    main_sql字段定义了最终执行的SQL插入语句,使用占位符来表示动态填充的数据字段。

数据字段转换示例

在实际操作中,我们需要将源平台的数据按照元数据配置进行转换。以下是几个关键字段的转换示例:

  1. 时间戳转换 源数据中的时间戳通常以毫秒为单位,需要转换为MySQL可识别的日期时间格式。

    {"field":"gmt_create","label":"单据创建日期","type":"string","value":"_function FROM_UNIXTIME(  ( {gmtCreate} \/ 1000 )  ,'%Y-%m-%d %T' )"}

    使用FROM_UNIXTIME函数将时间戳转换为标准日期时间格式。

  2. 嵌套对象处理 某些字段可能是嵌套对象,需要逐层解析。

    {"field":"main_params","label":"主参数","type":"object","children":[{"field":"order_id","label":"采购单id","type":"string","value":"{id}"}, ...]}

    在这种情况下,需要确保每个子字段都正确映射到相应的源数据字段。

  3. 条件查询 有些字段需要根据条件从其他数据库中查询得到。

    {"field":"second_category","label":"二级分类","type":"string","value":"_mongoQuery c42915a7-d97b-3bc1-9c57-76291c8e648f findField=content.goodsField5 where={\"content.goodsCode\":{\"$eq\":\"{goodsNo}\"}}"}

    使用_mongoQuery函数,根据货品编号从MongoDB中查询对应的二级分类信息。

SQL语句生成与执行

最终生成的SQL插入语句如下:

INSERT INTO `lehua`.`purchase_order`
(`order_id`, `order_num`, `bill_source_no`, `gmt_create`, `quantity`, ...)
VALUES
(<{order_id: }>, <{order_num: }>, <{bill_source_no: }>, <{gmt_create: }>, <{quantity: }>, ...);

该语句使用占位符表示动态填充的数据字段。在实际执行时,这些占位符会被相应的数据值替换,从而完成数据插入操作。

实际案例应用

假设我们有一条吉客云采购单记录,其部分原始数据如下:

{
    "id": "12345",
    "orderNum": "PO20231001",
    "billSourceNo": "REQ20231001",
    "gmtCreate": 1696243200000,
    "quantity": 100,
    ...
}

通过上述元数据配置和转换规则,该记录将被处理并生成如下SQL语句:

INSERT INTO `lehua`.`purchase_order`
(`order_id`, `order_num`, `bill_source_no`, `gmt_create`, `quantity`, ...)
VALUES
('12345', 'PO20231001', 'REQ20231001', '2023-10-02 00:00:00', 100, ...);

该SQL语句将被发送到MySQL数据库,完成采购单记录的插入操作。

通过以上步骤,我们成功地实现了从吉客云到MySQL的数据ETL转换和写入。这不仅确保了数据的一致性和完整性,还极大地提升了系统间的数据交互效率。 打通用友BIP数据接口