数据集成最佳实践:从易仓到金蝶云星空的ETL过程

  • 轻易云集成顾问-胡秀丛

易仓数据集成到金蝶云星空:Done-易仓-获取FBA头程数据--->金蝶-分步式调入单(首次)

在进行易仓(Yooso)与金蝶云星空(Kingdee Cloud Cosmic)的系统对接过程中,我们选择使用轻易云平台提供的全透明可视化操作界面,确保整个流程的高效和准确。本次技术案例将详细介绍如何通过轻松调用API接口实现从易仓获取FBA头程数据并成功写入到金蝶云星空。

获取及处理易仓数据

首先,通过调用getFbaShipment API接口,定时可靠地抓取源自易仓的FBA头程数据。由于该接口返回的数据量较大,分页和限流问题成为我们需要解决的重要环节。为此,我们构建了一个高效分页处理机制,以应对批量数据抓取过程中的性能挑战。

# 调用 getFbaShipment 接口并处理分页逻辑
def fetch_fba_data(page_number, page_size):
    response = requests.get(f'https://api.yoosocloud.com/getFbaShipment?page={page_number}&size={page_size}')
    data = response.json()
    return data['shipments']

# 示例代码仅供参考,请根据实际情况进行调整。

转换与映射至金蝶云星空格式

随着数据从易仓被顺利拉取下来,下一个关键步骤是将这些原始信息转换为符合金蝶云星空要求的数据格式。这一过程中,对字段名称、类型、结构等方面进行了精准的转换,并预先定义好了对应关系以保障无缝对接。

{
  "shipment_id": "external_Id",
  "warehouse_code": "stockPlaceId"
}

批量写入到金蝶云星空

对于已经格式化后的数据,我们利用batchSave API实现快速而稳定的大规模批量写入操作。在此期间,我们特别注意了异常处理与错误重试机制。例如,在网络波动或API响应超时等情况下,会自动重试提交请求,以最大程度上确保不会漏单。

def save_to_kingdee(data_batch):
    try:
        response = requests.post('https://api.kingdeeingcloud.com/batchSave', json=data_batch)
        if response.status_code == 200:
            print("Data successfully saved to Kingdee Cloud.")
        else:
            handle_error(response)
    except Exception as e:
        log_error(e)
        retry_save(data_batch)   

本篇文章开门见山,从描述具体技术方案开始,不啰嗦不偏题。其中涉及到的一些特性如实时监控、日志记录及自定义映射等等,将 如何对接用友BIP接口

调用源系统易仓接口getFbaShipment获取并加工数据

在数据集成生命周期的第一步中,调用源系统接口获取数据是至关重要的一环。本文将详细探讨如何通过轻易云数据集成平台调用易仓接口getFbaShipment,并对获取的数据进行初步加工。

接口配置与调用

首先,我们需要了解如何配置和调用getFbaShipment接口。根据提供的元数据配置,我们可以看到该接口的主要参数和请求方式:

{
  "api": "getFbaShipment",
  "effect": "QUERY",
  "method": "POST",
  "number": "{shipment_id}{updated_at}",
  "id": "{shipment_id}{platform_status}",
  "idCheck": true,
  "request": [
    {"field": "page", "label": "page", "type": "int", "value": "1"},
    {"field": "page_size", "label": "page_size", "type": "int", "value": "50"},
    {"field": "updateFor", "label": "更新时间-开始", "type": "string", 
        "value":"{{LAST_SYNC_TIME|datetime}}"},
    {"field": "updateTo", "label":"更新时间-结束","type":"string",
        "value":"{{CURRENT_TIME|datetime}}"},
    {"field":"platformStatus","label":"货件状态","type":"string",
        "value":"CLOSED,RECEIVING","parser":{"name":"StringToArray","params":","}},
    {"field":"createFor","label":"创建开始时间","type":"string",
        "value":"2024-04-01 00:00:00"}
  ],
  "autoFillResponse": true,
  "condition_bk":[[]]
}

参数解析

  1. 请求方式:该接口使用POST方法进行数据请求。
  2. 分页参数pagepage_size分别表示当前页码和每页的数据量,默认值为1和50。
  3. 时间参数updateForupdateTo分别表示数据更新时间的起始和结束时间,这两个参数通过模板变量动态生成,确保每次同步时都能获取最新的数据。
  4. 状态参数platformStatus表示货件状态,支持多个状态值,通过逗号分隔,并在请求时转换为数组。
  5. 创建时间createFor表示货件的创建开始时间,用于过滤特定时间段内的数据。

数据请求与清洗

在实际操作中,我们需要根据上述配置发送请求,并对返回的数据进行清洗。以下是一个示例代码片段,展示了如何通过轻易云平台实现这一过程:

import requests
import datetime

# 动态生成时间参数
last_sync_time = datetime.datetime.now() - datetime.timedelta(days=1)
current_time = datetime.datetime.now()

# 请求参数
payload = {
    'page': 1,
    'page_size': 50,
    'updateFor': last_sync_time.strftime('%Y-%m-%d %H:%M:%S'),
    'updateTo': current_time.strftime('%Y-%m-%d %H:%M:%S'),
    'platformStatus': ['CLOSED', 'RECEIVING'],
    'createFor': '2024-04-01 00:00:00'
}

# 发起POST请求
response = requests.post('https://api.yourwarehouse.com/getFbaShipment', json=payload)

# 检查响应状态
if response.status_code == 200:
    data = response.json()

    # 数据清洗逻辑
    cleaned_data = []
    for item in data['results']:
        cleaned_item = {
            'shipment_id': item['shipment_id'],
            'status': item['platform_status'],
            'updated_at': item['updated_at'],
            # 添加其他需要的字段
        }
        cleaned_data.append(cleaned_item)

    # 返回清洗后的数据
    print(cleaned_data)
else:
    print(f"Error: {response.status_code}")

数据转换与写入

在完成数据请求与清洗后,我们需要将处理后的数据转换为目标系统所需的格式,并写入目标系统。在本案例中,目标系统是金蝶,需要将清洗后的数据写入金蝶的分步式调入单。

def transform_and_write_to_kingdee(cleaned_data):
    transformed_data = []

    for item in cleaned_data:
        transformed_item = {
            'KingdeeField1': item['shipment_id'],
            'KingdeeField2': item['status'],
            'KingdeeField3': item['updated_at'],
            # 转换其他字段
        }
        transformed_data.append(transformed_item)

    # 假设有一个函数write_to_kingdee用于写入金蝶系统
    write_to_kingdee(transformed_data)

# 调用转换与写入函数
transform_and_write_to_kingdee(cleaned_data)

通过上述步骤,我们实现了从易仓获取FBA头程数据并加工处理,再将其写入金蝶系统的完整流程。这一过程不仅确保了数据的一致性和准确性,还极大地提升了业务流程的自动化水平。 电商OMS与ERP系统接口开发配置

使用轻易云数据集成平台将源数据转换并写入金蝶云星空API接口

在数据集成的生命周期中,ETL(提取、转换、加载)过程是至关重要的一环。本文将深入探讨如何使用轻易云数据集成平台,将已集成的源平台数据进行ETL转换,并最终写入金蝶云星空API接口。

配置元数据

为了实现数据从源平台到金蝶云星空的无缝对接,我们需要详细配置元数据。以下是关键字段及其配置说明:

  1. 单据类型 (FBillTypeID):

    • 类型:字符串
    • 描述:标准分步式调入单-FBDR01_SYS
    • 配置:"value": "FBDR01_SYS"
  2. 货件编号 (F_TLQG_TextHJBH):

    • 类型:字符串
    • 值:"{shipment_id}"
  3. 单据编号 (FBillNo):

    • 类型:字符串
    • 描述:单据编号
  4. 调入库存组织 (FStockOrgID):

    • 类型:字符串
    • 描述:调入库存组织
    • 配置:
      {
      "parser": {"name": "ConvertObjectParser", "params": "FNumber"},
      "value": "_findCollection find FStockOrgID_FNumber from dce1240e-9a5b-38ea-971a-1a29ee6d6713 where F_TLQG_TextHJBH={shipment_id}"
      }
  5. 调出库存组织 (FStockOutOrgID):

    • 类型:字符串
    • 描述:调出库存组织
    • 配置:
      {
      "parser": {"name": "ConvertObjectParser", "params": "FNumber"},
      "value": "_findCollection find FOwnerIdHead from dce1240e-9a5b-38ea-971a-1a29ee6d6713 where F_TLQG_TextHJBH={shipment_id}"
      }
  6. 明细信息 (FSTKTRSINENTRY):

    • 类型:数组
    • 描述:明细信息
    • 子字段配置:
      [
      {
       "field": "FMaterialID",
       "label": "物料编码",
       "type": "string",
       "describe": "物料编码",
       "parser": {"name": "ConvertObjectParser", "params": "FNumber"},
       "value": "_findCollection find FMaterialId_FNumber from 6019c615-1c31-3860-ac7f-b8508e4573cc where FCustMatNo={{products.warehouse_sku}}"
      },
      {
       "field": "FDestStockID",
       "label": "调入仓库",
       ...
      }
      ]

数据转换与写入

在完成元数据配置后,下一步是将这些配置应用于实际的数据转换过程,并通过API接口将转换后的数据写入金蝶云星空。

  1. 调用API接口: 使用POST方法调用batchSave API,确保所有必要字段都已正确映射和转换。

  2. 提交并审核: 设置IsAutoSubmitAndAudittrue,以便在数据写入后自动提交并审核。

  3. 验证基础资料: 设置IsVerifyBaseDataFieldtrue,确保所有基础资料的有效性。

  4. 操作标识: 设置操作标识为STK_InvCheckResult,以便追踪和验证操作结果。

示例代码

以下是一个示例请求体,用于将转换后的数据通过API接口写入金蝶云星空:

{
  "FormId": "STK_TRANSFERIN",
  "Operation": "BatchSave",
  ...
  "Model": {
    ...
    // 主表字段配置
    ...
    // 明细表字段配置
    ...
  },
  ...
}

通过上述步骤和配置,我们可以高效地将源平台的数据进行ETL转换,并无缝地写入到金蝶云星空,实现系统间的数据集成和业务流程自动化。 电商OMS与WMS系统接口开发配置

更多系统对接方案