使用轻易云集成平台进行ETL转换与数据写入的技术案例

  • 轻易云集成顾问-卢剑航

金蝶云星空数据集成到轻易云集成平台的实现方案

在多个企业系统中高效、准确地完成数据对接一直是系统集成领域的重要课题。本案例将详细探讨如何利用轻易云数据集成平台,将金蝶云星空的数据无缝对接,特别是关于“查询金蝶部门”的具体实施方案。

首先,我们需要确保从金蝶云星空获取的API接口executeBillQuery能够定时可靠地抓取所需的部门信息。该接口允许我们通过参数设定,实现灵活多样的信息检索,通常来说,这些请求会涉及到分页处理和限流问题。为保证高效且稳定的数据传输,需要配置合理的分页机制并设置适当的限流策略,以防止因瞬间的大量请求而导致系统开销过大或响应超时。

在成功获取数据之后,下一步就是将这些大量的数据快速写入到轻易云集成平台。这一步骤极为关键,因为它关系到整体数据集成过程中的性能表现及业务连续性。这里我们主要利用轻易云提供的数据写入API写入空操作来完成这一任务。在实际操作过程中,为了避免因为网络抖动或服务器异常带来的意外错误,还可以结合异常处理与错误重试机制,以提升整体流程的健壮性和容错能力。

同时,在整个数据转移过程中,需要解决两种平台间可能存在的数据格式差异问题。这一过程不仅包括制定合适的数据映射规则,还必须进行必要的数据转换和清洗工作,以确保最终导入轻易云平台中的数据信息完整且正确。然而,这是一个需要细致调校和逐步验证的过程,可以借助于实时监控与日志记录功能,对每个环节进行有效追踪与管理,从而及时发现并纠正任何潜在的问题。

通过以上步骤,我们可以构建出一个高效、可靠且便于维护的平台对接方案,为后续更多元化、更深入的数据整合奠定坚实基础。 用友BIP接口开发配置

调用金蝶云星空接口executeBillQuery获取并加工数据

在数据集成的生命周期中,调用源系统接口是至关重要的一步。本文将详细探讨如何通过轻易云数据集成平台调用金蝶云星空的executeBillQuery接口来获取并加工部门数据。

接口配置与请求参数

首先,我们需要配置元数据以便正确调用金蝶云星空的API。以下是元数据配置的关键部分:

{
  "api": "executeBillQuery",
  "method": "POST",
  "number": "FName",
  "id": "FDEPTID",
  "pagination": {
    "pageSize": 100
  },
  "request": [
    {"field":"FName","label":"名称","type":"string","value":"FName"},
    {"field":"FNumber","label":"编码","type":"string","value":"FNumber"},
    {"field":"FCreateOrgId","label":"创建组织","type":"string","value":"FCreateOrgId"},
    {"field":"FUseOrgId","label":"使用组织","type":"string","value":"FUseOrgId"},
    {"label":"FDEPTID","field":"FDEPTID","type":"string","value":"FDEPTID"}
  ],
  "otherRequest": [
    {"field":"Limit","label":"最大行数","type":"string","describe":"金蝶的查询分页参数","value":"{PAGINATION_PAGE_SIZE}"},
    {"field":"StartRow","label":"开始行索引","type":"string","describe":"金蝶的查询分页参数","value":"{PAGINATION_START_ROW}"},
    {"field":"TopRowCount","label":"返回总行数","type":"int","describe":"金蝶的查询分页参数"},
    {"field":"FilterString","label":"过滤条件","type":"string","describe":"示例写法 FSupplierId.FNumber = 'VEN00010' and FApproveDate>=","value":"FAuditDate>='{{LAST_SYNC_TIME|datetime}}' and FForbidStatus='A'"},
    {"field":"FieldKeys","label":"需查询的字段key集合","type":"array","describe":"金蝶分录主键ID格式:FPOOrderEntry_FEntryId,其它格式 FPurchaseOrgId.FNumber", "parser":{"name": "ArrayToString", "params": ","}},
    {"field": "FormId", "label": "业务对象表单Id", "type": "string", "describe": "必须填写金蝶的表单ID如:PUR_PurchaseOrder", "value": "BD_Department"}
  ]
}

请求构建

在构建请求时,我们需要特别注意以下几个关键字段:

  1. FormId: 表单ID,这里我们设置为BD_Department,表示我们要查询的是部门信息。
  2. FieldKeys: 查询字段集合,这里我们需要将多个字段组合成一个字符串,以逗号分隔。
  3. FilterString: 用于过滤数据的条件,例如我们可以设置为FAuditDate>='{{LAST_SYNC_TIME|datetime}}' and FForbidStatus='A',以确保只获取最新且有效的数据。
  4. Pagination: 分页参数,通过设置LimitStartRow来控制每次请求的数据量和起始位置。

数据请求与清洗

通过上述配置,我们可以发送POST请求到金蝶云星空API。以下是一个示例请求体:

{
  "FormId": "BD_Department",
  "FieldKeys": ["FName", "FNumber", "FCreateOrgId", "FUseOrgId", "FDEPTID"].join(","),
  "FilterString": "FAuditDate>='2023-01-01T00:00:00' and FForbidStatus='A'",
  "Limit": 100,
  "StartRow": 0
}

在接收到响应后,需要对数据进行清洗和转换,以便后续处理。清洗过程包括但不限于:

  • 去除无效或重复的数据。
  • 转换字段类型,例如将日期字符串转换为日期对象。
  • 根据业务需求重新格式化数据。

数据转换与写入

经过清洗后的数据可以进一步转换为目标系统所需的格式,并写入到相应的数据存储中。这一步通常涉及到:

  • 字段映射:将源系统字段映射到目标系统字段。
  • 数据验证:确保所有必填字段都有值且格式正确。
  • 批量写入:为了提高效率,可以批量写入数据。

通过以上步骤,我们完成了从调用源系统接口获取数据,到清洗、转换和写入目标系统的全过程。这不仅提高了数据处理的效率,也确保了数据的一致性和准确性。 打通用友BIP数据接口

使用轻易云数据集成平台进行ETL转换并写入目标平台的技术案例

在数据集成的生命周期中,ETL(提取、转换、加载)过程至关重要。本文将深入探讨如何将已经集成的源平台数据进行ETL转换,转为目标平台轻易云集成平台API接口所能够接收的格式,并最终写入目标平台。

数据提取与清洗

假设我们从金蝶系统中查询部门信息,获取到的数据通常是原始且未经处理的。为了确保数据质量,我们需要对其进行清洗和预处理。这一步骤包括但不限于去除冗余字段、标准化数据格式以及校验数据完整性。

import requests
import json

# 示例:从金蝶系统中提取部门数据
def fetch_department_data():
    response = requests.get("http://kingdee.example.com/api/departments")
    if response.status_code == 200:
        raw_data = response.json()
        cleaned_data = clean_data(raw_data)
        return cleaned_data
    else:
        raise Exception("Failed to fetch data from Kingdee API")

def clean_data(raw_data):
    # 假设我们只需要部门ID和名称
    cleaned_data = []
    for item in raw_data:
        if 'department_id' in item and 'department_name' in item:
            cleaned_data.append({
                'id': item['department_id'],
                'name': item['department_name']
            })
    return cleaned_data

数据转换

在完成数据清洗后,下一步是将数据转换为目标平台能够接收的格式。根据元数据配置,我们需要使用POST方法,并且启用ID检查功能。

# 示例:将清洗后的数据转换为目标平台所需格式
def transform_data(cleaned_data):
    transformed_data = []
    for item in cleaned_data:
        transformed_item = {
            "api": "写入空操作",
            "method": "POST",
            "idCheck": True,
            "data": {
                "id": item['id'],
                "name": item['name']
            }
        }
        transformed_data.append(transformed_item)
    return transformed_data

数据写入

最后一步是将转换后的数据通过API接口写入到目标平台。在这个过程中,需要确保API请求的正确性和成功率。

# 示例:通过API接口将转换后的数据写入目标平台
def write_to_target_platform(transformed_data):
    url = "http://qingyiyun.example.com/api/write"

    for item in transformed_data:
        headers = {'Content-Type': 'application/json'}
        response = requests.post(url, data=json.dumps(item), headers=headers)

        if response.status_code == 200:
            print(f"Successfully wrote data: {item}")
        else:
            print(f"Failed to write data: {item}, Status Code: {response.status_code}")

# 主函数调用示例
if __name__ == "__main__":
    try:
        cleaned_data = fetch_department_data()
        transformed_data = transform_data(cleaned_data)
        write_to_target_platform(transformed_data)
    except Exception as e:
        print(f"An error occurred: {e}")

通过上述步骤,我们实现了从金蝶系统提取部门信息,并经过清洗、转换后,通过轻易云集成平台API接口成功写入目标平台。此过程不仅保证了数据的一致性和完整性,还提高了系统间的数据流通效率。 用友与SCM系统接口开发配置

更多系统对接方案