ETL转换与数据写入:轻易云完整流程详解

  • 轻易云集成顾问-何语琴

金蝶云星空数据集成到轻易云平台:查询金蝶云星空客户案例分享

在现代企业应用中,高效的数据对接与集成是实现业务成功的关键环节之一。本篇技术文章将详细介绍如何在轻易云数据集成平台上,实现金蝶云星空系统中的客户信息数据的高效、稳定对接。具体方案名称为“查询金蝶云星空客户”。

首先,针对这一任务,我们需要通过金蝶云星空提供的API接口executeBillQuery来获取客户信息数据。这一步骤至关重要,因为它确保我们获得的数据完整且准确。为了应对可能出现的大量数据处理和分页问题,我们设计了分步抓取和批量写入的方法。此外,还考虑到了接口调用过程中可能的限流策略,以避免过载影响系统性能。

通过使用轻易云提供的数据写入API 写入空操作, 我们能快速地将从金蝶获取的数据批量写入到目标系统中。期间,通过设置定时任务,可以确保抓取过程定期可靠地运行,从而保证实时更新。另外,考虑到不同系统间的数据格式差异,我们在配置元数据时进行了相应的映射设计,以便两者间无缝衔接。

此外,为保障整个流程透明且可控,实施了实时监控和日志记录功能。一旦发生异常情况,例如网络故障或接口响应错误,轻易云的平台能够迅速检测并触发重试机制,有效提高了数据传输过程中的可靠性。

这只是本次技术案例的一部分介绍。在后续内容中,将进一步深入探讨具体实现步骤及代码示例,帮助读者更好理解并掌握这种高效、安全的数据集成方法。 打通钉钉数据接口

调用金蝶云星空接口executeBillQuery获取并加工数据

在数据集成的生命周期中,调用源系统接口是至关重要的一步。本文将详细探讨如何使用轻易云数据集成平台调用金蝶云星空的executeBillQuery接口来获取客户数据,并对其进行初步加工。

接口配置与请求参数

首先,我们需要配置元数据以正确调用金蝶云星空的API。以下是关键的元数据配置:

{
  "api": "executeBillQuery",
  "method": "POST",
  "number": "FNumber",
  "id": "FCUSTID",
  "pagination": {
    "pageSize": 100
  },
  "idCheck": true,
  "request": [
    {"field":"FCUSTID","label":"FCUSTID","type":"string","value":"FCUSTID"},
    {"field":"FNumber","label":"编码","type":"string","value":"FNumber"},
    {"field":"FName","label":"名称","type":"string","value":"FName"},
    {"field":"FCreateOrgId_FNumber","label":"创建组织","type":"string","value":"FCreateOrgId.FNumber"},
    {"field":"FUseOrgId_FNumber","label":"使用组织","type":"string","value":"FUseOrgId.FNumber"},
    {"field":"FGroup_FNumber","label":"客户分组","type":"string","value":"FGroup.FNumber"},
    {"field":"FSALDEPTID_FNumber","label":"销售部门","type":"string","value":"FSALDEPTID.FNumber"},
    {"field":"FSELLER_FNumber","label":"销售员","type":"string","value":"FSELLER.FNumber"},
    {"label":"聚水潭客户编号","field":"F_KLF_TEXT","type":"string","value":"F_KLF_TEXT"}
  ],
  "otherRequest": [
    {"field": "Limit", "label": "最大行数", "type": "string", "describe": "金蝶的查询分页参数", "value": "{PAGINATION_PAGE_SIZE}"},
    {"field": "StartRow", "label": "开始行索引", "type": "string", "describe": "金蝶的查询分页参数", "value": "{PAGINATION_START_ROW}"},
    {"field": "TopRowCount", "label": "返回总行数", "type": "int", "describe": "金蝶的查询分页参数"},
    {"field": "FilterString", "label": "过滤条件", "type": "string", 
        "describe": 
        "示例写法 FSupplierId.FNumber = 'VEN00010' and FApproveDate>=",
        "value":
        "FApproveDate>='{{LAST_SYNC_TIME|dateTime}}' and FUseOrgId.FNumber in ('101','102') and F_KLF_TEXT is not null"
    },
    {"field": 
        "FieldKeys",
        "label":
        "需查询的字段key集合",
        "type":
        "array",
        "describe":
        "金蝶分录主键ID格式:FPOOrderEntry_FEntryId,其它格式 FPurchaseOrgId.FNumber",
        "parser":{
            "name":
            "ArrayToString",
            "params":
            ","
        }
    },
    {"field":
        "FormId",
        "label":
        "业务对象表单Id",
        "type":
        "string",
        "describe":
        "必须填写金蝶的表单ID如:PUR_PurchaseOrder",
        "value":
        "BD_Customer"
    }
  ]
}

请求构建与发送

根据上述元数据配置,我们需要构建一个POST请求来调用executeBillQuery接口。以下是一个请求示例:

{
  "FormId": "BD_Customer",
  "_parameters_":{
      "_metadata_":{
          "_pagination_":{
              "_pageSize_":{
                  "_value_":{
                      "_number_":{
                          "_integer_":{

        }
      }
}

数据清洗与转换

在接收到原始数据后,下一步是对其进行清洗和转换。这一步骤确保数据符合目标系统的要求,并去除不必要的信息。

  1. 字段映射:将原始字段映射到目标字段。例如,将FCUSTID映射到目标系统中的客户ID。
  2. 数据类型转换:确保所有字段的数据类型符合目标系统要求。例如,将字符串类型转换为日期类型。
  3. 过滤无效数据:根据业务规则过滤掉无效或重复的数据。例如,删除没有客户编号(F_KLF_TEXT为空)的记录。

示例代码

以下是一个简单的数据清洗和转换示例代码:

import json

def clean_and_transform(data):
    cleaned_data = []

    for record in data:
        if record.get("F_KLF_TEXT"):
            transformed_record = {
                'customer_id': record.get("FCUSTID"),
                'code': record.get("FNumber"),
                'name': record.get("FName"),
                'create_org': record.get("FCreateOrgId_FNumber"),
                'use_org': record.get("FUseOrgId_FNumber"),
                'group': record.get("FGroup_FNumber"),
                'sales_dept': record.get("FSALDEPTID_FNumber"),
                'salesman': record.get("FSELLER_FNumber"),
                'jushuitan_customer_no': record.get("F_KLF_TEXT")
            }
            cleaned_data.append(transformed_record)

    return cleaned_data

# 示例调用
raw_data = [...] # 从API获取的原始数据
cleaned_data = clean_and_transform(raw_data)
print(json.dumps(cleaned_data, indent=2))

通过上述步骤,我们成功地从金蝶云星空获取了客户数据,并对其进行了初步加工,为后续的数据处理和写入奠定了基础。 如何对接金蝶云星空API接口

使用轻易云数据集成平台进行ETL转换并写入目标平台的技术案例

在数据集成生命周期的第二步中,我们需要将已经集成的源平台数据进行ETL转换,转为目标平台能够接收的格式,并最终写入目标平台。本文将详细介绍如何利用轻易云数据集成平台API接口完成这一过程。

数据请求与清洗

首先,我们从金蝶云星空系统中查询客户数据。这一步通常包括通过API接口获取原始数据,并对其进行初步清洗和预处理。假设我们已经成功获取了客户数据,并进行了必要的清洗操作,使其符合基本的数据质量要求。

数据转换

接下来,我们需要将清洗后的数据进行转换,以符合轻易云集成平台API接口所能接受的格式。在此过程中,我们可能需要进行以下几种操作:

  1. 字段映射:将源系统中的字段映射到目标系统中的相应字段。例如,金蝶云星空中的“客户名称”字段可能需要映射到轻易云集成平台中的“customer_name”字段。
  2. 数据类型转换:确保数据类型的一致性。例如,将字符串类型的数据转换为目标系统所需的日期类型。
  3. 值转换:根据业务需求,对某些字段的值进行转换。例如,将状态码从“0”转换为“inactive”,从“1”转换为“active”。

假设我们有以下原始客户数据:

{
  "customer_id": "12345",
  "customer_name": "ABC公司",
  "contact_number": "13800000000",
  "status": "1"
}

经过ETL转换后,目标格式可能如下:

{
  "id": "12345",
  "name": "ABC公司",
  "phone": "13800000000",
  "status": "active"
}

数据写入

在完成数据转换后,我们需要将这些数据写入到轻易云集成平台。根据提供的元数据配置,可以看到我们需要使用POST方法调用API接口,并且需要进行ID检查(idCheck: true)。

以下是一个示例代码片段,展示如何使用Python调用轻易云集成平台API接口完成数据写入:

import requests
import json

# 转换后的客户数据
data = {
    "id": "12345",
    "name": "ABC公司",
    "phone": "13800000000",
    "status": "active"
}

# API接口URL
url = 'https://api.qingyiyun.com/write'

# 请求头
headers = {
    'Content-Type': 'application/json'
}

# 发起POST请求
response = requests.post(url, headers=headers, data=json.dumps(data))

# 检查响应状态码
if response.status_code == 200:
    print("Data written successfully")
else:
    print(f"Failed to write data: {response.status_code}")

在上述代码中,我们首先定义了要写入的数据,然后构建了请求头和请求体,最后通过requests.post方法发起POST请求,将数据写入到轻易云集成平台。

ID检查机制

根据元数据配置中的idCheck: true,我们需要确保在写入新记录之前,检查是否存在相同ID的记录。如果存在,则可能需要更新而不是插入新记录。这可以通过在发起POST请求之前先发起GET请求来实现:

# 检查是否存在相同ID的记录
check_url = f'https://api.qingyiyun.com/check/{data["id"]}'
check_response = requests.get(check_url)

if check_response.status_code == 200 and check_response.json().get('exists'):
    # 如果记录存在,则执行更新操作
    update_url = f'https://api.qingyiyun.com/update/{data["id"]}'
    update_response = requests.put(update_url, headers=headers, data=json.dumps(data))

    if update_response.status_code == 200:
        print("Data updated successfully")
    else:
        print(f"Failed to update data: {update_response.status_code}")
else:
    # 如果记录不存在,则执行插入操作
    insert_response = requests.post(url, headers=headers, data=json.dumps(data))

    if insert_response.status_code == 200:
        print("Data written successfully")
    else:
        print(f"Failed to write data: {insert_response.status_code}")

通过这种方式,我们可以确保在写入新记录之前,先检查是否存在相同ID的记录,从而避免重复插入或覆盖已有的数据。

以上就是利用轻易云数据集成平台进行ETL转换并写入目标平台的详细技术案例,通过这些步骤,我们可以高效地实现不同系统间的数据无缝对接。 钉钉与ERP系统接口开发配置