借助轻易云进行数据ETL转换并写入金蝶云星空

  • 轻易云集成顾问-彭萍

钉钉数据集成到金蝶云星空:传给钉钉后,回传金蝶字段②

在进行企业应用系统的数据对接过程中,高效、准确地实现数据流动和转换是确保业务连续性与高质量的关键。本文将介绍如何通过轻易云数据集成平台,将来自钉钉的数据批量、准确地写入到金蝶云星空中,并具体分析其中涉及的技术细节与解决方案。

针对接口分页及限流问题的处理

在从钉钉获取流程实例详情时,我们采用了topapi/processinstance/get API,该接口需要精确处理分页及调用频次限制。为了保证不遗漏任何数据,首先我们通过一次请求确定总记录数,然后分批次、多线程逐页抓取。在此过程中,通过自定义队列机制协调各个线程,有效避免因接口限流导致的数据丢失或请求失败。

数据格式差异转换及映射机制

由于钉钉和金蝶云星空中的数据结构并不完全相同,我们利用轻易云平台内置的映射工具,对两者之间的数据格式进行了定制化转换。例如,对于来自钉端的一些复合型字段,需要拆解为多个对应于金蝶单元格的独立字段,在命名规则以及类型匹配上也做了精细化调整。

高效稳定的大量数据写入

针对将大量输出结果迅速且可靠地导入到金蝶云星空中,我们使用了其提供的batchSave API,以批量模式减少网络延迟及错误率。同时,为应对可能出现的大规模写入失败情况,实现了一套异常处理与重试机制。一旦检测到插入操作发生错误,即刻启动重试逻辑,并记录详细日志以备查验,如多次尝试仍旧失败,则触发告警提醒相关人员介入调查。

这些有效方法不仅确保了不同系统间复杂对象的数据顺利迁移,也提高了整体运作效率,实现高水准的数据对接集成。本案例也展示出,不仅是技术方案,而是在实际落地执行过程中必须考虑的问题及相应解决策略,这才是真正意义上的成功经验分享。 泛微OA与ERP系统接口开发配置

调用钉钉接口topapi/processinstance/get获取并加工数据

在轻易云数据集成平台的生命周期中,调用源系统接口是至关重要的一步。本文将深入探讨如何通过调用钉钉接口topapi/processinstance/get来获取审批流数据,并进行初步的数据加工处理。

接口概述

钉钉提供的topapi/processinstance/get接口用于获取审批实例的详细信息。该接口采用POST请求方式,支持分页查询,返回指定时间范围内的审批实例数据。以下是该接口的元数据配置:

{
  "api": "topapi/processinstance/get",
  "method": "POST",
  "number": "id",
  "id": "id",
  "idCheck": true,
  "request": [
    {"label": "审批流的唯一码", "field": "process_code", "type": "string", "value": "PROC-22EDF4E6-5CC9-4712-B9A3-34AEAF37B8AC"},
    {"label": "审批实例开始时间。Unix时间戳,单位毫秒。", "field": "start_time", "type": "string", "value": "_function ({LAST_SYNC_TIME}-(3600 * 48))* 1000"},
    {"label": "审批实例结束时间,Unix时间戳,单位毫秒", "field": "end_time", "type": "string", "value": "{CURRENT_TIME}000"},
    {"label": "分页参数,每页大小,最多传20。", "field": "size", "type": "string", "value": "20"},
    {"label": "分页查询的游标,最开始传0,后续传返回参数中的next_cursor值。", "field": "cursor", "type": "string"}
  ]
}

请求参数解析

  1. process_code: 审批流的唯一码,用于标识具体的审批流程。在本例中,该值为固定字符串PROC-22EDF4E6-5CC9-4712-B9A3-34AEAF37B8AC
  2. start_time: 审批实例开始时间,以Unix时间戳表示。这里使用了一个函数计算方法,将上次同步时间减去48小时(即两天),并转换为毫秒。
  3. end_time: 审批实例结束时间,同样以Unix时间戳表示,这里直接取当前时间并转换为毫秒。
  4. size: 分页参数,每页大小,最多传20。
  5. cursor: 分页查询的游标,最开始传0,后续传返回参数中的next_cursor值。

数据请求与清洗

在调用接口获取数据后,需要对返回的数据进行清洗和初步处理。这一步骤确保数据符合目标系统(如金蝶)的要求,并为后续的数据转换与写入做好准备。

  1. 调用接口

    import requests
    import time
    
    url = 'https://oapi.dingtalk.com/topapi/processinstance/get'
    headers = {'Content-Type': 'application/json'}
    payload = {
       'process_code': 'PROC-22EDF4E6-5CC9-4712-B9A3-34AEAF37B8AC',
       'start_time': int((time.time() - (3600 * 48)) * 1000),
       'end_time': int(time.time() * 1000),
       'size': '20',
       'cursor': '0'
    }
    
    response = requests.post(url, json=payload, headers=headers)
    data = response.json()
  2. 数据清洗 对于返回的数据,需要进行清洗和格式化。例如,将日期字段转换为目标系统所需的格式,对冗余字段进行删除等。

    def clean_data(data):
       cleaned_data = []
       for instance in data['result']['list']:
           cleaned_instance = {
               'id': instance['process_instance_id'],
               'start_time': time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(instance['create_time'] / 1000)),
               'end_time': time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(instance['finish_time'] / 1000)),
               # 添加其他需要的字段
           }
           cleaned_data.append(cleaned_instance)
       return cleaned_data
    
    cleaned_data = clean_data(data)

数据转换与写入

在完成数据清洗后,可以将数据转换为目标系统所需的格式,并写入目标系统(如金蝶)。这一步骤通常涉及到字段映射、数据类型转换等操作。

def transform_and_write(cleaned_data):
    for record in cleaned_data:
        # 转换为目标系统所需格式
        transformed_record = {
            '单据编号': record['id'],
            '开始时间': record['start_time'],
            '结束时间': record['end_time'],
            # 添加其他需要映射的字段
        }
        # 写入目标系统,例如通过API调用或数据库操作
        write_to_target_system(transformed_record)

def write_to_target_system(record):
    # 示例:通过API写入金蝶系统
    target_url = 'https://kingdee.example.com/api/endpoint'
    headers = {'Content-Type': 'application/json'}
    response = requests.post(target_url, json=record, headers=headers)
    if response.status_code == 200:
        print('Record written successfully')
    else:
        print('Failed to write record')

transform_and_write(cleaned_data)

通过以上步骤,我们实现了从钉钉获取审批实例数据,并进行初步加工处理,为后续的数据集成奠定了基础。这一过程展示了轻易云数据集成平台在生命周期管理中的高效性和透明性。 金蝶云星空API接口配置

使用轻易云数据集成平台进行ETL转换并写入金蝶云星空API接口

在数据集成生命周期的第二阶段,关键任务是将已经集成的源平台数据进行ETL(Extract, Transform, Load)转换,并转为目标平台——金蝶云星空API接口所能够接收的格式,最终写入目标平台。本文将详细探讨这一过程中的技术细节和实现方法。

配置元数据

首先,我们需要配置元数据,以确保数据能够正确地从源平台提取、转换,并写入到金蝶云星空。以下是一个示例元数据配置:

{
  "api": "batchSave",
  "method": "POST",
  "idCheck": true,
  "operation": {
    "method": "batchArraySave",
    "rows": 1,
    "rowsKey": "array"
  },
  "request": [
    {
      "field": "FID",
      "label": "单据编号",
      "type": "string",
      "describe": "单据编号",
      "value": "_findCollection find FID from 621d031b-1a18-361a-886b-67df87d8c5b6 where FBillNo={{单据编号}}"
    },
    {
      "field": "F_VAOJ_TDD",
      "label": "是否同步钉钉",
      "type": "string",
      "describe": "单据类型",
      "value": "已同步"
    }
  ],
  "otherRequest": [
    {
      "field": "FormId",
      "label": "业务对象表单Id",
      "type": "string",
      "describe": "必须填写金蝶的表单ID如:PUR_PurchaseOrder",
      "value": "CN_PAYAPPLY"
    },
    {
      "field": "Operation",
      "label": "执行的操作",
      "type": "string",
      "value": ":BatchSave"
    },
    {
      ...
    }
  ]
}

数据提取与转换

在ETL过程中,第一步是从源系统中提取数据。在上面的元数据配置中,通过_findCollection函数来查找特定条件下的数据。例如,_findCollection find FID from 621d031b-1a18-361a-886b-67df87d8c5b6 where FBillNo={{单据编号}}表示根据单据编号查找对应的FID。

接下来,需要对提取的数据进行转换,以满足目标系统的要求。在这里,我们需要将字段F_VAOJ_TDD设置为“已同步”,以标识该记录已经同步到钉钉。

数据加载

最后一步是将转换后的数据加载到目标系统——金蝶云星空。通过配置中的otherRequest部分,可以指定一些额外的请求参数,例如:

  • FormId: 金蝶业务对象表单ID,例如“CN_PAYAPPLY”。
  • Operation: 执行的操作类型,这里为“BatchSave”。
  • IsAutoSubmitAndAudit: 是否自动提交并审核,布尔值。
  • IsVerifyBaseDataField: 是否验证基础资料有效性,布尔值。

这些参数确保了在调用金蝶API时,能够正确地执行批量保存操作。

调用API接口

通过上述配置,我们可以使用轻易云的数据集成平台调用金蝶云星空的API接口,实现数据的批量保存。具体实现代码如下:

import requests
import json

url = 'https://api.kingdee.com/batchSave'
headers = {'Content-Type': 'application/json'}
payload = {
    'FormId': 'CN_PAYAPPLY',
    'Operation': 'BatchSave',
    'IsAutoSubmitAndAudit': False,
    'IsVerifyBaseDataField': False,
    'NeedUpDateFields': ['F_VAOJ_TDD'],
    'data': [
        {
            'FID': '<extracted_FID>',
            'F_VAOJ_TDD': '已同步'
        }
        # More records can be added here
    ]
}

response = requests.post(url, headers=headers, data=json.dumps(payload))
if response.status_code == 200:
    print('Data successfully saved to Kingdee Cloud.')
else:
    print('Failed to save data:', response.text)

通过上述步骤和代码示例,我们可以完成从源平台到金蝶云星空的数据ETL转换和写入过程。这不仅确保了数据的一致性和完整性,还极大地提高了业务处理效率。 系统集成平台API接口配置