轻易云平台上的ETL过程:从数据提取到写入的完整实现

  • 轻易云集成顾问-吴伟

金蝶云星空数据集成到轻易云:MOM-物料成本核算单信息查询的高效实现

在企业信息化系统中,跨平台的数据对接和集成是一项复杂但又至关重要的任务。本文将分享一个关于金蝶云星空与轻易云数据集成平台的具体案例,通过实际应用展示如何利用这两个强大的工具,实现MOM-物料成本核算单信息的高效查询。

数据接口背景

本次案例以executeBillQuery为主要获取接口,从金蝶云星空提取物料成本核算单的信息,并通过轻易云集成平台进行处理和写入操作。这一过程需要解决多重技术挑战,包括数据吞吐量、实时监控、异常处理,以及不同系统间的数据格式差异等。

方案实施概要

  1. 高吞吐量支持 为了应对大批量数据快速传输需求,我们采用了分布式架构设计,使得大量物料成本核算单数据可以迅速从金蝶云星空流向轻易云,保证整个流程中的时效性要求。

  2. API资产管理 利用轻易云的平台优势,通过统一视图管理API资产,可全面掌握各调用接口及其使用情况,提高资源利用效率。此外,这种集中式管理还能方便后续优化配置,为业务扩展提供良好基础。

  3. 实时监控与异常检测 集中化监控和告警机制是此次项目的重要保障。在整个集成过程中,所有API调用状态和性能均被实时跟踪,一旦发现任何异常,无论是网络延迟还是返回错误代码,都能及时捕获并触发相应重试机制,确保任务顺利完成。

  4. 自定义转换逻辑 针对两套系统间存在的数据结构差异,我们开发了特定的数据转换逻辑模块。这些模块根据企业独特业务需求,对不同字段进行映射和调整,从而实现无缝衔接,同时也使后期维护更加简便直观。

  5. 可视化数据流设计工具 在整体实施过程中,可视化的数据流设计工具发挥了关键作用。通过这一工具,不仅可以清晰地展示每一步操作,还能即时修改流程,如调整抓取频率或新增条件筛选,以适配不断变更的业务场景。

以上即是该项目启动阶段的一些核心技术要点,在随后的部分我们将详细探讨具体执行步骤及代码示例等细节问题,以全面呈现此成功案例背后的技术实践。 如何开发企业微信API接口

调用金蝶云星空接口executeBillQuery获取并加工数据

在轻易云数据集成平台中,调用源系统接口是数据集成生命周期的第一步。本文将深入探讨如何通过调用金蝶云星空的executeBillQuery接口来获取并加工数据。

接口配置与请求参数

在元数据配置中,executeBillQuery接口被定义为一个POST请求,用于查询物料成本核算单信息。以下是请求参数的详细配置:

  • api: executeBillQuery
  • effect: QUERY
  • method: POST
  • number: FBillNo
  • id: F_Materialid
  • name: FBillNo

请求参数分为两部分:基础请求参数和其他请求参数。

基础请求参数

基础请求参数包括实体主键、单据编号、成品编码和组织等字段。这些字段用于指定查询条件和返回结果的格式。

[
  {"field":"FID","label":"实体主键","type":"string","describe":"实体主键","value":"FID"},
  {"field":"FBillNo","label":"单据编号","type":"string","describe":"单据编号","value":"FBillNo"},
  {"field":"F_Materialid","label":"成品编码","type":"string","value":"F_Materialid.fnumber"},
  {"field":"F_OrgId","label":"组织","type":"string","value":"F_OrgId.fnumber"}
]
其他请求参数

其他请求参数主要用于分页、过滤和指定查询字段等高级操作。

[
  {"field":"Limit","label":"最大行数","type":"string","describe":"金蝶的查询分页参数","value":"3000"},
  {"field":"StartRow","label":"开始行索引","type":"string","describe":"金蝶的查询分页参数","value":"{PAGINATION_START_ROW}"},
  {"field":"TopRowCount","label":"返回总行数","type":"int","describe":"金蝶的查询分页参数"},
  {"field":"FilterString","label":"过滤条件","type":"string","describe":"示例写法 FSupplierId.FNumber = 'VEN00010' and FApproveDate>=","value": "F_ora_ModifyDate>='{{LAST_SYNC_TIME|datetime}}'"},
  {"field": "FieldKeys", "label": "需查询的字段key集合", "type": "array", "describe": "金蝶分录主键ID格式:FPOOrderEntry_FEntryId,其它格式 FPurchaseOrgId.FNumber", "parser": {"name": "ArrayToString", "params": ","}},
  {"field": "FormId", "label": "业务对象表单Id", "type": "string", "describe": "必须填写金蝶的表单ID如:PUR_PurchaseOrder", "value": "kb18628e4a5cd41609782b5678d51a6a1"}
]

数据获取与处理

调用executeBillQuery接口时,首先需要构建一个完整的POST请求。以下是一个示例请求体:

{
  "FormId": "kb18628e4a5cd41609782b5678d51a6a1",
  "FieldKeys": ["FID", "FBillNo", "F_Materialid.fnumber", "F_OrgId.fnumber"],
  "FilterString": "F_ora_ModifyDate>='2023-01-01T00:00:00'",
  "Limit": 3000,
  "StartRow": 0
}

该请求体包含了表单ID、需要查询的字段集合、过滤条件以及分页参数。通过这种方式,可以灵活地控制查询范围和结果集大小。

自动填充响应与接管机制

为了确保数据获取过程中的高效性和可靠性,轻易云平台提供了自动填充响应和接管机制。在元数据配置中,autoFillResponse被设置为true,这意味着平台会自动处理并填充响应数据。此外,还定义了一个接管机制,当定时任务触发时(每5分钟一次),会自动调整过滤条件以确保数据同步的连续性。

{
  "crontab": "*\/5 * * * *",
  "takeOverRequest":[
    {
      "field": "FilterString",
      "value": "F_ora_ModifyDate>='{{MINUTE_AGO_20|datetime}}'",
      "type": "string",
      ...
    }
  ]
}

实际应用案例

假设我们需要从金蝶云星空中获取最近20分钟内修改过的物料成本核算单信息,我们可以利用上述配置进行如下操作:

  1. 构建初始请求体,设置适当的过滤条件。
  2. 调用executeBillQuery接口并获取初始数据。
  3. 平台自动处理响应,并根据定时任务继续获取后续数据。

通过这种方式,可以实现对源系统数据的高效、连续采集,为后续的数据清洗、转换与写入奠定坚实基础。

综上所述,通过合理配置元数据并调用金蝶云星空接口,可以高效地获取所需业务数据,为整个数据集成生命周期提供有力支持。 钉钉与WMS系统接口开发配置

使用轻易云数据集成平台进行ETL转换和数据写入

在轻易云数据集成平台中,数据处理的第二步是将已经集成的源平台数据进行ETL(Extract, Transform, Load)转换,并将其转为目标平台所能够接收的格式,最终写入目标平台。本文将详细探讨如何利用轻易云提供的API接口和元数据配置来实现这一过程。

数据提取与清洗

首先,从源系统中提取原始数据并进行必要的清洗操作。这一步骤确保了数据的一致性和准确性,为后续的转换和写入打下坚实基础。

{
  "data": [
    {
      "number": "001",
      "id": "12345",
      "name": "物料A"
    },
    {
      "number": "002",
      "id": "67890",
      "name": "物料B"
    }
  ]
}

数据转换

接下来,我们需要将清洗后的数据进行格式转换,以符合目标平台API接口的要求。根据提供的元数据配置,目标API接口为写入空操作,请求方法为POST。以下是一个示例代码片段,用于将源数据转换为目标格式:

import requests
import json

# 源数据
source_data = [
    {"number": "001", "id": "12345", "name": "物料A"},
    {"number": "002", "id": "67890", "name": "物料B"}
]

# 目标API接口元数据配置
metadata = {
    "api": "/api/writeEmptyOperation",
    "effect": "EXECUTE",
    "method": "POST",
    "number": "number",
    "id": "id",
    "name": "编码",
    "idCheck": True
}

# 转换后的数据列表
transformed_data = []

for item in source_data:
    transformed_item = {
        metadata["number"]: item["number"],
        metadata["id"]: item["id"],
        metadata["name"]: item["name"]
    }

    # 添加到转换后的数据列表中
    transformed_data.append(transformed_item)

# 打印转换后的数据以供调试
print(json.dumps(transformed_data, indent=2, ensure_ascii=False))

数据写入

完成转换后,将这些数据通过API接口写入目标平台。以下是一个示例代码片段,用于执行HTTP POST请求,将转换后的数据发送到目标API接口:

# 目标API URL
api_url = f"http://target-platform.com{metadata['api']}"

# HTTP 请求头部信息
headers = {
    'Content-Type': 'application/json'
}

# 执行POST请求,将转换后的数据写入目标平台
response = requests.post(api_url, headers=headers, data=json.dumps(transformed_data))

# 检查响应状态码以确定是否成功写入
if response.status_code == 200:
    print("Data successfully written to the target platform.")
else:
    print(f"Failed to write data. Status code: {response.status_code}")

实时监控与错误处理

在实际操作中,实时监控和错误处理是确保整个ETL过程顺利进行的重要环节。可以通过轻易云提供的监控工具实时查看每个步骤的数据流动和处理状态。如果出现错误,可以根据返回的状态码和错误信息进行相应处理。

if response.status_code != 200:
    error_info = response.json()
    print(f"Error occurred: {error_info['message']}")

通过上述步骤,我们可以高效地完成从源系统到目标平台的数据ETL转换和写入。这不仅提升了业务透明度和效率,也确保了不同系统间的数据无缝对接。 用友与WMS系统接口开发配置