ETL转换实现及写入MySQL的最佳实践

  • 轻易云集成顾问-吕修远
### 钉钉数据集成到MySQL:实战案例分享 在复杂的企业业务运营过程中,如何有效地进行系统对接与数据集成是一个技术性挑战。本次我们将分享一个实际运行的方案——`dd-新报销单(实报实销)-->mysql(鸿巢)费用报销☆`,通过轻易云数据集成平台实现从钉钉获取新的费用报销单并将其写入MySQL数据库。 #### 数据抓取与处理 首先,通过调用钉钉API `v1.0/yida/processes/instances` 获取最新的报销单实例。这个接口支持大批量高吞吐量的数据查询,使得我们能够在短时间内抓取大量的业务数据。此外,轻易云提供了可靠的定时任务功能,可以周期性地抓取和更新这些数据信息,从而确保每个新创建或修改过的数据都被及时同步。 ```python # 示例:调用钉钉API获取数据 import requests endpoint = "https://oapi.dingtalk.com/v1.0/yida/processes/instances" headers = { "Content-Type": "application/json", "Authorization": "Bearer YOUR_ACCESS_TOKEN" } response = requests.get(endpoint, headers=headers) data = response.json() ``` #### 数据转换与质量监控 由于源端(钉钉)和目标端(MySQL)的数据结构存在差异,需要借助自定义的数据转换逻辑来调整字段格式及内容。例如,将JSON格式的数据解析为适合关系型数据库存储的数据表形式。同时,为了保证数据质量,在整个流程中引入异常检测机制,对于不符合预期标准的数据实时报警,并进行处理。 ```python # 示例:数据转换逻辑 def transform_data(raw_data): transformed_data = [] for item in raw_data: record = { 'expense_id': item['instanceId'], 'amount': float(item['formComponentValues']['fieldName']), # ... 其他字段映射 ... } transformed_data.append(record) return transformed_data ``` ##### 批量写入与性能优化 为了提升整体效率,我们采用批量操作方式,将处理好的记录集中插入到MySQL中。利用`execute` API可以一次性执行多条INSERT语句,大大减少网络延迟带来的影响。同时,还需考虑分页问题以及限流策略,以避免因为请求频率过高导致接口返回错误或超时。 ```sql -- 示例:使用batch insert提高写入速度 INSERT INTO expense_reports (expense_id, amount, ...) VALUES ('12345', 100.50), ('67890', 200.75), ... ; ``` #### 实时监控与异常重试机制 整个 ![电商OMS与ERP系统接口开发配置](https://pic.qeasy.cloud/D39.png~tplv-syqr462i7n-qeasy.image) ### 调用钉钉接口获取并加工数据的技术实现 在数据集成生命周期的第一步,我们需要调用钉钉接口`v1.0/yida/processes/instances`来获取源系统的数据,并对其进行初步加工。本文将详细探讨如何通过轻易云数据集成平台配置元数据,完成这一过程。 #### API调用配置 首先,我们需要配置API调用的基本信息。根据提供的元数据配置,API调用采用POST方法,具体配置如下: ```json { "api": "v1.0/yida/processes/instances", "method": "POST" } ``` #### 请求参数设置 为了确保能够正确地分页获取数据,我们需要设置分页参数。以下是分页参数的配置: ```json { "field": "pageNumber", "label": "分页页码", "type": "string", "describe": "分页页码", "value": "{PAGINATION_START_PAGE}" }, { "field": "pageSize", "label": "分页大小", "type": "string", "describe": "分页大小", "value": "{PAGINATION_PAGE_SIZE}" } ``` 此外,为了确保接口调用的安全性和准确性,还需要设置应用ID、应用秘钥、用户ID等参数: ```json { "field": "appType", "label": "应用ID", "type": "string", "describe": "应用ID", "value": "APP_WTSCMZ1WOOHGIM5N28BQ" }, { "field": "systemToken", "label": "应用秘钥", "type": "string", "describe": "应用秘钥", "value": "IS866HB1DXJ8ODN3EXSVD750RBTK2X72R8MELL4" }, { "field": "userId", "label": "用户的userid", "type": "string", "describe": "用户的userid", ``"value":"16000443318138909" } ``` #### 数据过滤与查询条件 为了精确获取所需的数据,我们可以通过设置查询条件来过滤数据。例如,我们可以根据表单内组件值进行查询: ```json { { { { { { { { { { { { { { { { { { { { { { {"field":"radioField_lgk9jn2v","label":"类型","type":"string","value":"实报实销"}, {"parent":"searchFieldJson","label":"报销金额","field":"numberField_lgk9jn4n","type":"string","value":[0.001]} } } } } } } } } } } } } } } } } }, {"field":"originatorId","label":"根据流程发起人工号查询","type":"string","describe":"根据流程发起人工号查询"} }, {"field":"createFromTimeGMT","label":"创建时间起始值","type":"string","describe":"创建时间起始值","value":"_function DATE_FORMAT(DATE_ADD(NOW(),INTERVAL -25 DAY),'%Y-%m-%d00:00:00')"}, {"field":"createToTimeGMT","label":"创建时间终止值","type":"string","describe":"创建时间终止值","value":"{{CURRENT_TIME|datetime}}"} }, {"field":"modifiedFromTimeGMT","label":"修改时间起始值","type":"string","describe":"修改时间起始值"} }, {"field":"modifiedToTimeGMT","label":"修改时间终止值","type":"string","describe":"修改时间终止值"} }, {"field":"taskId","label":"任务ID","type":"string","describe":"任务ID"} }, {"field":"instanceStatus","label":"实例状态","type":"string","describe":"","实例状态"value:"COMPLETED"}, {"field:"approvedResult,""label:"流程审批结果,""type:""string,""describe:"流程审批结果,""value:"agree"} } ``` #### 数据格式转换 在获取到原始数据后,需要对部分字段进行格式转换。例如,将日期字段`dateField_lgkieplu`转换为新的日期字段`datetime_new`,将字符串字段`serialNumberField_lgk9jn2s`转换为新的字符串字段`order_no_new`: ```json [ {"old":""dateField_lgkieplu,"new:"datetime_new,"format:"date"}, {"old:"serialNumberField_lgk9jn2s,"new:"order_no_new,"format:"string"} ] ``` #### 数据校验与处理 在数据处理过程中,需要确保某些关键字段不为空,例如日期字段: ```json [ [ {"field:dateField_lgkieplu,"logic:notnull} ] ] ``` 通过上述配置和处理步骤,我们可以高效地调用钉钉接口获取所需的数据,并对其进行初步加工,为后续的数据转换与写入奠定基础。这一过程充分利用了轻易云数据集成平台提供的元数据配置功能,实现了高效、透明的数据集成。 ![用友与外部系统接口集成开发](https://pic.qeasy.cloud/S15.png~tplv-syqr462i7n-qeasy.image) ### 数据集成生命周期第二步:ETL转换与写入MySQLAPI接口 在数据集成的过程中,ETL(Extract, Transform, Load)转换是至关重要的一环。本文将深入探讨如何将已经集成的源平台数据进行ETL转换,转为目标平台MySQLAPI接口所能够接收的格式,并最终写入目标平台。 #### 元数据配置解析 在配置元数据时,我们需要特别关注以下几个关键字段和步骤: 1. **API调用方法**:`POST` 2. **API路径**:`execute` 3. **请求体结构**: - `main_params`:包含具体的数据字段 - `otherRequest`:包含SQL语句 以下是元数据配置的详细内容: ```json { "api": "execute", "effect": "EXECUTE", "method": "POST", "idCheck": true, "request": [ { "field": "main_params", "label": "main_params", "type": "object", "describe": "111", "children": [ {"field": "extend_processInstanceId", "label": "明细id", "type": "string", "value":"{bfn_id}"}, {"field": "order_no_new", "label": "单号", "type": "string", "value":"{order_no_new}(FYSQ)"}, {"field": "datetime_new", "label": "时间", "type":"date", "value":"{datetime_new}"}, {"field":"qty_count","label":"数量","type":"string","value":"1"}, {"field":"sales_count","label":"金额","type":"string","value":"{{tableField_lgk9jn4p_numberField_lgk9jn43}}"}, {"field":"status","label":"状态","type":"string"}, {"field":"Document_Type","label":"单据类型","type":"string","value":"费用报销单"} ] } ], "otherRequest":[ { "field": ![金蝶与MES系统接口开发配置](https://pic.qeasy.cloud/T9.png~tplv-syqr462i7n-qeasy.image)