ETL过程详解:使用轻易云实现数据转换和MySQL写入

  • 轻易云集成顾问-蔡威
### 钉钉数据集成到MySQL:品类即时贡献报表集成方案-推广费写入 在复杂多变的业务环境中,实现高效、准确的数据对接已成为企业追求的目标。本案例探讨了如何通过轻易云数据集成平台,将钉钉系统中的数据无缝整合到MySQL数据库中,具体聚焦于“品类即时贡献报表集成方案-推广费写入”。 本次解决方案主要依赖于以下几个关键技术点: 1. **API接口调用及分页处理** 为确保完整获取所需数据,我们利用钉钉提供的API `v1.0/yida/forms/instances/ids/{appType}/{formUuid}`反复查询,并妥善处理分页问题,避免由于单次请求量过大而导致的数据丢失或异常。 2. **高吞吐量与批量数据写入** 数据量较大的场景下,为提高性能和效率,我们采取了批量写入策略,通过`execute` API将大量来自钉钉的数据快速、安全地导入MySQL。此过程中充分考虑到了MySQL对事务并发性和锁机制的管理,以减少资源占用和冲突。 3. **实时监控与告警系统** 采用集中化监控工具,对整个数据流动进行实时跟踪和状态分析。万一发生异常情况,如网络延迟或接口限流问题,系统能够及时告警并启动自动重试机制,从而保证任务可靠完成。 4. **自定义数据转换逻辑及格式映射** 针对业务需求进行定制化配置,使得从钉钉抽取的数据能被准确转为符合MySQL数据库结构要求的信息。这包括字段匹配、类型校验以及必要的预处理操作,以确保最终存储的数据质量可靠且具备一致性。 5. **异构系统之间的日志记录与错误处理** 在跨平台数据交互过程中,精细化的日志记录至关重要。每个步骤都有详细文档支持,不仅便于后期溯源,还简化了排错流程,同时结合错误重试机制,大幅提升了整体稳定性。 以上技术要点不仅有效满足了特定业务需求,也展示出通过合理运用现代技术手段,可以显著提升各环节间协同效应。在下文我们将深入探讨具体实现方法,包括代码示例与实践经验分享,请继续关注。 ![用友与MES系统接口开发配置](https://pic.qeasy.cloud/D8.png~tplv-syqr462i7n-qeasy.image) ### 调用钉钉接口获取并加工数据的技术案例 在数据集成过程中,调用源系统接口是关键的一步。本文将详细探讨如何通过轻易云数据集成平台调用钉钉接口`v1.0/yida/forms/instances/ids/{appType}/{formUuid}`获取并加工数据,以实现品类即时贡献报表的推广费写入。 #### 接口调用与请求配置 首先,我们需要配置API请求参数。根据提供的元数据配置,以下是主要参数及其描述: - `appType`: 应用编码,值为`APP_BNJNRVQ32174RSX3MROF`。 - `formUuid`: 表单ID,值为`FORM-99D9CFA106C3427F838829938C81D5AAADQ7`。 - `pageNumber`: 分页页码,值为`1`。 - `pageSize`: 分页大小,值为`50`。 - `modifiedToTimeGMT`: 修改时间终止值。 - `systemToken`: 应用秘钥,值为`KYC664C1WR9LODIIAI09I913S0HO2G3YGREWL43`。 - `modifiedFromTimeGMT`: 修改时间起始值。 - `language`: 语言,默认值为中文(`zh_CN`)。 - `searchFieldJson`: 根据表单内组件值查询。 - `userId`: 用户userid,值为`16000443318138909`。 - `originatorId`: 根据流程发起人工号查询。 - `createToTimeGMT`: 创建时间终止值,动态获取当前时间(使用模板变量`{{CURRENT_TIME|datetime}}`)。 - `createFromTimeGMT`: 创建时间起始值,动态获取上次同步时间(使用模板变量`{{LAST_SYNC_TIME|datetime}}`)。 #### 请求示例 根据上述参数,我们构建POST请求体如下: ```json { "appType": "APP_BNJNRVQ32174RSX3MROF", "formUuid": "FORM-99D9CFA106C3427F838829938C81D5AAADQ7", "pageNumber": "1", "pageSize": "50", "modifiedToTimeGMT": "", "systemToken": "KYC664C1WR9LODIIAI09I913S0HO2G3YGREWL43", "modifiedFromTimeGMT": "", "language": "zh_CN", "searchFieldJson": "", "userId": "16000443318138909", "originatorId": "", "createToTimeGMT": "{{CURRENT_TIME|datetime}}", "createFromTimeGMT": "{{LAST_SYNC_TIME|datetime}}" } ``` #### 数据处理与清洗 在成功获取数据后,需要对原始数据进行清洗和转换,以便后续的数据写入操作。以下是常见的数据处理步骤: 1. **字段映射**:将API返回的数据字段映射到目标系统所需的字段。例如,将钉钉返回的表单实例ID映射到目标系统中的记录ID。 2. **数据过滤**:根据业务需求过滤不必要的数据。例如,只保留特定状态或特定时间段内的数据。 3. **格式转换**:将日期、金额等字段转换为目标系统所需的格式。例如,将日期格式从UTC转换为本地时区。 4. **数据校验**:对关键字段进行校验,如检查ID是否重复、金额是否合理等。 #### 实践案例 假设我们从钉钉接口获取了以下原始数据: ```json { "instances": [ { "id": "12345", "title": "推广费报销", "amount": 1000, "createdTime": "2023-10-01T08:00:00Z" }, { "id": "12346", "title": "市场活动费用", "amount": 2000, "createdTime": "2023-10-02T08:00:00Z" } ] } ``` 我们需要将这些数据转换并写入目标系统。首先进行字段映射和格式转换: ```json [ { "record_id": "12345", "description": "推广费报销", "expense_amount": 1000, "submission_date": "2023-10-01T16:00:00+08:00" // 转换为北京时间 }, { "record_id": "12346", "description": "市场活动费用", "expense_amount": 2000, "submission_date": "2023-10-02T16:00:00+08:00" // 转换为北京时间 } ] ``` 接下来,对转换后的数据进行校验和过滤。例如,确保每条记录的金额大于零,并且记录ID不重复。 #### 总结 通过轻易云数据集成平台调用钉钉接口并对获取的数据进行清洗和转换,可以高效地实现不同系统间的数据无缝对接。在实际操作中,需要根据具体业务需求灵活调整请求参数和处理逻辑,以确保数据准确性和一致性。 ![企业微信与ERP系统接口开发配置](https://pic.qeasy.cloud/S2.png~tplv-syqr462i7n-qeasy.image) ### 使用轻易云数据集成平台进行ETL转换并写入MySQL API接口 在数据集成生命周期的第二步中,我们需要将已经集成的源平台数据进行ETL转换,最终写入目标平台MySQL。本文将深入探讨如何使用轻易云数据集成平台的元数据配置,将源数据转换为MySQL API接口能够接收的格式,并完成数据写入。 #### 元数据配置解析 元数据配置是整个ETL过程的核心,通过定义字段、类型和转换规则,确保源数据能够正确地映射到目标数据库表中。以下是我们将要使用的元数据配置: ```json { "api": "execute", "effect": "EXECUTE", "method": "SQL", "number": "id", "id": "id", "name": "id", "idCheck": true, "request": [ { "field": "main_params", "label": "主参数", "type": "object", "describe": "对应主语句内的动态参数", "children": [ {"field": "form_instance_id", "label": "源id(实例id)", "type": "string", "value":"{id}"}, {"field": "platform", "label": "平台", "type": "string", "value":"{selectField_lx4mftgl}"}, {"field": "date", "label": "日期", "type": "timestamp", "value":"_function FROM_UNIXTIME( ( {dateField_lx4mftg2} / 1000 ), '%Y-%m-%d' )"}, {"field": "shop_name", ... } ] } ], ... } ``` #### 数据清洗与转换 在这个阶段,我们需要根据元数据配置对源数据进行清洗和转换。以下是几个关键字段的处理方式: 1. **日期字段转换**: ```json {"field":"date","label":"日期","type":"timestamp","value":"_function FROM_UNIXTIME( ( {dateField_lx4mftg2} / 1000 ),'%Y-%m-%d' )"} ``` 将Unix时间戳转换为标准日期格式。 2. **推广费用类型**: ```json {"field":"type","label":"推广费用类型(1-站内推广费,2-站外推广费)","type":"int","value":"_function IF( (STRCMP('{textField_lx4mftgo}','type17')), 1,2)"} ``` 根据条件判断,设置不同的推广费用类型。 3. **创建时间和更新时间**: ```json {"field":"create_time","label":"创建时间","type":"timestamp","value":"_function DATE_FORMAT('{gmtCreate}','%Y-%m-%d %H:%i:%s')","default":"{{CURRENT_TIME|datetime}}"} ``` 使用当前时间作为默认值,并将其格式化为标准时间格式。 #### SQL语句生成 根据元数据配置,我们生成了插入MySQL数据库的SQL语句: ```sql INSERT INTO `lehua`.`promotion_expenses` (`form_instance_id`, `platform`, `date`, `shop_name`, `shop_code`, `type`, `type_id`, `type_name`, `category`, `expense`, `create_time`, `create_by`, `create_user_id`, `modify_time`, `modify_by`, `modify_user_id`, `connect_id`, `instation_expense`, `outstation_expense`) VALUES (<{form_instance_id: }>, <{platform: }>, <{date: }>, <{shop_name: }>, <{shop_code: }>, <{type: }>, <{type_id: }>, <{type_name: }>, <{category: }>, <{expense: }>, <{create_time: CURRENT_TIMESTAMP}>, <{create_by: }>, <{create_user_id: }>, <{modify_time: }>, <{modify_by: }>, <{modify_user_id: }>, <{connect_id: }>, <{instation_expense: 0.0000}>, <{outstation_expense: 0.0000}>); ``` #### 数据写入 最后一步是执行生成的SQL语句,将清洗和转换后的数据写入MySQL数据库。通过API接口调用`execute`方法,实现SQL语句执行: ```json { ... { field:"main_sql", label:"主语句", type:"string", describe:"SQL首次执行的语句,将会返回:lastInsertId", value:"INSERT INTO ... VALUES ...;" } } ``` 通过上述步骤,我们成功地将源平台的数据经过ETL处理后,写入到了目标平台MySQL数据库中。这一过程不仅保证了数据的一致性和完整性,还提高了系统间的数据交互效率。 ![打通用友BIP数据接口](https://pic.qeasy.cloud/T17.png~tplv-syqr462i7n-qeasy.image)