使用轻易云平台进行数据ETL及MySQL写入操作

  • 轻易云集成顾问-张妍琪

钉钉数据集成到MySQL的技术实现:品类即时报表集成方案-月度目标表v2更新

在构建企业信息化系统时,如何高效、稳定地将各种应用系统的数据进行整合,实现资源的最优配置,是一个非常重要的问题。在本案例中,我们深入探讨了如何通过钉钉API和MySQL数据写入API,使得从钉钉获取到的重要业务数据能够准确快速地同步至MySQL数据库,从而支持更实时、更可靠的数据分析需求。

任务概述

项目背景是基于"品类即时报表集成方案-月度目标表v2更新",我们需要定时批量抓取并处理来自钉钉应用中的报表数据,通过API对接,将其写入到企业内部的MySQL数据库。这不仅要确保每次抓取流程无遗漏,还需解决接口分页与限流问题,同时实现对异常情况的有效处理和重试机制,以保证整个集成过程的稳定性及可靠性。

数据获取与转换

为了确定从钉钉获取所需的数据,我们调用了/yida/forms/instances/ids/{appType}/{formUuid} API。这个过程中重点关注以下几点:

  1. 分页与限流应对:由于单次请求返回数据可能有限,需要实施合理的分页策略,并且考虑调用频率限制,避免触发接口封禁。
  2. 自定义转换逻辑:根据业务需求,对获取到的数据进行清洗和格式转换,以适配后续存储阶段的数据结构要求。
  3. 质量监控及异常检测:提供自动化监控手段,如实时日志记录和告警通知,一旦发现异常立即采取应急措施。

数据写入操作优化

在将整理后的数据信息写入MySQL时,使用了execute API进行批处理操作。关键点如下:

  1. 高吞吐量支撑:通过优化批量写入策略,实现高效的大规模数据插入,降低操作耗时,提高整体性能。
  2. 错误重试机制:建立详细日志体系,当遇到因网络波动或其它原因导致部分数据未成功导入时,可以及时记录并触发重试逻辑,确保最终一致性。
  3. 定制化映射规则:依据业务实际要求制定灵活多样的字段映射规则,使得源端与目的端之间能顺利匹配,不丢失关键信息。

借助可视化设计工具,可以简明直观地管理整个流程,从任务调度、参数设置,到实时监控各节点状态,都显得便捷而易用。

以上内容奠定了本案例主要技术路线 如何对接企业微信API接口

调用钉钉接口获取并加工数据的技术实现

在轻易云数据集成平台的生命周期管理中,调用源系统接口是至关重要的一步。本文将详细探讨如何通过调用钉钉接口 v1.0/yida/forms/instances/ids/{appType}/{formUuid} 获取并加工数据,以实现品类即时报表集成方案-月度目标表v2更新。

接口概述

该接口用于查询指定应用和表单的实例数据。通过POST请求方式,传递必要的参数以获取相应的数据。以下是元数据配置中的关键字段及其作用:

  • api: 接口路径。
  • effect: 操作类型,这里为查询(QUERY)。
  • method: 请求方法,这里为POST。
  • number: 标题字段。
  • id: 唯一标识字段。

请求参数详解

元数据配置中定义了一系列请求参数,每个参数都有特定的用途:

  1. appType: 应用编码,用于标识具体的应用。

    • 示例值: "APP_BNJNRVQ32174RSX3MROF"
  2. formUuid: 表单ID,用于标识具体的表单。

    • 示例值: "FORM-DA5DD28FDCC644ECB2A8F123534D3EF4I082"
  3. pageNumber: 分页页码,指定要查询的页数。

    • 示例值: "1"
  4. pageSize: 分页大小,指定每页返回的数据条数。

    • 示例值: "50"
  5. modifiedToTimeGMT: 修改时间终止值,用于过滤修改时间在此之前的数据。

    • 示例值: "{{CURRENT_TIME|datetime}}"
  6. systemToken: 应用秘钥,用于验证请求合法性。

    • 示例值: "KYC664C1WR9LODIIAI09I913S0HO2G3YGREWL43"
  7. modifiedFromTimeGMT: 修改时间起始值,用于过滤修改时间在此之后的数据。

    • 示例值: "{{LAST_SYNC_TIME|datetime}}"
  8. language: 语言选项,支持中文(zh_CN)和英文(en_US)。

    • 示例值: "zh_CN"
  9. searchFieldJson: 根据表单内组件值查询,可用于复杂查询条件。

  10. userId: 用户userid,用于特定用户的数据过滤。

    • 示例值: "16000443318138909"
  11. originatorId: 根据流程发起人工号查询。

  12. createToTimeGMT: 创建时间终止值。

  13. createFromTimeGMT: 创建时间起始值。

实现步骤

  1. 构建请求体

根据元数据配置构建请求体,确保所有必需参数都已填充。例如:

{
  "appType": "APP_BNJNRVQ32174RSX3MROF",
  "formUuid": "FORM-DA5DD28FDCC644ECB2A8F123534D3EF4I082",
  "pageNumber": "1",
  "pageSize": "50",
  "modifiedToTimeGMT": "{{CURRENT_TIME|datetime}}",
  "systemToken": "KYC664C1WR9LODIIAI09I913S0HO2G3YGREWL43",
  "modifiedFromTimeGMT": "{{LAST_SYNC_TIME|datetime}}",
  "language": "zh_CN"
}
  1. 发送请求

使用HTTP客户端(如curl、Postman或编程语言中的HTTP库)发送POST请求到指定API路径,并附上构建好的请求体。

curl -X POST \
  https://api.dingtalk.com/v1.0/yida/forms/instances/ids/APP_BNJNRVQ32174RSX3MROF/FORM-DA5DD28FDCC644ECB2A8F123534D3EF4I082 \
  -H 'Content-Type: application/json' \
  -d '{
        "appType": "APP_BNJNRVQ32174RSX3MROF",
        "formUuid": "FORM-DA5DD28FDCC644ECB2A8F123534D3EF4I082",
        ...
      }'
  1. 处理响应

解析响应数据,根据业务需求进行进一步处理。例如,将获取到的数据进行清洗、转换后写入目标系统。

import requests
import json

url = 'https://api.dingtalk.com/v1.0/yida/forms/instances/ids/APP_BNJNRVQ32174RSX3MROF/FORM-DA5DD28FDCC644ECB2A8F123534D3EF4I082'
headers = {'Content-Type': 'application/json'}
data = {
    "appType": "APP_BNJNRVQ32174RSX3MROF",
    ...
}

response = requests.post(url, headers=headers, data=json.dumps(data))
if response.status_code == 200:
    result = response.json()
    # 数据处理逻辑
else:
    print(f"Error {response.status_code}: {response.text}")

通过上述步骤,我们可以成功调用钉钉接口获取所需数据,并进行必要的加工处理。这是轻易云数据集成平台生命周期管理中的关键一步,为后续的数据转换与写入奠定了基础。 金蝶云星空API接口配置

使用轻易云数据集成平台进行ETL转换并写入MySQL API接口的技术案例

在数据集成生命周期的第二步,我们需要将已经集成的源平台数据进行ETL(Extract, Transform, Load)转换,并最终写入目标平台MySQL。这一过程中,元数据配置是关键,它定义了如何将源数据转换为目标平台能够接收的格式。以下是一个详细的技术案例,展示如何使用轻易云数据集成平台完成这一任务。

元数据配置解析

元数据配置是ETL过程中的核心部分,它决定了如何从源系统提取数据、进行必要的转换,并最终写入目标系统。在这个案例中,我们的目标是将品类即时报表的数据更新到MySQL数据库中的月度目标表。以下是元数据配置的详细解析:

{
  "api": "execute",
  "effect": "EXECUTE",
  "method": "SQL",
  "number": "id",
  "id": "id",
  "name": "id",
  "idCheck": true,
  "request": [
    {
      "field": "main_params",
      "label": "主参数",
      "type": "object",
      "describe": "对应主语句内的动态参数",
      "children": [
        {"field": "form_instance_id", "label": "源id(实例id)", "type": "string", "value": "{id}"},
        {"field": "platform", "label": "平台", "type": "string", "value": "{selectField_lxa3pxj6}"},
        {"field": "date", 
         "label": "日期", 
         "type": "timestamp", 
         "value":"_function FROM_UNIXTIME( ( {dateField_lxa3pxie} / 1000 ) ,'%Y-%m-%d' )"},
        {"field": "shop_name", 
         "label":"店铺名称", 
         "type":"string", 
         "value":"{selectField_lxa3pxj7}"},
        {"field":"shop_code","label":"店铺编码","type":"string","value":"{textField_lxa3pxj4}"},
        {"field":"category","label":"品类","type":"string","value":"{selectField_lxa3pxjh}"},
        {"field":"sale_goal","label":"销售目标","type":"float","value":"{numberField_lxa3pxjj}"},
        {"field":"sale_out_goal","label":"出库目标","type":"float","value":"{numberField_lxa3pxjk}"},
        {"field":"create_time",
         "label":"创建时间",
         "type":"timestamp",
         "_function DATE_FORMAT('{gmtCreate}','%Y-%m-%d %H:%i:%s')",
         default:"{{CURRENT_TIME|datetime}}"
        },
        {"field":"create_by","label":"创建人","type":"string","value":"{originator}"},
        {"field":"create_user_id","label":"创建人id","type":"string","value":"{originatorUserId}"},
        {"field":"modify_time",
         "_function DATE_FORMAT('{gmtModified}','%Y-%m-%d %H:%i:%s')"
        },
        {"field":"modify_by","label":"","type":"","value":"","default":"","describe":"","required":"","defaultValue":"","options":[],"children":[]},
        {"field":"","label":"","type":"","value":"","default":"","describe":"","required":"","defaultValue":"","options":[],"children":[]}
      ]
    }
  ],
  otherRequest: [
    {
      field: 'main_sql',
      label: '主语句',
      type: 'string',
      describe: 'SQL首次执行的语句,将会返回:lastInsertId',
      value: `UPDATE lehua.month_goal
              SET platform = <{platform: }>,
                  date = <{date: }>,
                  shop_name = <{shop_name: }>,
                  shop_code = <{shop_code: }>,
                  category = <{category: }>,
                  sale_goal = <{sale_goal: }>,
                  sale_out_goal = <{sale_out_goal: }>,
                  create_time = <{create_time: CURRENT_TIMESTAMP}>,
                  create_by = <{create_by: }>,
                  create_user_id = <{create_user_id: }>,
                  modify_time = <{modify_time: }>,
                  modify_by = <{modify_by: }>,
                  modify_user_id = <{modify_user_id: }>
              WHERE form_instance_id = <{form_instance_id: }>;`
    }
  ]
}

数据请求与清洗

在这个步骤中,我们从源系统提取数据,并根据元数据配置进行清洗和格式化。例如,日期字段需要从Unix时间戳转换为标准日期格式,创建时间和更新时间需要格式化为%Y-%m-%d %H:%i:%s

FROM_UNIXTIME( ( {dateField_lxa3pxie} / 1000 ) ,'%Y-%m-%d' )

数据转换与写入

在清洗完毕后,我们需要将这些清洗后的数据通过SQL语句写入到MySQL数据库中。这里使用了UPDATE语句来更新已有记录。如果记录不存在,可以考虑使用INSERT INTO ... ON DUPLICATE KEY UPDATE来实现插入或更新操作。

UPDATE lehua.month_goal
SET platform = '{platform}',
    date = '{date}',
    shop_name = '{shop_name}',
    shop_code = '{shop_code}',
    category = '{category}',
    sale_goal = '{sale_goal}',
    sale_out_goal = '{sale_out_goal}',
    create_time = CURRENT_TIMESTAMP,
    create_by = '{create_by}',
    create_user_id = '{create_user_id}',
    modify_time = CURRENT_TIMESTAMP,
    modify_by = '{modify_by}',
    modify_user_id = '{modify_user_id}'
WHERE form_instance_id = '{form_instance_id}';

API接口调用

在整个过程中,API接口调用是关键的一环。我们通过API接口将处理后的数据发送到MySQL数据库。这里使用的是execute方法,通过SQL语句直接执行更新操作。

{
  api: 'execute',
  method: 'SQL'
}

通过上述步骤,我们实现了从源系统到目标系统的数据无缝对接,并确保每个环节的数据都经过严格校验和转换。这不仅提高了数据处理效率,也确保了业务流程的透明度和可追溯性。 如何开发金蝶云星空API接口