数据转换与写入:高效实现钉钉数据至MySQL

  • 轻易云集成顾问-陈洁琳
### 钉钉数据集成到MySQL:对账系统--货品价格(产品调价) 在企业信息化系统中,数据的高效传递和实时处理是关键环节。本文将分享通过轻易云数据集成平台实现钉钉与MySQL的数据对接案例,该方案聚焦于针对"对账系统--货品价格(产品调价)"的具体实施。 为确保任务顺利进行,我们需要应对以下技术挑战: 1. **从钉钉接口定时采集数据**: - 利用`v1.0/yida/processes/instances` API,从钉钉获取最新的货品价格及其变动记录。 2. **大规模数据写入 MySQL**: - 通过 `execute` API,实现大量数据快速、高效地批量写入 MySQL 数据库。 3. **解决分页与限流问题**: - 因为API请求次数受限,需要合理设计分页逻辑和限流策略,以稳定和连续地抓取所有必要的数据避免遗漏。 4. **自定义转换逻辑适应业务需求**: - 在传输过程中,对来自钉钉的数据进行必要转换,使其符合MySQL数据库结构要求,包括字段映射及格式调整等操作。 5. **监控与告警机制保障任务执行**: - 配置集中监控和实时告警功能,随时跟踪各个API调用过程中的状态变化、性能指标以及潜在异常情况,实现高透明度管理。 6. **异常处理及重试机制的实现**: - 当API调用或数据库操作遇到错误时,通过完善的重试机制及时恢复正常流程,减少人工干预。 7. **保证整体解决方案简洁直观,可视化管理整个流程进程,提高操作效率并降低维护难度。** 上述方案不仅囊括了对接过程中的核心技术点,还辅以实际运作细节提升可靠性。在下一步内容中,我们将详细探讨每一项技术要点如何具象落地,并分享一些实操经验技巧。如如何优化API接口性能、加强网络安全通信,以及处理各种突发状况下保持服务可用性等实际问题。 ![钉钉与WMS系统接口开发配置](https://pic.qeasy.cloud/D19.png~tplv-syqr462i7n-qeasy.image) ### 调用钉钉接口获取并加工数据 在数据集成的生命周期中,调用源系统接口是至关重要的一步。本文将深入探讨如何使用轻易云数据集成平台调用钉钉接口`v1.0/yida/processes/instances`,并对获取的数据进行初步加工。 #### 接口配置与请求参数 首先,我们需要配置API接口及其请求参数。以下是元数据配置中的关键部分: ```json { "api": "v1.0/yida/processes/instances", "method": "POST", "number": "title", "id": "processInstanceId", "pagination": {"pageSize": 50}, "idCheck": true, "request": [ {"field": "pageSize", "label": "分页大小", "type": "string", "describe": "分页大小", "value": "50"}, {"field": "pageNumber", "label": "分页页码", "type": "string", "describe": "分页页码", "value": "1"}, {"field": "appType", "label": "应用ID", "type": "string", "describe": "应用ID", "value":"APP_UYN987QNZ82Q4QK409VT"}, {"field":"systemToken","label":"应用秘钥","type":"string","describe":"应用秘钥", "value":"DR766X813F8925E1F57YN8U6ZQFR26RQKCJFL04"}, {"field":"userId","label":"用户的userid","type":"string","describe":"用户的userid", "value":"16000443318138909"}, {"field":"language","label":"语言","type":"string","describe":"语言,取值:zh_CN:中文(默认值)en_US:英文"}, {"field":"formUuid","label":"表单ID","type":"string","describe":"表单ID", "value":"FORM-5Q966D91RDWAYU08B9LR84QB6FHN3I9Q9ZTHL5"}, {"field":"searchFieldJson","label":"条件","type":"object","children":[ {"field":"selectField_llkks8u6","label":"调整类型","type":"string", "value":"产品调价(产品中心)"}, {"parent":"searchFieldJson","label":"品牌","field":"selectField_lmlugdfx", "type":"string"}, {"parent":"searchFieldJson","label":"三级分类","field": "selectField_lnfs8m04","type": "string"}]}, {"field":"createFromTimeGMT","label": "创建时间起始值","type": "string", "describe": "创建时间起始值", "value": "2024-03-20 00:00:00"}, {"field": "createToTimeGMT", "label": "创建时间终止值", "type": "string", "describe": "创建时间终止值", "value": "{{CURRENT_TIME|datetime}}"}, {"field": "modifiedFromTimeGMT", "label": "修改时间起始值", "type": "string", "describe": "修改时间起始值"}, {"field": "modifiedToTimeGMT", "label": "修改时间终止值", "type": "string", "describe": 修改时间终止值}, {"field": taskId, label: 任务ID, type: string, describe: 任务ID}, {"field": instanceStatus, label: 实例状态, type: string, describe: 实例状态, value: COMPLETED}, {"field": approvedResult, label: 流程审批结果, type: string, describe: 流程审批结果, value: agree} ] } ``` #### 请求参数解析 1. **基本参数**: - `pageSize` 和 `pageNumber` 用于控制分页。 - `appType` 和 `systemToken` 是用于身份验证的关键参数。 - `userId` 表示请求发起者的用户ID。 2. **语言设置**: - `language` 参数可以设置为中文(zh_CN)或英文(en_US),默认为中文。 3. **表单和查询条件**: - `formUuid` 指定了要查询的表单ID。 - `searchFieldJson` 包含了具体的查询条件,如调整类型、品牌和三级分类。 4. **时间范围**: - `createFromTimeGMT` 和 `createToTimeGMT` 用于指定创建时间范围。 - `modifiedFromTimeGMT` 和 `modifiedToTimeGMT` 用于指定修改时间范围。 5. **其他过滤条件**: - `taskId`, `instanceStatus`, 和 `approvedResult` 用于进一步过滤实例状态和审批结果。 #### 数据请求与清洗 在发送请求后,我们会得到一个包含多个实例数据的响应。接下来需要对这些数据进行清洗和初步加工,以便后续处理。以下是一个简单的数据清洗示例: ```python import requests import json # 配置API URL和请求头 url = 'https://api.dingtalk.com/v1.0/yida/processes/instances' headers = { 'Content-Type': 'application/json', } # 构建请求体 payload = { 'pageSize': '50', 'pageNumber': '1', 'appType': 'APP_UYN987QNZ82Q4QK409VT', 'systemToken': 'DR766X813F8925E1F57YN8U6ZQFR26RQKCJFL04', 'userId': '16000443318138909', 'language': 'zh_CN', 'formUuid': 'FORM-5Q966D91RDWAYU08B9LR84QB6FHN3I9Q9ZTHL5', 'searchFieldJson': { 'selectField_llkks8u6': '产品调价(产品中心)' # 可以添加更多条件 }, 'createFromTimeGMT': '2024-03-20 00:00:00', 'createToTimeGMT': '{{CURRENT_TIME|datetime}}', } # 发起POST请求 response = requests.post(url, headers=headers, data=json.dumps(payload)) # 检查响应状态码 if response.status_code == 200: data = response.json() # 数据清洗示例:提取所需字段并转换格式 cleaned_data = [] for instance in data['data']: cleaned_instance = { 'title': instance['title'], 'processInstanceId': instance['processInstanceId'], # 添加更多需要提取的字段 } cleaned_data.append(cleaned_instance) else: print(f'Error: {response.status_code}') ``` #### 小结 通过上述步骤,我们成功调用了钉钉接口并对返回的数据进行了初步清洗。这一步骤为后续的数据转换与写入奠定了基础。在实际操作中,可以根据业务需求进一步优化和扩展数据处理逻辑,以确保数据集成过程高效且准确。 ![如何开发钉钉API接口](https://pic.qeasy.cloud/S14.png~tplv-syqr462i7n-qeasy.image) ### 数据转换与写入目标平台的技术实现 在数据集成生命周期的第二步中,我们需要将已经集成的源平台数据进行ETL转换,转为目标平台 MySQL API 接口所能够接收的格式,并最终写入目标平台。以下是具体的技术实现过程及相关细节。 #### 1. 数据请求与清洗 首先,我们需要从源平台获取原始数据,并对其进行必要的清洗和预处理。这一步骤确保数据的准确性和一致性,为后续的数据转换打下基础。 ```json { "api": "execute", "method": "POST", "idCheck": true, "request": [ { "field": "main_params", "label": "主参数", "type": "object", "children": [ {"field": "change_type", "label": "变更类型", "type": "string", "value": "B"}, {"field": "brand", "label": "品牌", "type": "string", "value": "{selectField_lmlugdfx}"}, {"field": "supplier_code", "label": "供应商编码", "type": "string", "value": "_findCollection find textField_ln2uyh3e from 68e141c6-4351-3f2f-b9a2-5eaee8f01a55 where textField_lfjcloll={{tableField_lnedzxul_textField_lnedzxum}}" }, {"field": "goods_code", "label": "货品编码", "type": "string", "value": "{{tableField_lnedzxul_textField_lnedzxum}}" }, {"field": "goods_name", "label": "货品名称", "type": "string", "value":"{{tableField_lnedzxul_selectField_lnedzxun}}" }, {"field":"price","label":"实际结算价格","type":"float","value":"{{tableField_lnedzxul_numberField_lnoes31u}}","default":"abc"}, {"field":"distrib_price","label":"分销价格","type":"float","value":"{{tableField_lnedzxul_numberField_lneeit5z}}"}, {"field":"min_price","label":"最低售价","type":"float","value":"{{tableField_lnedzxul_numberField_lneeit62}}"}, {"field":"class_a","label":"一级分类","type":"string","value":"厨卫成品"}, {"field":"class_b","label":"二级分类","type":"string","value":"{selectField_lmlugdg3}"}, {"field":"class_c","label":"三级分类","type":"string","value":"_findCollection find selectField_lfjclolt from 68e141c6-4351-3f2f-b9a2-5eaee8f01a55 where textField_lfjcloll={{tableField_lnedzxul_textField_lnedzxum}}"}, {"field":"source_Id","label":"系统来源","type":"int","value":"5"}, {"field":"effective_time", "label":"生效日期", "type" :"datetime", `value`:"_function FROM_UNIXTIME( ( {dateField_lothx6po} \/ 1000 ) ,'%Y-%m-%d' )" }, {"field" :"create_time", `label`:"创建日期", `type`:"datetime", `value`:"_function DATE_FORMAT('{gmtCreate}','%Y-%m-%d 00:00:00')" }, {`field`:"create_by", `label`:"创建人", `type`:"int", `value`:"1" }, {`field`:"status", `label`:"状态", `type`:"int" }, {`field`:"approve_status", `label`:"审核状态", `type`:"int" }, {`field`:"brand_coefficient`, `label`:``品牌系数`, `type`:``float`, ``value`:``_mongoQuery 68e141c6-4351-3f2f-b9a2-5eaee8f01a55 findField=content.numberField_lfjclona where={\"content.textField_lfjcloll\":{\"$eq\":\"{{tableField_lhykr1el_textField_lhykr1en}}\"}} } ] } ] } ``` #### 2. 数据转换 在数据清洗完成后,下一步是将数据转换为目标平台 MySQL API 接口能够接收的格式。我们需要根据元数据配置中的字段定义,将各个字段映射到相应的数据库表字段中。 ```json { `"otherRequest":[ { `"field"`:`"main_sql"`, `"label"`:`"主语句"`, `"type"`:`"string"`, `"value"`:`INSERT INTO \``lhhy_srm\``.`goods_price`\n(\n\``change_type`\`,\n\``brand`\`,\n\``supplier_code`\`,\n\``goods_code`\`,\n\``goods_name`\`,\n\``price`\`,\n\``distrib_price`\`,\n\``min_price`\`,\n\``brand_coefficient`\`,\n\``class_a`\`,\n\``class_b`\`,\n\``class_c`\`,\n\``source_Id`\`, \n \ ``effective_time`\`,`create_time`,`create_by`,`status`,`approve_status`) VALUES (\n<{change_type: }>,<{brand: }>,<{supplier_code: }>,<{goods_code: }>,<{goods_name: }>,<{price: }>,<{distrib_price: }>,<{min_price: }>,<{brand_coefficient: }>,<{class_a: }>,<{class_b: }>,<{class_c: }>,<{source_Id: }> , <{effective_time}> , <{create_time}> , <{create_by}> , <{status}> , <{approve_status}>);` } ], `"buildModel"`:`true` } ``` #### 3. 数据写入 最后一步是将转换后的数据通过 MySQL API 接口写入目标数据库。在这里,我们使用 SQL 插入语句将数据插入到指定的表中。 ```sql INSERT INTO `lhhy_srm`.`goods_price` ( `change_type`, `brand`, `supplier_code`, `goods_code`, `goods_name`, `price`, `distrib_price`, `min_price`, `brand_coefficient`, `class_a`, `class_b`, `class_c`, `source_Id`, `effective_time`, `create_time`, `create_by`, `status`, `approve_status` ) VALUES ( 'B', '{selectField_lmlugdfx}', '_findCollection find textField_ln2uyh3e from 68e141c6-4351-3f2f-b9a2-5eaee8f01a55 where textField_lfjcloll={{tableField_lnedzxul_textField_lnedzxum}}', '{{tableField_lnedzxul_textField_lnedzxum}}', '{{tableField_lnedzxul_selectField_lnedzxun}}', '{{tableField_lnedzxul_numberField_lnoes31u}}', '{{tableField_lnedzxul_numberField_lneeit5z}}', '{{table_field.lned.zx.ul.number.Field.lnee.it62}}', '_mongoQuery 68e141c6-4351-3f2f-b9a2-5eaee8f01a55 find.Field=content.number.Field.lfj.clona where={\"content.text.Field.lfj.cloll\":{\"$eq\":\"{{table.Field.lhy.kr1.el.text.Field.lhy.kr1.en}\"}', '厨卫成品', '{select.Field.lml.ugd.g3}', '_find.Collection find select.Field.lfj.clolt from 68e141c6-4351-3f2f-b9a2-5eaee8.f01.a55 where text.Field.lfj.cloll={{table.Field.lne.dz.xu.l.text.Field.lne.dz.xu.m}}', '5', '_function FROM_UNIXTIME( ( {date.Field.lo.thx.6.po} /1000 ) ,'%Y-%m-%d' )', '_function DATE_FORMAT('{gmt.Create}','%Y-%m-%d 00:00:00')', '1', '1' ); ``` ![钉钉与CRM系统接口开发配置](https://pic.qeasy.cloud/T15.png~tplv-syqr462i7n-qeasy.image)