轻易云平台实现金蝶云的ETL转换与数据写入

  • 轻易云集成顾问-孙传友
### 钉钉数据集成到金蝶云星空案例分享:传给钉钉后,回传金蝶字段② 在企业数字化运营的过程中,通过高效的数据集成平台实现不同系统间的数据无缝对接,是提升业务效率的一大法宝。本文将聚焦一个典型的系统对接案例,即如何将钉钉的数据高效、安全地集成并回传到金蝶云星空。 该方案主要包括三大步骤:首先,通过调用钉钉API接口`topapi/processinstance/get`定时抓取所需业务数据,并确保处理过程中不漏单;其次,将获取的大量数据快速写入至金蝶云星空,运用其批量保存API `batchSave`实现数据落地;最后,针对两者之间存在的数据格式差异进行定制化映射和转换,同时实时监控整个处理过程以应对可能出现的分页与限流问题。 #### 如何确保集成钉钉数据不漏单 为了避免遗漏关键业务信息,我们设计了灵活可靠的定时任务机制,通过轻松配置,对接上特定时间点内生成的新记录执行抓取。借助于轻易云提供的平台能力,可以做到精准控制执行频率与同步范围,从而保证每次运行都能够获取最新且完整的数据。这为后续批量写入金蝶云奠定了坚实基础。 #### 大量数据快速写入到金蝶云星空 面对从钉钉端抓取来的庞大数据信息,需要有效、快速地导入至金蝶云。在这里,我们使用了金蝶官方提供的批量保存API `batchSave`,通过并发策略及缓存技术,大幅度提升了写入性能。同时,对于可能发生的异常,在重试机制下自动纠错及补全,有力保障了整体流程稳定性和一致性。 以上只是本次集成项目解决方案中的两个关键步骤。下一部分我们将深入探讨如何处理接口分页与限流挑战,以及自定义映射规则来满足具体业务需求等方面内容。 ![电商OMS与ERP系统接口开发配置](https://pic.qeasy.cloud/D7.png~tplv-syqr462i7n-qeasy.image) ### 调用钉钉接口topapi/processinstance/get获取并加工数据 在数据集成的生命周期中,调用源系统接口是至关重要的一步。本文将深入探讨如何通过轻易云数据集成平台调用钉钉接口`topapi/processinstance/get`,并对获取的数据进行加工处理。 #### 接口调用配置 首先,我们需要配置元数据以便正确调用钉钉接口。以下是具体的元数据配置: ```json { "api": "topapi/processinstance/get", "method": "POST", "number": "id", "id": "id", "idCheck": true, "request": [ { "label": "审批流的唯一码", "field": "process_code", "type": "string", "value": "PROC-B1959981-2CB9-42E9-A054-A975492A5DBB" }, { "label": "审批实例开始时间。Unix时间戳,单位毫秒。", "field": "start_time", "type": "string", "value": "_function ({LAST_SYNC_TIME}-(3600 * 24))* 1000" }, { "label": "审批实例结束时间,Unix时间戳,单位毫秒", "field": "end_time", "type": "string", "value": "{CURRENT_TIME}000" }, { "label": "分页参数,每页大小,最多传20。", "field": "size", "type": "string", "value": "20" }, { "label": "分页查询的游标,最开始传0,后续传返回参数中的next_cursor值。", "field": "cursor", "type": "string" } ] } ``` #### 参数解析与配置 1. **审批流的唯一码 (`process_code`)**: - 固定值 `PROC-B1959981-2CB9-42E9-A054-A975492A5DBB`。 - 用于标识特定的审批流程。 2. **审批实例开始时间 (`start_time`)**: - 使用函数 `_function ({LAST_SYNC_TIME}-(3600 * 24))* 1000` 动态计算。 - 表示从上次同步时间减去一天(以毫秒为单位)。 3. **审批实例结束时间 (`end_time`)**: - 使用 `{CURRENT_TIME}000` 动态获取当前时间,并转换为毫秒。 4. **分页参数 (`size`)**: - 固定值 `20`。 - 每页返回最多20条记录。 5. **分页查询的游标 (`cursor`)**: - 初始值为 `0`,后续请求中使用返回参数中的 `next_cursor` 值。 #### 数据请求与清洗 在完成元数据配置后,我们可以发起POST请求来获取审批实例的数据。以下是请求示例: ```json { "_method_":"POST", "_url_":"https://oapi.dingtalk.com/topapi/processinstance/get", "_data_":{ "_headers_":{ "_content-type_":"application/json" }, "_body_":{ "_process_code_":"PROC-B1959981-2CB9-42E9-A054-A975492A5DBB", "_start_time_":"{计算后的开始时间}", "_end_time_":"{计算后的结束时间}", "_size_":"20", "_cursor_":"0" } } } ``` 通过这个请求,我们可以获取到指定时间范围内的审批实例数据。在实际操作中,需要根据返回结果中的 `next_cursor` 值进行分页处理,以确保所有数据都能被完整获取。 #### 数据转换与写入 获取到原始数据后,需要对其进行清洗和转换,以便后续写入目标系统(如金蝶)。清洗和转换过程可能包括以下步骤: 1. **字段映射**:将钉钉返回的数据字段映射到目标系统所需的字段。 2. **数据格式转换**:例如,将Unix时间戳转换为标准日期格式。 3. **异常处理**:过滤掉无效或异常的数据记录。 例如,将审批实例的开始和结束时间从Unix时间戳转换为标准日期格式: ```python import time def convert_timestamp_to_date(timestamp): return time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(timestamp / 1000)) # 示例数据 data = { 'start_time': '1633072800000', 'end_time': '1633159200000' } # 转换后的数据 converted_data = { 'start_time': convert_timestamp_to_date(int(data['start_time'])), 'end_time': convert_timestamp_to_date(int(data['end_time'])) } print(converted_data) ``` 通过上述步骤,我们可以确保从钉钉获取的数据经过清洗和转换后能够无缝对接到目标系统,实现高效的数据集成。 ![企业微信与OA系统接口开发配置](https://pic.qeasy.cloud/S1.png~tplv-syqr462i7n-qeasy.image) ### 使用轻易云数据集成平台实现金蝶云星空API接口的数据转换与写入 在数据集成的生命周期中,ETL(Extract, Transform, Load)转换是至关重要的一步。本文将深入探讨如何使用轻易云数据集成平台,将已经集成的源平台数据进行ETL转换,并转为金蝶云星空API接口所能够接收的格式,最终写入目标平台。 #### 1. 配置元数据 首先,我们需要配置元数据,以确保数据能够正确地从源平台提取、转换并写入到金蝶云星空。以下是一个典型的元数据配置示例: ```json { "api": "batchSave", "method": "POST", "idCheck": true, "operation": { "method": "batchArraySave", "rows": 1, "rowsKey": "array" }, "request": [ { "field": "FID", "label": "单据编号", "type": "string", "describe": "单据编号", "value": "_findCollection find FID from a2c0a432-8d16-3930-9056-73ae3f6acde0 where FBillNo={{单据编号}}" }, { "field": "F_VAOJ_TDD", "label": "是否同步钉钉", "type": "string", "describe": "单据类型", "value": "已同步" } ], "otherRequest": [ { "field": "FormId", "label": "业务对象表单Id", "type": "string", "describe":"必须填写金蝶的表单ID如:PUR_PurchaseOrder", "value":"CN_PAYAPPLY" }, { "field":"Operation", "label":"执行的操作", "type":"string", "value":"BatchSave" }, { "field":"IsAutoSubmitAndAudit", ... ``` #### 2. 数据请求与清洗 在数据请求阶段,我们通过API调用从源系统提取原始数据。此时,需要确保所提取的数据符合目标系统的需求。例如,通过SQL查询语句 `_findCollection find FID from a2c0a432-8d16-3930-9056-73ae3f6acde0 where FBillNo={{单据编号}}` 提取特定单据编号的数据。 ```json { ... { “field”: “FID”, “label”: “单据编号”, “type”: “string”, “describe”: “单据编号”, “value”: “_findCollection find FID from a2c0a432-8d16-3930-9056-73ae3f6acde0 where FBillNo={{单据编号}}” } } ``` #### 3. 数据转换 在数据转换阶段,我们需要将提取的数据按照金蝶云星空API接口要求的格式进行转换。例如,将字段 `F_VAOJ_TDD` 的值设置为 `"已同步"`,以标识该记录已经同步至钉钉。 ```json { ... { “field”: “F_VAOJ_TDD”, “label”: “是否同步钉钉”, “type”: “string”, “describe”: “单据类型”, “value”: “已同步” } } ``` #### 4. 数据写入 在数据写入阶段,我们将转换后的数据通过API接口写入到金蝶云星空系统中。具体操作包括设置业务对象表单ID、执行批量保存操作等。 ```json { ... { “field”:“FormId”, “label”:“业务对象表单Id”, “type”:“string”, “describe”:“必须填写金蝶的表单ID如:PUR_PurchaseOrder”, “value”:“CN_PAYAPPLY” }, { ... //其他必要字段配置 ... } } ``` #### API接口调用示例 以下是一个完整的API调用示例: ```json { "api":"batchSave", "method":"POST", "idCheck":true, "operation":{ "method":"batchArraySave","rows":1,"rowsKey":"array" }, "request":[ {"field":"FID","label":"单据编号","type":"string","describe":"单据编号","value":"_findCollection find FID from a2c0a432-8d16-3930-9056-73ae3f6acde0 where FBillNo={{单据编号}}"}, {"field":"F_VAOJ_TDD","label":"是否同步钉钉","type":"string","describe":"单据类型","value":"已同步"} ], "otherRequest":[ {"field":"FormId","label":"业务对象表单Id","type":"string","describe":"必须填写金蝶的表单ID如:PUR_PurchaseOrder","value":"CN_PAYAPPLY"}, {"field":"Operation","label":"执行的操作","type":"string","value":"BatchSave"}, {"field":"IsAutoSubmitAndAudit","label":"提交并审核","type":"bool","value":"false"}, {"field":"IsVerifyBaseDataField","label":"验证基础资料","type":"bool","describe":"","value":"", {"label":"","field":"","type":"","parser":{"name":"","params":[]}} ] } ``` 通过以上步骤,我们可以实现从源平台到金蝶云星空系统的数据无缝对接。每一步都需要严格按照配置要求进行,以确保数据准确性和完整性。这不仅提高了业务透明度和效率,也保证了系统间的数据一致性。 ![如何开发企业微信API接口](https://pic.qeasy.cloud/T10.png~tplv-syqr462i7n-qeasy.image)