轻易云平台实现ETL转换与目标平台数据写入

  • 轻易云集成顾问-何语琴

查询简道云物料表单数据集成方案分享

在本案例中,我们将详细解析如何使用轻易云数据集成平台,从简道云系统中抓取物料表单数据并进行高效处理和写入。此方案不仅确保了高吞吐量的数据写入能力,还提供实时监控与告警,显著提升系统对接的可靠性和透明度。

首先,通过调用简道云API接口/api/v2/app/{app_id}/entry/{entry_id}/data获取原始的物料表单数据。为保障数据传输过程中的准确性,我们通过处理分页和限流问题来确保不漏单,并利用定时任务机制定期抓取最新的数据,以保持信息同步。

然后,为应对简道云与集成平台之间存在的数据格式差异,轻易云集成平台支持自定义的数据转换逻辑。这一功能使得我们可以灵活适配各类业务需求,实现精准映射。同时,通过其可视化的数据流设计工具,整个配置变得更加直观且易于管理,大大减少了开发周期,提高了整体效率。

在任务执行过程中,为保证每个环节的顺畅运行以及及时响应异常情况,我们还实施了一套完善的错误重试机制。在出现问题时,即可快速定位并解决,使系统始终处于最佳状态。此外,对接口调用的全过程进行了日志记录与实时监控,无论是性能瓶颈还是故障排除,都能做到心中有数,有据可查。

总之,本次技术分享将围绕上述核心点展开,希望能为您提供全面、高效、稳定的系统对接解决方案。 钉钉与ERP系统接口开发配置

调用简道云接口获取并加工数据的技术实现

在数据集成生命周期的第一步,调用源系统接口获取数据是至关重要的。本文将深入探讨如何通过轻易云数据集成平台调用简道云接口/api/v2/app/{app_id}/entry/{entry_id}/data来获取并加工数据。

元数据配置解析

首先,我们需要理解元数据配置metadata,它定义了如何调用简道云的API接口。以下是关键字段的解析:

  • api: 接口路径,格式为/api/v2/app/{app_id}/entry/{entry_id}/data
  • method: 请求方法,这里使用的是POST
  • number: 需要查询的字段之一,对应简道云中的字段标识。
  • id: 数据唯一标识符,对应简道云中的字段标识。
  • idCheck: 是否检查ID字段。

请求参数部分:

  1. 应用ID和表单ID

    {"field":"appId","label":"应用ID","type":"string","value":"63899c8e6705fb000870437d"}
    {"field":"entryId","label":"表单ID","type":"string","value":"63899c9264b9a9000ad4d3ce"}

    这些参数用于指定要查询的具体应用和表单。

  2. 需要查询的字段

    {"field":"fields","label":"需要查询的字段","type":"string","describe":"多个字段以逗号隔开,默认不传入则输出所有字段","value":"_widget_1669962898406,_id,_widget_1669962898407","parser":{"name":"StringToArray","params":","}}

    该参数指定了要查询的具体字段,并通过解析器将字符串转换为数组。

  3. 每页返回数量

    {"field":"limit","label":"每页返回数量","type":"string","describe":"查询的数据条数,1~100,默认10","value":"100"}

    该参数控制每次请求返回的数据条数,最大值为100。

  4. 过滤参数

    {
       "field": "filter",
       "label": "过滤参数",
       "type": "object",
       "children": [
           {
               "field": "rel",
               "label": "过滤参数逻辑",
               "type": "string",
               "describe": "\"rel\": \"and\", // 或者 \"or\"",
               "value": "and"
           },
           {
               "field": "cond_1",
               "label": "自定义字段过滤",
               "type": "object",
               "children": [
                   {
                       "field": "field",
                       "label": "过滤字段",
                       "type": "string",
                       "value": "createTime"
                   },
                   {
                       "field": "type",
                       "label": "过滤类型",
                       "type": "string",
                       "describe": "\"text\\number\\flowstate\"",
                       "value": "datetime"
                   },
                   {
                       "field": "method",
                       "label": "过滤方法",
                       "type": "string",
                       {
                           ...
                           ...
                           ...

实际调用与数据处理

在理解了元数据配置后,我们可以开始实际调用API并处理返回的数据。以下是一个示例代码片段,用于演示如何通过轻易云平台进行API调用和数据处理:

import requests
import json

# 定义API URL和请求头
api_url = 'https://www.jiandaoyun.com/api/v2/app/63899c8e6705fb000870437d/entry/63899c9264b9a9000ad4d3ce/data'
headers = {
    'Authorization': 'Bearer YOUR_ACCESS_TOKEN',
    'Content-Type': 'application/json'
}

# 构建请求体
payload = {
    'fields': ['_widget_1669962898406', '_id', '_widget_1669962898407'],
    'limit': 100,
    'filter': {
        'rel': 'and',
        'cond_1': {
            'field': 'createTime',
            'type': 'datetime',
            'method': 'range',
            'value': ['2023-01-01T00:00:00Z', '2023-12-31T23:59:59Z']
        }
    }
}

# 发起POST请求
response = requests.post(api_url, headers=headers, data=json.dumps(payload))

# 检查响应状态码
if response.status_code == 200:
    data = response.json()
    # 数据处理逻辑,例如存储到数据库或进一步加工
    print(data)
else:
    print(f"Error: {response.status_code}, {response.text}")

数据清洗与转换

在获取到原始数据后,我们通常需要对其进行清洗和转换,以便适应目标系统的需求。例如,将日期格式统一、去除空值、或者根据业务规则进行计算等。以下是一个简单的数据清洗示例:

def clean_data(raw_data):
    cleaned_data = []
    for record in raw_data:
        if record['_widget_1669962898406'] and record['_id']:
            cleaned_record = {
                'material_number': record['_widget_1669962898406'],
                'record_id': record['_id'],
                # 假设我们还需要将日期格式从ISO转换为其他格式
                'create_time': convert_date_format(record['createTime'])
            }
            cleaned_data.append(cleaned_record)
    return cleaned_data

def convert_date_format(iso_date):
    from datetime import datetime
    dt = datetime.fromisoformat(iso_date.replace('Z', '+00:00'))
    return dt.strftime('%Y-%m-%d %H:%M:%S')

# 调用清洗函数
cleaned_data = clean_data(data['data'])
print(cleaned_data)

通过以上步骤,我们成功地从简道云获取了原始数据,并进行了必要的数据清洗和转换,为后续的数据写入和集成做好了准备。 用友与WMS系统接口开发配置

使用轻易云数据集成平台进行ETL转换和写入目标平台的技术案例

在数据集成生命周期的第二步,我们将重点探讨如何将已经集成的源平台数据进行ETL转换,并转为目标平台轻易云集成平台API接口所能够接收的格式,最终写入目标平台。以下是具体的技术案例。

元数据配置解析

在本案例中,我们使用以下元数据配置来指导ETL过程:

{
  "api": "写入空操作",
  "effect": "EXECUTE",
  "method": "POST",
  "number": "number",
  "id": "id",
  "name": "编码",
  "idCheck": true
}

该配置包含以下关键字段:

  • api: 指定了目标API接口名称。
  • effect: 定义了操作类型,这里为EXECUTE,表示执行操作。
  • method: HTTP请求方法,这里为POST
  • number, id, name: 分别对应源数据中的字段名和目标API所需字段名。
  • idCheck: 布尔值,表示是否需要对ID进行校验。

数据请求与清洗

首先,从简道云物料表单中获取原始数据。假设我们已经完成了这一阶段的数据请求和初步清洗,得到如下结构的数据:

[
  {"number": "001", "id": "1001", "编码": "A001"},
  {"number": "002", "id": "1002", "编码": "A002"}
]

数据转换

根据元数据配置,我们需要将上述数据转换为目标API接口所能接收的格式。具体步骤如下:

  1. 字段映射:将源数据中的字段映射到目标API所需的字段。例如,将编码映射到name

  2. ID校验:如果idCheck为真,则需要对每条记录的ID进行校验,以确保其唯一性和正确性。

  3. 构建请求体:根据目标API接口要求构建HTTP请求体。

转换后的数据结构如下:

[
  {"number": "001", "id": "1001", "name": "A001"},
  {"number": "002", "id": "1002", "name": "A002"}
]

写入目标平台

接下来,通过轻易云集成平台提供的API接口将转换后的数据写入目标平台。具体实现如下:

  1. 设置HTTP请求头

    • Content-Type: application/json
    • Authorization: Bearer
  2. 发送POST请求

示例代码(Python):

import requests
import json

url = 'https://api.qingyiyun.com/execute'
headers = {
    'Content-Type': 'application/json',
    'Authorization': 'Bearer <token>'
}
data = [
    {"number": "001", "id": "1001", "name": "A001"},
    {"number": "002", "id": "1002", "name": "A002"}
]

response = requests.post(url, headers=headers, data=json.dumps(data))

if response.status_code == 200:
    print("Data successfully written to the target platform.")
else:
    print(f"Failed to write data. Status code: {response.status_code}")

通过上述步骤,我们成功地将简道云物料表单中的数据经过ETL处理后写入了轻易云集成平台。这一过程充分利用了元数据配置,实现了高效、准确的数据转换与写入。 金蝶与CRM系统接口开发配置

更多系统对接方案