轻易云数据集成平台:将源数据ETL转换并写入金蝶云星空

  • 轻易云集成顾问-冯潇

钉钉数据集成到金蝶云星空:传给钉钉后,回传金蝶字段②

在处理企业内部系统对接的过程中,实现高效且可靠的数据集成是一项重要任务。本文将重点分享如何通过轻易云数据集成平台,将钉钉的数据集成到金蝶云星空,并确保每个环节的准确性和可靠性。

实现方案概述

本次案例应用了“传给钉钉后,回传金蝶字段②”的方案名称。在这个技术案例中,我们利用轻易云平台来协调两大业务系统——从获取并处理来自钉钉的流程实例数据(topapi/processinstance/get),到快速、高效地写入这些数据至金蝶云星空(batchSave)。

数据获取与接口调用

首先,通过调用topapi/processinstance/get接口,我们能够定时、可靠地抓取并监控来自钉钉的流程实例信息。为了保证数据不漏单,必须合理应对分页和限流问题。这需要我们设置适当的请求频率,同时针对API限制设计轮询机制,以逐页读取和处理返回结果。

def fetch_dingtalk_data():
    # 调用DingTalk API: topapi/processinstance/get
    payload = {"process_code": "your_process_code", "start_time": start_time, "end_time": end_time}
    response = requests.post('https://oapi.dingtalk.com/topapi/processinstance/get', data=payload)

    if response.status_code == 200:
        return response.json()
    else:
        handle_error(response)

data = fetch_dingtalk_data()

数据转换与格式差异调整

其次,在将采集到的数据推送至金蝶云星空之前,需要解决两者之间可能存在的数据格式差异。在实际场景下,这种映射通常通过自定义规则来完成。例如,某些字符串类型需转换为日期或浮点数等特定格式。此外,还要考虑两者对于必填字段、不允许为空字段等校验要求。

def transform_data(dingtalk_data):
    transformed = []

    for record in dingtalk_data['result']['items']:
        transformed_record = {
            'field1': record['field_x'],
            'field2': convert_date(record['field_y']),
            # 其他字段映射规则...
        }
        transformed.append(transformed_record)

    return transformed

transformed_data = transform_data(data)

快速写入与异常处理机制

最后,为确保大量数据能快速、安全地批量写入到金蝶云星空中,我们采用了batchSave接口。同时,为保障整个过程中的稳定运行,还引入了一套异常处理 打通金蝶云星空数据接口

调用钉钉接口topapi/processinstance/get获取并加工数据

在轻易云数据集成平台的生命周期中,调用源系统接口是至关重要的一步。本文将深入探讨如何通过调用钉钉接口topapi/processinstance/get来获取并加工数据。

接口调用配置

首先,我们需要配置API调用的元数据。以下是具体的元数据配置:

{
  "api": "topapi/processinstance/get",
  "method": "POST",
  "number": "id",
  "id": "id",
  "idCheck": true,
  "condition_bk": [
    [
      {
        "field": "field1",
        "logic": "eqv2",
        "value": null
      }
    ]
  ],
  "request": [
    {
      "label": "审批流的唯一码",
      "field": "process_code",
      "type": "string",
      "value": "PROC-A48FE992-9F20-4D1D-BDDB-92208230F790"
    },
    {
      "label": "审批实例开始时间。Unix时间戳,单位毫秒。",
      "field": "start_time",
      "type": "string",
      "value": "_function ({LAST_SYNC_TIME}-(3600 * 24))* 1000"
    },
    {
      "label": "审批实例结束时间,Unix时间戳,单位毫秒",
      "field": "end_time",
      "type": "string",
      "value": "{CURRENT_TIME}000"
    },
    {
      "label": "分页参数,每页大小,最多传20。",
      "field": "size",
      "type": "string",
      "value": 20
    },
    {
      "label": "分页查询的游标,最开始传0,后续传返回参数中的next_cursor值。",
      "field": "cursor",
      "type": "string"
    }
  ]
}

请求参数解析

  1. 审批流的唯一码(process_code):这是一个固定值,用于标识具体的审批流程。
  2. 审批实例开始时间(start_time):通过函数计算得到,该函数将上次同步时间减去一天(3600 * 24秒),然后转换为毫秒。
  3. 审批实例结束时间(end_time):当前时间转换为Unix时间戳,并乘以1000转换为毫秒。
  4. 分页参数(size):每页最多返回20条记录。
  5. 分页查询的游标(cursor):初始值为0,后续请求中使用返回结果中的next_cursor

数据请求与清洗

在实际操作中,我们需要通过HTTP POST方法向钉钉接口发送上述请求参数。以下是一个示例代码片段:

import requests
import time

url = 'https://oapi.dingtalk.com/topapi/processinstance/get'
headers = {'Content-Type': 'application/json'}
data = {
    'process_code': 'PROC-A48FE992-9F20-4D1D-BDDB-92208230F790',
    'start_time': int((time.time() - (3600 * 24)) * 1000),
    'end_time': int(time.time() * 1000),
    'size': 20,
    'cursor': 0
}

response = requests.post(url, json=data, headers=headers)
result = response.json()

在这个过程中,需要注意以下几点:

  1. 时间戳计算:确保start_timeend_time的计算正确无误,以避免数据遗漏或重复。
  2. 分页处理:如果返回结果中包含next_cursor,则需要进行循环请求,直到所有数据获取完毕。

数据转换与写入

获取到的数据通常需要进行清洗和转换,以便后续处理。例如,将Unix时间戳转换为可读日期格式、过滤掉不必要的字段等。以下是一个简单的数据清洗示例:

from datetime import datetime

def clean_data(data):
    cleaned_data = []
    for item in data:
        cleaned_item = {
            'id': item['id'],
            'process_code': item['process_code'],
            'start_time': datetime.fromtimestamp(item['start_time'] / 1000).strftime('%Y-%m-%d %H:%M:%S'),
            'end_time': datetime.fromtimestamp(item['end_time'] / 1000).strftime('%Y-%m-%d %H:%M:%S'),
            # 添加其他需要的字段
        }
        cleaned_data.append(cleaned_item)
    return cleaned_data

cleaned_result = clean_data(result['data'])

最终,将清洗后的数据写入目标系统或数据库中,以完成整个数据集成过程。

通过以上步骤,我们实现了从钉钉接口获取、清洗并加工数据,为后续的数据处理和分析打下了坚实基础。在实际应用中,可以根据具体需求对上述流程进行优化和调整,以提高效率和准确性。 打通金蝶云星空数据接口

使用轻易云数据集成平台进行ETL转换并写入金蝶云星空API接口的技术案例

在轻易云数据集成平台的生命周期中,第二步是将已经集成的源平台数据进行ETL(提取、转换、加载)转换,使其符合目标平台金蝶云星空API接口所能接收的格式,并最终写入目标平台。本文将详细探讨如何通过配置元数据,实现这一过程。

元数据配置解析

在本案例中,我们需要将数据通过金蝶云星空的batchSave API接口进行写入。以下是元数据配置的具体内容及其解析:

{
  "api": "batchSave",
  "method": "POST",
  "idCheck": true,
  "operation": {
    "method": "batchArraySave",
    "rows": 1,
    "rowsKey": "array"
  },
  "request": [
    {
      "field": "FID",
      "label": "单据编号",
      "type": "string",
      "describe": "单据编号",
      "value": "_findCollection find FID from 6e81d756-f703-35df-a576-a63fd822f5f5 where FBillNo={{单据编号}}"
    },
    {
      "field": "F_VAOJ_TDD",
      "label": "是否同步钉钉",
      "type": "string",
      "describe": "单据类型",
      "value": "已同步"
    }
  ],
  "otherRequest": [
    {
      "field": "FormId",
      "label": "业务对象表单Id",
      "type": "string",
      "describe": "必须填写金蝶的表单ID如:PUR_PurchaseOrder",
      "value":"CN_PAYAPPLY"
    },
    {
      "field":"Operation",
      "label":"执行的操作",
      "type":"string",
      "value":"BatchSave"
    },
    {
      "field":"IsAutoSubmitAndAudit",
      "label":"提交并审核",
      “type”:”bool”,
      “value”:”false”
    },
    {
       “field”:”IsVerifyBaseDataField”,
       “label”:”验证基础资料”,
       “type”:”bool”,
       “describe”:”是否验证所有的基础资料有效性,布尔类,默认false(非必录)”,
       “value”:”false”
     },
     {
       “label”:”NeedUpDateFields”,
       “field”:”NeedUpDateFields”,
       “type”:”string”,
       “value”:”F_VAOJ_TDD”,
       “parser”:{“name”:StringToArray”,params”:”,“}
     }
   ]
}

配置详解

  1. API和方法

    • api: 指定了要调用的API接口为batchSave
    • method: HTTP请求方法为POST
  2. ID检查

    • idCheck: 设置为true,表示在进行批量保存时会检查ID字段。
  3. 操作配置

    • operation: 定义了具体操作的方法和相关参数。
      • method: 使用batchArraySave方法。
      • rows: 每次处理的数据行数为1。
      • rowsKey: 数据行键名为array
  4. 请求参数

    • request: 包含了具体的数据字段配置。
      • FID: 单据编号,通过SQL查询从指定的数据集中获取。
      • F_VAOJ_TDD: 是否同步钉钉,固定值为“已同步”。
  5. 其他请求参数

    • otherRequest: 包含了额外的请求参数。
      • FormId: 金蝶业务对象表单ID,这里设定为“CN_PAYAPPLY”。
      • Operation: 执行操作类型,这里设定为“BatchSave”。
      • IsAutoSubmitAndAudit: 是否自动提交并审核,布尔值设定为false
      • IsVerifyBaseDataField: 是否验证基础资料有效性,布尔值设定为false(非必录)。
      • NeedUpDateFields: 需要更新的字段,这里使用了一个字符串转数组的解析器,将逗号分隔的字符串转换为数组。

实际应用案例

在实际应用中,我们需要将源平台的数据经过清洗和转换后,通过上述配置写入到金蝶云星空系统。以下是具体步骤:

  1. 提取数据:首先从源平台提取所需的数据,例如订单信息、客户信息等。
  2. 数据清洗:对提取的数据进行清洗,包括去除无效数据、格式化字段等操作。
  3. 数据转换:根据目标平台要求,将清洗后的数据进行转换。例如,将订单编号映射到金蝶系统中的单据编号字段(FID)。
  4. 构建请求:使用元数据配置构建HTTP请求,将转换后的数据打包成JSON格式,并填充到相应的字段中。
  5. 发送请求:通过HTTP POST方法,将构建好的请求发送到金蝶云星空API接口进行批量保存操作。

技术要点总结

  • 在整个ETL过程中,确保每个步骤的数据准确性和一致性至关重要。特别是在转换阶段,需要严格按照目标系统的字段要求进行映射和格式化。
  • 使用轻易云提供的全透明可视化界面,可以实时监控和调试每个环节的数据流动和处理状态,大大提高了工作效率和准确度。
  • 配置元数据时,需要详细了解目标系统API接口的要求,以便正确设置每个字段和参数。这包括了解必要字段、可选字段以及特殊处理逻辑(如字符串转数组)。

通过以上技术案例,可以看到如何利用轻易云数据集成平台高效地完成从源平台到金蝶云星空系统的数据ETL转换和写入过程。这不仅简化了复杂的数据集成任务,也确保了业务流程的顺畅运行。 如何开发企业微信API接口