利用轻易云实现数据集成ETL转换并写入MySQL

  • 轻易云集成顾问-彭亮

钉钉数据集成到MySQL:dd-新转账单(资金调拨)-->mysql(鸿巢)技术案例分享

在实施“dd-新转账单(资金调拨)-->mysql(鸿巢)”项目时,面临的主要挑战是如何高效、可靠地将钉钉平台上的数据集成至内部的MySQL数据库。通过轻易云数据集成平台,我们利用其高级特性,实现了从获取钉钉API接口的数据,到转换及入库全过程的精准控制。

首先,在对接过程中我们使用了v1.0/yida/processes/instances API,从钉钉中抓取相关业务流程实例的数据。这一步骤涉及到了处理分页和限流问题,为确保不漏单,我们制定了一套定时任务机制,能够可靠地周期性抓取并存储最新的数据。同时,通过设置自定义的重试策略,有效降低了由于网络延迟或API速率限制所导致的问题。

对于输送到MySQL数据库中的数据,我们采用了批量写入的方法,以提升整体吞吐效率。借助可视化的数据流设计工具,将多次复杂操作简化为一系列直观且容易管理的步骤。此外,还使用了执行API execute进行快速、大量的数据写入,这确保了系统在大规模数据交换环境下依然保持稳定运行。

为了实现与业务需求相适应的数据转换逻辑,自定义脚本被用于转换和映射不同格式的数据。在这一过程中,我们特别注意处理可能存在的不一致类型及结构差异,提高最终储存于MySQL中的数据质量。同时,通过集中监控告警系统,对整个集成过程进行了实时跟踪,迅速发现并解决任何异常情况,使得整个项目顺利落地实施。 数据集成平台可视化配置API接口

调用钉钉接口v1.0/yida/processes/instances获取并加工数据

在数据集成的生命周期中,调用源系统接口是至关重要的一步。本文将详细探讨如何通过轻易云数据集成平台调用钉钉接口v1.0/yida/processes/instances,获取并加工数据,以实现高效的数据集成。

接口调用配置

首先,我们需要配置API调用的元数据。以下是具体的配置细节:

{
  "api": "v1.0/yida/processes/instances",
  "method": "POST",
  "number": "title",
  "id": "processInstanceId",
  "idCheck": true,
  "formatResponse": [
    {"old": "dateField_lglvrpp4", "new": "datetime_new", "format": "date"},
    {"old": "serialNumberField_lgov9d3b", "new": "order_no_new", "format": "string"}
  ],
  "request": [
    {"field": "pageNumber", "label": "分页页码", "type": "string", "describe": "分页页码", "value": "{PAGINATION_START_PAGE}"},
    {"field": "pageSize", "label": "分页大小", "type": "string", "describe": "分页大小", "value": "{PAGINATION_PAGE_SIZE}"},
    {"field": "appType", "label": "应用ID", "type": "string", "describe": "应用ID", 
"value":"APP_WTSCMZ1WOOHGIM5N28BQ"},
    {"field":"systemToken","label":"应用秘钥","type":"string","describe":"应用秘钥","value":"IS866HB1DXJ8ODN3EXSVD750RBTK2X72R8MELL4"},
    {"field":"userId","label":"用户的userid","type":"string","describe":"用户的userid","value":"16000443318138909"},
    {"field":"language","label":"语言","type":"string","describe":"语言,取值:zh_CN:中文(默认值)en_US:英文"},
    {"field":"formUuid","label":"表单ID","type":"string","describe":"表单ID","value":"FORM-KW766OD1931ALBVJBHUHLD6KFJ3G3OELLVLGL0"},
    {"field":"searchFieldJson","label":"条件","type":"object","children":[{"field":"selectField_lglvrpmg","label":"类型","type":"string","value":"跨组织转账"}]},
    {"field":"originatorId","label":"根据流程发起人工号查询","type":"string","describe":"根据流程发起人工号查询"},
    {"field":"createFromTimeGMT","label":"创建时间起始值","type":"string","describe":"创建时间起始值",
"value":
"_function DATE_FORMAT(DATE_ADD(NOW(),INTERVAL - 25 DAY),'%Y-%m-%d 00:00:00')"},
    {"field":
"createToTimeGMT",
"label":
"创建时间终止值",
"type":
"string",
"describe":
"创建时间终止值",
"value":
"{{CURRENT_TIME|datetime}}"
},
    {
"field":
"modifiedFromTimeGMT",
"label":
"修改时间起始值",
"type":
"string",
"describe":
"修改时间起始值"
},
    {
"field":
"modifiedToTimeGMT",
"label":
"修改时间终止值",
"type":
"string",
"describe":
"修改时间终止值"
},
    {
"field":
"taskId",
"label":
"任务ID",
"type":
"string",
"describe":
"任务ID"
},
    {
"field":
"instanceStatus",
"label":
"实例状态",
"type":
"strings
,"describe
:
实例状态
,"value
:
COMPLETED"
},
{
"
approvedResult
,"
label
:
流程审批结果
,"type
:
strings
,"describe
:
流程审批结果
,"value
:
agree"
}
],
  "
condition":[[{"field
:"dateField_lglvrpp4"
,"logic
:"notnull"}]]
}

请求参数详解

  • pageNumberpageSize:用于分页控制,确保每次请求的数据量可控。
  • appTypesystemToken:分别为应用ID和秘钥,用于身份验证。
  • userIdlanguage:指定用户和语言环境。
  • formUuidsearchFieldJson:用于指定表单和搜索条件,其中searchFieldJson包含了具体的业务逻辑,如类型为“跨组织转账”。
  • createFromTimeGMTcreateToTimeGMT:用于限定数据的创建时间范围,确保只获取特定时间段内的数据。
  • instanceStatusapprovedResult:用于过滤已完成且审批通过的实例。

数据格式化与转换

在获取到原始数据后,需要对其进行格式化和转换。元数据配置中的formatResponse字段定义了这一过程:

  • dateField_lglvrpp4 转换为 datetime_new,格式为日期。
  • serialNumberField_lgov9d3b 转换为 order_no_new,格式为字符串。

这种转换确保了数据在进入目标系统前已经符合预期格式,提高了数据处理效率。

数据清洗与写入

在完成数据请求和初步清洗后,可以利用轻易云平台提供的工具进一步处理这些数据,并将其写入目标系统(如MySQL数据库)。这一过程包括:

  1. 数据校验:根据元数据中的条件配置(如字段不为空)进行校验。
  2. 数据转换:利用平台提供的脚本或函数对数据进行进一步转换。
  3. 数据写入:通过配置好的连接器,将处理后的数据写入目标数据库。

通过上述步骤,我们可以高效地从钉钉系统中获取所需的数据,并将其无缝集成到其他业务系统中,实现跨系统的数据共享与协同工作。 如何对接金蝶云星空API接口

使用轻易云数据集成平台进行ETL转换并写入MySQL API接口

在数据集成过程中,ETL(Extract, Transform, Load)转换是关键步骤之一。本文将深入探讨如何通过轻易云数据集成平台,将已经集成的源平台数据进行ETL转换,转为目标平台 MySQL API 接口所能够接收的格式,并最终写入目标平台。

数据请求与清洗

在进行ETL转换之前,首先需要完成数据的请求与清洗。这一步骤确保从源系统获取的数据是准确且符合要求的。假设我们已经完成了这一步骤,现在我们关注如何将清洗后的数据进行转换并写入目标平台。

数据转换与写入

在轻易云数据集成平台中,元数据配置是实现数据转换和写入的重要环节。以下是一个详细的元数据配置示例:

{
  "api": "execute",
  "effect": "EXECUTE",
  "method": "POST",
  "idCheck": true,
  "request": [
    {
      "field": "main_params",
      "label": "main_params",
      "type": "object",
      "describe": "111",
      "children": [
        {
          "field": "extend_processInstanceId",
          "label": "明细id",
          "type": "string",
          "value": "{{extend.processInstanceId}}"
        },
        {
          "field": "order_no_new",
          "label": "单号",
          "type": "string",
          "value": "{order_no_new}(ZJDB)"
        },
        {
          "field": "datetime_new",
          "label": "时间",
          "type": "date",
          "value": "{datetime_new}"
        },
        {
          "field": "qty_count",
          "label": "数量",
          "type": "string",
          "value":"1"
        },
        {
          "field":"sales_count",
         "label":"金额",
         "type":"string",
         "value":"{numberField_lglvrpn8}"
        },
        {
          “field”:”status”,
          “label”:”状态”,
          “type”:”string”
        },
        {
         “field”:”Document_Type”,
         “label”:”单据类型”,
         “type”:”string”,
         “value”:”资金调拨”
       }
     ]
   }
 ],
"otherRequest":[
   {
     “field”:”main_sql”,
     “label”:”main_sql”,
     “type”:”string”,
     “describe”:”111”,
     “value”:”
       INSERT INTO `hc_dd_zjdb` (
         `extend_processInstanceId`,
         `order_no_new`,
         `datetime_new`,
         `qty_count`,
         `sales_count`,
         `status`,
         `Document_Type`
       ) VALUES (
         :extend_processInstanceId,
         :order_no_new,
         :datetime_new,
         :qty_count,
         :sales_count,
         :status,
         :Document_Type
       )
     ”
   }
 ]
}

元数据配置解析

  1. API接口定义

    • "api":"execute":指定要执行的操作。
    • "effect":"EXECUTE":定义操作效果为执行。
    • "method":"POST":使用HTTP POST方法。
  2. 请求参数

    • "request"部分定义了请求参数,包括字段名称、标签、类型和描述等信息。
    • 每个字段都包含具体的值来源,例如:
      • "extend_processInstanceId":从{{extend.processInstanceId}}获取值。
      • "order_no_new":从{order_no_new}(ZJDB)获取值并添加后缀。
      • "datetime_new":从{datetime_new}获取日期值。
      • "qty_count":固定值为1
      • "sales_count":从{numberField_lglvrpn8}获取金额值。
      • "status""Document_Type"分别表示状态和单据类型。
  3. SQL语句

    • "otherRequest"部分定义了要执行的SQL语句,用于将数据插入到目标MySQL数据库中。
    • SQL语句中的占位符(如:extend_processInstanceId)对应于请求参数中的字段。

实现过程

通过上述元数据配置,我们可以实现以下几个步骤:

  1. 提取(Extract): 从源系统提取所需的数据,并根据业务需求进行初步处理和清洗。

  2. 转换(Transform): 根据元数据配置,将提取的数据转换为目标系统所需的格式。例如,将订单号添加后缀,将日期格式化等。

  3. 加载(Load): 使用HTTP POST方法,将转换后的数据通过API接口发送到目标MySQL数据库,并执行插入操作。

这种方式不仅确保了数据的一致性和完整性,还大大简化了跨系统的数据集成过程。通过灵活的元数据配置,我们可以轻松适应不同业务场景下的数据需求,实现高效的数据流转和处理。 如何对接钉钉API接口