ETL转换MySQL数据集成:从金蝶云星空到目标数据库

  • 轻易云集成顾问-蔡威

金蝶云星空到MySQL的数据集成:组织间结算价目表V2-mom案例分享

在本次技术案例中,我们专注于将金蝶云星空系统中的数据无缝集成到MySQL数据库。具体而言,我们配置并实施了“组织间结算价目表V2-mom”方案,旨在确保数据的高效传输和可靠处理。

首先介绍下我们所使用的两个关键API接口:从金蝶云星空获取数据时调用的executeBillQuery,以及将数据写入MySQL时则调用execute。这两个API分别负责截取源系统中的实时业务数据,并确保这些数据能够被精准地导入目标数据库。

为了应对大规模的数据流动,我们特别利用了以下几个平台特性:

  1. 高吞吐量的数据写入能力:通过优化批量处理机制,有效提高了大量业务数据快速写入MySQL的效率。

  2. 实时监控与告警系统:整个集成过程由集中监控和告警系统全程护航,确保每一步都能透明可视,同时提供及时的问题反馈和解决途径。

  3. 自定义数据转换逻辑:针对不同业务需求和多样化的数据结构,我们设计并应用了一系列自定义转换规则,以便更好地适应企业现有的信息架构。

此外,为了解决常见的数据质量问题,还特别引入了异常检测及错误重试机制。这不仅帮助发现潜在的数据问题,也提升了整体任务执行的可靠性。在实际操作过程中,各种分页与限流策略也得到了有效应用,从而保证不会因单次请求过多而导致资源超载或失败。

此后的内容将进一步展示如何详尽实现以上各项功能模块,以及详细论述其中涉及的重要技术细节,包括但不限于批次调度、接口调用、数据信息管理等方面。 系统集成平台API接口配置

调用金蝶云星空接口executeBillQuery获取并加工数据

在轻易云数据集成平台中,调用源系统接口是数据集成生命周期的第一步。本文将详细探讨如何通过调用金蝶云星空的executeBillQuery接口来获取并加工数据。

接口配置与请求参数

根据元数据配置,我们需要通过POST方法调用executeBillQuery接口。以下是请求参数的详细配置:

  • api: executeBillQuery
  • method: POST
  • FormId: IOS_PriceList
  • FieldKeys: 需要查询的字段集合,使用逗号分隔
  • FilterString: 过滤条件,例如:FUseOrgId.FNumber='T02' and FDocumentStatus='C' and FApproveDate>='{{LAST_SYNC_TIME|dateTime}}'
  • Limit: 最大行数,默认值为2000
  • StartRow: 开始行索引,用于分页
  • TopRowCount: 返回总行数

请求字段说明

请求字段包括实体主键、状态、名称、编码等多个字段。以下是部分关键字段及其含义:

  • FEntryID: 实体行主键
  • FID: 实体主键
  • FDOC_STATUS: 状态
  • FFORBID_STATUS: 失效状态
  • FNAME: 名称
  • FNUMBER: 编码
  • FCREATE_ORG_ID: 核算组织
  • FUSE_ORG_ID: 使用组织

这些字段将用于构建请求体,以便从金蝶云星空获取所需的数据。

构建请求体

根据元数据配置,我们可以构建如下请求体:

{
  "FormId": "IOS_PriceList",
  "FieldKeys": "FEntryID,FID,FDOC_STATUS,FFORBID_STATUS,FNAME,FNUMBER,FCREATE_ORG_ID,FUSE_ORG_ID",
  "FilterString": "FUseOrgId.FNumber='T02' and FDocumentStatus='C' and FApproveDate>='{{LAST_SYNC_TIME|dateTime}}'",
  "Limit": 2000,
  "StartRow": "{PAGINATION_START_ROW}",
  "TopRowCount": true
}

调用接口与处理响应

在轻易云数据集成平台中,通过HTTP POST方法调用上述接口,并处理返回的JSON响应。以下是示例代码:

import requests

url = "https://api.kingdee.com/executeBillQuery"
headers = {
    'Content-Type': 'application/json'
}
payload = {
    "FormId": "IOS_PriceList",
    "FieldKeys": "FEntryID,FID,FDOC_STATUS,FFORBID_STATUS,FNAME,FNUMBER,FCREATE_ORG_ID,FUSE_ORG_ID",
    "FilterString": "FUseOrgId.FNumber='T02' and FDocumentStatus='C' and FApproveDate>='2023-01-01'",
    "Limit": 2000,
    "StartRow": 0,
    "TopRowCount": True
}

response = requests.post(url, headers=headers, json=payload)
data = response.json()

# 数据处理逻辑,例如清洗和转换
processed_data = []
for entry in data['Result']:
    processed_entry = {
        'EntryID': entry['FEntryID'],
        'Name': entry['FNAME'],
        'Number': entry['FNUMBER'],
        # 添加更多字段处理逻辑...
    }
    processed_data.append(processed_entry)

# 输出或存储处理后的数据
print(processed_data)

数据清洗与转换

在获取到原始数据后,需要进行清洗和转换,以满足业务需求。例如,可以对日期格式进行标准化,对编码进行校验等。

from datetime import datetime

def clean_and_transform(data):
    for entry in data:
        # 日期格式标准化
        if 'CreateDate' in entry:
            entry['CreateDate'] = datetime.strptime(entry['CreateDate'], '%Y-%m-%d').strftime('%Y%m%d')

        # 编码校验和转换
        if 'Number' in entry:
            entry['Number'] = entry['Number'].strip().upper()

        # 更多清洗和转换逻辑...

clean_and_transform(processed_data)

通过上述步骤,我们可以高效地从金蝶云星空获取并加工所需的数据,为后续的数据写入和业务分析奠定基础。 打通金蝶云星空数据接口

数据集成生命周期中的ETL转换与写入MySQLAPI接口

在数据集成生命周期的第二阶段,我们需要将已经集成的源平台数据进行ETL(Extract, Transform, Load)转换,最终转为目标平台 MySQL API 接口所能够接收的格式,并写入目标平台。本文将详细探讨如何通过轻易云数据集成平台实现这一过程。

元数据配置解析

我们使用以下元数据配置来指导整个ETL过程:

{
  "api": "execute",
  "effect": "EXECUTE",
  "method": "POST",
  "idCheck": true,
  "request": [
    {
      "field": "main_params",
      "label": "main_params",
      "type": "object",
      "describe": "111",
      "children": [
        {"field":"FEntryID","label":"实体行主键","type":"string","value":"{FEntryID}"},
        {"field":"FID","label":"实体主键","type":"string","value":"{FID}"},
        {"field":"FDOC_STATUS","label":"状态","type":"string","value":"{FDOC_STATUS}"},
        {"field":"FFORBID_STATUS","label":"失效状态","type":"string","value":"{FFORBID_STATUS}"},
        {"field":"FNAME","label":"名称","type":"string","value":"{FNAME}"},
        {"field":"FNUMBER","label":"编码","type":"string","value":"{FNUMBER}"},
        {"field":"FDESCRIPTION","label":"备注","type":"string","value":"{FDESCRIPTION}"},
        {"field":"FCREATE_ORG_ID","label":"核算组织","type":"string","value":"{FCREATE_ORG_ID}"},
        {"field":"FUSE_ORG_ID","label":"使用组织","type":"string","value":"{FUSE_ORG_ID}"},
        {"field":"FCREATOR_ID","label":"创建人","type":"string","value":"{FCREATOR_ID}"},
        {"field":"FMODIFIER_ID","label":"最后修改人","type":"string","value":"{FMODIFIER_ID}"},
        {"field":"FCREATE_DATE","label":"创建日期","type":"","value":{"$dateFormat":{"formatString":"'yyyy-MM-dd HH:mm:ss'"},"sourceFieldName":{"$sourceFieldName":["createDate"]}}},
        {"field":"","label":"","type":"","value":{"$dateFormat":{"formatString":"'yyyy-MM-dd HH:mm:ss'"},"sourceFieldName":{"$sourceFieldName":["modifyDate"]}}},
        {"field":"","label":"","type":"","value":{"$dateFormat":{"formatString":"'yyyy-MM-dd HH:mm:ss'"},"sourceFieldName":{"$sourceFieldName":["approveDate"]}}},
        {"field":"","label":"","type":"","value":{"$dateFormat":{"formatString":"'yyyy-MM-dd HH:mm:ss'"},"sourceFieldName":{"$sourceFieldName":["forbidDate"]}}},
        {"field":"","label":"","type":"","value":{"$dateFormat":{"formatString":"'yyyy-MM-dd HH:mm:ss'"},"sourceFieldName":{"$sourceFieldName":["entryEffectiveDate"]}}},
        {"field":"","label":"","type":"","value":{"$dateFormat":{"formatString":"'yyyy-MM-dd HH:mm:ss'"},"sourceFieldName":{"$sourceFieldName":["entryExpiryDate"]}}}
      ]
    }
  ],
  "otherRequest": [
    {
      "field": "main_sql",
      "label": "",
      "type": "",
      "describe":"",
      "value": ""
    }
  ]
}

数据请求与清洗

在数据请求阶段,我们从源系统中提取原始数据。通过配置 request 字段中的子字段,我们定义了需要提取的数据字段及其类型和描述。这些字段包括 FEntryID, FID, FDOC_STATUS, FFORBID_STATUS 等,确保我们获取到所有必要的信息。

{
  "request":[
    {
      "field": "main_params",
      ...
    }
  ]
}

数据转换

在数据转换阶段,我们需要将提取的数据转换为目标平台 MySQL API 所能接收的格式。这里,我们利用元数据配置中的 children 字段来映射每个字段的值。例如:

{"field": "FEntryID", "label": "", type: "", value: "{FEntryID}"}

其中,"{FEntryID}" 表示从源数据中提取的 FEntryID 字段值。

此外,我们还需要对日期字段进行格式化处理,以确保符合目标平台要求。例如:

{
  "$dateFormat":{
    formatString: "'yyyy-MM-dd HH:mm:ss'",
    sourceFieldName: ["createDate"]
  }
}

这种配置确保了日期字段被正确地转换为目标平台所需的格式。

数据写入

最后,在数据写入阶段,我们使用 SQL 插入语句将转换后的数据写入 MySQL 数据库。元数据配置中的 otherRequest 字段定义了插入语句:

INSERT INTO ty_aps.wms_interorg_settle_price 
(FEntryID,FID,FDOC_STATUS,FFORBID_STATUS,FNAME,FNUMBER,FDESCRIPTION,FCREATE_ORG_ID,FUSE_ORG_ID,FCREATOR_ID,FMODIFIER_ID,FCREATE_DATE,FMODIFIER_DATE,FPRICETYPE,FCURRENCYID,FEXPIRYDATE,
FAPPROVE_DATE,FFORBID_DATE,FFORBIDDER_ID,FAPPROVER_ID,FIS_INCLUDED_TAX,FMATERIALID,FMATERIAL_NAME,FMATERIAL_MODEL,FAUXPROP_ID,FTRADETYPE,FBUSINESSTYPE,FPRICEUNITID,FPRICEBASE,
FPRICE,FENTRY_EFFECTIVE_DATE,FENTRY_EXPRIY_DATE,FFORBID_STATUS_EN,FFORBIDDER_ID_EN,FFORBID_DATE_EN,FMATERIAL_TYPE_ID,FROW_AUDIT_STATUS,FTAX_PRICE,FTAX_RATE,FIS_FREE_GIFT,FBOM_ID,
FLOT,FBASE_UNIT_ID,FBASE_UNIT_PRICE,FMTONO,FMATERIAL_GROUP_ID,FMATERIAL_GROUP_NAME,FENTITY_ACC_ID,FIS_ENABLE,FACCT_SYS_ID,FACCT_SYS_NAME) 
VALUES (:FEntryID,:FID,:FDOC_STATUS,:FFORBID_STATUS,:FNAME,:FNUMBER,:FDESCRIPTION,:FCREATE_ORG_ID,:FUSE_ORG_ID,:FCREATOR_ID,:FMODIFIER_ID,:FCREATE_DATE,:FMODIFIER_DATE,:FPRICETYPE,:FCURRENCYID,:FEXPIRYDATE,
:FAPPROVE_DATE,:FFORBID_DATE,:FFORBIDDER_ID,:FAPPROVER_ID,:FIS_INCLUDED_TAX,:FMATERIALID,:FMATERIAL_NAME,:FMATERIAL_MODEL,:FAUXPROP_ID,:FTRADETYPE,:FBUSINESSTYPE,:FPRICEUNITID,:FPRICEBASE,
:FPRICE,:FENTRY_EFFECTIVE_DATE,:FENTRY_EXPRIY_DATE,:FFORBID_STATUS_EN,:FFORBIDER_EN)

通过这种方式,我们可以确保所有必要的数据字段都被正确地插入到 MySQL 数据库中。

实践案例

假设我们从源系统中提取到如下数据:

{
  FEntryID: '12345',
  FID: '67890',
  FDOC_STATUS: 'Active',
  FFORBID_STATUS: 'Inactive',
  FNAME: 'Sample Name',
  FNUMBER: '001',
  FDESCRIPTION: 'Sample Description',
  ...
}

经过 ETL 转换后,生成的 SQL 插入语句如下:

INSERT INTO ty_aps.wms_interorg_settle_price 
(FEntryID,FID,...)
VALUES ('12345','67890',...)

这样,通过轻易云数据集成平台的全异步、支持多种异构系统集成能力,实现了不同系统间的数据无缝对接,并最终将转换后的数据成功写入到目标 MySQL 平台中。 泛微OA与ERP系统接口开发配置