详解ETL过程:金蝶云数据在轻易云平台的转换与写入策略

  • 轻易云集成顾问-冯潇

金蝶云星空数据集成到轻易云平台案例分享:KD4 查询发货通知单-关联查询

在企业信息化系统之间进行高效的数据对接,一直是提升业务效率的关键。本文将重点介绍如何通过轻易云数据集成平台,将金蝶云星空中的发货通知单数据无缝、可靠地集成到我们的系统中。此次共享的方案名为“KD4 查询发货通知单-关联查询”,旨在确保不漏单、高效快速并且稳定地实现数据同步。

确保集成过程中的一致性与完整性

为了保障从金蝶云星空获取的数据完整,我们利用了金蝶提供的executeBillQuery接口,此接口支持批量查询和过滤条件,能精确获取需要的数据。然而,处理大量调用时分页和限流是不可避免的问题。在此过程中,通过设计合理的分页参数与限流机制,以及定时调度任务,保证每次抓取操作都能够按预期顺利完成,不遗漏任何一条记录。此外,对不同类型错误设定对应重试策略,提高了整个流程异常情况下恢复能力。

处理大规模数据写入

当成功从金蝶云星空取得相关数据后,需要将其高效写入到轻易云平台。我们采用了一套批量写入策略,这种方式不仅减少API调用次数,也显著加快了数据库操作速度。同时,为应对可能出现的数据格式差异问题,在轻易云平台端建立了一整套自定义映射规则,并通过可视化工具实时调整查看,有力保障了最终存储结果的一致性。

实时监控与日志记录

整个流程中,每一次API请求和数据库操作都会被实时监控,并详细记录至日志文件。这不但便于后续运维人员排查故障,更有助于分析潜在优化点。例如,通过观察日志,可以及时发现某次请求处理时间过长,从而反向追踪具体原因并改进相应环节。

此次技术文章开头即聚焦这些核心步骤,希望能为类似场景下的系统集成需求提供实用参考。在接下来的内容中,我们将更详尽地探讨各个技术细节及实际应用效果。 用友与SCM系统接口开发配置

调用金蝶云星空接口executeBillQuery获取并加工数据

在轻易云数据集成平台中,调用源系统接口是数据处理生命周期的第一步。本文将详细介绍如何通过调用金蝶云星空的executeBillQuery接口来获取并加工发货通知单的数据。

接口配置与请求参数

金蝶云星空提供了丰富的API接口,其中executeBillQuery用于查询业务单据。该接口采用POST方法,支持复杂的查询条件和分页参数。以下是元数据配置中的关键字段及其含义:

  • api: executeBillQuery
  • method: POST
  • number: FBillNo
  • id: FEntity_FEntryID

请求参数分为两类:主请求参数和其他请求参数。

主请求参数

这些参数主要用于指定需要查询的字段和过滤条件:

[
  {"field":"FEntity_FEntryID","label":"FEntity_FEntryID","type":"string","describe":"FEntity_FEntryID","value":"FEntity_FEntryID"},
  {"field":"FID","label":"FID","type":"string","describe":"FID","value":"FID"},
  {"field":"FBillNo","label":"单据编号","type":"string","describe":"单据编号","value":"FBillNo"},
  {"field":"FDate","label":"日期","type":"string","describe":"日期","value":"FDate"},
  {"field":"FSaleOrgId_Fname","label":"销售组织名称","type":"string","describe":"销售组织","value":"FSaleOrgId.Fname"},
  // 更多字段...
]
其他请求参数

这些参数用于控制查询行为,如分页、过滤等:

[
  {"field":"Limit","label":"最大行数","type":"int","describe":"金蝶的查询分页参数","value":100},
  {"field":"StartRow","label":"开始行索引","type":"int"},
  {"field":"TopRowCount","label":"返回总行数","type": "int"},
  {"field": "FilterString", "label": "过滤条件", "type": "string", "describe": "示例写法 FSupplierId.FNumber = 'VEN00010' and FApproveDate>=", "value": "FApproveDate>='{{LAST_SYNC_TIME|datetime}}'"},
  {"field": "FieldKeys", "label": "需查询的字段key集合", "type": "array", "parser":{"name": "ArrayToString", "params": ","}},
  {"field": "FormId", "label": "业务对象表单Id", "type": "string", "describe": "必须填写金蝶的表单ID如:PUR_PurchaseOrder", "value": "SAL_DELIVERYNOTICE"}
]

数据请求与清洗

在实际操作中,首先需要构建请求体,将上述配置中的字段映射到实际的API调用中。例如:

{
  "FormId": "SAL_DELIVERYNOTICE",
  "FieldKeys": ["FBillNo", "FDate", ...],
  "FilterString": "FApproveDate>='2023-01-01'",
  // 分页参数
  "Limit": 100,
  ...
}

通过轻易云的数据集成平台,可以方便地将这些配置转化为实际的API调用。平台会自动处理分页、过滤等逻辑,并将结果返回。

数据转换与写入

获取到数据后,需要对其进行清洗和转换,以便后续处理。例如,将日期格式统一、将金额字段转换为标准货币格式等。这些操作可以通过轻易云平台内置的数据转换工具实现。

{
  // 清洗后的数据示例
  {
    "单据编号": "DN20230101001",
    ...
    // 转换后的字段
    ...
  }
}

实际案例

以下是一个实际调用金蝶云星空executeBillQuery接口并处理返回数据的案例:

  1. 构建请求体:

    {
     "FormId": "SAL_DELIVERYNOTICE",
     ...
     // 请求体内容
    }
  2. 调用API:

    response = requests.post(api_url, json=request_body)
    data = response.json()
  3. 数据清洗与转换:

    cleaned_data = clean_data(data)
  4. 将清洗后的数据写入目标系统或存储:

    save_to_target_system(cleaned_data)

通过以上步骤,可以高效地从金蝶云星空获取所需的数据,并进行必要的清洗和转换,为后续的数据处理打下坚实基础。 泛微OA与ERP系统接口开发配置

KD4 查询发货通知单-关联查询的ETL转换与写入

在数据集成生命周期的第二步,我们将已经从源平台集成的数据进行ETL(提取、转换、加载)处理,转化为目标平台轻易云集成平台API接口所能够接收的格式,并最终写入目标平台。本文将重点探讨这一过程中的技术细节和实现方法。

数据提取与清洗

首先,我们需要从源系统中提取发货通知单的数据。在这个阶段,数据可能存在各种不一致性和冗余信息,因此需要进行清洗操作。清洗操作包括但不限于:

  • 去除重复记录
  • 处理缺失值
  • 格式标准化

例如,假设我们从源系统中提取到的原始数据如下:

[
    {"order_id": "123", "shipment_date": "2023-10-01", "status": "delivered"},
    {"order_id": "124", "shipment_date": null, "status": "pending"},
    {"order_id": "123", "shipment_date": "2023-10-01", "status": "delivered"}
]

在清洗过程中,我们会去除重复的订单记录,并处理缺失的发货日期:

[
    {"order_id": "123", "shipment_date": "2023-10-01", "status": "delivered"},
    {"order_id": "124", "shipment_date": "", "status": "pending"}
]

数据转换

接下来,我们需要将清洗后的数据转换为目标平台能够接受的格式。根据元数据配置,目标平台的API接口要求如下:

{
    "api":"写入空操作",
    "effect":"EXECUTE",
    "method":"POST",
    "idCheck":true
}

这意味着我们需要通过HTTP POST方法,将数据发送到指定的API接口,并且在发送之前需要进行ID检查,以确保每条记录都具有唯一标识符。

假设目标平台要求的数据格式如下:

{
    "operation": {
        "type": "insert",
        "data": [
            {"id": 1, "order_id": 123, "shipment_date": "2023-10-01", "status": 1},
            {"id": 2, "order_id": 124, "shipment_date": "", "status": 0}
        ]
    }
}

我们需要对清洗后的数据进行进一步转换,确保字段名和数据类型符合目标平台的要求。例如,将status字段从字符串类型转换为整数类型(1表示已发货,0表示待发货),并为每条记录生成唯一的id

转换后的数据如下:

{
    "operation": {
        "type": "insert",
        "data":[
            {"id": 1, "order_id":"123",     "shipment_date":"2023-10-01",   "status" :1},
            {"id" :2,   "order_id":"124",   "shipment_date":"", "status" :0}
        ]
    }
}

数据加载

最后一步是将转换后的数据通过API接口写入目标平台。根据元数据配置,我们使用HTTP POST方法发送请求:

POST /api/execute HTTP/1.1
Host: target-platform.com
Content-Type: application/json

{
    ...
}

在发送请求之前,需要确保每条记录都经过ID检查,以防止重复插入或更新错误。可以使用数据库或者内存缓存来维护一个已处理ID列表,每次插入新记录前检查该ID是否已经存在。

实现代码示例

以下是一个简单的Python代码示例,展示了如何实现上述ETL过程:

import requests
import json

# 清洗函数示例
def clean_data(data):
    cleaned_data = []
    seen_ids = set()

    for record in data:
        if record['order_id'] not in seen_ids:
            seen_ids.add(record['order_id'])
            record['shipment_date'] = record['shipment_date'] or ""
            cleaned_data.append(record)

    return cleaned_data

# 转换函数示例
def transform_data(data):
    transformed_data = []

    for idx, record in enumerate(data):
        transformed_record = {
            'id': idx + 1,
            'order_id': int(record['order_id']),
            'shipment_date': record['shipment_date'],
            'status': 1 if record['status'] == 'delivered' else 0
        }
        transformed_data.append(transformed_record)

    return {
        'operation': {
            'type': 'insert',
            'data': transformed_data
        }
    }

# 加载函数示例
def load_data(api_url, data):
    headers = {'Content-Type': 'application/json'}

    response = requests.post(api_url, headers=headers, data=json.dumps(data))

    if response.status_code == 200:
        print("Data loaded successfully")
    else:
        print(f"Failed to load data: {response.status_code}")

# 主程序示例
if __name__ == "__main__":
    raw_data = [
        {"order_id":"123","shipment_date":"2023-10-01","status":"delivered"},
        {"order_id":"124","shipment_date":"","status":"pending"},
        {"order_id":"123","shipment_date":"2023-10-01","status":"delivered"}
     ]

     cleaned_data = clean_data(raw_data)
     transformed_data = transform_data(cleaned_data)
     api_url = 'http://target-platform.com/api/execute'

     load_data(api_url, transformed_data)

通过上述步骤和代码示例,我们可以高效地完成从源系统到目标平台的数据ETL转换与加载。这不仅提高了数据处理效率,还确保了数据的一致性和准确性。 数据集成平台API接口配置