面向实战:实现从金蝶云星空到MySQL的ETL流程

  • 轻易云集成顾问-蔡威

案例分享:金蝶云星空数据集成到MySQL

在系统对接集成的实际应用中,如何高效、可靠地将金蝶云星空数据集成到MySQL数据库是一个常见但颇具挑战性的任务。本文将深入探讨通过executeBillQuery API从金蝶云星空获取生产订单相关数据,并使用execute API写入到MySQL数据库的技术细节,从而实现“MOM生产订单-状态更新-结案-结算”的完整流程。

数据抓取与分页限流处理

首先,利用轻易云的数据集成功能,我们通过定时调度机制定期调用金蝶云星空的executeBillQuery接口,以保证及时抓取最新的生产订单数据。在处理大量数据时,分页和限流是两个关键问题。通过自定义接口请求参数,实现分页查询,同时结合限流策略,确保API调用不会超过限制频率,使得大规模数据提取过程顺畅无误。

自定义转换逻辑与格式差异处理

获取原始数据后,为了使其能够适配MySQL数据库的存储需求,需要进行一定程度的数据转换。我们使用轻易云提供的数据转换工具,自定义编写对应逻辑,将金蝶云星空返回的数据格式如JSON、XML等转化为MySQL可识别的表结构,并解决字段类型及值域范围可能存在的不一致问题。此外,通过统一视图控制台实时监控每一步操作,有效防止由于数据信息不匹配导致的数据丢失和错误。

高吞吐量批量写入

针对最终生成的大量生产订单记录,执行高吞吐量批量写入至MySQL。这一过程中借助轻易云支持的大容量并发写入特性,在较短时间内完成海量数据信息落库。从而满足业务系统对实时性和响应速度苛刻要求,大幅提升整体效率。同时借助集中监控和告警体系,可以全程跟踪各项任务执行状态,一旦出现异常状况,即刻触发预设告警机制进行快速定位与修复。

上述方案不仅涵盖了从API接口调用到多样化格式转换,再到批量稳定输入全过程。而其中涉及多个核心技术要点,如如何确保不同平台之间数据的一致性、高效管理API资产以及应对页面/接口限流等,都以实例应用方式作详细讲解。接下来部分,将进一步展开具体实施步骤和代码示例,以帮助读者更深入理解此案例背后的技术细节。 如何开发企业微信API接口

调用金蝶云星空接口executeBillQuery获取并加工数据

在数据集成的生命周期中,调用源系统接口是至关重要的一步。本文将详细探讨如何通过轻易云数据集成平台调用金蝶云星空的executeBillQuery接口,以获取并加工生产订单相关的数据。

接口配置与调用

首先,我们需要配置元数据以便正确调用executeBillQuery接口。以下是该接口的元数据配置:

{
  "api": "executeBillQuery",
  "effect": "QUERY",
  "method": "POST",
  "id": "FEntryId",
  "name": "FBillNo",
  "idCheck": true,
  "request": [
    {"field": "FID", "label": "生产订单表头id", "type": "string", "describe": "实体主键", "value": "FID"},
    {"field": "FEntryId", "label": "实体主键", "type": "string", "value": "FTreeEntity_FEntryId"},
    {"field": "FMaterialId", "label": "成品编号", "type": "string", "value": "FMaterialId.FNumber"},
    {"field": "FQty", "label": "数量", "type": "string", "value":"FQty"},
    {"field":"FPlanStartDate","label":"计划开工时间","type":"string","value":"FPlanStartDate"},
    {"field":"FMTONO","label":"计划跟踪号","type":"string","value":"FMTONO"},
    {"field":"FLot","label":"批号","type":"string","value":"FLot.FNumber"},
    {"field":"FBomId","label":"BOM版本","type":"string","value":"FBomId.FNumber"},
    {"field":"FStatus","label":"业务状态","type":"string","value":"FStatus"},
    {"field":"FBaseUnitId","label":"基本单位","type":"string","value":"FBaseUnitId.FNumber"},
    {"field":"FWorkShopID","label":"生产车间","type":"string","value":"FWorkShopID.FNumber"},
    {"field":"Fseq","label":"生产订单行号","type":"string","value":"FTreeEntity_Fseq"},
    {"field":"FPickMtrlStatus","label":"领料状态","type":"string","value":"FPickMtrlStatus"},
    {"field":"FWorkShopName","label":"车间名称","type":"string","value":{"parser":{"name":"","params":[]}}},
    {"field":{"parser":{"name":"","params":[]}},"label":"","type":"","describe":"","value":{"parser":{"name":"","params":[]}}},
    {"field":{"parser":{"name":"","params":[]}},"label":"","type":"","describe":"","value":{"parser":{"name":"","params":[]}}},
    {"field":{"parser":{"name":"","params":[]}},"label":"","type":"","describe":"","value":{"parser":{"name":"","params":[]}}},
  ],
  ...
}

请求参数详解

在请求参数中,我们需要特别注意以下几个字段:

  • FID: 表示生产订单表头的唯一标识。
  • FEntryId: 实体主键,用于唯一标识每一条记录。
  • FilterString: 用于设置过滤条件,例如:FSupplierId.FNumber = 'VEN00010' and FApproveDate>= '2024-03-01' and FStatus in ('6','7') and FCloseType<>'A'
  • FieldKeys: 指定需要查询的字段集合,格式为数组。

数据请求与清洗

在调用接口后,返回的数据通常是一个复杂的JSON对象。我们需要对这些数据进行清洗和转换,以便后续处理。以下是一个简单的数据清洗示例:

import requests
import json

url = 'https://api.kingdee.com/executeBillQuery'
headers = {'Content-Type': 'application/json'}
payload = {
  'FormId': 'PRD_MO',
  'FieldKeys': ['FID', 'FTreeEntity_FEntryId', 'FMaterialId.FNumber', 'FQty', 'FPlanStartDate'],
  'FilterString':"FSupplierId.FNumber = 'VEN00010' and FApproveDate>= '2024-03-01' and FStatus in ('6','7') and FCloseType<>'A'",
  'Limit': 1000,
  'StartRow': 0
}

response = requests.post(url, headers=headers, data=json.dumps(payload))
data = response.json()

# 数据清洗
cleaned_data = []
for entry in data:
  cleaned_entry = {
      'OrderID': entry['FID'],
      'MaterialID': entry['FTreeEntity_FEntryId'],
      'Quantity': entry['FQty'],
      'StartDate': entry['FPlanStartDate']
  }
  cleaned_data.append(cleaned_entry)

print(cleaned_data)

数据转换与写入

清洗后的数据需要进行进一步的转换,并写入目标系统。例如,将数据写入数据库或传递给下游系统。以下是一个将清洗后的数据写入数据库的示例:

import sqlite3

# 创建数据库连接
conn = sqlite3.connect('production_orders.db')
cursor = conn.cursor()

# 创建表格
cursor.execute('''
CREATE TABLE IF NOT EXISTS Orders (
    OrderID TEXT PRIMARY KEY,
    MaterialID TEXT,
    Quantity INTEGER,
    StartDate TEXT
)
''')

# 插入数据
for entry in cleaned_data:
  cursor.execute('''
      INSERT INTO Orders (OrderID, MaterialID, Quantity, StartDate)
      VALUES (?, ?, ?, ?)
      ON CONFLICT(OrderID) DO UPDATE SET 
          MaterialID=excluded.MaterialID,
          Quantity=excluded.Quantity,
          StartDate=excluded.StartDate;
      ''', (entry['OrderID'], entry['MaterialID'], entry['Quantity'], entry['StartDate']))

# 提交事务并关闭连接
conn.commit()
conn.close()

通过上述步骤,我们实现了从金蝶云星空获取生产订单数据,并对其进行清洗、转换和写入。这一过程展示了轻易云数据集成平台在处理异构系统数据时的强大能力。 如何对接金蝶云星空API接口

使用轻易云数据集成平台进行ETL转换并写入MySQLAPI接口

在数据集成生命周期的第二步中,我们需要将已经集成的源平台数据进行ETL(提取、转换、加载)转换,并转为目标平台 MySQLAPI 接口所能够接收的格式,最终写入目标平台。以下是详细的技术案例,展示如何使用元数据配置来实现这一过程。

1. 数据请求与清洗

首先,我们需要从源平台提取数据,并对其进行必要的清洗和预处理。假设我们已经完成了这一阶段,接下来进入数据转换与写入阶段。

2. 数据转换与写入

在这个阶段,我们将使用提供的元数据配置,将清洗后的数据转换为目标平台 MySQLAPI 接口所能接受的格式,并最终写入目标数据库。

元数据配置解析

根据提供的元数据配置,我们需要处理以下字段:

  • main_params:主要参数
  • extend_params_1:扩展参数
  • main_sql:主要SQL语句
  • extend_sql_1:扩展子表语句

每个字段包含多个子字段和相应的值,这些值可能是静态值,也可能是通过函数计算得出的动态值。

主要参数处理
{
    "field": "main_params",
    "label": "main_params",
    "type": "object",
    "describe": "111",
    "children": [
        {
            "field": "mo_code",
            "label": "生产订单号",
            "type": "string",
            "value": "_function CONCAT(\"{{生产订单号}}\",'_',\"{Fseq}\")"
        },
        {
            "field": "FStatus",
            "label": "业务状态",
            "type": "string",
            "value": "{FStatus}"
        },
        {
            "field": "FCloseType",
            "label": "结案类型",
            "type": "string",
            "value": "{FCloseType}"
        },
        {
            "field": "status",
            "label": "status",
            "type": "string",
            "value": "_function CASE ({FQty}-{FNoStockInQty})>0 WHEN true THEN 'COMPLETED' ELSE 'CLOSED' END"
        }
    ]
}

上述配置中,mo_code 是通过连接生产订单号和序列号生成的字符串,status 则是通过计算库存数量差值来决定订单状态。

扩展参数处理
{
    "field": "extend_params_1",
    ...
}

扩展参数与主要参数类似,包含相同的子字段和计算逻辑,用于进一步的数据处理需求。

SQL语句处理
{
    ...
    {
        "field": "main_sql",
        ...
        {
            ...
            value: "
                update ty_mes.mt_work_order 
                set KINGDEE_STATUS=:FStatus,
                    KINGDEE_FCloseType=:FCloseType,
                    status=:status 
                where MAKE_ORDER_NUM=:mo_code"
        }
    },
    {
        ...
        {
            ...
            value: "
                update ty_aps.mt_make_order 
                set KINGDEE_STATUS=:FStatus,
                    KINGDEE_FCloseType=:FCloseType,
                    make_order_status=:status 
                where MAKE_ORDER_NUM=:mo_code"
        }
    }
}

主要SQL语句和扩展子表语句分别用于更新不同表中的记录,根据前面步骤中生成的数据字段进行更新操作。

实际操作步骤

  1. 提取数据:从源平台提取所需的数据。
  2. 清洗数据:对提取的数据进行必要的清洗和预处理。
  3. 转换数据
    • 使用元数据配置中的函数和逻辑对字段进行转换。
    • 生成符合目标平台 MySQLAPI 接口要求的数据格式。
  4. 写入数据库
    • 执行主要SQL语句,将转换后的数据写入主表。
    • 执行扩展子表语句,将相关数据写入扩展表。

示例代码片段

以下是一个示例代码片段,用于展示如何使用上述元数据配置进行实际操作:

import requests

# 假设已经完成了数据提取和清洗
data = {
    '生产订单号': 'MO12345',
    'Fseq': '001',
    'FStatus': 'Completed',
    'FCloseType': 'Normal',
    'FQty': 100,
    'FNoStockInQty': 0
}

# 转换字段
mo_code = f"{data['生产订单号']}_{data['Fseq']}"
status = 'COMPLETED' if (data['FQty'] - data['FNoStockInQty']) > 0 else 'CLOSED'

# 准备SQL语句
main_sql = f"""
update ty_mes.mt_work_order 
set KINGDEE_STATUS='{data['FStatus']}',
    KINGDEE_FCloseType='{data['FCloseType']}',
    status='{status}' 
where MAKE_ORDER_NUM='{mo_code}'
"""

extend_sql_1 = f"""
update ty_aps.mt_make_order 
set KINGDEE_STATUS='{data['FStatus']}',
    KINGDEE_FCloseType='{data['FCloseType']}',
    make_order_status='{status}' 
where MAKE_ORDER_NUM='{mo_code}'
"""

# 执行SQL语句(假设使用requests库发送POST请求)
response_main = requests.post('http://target-mysql-api/execute', data={'sql': main_sql})
response_extend = requests.post('http://target-mysql-api/execute', data={'sql': extend_sql_1})

# 检查响应状态
if response_main.status_code == 200 and response_extend.status_code == 200:
    print("Data successfully written to the target platform.")
else:
    print("Failed to write data to the target platform.")

通过上述步骤,我们成功地将源平台的数据经过ETL转换后写入到目标平台 MySQLAPI 接口。这一过程充分利用了轻易云数据集成平台提供的元数据配置,实现了高效、准确的数据集成。 如何开发钉钉API接口