ETL转换与数据写入:轻易云平台的详细实施指南

  • 轻易云集成顾问-曹润

钉钉数据集成到MySQL的案例分享 —— 采购业务退款处理

在本次技术专题中,我们将探讨一个实际运行中的系统对接集成案例:如何通过轻易云数据集成平台,实现钉钉采购业务退款单数据(dd-新付款退款单)向MySQL数据库(鸿巢)的高效、可靠写入。这个方案不仅需要处理大量实时生成的数据,还要求确保每笔交易记录的准确性和完整性。

技术要点总结

  1. API调用与分页限流问题: 我们通过调用v1.0/yida/processes/instances接口,从钉钉获取最新的退款单数据。为了避免接口请求超时和数据丢失,引入了分页机制,并利用定时任务调度,定期抓取新的订单信息。

  2. 高吞吐量数据写入能力: 为保障大量订单信息能够快速且稳健地写入至MySQL数据库,我们采用了批量插入操作。同时,使用异步处理模式,提高整体系统性能,将延迟降至最低。

  3. 实时监控与告警系统: 集成过程中,通过轻易云提供的集中监控和告警功能,实时跟踪每一步的数据处理状态。一旦出现异常情况,即可即时发出告警通知,以便迅速定位并解决问题,确保对接流程顺畅无阻。

  4. 自定义数据转换逻辑: 由于钉钉与MySQL之间的数据格式存在差异,我们设计并实现了一套自定义的数据映射规则。在实现过程中,不仅考虑到了字段名称和类型的转换,还针对特定业务需求进行了相应调整,使得最终输入到MySQL中的数据能够完全符合预期。

  5. 错误重试机制及异常处理: 在实际应用环境下,由于网络波动等因素可能导致部分API调用失败,为此我们引入了自动重试机制。该机制会在指定次数内重新尝试未成功执行的操作,同时详细记录日志,为后续排查故障提供依据。

以下是具体实施过程中的关键步骤及细节解析:

(续篇待补充....)

通过上述技术手段和优化措施,本次集成方案在保障高效性的同时,也大幅提高了整个过程的稳定性与容错能力。那么接下来,让我们深入了解这一项目中所涉及的一些具体配置和代码实现方法。 系统集成平台API接口配置

调用钉钉接口获取并加工数据的技术实现

在数据集成过程中,调用源系统接口是关键的一步。本文将详细探讨如何使用轻易云数据集成平台调用钉钉接口v1.0/yida/processes/instances,并对获取的数据进行加工处理。

接口调用配置

首先,我们需要配置API接口的元数据。以下是具体的配置细节:

{
  "api": "v1.0/yida/processes/instances",
  "method": "POST",
  "number": "title",
  "id": "processInstanceId",
  "idCheck": true,
  "formatResponse": [
    {"old": "dateField_lgn3helb", "new": "datetime_new", "format": "date"},
    {"old": "serialNumberField_lgm25d8r", "new": "order_no_new", "format": "string"}
  ],
  "request": [
    {"field": "pageNumber", "label": "分页页码", "type": "string", "describe": "分页页码", "value": "{PAGINATION_START_PAGE}"},
    {"field": "pageSize", "label": "分页大小", "type": "string", "describe": "分页大小", "value": "{PAGINATION_PAGE_SIZE}"},
    {"field": "appType", "label": "应用ID", "type": "string", "describe": "应用ID", "value":"APP_WTSCMZ1WOOHGIM5N28BQ"},
    {"field": "systemToken", "label":"应用秘钥","type":"string","describe":"应用秘钥","value":"IS866HB1DXJ8ODN3EXSVD750RBTK2X72R8MELL4"},
    {"field":"userId","label":"用户的userid","type":"string","describe":"用户的userid","value":"16000443318138909"},
    {"field":"language","label":"语言","type":"string","describe":"语言,取值:zh_CN:中文(默认值)en_US:英文"},
    {"field":"formUuid","label":"表单ID","type":"string","describe":"表单ID","value":"FORM-UX866Q61RUV939TLEWG9H4HX25523ZRQNXLGLW"},
    {"field":"searchFieldJson","label":"条件","type":"object","children":[{"parent":"searchFieldJson","label":"费用类型","field":"selectField_lgm25d98","type":"string","value":"供应链费用退款"}]},
    {"field":"originatorId","label":"根据流程发起人工号查询","type":"string","describe":"根据流程发起人工号查询"},
    {"field":"createFromTimeGMT","label":"创建时间起始值","type":"string","describe":"创建时间起始值","value":"_function DATE_FORMAT(DATE_ADD(NOW(),INTERVAL - 25 DAY),'%Y-%m-%d 00:00:00')"},
    {"field":"createToTimeGMT","label":"创建时间终止值","type":"string","describe":"创建时间终止值","value
![金蝶与MES系统接口开发配置](https://pic.qeasy.cloud/S14.png~tplv-syqr462i7n-qeasy.image)
### 轻易云数据集成平台生命周期第二步:ETL转换与数据写入

在数据集成过程中,ETL(Extract, Transform, Load)是关键步骤之一。本文将深入探讨如何使用轻易云数据集成平台,将源平台的数据进行ETL转换,最终写入目标平台MySQL的API接口。

#### 数据请求与清洗

在ETL流程的第一步,数据从源系统提取并进行清洗。这一步骤确保了数据的准确性和一致性,为后续的转换和加载奠定基础。我们假设这一步已经完成,接下来重点讨论如何将清洗后的数据转换为目标平台MySQL所能接收的格式,并通过API接口写入。

#### 数据转换与写入

为了实现数据从源平台到目标平台MySQL的无缝对接,需要配置元数据并调用相应的API接口。以下是具体的元数据配置及其应用:

```json
{
  "api": "execute",
  "effect": "EXECUTE",
  "method": "POST",
  "idCheck": true,
  "request": [
    {
      "field": "main_params",
      "label": "main_params",
      "type": "object",
      "describe": "111",
      "children": [
        {"field": "extend_processInstanceId", "label": "明细id", "type": "string", "value": "{bfn_id}"},
        {"field": "order_no_new", "label": "单号", "type": "string", "value": "{order_no_new}(FKTK)"},
        {"field": "datetime_new", "label": "时间", "type": "date", "value": "{datetime_new}"},
        {"field": "qty_count", "label": "数量", "type": "string", "value":"1"},
        {"field":"sales_count","label":"金额","type":"string","value":"{{tableField_lgm25d9j_numberField_lgm25d9r}}"},
        {"field":"status","label":"状态","type":"string"},
        {"field":"Document_Type","label":"单据类型","type":"string","value":"采购退款单"}
      ]
    }
  ],
  "otherRequest":[
    {
      "field":"main_sql",
      "label":"main_sql",
      "type":"string",
      "describe":"111",
      "value":"INSERT INTO `hc_dd_cgtk`(`extend_processInstanceId`,`order_no_new`,`datetime_new`,`qty_count`,`sales_count`,`status`,`Document_Type`) VALUES (:extend_processInstanceId,:order_no_new,:datetime_new,:qty_count,:sales_count,:status,:Document_Type)"
    }
  ]
}
配置解析
  1. API 调用配置

    • api: 指定API操作类型,这里是execute
    • effect: 操作效果,这里是EXECUTE
    • method: HTTP方法,这里使用POST
    • idCheck: 是否进行ID检查,这里设置为true
  2. 请求参数配置

    • request: 包含所有需要传递给API的参数。
      • main_params: 主参数对象,包含多个子字段。
      • extend_processInstanceId: 明细ID,对应源数据中的bfn_id
      • order_no_new: 单号,附加固定字符串(FKTK)
      • datetime_new: 时间字段,对应源数据中的日期时间值。
      • qty_count: 数量,这里固定为1
      • sales_count: 金额,对应表格字段中的数值。
      • status: 状态字段,类型为字符串。
      • Document_Type: 单据类型,这里固定为“采购退款单”。
  3. 其他请求配置

    • otherRequest: 包含SQL插入语句,用于将转换后的数据写入MySQL数据库。
      • main_sql: SQL插入语句模板,通过占位符绑定参数值。
数据写入操作

在完成上述元数据配置后,通过调用相应的API接口,将转换后的数据写入目标平台MySQL。以下是执行该操作的示例代码:

import requests
import json

# API URL
url = 'http://your-api-endpoint/execute'

# Headers
headers = {
    'Content-Type': 'application/json'
}

# Payload
payload = {
    'main_params': {
        'extend_processInstanceId': 'example_bfn_id',
        'order_no_new': 'example_order_no(FKTK)',
        'datetime_new': '2023-10-01T12:00:00Z',
        'qty_count': '1',
        'sales_count': '1000',
        'status': 'completed',
        'Document_Type': '采购退款单'
    },
    'main_sql': (
        """
        INSERT INTO hc_dd_cgtk 
            (extend_processInstanceId, order_no_new, datetime_new, qty_count, sales_count, status, Document_Type) 
        VALUES 
            (:extend_processInstanceId, :order_no_new, :datetime_new, :qty_count, :sales_count, :status, :Document_Type)
        """
    )
}

# Make the request
response = requests.post(url, headers=headers, data=json.dumps(payload))

# Check response status
if response.status_code == 200:
    print('Data successfully written to MySQL.')
else:
    print(f'Failed to write data. Status code: {response.status_code}')

通过上述步骤和代码示例,我们实现了从源平台到目标平台MySQL的数据ETL转换与写入。在实际应用中,可以根据具体需求调整元数据配置和API调用方式,以满足不同业务场景下的数据集成需求。 如何对接企业微信API接口