从源平台到钉钉:实现数据集成与ETL转换的全流程解析

  • 轻易云集成顾问-杨嫦
### 金蝶云星空与钉钉的数据集成案例分享:传给金蝶后,回传钉钉提示(已审核) 在企业级应用中,实现不同系统间的数据对接往往是一个复杂而又至关重要的过程。本次案例将介绍如何通过轻易云数据集成平台,将金蝶云星空的数据无缝对接到钉钉,并实现“传给金蝶后,回传钉钉提示(已审核)”这一业务流程。 首先,在进行数据集成时,我们需要确保获取到的是最新且准确的数据。为此,我们调用了金蝶云星空的`executeBillQuery`接口,以定时可靠地抓取其系统中的实时数据。在处理大量订单信息时,为了避免漏单和提高效率,我们使用了分页技术并采取了一系列措施应对API限流问题。 为了确保从金蝶云星空提取出的数据能够被快速写入到钉钉,我们采用批量提交方式,同时进行了必要的格式转换,以适配两者之间可能存在的数据格式差异。具体来说,通过解析每个字段并映射关系,从而做到数据的一一对应。同时,对于异常情况,例如处理过程中出现网络波动或者接口返回错误码等问题,我们设计了完善的错误重试机制,保证整个流程的稳定性和高可用性。 此外,在成功完成上述步骤之后,为进一步提升透明度和实时监控能力,通过在轻易云平台高级功能支持下,即时生成详细日志记录,每一步操作都有据可查。这不仅便于问题排查,更能让管理人员随时掌握当前任务进展状态。 最后,当所有前置任务都顺利执行完毕后,通过调用钉丁API `topapi/process/instance/comment/add` 实现消息通知提醒,从而让相关负责人第一时间知晓相应业务信息已经完成审核,有效提升项目协作效率。 ![金蝶与MES系统接口开发配置](https://pic.qeasy.cloud/D37.png~tplv-syqr462i7n-qeasy.image) ### 调用金蝶云星空接口executeBillQuery获取并加工数据 在轻易云数据集成平台的生命周期管理中,调用源系统接口是数据处理的第一步。本文将深入探讨如何通过调用金蝶云星空的`executeBillQuery`接口来获取并加工数据。 #### 接口配置与调用 首先,我们需要配置元数据以便正确调用金蝶云星空的`executeBillQuery`接口。以下是关键的元数据配置: ```json { "api": "executeBillQuery", "method": "POST", "number": "FBillNo", "id": "FPAYBILLENTRY_FEntryID", "pagination": { "pageSize": 500 }, "idCheck": true, "condition": [ [{"field":"FPAYORGID","logic":"eq","value":"10.01"}], [{"field":"FPAYORGID","logic":"eq","value":"10.04"}] ], "request": [ {"field":"FPAYBILLENTRY_FEntryID","label":"FPAYBILLENTRY_FEntryID","type":"string","value":"FPAYBILLENTRY_FEntryID"}, {"field":"FID","label":"实体主键","type":"string","value":"FID"}, {"field":"FBillNo","label":"单据编号","type":"string","value":"FBillNo"}, {"field":"FDOCUMENTSTATUS","label":"单据状态","type":"string","value":"FDOCUMENTSTATUS"}, {"field":"FCreatorId","label":"创建人","type":"string","value":"FCreatorId"}, {"field":"FAPPROVERID","label":"审核人","type":"string","value":"FAPPROVERID"}, {"field":"FCreateDate","label":"创建日期","type":"string","value":"FCreateDate"}, {"field":"FSETTLEORGID","label":"结算组织","type":"string","value":"FSETTLEORGID.FNumber"}, {"field":"FApproveDate","label":"审核日期","type":"string","value":"FApproveDate"}, {"field":...} ], "otherRequest": [ {"field": "Limit", "label": "最大行数", "type": "string", "describe": "金蝶的查询分页参数", "value": "{PAGINATION_PAGE_SIZE}"}, {"field": "StartRow", "label": "开始行索引", "type": "string", "describe": "金蝶的查询分页参数", "value": "{PAGINATION_START_ROW}"}, {"field":...} ] } ``` #### 数据请求与清洗 在进行数据请求时,我们需要确保请求参数的正确性和完整性。以下是一个示例请求体: ```json { "FormId": "AP_PAYBILL", "FieldKeys": [ ... // 列出所有需要查询的字段 ], "FilterString": "(FPAYORGID = '10.01' OR FPAYORGID = '10.04') AND FApproveDate >= '2023-01-01' AND FDOCUMENTSTATUS = 'C'", ... } ``` 通过上述配置,我们可以确保从金蝶云星空系统中获取到所需的数据。 #### 数据转换与写入 获取到原始数据后,需要对其进行清洗和转换,以便后续处理和写入目标系统。以下是一些常见的数据清洗操作: 1. **字段映射**:将原始字段映射到目标系统所需的字段。例如,将`FBillNo`映射为目标系统中的`InvoiceNumber`。 2. **数据格式转换**:将日期格式从`YYYY-MM-DD`转换为目标系统所需的格式。 3. **数据过滤**:根据业务需求过滤掉不必要的数据。例如,只保留审核状态为“已审核”的记录。 #### 实际案例 假设我们需要从金蝶云星空中获取所有付款组织为“10.01”或“10.04”,且审核日期在2023年1月1日之后,且单据状态为“已审核”的付款单。我们可以通过以下步骤实现: 1. **配置请求参数**: ```json { ... "FilterString": "(FPAYORGID = '10.01' OR FPAYORGID = '10.04') AND FApproveDate >= '2023-01-01' AND FDOCUMENTSTATUS = 'C'", ... } ``` 2. **发送请求并获取响应**: ```python import requests url = "<金蝶云星空API地址>" headers = { 'Content-Type': 'application/json' } payload = { ... # 填充上述配置中的请求体内容 } response = requests.post(url, json=payload, headers=headers) data = response.json() ``` 3. **处理响应数据**: ```python processed_data = [] for record in data['Result']: processed_record = { 'InvoiceNumber': record['FBillNo'], 'ApprovalDate': record['FApproveDate'], ... } processed_data.append(processed_record) # 将处理后的数据写入目标系统或存储 ``` 通过以上步骤,我们可以高效地从金蝶云星空中获取并加工所需的数据,为后续的数据集成和业务处理打下坚实基础。 ![金蝶与外部系统打通接口](https://pic.qeasy.cloud/S29.png~tplv-syqr462i7n-qeasy.image) ### 数据集成与ETL转换:从源平台到钉钉API接口 在数据集成的生命周期中,第二步是将已经集成的源平台数据进行ETL(Extract, Transform, Load)转换,使其符合目标平台的格式要求。在本案例中,我们将探讨如何将数据转换为钉钉API接口所能接收的格式,并最终写入目标平台。 #### API接口元数据配置解析 在本案例中,我们需要使用钉钉的API接口`topapi/process/instance/comment/add`来添加审批实例的评论。该接口采用POST请求方式,具体的元数据配置如下: ```json { "api": "topapi/process/instance/comment/add", "method": "POST", "idCheck": true, "request": [ { "field": "request", "label": "请求对象", "type": "object", "children": [ { "field": "process_instance_id", "label": "审批实例ID", "type": "string", "describe": "可通过调用获取审批实例ID列表接口获取。", "value": "_findCollection find id from 97cce894-15a8-3c23-a1ee-0c50bbf587cf where business_id={FBillNo}", "parent": "request" }, { "field": "text", "label": "评论的内容", "type": "string", "value": "已审核", "parent": "request" }, { "field": "comment_userid", "label": "评论人的userid", "type": "string", "value": "112018120420563028", "parent": "request" } ] } ] } ``` #### 数据提取与清洗 首先,我们需要从源平台提取相关数据。假设我们从金蝶系统中获取了业务单据编号`FBillNo`,接下来我们要根据这个编号找到对应的审批实例ID。这一步可以通过调用轻易云的数据查询功能实现: ```sql _findCollection find id from 97cce894-15a8-3c23-a1ee-0c50bbf587cf where business_id={FBillNo} ``` 这个查询语句将返回与业务单据编号匹配的审批实例ID。 #### 数据转换 接下来,我们需要将提取到的数据进行转换,使其符合钉钉API接口要求。根据元数据配置,我们需要构建如下格式的请求对象: ```json { request: { process_instance_id: "<审批实例ID>", text: "<评论内容>", comment_userid: "<评论人的userid>" } } ``` 具体值替换如下: - `process_instance_id`: 从前一步查询结果中获取。 - `text`: 固定值"已审核"。 - `comment_userid`: 固定值"112018120420563028"。 #### 数据加载 最后一步是将转换后的数据通过HTTP POST请求发送到钉钉API接口。以下是一个示例代码片段,展示如何使用Python发送这个请求: ```python import requests import json url = 'https://oapi.dingtalk.com/topapi/process/instance/comment/add' headers = {'Content-Type': 'application/json'} data = { 'request': { 'process_instance_id': '从查询结果中获取', 'text': '已审核', 'comment_userid': '112018120420563028' } } response = requests.post(url, headers=headers, data=json.dumps(data)) if response.status_code == 200: print("评论成功添加") else: print(f"请求失败,状态码:{response.status_code}") ``` 在这个示例中,我们使用Python的`requests`库发送POST请求,并将构建好的JSON对象作为请求体传递给钉钉API。 #### 实时监控与错误处理 在实际操作过程中,实时监控和错误处理同样重要。轻易云平台提供了实时监控功能,可以帮助我们及时发现并解决数据处理过程中的问题。例如,如果API调用失败,可以通过日志和监控界面快速定位问题所在,并进行相应调整。 通过以上步骤,我们实现了从源平台到目标平台的数据ETL转换,并成功调用了钉钉API接口,将处理结果写入目标平台。这不仅提高了数据处理效率,也确保了业务流程的透明和可追溯性。 ![钉钉与CRM系统接口开发配置](https://pic.qeasy.cloud/T28.png~tplv-syqr462i7n-qeasy.image)