详解ETL在轻易云集成平台中的应用

  • 轻易云集成顾问-蔡威

案例分享:金蝶云星空采购订单数据迁移到轻易云集成平台

在某大型制造企业的系统对接项目中,我们成功将其金蝶云星空系统的采购订单数据无缝集成到轻易云数据集成平台。本文将重点介绍如何通过调用金蝶cloud executeBillQuery接口抓取采购订单数据,并利用轻易云实现高效、可靠的数据写入和转储。

一、确保集成过程中的零丢失

首先要解决的是如何保证从金蝶cloud提取的数据不漏单。在这个项目中,我们通过定时任务周期性调用executeBillQuery接口,获取最新的采购订单,并设计了有效的数据校验机制:

  1. 分页查询与限流处理:由于金蝶Cloud API返回的数据量较大,我们需要分批次进行请求,同时还必须应对API自身的限流策略。为此,在每次执行分页查询时,会记录当前页码及返回结果数量,确保下一轮请求能准确衔接未完成部分。

  2. 缓存与重试机制:为了防止网络波动或API响应超时导致的数据丢失问题,我们引入了一个临时缓存,将每一次成功请求但未确认写入的平台间隔一次存储,以便在发生异常情况时,可以重新发起未完成的数据写入操作。

二、大量数据快速写入

对于从executeBillQuery接口获取的大量采购订单数据,需要迅速而准确地导入至轻易云。这一过程中我们使用批量操作来加速事务处理:

  1. 并行处理和批量提交:在接收到分页后的完整数据后,通过多线程技术进行并行处理,每个线程负责一定数量的记录后,再采用“一键式”批量提交至轻易云,这不仅提高了速度,也减少了网络IO重复开销。

  2. 异步日志与实时监控:交付过程中,关联日志会同步显示当前任务状态及已传输条目数,为运维人员提供实时监控支持。如果发现异常情况,可即时干预调整。

总结近期这段实施经验,在队伍合作下仅耗费少许时间便搭建出稳固且高效的信息桥梁——顺利将来自不同源头的大规模业务信息精准、高速地搬迁到新的目标平台上。在即后续内容里,我将更全面阐述具体实施步骤,包括API代码封装、映射关系配置等方面细节,进一步揭示整个联调工作流程。 打通金蝶云星空数据接口

调用金蝶云星空接口executeBillQuery获取并加工数据

在数据集成过程中,调用源系统的API接口是至关重要的一步。本文将深入探讨如何通过轻易云数据集成平台调用金蝶云星空的executeBillQuery接口,以获取采购订单数据并进行初步加工。

接口配置与请求参数

首先,我们需要了解executeBillQuery接口的基本配置和请求参数。根据元数据配置,executeBillQuery接口采用POST方法进行调用,主要用于查询(effect为QUERY)。以下是关键的请求字段及其描述:

  • FPOOrderEntry_FEntryId: 采购订单条目ID
  • FID: 采购订单ID
  • FBillNo: 单据编号
  • FSourceBillNo: 源单编号
  • FBillTypeID.FNumber: 单据类型(如标准采购订单、标准委外订单等)
  • FBusinessType: 业务类型(如标准采购、标准委外等)
  • FSupplierId.FNumber: 供应商编号
  • FSettleModeId.FNumber: 结算方式编号
  • FPayConditionId.FNumber: 付款条件编号
  • FDate: 采购日期
  • FDocumentStatus: 单据状态(如暂存、创建、审核中、已审核)
  • FPurchaseOrgId.FNumber: 采购组织编号

这些字段涵盖了采购订单的各个方面,为后续的数据处理提供了丰富的信息。

请求示例

为了更好地理解如何构建请求,我们来看一个具体的请求示例:

{
    "FormId": "PUR_PurchaseOrder",
    "FieldKeys": [
        "FID", 
        "FBillNo", 
        "FSupplierId.FNumber", 
        "FPurchaseOrgId.FNumber", 
        "FDate", 
        "FDocumentStatus"
    ],
    "FilterString": "FApproveDate>='2023-01-01' and FPurchaseOrgId.FNumber='100'",
    "Limit": 100,
    "StartRow": 0,
    "TopRowCount": true
}

在这个请求中,我们指定了业务对象表单ID为PUR_PurchaseOrder,并选择了需要查询的字段,如采购订单ID、单据编号、供应商编号等。同时,通过过滤条件FilterString限制了查询结果,仅返回审批日期在2023年1月1日之后且采购组织编号为100的记录。分页参数LimitStartRow则控制了返回结果的数量和起始行索引。

数据清洗与转换

获取到原始数据后,需要对其进行清洗和转换,以便后续处理。以下是一些常见的数据清洗与转换操作:

  1. 字段重命名:将原始字段名转换为更具可读性的名称。例如,将FSupplierId.FNumber重命名为SupplierCode
  2. 数据类型转换:确保每个字段的数据类型符合预期。例如,将日期字符串转换为日期对象。
  3. 缺失值处理:填补或删除缺失值,以保证数据完整性。
  4. 数据过滤:进一步筛选符合业务需求的数据记录。

以下是一个简单的数据清洗示例:

import pandas as pd

# 假设我们已经从API获取到原始数据,并存储在data变量中
data = [
    {"FID": "12345", "FBillNo": "PO20230101", "FSupplierId.FNumber": "SUP001", "FPurchaseOrgId.FNumber": "ORG001", "FDate": "2023-01-15", "FDocumentStatus": "C"},
    # 更多记录...
]

# 转换为DataFrame以便处理
df = pd.DataFrame(data)

# 字段重命名
df.rename(columns={
    'FID': 'PurchaseOrderID',
    'FBillNo': 'BillNo',
    'FSupplierId.FNumber': 'SupplierCode',
    'FPurchaseOrgId.FNumber': 'PurchaseOrgCode',
    'FDate': 'PurchaseDate',
    'FDocumentStatus': 'DocumentStatus'
}, inplace=True)

# 数据类型转换
df['PurchaseDate'] = pd.to_datetime(df['PurchaseDate'])

# 缺失值处理(示例:删除包含任何缺失值的行)
df.dropna(inplace=True)

print(df)

自动填充响应

轻易云平台支持自动填充响应功能,可以根据元数据配置自动将API响应映射到目标系统所需的格式。这极大简化了开发工作,提高了效率。

通过上述步骤,我们成功调用了金蝶云星空的executeBillQuery接口,获取并初步加工了采购订单数据。这些操作为后续的数据写入和进一步处理奠定了坚实基础。 数据集成平台API接口配置

数据集成生命周期中的ETL转换与写入

在数据集成的生命周期中,ETL(Extract, Transform, Load)过程是至关重要的一环。本文将深入探讨如何将已经集成的源平台数据进行ETL转换,并转为目标平台轻易云集成平台API接口所能够接收的格式,最终写入目标平台。

数据转换与清洗

在数据请求与清洗阶段,我们已经从源系统(金蝶-采购订单)获取了原始数据。接下来,我们需要对这些数据进行转换,以符合目标平台的要求。在这个过程中,元数据配置起到了关键作用。

元数据配置示例:

{
  "api": "写入空操作",
  "effect": "EXECUTE",
  "method": "POST",
  "idCheck": true
}

这个配置文件定义了API接口的基本信息,包括API名称、执行效果、HTTP方法以及是否进行ID检查。我们需要根据这些信息来设计我们的数据转换逻辑。

数据转换逻辑

  1. 字段映射:首先,我们需要将源系统的数据字段映射到目标系统的数据字段。例如,金蝶-采购订单中的字段order_id可能需要映射到轻易云平台中的id字段。
  2. 数据类型转换:不同系统之间的数据类型可能有所不同。例如,金蝶中的日期格式可能是YYYY-MM-DD,而轻易云平台可能要求YYYYMMDD格式。因此,我们需要对日期格式进行转换。
  3. 数据校验:根据元数据配置中的idCheck属性,如果设置为true,我们需要在写入之前检查ID是否存在,以避免重复写入或覆盖已有数据。

API接口调用

在完成数据转换之后,我们需要通过API接口将数据写入目标平台。根据元数据配置,我们使用HTTP POST方法来调用API接口。

示例代码:

import requests
import json

# 转换后的数据
data = {
    "id": transformed_order_id,
    "field1": transformed_field1,
    "field2": transformed_field2,
    # 其他字段...
}

# API URL
url = "https://api.qingyiyun.com/write_empty_operation"

# 请求头
headers = {
    "Content-Type": "application/json"
}

# 发起POST请求
response = requests.post(url, headers=headers, data=json.dumps(data))

# 检查响应状态
if response.status_code == 200:
    print("Data written successfully.")
else:
    print(f"Failed to write data: {response.status_code}, {response.text}")

在这个示例中,我们首先构建了一个包含转换后数据的字典,然后通过HTTP POST方法将其发送到指定的API URL。请求头中指定了内容类型为JSON格式。最后,通过检查响应状态码来确定数据是否成功写入。

异常处理与日志记录

在实际操作中,异常处理和日志记录也是不可或缺的一部分。我们需要捕获并处理可能出现的各种异常,例如网络超时、无效响应等。同时,将每次操作的详细日志记录下来,以便于后续排查问题和优化流程。

示例代码:

try:
    response = requests.post(url, headers=headers, data=json.dumps(data))
    response.raise_for_status()  # 如果响应状态码不是200,会抛出HTTPError异常
except requests.exceptions.RequestException as e:
    # 记录异常日志
    with open("error_log.txt", "a") as log_file:
        log_file.write(f"Failed to write data: {e}\n")
else:
    print("Data written successfully.")

通过这种方式,我们可以确保即使在出现异常时,也能有详细的日志记录以供分析和解决问题。

总结

本文详细介绍了如何在轻易云集成平台上进行ETL转换,并通过API接口将转换后的数据写入目标平台。从字段映射、数据类型转换到API调用和异常处理,每个步骤都至关重要且不可忽视。希望这些技术细节能够帮助您更好地理解和应用轻易云的数据集成能力,实现高效的数据管理和业务流程优化。 用友BIP接口开发配置