轻易云平台ETL转换与MySQL写入详解

  • 轻易云集成顾问-孙传友

钉钉数据集成到MySQL的技术案例分享

在企业的信息化进程中,数据的重要性不言而喻,而如何高效地集成不同系统的数据则是其中的一大挑战。本文将重点讨论一个实际运行的系统对接案例:将钉钉的新付款退款单(方案名称:dd-新付款退款单-->mysql (鸿巢)其他退款)数据集成到MySQL数据库中。

数据获取与API调用

首先,我们需要从钉钉平台获取相关的数据。为此,我们使用了钉钉提供的API接口 v1.0/yida/processes/instances 来定时抓取新生成的付款和退款信息。这一过程要求我们处理分页和限流等复杂情况,以确保数据完整且准确无误。

GET /v1.0/yida/processes/instances?appType=YOUR_APP_TYPE&limit=100 HTTP/1.1
Host: api.dingtalk.com
Authorization: Bearer YOUR_ACCESS_TOKEN

为了避免漏单问题,通过持续监控接口返回的数据状态,并在一定时间间隔内重复请求,从而确保所有记录都被成功捕获。同时设置重试机制以应对网络抖动或临时性API错误。

数据转换与映射

由于钉钉返回的数据格式可能与MySQL预期的不一致,我们需进行相应的自定义转换逻辑。例如,将时间格式标准化、字段名称映射等等。通过平台提供的可视化数据流设计工具,可以轻松构建并调试这些转换步骤,保证每条记录在写入前都符合目标数据库表结构要求。

{
  "instanceId": "12345",
  "createTime": "2023-10-15T08:00:00Z",
  ...
}

MySQL批量写入

当完成初步转换后,即可利用高吞吐量支持快速将大量数据批量插入到MySQL中。采用如下方式:

INSERT INTO refunds (id, create_time, ...) VALUES (?, ?, ...);

结合事务管理和错误日志机制,确保在出现异常时能够及时回溯并重新执行特定操作。此外,通过集中监控和告警系统,可实时跟踪整个任务状态及性能表现,有助于提前发现潜在问题并迅速处理,提高整体可靠性。

通过上述几个关键环节与技术细节,实现了从钉钉至MySQL的数据顺利集成。具体流程中的配置及编码将在后续章节详细展开,包括如何调用核心API、处理数据质量监控以及优化性能等更多实用技巧。

如何开发企业微信API接口

调用钉钉接口v1.0/yida/processes/instances获取并加工数据

在数据集成生命周期的第一步中,调用源系统的API接口是至关重要的一环。本文将详细探讨如何通过轻易云数据集成平台调用钉钉接口v1.0/yida/processes/instances,获取并加工数据,以实现数据的有效集成。

接口调用配置

首先,我们需要配置API调用的元数据。根据提供的元数据配置,以下是具体的请求参数和格式:

{
  "api": "v1.0/yida/processes/instances",
  "method": "POST",
  "request": [
    {"field": "pageNumber", "type": "string", "value": "{PAGINATION_START_PAGE}"},
    {"field": "pageSize", "type": "string", "value": "{PAGINATION_PAGE_SIZE}"},
    {"field": "appType", "type": "string", "value": "APP_WTSCMZ1WOOHGIM5N28BQ"},
    {"field": "systemToken", "type": "string", "value": "IS866HB1DXJ8ODN3EXSVD750RBTK2X72R8MELL4"},
    {"field": "userId", "type": "string", "value": "16000443318138909"},
    {"field": "language", "type": "string", "value": ""},
    {"field": "formUuid", "type": "string", "value": ""},
    {"field":"searchFieldJson","type":"object","children":[
      {"field":"selectField_lgm25d98","label":"费用分类","type":"string"},
      {"parent":"searchFieldJson","label":"流水号","field":"serialNumberField_lgm25d8r","type":"string"},
      {"parent":"searchFieldJson","label":"申请人","field":"textField_lgm25d8p","type":"string"}
    ]},
    {"field":"originatorId","type":"string"},
    {"field":"createFromTimeGMT","type":"string","value":"_function DATE_FORMAT(DATE_ADD(NOW(),INTERVAL - 25 DAY),'%Y-%m-%d 00:00:00')"},
    {"field":"createToTimeGMT","type":"string","value":"{{CURRENT_TIME|datetime}}"},
    {"field":"modifiedFromTimeGMT","type":"string"},
    {"field":"modifiedToTimeGMT","type":"string"},
    {"field":"taskId","type":"string"},
    {"field":"instanceStatus","type":"string","value":"COMPLETED"},
    {"field":"approvedResult","type":"string","value":"agree"}
  ],
  ...
}

数据请求与清洗

在进行数据请求时,需要特别注意分页参数pageNumberpageSize,这些参数确保我们能够逐页获取大批量的数据。此外,通过设置createFromTimeGMTcreateToTimeGMT,可以限定查询的数据时间范围,从而提高查询效率。

{
  ...
  {
    field: 'createFromTimeGMT',
    type: 'string',
    value: '_function DATE_FORMAT(DATE_ADD(NOW(),INTERVAL - 25 DAY),\'%Y-%m-%d 00:00:00\')'
  },
  {
    field: 'createToTimeGMT',
    type: 'string',
    value: '{{CURRENT_TIME|datetime}}'
  },
  ...
}

在清洗过程中,需要对返回的数据进行格式化处理。例如,将日期字段dateField_lgn3helb转换为新的字段名datetime_new,并将其格式化为标准日期格式;将流水号字段serialNumberField_lgm25d8r转换为新的字段名order_no_new

{
  ...
  formatResponse: [
    {old: 'dateField_lgn3helb', new: 'datetime_new', format: 'date'},
    {old: 'serialNumberField_lgm25d8r', new: 'order_no_new', format: 'string'}
  ]
}

数据转换与写入

在完成数据清洗后,需要将处理后的数据写入目标系统。在本案例中,目标系统是MySQL数据库。通过轻易云平台,可以轻松配置目标数据库连接,并将清洗后的数据批量写入。

{
  ...
  beatFlat: ['tableField_lgm25d9j']
}

实例状态与审批结果过滤

为了确保只获取已完成且审批通过的实例,我们在请求参数中添加了实例状态和审批结果的过滤条件:

{
  ...
  {
    field: 'instanceStatus',
    type: 'string',
    value: 'COMPLETED'
  },
  {
    field: 'approvedResult',
    type: 'string',
    value: 'agree'
  }
}

通过上述配置,我们可以高效地调用钉钉接口,获取所需的数据,并进行必要的清洗和转换,最终实现数据的无缝集成。这不仅提高了业务流程的透明度和效率,也为后续的数据分析和决策提供了可靠的数据支持。 如何对接用友BIP接口

使用轻易云数据集成平台进行ETL转换并写入MySQL API接口

在数据集成的生命周期中,第二步是将已经集成的源平台数据进行ETL(Extract, Transform, Load)转换,并最终写入目标平台。在本案例中,我们将详细探讨如何将数据转换为MySQL API接口所能接收的格式,并写入目标平台。

数据请求与清洗

首先,我们需要从源平台获取原始数据。假设我们已经完成了数据请求与清洗阶段,得到了以下字段:

  • bfn_id:明细ID
  • order_no_new:单号
  • datetime_new:时间
  • tableField_lgm25d9j_numberField_lgm25d9r:金额

这些字段将作为输入参数,在ETL过程中进行转换和映射。

数据转换与写入

接下来,我们将这些原始数据进行转换,以符合目标平台MySQL API接口的要求。根据提供的元数据配置,具体操作如下:

  1. 定义API接口参数

    根据元数据配置,我们需要构建一个POST请求,包含以下字段:

    {
     "main_params": {
       "extend_processInstanceId": "{bfn_id}",
       "order_no_new": "{order_no_new}(FKTK)",
       "datetime_new": "{datetime_new}",
       "qty_count": "1",
       "sales_count": "{{tableField_lgm25d9j_numberField_lgm25d9r}}",
       "status": "",
       "Document_Type": "其他退款"
     }
    }
  2. 构建SQL语句

    通过元数据配置中的main_sql字段,我们可以看到需要执行的SQL插入语句:

    INSERT INTO `hc_dd_qttk`
    (`extend_processInstanceId`, `order_no_new`, `datetime_new`, `qty_count`, `sales_count`, `status`, `Document_Type`)
    VALUES (:extend_processInstanceId, :order_no_new, :datetime_new, :qty_count, :sales_count, :status, :Document_Type)
  3. 配置请求参数

    在轻易云数据集成平台中,我们需要配置请求参数以匹配上述SQL语句中的占位符。具体映射关系如下:

    • extend_processInstanceId 映射到 {bfn_id}
    • order_no_new 映射到 {order_no_new}(FKTK)
    • datetime_new 映射到 {datetime_new}
    • qty_count 固定值 "1"
    • sales_count 映射到 {{tableField_lgm25d9j_numberField_lgm25d9r}}
    • status 空值
    • Document_Type 固定值 "其他退款"
  4. 发送POST请求

    最后,通过轻易云的数据集成平台发送POST请求,执行上述SQL插入操作。具体实现如下:

{
  "api": "execute",
  "effect": "EXECUTE",
  "method": "POST",
  "idCheck": true,
  "request": [
    {
      "field": "main_params",
      "label": "main_params",
      "type": "object",
      "describe": "111",
      "children": [
        {"field":"extend_processInstanceId","label":"明细id","type":"string","value":"{bfn_id}"},
        {"field":"order_no_new","label":"单号","type":"string","value":"{order_no_new}(FKTK)"},
        {"field":"datetime_new","label":"时间","type":"date","value":"{datetime_new}"},
        {"field":"qty_count","label":"数量","type":"string","value":"1"},
        {"field":"sales_count","label":"金额","type":"string","value":"{{tableField_lgm25d9j_numberField_lgm25d9r}}"},
        {"field":"status","label":"状态","type":"string"},
        {"field":"Document_Type","label":"单据类型","type":"string","value":"其他退款"}
      ]
    }
  ],
  "otherRequest":[
    {
      "field": "main_sql",
      "label": "main_sql",
      "type": "string",
      "describe": "111",
      "value": 
"INSERT INTO  `hc_dd_qttk`
(`extend_processInstanceId`,`order_no_new`,`datetime_new`,`qty_count`,`sales_count`,`status`,`Document_Type`)
VALUES (:extend_processInstanceId,:order_no_new,:datetime_new,:qty_count,:sales_count,:status,:Document_Type)"
    }
  ]
}

通过以上步骤,我们成功地将源平台的数据进行了ETL转换,并通过MySQL API接口写入了目标平台。这一过程不仅确保了数据的一致性和完整性,还提高了系统间的数据交互效率。 企业微信与ERP系统接口开发配置

更多系统对接方案