数据ETL转换与钉钉集成实现自动化工作流

  • 轻易云集成顾问-曹润
### 金蝶云星空数据集成到钉钉:传给金蝶后,回传钉钉提示(已审核)④ 在系统对接与集成的过程中,实现高效、稳定的数据流转是每一个技术架构师的重要目标。本文将分享一个具体的案例,即如何使用轻易云数据集成平台,将金蝶云星空中的财务数据无缝集成到企业日常工具——钉钉,并实现当数据被成功处理和审核后,通过消息提醒相关人员。 本次方案命名为 "传给金蝶后,回传钉钉提示(已审核)④",通过这一方案,我们主要解决以下几个技术要点: 1. **高效获取金蝶云星空数据**: 使用`executeBillQuery` API从金蝶云星空中定时拉取最新的财务单据信息。在此过程中,要特别注意接口调用时可能遇到的分页处理和限流问题,确保不漏单且能及时抓取所有需要的数据。 2. **针对特定业务需求自定义转换逻辑**: 为了符合钉钉API所需的数据格式,对获取到的原始数据进行必要的转换和映射。该步骤利用轻易云的平台特性,通过可视化的数据流设计工具,自定义转换逻辑,使得整个流程更加直观及便于管理。 3. **大规模、高吞吐量的数据写入能力**: 将经过加工后的数据信息快速写入至钉钉,用以触发审批或发送通知。采用的是`topapi/process/instance/comment/add` API,该API支持批量写入操作,有助于提升整体效率。 4. **实时监控与告警机制**: 实现统一视图下对整个任务状态和性能的集中监控。当出现异常情况时,例如网络抖动导致的数据提取失败,可以第一时间自动重试并记录日志,以防丢失重要信息。这不仅提高了系统鲁棒性,也保证了业务连续性。 5. **差异化处理与错误重试机制实现**: 对于两套系统之间存在的数据格式差异,通过灵活调整字段映射关系来实现兼容。同时,在实际环境中不可避免地会遇到各种错误类型,因此加入了完善的异常处理与错误重试机制,使得任何故障都可以即时得到修复,从而保证最终用户体验不会受到影响。 6. **消息反馈功能完美结合工作流**: 当关键财务单据在金蝶云星空内完成审查流程后,系统会自动生成并发送一条包含“已审核”状态的信息至指定的员工账号,这一步骤极大减少了人工通知环节,提高 ![金蝶与外部系统打通接口](https://pic.qeasy.cloud/D31.png~tplv-syqr462i7n-qeasy.image) ### 调用金蝶云星空接口executeBillQuery获取并加工数据 在数据集成的生命周期中,调用源系统接口是至关重要的一步。本文将详细探讨如何通过轻易云数据集成平台调用金蝶云星空的`executeBillQuery`接口来获取并加工数据。 #### 接口配置与调用 首先,我们需要配置元数据以便正确调用金蝶云星空的`executeBillQuery`接口。以下是元数据配置的关键部分: ```json { "api": "executeBillQuery", "method": "POST", "number": "FBillNo", "id": "FPAYBILLENTRY_FEntryID", "pagination": { "pageSize": 500 }, "idCheck": true, "condition": [ [{"field":"FPAYORGID","logic":"eq","value":"7.01"}], [{"field":"FPAYORGID","logic":"eq","value":"7.08"}], [{"field":"FPAYORGID","logic":"eq","value":"7.03"}], [{"field":"FPAYORGID","logic":"eq","value":"7.02"}] ], ... } ``` #### 请求参数设置 在请求参数中,我们需要特别注意以下几个字段: - `FormId`: 表单ID,指定为`AP_PAYBILL`。 - `FieldKeys`: 查询字段集合,通过解析器将数组转换为字符串。 - `FilterString`: 用于过滤查询结果的条件,例如:`FApproveDate>='{{LAST_SYNC_TIME|dateTime}}' and FDOCUMENTSTATUS='C'`。 以下是完整的请求参数示例: ```json { "FormId": "AP_PAYBILL", "FieldKeys": [ "FPAYBILLENTRY_FEntryID", "FID", "FBillNo", ... ], "FilterString": "FApproveDate>='{{LAST_SYNC_TIME|dateTime}}' and FDOCUMENTSTATUS='C'", ... } ``` #### 数据请求与清洗 在发送请求后,金蝶云星空会返回符合条件的数据。我们需要对这些数据进行清洗和转换,以便后续处理。以下是一个简单的数据清洗示例: ```python def clean_data(raw_data): cleaned_data = [] for entry in raw_data: cleaned_entry = { '单据编号': entry['FBillNo'], '创建日期': entry['FCreateDate'], '审核日期': entry['FApproveDate'], '付款组织': entry['FPAYORGID.FNumber'], ... } cleaned_data.append(cleaned_entry) return cleaned_data ``` #### 数据转换与写入 清洗后的数据需要进一步转换,并写入目标系统。在这个过程中,我们可以利用轻易云平台提供的可视化操作界面,设置数据映射和转换规则。例如,将金蝶云星空中的字段映射到目标系统中的相应字段。 ```json { "source_field": "FBillNo", "target_field": "bill_number" }, { "source_field": "FCreateDate", "target_field": "creation_date" }, ... ``` #### 异常处理与监控 在整个过程中,异常处理和实时监控是确保数据集成成功的重要环节。轻易云平台提供了丰富的日志和监控功能,可以帮助我们及时发现和解决问题。例如,当接口调用失败时,可以自动重试或发送告警通知。 ```python try: response = call_executeBillQuery_api() if response.status_code != 200: raise Exception("API调用失败") except Exception as e: log_error(e) send_alert("API调用失败", e) ``` 通过以上步骤,我们可以高效地调用金蝶云星空接口获取并加工数据,为后续的数据处理打下坚实基础。 ![电商OMS与ERP系统接口开发配置](https://pic.qeasy.cloud/S11.png~tplv-syqr462i7n-qeasy.image) ### 数据集成生命周期第二步:ETL转换与写入钉钉API接口 在数据集成生命周期的第二步,我们将已经集成的源平台数据进行ETL(提取、转换、加载)转换,使其符合目标平台——钉钉API接口所能够接收的格式,并最终写入目标平台。以下是详细的技术实现过程。 #### 1. 数据提取与清洗 首先,我们需要从源系统中提取相关数据。在本案例中,假设我们已经通过轻易云平台完成了数据请求与清洗阶段。接下来,我们将重点放在如何将这些数据转换为钉钉API接口所需的格式。 #### 2. 数据转换 根据元数据配置,我们需要将提取的数据转化为适合钉钉API接口`topapi/process/instance/comment/add`的请求格式。以下是该API接口的元数据配置: ```json { "api": "topapi/process/instance/comment/add", "method": "POST", "idCheck": true, "request": [ { "field": "request", "label": "请求对象", "type": "object", "children": [ { "field": "process_instance_id", "label": "审批实例ID", "type": "string", "describe": "可通过调用获取审批实例ID列表接口获取。", "value": "_findCollection find id from ed67b3fa-ebfd-3a8a-9ecb-2e1730f0a9e1 where business_id={FBillNo}", "parent": "request" }, { "field": "text", "label": "评论的内容", "type": "string", "value": "已审核", "parent": "request" }, { "field": "comment_userid", "label": "评论人的userid", "type": "string", "value": 1120181204-651141925, } ] } ] } ``` #### 3. 构建请求对象 我们需要构建一个符合上述配置的请求对象。假设我们已经从金蝶系统中获取了业务单据编号`FBillNo`,并通过查询获取了对应的审批实例ID。 ```python # 示例Python代码 import requests # 假设从金蝶系统中获取到的数据 FBillNo = '123456' process_instance_id = 'ed67b3fa-ebfd-3a8a-9ecb-2e1730f0a9e1' # 示例审批实例ID # 构建请求对象 request_data = { 'request': { 'process_instance_id': process_instance_id, 'text': '已审核', 'comment_userid': '1120181204-651141925' } } # 转换为JSON格式 import json json_request_data = json.dumps(request_data) ``` #### 4. 调用钉钉API接口 使用Python中的`requests`库发送POST请求,将构建好的JSON数据发送到钉钉API接口。 ```python # 钉钉API URL url = 'https://oapi.dingtalk.com/topapi/process/instance/comment/add' # 发起POST请求 response = requests.post(url, data=json_request_data, headers={'Content-Type': 'application/json'}) # 检查响应状态 if response.status_code == 200: print('成功写入钉钉') else: print('写入失败:', response.text) ``` #### 5. 数据写入与监控 在完成上述步骤后,数据将被成功写入到钉钉平台。轻易云平台提供了实时监控功能,可以帮助我们跟踪每个环节的数据流动和处理状态,确保整个过程透明可控。 通过以上步骤,我们实现了从源平台到目标平台的数据ETL转换,并成功调用了钉钉API接口,将处理结果写入目标平台。这种方式不仅提高了数据处理效率,还确保了各个系统之间的数据一致性和完整性。 ![数据集成平台可视化配置API接口](https://pic.qeasy.cloud/T8.png~tplv-syqr462i7n-qeasy.image)