ETL流程优化:如何将班牛数据写入MySQL

  • 轻易云集成顾问-李国敏

BDS对账班牛售后代发补发单(修正错误货品编码)

在数据集成的实际业务场景中,如何高效、可靠地将班牛系统中的数据无缝接入MySQL数据库,是一个技术难点。本文将分享我们运用轻易云数据集成平台,成功实现BDS对账班牛售后代发补发单(修正错误货品编码)任务的具体技术方案。

首先,我们需要解决的是通过定时任务调用班牛的数据获取API task.list,来确保每个周期内数据的完整抓取。为此,我们配置了定时器以周期性地从班牛系统拉取最新的数据,并设计了重试机制,以防止由于网络或服务异常导致的数据丢失。

在获取到原始数据之后,通过自定义逻辑来处理不同业务场景下可能出现的数据格式差异和特定需求,例如本案例中的“修正错误货品编码”。然后,将经过转换和验证后的有效数据批量写入到MySQL数据库中。为了应对大规模并行写入带来的性能瓶颈问题,我们优化了MySQL的写入API executeReturn 的使用策略,实现高吞吐量、高效率的数据导入。

整个过程中,为保证交易透明度和安全性,在平台上实时监控每一个步骤,包括接口调用状态、数据流动情况以及处理结果。一旦检测到异常状况,例如接口限流或分页策略不当造成的问题,会及时触发告警,并执行预设的纠错流程。

以下章节我们将详细解析如何利用可视化工具设计并管理这些复杂逻辑,从而最终达到企业快速响应市场需求,提高运营效率的目标。 打通金蝶云星空数据接口

调用源系统班牛接口task.list获取并加工数据

在数据集成的生命周期中,调用源系统接口获取数据是至关重要的第一步。本文将深入探讨如何使用轻易云数据集成平台调用班牛接口task.list,并对获取的数据进行初步加工。

接口调用配置

首先,我们需要了解如何配置和调用班牛的task.list接口。以下是该接口的元数据配置:

{
  "api": "task.list",
  "effect": "QUERY",
  "method": "GET",
  "number": "-1",
  "id": "-1",
  "idCheck": true,
  "request": [
    {"field": "project_id", "label": "群组ID", "type": "string", "value": "77206"},
    {"field": "page_size", "label": "page_size", "type": "string", "value": "50"},
    {"field": "page_num", "label": "page_num", "type": "string", "value": "1"},
    {"field": "star_created", "label": "起始时间", "type": "string"},
    {"field": "end_created", "label": "结束时间", "type": "string"},
    {"field": "star_modified", 
        "label":"修改时间起始时间","type":"string","value":"_function DATE_FORMAT(DATE_ADD(NOW(),INTERVAL - 24 HOUR),'%Y-%m-%d %H:%i:%s')"
    },
    {"field":"end_modified","label":"修改时间结束时间","type":"string","value":"{{CURRENT_TIME|datetime}}"}
  ],
  ...
}

参数详解

  • project_id: 群组ID,用于指定要查询的项目组。
  • page_size: 每页返回的数据条数,这里设置为50。
  • page_num: 当前页码,从第一页开始。
  • star_created: 起始创建时间,可以为空或根据需求动态设置。
  • end_created: 截止创建时间,同样可以为空或动态设置。
  • star_modified: 修改时间起始时间,这里使用了一个函数来获取当前时间前24小时的时间点。
  • end_modified: 修改时间结束时间,使用当前系统时间。

这些参数通过GET请求方式传递给班牛系统,以获取符合条件的数据列表。

数据请求与清洗

在实际操作中,我们需要确保请求参数的正确性和完整性。以下是一个示例代码片段,展示如何通过轻易云平台发起请求:

import requests
import datetime

# 设置请求参数
params = {
    'project_id': '77206',
    'page_size': '50',
    'page_num': '1',
    'star_created': '',
    'end_created': '',
    'star_modified': (datetime.datetime.now() - datetime.timedelta(hours=24)).strftime('%Y-%m-%d %H:%M:%S'),
    'end_modified': datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
}

# 发起GET请求
response = requests.get('https://api.banniu.com/task.list', params=params)

# 检查响应状态
if response.status_code == 200:
    data = response.json()
else:
    raise Exception(f"Failed to fetch data, status code: {response.status_code}")

数据转换与写入

获取到原始数据后,需要对其进行清洗和转换。例如,修正错误货品编码等操作。以下是一个简单的数据处理示例:

def clean_data(data):
    for item in data['tasks']:
        # 修正错误货品编码逻辑
        if item['product_code'] == 'ERROR_CODE':
            item['product_code'] = correct_product_code(item)

def correct_product_code(item):
    # 根据业务逻辑修正货品编码
    return 'CORRECT_CODE'

# 清洗数据
clean_data(data)

# 将处理后的数据写入目标系统(示例)
write_to_target_system(data)

在这个过程中,我们可以利用轻易云平台提供的自动填充响应功能(autoFillResponse),简化部分手动操作,提高效率。

实时监控与优化

为了确保数据处理过程的透明度和高效性,可以利用轻易云平台的实时监控功能,跟踪每个环节的数据流动和处理状态。这样不仅可以及时发现问题,还能不断优化集成方案。

通过上述步骤,我们完成了从调用班牛接口获取数据到初步加工处理的一系列操作。这只是轻易云数据集成生命周期中的一部分,但却是至关重要的一环,为后续的数据转换与写入奠定了基础。 用友与外部系统接口集成开发

数据集成平台生命周期的第二步:ETL转换与写入MySQL API接口

在数据集成平台的生命周期中,ETL(提取、转换、加载)过程是至关重要的一环。本文将详细探讨如何将已经集成的源平台数据通过ETL转换为目标平台 MySQL API 接口所能接收的格式,并最终写入目标平台。

元数据配置解析

元数据配置是整个ETL过程的核心,它定义了如何从源系统提取数据、如何转换数据以及如何将数据加载到目标系统。以下是元数据配置的详细解析:

{
  "api": "executeReturn",
  "method": "POST",
  "idCheck": true,
  "request": [
    {
      "field": "main_params",
      "label": "主参数",
      "type": "object",
      "children": [
        {"field": "bill_no", "label": "单据编号", "type": "string", "value": "{{-1}}"},
        {"field": "solution", "label": "解决方案", "type": "string", "describe":"解决方案(73628-配件补发,76615-成品代发)", "value":"{{77212}}"},
        {"field": "total_solution", "label":"处理方案","type":"string","value":"{{77242}}"},
        {"field":"date","label":"日期","type":"datetime","value":"{{4}}"},
        {"field":"responsible_party","label":"责任方","type":"string"},
        {"field":"responsible_party_detail","label":"责任方细分","type":"string","value":"{reason}"},
        {"field":"online_trade_no","label":"网店订单号","type":"string","value":"{{77213}}"},
        {"field":"trade_no","label":"系统订单编号","type":"string"},
        {"field":"source_bill_no","label":"源单编号","type":"string","value":"{{-1}}"},
        {"field":"reason","label":"原因","type":"string","value":"{reason}"},
        {"field":"status","label":"工单状态","type":"string","describe":"工单状态(1-启用,0-禁用)","value":"1"},
        {"field":"shop_code","label":"销售渠道编码","type":"string"},
        {"field":"shop_name","label":"销售渠道名称","type":"string", 
            "value":
            "_mongoQuery edcc99df-5589-3530-96c3-807e26f3f171 findField=content.options_title where={\"content.options_id\":{\"$eq\":\"{{77214}}\"}}" },
        // ...省略部分字段
      ]
    },
    {
      "field": "extend_params_1",
      "label": "1:1扩展参数",
      "type": "array",
      // ...省略部分字段
    }
  ],
  // ...省略部分字段
}

数据提取与转换

在元数据配置中,我们定义了多个字段及其对应的数据类型和默认值。这些字段从源系统中提取并进行必要的转换。例如:

  • bill_no(单据编号):从源系统直接提取。
  • solution(解决方案):根据业务逻辑设定默认值{{77212}}
  • date(日期):直接从源系统提取并保持原始格式。

特别需要注意的是一些复杂字段,如shop_name,它需要通过MongoDB查询来获取:

{
  field: 'shop_name',
  label: '销售渠道名称',
  type: 'string',
  value: '_mongoQuery edcc99df-5589-3530-96c3-807e26f3f171 findField=content.options_title where={"content.options_id":{"$eq":"' + {{77214}} + '"}}'
}

这种情况下,我们需要先执行MongoDB查询,再将结果填充到目标字段中。

数据加载到MySQL

在完成所有必要的数据转换后,下一步是将这些数据加载到MySQL数据库。元数据配置中的main_sqlextend_sql_1定义了具体的SQL插入语句:

INSERT INTO `lhhy_srm`.`supplier_after_sale_send`
(`bill_no`, `solution`, `total_solution`, `date`, `responsible_party`, ...)
VALUES (<{bill_no: }>, <{solution: }>, <{total_solution: }>, <{date: CURRENT_TIMESTAMP}>, ...);

INSERT INTO `lhhy_srm`.`supplier_after_sale_send_detail`
(`order_id`, `goods_no`, `goods_name`, `bar_code`, ...)
VALUES (<{lastInsertId: }>, <{goods_no: }>, <{goods_name: }>, <{bar_code: }>, ...);

这些SQL语句会将转换后的数据插入到相应的MySQL表中。需要特别注意的是,在插入过程中,我们可能需要处理一些ID关联问题,例如使用:lastInsertId来获取前一个插入操作生成的ID,并在后续操作中使用它。

实践案例

假设我们有一条售后补发单的数据,需要通过上述ETL过程写入MySQL数据库。首先,我们从源系统提取相关字段并进行必要的转换:

{
  bill_no: '123456',
  solution: '73628',
  total_solution: '77242',
  date: '2023-10-01T00:00:00Z',
  responsible_party: '供应商A',
  responsible_party_detail: '质量问题',
  online_trade_no: '9876543210',
  trade_no: '6543210987',
  source_bill_no: '1234567890',
  reason: '货品损坏',
  status: '1',
  shop_code: 'SC001',
  shop_name:
    '_mongoQuery edcc99df-5589-3530-96c3-807e26f3f171 findField=content.options_title where={"content.options_id":{"$eq":"' + {{77214}} + '"}}'
}

然后,通过执行定义好的SQL语句,将这些数据插入到目标MySQL数据库中:

INSERT INTO `lhhy_srm`.`supplier_after_sale_send`
(`bill_no`, `solution`, `total_solution`, `date`, ...)
VALUES ('123456', '73628', '77242', CURRENT_TIMESTAMP, ...);

通过这种方式,我们实现了从源系统到目标系统的数据无缝对接,确保了数据的一致性和完整性。

总结来说,通过精确定义元数据配置和合理设计ETL流程,可以有效地将复杂的数据集成任务简化为可执行的操作步骤,从而提高整个系统的数据处理效率和可靠性。 轻易云数据集成平台金蝶集成接口配置