从钉钉到BI崛起:数据转换与写入MySQL的具体实现

  • 轻易云集成顾问-彭萍
### 钉钉数据集成到MySQL案例分享:钉钉-万师傅充值单-->BI崛起-万师傅充值表 在企业信息化系统中,如何高效地实现跨平台的数据集成一直是一个极具挑战的话题。本文将围绕轻易云数据集成平台进行深入探讨,通过具体的技术案例——“钉钉-万师傅充值单-->BI崛起-万师傅充值表”的实施过程,展示如何将来自钉钉的数据集成到MySQL数据库中。 首先,我们要解决的是通过调用顶层API接口`topapi/processinstance/get`从钉钉获取指定工作流实例的数据。在这个过程中,需要特别注意处理分页和限流问题,以确保大批量数据能够被高效、稳定地抓取。同时,为了保证数据不漏单和任务的定期可靠执行,我们还设计了定时抓取机制,并实时监控其状态。 其次,将获得的原始数据转换为适合存储于MySQL中的格式是一项至关重要的任务。通过自定义的数据转换逻辑,我们不仅消除了两者之间的数据结构差异,还优化了数据映射,使其更符合业务需求。此外,在写入大量数据到MySQL过程中,使用支持高吞吐量的数据写入能力,可以大幅提升处理效率,这对于需要快速反应的大型业务场景尤为关键。 为了进一步提高系统稳定性和可维护性,本方案还包括对异常情况的处理与错误重试机制。当出现接口请求失败或其他不可预见错误时,该机制可以有效保障任务连续执行,避免因临时故障导致整个流程停滞。同时,通过集中监控和告警系统,实现实时追踪每个步骤的状态及性能,让运维人员能够迅速发现并解决潜在问题。 最后,为了确保所有操作都处于受控环境下,利用统一视图和控制台进行API资产管理,使得资源利用更加透明和优化。总之,通过合理运用上述特性,不仅成功完成了这次复杂而又紧迫的跨平台数据集成任务,也为未来类似项目提供了一套成熟、可靠的方法论。 ![企业微信与ERP系统接口开发配置](https://pic.qeasy.cloud/D20.png~tplv-syqr462i7n-qeasy.image) ### 使用轻易云数据集成平台调用钉钉接口topapi/processinstance/get获取并加工数据 在数据集成过程中,调用源系统的API接口是至关重要的一步。本文将深入探讨如何通过轻易云数据集成平台调用钉钉接口`topapi/processinstance/get`来获取并加工数据。 #### 接口配置与元数据解析 首先,我们需要理解元数据配置中的各个字段及其作用。以下是该接口的元数据配置: ```json { "api": "topapi/processinstance/get", "effect": "QUERY", "method": "POST", "number": "number", "id": "id", "idCheck": true, "request": [ { "field": "process_code", "label": "审批流的唯一码", "type": "string", "describe": "这里填写钉钉表单的id", "value": "PROC-9E080FCB-182E-43F6-BBE4-79CF1C1F669F" }, { "field": "start_time", "label": "审批实例开始时间。Unix时间戳,单位毫秒。", "type": "string", "describe": "Help", "value": "_function {LAST_SYNC_TIME}*1000" }, { "field": "end_time", "label": "审批实例结束时间,Unix时间戳,单位毫秒", "type": "string", "describe": "Help", "value": "_function {CURRENT_TIME}*1000" }, { "field": "size", "label": "分页参数,每页大小,最多传20。", "type": "string", "describe": "Help", "value": 20 }, { "field": “cursor”, “label”: “分页查询的游标,最开始传0,后续传返回参数中的next_cursor值。”, “type”: “string”, “describe”: “Help” } ], “autoFillResponse”: true } ``` #### 调用API接口 根据上述配置,我们可以看到该接口使用`POST`方法进行请求,并且需要提供多个参数,包括审批流的唯一码、开始时间、结束时间、分页大小和游标。 1. **process_code**: 审批流的唯一码,这是一个固定值,用于标识特定的审批流程。 2. **start_time**: 审批实例开始时间,这里使用了一个函数`_function {LAST_SYNC_TIME}*1000`来动态获取上次同步时间,并转换为毫秒。 3. **end_time**: 审批实例结束时间,同样使用了一个函数`_function {CURRENT_TIME}*1000`来获取当前时间,并转换为毫秒。 4. **size**: 分页参数,每页大小,这里设置为20。 5. **cursor**: 分页查询的游标,初始值为0,后续请求中将使用返回结果中的`next_cursor`。 #### 数据请求与清洗 在实际操作中,我们首先需要构建请求体,并发送HTTP POST请求到钉钉API。以下是一个示例代码片段: ```python import requests import time url = 'https://oapi.dingtalk.com/topapi/processinstance/get' headers = {'Content-Type': 'application/json'} data = { 'process_code': 'PROC-9E080FCB-182E-43F6-BBE4-79CF1C1F669F', 'start_time': int(time.time() - 86400) * 1000, # 假设上次同步时间为24小时前 'end_time': int(time.time()) * 1000, 'size': 20, 'cursor': 0 } response = requests.post(url, headers=headers, json=data) result = response.json() ``` 上述代码中,我们构建了请求体并发送请求。响应结果将包含审批实例的数据以及分页信息。 #### 数据转换与写入 获取到原始数据后,需要对其进行清洗和转换,以便写入目标系统。在轻易云平台中,可以通过内置的数据处理工具对数据进行清洗和转换。例如,将Unix时间戳转换为可读日期格式,将字段名称映射到目标系统所需的字段名称等。 以下是一个简单的数据清洗示例: ```python def clean_data(raw_data): cleaned_data = [] for item in raw_data: cleaned_item = { '审批编号': item['process_instance_id'], '申请人': item['originator_userid'], '申请时间': time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(item['create_time'] / 1000)), # 添加更多字段映射... } cleaned_data.append(cleaned_item) return cleaned_data cleaned_result = clean_data(result['process_instance_list']) ``` 经过清洗后的数据可以直接写入目标系统,如BI崛起平台中的万师傅充值表。 #### 自动填充响应 在元数据配置中,我们注意到有一个字段`autoFillResponse`设置为`true`。这意味着平台会自动处理响应结果并填充到相应的数据结构中,大大简化了开发工作量。 通过以上步骤,我们实现了从调用钉钉API获取原始数据,到清洗和转换,再到写入目标系统的完整流程。这不仅提高了工作效率,还确保了数据的一致性和准确性。 ![如何开发钉钉API接口](https://pic.qeasy.cloud/S20.png~tplv-syqr462i7n-qeasy.image) ### 使用轻易云数据集成平台进行ETL转换并写入MySQL API接口的技术案例 在数据集成的生命周期中,第二步是将已经集成的源平台数据进行ETL转换,并最终写入目标平台。在本案例中,我们将展示如何使用轻易云数据集成平台将钉钉-万师傅充值单的数据转换为MySQL API接口所能接收的格式,并写入目标数据库。 #### 数据请求与清洗 首先,我们需要从钉钉-万师傅充值单获取原始数据。这一步主要涉及到数据请求和初步清洗。假设我们已经完成了这一步,并且得到了结构化的数据。 #### 数据转换与写入 接下来,我们进入数据转换与写入阶段。我们需要将上述获取到的数据转换为目标平台 MySQL 所能接受的格式。以下是元数据配置的详细说明: ```json { "api": "execute", "effect": "EXECUTE", "method": "POST", "idCheck": true, "request": [ { "field": "main_params", "label": "main_params", "type": "object", "describe": "111", "children": [ {"field":"bfn_id","label":"id","type":"string","value":"{id}"}, {"field":"department","label":"申请部门","type":"string","value":"{{申请部门}}"}, {"field":"month","label":"所属月份","type":"string","value":"{{所属月份}}"}, {"field":"purpose_details","label":"用途内容-费用明细","type":"string","value":"{{用途内容-费用明细}}"}, {"field":"corresponding_subjects","label":"对应科目","type":"string","value":"{{对应科目}}"}, {"field":"recharge_account","label":"充值账户","type":"string","value":"{{充值账户}}"}, {"field":"expenses_explanation","label":"费用说明","type":"string","value":"{{费用说明}}"}, {"field":"recharge_amount","label":"充值金额(元)","type":"string","value":"{{充值金额(元)}}"}, {"field":"payment_method","label":"支付方式","type":"string","value":"{{支付方式}}"}, {"field":"other_payment_account","label":"其他收款账号","type":"string","value":"{{其他收款账号}}"}, {"field":"payee_name","label":"收款户名","type":"string","value":"{{收款户名}}"}, {"field":"payment_account","label":"收款账号","type":"","value""{{收款账号}}"}, {"field"::"payment_bank",:"label"::"收款银行",:"type"::"string",:"value"::"{{收款银行}}"}, {"field"::"financial_payment_method",:"label"::"财务支付方式",:"type"::"string",:"value"::"{{财务支付方式}}"}, {"field"::"actual_payment_amount",:"label"::"实付金额(元)",:"type"::"string",:"value"::"{{实付金额(元)}}"}, {"field"::"create_time",:"label"::审批发起时间,:"type""string",:"value""{{extend.create_time}}" }, {“ field”:“ finish_time”,“ label”:“审批结束时间”,“ type”:“ string”,“ value”:“ {{extend.finish_time }}”}, {“ field”:“ originator_userid”,“ label”:“发起人userid”,“ type”:“ string”,“ value”:“ {{extend.originator_userid }}”}, {“ field”:“ originator_dept_id”,“ label”:“发起人所属部门id”,“ type”:“ string”,“ value”:“ {{extend.originator_dept_id }}”}, {“ field”:“ status”,“ label”:“审批实例状态”,“ type”:“ string”,“ value”:“ {{extend.status }} ”}, {“ field”:“ result”,“ label”:“审批结果”,“ type”:“ string”,“ value”: “ {{extend.result }} ”}, {“ field”: “ business_id”, “ label”: “审批编号”, “ type”: “ string”, “ value”: “ {{extend.business_id }} ”}, {“ field”: “ originator_dept_name”, “ label”: “发起人所属部门名称”, “ type”: “ string”, “ value”: “ {{ extend.originator_dept_name }} ”}, {“ field”: “ biz_action”, “ label”: “审批实例业务动作”, “ type”: “ string”, “ value”: “ {{ extend.biz_action }} ” } ] } ], "otherRequest":[ { "field": "main_sql", "label": "main_sql", "type": "string", "describe": "111", "value": `REPLACE INTO master_wan_recharge ( bfn_id, department, month, purpose_details, corresponding_subjects, recharge_account, expenses_explanation, recharge_amount, payment_method, other_payment_account, payee_name, payment_account, payment_bank, financial_payment_method, actual_payment_amount, create_time, finish_time, originator_userid, originator_dept_id, status, result,business_id ,originator_dept_name ,biz_action) VALUES ( :bfn_id,:department,:month,:purpose_details,:corresponding_subjects,:recharge_account,:expenses_explanation,:recharge_amount,:payment_method,:other_payment_account,:payee_name,:payment_account,:payment_bank,:financial_payment_method,:actual_payment_amount,:create_time ,:finish_time ,:originator_userid ,:originator_dept_id ,:status ,:result ,:business_id ,:originator_dept_name ,:biz_action);` } ] } ``` #### 配置解析 1. **API调用配置**:配置中指定了API调用方法为`POST`,通过`execute`接口执行SQL语句。 2. **参数映射**:`request`部分定义了从源数据到目标数据库字段的映射关系。每个字段都有明确的标签和类型。 3. **SQL语句**:在`otherRequest`部分定义了具体的SQL语句,用于将转换后的数据插入到目标数据库表中。 #### 数据写入过程 1. **参数准备**:根据配置,将源数据中的各字段值提取出来,填充到相应的参数中。例如,`bfn_id`对应源数据中的`{id}`,`department`对应源数据中的`申请部门`。 2. **执行SQL**:利用准备好的参数,执行配置中的SQL语句,将数据插入到目标数据库表 `master_wan_recharge` 中。 通过上述步骤,我们实现了从钉钉-万师傅充值单到BI崛起-万师傅充值表的数据转换和写入。这一过程充分利用了轻易云数据集成平台提供的灵活配置和高效执行能力,实现了异构系统间的数据无缝对接。 ![金蝶与SCM系统接口开发配置](https://pic.qeasy.cloud/T17.png~tplv-syqr462i7n-qeasy.image)