ETL生命周期管理:从钉钉到MySQL的数据写入

  • 轻易云集成顾问-卢剑航
### 钉钉数据对接到MySQL——案例分享:dd-新薪金单(非工资)-->mysql(鸿巢付款单) 在现代企业的应用架构中,实现关键业务系统间的数据集成是提升运营效率的重要环节。本文将详细解析如何使用数据集成平台完成钉钉新薪金单及其他付款单信息到MySQL数据库的对接。 本次案例采用了钉钉API `v1.0/yida/processes/instances` 获取相关数据,并通过MySQL API `execute` 将其高效写入到数据库中。在具体操作过程中,主要涵盖以下技术要点: - **定时可靠抓取**:利用平台内置调度功能,定时调用钉钉接口获取最新的薪金单信息,确保数据抓取的及时性和完整性。 - **处理分页与限流**:由于API返回结果可能存在分页和请求频率限制,我们进行了合理的分页控制与限流管理,以保障接口调用稳定。 - **批量快速写入**:通过优化的数据传输机制,将获取到的大量薪金单数据进行清洗、转换后,高效批量写入至MySQL数据库,提高处理速度并减少资源占用。 - **自定义映射与转换**:针对双方系统数据格式差异,通过自定义规则灵活调整数据结构,包括字段映射、类型转换等,使得两端系统无缝对接。 此外,在整个流程中我们采用了集中监控和告警机制,对每一步操作实时跟踪,一旦出现异常情况及时告知运维人员。同时,为确保最终落地效果,体系内部还建立了详尽的数据质量监控和错误重试策略,从而保证整体解决方案的稳健运行。 ![打通金蝶云星空数据接口](https://pic.qeasy.cloud/D23.png~tplv-syqr462i7n-qeasy.image) ### 调用钉钉接口获取并加工数据的技术实现 在数据集成过程中,调用源系统接口获取数据是至关重要的一步。本文将详细探讨如何通过轻易云数据集成平台调用钉钉接口`v1.0/yida/processes/instances`获取并加工数据。 #### 接口调用配置 首先,我们需要配置API接口的请求参数和元数据。根据提供的元数据配置,我们可以看到以下关键参数: - `api`: `v1.0/yida/processes/instances` - `method`: `POST` - 请求参数包括分页信息、应用ID、用户ID、表单ID等。 以下是请求参数的详细配置: ```json { "pageNumber": "{PAGINATION_START_PAGE}", "pageSize": "{PAGINATION_PAGE_SIZE}", "appType": "APP_WTSCMZ1WOOHGIM5N28BQ", "systemToken": "IS866HB1DXJ8ODN3EXSVD750RBTK2X72R8MELL4", "userId": "16000443318138909", "language": "zh_CN", "formUuid": "FORM-0A966I81H10AX9NTBHCRX9JYRM0X2DS1D2MGLL", "searchFieldJson": { "selectField_lgn2qshb": "", "textField_lgn2qsh7": "" }, "originatorId": "", "createFromTimeGMT": "_function DATE_FORMAT(DATE_ADD(NOW(),INTERVAL - 25 DAY),'%Y-%m-%d 00:00:00')", "createToTimeGMT": "{{CURRENT_TIME|datetime}}", "modifiedFromTimeGMT": "", "modifiedToTimeGMT": "", "taskId": "", "instanceStatus": "COMPLETED", "approvedResult": "agree" } ``` #### 数据清洗与格式转换 在获取到原始数据后,需要对数据进行清洗和格式转换。根据元数据配置中的`formatResponse`字段,我们需要对特定字段进行重新命名和格式化: ```json [ {"old":"dateField_lgn2qsju","new":"datetime_new","format":"date"}, {"old":"serialNumberField_lgovkejx","new":"order_no_new","format":"string"} ] ``` 具体操作如下: 1. **日期字段转换**:将`dateField_lgn2qsju`字段转换为新的字段名`datetime_new`,并确保其格式为日期类型。 2. **字符串字段转换**:将`serialNumberField_lgovkejx`字段转换为新的字段名`order_no_new`,并确保其格式为字符串类型。 #### 数据过滤条件 为了确保获取的数据符合业务需求,我们需要设置过滤条件。根据元数据配置中的`condition`字段,我们需要满足以下条件: ```json [ {"field":"selectField_lgn2qshb","logic":"in","value":"工资发放(临时),活动经费,福利申请,其他"}, {"field":"dateField_lgn2qsju","logic":"notnull"} ] ``` 这意味着我们只会处理类型为“工资发放(临时)”、“活动经费”、“福利申请”或“其他”的记录,并且这些记录的日期字段不能为空。 #### 实际操作步骤 1. **构建请求**:根据上述请求参数构建API请求。 2. **发送请求**:使用POST方法调用钉钉接口,获取原始数据。 3. **解析响应**:解析API响应,提取所需的数据字段。 4. **清洗与转换**:根据配置对数据进行清洗和格式转换。 5. **过滤数据**:应用过滤条件,筛选出符合要求的数据记录。 以下是一个示例代码片段,用于演示上述步骤: ```python import requests import json from datetime import datetime, timedelta # 构建请求体 request_body = { # 请求参数... } # 调用API response = requests.post('https://api.dingtalk.com/v1.0/yida/processes/instances', json=request_body) data = response.json() # 数据清洗与转换 for record in data['records']: record['datetime_new'] = datetime.strptime(record['dateField_lgn2qsju'], '%Y-%m-%d %H:%M:%S') record['order_no_new'] = str(record['serialNumberField_lgovkejx']) # 数据过滤 filtered_data = [record for record in data['records'] if record['selectField_lgn2qshb'] in ['工资发放(临时)', '活动经费', '福利申请', '其他'] and record['dateField_lgn2qsju']] # 输出结果 print(json.dumps(filtered_data, indent=4, ensure_ascii=False)) ``` 通过以上步骤,我们可以高效地从钉钉系统中获取并处理所需的数据,为后续的数据写入和分析奠定基础。 ![如何对接用友BIP接口](https://pic.qeasy.cloud/S8.png~tplv-syqr462i7n-qeasy.image) ### 数据集成平台生命周期中的ETL转换:实现MySQL API接口数据写入 在轻易云数据集成平台的生命周期管理中,数据的ETL(提取、转换、加载)过程是关键环节之一。本文将深入探讨如何将已集成的源平台数据通过ETL转换为目标平台MySQL API接口所能接收的格式,并最终写入目标平台。 #### 元数据配置解析 首先,我们需要理解元数据配置,这是实现数据转换和写入的基础。以下是我们需要处理的元数据配置: ```json { "api": "execute", "effect": "EXECUTE", "method": "POST", "idCheck": true, "request": [ { "field": "main_params", "label": "main_params", "type": "object", "describe": "111", "children": [ {"field": "extend_processInstanceId", "label": "明细id", "type": "string", "value": "{{extend.processInstanceId}}"}, {"field": "order_no_new", "label": "单号", "type": "string", "value": "{order_no_new}(FKD)"}, {"field": "datetime_new", "label": "时间", "type": "date", "value": "{datetime_new}"}, {"field": "qty_count", "label": "数量", "type": "string", "value":"1"}, {"field":"sales_count","label":"金额","type":"string","value":"{numberField_lgn2qsi6}"}, {"field":"status","label":"状态","type":"string"}, {"field":"Document_Type","label":"单据类型","type":"string","value":"付款单"} ] } ], ... } ``` #### 数据请求与清洗 在ETL过程中,首先要从源平台提取数据并进行清洗。这一步骤确保我们获取的数据是准确且一致的。元数据中的`request`部分定义了需要提取的数据字段及其对应关系。例如: - `extend_processInstanceId` 映射到 `{{extend.processInstanceId}}` - `order_no_new` 映射到 `{order_no_new}(FKD)` - `datetime_new` 映射到 `{datetime_new}` - `sales_count` 映射到 `{numberField_lgn2qsi6}` 这些字段将从源平台的数据中提取,并进行必要的格式化和清洗。 #### 数据转换与写入 接下来,我们进入数据转换阶段,将清洗后的数据转化为目标平台MySQL API接口能够接受的格式。根据元数据配置,最终的数据结构如下: ```json { main_params: { extend_processInstanceId: "<实际值>", order_no_new: "<实际值>(FKD)", datetime_new: "<实际值>", qty_count: 1, sales_count: "<实际值>", status: "<实际值>", Document_Type: '付款单' } } ``` 在此过程中,我们需要特别注意字段类型和格式的一致性,例如日期格式、字符串长度等。 #### SQL语句生成与执行 最后一步是生成SQL语句并执行,将转换后的数据写入MySQL数据库。元数据中的`otherRequest`部分定义了具体的SQL插入语句: ```json { field: 'main_sql', label: 'main_sql', type: 'string', describe: '111', value: 'INSERT INTO `hc_dd_fkd`(`extend_processInstanceId`, `order_no_new`, `datetime_new`, `qty_count`, `sales_count`, `status`, `Document_Type`) VALUES (:extend_processInstanceId, :order_no_new, :datetime_new, :qty_count, :sales_count, :status, :Document_Type)' } ``` 通过将上一步中生成的数据结构映射到SQL语句中的占位符,我们可以得到具体的插入操作。例如: ```sql INSERT INTO `hc_dd_fkd` (`extend_processInstanceId`, `order_no_new`, `datetime_new`, `qty_count`, `sales_count`, `status`, `Document_Type`) VALUES ('12345', 'ORD12345(FKD)', '2023-10-01', '1', '1000.00', 'completed', '付款单'); ``` 执行该SQL语句,即可将处理后的数据成功写入目标MySQL数据库。 #### 技术要点总结 1. **元数据配置解析**:理解并正确解析元数据配置是实现ETL过程的基础。 2. **数据请求与清洗**:确保从源平台提取的数据准确无误,并进行必要的清洗。 3. **数据转换**:将清洗后的数据转化为目标平台能够接受的格式,注意字段类型和格式的一致性。 4. **SQL语句生成与执行**:通过映射生成具体的SQL插入操作,实现最终的数据写入。 通过以上步骤,我们可以高效地完成从源平台到目标MySQL数据库的数据集成过程,实现不同系统间的数据无缝对接。 ![金蝶与WMS系统接口开发配置](https://pic.qeasy.cloud/T30.png~tplv-syqr462i7n-qeasy.image)