轻易云平台上的ETL转换和写入技术案例

  • 轻易云集成顾问-贺强

金蝶云星空与轻易云集成平台的销售出库单查询方案-new

在实际业务场景中,如何高效、精准地将金蝶云星空的数据集成到轻易云数据集成平台,是确保业务连续性和准确性的关键。本案例分享了一个完整的解决方案,展示了从调用金蝶云星空API接口executeBillQuery获取销售出库单数据,到批量写入轻易云集成平台,以及如何处理过程中可能遇到的各种技术挑战。

确保数据不漏单

为避免在数据抓取过程中出现遗漏,我们采用了定时可靠的数据抓取机制。通过配置定时任务,每隔一段时间便自动调用金蝶云星空的接口executeBillQuery进行查询,并记录每次查询的起始和结束时间,以确保所有生成的新出库单都能被及时捕获。

批量写入大量数据

面对高并发、大批量的数据传输需求,我们特别关注了批量写入技术。在接收到来自金蝶云星空的大量出库单信息后,通过轻易云提供的批处理功能,将这些数据快速、高效地提交到目标数据库。该方式不仅提高了传输效率,还大幅减少了系统资源消耗。

异常处理与重试机制

为了保障整个流程的稳定性,当出现网络抖动或其他导致请求失败的问题时,我们实现了一套完善异常处理与错误重试机制。如果某次API调用返回异常,会立即启动重新尝试逻辑直到成功,或者累计一定次数错误后进行报警通知。

数据格式差异处理

不同系统之间的数据格式往往存在一致性问题,在对接过程中我们利用轻易云平台强大的自定义映射功能,对从金蝶获取的数据进行了必要转换,完全匹配目标结构要求。这一步骤消除了因格式差异而导致的数据解析困难,使得最终写入操作变得顺畅无误。

本文将在以下章节详细探讨上述各个环节,同时也会涉及分页与限流、实时监控日志、以及具体接口使用方法等内容,为您呈现一个完整且可复用的数据集成解决方案。 金蝶与CRM系统接口开发配置

调用金蝶云星空接口executeBillQuery获取并加工数据

在轻易云数据集成平台中,调用源系统接口是数据集成生命周期的第一步。本文将详细探讨如何通过调用金蝶云星空的executeBillQuery接口来获取销售出库单数据,并进行初步加工。

接口配置与请求参数

根据元数据配置,我们需要向金蝶云星空发送一个POST请求,具体配置如下:

  • API名称: executeBillQuery
  • 请求方法: POST
  • 业务对象表单Id: SAL_OUTSTOCK
  • 查询字段集合: 通过FieldKeys字段指定
  • 过滤条件: 使用FilterString字段定义
  • 分页参数: 包括Limit, StartRow, TopRowCount

请求体中的主要字段包括:

  1. FID: 唯一标识符
  2. FBillTypeID_FNumber: 单据类型编号
  3. FBillNo: 单据编号
  4. FEntity_FENTRYID: 分录行唯一标识符
  5. FDate: 日期
  6. FSaleOrgId_FNumber: 销售组织编号
  7. FCustomerID_FNumber: 客户编号

以及其他相关字段,如销售门店、销售部门、交货地点等。

请求示例

以下是一个典型的请求示例:

{
  "FormId": "SAL_OUTSTOCK",
  "FieldKeys": [
    "FID", 
    "FBillTypeID.FNumber", 
    "FBillNo", 
    "FEntity_FENTRYID", 
    "FDate", 
    "FSaleOrgId.FNumber", 
    "FCustomerID.FNumber"
  ],
  "FilterString": "FRealQty <> FARJoinQty and F_QDUT_TEXT4 <> '' and FDocumentStatus='C'",
  "Limit": 2000,
  "StartRow": 0,
  "TopRowCount": true
}

数据清洗与转换

在获取到原始数据后,需要对其进行清洗和转换,以便后续处理。以下是一些常见的数据清洗操作:

  1. 去除无效数据:例如,删除空值或无效的记录。
  2. 格式转换:将日期字符串转换为标准日期格式,将数值字符串转换为数值类型。
  3. 字段映射与重命名:根据业务需求,对字段进行重命名或映射。

假设我们从接口返回了以下数据:

[
  {
    "FID": "12345",
    "FBillTypeID.FNumber": "XSCKD01_SYS",
    "FBillNo": "SO20231001",
    "FEntity_FENTRYID": "1001",
    "FDate": "2023-10-01",
    "FSaleOrgId.FNumber": "ORG001",
    "FCustomerID.FNumber": "CUST001"
  },
  ...
]

我们可以对其进行如下处理:

import json
from datetime import datetime

# 示例原始数据
data = [
  {
    "FID": "12345",
    "FBillTypeID.FNumber": "XSCKD01_SYS",
    "FBillNo": "SO20231001",
    "FEntity_FENTRYID": "1001",
    "FDate": "2023-10-01",
    "FSaleOrgId.FNumber": "ORG001",
    "FCustomerID.FNumber": "CUST001"
  }
]

# 数据清洗与转换函数
def clean_and_transform(data):
    cleaned_data = []

    for record in data:
        cleaned_record = {}
        cleaned_record['id'] = record['FID']
        cleaned_record['bill_type'] = record['FBillTypeID.FNumber']
        cleaned_record['bill_no'] = record['FBillNo']
        cleaned_record['entry_id'] = record['FEntity_FENTRYID']
        cleaned_record['date'] = datetime.strptime(record['FDate'], '%Y-%m-%d')
        cleaned_record['sale_org'] = record['FSaleOrgId.FNumber']
        cleaned_record['customer_id'] = record['FCustomerID.FNumber']

        cleaned_data.append(cleaned_record)

    return cleaned_data

# 执行清洗与转换
transformed_data = clean_and_transform(data)
print(json.dumps(transformed_data, indent=2, default=str))

自动填充响应

在轻易云平台上,可以通过自动填充响应功能,将处理后的数据直接写入目标系统或数据库。这一步骤通常包括:

  1. 定义目标系统的API或数据库连接。
  2. 配置自动填充规则,将清洗后的字段映射到目标系统的相应字段。
  3. 执行写入操作,并监控写入状态。

通过上述步骤,我们可以高效地完成从金蝶云星空获取销售出库单数据并进行初步加工的全过程,为后续的数据处理和分析打下坚实基础。 金蝶与MES系统接口开发配置

使用轻易云数据集成平台进行ETL转换并写入目标平台的技术案例

在本案例中,我们将深入探讨如何利用轻易云数据集成平台将已经集成的源平台数据进行ETL转换,并最终写入目标平台。我们将重点关注API接口的配置和调用,确保数据能够无缝地从源系统传输到目标系统。

数据清洗与转换

首先,我们需要对从源平台获取的数据进行清洗和转换。数据清洗的目的是去除不必要或错误的数据,确保数据的完整性和准确性。数据转换则是将源数据格式转化为目标系统所需的格式。

例如,假设我们从源系统获取到以下销售出库单数据:

{
  "order_id": "12345",
  "customer_name": "张三",
  "product_code": "P001",
  "quantity": 10,
  "price": 100.0,
  "order_date": "2023-10-01"
}

在清洗过程中,我们可能需要验证order_id是否唯一,检查customer_name是否为空,以及确认quantityprice字段的数据类型是否正确。

接下来,我们进行数据转换,将上述JSON格式的数据转换为目标平台所需的格式。例如,目标平台可能要求订单日期格式为YYYY/MM/DD,而不是YYYY-MM-DD

配置元数据

在轻易云数据集成平台中,我们通过配置元数据来定义API接口的调用方式。根据提供的元数据配置:

{
  "api": "写入空操作",
  "effect": "EXECUTE",
  "method": "POST",
  "idCheck": true
}

我们可以理解以下几点:

  • api: 指定了要调用的API接口名称,这里是“写入空操作”。
  • effect: 表示该操作的效果,这里是执行(EXECUTE)。
  • method: 指定HTTP请求方法,这里是POST。
  • idCheck: 表示是否需要进行ID检查,这里为true。

API接口调用

在完成数据清洗和转换后,我们需要通过API接口将处理后的数据写入目标平台。以下是一个示例代码片段,展示了如何使用Python进行API调用:

import requests
import json

# 定义目标URL
url = 'https://api.qingyiyun.com/write'

# 定义请求头
headers = {
    'Content-Type': 'application/json'
}

# 定义请求体
payload = {
    "order_id": "12345",
    "customer_name": "张三",
    "product_code": "P001",
    "quantity": 10,
    "price": 100.0,
    "order_date": "2023/10/01"
}

# 发起POST请求
response = requests.post(url, headers=headers, data=json.dumps(payload))

# 检查响应状态码
if response.status_code == 200:
    print("Data written successfully.")
else:
    print(f"Failed to write data. Status code: {response.status_code}")

在这个示例中,我们首先定义了目标URL和请求头,然后构建了请求体(payload),最后通过requests库发起POST请求,将处理后的数据发送到目标平台。

实时监控与错误处理

为了确保数据能够顺利写入目标平台,我们还需要实现实时监控和错误处理机制。在实际应用中,可以通过轻易云提供的可视化监控界面实时查看数据流动和处理状态。如果出现错误,可以通过日志记录和告警机制及时发现并解决问题。

例如,如果API返回了非200状态码,可以记录详细的错误信息,以便后续排查:

if response.status_code != 200:
    error_log = {
        'status_code': response.status_code,
        'response_text': response.text,
        'payload': payload
    }
    with open('error_log.json', 'a') as log_file:
        log_file.write(json.dumps(error_log) + '\n')

通过以上步骤,我们实现了从源系统到目标系统的数据ETL转换,并成功将处理后的数据写入目标平台。这不仅提高了业务流程的自动化程度,还保证了数据的一致性和准确性。 如何开发用友BIP接口