使用轻易云进行ETL转换并写入钉钉API的最佳实践

  • 轻易云集成顾问-曹润
### MySQL数据集成到钉钉的案例分享:SiHua-来料质检不良通知采购员-钉钉工作通知 在本次技术案例中,我们将深入探讨如何通过一个高效的数据集成方案,将MySQL中的质量检验数据实时、准确地推送到钉钉,以便及早通知采购员处理异常情况。我们将专注于实现一系列关键功能,包括定时可靠抓取MySQL接口数据、高吞吐量写入至钉钉以及基于API资产管理的优化配置。 首先,为了确保能够及时、准确地获取MySQL中的最新质检数据,我们使用了`SELECT`语句来调用MySQL API接口。这一步骤不仅要求高效且必须保证每一次读取操作都不会漏掉任何关键信息。因此,定期调度和分页处理显得尤为重要,以应对海量数据同时避免系统负载过大。 接下来是如何批量集成这些已获取的质检数据到钉钉工作通知中。我们利用轻易云平台提供的可视化数据流设计工具,根据具体业务需求自定义转换逻辑,从而适配不同的数据结构。这不仅提高了开发效率,也使得整个流程更直观、更易管理。在向钉 钝写入 数据时,通过调用 `topapi/message/corpconversation/asyncsend_v2` API,实现大量数据快速、高效地传输至指定人员的工作台。 此外,为保障整个数据信息流转过程全程透明、稳定运行,集中监控和告警系统实时跟踪各个环节状态及性能。一旦出现异常即刻触发告警机制,同时也支持错误重试从而最大限度降低失败率。 通过例析上述步骤与技术手段,不仅达到预期目标,更露出了一条行之有效的数据集成实践路径。在后续内容中,敬请期待关于具体实现细节的进一步拆解与解析。 ![数据集成平台可视化配置API接口](https://pic.qeasy.cloud/D34.png~tplv-syqr462i7n-qeasy.image) ### 调用MySQL接口获取并加工数据的技术案例 在数据集成过程中,调用源系统MySQL接口是至关重要的一步。本文将详细探讨如何使用轻易云数据集成平台配置元数据,通过调用MySQL接口`select`获取并加工数据。 #### 配置元数据 首先,我们需要配置元数据,以便正确调用MySQL接口。以下是我们使用的元数据配置: ```json { "api": "select", "effect": "QUERY", "method": "POST", "number": "id", "id": "id", "idCheck": true, "request": [ { "field": "main_params", "label": "主参数", "type": "object", "describe": "对应其它请求字段内SQL语句的主参数,必须一一对应。", "value": "1", "children": [ { "field": "limit", "label": "限制结果集返回的行数", "type": "int", "describe": "必要的参数!LIMIT 子句用于限制查询结果返回的行数。它指定了查询应该返回的最大行数。例如,LIMIT 10 表示查询结果只包含最多 10 行数据。这对于分页查询非常有用,可以在每次查询中返回一定数量的结果。", "value": 1000 }, { "field": "offset", "label": "偏移量", "type": "int", "describe": "OFFSET 子句用于指定查询结果的起始位置或偏移量。它表示查询应该从结果集的哪一行开始返回数据。例如,OFFSET 20 表示查询应该从结果集的第 21 行开始返回数据。结合 LIMIT 子句使用时,OFFSET 指定了查询结果的起始行数。", "value": "{PAGINATION_START_ROW}" } ] } ], ... } ``` #### 主SQL语句 在元数据配置中,我们定义了主SQL语句,并使用动态字段进行参数绑定: ```sql SELECT a.id, e.real_name, c.pur_no, d.supplier_full_name, b.bom_no, b.barcode, b.part_no, b.grade_name, b.pic_no, b.quality, b.heat_treatment, a.total_num, a.check_num, a.bad_num, a.except_phenomena, CONCAT( IFNULL(o.userid,''),',',IFNULL(f.userid,''), ',064140631924255283' ) AS userid, now() as time, g.file_name as filename, g.file_path as filepath1, REPLACE(REPLACE(REPLACE(REPLACE(CONCAT('?fileOldName=',g.file_path,'&resource='),'?','%3F'),'=','%3D'),'\/','%2F'),'&','%26') as filepath2, l.customer_name, j.order_no, m.dict_label, j.part_no as 'Prd', j.pic_no as 'Prd_pic', n.real_name as 'PM' FROM mbs_check a LEFT JOIN mbs_pur_record_detail b ON a.pur_record_detail_uuid=b.pur_record_detail_uuid LEFT JOIN mbs_pur_record c ON b.pur_record_uuid=c.pur_record_uuid LEFT JOIN basic_supplier_info d ON c.supplier_uuid=d.supplier_uuid LEFT JOIN sys_user e ON c.create_by=e.user_id LEFT JOIN basic_dingding_userid f ON e.job_number=f.WorkID LEFT JOIN basic_material_info g ON g.part_no=b.part_no LEFT JOIN mbs_order_plan_bom h ON h.bom_no=b.bom_no LEFT JOIN mbs_order_bom j ON h.bom_uuid=j.bom_uuid LEFT JOIN mbs_order k ON j.order_uuid=k.order_uuid LEFT JOIN basic_customer_info l ON l.customer_uuid=k.customer_uuid LEFT JOIN sys_dict_data m ON m.dict_type='sys_mbs_order_type' AND dict_value=k.order_type LEFT JOIN sys_user n ON j.leader=n.user_id LEFT JOIN basic_dingding_userid o ON n.job_number=o.WorkID WHERE a.create_time>=date(now()) AND a.bad_num>0 LIMIT :limit OFFSET :offset; ``` #### 参数绑定 为了确保动态字段与请求参数正确对应,我们采用参数绑定的方法。在执行查询之前,将请求参数值与占位符进行绑定: ```java String sql = "..."; // 上述SQL语句内容 PreparedStatement pstmt = connection.prepareStatement(sql); pstmt.setInt(1, limit); pstmt.setInt(2, offset); ResultSet rs = pstmt.executeQuery(); ``` 通过这种方式,我们提高了查询语句的可读性和维护性,并确保动态字段与请求参数正确对应。 #### 数据处理与返回 在获取到查询结果后,我们可以对数据进行进一步处理,例如格式化日期、计算统计值等。最后,将处理后的数据返回给调用方: ```java List<Map<String, Object>> resultList = new ArrayList<>(); while (rs.next()) { Map<String, Object> row = new HashMap<>(); row.put("id", rs.getInt("id")); row.put("real_name", rs.getString("real_name")); // ... 添加其他字段 resultList.add(row); } return resultList; ``` 通过上述步骤,我们成功实现了调用MySQL接口获取并加工数据,为后续的数据转换与写入奠定了基础。 ![打通钉钉数据接口](https://pic.qeasy.cloud/S24.png~tplv-syqr462i7n-qeasy.image) ### 使用轻易云数据集成平台进行ETL转换并写入钉钉API接口 在数据集成的生命周期中,第二步是将已经集成的源平台数据进行ETL(提取、转换、加载)转换,并转为目标平台能够接收的格式。在本案例中,我们将探讨如何使用轻易云数据集成平台,将来料质检不良通知的数据转换为钉钉API接口所能接收的格式,并最终写入钉钉。 #### 钉钉API接口配置 我们需要将数据发送到钉钉的工作通知接口。根据元数据配置,目标API为`topapi/message/corpconversation/asyncsend_v2`,请求方法为`POST`。以下是请求参数的详细配置: - `userid_list`: 用户ID列表,用于指定接收消息的用户。 - `to_all_user`: 是否发送给所有用户,此处设置为`false`。 - `msg`: 消息内容对象,包含消息类型和具体内容。 - `msgtype`: 消息类型,此处为`markdown`。 - `markdown`: markdown格式的消息内容,包含标题和文本。 - `title`: 消息标题,此处为“来料不良通知”。 - `text`: 消息正文,通过函数拼接多个字段生成。 #### 数据转换过程 在ETL过程中,我们需要对源数据进行提取、转换,并按照目标API接口要求的格式进行组装。以下是具体步骤: 1. **提取源数据**:从源系统中获取来料质检不良通知的数据,包括客户信息、订单信息、物料信息等。 2. **数据清洗与转换**: - 将各个字段的数据进行标准化处理,例如日期格式统一、字符串去除空格等。 - 根据元数据配置,将各字段值映射到目标API请求参数中。 3. **组装请求参数**: - 构建消息内容对象`msg`,其中包含消息类型和具体内容。 - 使用字符串拼接函数生成markdown格式的消息正文。以下是拼接函数示例: ```plaintext CONCAT( '# 来料不良通知: \\n', '{time}', ' \\n', '### 客户:', '{customer_name}', ' \\n', '### 销售订单:', '{order_no}', ' \\n', '### 订单类型:', '{dict_label}', ' \\n', '### PMC:', '{PM}', ' \\n', '### 成品编号:', '{Prd}', ' \\n', '### 成品图号:', '{Prd_pic}', ' \\n', '### 采购员:', '{real_name}', ' \\n', '### 采购订单编号:', '{pur_no}', ' \\n', '### 供应商名称:', '{supplier_full_name}', ' \\n', '### 计划跟踪号:', '{bom_no}', ' \\n', '### 条码:', '{barcode}', ' \\n', '### 物料编号:', '{part_no}', ' \\n', '### 物料名称:', '{grade_name}', ' \\n', '### 图号:', '{pic_no}', ' \\n', '### 材质:', '{quality}', ' \\n', '### 表面处理:', '{heat_treatment}', ' \\n', '### 总数:', '{total_num}', ' \\n', '### 检验数量:', '{check_num}', ' \\n', ... ) ``` 4. **发送请求**: - 使用HTTP POST方法,将组装好的请求参数发送到钉钉API接口。 #### 实践案例 假设我们从源系统提取到以下数据: ```json { "userid": "12345", "time": "2023-10-01", "customer_name": "客户A", "order_no": "SO123456", "dict_label": "普通订单", "PM": "张三", ... } ``` 根据上述元数据配置和提取的数据,我们可以构建如下请求参数: ```json { "userid_list": "12345", "to_all_user": false, "msg": { "msgtype": "markdown", "markdown": { "title": "来料不良通知", "text": "# 来料不良通知: \\\\n2023-10-01\\\\n ### 客户: 客户A\\\\n ### 销售订单: SO123456\\\\n ### ..." } }, "agent_id": "2811489571" } ``` 通过轻易云数据集成平台,我们可以轻松实现上述ETL过程,并确保数据准确无误地写入到钉钉API接口,从而实现自动化的工作通知功能。这不仅提升了业务效率,还减少了人为错误,为企业的信息化管理提供了有力支持。 ![用友与外部系统接口集成开发](https://pic.qeasy.cloud/T23.png~tplv-syqr462i7n-qeasy.image)