ETL技术在轻易云平台的应用:从MySQL到钉钉

  • 轻易云集成顾问-谢楷斌

MySQL数据集成到钉钉的解决方案案例分享:SZ-SC采购订单审核流-钉钉工作通知-催审通知V2

在本技术案例中,我们将聚焦于通过轻易云数据集成平台,将MySQL中的采购订单审核数据有效快速地集成到钉钉,进而实现自动化的工作通知与催审提醒。我们具体分析和实施的方案命名为“SZ-SC采购订单审核流-钉钉工作通知-催审通知V2”。

数据采集与处理

首先,通过MySQL数据库提供的API接口SELECT操作,我们定时可靠地抓取最新的采购订单审核信息。这部分的数据处理需要确保以下几点:

  1. 分页和限流问题:由于数据库中存储的数据量较大,需要设计合理的分页机制及并发控制,以避免对数据库造成过度压力。
  2. 批量抓取数据:采用高吞吐量的数据写入能力,使得每次可以快速导出大量数据,提高单位时间内的数据处理效率。

数据转换与映射

从MySQL获取原始数据后,需要进行一定的清洗和格式转换以符合业务需求。例如:

  1. 将原始表结构转化为满足API调用要求的数据结构。
  2. 定制化的数据映射过程,根据实际业务定义转换逻辑,以保证各字段准确无误。

这一步利用了自定义数据转换逻辑,并依托可视化工具进行直观管理,使复杂的数据加工变得更加简洁明了。

实时监控与异常处理

为了确保整个流程顺利进行,系统配备了集中监控和告警功能,对任务状态实时跟踪。当出现任何异常或错误,如传输失败、格式不匹配等情况,可以自动触发重试机制并记录日志便于追溯分析。

针对特定API接口调用细节

接下来,通过调用钉钉提供的API topapi/message/corpconversation/asyncsend_v2 完成消息发送,包括多个关键点:

  1. 如何适应不同系统间(即MySQL与钉钉)的数据格式差异,是一项重要挑战。
  2. 在向上千用户下发消息时候,如何做到批量、高效且不遗漏单条信息。
  3. 资源消耗优化和性能调优策略,实现海量消息发送而不阻塞主接口服务。

以上几步共同构建了一个稳定、高效、透明且易操作的数据集成流水线,有力保障了企业生产运营中关键环节的信息及时传递。 打通企业微信数据接口

调用MySQL接口select获取并加工数据的技术实现

在数据集成过程中,调用源系统MySQL接口select获取并加工数据是至关重要的一步。本文将详细探讨如何利用轻易云数据集成平台配置元数据,实现对MySQL数据库的高效查询和数据处理。

元数据配置解析

首先,我们需要理解元数据配置中的各个字段及其作用。以下是关键字段的解析:

  • api: 指定API类型,这里为select,表示执行查询操作。
  • effect: 操作效果,这里为QUERY,表示查询操作。
  • method: 请求方法,这里为POST
  • numberid: 用于标识用户ID的字段。
  • request: 包含主参数配置,如分页限制和偏移量。
  • otherRequest: 包含主SQL语句及其动态参数绑定。

主参数配置

主参数包含两个关键字段:limitoffset。它们用于分页查询,以控制每次返回的数据行数和起始位置。

{
  "field": "main_params",
  "label": "主参数",
  "type": "object",
  "describe": "对应其它请求字段内SQL语句的主参数,必须一一对应。",
  "value": "1",
  "children": [
    {
      "field": "limit",
      "label": "限制结果集返回的行数",
      "type": "int",
      "describe": "必要的参数!LIMIT 子句用于限制查询结果返回的行数。",
      "value": "{PAGINATION_PAGE_SIZE}"
    },
    {
      "field": "offset",
      "label": "偏移量",
      "type": "int",
      "describe": "OFFSET 子句用于指定查询结果的起始位置或偏移量。",
      "value": "{PAGINATION_START_ROW}"
    }
  ]
}

主SQL语句及动态参数绑定

主SQL语句中使用了动态参数绑定,以确保查询语句与请求参数一一对应。这种方式提高了查询语句的可读性和维护性,并保证了查询的准确性和安全性。

{
  "field": "main_sql",
  "label": "主SQL语句",
  "type": "string",
  "describe": "...通过这种优化方式,我们能够提高查询语句的可读性和维护性...",
  "value": "
    select user1.user_id,
           user1.user_count,
           user2.job_number,
           case user3.userid
             when '0119001462201065096' then '0119001462201065096,2635645303840252'
             else user3.userid
           end as userid,
           user2.real_name,
           now() as id
    from (
        SELECT user_id, COUNT(*) AS user_count
        FROM (
            SELECT SUBSTRING_INDEX(SUBSTRING_INDEX(afp.audit_user, ',', numbers.n), ',', -1) AS user_id
            FROM mbs_pur_record a
            LEFT JOIN basic_supplier_info b ON b.supplier_uuid = a.supplier_uuid
            LEFT JOIN sys_user c ON c.user_id = a.create_by
            LEFT JOIN act_to_form_process afp ON afp.form_uuid = a.pur_audit_uuid 
                AND afp.company_code = a.company_code 
            JOIN (
                SELECT a.N + b.N * 10 + 1 AS n
                FROM (
                    SELECT 0 AS N
                    UNION SELECT 1 UNION SELECT 2 UNION SELECT 3 UNION SELECT 4 UNION SELECT 5
                    UNION SELECT 6 UNION SELECT 7 UNION SELECT 8 UNION SELECT 9
                ) a
                CROSS JOIN (
                    SELECT 0 AS N
                    UNION SELECT 1 UNION SELECT 2 UNION SELECT 3 UNION SELECT 4 UNION SELECT 5
                    UNION SELECT 6 UNION SELECT 7 UNION SELECT 8 UNION SELECT 9
                ) b
            ) numbers ON numbers.n <= LENGTH(afp.audit_user) - LENGTH(REPLACE(afp.audit_user, ',', '')) + 1
            WHERE a.pur_status IN ('1', '11') 
                AND a.company_code = 'TYZN'
                AND a.pur_type = '2' and a.dingding_flag='1'
        ) AS users
        GROUP BY user_id
        ORDER BY user_id) user1 
    left join sys_user user2 on user1.user_id=user2.user_id
    left join basic_dingding_userid user3 on user2.job_number=user3.workid
    limit :limit offset :offset"
}

实现步骤

  1. 定义请求结构:根据元数据配置定义请求结构,包括分页参数(limit、offset)和主SQL语句。

  2. 构建SQL查询:将主SQL语句中的动态字段替换为占位符(如?),并在执行查询前进行参数绑定。

  3. 执行查询:通过轻易云平台发送POST请求,执行构建好的SQL查询,并获取结果集。

  4. 处理结果:对返回的数据进行必要的处理,如格式转换、过滤等,以满足业务需求。

示例代码

以下是一个示例代码片段,展示如何通过轻易云平台调用MySQL接口并处理返回的数据:

import requests

# 定义请求URL和头信息(假设已知)
url = 'https://api.qingyiyun.com/data-integration/query'
headers = {'Content-Type': 'application/json'}

# 定义请求体,包括分页参数和主SQL语句(带占位符)
payload = {
    'main_params': {
        'limit': PAGINATION_PAGE_SIZE,
        'offset': PAGINATION_START_ROW,
    },
    'main_sql': """
        select user1.user_id,
               ...
               limit ? offset ?
    """
}

# 发起POST请求并获取响应结果
response = requests.post(url, headers=headers, json=payload)
data = response.json()

# 对返回的数据进行处理(如格式转换、过滤等)
processed_data = process_data(data)

def process_data(data):
    # 数据处理逻辑...
    return processed_data

print(processed_data)

通过上述步骤,我们可以高效地调用MySQL接口select获取并加工数据,为后续的数据转换与写入奠定基础。 金蝶与WMS系统接口开发配置

使用轻易云数据集成平台实现ETL转换并写入钉钉API接口

在数据集成生命周期的第二步,我们将已经集成的源平台数据进行ETL(Extract, Transform, Load)转换,转为目标平台钉钉API接口所能够接收的格式,并最终写入目标平台。以下将详细介绍如何通过元数据配置实现这一过程。

数据请求与清洗

在开始ETL转换之前,我们需要确保从源平台获取的数据已经经过清洗和预处理。这一步骤确保了数据的准确性和一致性,为后续的转换和写入打下基础。

数据转换与写入

我们使用轻易云数据集成平台提供的可视化界面,配置元数据以实现数据的转换和写入。以下是具体步骤:

  1. 配置API接口信息

    根据元数据配置,我们需要调用钉钉API接口topapi/message/corpconversation/asyncsend_v2。这是一个异步发送工作通知的接口,支持POST请求。

  2. 设置请求参数

    请求参数包括userid_list, to_all_user, msg, 和 agent_id。这些参数需要根据源平台的数据进行动态填充。

    • userid_list: 用户ID列表,类型为字符串。
    • to_all_user: 是否发送给所有用户,类型为字符串,这里设置为false
    • msg: 消息内容,类型为对象,包括消息类型和具体消息内容。
      • msgtype: 消息类型,这里设置为markdown
      • markdown: markdown消息内容,包括标题和文本。
      • title: 消息标题,这里设置为采购审核流
      • text: 消息文本,通过函数拼接生成,包含动态数据,如紧急采购订单数量和审批链接。
    • agent_id: 应用ID,类型为字符串,这里设置为2811489571
  3. 元数据配置示例

    以下是具体的元数据配置:

    {
       "api": "topapi/message/corpconversation/asyncsend_v2",
       "effect": "EXECUTE",
       "method": "POST",
       "idCheck": true,
       "request": [
           {
               "field": "userid_list",
               "label": "userid_list",
               "type": "string",
               "describe": "111",
               "value": "{userid}"
           },
           {
               "field": "to_all_user",
               "label": "to_all_user",
               "type": "string",
               "describe": "111",
               "value": "false"
           },
           {
               "field": "msg",
               "label": "msg",
               "type": "object",
               "describe": "111",
               "value": "test",
               "children": [
                   {
                       "field": "msgtype",
                       "label": "msgtype",
                       "type": "string",
                       "value": "markdown"
                   },
                   {
                       "field": "markdown",
                       "label": "markdown",
                       "type": ":object",
                       ":children":[
                           {
                               ":field":"title", 
                               ":label":"title", 
                               ":type":"string", 
                               ":value":"采购审核流"
                           },
                           {
                               ":field":"text", 
                               ":label":"text", 
                               ":type":"string", 
                               ":value":"_function CONCAT('![](http://192.168.110.232:9000/img/logo.eb09f95f.png)  \\n # *紧急采购订单-待审*  \\n  ### 你有',{user_count},'张紧急采购订单等待审批  ','{id}','  \\n  ### [<查看详情>](http://192.168.110.232:9000/examine/outsourcAudit)')"
                           }
                       ]
                   }
               ]
           },
           {
               ":field":"agent_id", 
               ":label":"agent_id", 
               ":type":"string", 
               ":describe":"111", 
               ":value":"2811489571"
           }
       ]
    }
  4. 执行API调用

    配置完成后,我们通过轻易云数据集成平台执行API调用,将转换后的数据发送到钉钉。系统会自动处理请求并监控执行状态,确保每个环节都透明可见。

  5. 监控与日志记录

    在整个过程中,轻易云平台提供实时监控功能,可以随时查看数据流动和处理状态。同时,系统会记录详细日志,以便在出现问题时进行排查和解决。

通过上述步骤,我们成功地将源平台的数据经过ETL转换,并通过钉钉API接口发送工作通知,实现了不同系统间的数据无缝对接。这不仅提高了业务效率,也确保了信息传递的准确性和及时性。 企业微信与ERP系统接口开发配置