使用ETL技术将平台数据写入MySQL

  • 轻易云集成顾问-曹润

班牛数据集成到MySQL:BDS对账班牛售后理赔单

在企业级应用系统中,实现高效的数据集成是极为重要的任务,尤其是在处理跨平台的数据交互时。本案例聚焦于将班牛系统中的售后理赔单数据高效、稳定地集成到MySQL数据库中,方案命名为“BDS对账班牛售后理赔单”。

首先,我们通过调用班牛提供的task.list接口定期抓取待处理的售后理赔单数据。为了确保大规模数据能够被快速、高效地集成,我们利用了轻易云平台支持的大吞吐量写入功能,将这些数据批量导入至MySQL数据库。此外,通过自定义的数据转换逻辑,使得源端和目标端的数据结构差异问题得到解决。

在具体实现过程中,需要特别注意以下技术要点:

  1. 班牛API分页与限流管理: 为了获取全部待处理的理赔单,需要妥善处理来自班牛API的分页响应,并考虑限流策略,以防止因请求过多导致服务不可用。

  2. MySQL API高效写入: 使用executeReturn API进行批量操作,保障大量数据能够迅速写入,同时需设置可靠性机制以防丢失任何一个关键记录。

  3. 实时监控及异常处理: 采用集中监控和告警系统,全程跟踪每个步骤的数据状态,如果出现异常情况,则触发错误重试机制,对失败任务重新执行,以保证所有数据信息准确无误地完成迁移。

  4. 数据质量控制与日志记录: 集成过程中,对输入输出的数据进行一致性校验,确保录入到MySQL中的每条记录都是完整、准确且符合预期。此外,通过详细的日志记录功能,可以随时回溯任意一笔交易,有助于问题诊断和性能优化。

接下来讲述的是该方案实施以来遇见的问题以及相应的解决办法,从而进一步增强整体系统集成效率和可靠性。 数据集成平台可视化配置API接口

调用源系统班牛接口task.list获取并加工数据

在数据集成生命周期的第一步中,调用源系统接口获取数据是至关重要的环节。本文将深入探讨如何通过轻易云数据集成平台调用班牛接口task.list,并对获取的数据进行初步加工。

接口调用配置

在轻易云数据集成平台中,我们使用元数据配置来定义接口调用的参数和行为。以下是针对班牛接口task.list的具体配置:

{
  "api": "task.list",
  "effect": "QUERY",
  "method": "GET",
  "number": "-1",
  "id": "-1",
  "idCheck": true,
  "request": [
    {"field": "project_id", "label": "群组ID", "type": "string", "value": "25821"},
    {"field": "page_size", "label": "page_size", "type": "string", "value": "50"},
    {"field": "page_num", "label": "page_num", "type": "string", "value": "1"},
    {"field": "star_created", "label": "起始时间", "type": "string"},
    {"field": "end_created", "label": "结束时间", "type":"string"},
    {"field":"star_modified","label":"修改时间起始时间","type":"string","value":"_function DATE_FORMAT(DATE_ADD(NOW(),INTERVAL - 24 HOUR),'%Y-%m-%d %H:%i:%s')"},
    {"field":"end_modified","label":"修改时间结束时间","type":"string","value":"{{CURRENT_TIME|datetime}}"}
  ],
  "autoFillResponse": true,
  ...
}

参数解析与设置

  1. API与方法:我们使用的是task.list API,采用HTTP GET方法进行请求。
  2. 请求参数
    • project_id: 固定值为"25821",用于指定群组ID。
    • page_size: 每页返回记录数,设置为"50"。
    • page_num: 当前页码,初始值为"1"。
    • star_createdend_created: 用于指定查询的起止创建时间,可以根据需求动态赋值。
    • star_modified: 修改时间起始时间,使用函数计算当前时间减去24小时。
    • end_modified: 修改时间结束时间,使用当前系统时间。

这些参数确保了我们能够准确地从班牛系统中获取所需的数据。

数据请求与清洗

在实际操作中,我们需要处理分页数据。由于每次请求只能返回固定数量的数据(如上例中的50条),因此需要循环调用API以获取所有符合条件的数据。

import requests
import json

def fetch_data():
    url = 'https://api.banniu.com/task.list'
    headers = {'Content-Type': 'application/json'}

    params = {
        'project_id': '25821',
        'page_size': '50',
        'page_num': '1',
        'star_modified': (datetime.now() - timedelta(hours=24)).strftime('%Y-%m-%d %H:%M:%S'),
        'end_modified': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    }

    all_data = []

    while True:
        response = requests.get(url, headers=headers, params=params)
        data = response.json()

        if not data['tasks']:
            break

        all_data.extend(data['tasks'])

        params['page_num'] = str(int(params['page_num']) + 1)

    return all_data

上述代码展示了如何通过循环分页获取所有符合条件的数据,并将其存储在一个列表中。

数据转换与写入

在获取到原始数据后,我们通常需要对其进行清洗和转换,以便后续处理。例如,将日期格式统一、去除无效字段等。以下是一个简单的示例:

def clean_data(raw_data):
    cleaned_data = []

    for item in raw_data:
        cleaned_item = {
            'task_id': item['id'],
            'project_id': item['project_id'],
            'created_at': item['created_at'],
            'modified_at': item['modified_at']
            # 添加更多需要保留或转换的字段
        }

        cleaned_data.append(cleaned_item)

    return cleaned_data

通过上述步骤,我们可以将原始数据转换为更适合业务需求的格式,并准备好进行下一步的数据处理或写入目标系统。

小结

本文详细介绍了如何通过轻易云数据集成平台调用班牛接口task.list并对获取的数据进行初步加工。通过合理配置元数据、循环分页获取数据以及清洗和转换数据,我们能够高效地完成数据集成生命周期中的第一步,为后续的数据处理打下坚实基础。 如何对接用友BIP接口

数据集成:将源平台数据ETL转换并写入MySQL API接口

在轻易云数据集成平台的生命周期中,数据转换与写入是关键步骤之一。本文将详细探讨如何将已经集成的源平台数据进行ETL(提取、转换、加载)处理,并最终通过MySQL API接口写入目标平台。

数据提取与清洗

首先,我们需要从源系统中提取所需的数据。这个过程包括从不同的数据源获取原始数据,并对其进行初步清洗和规范化处理,以确保数据的一致性和准确性。例如,使用MongoDB查询语句从数据库中提取特定字段的值:

{
  "field": "org_name",
  "label": "组织名称",
  "type": "string",
  "value": "_mongoQuery e8890e68-7f56-33d9-ae79-492a7c9cbead findField=content.options_title where={\"content.options_id\":{\"$eq\":\"{{26387}}\"}}"
}

上述配置表示通过MongoDB查询获取“组织名称”字段的值,其中content.options_id等于{{26387}}

数据转换

在完成数据提取后,下一步是将这些数据转换为目标平台所能接受的格式。这里我们主要使用JSON格式来定义API请求参数,并进行必要的字段映射和类型转换。例如:

{
  "api": "executeReturn",
  "method": "POST",
  "idCheck": true,
  "request": [
    {
      "field": "main_params",
      "label": "主参数",
      "type": "object",
      "children": [
        {"field": "bill_no", "label": "单据编号", "type": "string", "value": "{{-1}}"},
        {"field": "trade_no", "label": "系统订单编号", "type": "string", "value": "{{80981}}"},
        {"field": "online_trade_no", "label": "网店订单号", "type": "string", "value": "{{26390}}"},
        {"field": "source_bill_no",   ...   }
      ]
    }
  ]
}

在这个配置中,我们定义了一个名为main_params的对象,其中包含了多个子字段,如bill_notrade_no等。这些字段将被映射到目标平台的相应字段中。

数据加载

最后一步是将转换后的数据通过API接口写入目标平台。在这里,我们使用MySQL API接口,通过HTTP POST请求将数据发送到目标数据库。例如:

{
  ...
  {
    field: 'main_sql',
    label: '主语句',
    type: 'string',
    value: `INSERT INTO \`lhhy_srm\`.\`supplier_return_change\`
            (\`bill_no\`, \`trade_no\`, \`online_trade_no\`, ...)
            VALUES (<{bill_no: }>, <{trade_no: }>, <{online_trade_no: }>, ...)`
  },
  ...
}

上述SQL语句用于插入主表记录,其中各个字段值通过占位符进行动态替换。类似地,我们还可以定义扩展表的插入语句:

{
  field: 'extend_sql_1',
  label: '1:1扩展语句',
  type: 'string',
  value: `INSERT INTO \`lhhy_srm\`.\`supplier_return_change_detail\`
          (\`order_id\`, \`supplier_code\`, \`supplier_name\`, ...)
          VALUES (<{lastInsertId: }>, <{supplier_code: }>, <{supplier_name: }>, ...)`
}

这个配置表示在主表插入成功后,将相关联的数据插入到扩展表中。

实际案例应用

在实际应用中,我们可能会遇到需要处理复杂的数据结构和多层嵌套关系。例如,需要同时处理多个扩展参数和附件信息:

{
  field: 'extend_params_2',
  label: '1:1扩展参数',
  type: 'object',
  children: [
    {"field":"lastInsertId","label":"order_id","type":"string","value":"lastInsertId"},
    {"field":"file_name","label":"附件名称","type":"string","value":"_function CEIL(RAND()*10000000000)"},
    {"field":"url","label":"附件链接","type":"string","value":"{{26401}}"}
  ]
}

这种情况下,我们需要确保每个子字段都能正确映射并写入到相应的数据库表中。

综上所述,通过合理配置元数据并利用轻易云数据集成平台强大的ETL功能,可以高效地实现不同系统间的数据无缝对接,确保数据在整个生命周期中的一致性和准确性。 企业微信与ERP系统接口开发配置