利用轻易云进行ETL转换并写入金蝶云星空的实战案例

  • 轻易云集成顾问-曾平安

业务系统销售开单对接到金蝶销售订单-ok

在企业日常运作中,高效的数据集成对于提升生产力和管理效率至关重要。本案例探讨了如何利用轻易云数据集成平台,实现云水聚数据的高效采集与处理,并将其无缝对接到金蝶云星空。具体方案涉及多个模块的精细协作,确保从获取接口数据,到批量写入,再到错误处理和重试机制的各个环节万无一失。

首先,通过调用云水聚提供的API接口/Kingdee/GetSaleOrderList,我们能够定时可靠地抓取最新的销售订单数据。在这个过程中,为确保不漏单,我们设计了一套处理云水聚接口分页与限流问题的方法。这不仅保证了大范围的数据抓取,同时避免了因接口负载过高而导致的数据丢失或延迟。

为了实现大量数据快速写入金蝶云星空系统,我们利用其batchSave API,以批量方式将多条记录一次性传输并存储。这种方法不仅提高了效率,还降低了网络通信成本。然而,这同时也带来一个挑战,即如何处理两者之间的数据格式差异。对此,我们开发了一套定制化的数据映射规则,自动转换不同字段类型,使之符合目标端要求。

此外,在整个对接过程中,实时监控与日志记录功能不可或缺。我们部署了一系列监控节点,对每一步操作进行追踪并记录日志。当出现异常情况时,例如连接超时或响应失败,通过预设置的错误重试机制,可以自动重新发起请求,从容应对各种突发情况,大幅减少人为干预需求,提高系统稳定性。 打通用友BIP数据接口

调用源系统云水聚接口/Kingdee/GetSaleOrderList获取并加工数据

在数据集成的生命周期中,调用源系统接口是至关重要的一步。本文将深入探讨如何使用轻易云数据集成平台调用云水聚接口 /Kingdee/GetSaleOrderList 获取销售订单列表,并对数据进行初步加工。

接口配置与调用

首先,我们需要了解元数据配置中的各个字段及其作用:

{
  "api": "/Kingdee/GetSaleOrderList",
  "effect": "QUERY",
  "method": "GET",
  "number": "orderNo",
  "id": "orderNo",
  "name": "orderNo",
  "idCheck": true,
  "request": [
    {
      "field": "ApproveDate",
      "label": "审核时间",
      "type": "datetime",
      "value": "{{LAST_SYNC_TIME|datetime}}"
    }
  ],
  "autoFillResponse": true
}
  1. API路径/Kingdee/GetSaleOrderList 是我们需要调用的接口路径。
  2. 请求类型GET 表示我们将通过HTTP GET方法请求数据。
  3. 主键字段orderNo 用于标识每条记录的唯一性。
  4. 请求参数:包含一个字段 ApproveDate,表示审核时间,其值为上次同步时间 {{LAST_SYNC_TIME|datetime}}

数据请求与清洗

在调用接口之前,我们需要确保请求参数的正确性。这里的 ApproveDate 参数用于过滤自上次同步以来的新数据。通过动态设置 LAST_SYNC_TIME,可以确保每次只获取增量数据,提升效率。

import requests
from datetime import datetime

# 假设 LAST_SYNC_TIME 已经从系统中获取
last_sync_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')

# 构建请求参数
params = {
    'ApproveDate': last_sync_time
}

# 发起 GET 请求
response = requests.get('https://api.yunshuiju.com/Kingdee/GetSaleOrderList', params=params)

if response.status_code == 200:
    data = response.json()
    # 对返回的数据进行初步清洗和验证
    cleaned_data = []
    for order in data:
        if 'orderNo' in order and order['orderNo']:
            cleaned_data.append(order)
else:
    print(f"Error: {response.status_code}")

数据转换与写入

在获取并清洗了销售订单列表后,我们需要将这些数据转换为目标系统所需的格式,并写入目标数据库或系统。在此过程中,可以利用轻易云平台提供的自动填充响应功能 autoFillResponse: true,简化部分工作。

# 假设目标系统需要的数据格式如下:
transformed_data = []
for order in cleaned_data:
    transformed_order = {
        '订单编号': order['orderNo'],
        '客户名称': order.get('customerName', ''),
        '订单日期': order.get('orderDate', ''),
        # 添加其他必要的字段转换...
    }
    transformed_data.append(transformed_order)

# 将转换后的数据写入目标系统(例如数据库)
import pymysql

connection = pymysql.connect(
    host='localhost',
    user='user',
    password='password',
    database='sales_db'
)

try:
    with connection.cursor() as cursor:
        for order in transformed_data:
            sql = """
                INSERT INTO sales_orders (order_no, customer_name, order_date)
                VALUES (%s, %s, %s)
            """
            cursor.execute(sql, (order['订单编号'], order['客户名称'], order['订单日期']))
        connection.commit()
finally:
    connection.close()

通过上述步骤,我们成功地完成了从源系统云水聚获取销售订单列表、对数据进行清洗和转换,并最终写入目标系统的全过程。这一过程不仅提高了数据处理效率,还确保了数据的一致性和准确性。 如何开发金蝶云星空API接口

使用轻易云数据集成平台进行ETL转换并写入金蝶云星空API接口的技术案例

在数据集成生命周期的第二阶段,我们需要将已经集成的源平台数据进行ETL转换,使其符合目标平台金蝶云星空API接口的格式要求,并最终写入目标平台。本文将详细探讨这一过程中的技术细节,特别是如何配置和使用元数据来实现这一目标。

数据请求与清洗

在数据请求与清洗阶段,我们已经从源系统获取了原始数据,并进行了必要的数据清洗和预处理。这些数据现在准备好进行进一步的ETL转换,以适应金蝶云星空API接口的需求。

元数据配置解析

我们使用以下元数据配置来指导ETL转换过程:

{
  "api": "batchSave",
  "effect": "EXECUTE",
  "method": "POST",
  "number": "FBillNo",
  "id": "FBillNo",
  "name": "FBillNo",
  "idCheck": true,
  "operation": {
    "method": "batchArraySave",
    "rows": 1,
    "rowsKey": "array"
  },
  "request": [
    {
      "field": "FBillTypeID",
      "label": "单据类型",
      ...
    },
    ...
  ],
  ...
}

ETL转换过程

  1. 字段映射与转换

    每个字段都有特定的配置,包括字段名、标签、类型、描述和值。例如,FBillTypeID字段被映射为单据类型,其值通过ConvertObjectParser解析器转换为FNumber格式。

    {
     "field": "FBillTypeID",
     ...
     "parser": {
       "name": "ConvertObjectParser",
       ...
     },
     ...
    }
  2. 动态值替换

    某些字段需要从源数据中动态替换,例如订单编号FBillNo和客户名称FCustId。这些值通常通过占位符表示,如 {orderNo}{agentName},并在运行时替换为实际值。

    {
     "field": "FBillNo",
     ...
     "value": "{orderNo}"
    },
    {
     ...
     "value":"_mongoQuery a0cfff28-620e-3150-bbbd-3ac0b72a3da5 findField=content.FNumber where={\"content.FName\":{\"$eq\":\"{agentName}\"}}"
    }
  3. 数组处理

    对于复杂的数据结构,如订单明细(FSaleOrderEntry),我们需要处理嵌套数组。每个子项也有相应的字段映射和转换规则。例如,物料编码、销售数量和含税单价等字段都需要进行相应的解析和赋值。

    {
     ...
     "children":[
       {
         ...
         {"field":"FMaterialId",...,"value":"{{productItemEntity.productId}}"},
         {"field":"FQty",...,"value":"{{productItemEntity.applyNumer}}"},
         {"field":"FTaxPrice",...,"value":"{{productItemEntity.productPrice}}"},
         ...
       }
     ]
    }
  4. 其他请求参数

    除了主要的数据字段外,还需要配置一些其他请求参数,如业务对象表单ID、执行操作、是否自动提交并审核等。这些参数确保了在调用API时能够正确执行所需操作。

    {
     ...
     {"field":"FormId",...,"value":"SAL_SaleOrder"},
     {"field":"Operation",...,"value":"BatchSave"},
     {"field":"IsAutoSubmitAndAudit",...,"value":"true"},
     {"field":"IsVerifyBaseDataField",...,"value":"true"}
    }

数据写入目标平台

完成ETL转换后,我们使用配置好的元数据通过HTTP POST方法将数据发送到金蝶云星空API接口。以下是一个简化的示例代码片段:

import requests

url = 'https://api.kingdee.com/batchSave'
headers = {'Content-Type': 'application/json'}
data = {
    # 根据元数据配置生成的数据结构
}

response = requests.post(url, headers=headers, json=data)
if response.status_code == 200:
    print('Data successfully written to Kingdee Cloud.')
else:
    print('Failed to write data:', response.content)

通过上述步骤,我们成功地将源平台的数据经过ETL转换后写入到金蝶云星空,实现了不同系统间的数据无缝对接。这一过程不仅提高了业务效率,也确保了数据的一致性和准确性。 金蝶与SCM系统接口开发配置