轻易云与MySQLAPI接口的数据ETL解决方案

  • 轻易云集成顾问-吕修远

测试-查询销售渠道信息(已删除数据)-dange 集成案例解析

在此次技术案例中,我们将详细解析如何通过轻易云数据集成平台,将吉客云的数据高效、可靠地集成到MySQL数据库中。本次方案旨在实现对吉客云销售渠道信息,包括已删除数据的定时抓取,并批量写入到MySQL中,方案名称为“测试-查询销售渠道信息(已删除数据)-dange”。

注意事项与挑战

对于此次集成任务,我们主要面临以下几个技术挑战:

  1. 大量数据快速写入:确保从吉客云获取的大量销售渠道信息能够高吞吐量地写入到MySQL,实现数据处理高效性。
  2. 实时监控和异常处理:全面跟踪每一个数据集成任务的状态与性能,同时提供有效的错误重试机制,以应对网络波动或接口调用失败等情况。
  3. 分页与限流问题:由于API接口返回的数据量较大,需要合理设计分页逻辑,并解决限流带来的潜在影响。

方案概要

本次实施集中使用了如下特性:

  • 使用erp.sales.get API 从吉客云定时获取销售信息,并包含已删除的数据记录,保障不漏单。
  • 利用轻易云提供的可视化流程设计工具,实现自定义的数据转换逻辑,使得不同格式之间的数据顺利映射至目标表结构。
  • 借助中心化监控告警系统,对整个集成过程进行实时监控,及时发现并处理异常情况,通过日志记录功能追踪每一步操作。

接下来我们将深入剖析此集成案例,从具体API调用、分页策略、到实际写入细节,以及如何使用轻易云平台配置过程中的关键步骤。 打通金蝶云星空数据接口

调用吉客云接口erp.sales.get获取并加工数据

在数据集成的生命周期中,第一步是调用源系统接口以获取原始数据。在本案例中,我们将重点讨论如何通过轻易云数据集成平台调用吉客云的erp.sales.get接口,并对获取的数据进行初步加工。

元数据配置解析

首先,我们需要理解元数据配置中的各个字段及其作用。以下是对元数据配置的详细解析:

{
  "api": "erp.sales.get",
  "effect": "QUERY",
  "method": "POST",
  "number": "channelCode",
  "id": "channelId",
  "request": [
    {"field": "pageIndex", "label": "页码(默认0)", "type": "int"},
    {"field": "pageSize", "label": "每页页数(默认50)", "type": "int", "value": "50"},
    {"field": "code", "label": "编号", "type": "string"},
    {"field": "name", "label": "名称", "type": "string"},
    {"field": "gmtModifiedStart", "label": "起始修改时间", "type": "string", 
        "value":"{{LAST_SYNC_TIME|datetime}}"},
    {"field": "gmtModifiedEnd",     "label":"结束修改时间","type":"string","value":"{{CURRENT_TIME|datetime}}"},
    {"field":"isBlockup","label":"是否停用,1-是,0-否","type":"int","value":"1"},
    {"field":"isDelete","label":"是否删除,1-是,0-否","type":"int"}
  ],
  “autoFillResponse”: true
}

请求参数详解

  1. api: erp.sales.get,表示我们要调用的吉客云API接口。
  2. effect: QUERY,表示此操作为查询操作。
  3. method: POST,表示使用HTTP POST方法进行请求。
  4. number: channelCode,用于标识销售渠道的编号字段。
  5. id: channelId,用于标识销售渠道的唯一ID字段。

请求参数列表:

  • pageIndex:页码,默认为0。
  • pageSize:每页记录数,默认为50。
  • code:销售渠道编号,可选参数。
  • name:销售渠道名称,可选参数。
  • gmtModifiedStart:起始修改时间,使用模板变量{{LAST_SYNC_TIME|datetime}}自动填充上次同步时间。
  • gmtModifiedEnd:结束修改时间,使用模板变量{{CURRENT_TIME|datetime}}自动填充当前时间。
  • isBlockup:是否停用,固定值为1(是)。
  • isDelete:是否删除,可选参数。

实际调用与数据处理

在实际操作中,我们需要按照上述配置构建请求,并发送给吉客云API。以下是一个示例请求体:

{
  “pageIndex”: 0,
  “pageSize”: 50,
  “gmtModifiedStart”: “2023-09-01T00:00:00Z”,
  “gmtModifiedEnd”: “2023-09-30T23:59:59Z”,
  “isBlockup”: 1
}

通过轻易云平台,我们可以自动生成并发送上述请求。响应结果会包含符合条件的销售渠道信息。由于设置了autoFillResponse=true,平台会自动处理响应并填充到相应的数据结构中。

数据清洗与转换

获取到原始数据后,需要对其进行清洗和转换,以便后续处理。常见的数据清洗步骤包括:

  1. 去除无效记录:过滤掉已删除或不符合业务规则的记录。例如,根据字段isDelete=1来过滤已删除的数据。
  2. 字段映射与转换:将源系统中的字段映射到目标系统所需的字段。例如,将吉客云中的channelCode映射为目标系统中的salesChannelCode

示例代码如下:

def clean_and_transform(data):
    cleaned_data = []
    for record in data:
        if record['isDelete'] == 0:
            transformed_record = {
                'salesChannelCode': record['channelCode'],
                'salesChannelId': record['channelId'],
                'name': record['name'],
                'modifiedTime': record['gmtModified']
            }
            cleaned_data.append(transformed_record)
    return cleaned_data

通过上述步骤,我们完成了从调用源系统API获取数据,到初步清洗和转换的全过程。这为后续的数据写入和进一步处理奠定了基础。 轻易云数据集成平台金蝶集成接口配置

使用轻易云数据集成平台进行ETL转换并写入MySQL API接口

在数据集成的生命周期中,ETL(Extract, Transform, Load)转换是关键的一步。本文将深入探讨如何利用轻易云数据集成平台,将已经集成的源平台数据进行ETL转换,转为目标平台 MySQL API 接口所能够接收的格式,并最终写入目标平台。

数据请求与清洗

在数据请求与清洗阶段,我们从源系统获取原始数据,并对其进行初步处理,以确保数据的准确性和一致性。这一步通常包括去重、填补缺失值、标准化字段等操作。假设我们已经完成了这一阶段,接下来将重点放在数据转换与写入阶段。

数据转换与写入

为了将清洗后的数据转为 MySQL API 接口所能接收的格式,我们需要配置相应的元数据。以下是一个详细的元数据配置示例:

{
  "api": "execute",
  "effect": "EXECUTE",
  "method": "POST",
  "idCheck": true,
  "request": [
    {
      "field": "main_params",
      "label": "主参数",
      "type": "object",
      "describe": "111",
      "value": "1",
      "children": [
        {"field": "id", "label": "销售渠道id", "type": "int", "value": "{channelId}"},
        {"field": "channel_code", "label": "渠道编码", "type": "string", "value": "{channelCode}"},
        {"field": "channel_name", "label": "销售渠道名称", "type": "string", "value": "{channelName}"},
        {"field": "channel_type", 
            "label": 
            "渠道类型", 
            "type":
                "string", 
            "describe":
                "渠道类型(0:分销办公室; 1:直营网店;2:直营门店;3:销售办公室;4:货主虚拟店;5:分销虚拟店;6:加盟门店;7:内部交易渠道)",
            "value":
                "{channelType}"
        },
        {"field":"plat_code","label":"店铺平台编码","type":"string","value":"{onlinePlatTypeCode}"},
        {"field":"plat_name","label":"店铺平台名称","type":"string","value":"{onlinePlatTypeName}"},
        {"field":"depart_name","label":"负责部门名称","type":"string","value":"{channelDepartName}"},
        {"field":"link_man","label":"联系人","type":"string","value":"{linkMan}"},
        {"field":"link_tel","label":"联系电话","type":"string","value":"{linkTel}"},
        {"field":"office_address","label":"办公地址","type":"string","value":"{officeAddress}"},
        {"field":"company_name","label":"公司名称","type":"string","value":"{companyName}"},
        {"field":"country_name","label":"国家","type":"string","value":"{countryName}"},
        {"field":"province_name","label":"省","type":"string","value":"{provinceName}"},
        {"field":
            "city_name",
            "label":
            "市",
            "type":
                "string",
            "value":
                "{cityName}"
        },
        {"field":
            "town_name",
            "label":
            "镇,区",
            "type":
                "string",
            "value":
                "{townName}"
        },
        {"field":
            "street_name",
            "label":
            "街道",
            "type":
                "string",
            "value":
                "{streetName}"
        },
        {"field":
            "memo",
            "label":
            "备注",
            "type":
                "string",
            "value":
                "{memo}"
        },
        {"field":
            "warehouse_code",
            "label":
            "默认仓库编码",
          type:
              string,
          value:
              {warehouseCode}
          },
          {
            field:
              warehouse_name,
            label:
              默认仓库名称,
            type:
              string,
            value:
              {warehouseName}
          },
          {
            field:
              charge_type,
            label:
              结算方式,
            type:
              string,
            describe:
              结算方式(1:担保交易;2:银行收款;3:现金收款;4:货到付款;5:欠款计应收;6:客户预存款;7:多种结算;8:退换货冲抵;9:电子钱包)
            value:
              {chargeType}
          },
          {
            field: depart_code, label: 部门编码, type: string, value: {departCode}
          },
          {
            field: company_code, label: 公司编码, type: string, value: {departCode}
          },
          {
            field: is_blockup, label: 是否停用, type: int, describe: 1-是,0-否,value:1,default:"1"
          },
          {
            field:is_delete,label:"是否删除",type:"int",describe:"1-是,0-否"
          },
          {
            field:create_by,label:"创建人",type:"string",value:"系统自动",default:"系统自动"
          },
          {
            field:update_by,label:"修改人",type:"string",value:"系统自动",default:"系统自动"
          }
      ]
    }
  ],
  otherRequest:[
    {
      field: main_sql,label: 主语句,type:string,describe:111,value:`REPLACE INTO lehua.sc_sale_channel(
       id,
       channel_code,
       channel_name,
       channel_type,
       plat_code,
       plat_name,
       depart_name,
       link_man,
       link_tel,
       office_address,
       company_name,
       country_name,
       province_Name
       city_name
       town_name
       street_name
       memo
       warehouse_code
       warehouse_name
       charge_type
       depart_code
       company_code
       is_blockup
       is_delete
       create_by
       update_by)
     VALUES(
     <{id}>,
     <{channel_code}>,
     <{channel_name}>,
     <{channel_type}>,
     <{plat_code}>,
     <{plat_name}>,
     <{depart_name}>,
     <{link_man}>,
     <{link_tel}>,
     <{office_address}>,
     <{company_name}>,
     <{country_name}>
     <{province_Name}>
     <{city_Name}>
     <{town_Name}>
     <{street_Name}>
     <{memo}>
     <{warehouse_Code}>
     <{warehouse_Name}>
     <charge_Type>
     depart_Code>
     company_Code>
    is_blockup>
    is_delete>
    create_by>
    update_by>);
    `
   }
 ]
}

配置解析

  • API配置

    • api: 指定执行操作。
    • effect: 设置为EXECUTE表示执行操作。
    • method: HTTP方法,设置为POST
  • 主参数配置

    • main_params: 包含所有需要传递的数据字段,每个字段都有详细的描述和类型定义。例如,id表示销售渠道ID,类型为整数。
  • SQL语句

    • main_sql: 包含实际执行的SQL语句,通过占位符<{...}>来插入实际的数据值。

数据写入

通过上述配置,我们可以将清洗后的数据转换为MySQL数据库所需的格式,并使用HTTP POST方法将其写入目标数据库。每个字段都经过了严格定义和映射,确保了数据的一致性和完整性。

这种方式不仅提高了数据处理效率,还保证了每个环节的透明度和可追溯性。通过实时监控和日志记录,可以随时检查和调整数据流动状态,确保业务流程顺畅无误。 钉钉与CRM系统接口开发配置