从数据抓取到MySQL写入:完整的数据集成解决方案

  • 轻易云集成顾问-谢楷斌

金蝶云星空数据集成到MySQL的技术案例分享:MOM销售订单状态刷新

在企业信息系统整合过程中,如何高效、安全地实现跨平台的数据同步已成为一个重要课题。本文将通过具体的实施案例——"MOM销售订单状态刷新",详细介绍金蝶云星空系统与MySQL数据库之间的数据集成解决方案。

数据抓取与API调用

首先,我们需要从金蝶云星空中获取最新的销售订单状态。这一步骤采用其提供的executeBillQuery接口进行数据抓取,该接口具有分页功能,可以有效处理大批量数据。为了确保数据完整性及不漏单,我们设定了一个可靠的定时任务,每隔固定时间段执行一次查询操作,并对每次查询结果进行校验和记录。

# 调用executeBillQuery API示例
import requests

def get_sales_order_status(api_url, params):
    response = requests.post(api_url, json=params)
    response.raise_for_status()
    return response.json()

api_url = "https://your-kingdee-api-endpoint.com/executeBillQuery"
params = {
    # 查询参数设置
}
sales_order_data = get_sales_order_status(api_url, params)

数据转换与格式处理

由于不同系统中的数据结构可能存在差异,需要对获取到的数据进行转换处理。例如,在金蝶云星空中,日期字段可能以字符串形式表示,而在MySQL数据库中则要求严格遵循日期格式规范。我们通过自定义的数据转换逻辑,实现了相应字段类型的一致性映射。

from datetime import datetime

def transform_data(raw_data):
    transformed_data = []
    for entry in raw_data:
        trans_entry = {
            'order_id': entry['OrderID'],
            'status': entry['Status'],
            'update_time': datetime.strptime(entry['UpdateTime'], '%Y-%m-%dT%H:%M:%S')
        }
        transformed_data.append(trans_entry)
    return transformed_data

transformed_sales_order_data = transform_data(sales_order_data)

高吞吐量写入与异常重试机制

针对大量订单状态更新需求,MySQL支持高吞吐量的数据写入能力,使得所有新获取并转换后的订单状态能够迅速存储至数据库。此外,为保证整个过程中的稳定性和可靠性,配置了一套完善的异常处理和重试机制,如果某一批次出现错误,会自动触发重试操作,同时保留详细日志便于后续分析和排查问题。

import mysql.connector
from mysql.connector import errorcode

def insert_into_mysql(data_batch):
    try:
        cnx = mysql.connector.connect(user='username', password='password',
                                      host='127.0.0.1',

![企业微信与OA系统接口开发配置](https://pic.qeasy.cloud/D16.png~tplv-syqr462i7n-qeasy.image)
### 调用金蝶云星空接口executeBillQuery获取并加工数据

在数据集成过程中,调用源系统接口是关键的一步。本文将详细介绍如何通过轻易云数据集成平台调用金蝶云星空的`executeBillQuery`接口,以获取并加工销售订单状态数据。

#### 接口配置与请求参数

首先,我们需要配置接口的元数据,以便正确调用金蝶云星空的API。以下是元数据配置的详细信息:

```json
{
  "api": "executeBillQuery",
  "effect": "QUERY",
  "method": "POST",
  "id": "FSaleOrderEntry_FEntryID",
  "name": "FBillNo",
  "idCheck": true,
  "request": [
    {"field":"FSaleOrderEntry_FEntryID","label":"FSaleOrderEntry_FEntryID","type":"string","describe":"FSaleOrderEntry_FEntryID","value":"FSaleOrderEntry_FEntryID"},
    {"field":"FID","label":"FID","type":"string","describe":"FID","value":"FID"},
    {"field":"FBillNo","label":"单据编号","type":"string","describe":"单据编号","value":"FBillNo"},
    {"field":"FDocumentStatus","label":"单据状态","type":"string","describe":"单据状态","value":"FDocumentStatus"},
    {"field":"FSaleOrgId_FNumber","label":"销售组织","type":"string","describe":"销售组织","value":"FSaleOrgId.FNumber"},
    {"field":"FDate","label":"日期","type":"string","describe":"日期","value":"FDate"},
    {"field":"FCustId_FNumber","label":"客户","type":"string","describe":"客户","value":"FCustId.FNumber"},
    {"field":"FSaleDeptId_Fnumber","label":"销售部门","type":"string","describe":"销售部门","value":"FSaleDeptId.Fnumber"},
    {"field":"FReceiveAddress","label":"收货地址","type":"string","describe":"收货地址","value":"FReceiveAddress"},
    {"field":...}
  ],
  "otherRequest": [
    {"field": "Limit", "label": "最大行数", "type": "string", "describe": "金蝶的查询分页参数", "value": "2000"},
    {"field": "StartRow", "label": "开始行索引", "type": "string", "describe": "金蝶的查询分页参数", "value": "{PAGINATION_START_ROW}"},
    {"field":...}
  ],
  "autoFillResponse": true
}

请求示例

在实际操作中,我们需要构建一个HTTP POST请求,发送到金蝶云星空的API端点。以下是一个示例请求体:

{
  "FormId": "SAL_SaleOrder",
  "FieldKeys": [
    ... // 列出所有需要查询的字段key
  ],
  "FilterString": "(FSupplyOrgId.fnumber in ('T02','T02.01') and FDocumentStatus='C' and FApproveDate>='2023-10-01')",
  ...
}

数据处理与清洗

获取到原始数据后,需要对其进行清洗和转换,以便后续的数据写入和使用。以下是一些常见的数据清洗步骤:

  1. 字段映射:将API返回的数据字段映射到目标系统所需的字段。例如,将FBillNo映射为目标系统中的订单编号。
  2. 数据类型转换:确保所有字段的数据类型符合目标系统要求。例如,将字符串类型的日期转换为日期类型。
  3. 过滤无效数据:去除不符合业务规则或无效的数据记录。例如,过滤掉状态为“已关闭”的订单。

示例代码

以下是一个示例代码片段,用于调用接口并处理返回的数据:

import requests
import json

# 配置请求头和URL
url = 'https://api.kingdee.com/executeBillQuery'
headers = {'Content-Type': 'application/json'}

# 构建请求体
payload = {
  'FormId': 'SAL_SaleOrder',
  'FieldKeys': ['FBillNo', 'FDocumentStatus', ...],
  'FilterString': "(FSupplyOrgId.fnumber in ('T02','T02.01') and FDocumentStatus='C' and FApproveDate>='2023-10-01')",
}

# 发起POST请求
response = requests.post(url, headers=headers, data=json.dumps(payload))

# 检查响应状态码
if response.status_code == 200:
    data = response.json()

    # 数据清洗与转换
    cleaned_data = []
    for record in data:
        cleaned_record = {
            '订单编号': record['FBillNo'],
            '单据状态': record['FDocumentStatus'],
            ...
        }
        cleaned_data.append(cleaned_record)

    # 后续处理逻辑,如写入数据库或其他系统
else:
    print(f"请求失败,状态码: {response.status_code}")

通过上述步骤,我们可以高效地从金蝶云星空获取所需的销售订单状态数据,并进行必要的数据清洗和转换,为后续的数据处理环节打下坚实基础。 用友与MES系统接口开发配置

数据集成过程中ETL转换与MySQLAPI接口写入的技术实现

在数据集成生命周期的第二步,我们需要将已经集成的源平台数据进行ETL(提取、转换、加载)转换,最终写入目标平台MySQL。本文将详细探讨这一过程中的技术细节,特别是如何通过API接口实现这一目标。

数据请求与清洗

首先,我们需要从源系统提取数据并进行初步清洗。这一步骤确保了数据的准确性和一致性,为后续的转换和写入打下基础。以下是元数据配置中关于数据请求的部分:

{
  "api": "execute",
  "effect": "EXECUTE",
  "method": "POST",
  "idCheck": true,
  "request": [
    {
      "field": "main_params",
      "label": "main_params",
      "type": "object",
      "describe": "111",
      "children": [
        {"field": "SO_NUMBER", "label": "SO_NUMBER", "type": "string", "value":"{FBillNo}"},
        {"field": "SO_LINE_SEQ", "label": "SO_LINE_SEQ", "type": "string", "value":"{FSaleOrderEntry_fseq}"},
        {"field": "FMrpCloseStatus", "label": "FMrpCloseStatus", "type":"string","value":"{FMrpCloseStatus}"}
      ]
    }
  ]
}

在这个配置中,我们定义了一个名为main_params的对象,包含三个字段:SO_NUMBERSO_LINE_SEQFMrpCloseStatus。这些字段将从源系统的数据中提取相应的值。

数据转换

接下来是数据转换阶段。我们需要将提取的数据转换为目标平台MySQL能够接收的格式。这一步骤通常涉及到数据类型的转换、字段映射等操作。在我们的案例中,主要通过SQL语句来实现这一目标。

以下是元数据配置中关于SQL语句部分:

{
  "otherRequest": [
    {
      "field": "main_sql",
      "label": "main_sql",
      "type":"string",
      "describe":"111",
      "value":"UPDATE ty_mes.mt_so_line a \nINNER JOIN ( \n SELECT SO_LINE_ID \n FROM ty_mes.mt_so_line \n WHERE so_id = ( \n SELECT so_id \n FROM ty_mes.mt_so_head \n WHERE so_number = :SO_NUMBER\n ) \n AND so_line_num =:SO_LINE_SEQ \n) AS subquery ON a.SO_LINE_ID = subquery.SO_LINE_ID \nSET a.KINGDEE_STATUS = :FMrpCloseStatus"
    }
  ]
}

这段SQL语句实现了以下功能:

  1. 从表ty_mes.mt_so_head中根据销售订单号(:SO_NUMBER)获取对应的销售订单ID。
  2. 根据销售订单ID和行号(:SO_LINE_SEQ)从表ty_mes.mt_so_line中获取对应的行ID。
  3. 更新表ty_mes.mt_so_line中的状态字段(KINGDEE_STATUS),将其设置为新的状态值(:FMrpCloseStatus)。

数据写入

最后一步是将转换后的数据写入目标平台MySQL。在我们的案例中,通过API接口以POST方法执行上述SQL语句,实现数据的最终写入。

以下是API调用示例:

{
  api: 'execute',
  method: 'POST',
  data: {
    main_params: {
      SO_NUMBER: '12345',
      SO_LINE_SEQ: '1',
      FMrpCloseStatus: 'Closed'
    },
    main_sql: `UPDATE ty_mes.mt_so_line a 
                INNER JOIN ( 
                  SELECT SO_LINE_ID 
                  FROM ty_mes.mt_so_line 
                  WHERE so_id = ( 
                    SELECT so_id 
                    FROM ty_mes.mt_so_head 
                    WHERE so_number = :SO_NUMBER
                  ) 
                  AND so_line_num = :SO_LINE_SEQ 
                ) AS subquery ON a.SO_LINE_ID = subquery.SO_LINE_ID 
                SET a.KINGDEE_STATUS = :FMrpCloseStatus`
  }
}

通过上述步骤,我们实现了从源系统到目标平台MySQL的数据集成过程。每个环节都经过精心设计和配置,确保了数据处理的准确性和高效性。 钉钉与WMS系统接口开发配置