轻易云平台ETL转换与MySQL写入详解

  • 轻易云集成顾问-孙传友
### 钉钉数据集成到MySQL的技术案例分享 在企业的信息化进程中,数据的重要性不言而喻,而如何高效地集成不同系统的数据则是其中的一大挑战。本文将重点讨论一个实际运行的系统对接案例:将钉钉的新付款退款单(方案名称:dd-新付款退款单-->mysql (鸿巢)其他退款)数据集成到MySQL数据库中。 #### 数据获取与API调用 首先,我们需要从钉钉平台获取相关的数据。为此,我们使用了钉钉提供的API接口 `v1.0/yida/processes/instances` 来定时抓取新生成的付款和退款信息。这一过程要求我们处理分页和限流等复杂情况,以确保数据完整且准确无误。 ```shell GET /v1.0/yida/processes/instances?appType=YOUR_APP_TYPE&limit=100 HTTP/1.1 Host: api.dingtalk.com Authorization: Bearer YOUR_ACCESS_TOKEN ``` 为了避免漏单问题,通过持续监控接口返回的数据状态,并在一定时间间隔内重复请求,从而确保所有记录都被成功捕获。同时设置重试机制以应对网络抖动或临时性API错误。 #### 数据转换与映射 由于钉钉返回的数据格式可能与MySQL预期的不一致,我们需进行相应的自定义转换逻辑。例如,将时间格式标准化、字段名称映射等等。通过平台提供的可视化数据流设计工具,可以轻松构建并调试这些转换步骤,保证每条记录在写入前都符合目标数据库表结构要求。 ```json { "instanceId": "12345", "createTime": "2023-10-15T08:00:00Z", ... } ``` #### MySQL批量写入 当完成初步转换后,即可利用高吞吐量支持快速将大量数据批量插入到MySQL中。采用如下方式: ```sql INSERT INTO refunds (id, create_time, ...) VALUES (?, ?, ...); ``` 结合事务管理和错误日志机制,确保在出现异常时能够及时回溯并重新执行特定操作。此外,通过集中监控和告警系统,可实时跟踪整个任务状态及性能表现,有助于提前发现潜在问题并迅速处理,提高整体可靠性。 通过上述几个关键环节与技术细节,实现了从钉钉至MySQL的数据顺利集成。具体流程中的配置及编码将在后续章节详细展开,包括如何调用核心API、处理数据质量监控以及优化性能等更多实用技巧。 ![如何开发企业微信API接口](https://pic.qeasy.cloud/D37.png~tplv-syqr462i7n-qeasy.image) ### 调用钉钉接口v1.0/yida/processes/instances获取并加工数据 在数据集成生命周期的第一步中,调用源系统的API接口是至关重要的一环。本文将详细探讨如何通过轻易云数据集成平台调用钉钉接口`v1.0/yida/processes/instances`,获取并加工数据,以实现数据的有效集成。 #### 接口调用配置 首先,我们需要配置API调用的元数据。根据提供的元数据配置,以下是具体的请求参数和格式: ```json { "api": "v1.0/yida/processes/instances", "method": "POST", "request": [ {"field": "pageNumber", "type": "string", "value": "{PAGINATION_START_PAGE}"}, {"field": "pageSize", "type": "string", "value": "{PAGINATION_PAGE_SIZE}"}, {"field": "appType", "type": "string", "value": "APP_WTSCMZ1WOOHGIM5N28BQ"}, {"field": "systemToken", "type": "string", "value": "IS866HB1DXJ8ODN3EXSVD750RBTK2X72R8MELL4"}, {"field": "userId", "type": "string", "value": "16000443318138909"}, {"field": "language", "type": "string", "value": ""}, {"field": "formUuid", "type": "string", "value": ""}, {"field":"searchFieldJson","type":"object","children":[ {"field":"selectField_lgm25d98","label":"费用分类","type":"string"}, {"parent":"searchFieldJson","label":"流水号","field":"serialNumberField_lgm25d8r","type":"string"}, {"parent":"searchFieldJson","label":"申请人","field":"textField_lgm25d8p","type":"string"} ]}, {"field":"originatorId","type":"string"}, {"field":"createFromTimeGMT","type":"string","value":"_function DATE_FORMAT(DATE_ADD(NOW(),INTERVAL - 25 DAY),'%Y-%m-%d 00:00:00')"}, {"field":"createToTimeGMT","type":"string","value":"{{CURRENT_TIME|datetime}}"}, {"field":"modifiedFromTimeGMT","type":"string"}, {"field":"modifiedToTimeGMT","type":"string"}, {"field":"taskId","type":"string"}, {"field":"instanceStatus","type":"string","value":"COMPLETED"}, {"field":"approvedResult","type":"string","value":"agree"} ], ... } ``` #### 数据请求与清洗 在进行数据请求时,需要特别注意分页参数`pageNumber`和`pageSize`,这些参数确保我们能够逐页获取大批量的数据。此外,通过设置`createFromTimeGMT`和`createToTimeGMT`,可以限定查询的数据时间范围,从而提高查询效率。 ```json { ... { field: 'createFromTimeGMT', type: 'string', value: '_function DATE_FORMAT(DATE_ADD(NOW(),INTERVAL - 25 DAY),\'%Y-%m-%d 00:00:00\')' }, { field: 'createToTimeGMT', type: 'string', value: '{{CURRENT_TIME|datetime}}' }, ... } ``` 在清洗过程中,需要对返回的数据进行格式化处理。例如,将日期字段`dateField_lgn3helb`转换为新的字段名`datetime_new`,并将其格式化为标准日期格式;将流水号字段`serialNumberField_lgm25d8r`转换为新的字段名`order_no_new`。 ```json { ... formatResponse: [ {old: 'dateField_lgn3helb', new: 'datetime_new', format: 'date'}, {old: 'serialNumberField_lgm25d8r', new: 'order_no_new', format: 'string'} ] } ``` #### 数据转换与写入 在完成数据清洗后,需要将处理后的数据写入目标系统。在本案例中,目标系统是MySQL数据库。通过轻易云平台,可以轻松配置目标数据库连接,并将清洗后的数据批量写入。 ```json { ... beatFlat: ['tableField_lgm25d9j'] } ``` #### 实例状态与审批结果过滤 为了确保只获取已完成且审批通过的实例,我们在请求参数中添加了实例状态和审批结果的过滤条件: ```json { ... { field: 'instanceStatus', type: 'string', value: 'COMPLETED' }, { field: 'approvedResult', type: 'string', value: 'agree' } } ``` 通过上述配置,我们可以高效地调用钉钉接口,获取所需的数据,并进行必要的清洗和转换,最终实现数据的无缝集成。这不仅提高了业务流程的透明度和效率,也为后续的数据分析和决策提供了可靠的数据支持。 ![如何对接用友BIP接口](https://pic.qeasy.cloud/S18.png~tplv-syqr462i7n-qeasy.image) ### 使用轻易云数据集成平台进行ETL转换并写入MySQL API接口 在数据集成的生命周期中,第二步是将已经集成的源平台数据进行ETL(Extract, Transform, Load)转换,并最终写入目标平台。在本案例中,我们将详细探讨如何将数据转换为MySQL API接口所能接收的格式,并写入目标平台。 #### 数据请求与清洗 首先,我们需要从源平台获取原始数据。假设我们已经完成了数据请求与清洗阶段,得到了以下字段: - `bfn_id`:明细ID - `order_no_new`:单号 - `datetime_new`:时间 - `tableField_lgm25d9j_numberField_lgm25d9r`:金额 这些字段将作为输入参数,在ETL过程中进行转换和映射。 #### 数据转换与写入 接下来,我们将这些原始数据进行转换,以符合目标平台MySQL API接口的要求。根据提供的元数据配置,具体操作如下: 1. **定义API接口参数** 根据元数据配置,我们需要构建一个POST请求,包含以下字段: ```json { "main_params": { "extend_processInstanceId": "{bfn_id}", "order_no_new": "{order_no_new}(FKTK)", "datetime_new": "{datetime_new}", "qty_count": "1", "sales_count": "{{tableField_lgm25d9j_numberField_lgm25d9r}}", "status": "", "Document_Type": "其他退款" } } ``` 2. **构建SQL语句** 通过元数据配置中的`main_sql`字段,我们可以看到需要执行的SQL插入语句: ```sql INSERT INTO `hc_dd_qttk` (`extend_processInstanceId`, `order_no_new`, `datetime_new`, `qty_count`, `sales_count`, `status`, `Document_Type`) VALUES (:extend_processInstanceId, :order_no_new, :datetime_new, :qty_count, :sales_count, :status, :Document_Type) ``` 3. **配置请求参数** 在轻易云数据集成平台中,我们需要配置请求参数以匹配上述SQL语句中的占位符。具体映射关系如下: - `extend_processInstanceId` 映射到 `{bfn_id}` - `order_no_new` 映射到 `{order_no_new}(FKTK)` - `datetime_new` 映射到 `{datetime_new}` - `qty_count` 固定值 `"1"` - `sales_count` 映射到 `{{tableField_lgm25d9j_numberField_lgm25d9r}}` - `status` 空值 - `Document_Type` 固定值 `"其他退款"` 4. **发送POST请求** 最后,通过轻易云的数据集成平台发送POST请求,执行上述SQL插入操作。具体实现如下: ```json { "api": "execute", "effect": "EXECUTE", "method": "POST", "idCheck": true, "request": [ { "field": "main_params", "label": "main_params", "type": "object", "describe": "111", "children": [ {"field":"extend_processInstanceId","label":"明细id","type":"string","value":"{bfn_id}"}, {"field":"order_no_new","label":"单号","type":"string","value":"{order_no_new}(FKTK)"}, {"field":"datetime_new","label":"时间","type":"date","value":"{datetime_new}"}, {"field":"qty_count","label":"数量","type":"string","value":"1"}, {"field":"sales_count","label":"金额","type":"string","value":"{{tableField_lgm25d9j_numberField_lgm25d9r}}"}, {"field":"status","label":"状态","type":"string"}, {"field":"Document_Type","label":"单据类型","type":"string","value":"其他退款"} ] } ], "otherRequest":[ { "field": "main_sql", "label": "main_sql", "type": "string", "describe": "111", "value": "INSERT INTO `hc_dd_qttk` (`extend_processInstanceId`,`order_no_new`,`datetime_new`,`qty_count`,`sales_count`,`status`,`Document_Type`) VALUES (:extend_processInstanceId,:order_no_new,:datetime_new,:qty_count,:sales_count,:status,:Document_Type)" } ] } ``` 通过以上步骤,我们成功地将源平台的数据进行了ETL转换,并通过MySQL API接口写入了目标平台。这一过程不仅确保了数据的一致性和完整性,还提高了系统间的数据交互效率。 ![企业微信与ERP系统接口开发配置](https://pic.qeasy.cloud/T18.png~tplv-syqr462i7n-qeasy.image)