轻易云平台ETL实现:从数据提取到MySQL写入

  • 轻易云集成顾问-林峰
### 钉钉数据集成到MySQL:付款申请单到BI崛起的技术解读 在跨系统数据流动愈加频繁的背景下,实现高效的数据集成已成为企业核心竞争力之一。本文将深入探讨如何通过轻易云数据集成平台,将钉钉中的付款申请单数据无缝对接到MySQL数据库,便于后续BI系统进行分析和处理。 #### 案例场景介绍 我们所要解决的问题是如何将“钉钉-付款申请单”中的实时业务信息准确、及时地同步至“BI崛起-付款申请表”中。这一过程需要借助顶级API接口`topapi/processinstance/get`从钉钉获取数据,并使用MySQL写入API `execute`实现高效的数据存储和管理。在实际操作过程中,需要克服分页限流、异常处理以及不同数据格式之间的差异等诸多挑战。 #### 核心技术方案概述 - **定时可靠的抓取与分页处理**: 使用定时任务调度器,每隔固定时间段调用`topapi/processinstance/get`接口,并妥善处理分页,以确保所有待同步的数据均被有效采集。 - **批量快速写入机制**: 利用平台提供的大吞吐量写入能力,将大量获取自钉钉的数据批量插入至MySQL。通过优化批次大小与事务管理,提升整体性能并保证一致性。 - **统一监控与告警体系**: 实时跟踪各个环节的状态,通过集中监控工具识别异常波动,第一时间触发告警机制,从而提高问题发现和响应速度。 - **自定义转换逻辑及映射对接**: 根据业务需求,自定义编排转换逻辑,对原始JSON格式进行解析和重组,使之符合目标表结构要求。同时,在写入前完成必要字段映射与清洗工作,以确保最终数据质量达标。 这些关键步骤不仅保障了整个流程贯穿始终的一致性,还显著提升了操作透明度,为企业内外部用户提供了高度可见的数据追踪路径。 ![如何对接钉钉API接口](https://pic.qeasy.cloud/D22.png~tplv-syqr462i7n-qeasy.image) ### 调用钉钉接口topapi/processinstance/get获取并加工数据 在数据集成的生命周期中,调用源系统接口是至关重要的一步。本文将深入探讨如何使用轻易云数据集成平台调用钉钉接口`topapi/processinstance/get`,并对获取的数据进行加工处理。 #### 接口概述 钉钉接口`topapi/processinstance/get`主要用于查询审批实例的详细信息。通过该接口,我们可以获取付款申请单的相关数据,并将其集成到目标系统中。在配置元数据时,需要特别注意请求参数和返回数据的处理。 #### 元数据配置解析 以下是元数据配置的详细解析: ```json { "api": "topapi/processinstance/get", "effect": "QUERY", "method": "POST", "number": "number", "id": "id", "request": [ { "field": "process_code", "label": "审批流的唯一码", "type": "string", "describe": "这里填写钉钉表单的id", "value": "PROC-6D095BC9-1E1B-4306-95EF-854971B52272" }, { "field": "start_time", "label": "审批实例开始时间。Unix时间戳,单位毫秒。", "type": "string", "describe": "Help", "value": "_function {LAST_SYNC_TIME}*1000" }, { "field": "end_time", { ... ``` 1. **API路径**:`"api":"topapi/processinstance/get"`,指定了调用的具体API。 2. **请求方法**:`"method":"POST"`,使用POST方法进行请求。 3. **请求参数**: - `process_code`:审批流的唯一码,用于标识具体的审批流程。 - `start_time`和`end_time`:分别表示审批实例的开始和结束时间,使用Unix时间戳(毫秒)。 - `size`:分页参数,每页大小,最多传20。 - `cursor`:分页查询的游标,初始值为0,后续传递返回参数中的next_cursor值。 #### 数据请求与清洗 在实际操作中,我们需要按照上述元数据配置向钉钉接口发送请求,并对返回的数据进行清洗和处理。以下是一个示例代码片段,用于展示如何实现这一过程: ```python import requests import time # 定义请求参数 params = { 'process_code': 'PROC-6D095BC9-1E1B-4306-95EF-854971B52272', 'start_time': int(time.time() - 86400) * 1000, # 假设同步过去24小时的数据 'end_time': int(time.time()) * 1000, 'size': 20, 'cursor': 0 } # 发起POST请求 response = requests.post('https://oapi.dingtalk.com/topapi/processinstance/get', json=params) # 检查响应状态码 if response.status_code == 200: data = response.json() # 对返回的数据进行清洗和处理 instances = data.get('result', {}).get('list', []) for instance in instances: # 提取并处理每个审批实例的数据 process_instance_id = instance.get('process_instance_id') status = instance.get('status') create_time = instance.get('create_time') # 根据业务需求进行进一步处理... else: print(f"Error: {response.status_code}, {response.text}") ``` #### 数据转换与写入 在获取并清洗了源系统的数据后,需要将其转换为目标系统所需的格式,并写入目标系统。在轻易云平台上,这一步通常通过可视化界面完成,但也可以通过编写自定义脚本实现。 例如,将清洗后的数据写入BI崛起平台,可以使用以下伪代码: ```python for instance in instances: transformed_data = transform(instance) # 自定义转换函数 # 将转换后的数据写入目标系统 bi_rise_api.write(transformed_data) ``` #### 自动填充与扁平化处理 在元数据配置中,还提到了自动填充响应和扁平化处理。自动填充响应(autoFillResponse)意味着平台会自动将API响应中的字段映射到目标字段,而扁平化处理(beatFlat)则是将嵌套结构的数据展开为平铺结构,以便于后续处理。 总结以上内容,通过合理配置元数据并调用钉钉接口,我们可以高效地获取并加工付款申请单的数据,为后续的数据集成奠定坚实基础。这一过程不仅提高了业务透明度,也极大提升了数据处理效率。 ![数据集成平台可视化配置API接口](https://pic.qeasy.cloud/S20.png~tplv-syqr462i7n-qeasy.image) ### 使用轻易云数据集成平台进行ETL转换并写入MySQL 在数据集成生命周期的第二步中,我们需要将已经集成的源平台数据进行ETL(提取、转换、加载)转换,转为目标平台 MySQL API 接口所能够接收的格式,并最终写入目标平台。本文将详细探讨如何通过轻易云数据集成平台实现这一过程。 #### 数据请求与清洗 首先,我们从源系统(例如钉钉的付款申请单)提取数据。在这个阶段,我们需要确保数据的完整性和准确性,清洗不必要的数据,并将其转换为适合进一步处理的格式。 #### 数据转换与写入 接下来是数据转换与写入阶段,这是整个ETL过程的核心部分。我们将使用元数据配置来定义如何将源数据映射到目标数据库表中。以下是具体步骤和技术细节: 1. **定义API接口请求** 我们需要配置一个API接口请求,用于将处理后的数据发送到目标平台。根据提供的元数据配置,API接口如下: ```json { "api": "execute", "effect": "EXECUTE", "method": "POST", "idCheck": true, "request": [ { "field": "main_params", "label": "main_params", "type": "object", "describe": "111", "children": [ {"field": "bfn_id", "label": "id", "type": "string", "value": "{bfn_id}"}, {"field": "department", "label": "所属部门", "type": "string", "value": "{{所属部门}}"}, {"field": "purpose", "label": "用途", "type": "string", "value": "{{用途}}"}, {"field": "estimated_payment_date", "label": "预计付款日期", ... ] } ], ... } ``` 2. **字段映射** 在元数据配置中,我们定义了每个字段如何从源系统映射到目标系统。例如: - `bfn_id` 映射到 `id` - `department` 映射到 `所属部门` - `purpose` 映射到 `用途` 这些映射关系确保了我们从源系统提取的数据能够正确地转换为目标系统所需的格式。 3. **SQL语句生成** 我们使用SQL语句将处理后的数据插入到MySQL数据库中。根据元数据配置,生成的SQL语句如下: ```sql REPLACE INTO payment_application_info ( bfn_id, department, purpose, estimated_payment_date, total_amount, payment_method, payee_name, payee_account, payee_bank, actual_payment_amount, details_of_purpose, other_expenses, amount, corresponding_subjects, create_time, finish_time, originator_userid, originator_dept_id, status, result, business_id, originator_dept_name, biz_action ) VALUES ( :bfn_id,:department,:purpose,:estimated_payment_date,:total_amount, :payment_method,:payee_name,:payee_account,:payee_bank,:actual_payment_amount, :details_of_purpose,:other_expenses,:amount,:corresponding_subjects,:create_time, :finish_time,:originator_userid,:originator_dept_id,:status,:result, :business_id,:originator_dept_name,:biz_action ); ``` 4. **执行API请求** 最后,我们通过POST方法执行API请求,将转换后的数据发送到MySQL数据库。确保每个字段都正确映射并且值已填充。 #### 实际应用案例 假设我们有一条来自钉钉的付款申请单记录,其字段值如下: - `bfn_id`: 12345 - `department`: 财务部 - `purpose`: 办公用品采购 - `estimated_payment_date`: 2023-10-01 - `total_amount`: 1000.00 - ... 在轻易云平台上配置好上述元数据后,系统会自动将这些字段值按照预定义的映射关系进行转换,并生成相应的SQL插入语句。执行后,这条记录会被成功写入MySQL数据库中的`payment_application_info`表。 通过这种方式,我们可以高效地实现不同系统间的数据无缝对接,确保每个环节的数据都能够准确传递和存储。这不仅提升了业务透明度和效率,还大大简化了复杂的数据集成过程。 ![打通用友BIP数据接口](https://pic.qeasy.cloud/T1.png~tplv-syqr462i7n-qeasy.image)