使用轻易云平台完成ETL转换与数据写入的实践

  • 轻易云集成顾问-彭萍

金蝶云星辰V2数据集成到轻易云集成平台:查询客户案例

在此次技术分享中,我们将深入探讨如何高效地实现金蝶云星辰V2与轻易云数据集成平台的数据对接,具体案例为“查询客户”。项目的关键目标是通过轻易云集成平台来抓取和处理来自金蝶云星辰V2系统中的客户数据。

挑战一:确保不漏单

为了保证从金蝶云星辰V2系统获取的每条客户数据都能够被完整无误地传输到轻易云集成平台,首先需要依据API接口/jdy/v2/bd/customer定时抓取数据。使用定制化的数据映射功能,实现精确对接,并结合异常处理与错误重试机制,确保即使在网络波动或服务不可用情况下,也能最终获得一致性的数据结果。

挑战二:快速写入大量数据

面对海量的客户信息,通过批量操作极大提高了写入效率。利用轻易云集成平台所提供的大批量并发写入特性,可以显著缩短同步时间。此外,采用分页和限流策略,有效规避了金蝶云星辰V2 API调用频率限制的问题,从而保障了整个过程的顺畅进行。

挑战三:处理分页和格式差异问题

由于两个系统的数据结构可能存在显著差异,因此,在实际操作中需要详细配置字段映射及类型转换规则。同时,对页码、页大小参数的位置做出合理调整,使得多次调用联合完成全量数据抓取成为可能。这些设置在某种程度上决定了整个流程的智能化和自动化水平。

通过以上技术手段,我们不仅成功实现了两套不同系统间的数据完全互通,而且还确保实时监控与日志记录功能帮助我们随时掌握各项任务执行情况,一旦出现问题能及时定位并解决。 如何开发金蝶云星空API接口

使用轻易云数据集成平台调用金蝶云星辰V2接口获取客户数据

在数据集成的生命周期中,第一步是调用源系统接口获取原始数据。本文将详细探讨如何通过轻易云数据集成平台调用金蝶云星辰V2接口/jdy/v2/bd/customer,并对获取的数据进行初步加工。

接口概述

金蝶云星辰V2提供了丰富的API接口供外部系统调用,其中/jdy/v2/bd/customer用于查询客户信息。该接口支持GET请求,能够根据指定的查询条件返回客户数据。以下是该接口的元数据配置:

{
  "api": "/jdy/v2/bd/customer",
  "effect": "QUERY",
  "method": "GET",
  "number": "number",
  "id": "id",
  "name": "number",
  "request": [
    {
      "field": "modify_end_time",
      "label": "修改时间-结束时间的时间戳(毫秒)",
      "type": "string",
      "describe": "修改时间-结束时间的时间戳(毫秒)",
      "value": "_function {CURRENT_TIME}*1000"
    },
    {
      "field": "modify_start_time",
      "label": "修改时间-开始时间的时间戳(毫秒)",
      "type": "string",
      "describe": "修改时间-开始时间的时间戳(毫秒)",
      "value": "_function {LAST_SYNC_TIME}*1000"
    },
    {
      "field": "page",
      "label": "当前页,默认1",
      "type": "string",
      "describe": "当前页,默认1",
      "value": "1"
    },
    {
      "field": "page_size",
      "label": "每页显示条数,默认10",
      "type": "string",
      "describe": "每页显示条数,默认10",
      "value": "50"
    }
  ],
  ...
}

请求参数解析

  1. modify_end_time: 修改时间的结束时间戳,以毫秒为单位。使用函数{CURRENT_TIME}*1000动态生成当前时间。
  2. modify_start_time: 修改时间的开始时间戳,以毫秒为单位。使用函数{LAST_SYNC_TIME}*1000动态生成上次同步的时间。
  3. page: 当前页码,默认为1。
  4. page_size: 每页显示的数据条数,默认为50。

这些参数确保了我们可以灵活地控制查询范围和分页,从而高效地获取所需数据。

调用示例

以下是一个调用该接口的示例代码:

import requests
import time

# 定义请求URL和参数
url = 'https://api.kingdee.com/jdy/v2/bd/customer'
params = {
    'modify_end_time': int(time.time()) * 1000,
    'modify_start_time': (int(time.time()) - 86400) * 1000, # 假设上次同步为24小时前
    'page': '1',
    'page_size': '50'
}

# 发起GET请求
response = requests.get(url, params=params)

# 检查响应状态码
if response.status_code == 200:
    data = response.json()
    # 对返回的数据进行处理
else:
    print(f"Error: {response.status_code}")

数据处理与清洗

在获取到原始数据后,我们需要对其进行清洗和初步加工。这包括但不限于:

  1. 字段映射与转换:将源系统中的字段映射到目标系统中的字段,并进行必要的数据类型转换。
  2. 去重与过滤:去除重复记录,并根据业务需求过滤掉不需要的数据。
  3. 格式化处理:对日期、金额等特殊字段进行格式化处理,以符合目标系统要求。

以下是一个简单的数据清洗示例:

def clean_data(raw_data):
    cleaned_data = []

    for record in raw_data:
        cleaned_record = {
            'customer_id': record['id'],
            'customer_number': record['number'],
            'customer_name': record['name'],
            # 添加更多字段映射和转换逻辑
        }
        cleaned_data.append(cleaned_record)

    return cleaned_data

# 假设data是从API获取到的原始数据
cleaned_data = clean_data(data)

通过上述步骤,我们可以确保从金蝶云星辰V2获取的数据经过清洗和加工后,可以无缝地集成到目标系统中,为后续的数据转换与写入打下坚实基础。 钉钉与CRM系统接口开发配置

使用轻易云数据集成平台进行ETL转换与数据写入

在数据集成的生命周期中,第二步是将已经集成的源平台数据进行ETL转换,并转为目标平台所能够接收的格式,最终写入目标平台。本文将深入探讨如何利用轻易云数据集成平台的API接口实现这一过程。

API接口配置

在进行ETL转换和数据写入之前,我们首先需要了解目标平台API接口的配置。在本案例中,我们使用以下元数据配置:

{
  "api": "写入空操作",
  "effect": "EXECUTE",
  "method": "POST",
  "idCheck": true
}

该配置表示我们将通过POST方法调用“写入空操作”API,并且在执行过程中会进行ID检查。下面我们详细介绍如何基于此配置完成ETL转换和数据写入。

数据请求与清洗

在第一步中,我们已经完成了从源平台的数据请求与清洗。假设我们获取到的数据如下:

[
  {
    "customerId": "12345",
    "name": "张三",
    "email": "zhangsan@example.com",
    "phoneNumber": "+8613800138000"
  },
  {
    "customerId": "67890",
    "name": "李四",
    "email": "lisi@example.com",
    "phoneNumber": "+8613900139000"
  }
]

数据转换

接下来,我们需要将上述数据转换为目标平台所能接收的格式。假设目标平台要求的数据格式如下:

{
  "id": "",
  "attributes": {
    "fullName": "",
    "contactEmail": "",
    "contactPhone": ""
  }
}

我们可以编写一个简单的转换函数,将源数据转换为目标格式:

def transform_data(source_data):
    transformed_data = []

    for item in source_data:
        transformed_item = {
            "id": item["customerId"],
            "attributes": {
                "fullName": item["name"],
                "contactEmail": item["email"],
                "contactPhone": item["phoneNumber"]
            }
        }
        transformed_data.append(transformed_item)

    return transformed_data

source_data = [
  {
    "customerId": "12345",
    "name": "张三",
    "email": "zhangsan@example.com",
    "phoneNumber": "+8613800138000"
  },
  {
    "customerId": "67890",
    "name": "李四",
    "email": "lisi@example.com",
    "phoneNumber": "+8613900139000"
  }
]

transformed_data = transform_data(source_data)
print(transformed_data)

运行上述代码后,transformed_data 将包含如下内容:

[
  {
    "id": "12345",
    "attributes":
      {
        "fullName":"张三",
        "contactEmail":"zhangsan@example.com",
        “contactPhone":"+8613800138000"
      }
   },
   {
     “id":"67890",
     “attributes":
       {
         “fullName":"李四",
         “contactEmail":"lisi@example.com",
         “contactPhone":"+8613900139000"
       }
   }
]

数据写入

完成数据转换后,我们需要将其写入目标平台。根据元数据配置,我们使用POST方法调用“写入空操作”API,并进行ID检查。以下是一个示例代码片段,展示如何通过HTTP请求库(如requests)实现这一过程:

import requests

def write_to_target_platform(data):
    url = 'https://api.targetplatform.com/execute'

    headers = {
        'Content-Type': 'application/json'
    }

    for item in data:
        response = requests.post(url, json=item, headers=headers)

        if response.status_code == 200:
            print(f"Successfully wrote data for ID: {item['id']}")
        else:
            print(f"Failed to write data for ID: {item['id']}, Status Code: {response.status_code}")

write_to_target_platform(transformed_data)

上述代码通过POST方法逐条发送已转换的数据到目标平台,并检查响应状态码,以确保每条记录都成功写入。

小结

本文介绍了如何使用轻易云数据集成平台进行ETL转换和数据写入,包括API接口配置、数据请求与清洗、数据转换以及最终的数据写入过程。通过这些步骤,可以实现不同系统间的数据无缝对接,提高业务透明度和效率。 用友与CRM系统接口开发配置

更多系统对接方案