ETL数据转换与写入:轻易云平台集成技术详解

  • 轻易云集成顾问-杨嫦

金蝶云星空数据集成到轻易云集成平台的实施案例——客户查询

在实际业务需求中,企业常面临着将不同系统的数据进行高效整合的问题。本文将聚焦于一个具体案例:通过轻易云数据集成平台,将金蝶云星空中的客户信息无缝对接,确保数据不漏单、快速写入和批量处理。

集成背景与挑战

为了实现这一目标,我们使用了金蝶云星空的executeBillQuery API接口来获取所需的客户数据,并通过轻易云集成平台的batchSave接口批量写入。这一过程中有几个关键挑战需要解决:

  1. 如何确保集成功能不漏单:在调用金蝶云星空API时,需要针对分页和限流问题进行精细化处理,以避免遗漏任何记录。
  2. 大量数据快速写入: 我们设计了高效的数据传输和写入机制,使得大量客户数据能够迅速且稳定地导入到轻易云平台。
  3. 处理格式差异与映射关系: 针对两个系统间的数据格式差异,通过定制化的数据映射实现无缝对接。

主要技术方案与实现步骤

首先,我们必须确保从金蝶云星空获取的数据全面且准确,为此我们采用了一些特定方法:

  • 调用executeBillQuery API时,根据文档要求设定适当的参数,如分页大小及请求频率,从而有效管理API限流问题。
  • 实现错误重试机制,在网络异常或服务器返回非预期结果时自动重新发起请求,提高整体抓取操作的可靠性。

然后,对抓取到的大量客户信息进行统一转换及清洗,使其符合轻易云平台所需的数据结构。这个环节中,涉及字段名称、类型以及特殊符号等方面的一致性调整,通过自定义脚本或转换工具完成这些任务。

最后,要特别注意的是,在将整理后的大批量数据信息提交给轻易云的平台之前,应先经过预检查以确认数据完整性,然后利用其提供的批量保存功能,即调用batchSave接口,实现一次性的多条记录插入操作,大幅提升效率。

下一步内容将详细介绍每个具体环节中的技术实施细节,包括代码示例和注意事项,帮助您更好理解并运用这种高效、安全、灵活的数据集成功能。 用友与MES系统接口开发配置

调用金蝶云星空接口executeBillQuery获取并加工数据

在数据集成的生命周期中,调用源系统接口获取数据是至关重要的一步。本文将详细介绍如何使用轻易云数据集成平台调用金蝶云星空的executeBillQuery接口,以实现客户信息的查询和加工。

接口配置与请求参数

首先,我们需要配置调用金蝶云星空接口的元数据。根据提供的元数据配置,我们可以看到该接口使用POST方法,并且需要传递多个字段参数。以下是关键的配置项:

  • API: executeBillQuery
  • Method: POST
  • Pagination: 支持分页,默认每页100条记录
  • ID Check: 启用ID检查

请求参数包括客户ID、编码、名称、创建组织、使用组织等多个字段。这些字段在请求体中以JSON格式传递。

{
  "FCUSTID": "FCUSTID",
  "FNumber": "FNumber",
  "FName": "FName",
  "FCreateOrgId_FNumber": "FCreateOrgId.FNumber",
  "FUseOrgId_FNumber": "FUseOrgId.FNumber",
  ...
}

构建请求体

为了构建请求体,我们需要将上述字段按照元数据配置进行映射和填充。以下是一个示例请求体:

{
  "FormId": "BD_Customer",
  "FieldKeys": [
    "FCUSTID", 
    "FNumber", 
    "FName", 
    ...
  ],
  "FilterString": "FApproveDate>='2023-01-01'",
  "Limit": 100,
  "StartRow": 0
}

在这个示例中,FormId指定了业务对象表单ID为BD_CustomerFieldKeys包含了我们需要查询的字段集合,FilterString用于设置过滤条件,例如只查询2023年1月1日之后的数据。

数据清洗与转换

获取到原始数据后,需要对其进行清洗和转换,以满足目标系统或业务需求。常见的数据清洗操作包括:

  1. 去除空值或无效值:确保每个字段都有有效的数据。
  2. 格式转换:例如,将日期字符串转换为标准日期格式。
  3. 字段映射:根据目标系统的要求,对字段进行重命名或重新组织。

以下是一个简单的数据清洗示例:

def clean_data(raw_data):
    cleaned_data = []
    for record in raw_data:
        if record['FCUSTID'] and record['FName']:
            cleaned_record = {
                'CustomerID': record['FCUSTID'],
                'CustomerName': record['FName'],
                'Organization': record['FCreateOrgId_FNumber'],
                ...
            }
            cleaned_data.append(cleaned_record)
    return cleaned_data

数据写入

经过清洗和转换后的数据,可以通过轻易云平台写入到目标系统。在写入过程中,需要确保数据的一致性和完整性。例如,可以使用事务管理来保证批量写入操作的原子性。

def write_data_to_target(cleaned_data):
    for record in cleaned_data:
        # 调用目标系统API写入数据
        response = target_system_api.write(record)
        if not response.success:
            # 错误处理逻辑
            handle_error(response.error)

实时监控与日志记录

在整个数据集成过程中,实时监控和日志记录是不可或缺的部分。通过轻易云平台提供的可视化界面,可以实时监控数据流动和处理状态。同时,通过日志记录,可以追踪每一步操作,方便后续排查问题。

def log_operation(operation, status, details):
    log_entry = {
        'operation': operation,
        'status': status,
        'details': details,
        'timestamp': datetime.now()
    }
    logging.info(log_entry)

以上就是调用金蝶云星空接口获取并加工客户信息的详细技术案例。通过合理配置元数据、构建请求体、进行数据清洗与转换,以及最终的数据写入和监控,可以高效地实现跨系统的数据集成。 用友与外部系统接口集成开发

轻易云数据集成平台的ETL转换与写入

在轻易云数据集成平台中,数据处理的第二步是将已经集成的源平台数据进行ETL(提取、转换、加载)转换,并转为目标平台API接口所能够接收的格式,最终写入目标平台。本文将详细探讨这一过程中的技术细节,特别是如何通过API接口实现数据的高效集成。

API接口配置与调用

在进行ETL转换时,首先需要配置API接口,以便将处理后的数据写入目标平台。以下是一个元数据配置示例,用于客户查询的数据集成:

{
  "api": "batchSave",
  "method": "POST",
  "idCheck": true,
  "request": [
    {"field": "FName", "label": "客户名称", "type": "string"},
    {"field": "FNumber", "label": "客户编码", "type": "string"},
    {"field": "FCreateOrgId", "label": "创建组织", "type": "string", 
     "parser": {"name": "ConvertObjectParser", "params": "FNumber"}, 
     "value": "100"},
    {"field": "FUseOrgId", "label": "使用组织", 
     "type": "string", 
     "parser": {"name": "ConvertObjectParser", 
     "params": "FNumber"}, 
     "value":"100"},
    {"field":"FDescription","label":"描述","type":"string"}
  ],
  ...
}

数据请求与清洗

在此阶段,我们从源系统提取原始数据,并进行必要的清洗操作。清洗操作包括去除重复记录、处理缺失值以及标准化字段格式等。这一步骤确保了数据的一致性和准确性,为后续的转换打下基础。

数据转换

接下来,我们进入数据转换阶段。根据元数据配置中的request部分,我们需要将源数据字段映射到目标平台所需的字段。例如:

  • FName 映射到客户名称
  • FNumber 映射到客户编码
  • FCreateOrgIdFUseOrgId 需要通过 ConvertObjectParser 转换器进行解析

具体实现时,可以利用轻易云提供的内置解析器,如 ConvertObjectParser,来处理复杂的数据转换需求。以下是一个示例代码片段:

def convert_data(source_data):
    converted_data = []
    for record in source_data:
        converted_record = {
            'FName': record['customer_name'],
            'FNumber': record['customer_code'],
            'FCreateOrgId': convert_object_parser(record['create_org']),
            'FUseOrgId': convert_object_parser(record['use_org']),
            'FDescription': record.get('description', '')
        }
        converted_data.append(converted_record)
    return converted_data

def convert_object_parser(value):
    # 假设 ConvertObjectParser 的逻辑为简单映射
    return f"ORG_{value}"

数据写入

完成数据转换后,我们需要将其批量写入目标平台。根据元数据配置中的 operation 部分,可以看到我们使用的是 batchArraySave 方法,每次处理20条记录。以下是一个示例API调用:

import requests

def batch_save_to_target_platform(converted_data):
    url = 'https://api.qingyiyun.com/batchSave'
    headers = {'Content-Type': 'application/json'}

    payload = {
        'FormId': 'BD_Customer',
        'Operation': 'BatchSave',
        'IsAutoSubmitAndAudit': True,
        'IsVerifyBaseDataField': False,
        'data': converted_data[:20]  # 每次发送20条记录
    }

    response = requests.post(url, json=payload, headers=headers)

    if response.status_code == 200:
        print("Data successfully saved to target platform.")
    else:
        print(f"Failed to save data: {response.text}")

# 示例调用
source_data = [...]  # 从源系统提取的数据
converted_data = convert_data(source_data)
batch_save_to_target_platform(converted_data)

通过上述步骤,我们实现了从源系统提取、清洗、转换并最终写入目标平台的完整ETL流程。在实际应用中,可以根据具体业务需求和系统特性对上述流程进行优化和调整,以提升效率和可靠性。

以上内容展示了如何利用轻易云数据集成平台进行高效的数据ETL转换与写入操作,通过合理配置API接口,实现不同系统间的数据无缝对接。 用友与外部系统接口集成开发