利用轻易云实现数据集成ETL转换并写入MySQL

  • 轻易云集成顾问-彭亮
### 钉钉数据集成到MySQL:dd-新转账单(资金调拨)-->mysql(鸿巢)技术案例分享 在实施“dd-新转账单(资金调拨)-->mysql(鸿巢)”项目时,面临的主要挑战是如何高效、可靠地将钉钉平台上的数据集成至内部的MySQL数据库。通过轻易云数据集成平台,我们利用其高级特性,实现了从获取钉钉API接口的数据,到转换及入库全过程的精准控制。 首先,在对接过程中我们使用了`v1.0/yida/processes/instances` API,从钉钉中抓取相关业务流程实例的数据。这一步骤涉及到了处理分页和限流问题,为确保不漏单,我们制定了一套定时任务机制,能够可靠地周期性抓取并存储最新的数据。同时,通过设置自定义的重试策略,有效降低了由于网络延迟或API速率限制所导致的问题。 对于输送到MySQL数据库中的数据,我们采用了批量写入的方法,以提升整体吞吐效率。借助可视化的数据流设计工具,将多次复杂操作简化为一系列直观且容易管理的步骤。此外,还使用了执行API `execute`进行快速、大量的数据写入,这确保了系统在大规模数据交换环境下依然保持稳定运行。 为了实现与业务需求相适应的数据转换逻辑,自定义脚本被用于转换和映射不同格式的数据。在这一过程中,我们特别注意处理可能存在的不一致类型及结构差异,提高最终储存于MySQL中的数据质量。同时,通过集中监控告警系统,对整个集成过程进行了实时跟踪,迅速发现并解决任何异常情况,使得整个项目顺利落地实施。 ![数据集成平台可视化配置API接口](https://pic.qeasy.cloud/D15.png~tplv-syqr462i7n-qeasy.image) ### 调用钉钉接口v1.0/yida/processes/instances获取并加工数据 在数据集成的生命周期中,调用源系统接口是至关重要的一步。本文将详细探讨如何通过轻易云数据集成平台调用钉钉接口`v1.0/yida/processes/instances`,获取并加工数据,以实现高效的数据集成。 #### 接口调用配置 首先,我们需要配置API调用的元数据。以下是具体的配置细节: ```json { "api": "v1.0/yida/processes/instances", "method": "POST", "number": "title", "id": "processInstanceId", "idCheck": true, "formatResponse": [ {"old": "dateField_lglvrpp4", "new": "datetime_new", "format": "date"}, {"old": "serialNumberField_lgov9d3b", "new": "order_no_new", "format": "string"} ], "request": [ {"field": "pageNumber", "label": "分页页码", "type": "string", "describe": "分页页码", "value": "{PAGINATION_START_PAGE}"}, {"field": "pageSize", "label": "分页大小", "type": "string", "describe": "分页大小", "value": "{PAGINATION_PAGE_SIZE}"}, {"field": "appType", "label": "应用ID", "type": "string", "describe": "应用ID", "value":"APP_WTSCMZ1WOOHGIM5N28BQ"}, {"field":"systemToken","label":"应用秘钥","type":"string","describe":"应用秘钥","value":"IS866HB1DXJ8ODN3EXSVD750RBTK2X72R8MELL4"}, {"field":"userId","label":"用户的userid","type":"string","describe":"用户的userid","value":"16000443318138909"}, {"field":"language","label":"语言","type":"string","describe":"语言,取值:zh_CN:中文(默认值)en_US:英文"}, {"field":"formUuid","label":"表单ID","type":"string","describe":"表单ID","value":"FORM-KW766OD1931ALBVJBHUHLD6KFJ3G3OELLVLGL0"}, {"field":"searchFieldJson","label":"条件","type":"object","children":[{"field":"selectField_lglvrpmg","label":"类型","type":"string","value":"跨组织转账"}]}, {"field":"originatorId","label":"根据流程发起人工号查询","type":"string","describe":"根据流程发起人工号查询"}, {"field":"createFromTimeGMT","label":"创建时间起始值","type":"string","describe":"创建时间起始值", "value": "_function DATE_FORMAT(DATE_ADD(NOW(),INTERVAL - 25 DAY),'%Y-%m-%d 00:00:00')"}, {"field": "createToTimeGMT", "label": "创建时间终止值", "type": "string", "describe": "创建时间终止值", "value": "{{CURRENT_TIME|datetime}}" }, { "field": "modifiedFromTimeGMT", "label": "修改时间起始值", "type": "string", "describe": "修改时间起始值" }, { "field": "modifiedToTimeGMT", "label": "修改时间终止值", "type": "string", "describe": "修改时间终止值" }, { "field": "taskId", "label": "任务ID", "type": "string", "describe": "任务ID" }, { "field": "instanceStatus", "label": "实例状态", "type": "strings ,"describe : 实例状态 ,"value : COMPLETED" }, { " approvedResult ," label : 流程审批结果 ,"type : strings ,"describe : 流程审批结果 ,"value : agree" } ], " condition":[[{"field :"dateField_lglvrpp4" ,"logic :"notnull"}]] } ``` #### 请求参数详解 - **pageNumber** 和 **pageSize**:用于分页控制,确保每次请求的数据量可控。 - **appType** 和 **systemToken**:分别为应用ID和秘钥,用于身份验证。 - **userId** 和 **language**:指定用户和语言环境。 - **formUuid** 和 **searchFieldJson**:用于指定表单和搜索条件,其中`searchFieldJson`包含了具体的业务逻辑,如类型为“跨组织转账”。 - **createFromTimeGMT** 和 **createToTimeGMT**:用于限定数据的创建时间范围,确保只获取特定时间段内的数据。 - **instanceStatus** 和 **approvedResult**:用于过滤已完成且审批通过的实例。 #### 数据格式化与转换 在获取到原始数据后,需要对其进行格式化和转换。元数据配置中的`formatResponse`字段定义了这一过程: - 将 `dateField_lglvrpp4` 转换为 `datetime_new`,格式为日期。 - 将 `serialNumberField_lgov9d3b` 转换为 `order_no_new`,格式为字符串。 这种转换确保了数据在进入目标系统前已经符合预期格式,提高了数据处理效率。 #### 数据清洗与写入 在完成数据请求和初步清洗后,可以利用轻易云平台提供的工具进一步处理这些数据,并将其写入目标系统(如MySQL数据库)。这一过程包括: 1. 数据校验:根据元数据中的条件配置(如字段不为空)进行校验。 2. 数据转换:利用平台提供的脚本或函数对数据进行进一步转换。 3. 数据写入:通过配置好的连接器,将处理后的数据写入目标数据库。 通过上述步骤,我们可以高效地从钉钉系统中获取所需的数据,并将其无缝集成到其他业务系统中,实现跨系统的数据共享与协同工作。 ![如何对接金蝶云星空API接口](https://pic.qeasy.cloud/S6.png~tplv-syqr462i7n-qeasy.image) ### 使用轻易云数据集成平台进行ETL转换并写入MySQL API接口 在数据集成过程中,ETL(Extract, Transform, Load)转换是关键步骤之一。本文将深入探讨如何通过轻易云数据集成平台,将已经集成的源平台数据进行ETL转换,转为目标平台 MySQL API 接口所能够接收的格式,并最终写入目标平台。 #### 数据请求与清洗 在进行ETL转换之前,首先需要完成数据的请求与清洗。这一步骤确保从源系统获取的数据是准确且符合要求的。假设我们已经完成了这一步骤,现在我们关注如何将清洗后的数据进行转换并写入目标平台。 #### 数据转换与写入 在轻易云数据集成平台中,元数据配置是实现数据转换和写入的重要环节。以下是一个详细的元数据配置示例: ```json { "api": "execute", "effect": "EXECUTE", "method": "POST", "idCheck": true, "request": [ { "field": "main_params", "label": "main_params", "type": "object", "describe": "111", "children": [ { "field": "extend_processInstanceId", "label": "明细id", "type": "string", "value": "{{extend.processInstanceId}}" }, { "field": "order_no_new", "label": "单号", "type": "string", "value": "{order_no_new}(ZJDB)" }, { "field": "datetime_new", "label": "时间", "type": "date", "value": "{datetime_new}" }, { "field": "qty_count", "label": "数量", "type": "string", "value":"1" }, { "field":"sales_count", "label":"金额", "type":"string", "value":"{numberField_lglvrpn8}" }, { “field”:”status”, “label”:”状态”, “type”:”string” }, { “field”:”Document_Type”, “label”:”单据类型”, “type”:”string”, “value”:”资金调拨” } ] } ], "otherRequest":[ { “field”:”main_sql”, “label”:”main_sql”, “type”:”string”, “describe”:”111”, “value”:” INSERT INTO `hc_dd_zjdb` ( `extend_processInstanceId`, `order_no_new`, `datetime_new`, `qty_count`, `sales_count`, `status`, `Document_Type` ) VALUES ( :extend_processInstanceId, :order_no_new, :datetime_new, :qty_count, :sales_count, :status, :Document_Type ) ” } ] } ``` #### 元数据配置解析 1. **API接口定义** - `"api":"execute"`:指定要执行的操作。 - `"effect":"EXECUTE"`:定义操作效果为执行。 - `"method":"POST"`:使用HTTP POST方法。 2. **请求参数** - `"request"`部分定义了请求参数,包括字段名称、标签、类型和描述等信息。 - 每个字段都包含具体的值来源,例如: - `"extend_processInstanceId"`:从`{{extend.processInstanceId}}`获取值。 - `"order_no_new"`:从`{order_no_new}(ZJDB)`获取值并添加后缀。 - `"datetime_new"`:从`{datetime_new}`获取日期值。 - `"qty_count"`:固定值为`1`。 - `"sales_count"`:从`{numberField_lglvrpn8}`获取金额值。 - `"status"`和`"Document_Type"`分别表示状态和单据类型。 3. **SQL语句** - `"otherRequest"`部分定义了要执行的SQL语句,用于将数据插入到目标MySQL数据库中。 - SQL语句中的占位符(如`:extend_processInstanceId`)对应于请求参数中的字段。 #### 实现过程 通过上述元数据配置,我们可以实现以下几个步骤: 1. **提取(Extract)**: 从源系统提取所需的数据,并根据业务需求进行初步处理和清洗。 2. **转换(Transform)**: 根据元数据配置,将提取的数据转换为目标系统所需的格式。例如,将订单号添加后缀,将日期格式化等。 3. **加载(Load)**: 使用HTTP POST方法,将转换后的数据通过API接口发送到目标MySQL数据库,并执行插入操作。 这种方式不仅确保了数据的一致性和完整性,还大大简化了跨系统的数据集成过程。通过灵活的元数据配置,我们可以轻松适应不同业务场景下的数据需求,实现高效的数据流转和处理。 ![如何对接钉钉API接口](https://pic.qeasy.cloud/T8.png~tplv-syqr462i7n-qeasy.image)