ETL转换与MySQL数据库写入的技术实现

  • 轻易云集成顾问-林峰

金蝶云星空数据集成到MySQL技术案例分享:5金蝶生产订单对接商城

在实际业务中,企业经常需要将多个系统的数据高效整合,以实现业务流程的自动化和信息共享。本次我们聚焦于如何通过轻易云数据集成平台,将金蝶云星空的生产订单数据高效对接到MySQL数据库,为商城系统提供实时准确的数据支持。本案例具体方案为:5金蝶生产订单对接商城。

技术要点

  1. 调用金蝶云星空接口获取数据:为了从金蝶云星空中提取生产订单,我们使用了executeBillQuery API。此API能够按需查询指定时间段内的新建或更改的生产订单,并且支持分页处理,以解决大型数据集带来的压力问题。

  2. 定时可靠抓取与批量写入:为了确保数据的不漏单和及时更新,我们设定了一个定时任务,通过executeBillQuery API周期性地从金蝶云星空拉取新订单。同时,利用轻易云强大的批量写入功能,将这些订单一次性高效地插入到MySQL数据库中。这不仅提高了吞吐量,也避免了频繁小批量操作带来的性能开销。

  3. 处理接口分页和限流

    • 分页机制使得我们能分阶段逐步获取大规模的数据,而不会因超过接口返回限制而丢失任何信息。
    • 通过设置合理的请求间隔和重试机制,有效应对API限流,实现稳定、可靠的数据传输。
  4. 自定义转换逻辑与格式差异适配

    • 在实际应用过程中,不同系统之间难免存在数据格式差异。例如,日期格式、字段命名等。在本案例中,通过自定义转换逻辑,使得从金蝶云星空获取的数据能无缝映射并存储到MySQL表结构中。
  5. 监控及告警体系

    • 集中的监控系统实时跟踪每个集成任务的状态。当出现异常时,即刻触发告警通知相关人员进行排查,大幅度降低事故响应时间并提升整体运维效率。
  6. 异常处理与错误重试机制

    • 在整个数据传输过程中不可避免会遇到网络波动、服务不可用等问题。为此,我们设计了一套完整的错误重试机制,当某些操作失败时,可以根据预设策略自动重新尝试,从而保证最终一致性。
  7. 日志记录与可视化管理工具

    • 全程采用详尽日志记录,每一步都可以追溯,这对于后续维护和问题定位至关重要。同时,通过可 如何对接企业微信API接口

      调用金蝶云星空接口executeBillQuery获取并加工数据

在数据集成过程中,调用源系统的API接口是至关重要的一步。本文将详细探讨如何通过轻易云数据集成平台调用金蝶云星空的executeBillQuery接口来获取生产订单数据,并进行初步加工。

配置元数据

首先,我们需要配置元数据,以便正确调用金蝶云星空的API接口。以下是元数据配置的关键字段:

{
  "api": "executeBillQuery",
  "method": "POST",
  "number": "FBillNo",
  "id": "FTreeEntity_FEntryId",
  "pagination": {
    "pageSize": 500
  },
  "idCheck": true,
  "request": [
    {"field":"FID","label":"实体主键","type":"string","value":"FID"},
    {"field":"FBillNo","label":"单据编号","type":"string","value":"FBillNo"},
    {"field":"FTreeEntity_FEntryId","label":"FTreeEntity_FEntryId","type":"string","value":"FTreeEntity_FEntryId"},
    {"field":"FTreeEntity_Fseq","label":"FTreeEntity_Fseq","type":"string","value":"FTreeEntity_Fseq"},
    {"field":"FCREATORID","label":"创建人","type":"string","value":"FCREATORID.FName"},
    {"field":"FAPPROVERID","label":"审核人","type":"string","value":"FAPPROVERID.FName"},
    {"field":"FCREATEDATE","label":"创建日期","type":"string","value":"FCREATEDATE"},
    {"field":"FMATERIALID_FName","label":"物料客户编码","type":"string","value":"FMATERIALID.FName"},
    {"field":"FAPPROVEDATE","label":"审核日期","type":"string","value":"FAPPROVEDATE"},
    {"field":"FMATERIALID_FDescription","label":"物料规格型号","type":"string","value":"FMATERIALID.FDescription"},
    {"field":"FWorkShopID_FName","label":"生产车间","type":"","value":"","describe":""},
    {"field":"","label":"","type":"","describe":"","value":""}
  ],
  "otherRequest": [
    {"field":"","label":"","type":"","describe":"","value":""},
    {"field":"","label":"","type":"","describe":"","value":""},
    {"field":"","label":"","type":"","describe":"","value":""},
    {"field":"","label":"","type":"","describe":"","value":""}
  ]
}

请求参数

在请求参数中,我们需要关注以下几个关键字段:

  • FormId: 表单ID,这里我们使用PRD_MO表示生产订单。
  • FieldKeys: 查询的字段集合,需要将所有需要查询的字段以逗号分隔的字符串形式传递。
  • FilterString: 用于过滤查询结果,例如可以根据时间戳和状态过滤。

示例请求体如下:

{
  "FormId": "PRD_MO",
  "FieldKeys": [
      "FID", 
      "FBillNo", 
      "FTreeEntity_FEntryId", 
      ...
      ],
  "FilterString": "FConveyDate>='{{LAST_SYNC_TIME|dateTime}}' and FStatus in(3,4)",
  ...
}

调用接口

通过轻易云平台,我们可以使用配置好的元数据和请求参数来调用金蝶云星空的executeBillQuery接口。具体步骤如下:

  1. 设置分页参数:由于可能存在大量数据,我们需要设置分页参数,如每页500条记录。
  2. 构建请求体:根据元数据配置构建请求体,包括表单ID、字段集合和过滤条件等。
  3. 发送请求:使用POST方法发送请求到金蝶云星空API。
  4. 处理响应:解析返回的数据,并根据业务需求进行初步加工。

示例代码片段:

import requests

url = 'https://api.kingdee.com/executeBillQuery'
headers = {'Content-Type': 'application/json'}
payload = {
    'FormId': 'PRD_MO',
    'FieldKeys': 'FID,FBillNo,FTreeEntity_FEntryId,...',
    'FilterString': f"FConveyDate>='{last_sync_time}' and FStatus in(3,4)",
    'Limit': pagination['pageSize'],
    'StartRow': start_row
}

response = requests.post(url, json=payload, headers=headers)
data = response.json()

# 数据处理逻辑
for record in data:
    process_record(record)

数据加工

获取到原始数据后,需要对其进行初步加工。例如,可以根据业务需求对特定字段进行格式转换或计算衍生值。以下是一个简单的数据加工示例:

def process_record(record):
    # 转换日期格式
    record['FCREATEDATE'] = convert_date_format(record['FCREATEDATE'])

    # 根据业务状态生成描述
    status_map = {
        '1': '计划',
        '2': '计划确认',
        ...
        }

    record['FStatusDesc'] = status_map.get(record['FStatus'], '未知状态')

# 日期格式转换函数示例
def convert_date_format(date_str):
    from datetime import datetime
    return datetime.strptime(date_str, '%Y-%m-%dT%H:%M:%S').strftime('%Y-%m-%d %H:%M:%S')

通过上述步骤,我们可以高效地从金蝶云星空获取生产订单数据,并进行必要的数据加工,为后续的数据集成和分析打下坚实基础。 钉钉与MES系统接口开发配置

数据集成生命周期第二步:ETL转换与写入MySQLAPI接口

在数据集成生命周期的第二步,我们重点关注如何将已经集成的源平台数据进行ETL转换,并转为目标平台MySQL API接口所能够接收的格式,最终写入目标平台。本文将详细解析这一过程中的技术细节和实现方法。

元数据配置解析

在本案例中,我们需要将金蝶生产订单的数据对接到商城系统,并最终写入MySQL数据库。以下是元数据配置的详细解析:

{
    "api": "execute",
    "effect": "EXECUTE",
    "method": "SQL",
    "number": "id",
    "id": "id",
    "name": "id",
    "idCheck": true,
    "request": [
        {
            "field": "main_params",
            "label": "主参数",
            "type": "object",
            "describe": "对应主语句内的动态参数",
            "value": "_function CAST('{FPlanFinishDate}' as DATETIME)",
            "children": [
                {"field": "order_code", "label": "订单编号", "type": "string", "describe":"店铺名称",   "value":"{FSaleOrderNo}"},
                {"field": "FBillNo",    "label":"生产订单号",    "type":"string",    "describe":"部门",    "value":"{FBillNo}"},
                {"field":"FStatus",     "label":"状态",       "type":"string",    "value":"{FStatus}"},
                {"field":"create_time","label":"创建时间",      "type":"string",    "value":"_function CAST('{FCREATEDATE}' as DATETIME)"},
                {"field":"update_time","label":"更新时间",      "type":"string",    "value":"_function now()"},
                {"field":"FMATERIALID_FNumber","label":"物料编码","type":"string","value":"{FMATERIALID_FNumber}"},
                {"field":"FPlanStartDate","label":"排产开始时间","type":"datetime","value":"_function CAST('{FPlanStartDate}' as DATETIME)"},
                {"field":"FPlanFinishDate","label":"排产结束时间","type":"datetime","value":"_function CAST('{FPlanFinishDate}' as DATETIME)"}
            ]
        }
    ],
    "otherRequest":[
        {
            "field": "main_sql",
            "label": "主语句",
            "type": "string",
            "describe": "SQL首次执行的语句,将会返回:lastInsertId",
            "value": 
"INSERT INTO `middle_order_prdmq` (`order_code`, `FBillNo`, `FStatus`, `create_time`, `update_time`, `FMATERIALID_FNumber`, `FPlanStartDate`, `FPlanFinishDate`) VALUES (:order_code,:FBillNo,:FStatus,:create_time,:update_time,:FMATERIALID_FNumber,:FPlanStartDate,:FPlanFinishDate)"
        }
    ]
}

ETL转换过程

  1. 提取(Extract)

    • 从源平台(金蝶生产订单)提取数据。假设我们已经完成了数据请求与清洗阶段,当前获得的数据包含多个字段,如订单编号、生产订单号、状态、创建时间、更新时间、物料编码、排产开始时间和排产结束时间等。
  2. 转换(Transform)

    • 将提取到的数据进行必要的转换,以符合目标平台MySQL API接口所需的格式。
    • 使用元数据配置中的main_params字段,对应主语句内的动态参数进行处理。例如,将日期字符串转换为DATETIME类型:
      _function CAST('{FCREATEDATE}' as DATETIME)
    • 动态参数包括:
      • order_code: {FSaleOrderNo}
      • FBillNo: {FBillNo}
      • FStatus: {FStatus}
      • create_time: _function CAST('{FCREATEDATE}' as DATETIME)
      • update_time: _function now()
      • FMATERIALID_FNumber: {FMATERIALID_FNumber}
      • FPlanStartDate: _function CAST('{FPlanStartDate}' as DATETIME)
      • FPlanFinishDate: _function CAST('{FPlanFinishDate}' as DATETIME)
  3. 加载(Load)

    • 将转换后的数据通过API接口写入目标平台(MySQL数据库)。
    • 使用元数据配置中的main_sql字段,定义SQL插入语句:
      INSERT INTO `middle_order_prdmq` (`order_code`, `FBillNo`, `FStatus`, `create_time`, `update_time`, `FMATERIALID_FNumber`, `FPlanStartDate`, `FPlanFinishDate`) VALUES (:order_code, :FBillNo, :FStatus, :create_time, :update_time, :FMATERIALID_FNumber, :FPlanStartDate, :FPlanFinishDate)

API接口调用示例

通过上述步骤,我们可以构建一个完整的API接口调用流程。以下是一个简化的示例代码,用于展示如何通过API接口将转换后的数据写入MySQL数据库:

import requests

# 定义API URL
api_url = 'http://example.com/api/execute'

# 构建请求体
payload = {
    'main_params': {
        'order_code': 'SO123456',
        'FBillNo': 'PO654321',
        'FStatus': 'Completed',
        'create_time': '2023-10-01 12:00:00',
        'update_time': '2023-10-01 12:00:00',
        'FMATERIALID_FNumber': 'MAT001',
        'FPlanStartDate': '2023-10-01 08:00:00',
        'FPlanFinishDate': '2023-10-01 18:00:00'
    },
    'main_sql': """
        INSERT INTO middle_order_prdmq 
        (order_code, FBillNo, FStatus, create_time, update_time, FMATERIALID_FNumber, FPlanStartDate, FPlanFinishDate) 
        VALUES (:order_code, :FBillNo, :FStatus, :create_time, :update_time, :FMATERIALID_FNumber, :FPlanStartDate, :FPlanFinishDate)
    """
}

# 发起POST请求
response = requests.post(api_url, json=payload)

# 检查响应状态
if response.status_code == 200:
    print('Data successfully inserted into MySQL database.')
else:
    print('Failed to insert data:', response.text)

通过上述技术案例,我们展示了如何利用轻易云数据集成平台完成ETL转换,并通过API接口将数据写入目标平台MySQL数据库。该过程不仅提高了数据处理效率,还确保了不同系统间的数据无缝对接。 企业微信与ERP系统接口开发配置