ETL技术在轻易云平台的应用:从MySQL到钉钉

  • 轻易云集成顾问-谢楷斌
### MySQL数据集成到钉钉的解决方案案例分享:SZ-SC采购订单审核流-钉钉工作通知-催审通知V2 在本技术案例中,我们将聚焦于通过轻易云数据集成平台,将MySQL中的采购订单审核数据有效快速地集成到钉钉,进而实现自动化的工作通知与催审提醒。我们具体分析和实施的方案命名为“SZ-SC采购订单审核流-钉钉工作通知-催审通知V2”。 #### 数据采集与处理 首先,通过MySQL数据库提供的API接口`SELECT`操作,我们定时可靠地抓取最新的采购订单审核信息。这部分的数据处理需要确保以下几点: 1. **分页和限流问题**:由于数据库中存储的数据量较大,需要设计合理的分页机制及并发控制,以避免对数据库造成过度压力。 2. **批量抓取数据**:采用高吞吐量的数据写入能力,使得每次可以快速导出大量数据,提高单位时间内的数据处理效率。 #### 数据转换与映射 从MySQL获取原始数据后,需要进行一定的清洗和格式转换以符合业务需求。例如: 1. 将原始表结构转化为满足API调用要求的数据结构。 2. 定制化的数据映射过程,根据实际业务定义转换逻辑,以保证各字段准确无误。 这一步利用了自定义数据转换逻辑,并依托可视化工具进行直观管理,使复杂的数据加工变得更加简洁明了。 #### 实时监控与异常处理 为了确保整个流程顺利进行,系统配备了集中监控和告警功能,对任务状态实时跟踪。当出现任何异常或错误,如传输失败、格式不匹配等情况,可以自动触发重试机制并记录日志便于追溯分析。 #### 针对特定API接口调用细节 接下来,通过调用钉钉提供的API `topapi/message/corpconversation/asyncsend_v2` 完成消息发送,包括多个关键点: 1. 如何适应不同系统间(即MySQL与钉钉)的数据格式差异,是一项重要挑战。 2. 在向上千用户下发消息时候,如何做到批量、高效且不遗漏单条信息。 3. 资源消耗优化和性能调优策略,实现海量消息发送而不阻塞主接口服务。 以上几步共同构建了一个稳定、高效、透明且易操作的数据集成流水线,有力保障了企业生产运营中关键环节的信息及时传递。 ![打通企业微信数据接口](https://pic.qeasy.cloud/D19.png~tplv-syqr462i7n-qeasy.image) ### 调用MySQL接口select获取并加工数据的技术实现 在数据集成过程中,调用源系统MySQL接口select获取并加工数据是至关重要的一步。本文将详细探讨如何利用轻易云数据集成平台配置元数据,实现对MySQL数据库的高效查询和数据处理。 #### 元数据配置解析 首先,我们需要理解元数据配置中的各个字段及其作用。以下是关键字段的解析: - `api`: 指定API类型,这里为`select`,表示执行查询操作。 - `effect`: 操作效果,这里为`QUERY`,表示查询操作。 - `method`: 请求方法,这里为`POST`。 - `number`和`id`: 用于标识用户ID的字段。 - `request`: 包含主参数配置,如分页限制和偏移量。 - `otherRequest`: 包含主SQL语句及其动态参数绑定。 #### 主参数配置 主参数包含两个关键字段:`limit`和`offset`。它们用于分页查询,以控制每次返回的数据行数和起始位置。 ```json { "field": "main_params", "label": "主参数", "type": "object", "describe": "对应其它请求字段内SQL语句的主参数,必须一一对应。", "value": "1", "children": [ { "field": "limit", "label": "限制结果集返回的行数", "type": "int", "describe": "必要的参数!LIMIT 子句用于限制查询结果返回的行数。", "value": "{PAGINATION_PAGE_SIZE}" }, { "field": "offset", "label": "偏移量", "type": "int", "describe": "OFFSET 子句用于指定查询结果的起始位置或偏移量。", "value": "{PAGINATION_START_ROW}" } ] } ``` #### 主SQL语句及动态参数绑定 主SQL语句中使用了动态参数绑定,以确保查询语句与请求参数一一对应。这种方式提高了查询语句的可读性和维护性,并保证了查询的准确性和安全性。 ```json { "field": "main_sql", "label": "主SQL语句", "type": "string", "describe": "...通过这种优化方式,我们能够提高查询语句的可读性和维护性...", "value": " select user1.user_id, user1.user_count, user2.job_number, case user3.userid when '0119001462201065096' then '0119001462201065096,2635645303840252' else user3.userid end as userid, user2.real_name, now() as id from ( SELECT user_id, COUNT(*) AS user_count FROM ( SELECT SUBSTRING_INDEX(SUBSTRING_INDEX(afp.audit_user, ',', numbers.n), ',', -1) AS user_id FROM mbs_pur_record a LEFT JOIN basic_supplier_info b ON b.supplier_uuid = a.supplier_uuid LEFT JOIN sys_user c ON c.user_id = a.create_by LEFT JOIN act_to_form_process afp ON afp.form_uuid = a.pur_audit_uuid AND afp.company_code = a.company_code JOIN ( SELECT a.N + b.N * 10 + 1 AS n FROM ( SELECT 0 AS N UNION SELECT 1 UNION SELECT 2 UNION SELECT 3 UNION SELECT 4 UNION SELECT 5 UNION SELECT 6 UNION SELECT 7 UNION SELECT 8 UNION SELECT 9 ) a CROSS JOIN ( SELECT 0 AS N UNION SELECT 1 UNION SELECT 2 UNION SELECT 3 UNION SELECT 4 UNION SELECT 5 UNION SELECT 6 UNION SELECT 7 UNION SELECT 8 UNION SELECT 9 ) b ) numbers ON numbers.n <= LENGTH(afp.audit_user) - LENGTH(REPLACE(afp.audit_user, ',', '')) + 1 WHERE a.pur_status IN ('1', '11') AND a.company_code = 'TYZN' AND a.pur_type = '2' and a.dingding_flag='1' ) AS users GROUP BY user_id ORDER BY user_id) user1 left join sys_user user2 on user1.user_id=user2.user_id left join basic_dingding_userid user3 on user2.job_number=user3.workid limit :limit offset :offset" } ``` #### 实现步骤 1. **定义请求结构**:根据元数据配置定义请求结构,包括分页参数(limit、offset)和主SQL语句。 2. **构建SQL查询**:将主SQL语句中的动态字段替换为占位符(如?),并在执行查询前进行参数绑定。 3. **执行查询**:通过轻易云平台发送POST请求,执行构建好的SQL查询,并获取结果集。 4. **处理结果**:对返回的数据进行必要的处理,如格式转换、过滤等,以满足业务需求。 #### 示例代码 以下是一个示例代码片段,展示如何通过轻易云平台调用MySQL接口并处理返回的数据: ```python import requests # 定义请求URL和头信息(假设已知) url = 'https://api.qingyiyun.com/data-integration/query' headers = {'Content-Type': 'application/json'} # 定义请求体,包括分页参数和主SQL语句(带占位符) payload = { 'main_params': { 'limit': PAGINATION_PAGE_SIZE, 'offset': PAGINATION_START_ROW, }, 'main_sql': """ select user1.user_id, ... limit ? offset ? """ } # 发起POST请求并获取响应结果 response = requests.post(url, headers=headers, json=payload) data = response.json() # 对返回的数据进行处理(如格式转换、过滤等) processed_data = process_data(data) def process_data(data): # 数据处理逻辑... return processed_data print(processed_data) ``` 通过上述步骤,我们可以高效地调用MySQL接口select获取并加工数据,为后续的数据转换与写入奠定基础。 ![金蝶与WMS系统接口开发配置](https://pic.qeasy.cloud/S18.png~tplv-syqr462i7n-qeasy.image) ### 使用轻易云数据集成平台实现ETL转换并写入钉钉API接口 在数据集成生命周期的第二步,我们将已经集成的源平台数据进行ETL(Extract, Transform, Load)转换,转为目标平台钉钉API接口所能够接收的格式,并最终写入目标平台。以下将详细介绍如何通过元数据配置实现这一过程。 #### 数据请求与清洗 在开始ETL转换之前,我们需要确保从源平台获取的数据已经经过清洗和预处理。这一步骤确保了数据的准确性和一致性,为后续的转换和写入打下基础。 #### 数据转换与写入 我们使用轻易云数据集成平台提供的可视化界面,配置元数据以实现数据的转换和写入。以下是具体步骤: 1. **配置API接口信息** 根据元数据配置,我们需要调用钉钉API接口`topapi/message/corpconversation/asyncsend_v2`。这是一个异步发送工作通知的接口,支持POST请求。 2. **设置请求参数** 请求参数包括`userid_list`, `to_all_user`, `msg`, 和 `agent_id`。这些参数需要根据源平台的数据进行动态填充。 - `userid_list`: 用户ID列表,类型为字符串。 - `to_all_user`: 是否发送给所有用户,类型为字符串,这里设置为`false`。 - `msg`: 消息内容,类型为对象,包括消息类型和具体消息内容。 - `msgtype`: 消息类型,这里设置为`markdown`。 - `markdown`: markdown消息内容,包括标题和文本。 - `title`: 消息标题,这里设置为`采购审核流`。 - `text`: 消息文本,通过函数拼接生成,包含动态数据,如紧急采购订单数量和审批链接。 - `agent_id`: 应用ID,类型为字符串,这里设置为`2811489571`。 3. **元数据配置示例** 以下是具体的元数据配置: ```json { "api": "topapi/message/corpconversation/asyncsend_v2", "effect": "EXECUTE", "method": "POST", "idCheck": true, "request": [ { "field": "userid_list", "label": "userid_list", "type": "string", "describe": "111", "value": "{userid}" }, { "field": "to_all_user", "label": "to_all_user", "type": "string", "describe": "111", "value": "false" }, { "field": "msg", "label": "msg", "type": "object", "describe": "111", "value": "test", "children": [ { "field": "msgtype", "label": "msgtype", "type": "string", "value": "markdown" }, { "field": "markdown", "label": "markdown", "type": ":object", ":children":[ { ":field":"title", ":label":"title", ":type":"string", ":value":"采购审核流" }, { ":field":"text", ":label":"text", ":type":"string", ":value":"_function CONCAT('![](http://192.168.110.232:9000/img/logo.eb09f95f.png) \\n # *紧急采购订单-待审* \\n ### 你有',{user_count},'张紧急采购订单等待审批 ','{id}',' \\n ### [<查看详情>](http://192.168.110.232:9000/examine/outsourcAudit)')" } ] } ] }, { ":field":"agent_id", ":label":"agent_id", ":type":"string", ":describe":"111", ":value":"2811489571" } ] } ``` 4. **执行API调用** 配置完成后,我们通过轻易云数据集成平台执行API调用,将转换后的数据发送到钉钉。系统会自动处理请求并监控执行状态,确保每个环节都透明可见。 5. **监控与日志记录** 在整个过程中,轻易云平台提供实时监控功能,可以随时查看数据流动和处理状态。同时,系统会记录详细日志,以便在出现问题时进行排查和解决。 通过上述步骤,我们成功地将源平台的数据经过ETL转换,并通过钉钉API接口发送工作通知,实现了不同系统间的数据无缝对接。这不仅提高了业务效率,也确保了信息传递的准确性和及时性。 ![企业微信与ERP系统接口开发配置](https://pic.qeasy.cloud/T20.png~tplv-syqr462i7n-qeasy.image)