金蝶云星空到MySQL的数据集成:组织间结算价目表V2-mom案例分享
在本次技术案例中,我们专注于将金蝶云星空系统中的数据无缝集成到MySQL数据库。具体而言,我们配置并实施了“组织间结算价目表V2-mom”方案,旨在确保数据的高效传输和可靠处理。
首先介绍下我们所使用的两个关键API接口:从金蝶云星空获取数据时调用的executeBillQuery
,以及将数据写入MySQL时则调用execute
。这两个API分别负责截取源系统中的实时业务数据,并确保这些数据能够被精准地导入目标数据库。
为了应对大规模的数据流动,我们特别利用了以下几个平台特性:
-
高吞吐量的数据写入能力:通过优化批量处理机制,有效提高了大量业务数据快速写入MySQL的效率。
-
实时监控与告警系统:整个集成过程由集中监控和告警系统全程护航,确保每一步都能透明可视,同时提供及时的问题反馈和解决途径。
-
自定义数据转换逻辑:针对不同业务需求和多样化的数据结构,我们设计并应用了一系列自定义转换规则,以便更好地适应企业现有的信息架构。
此外,为了解决常见的数据质量问题,还特别引入了异常检测及错误重试机制。这不仅帮助发现潜在的数据问题,也提升了整体任务执行的可靠性。在实际操作过程中,各种分页与限流策略也得到了有效应用,从而保证不会因单次请求过多而导致资源超载或失败。
此后的内容将进一步展示如何详尽实现以上各项功能模块,以及详细论述其中涉及的重要技术细节,包括但不限于批次调度、接口调用、数据信息管理等方面。
调用金蝶云星空接口executeBillQuery获取并加工数据
在轻易云数据集成平台中,调用源系统接口是数据集成生命周期的第一步。本文将详细探讨如何通过调用金蝶云星空的executeBillQuery
接口来获取并加工数据。
接口配置与请求参数
根据元数据配置,我们需要通过POST方法调用executeBillQuery
接口。以下是请求参数的详细配置:
- api:
executeBillQuery
- method:
POST
- FormId:
IOS_PriceList
- FieldKeys: 需要查询的字段集合,使用逗号分隔
- FilterString: 过滤条件,例如:
FUseOrgId.FNumber='T02' and FDocumentStatus='C' and FApproveDate>='{{LAST_SYNC_TIME|dateTime}}'
- Limit: 最大行数,默认值为2000
- StartRow: 开始行索引,用于分页
- TopRowCount: 返回总行数
请求字段说明
请求字段包括实体主键、状态、名称、编码等多个字段。以下是部分关键字段及其含义:
FEntryID
: 实体行主键FID
: 实体主键FDOC_STATUS
: 状态FFORBID_STATUS
: 失效状态FNAME
: 名称FNUMBER
: 编码FCREATE_ORG_ID
: 核算组织FUSE_ORG_ID
: 使用组织
这些字段将用于构建请求体,以便从金蝶云星空获取所需的数据。
构建请求体
根据元数据配置,我们可以构建如下请求体:
{
"FormId": "IOS_PriceList",
"FieldKeys": "FEntryID,FID,FDOC_STATUS,FFORBID_STATUS,FNAME,FNUMBER,FCREATE_ORG_ID,FUSE_ORG_ID",
"FilterString": "FUseOrgId.FNumber='T02' and FDocumentStatus='C' and FApproveDate>='{{LAST_SYNC_TIME|dateTime}}'",
"Limit": 2000,
"StartRow": "{PAGINATION_START_ROW}",
"TopRowCount": true
}
调用接口与处理响应
在轻易云数据集成平台中,通过HTTP POST方法调用上述接口,并处理返回的JSON响应。以下是示例代码:
import requests
url = "https://api.kingdee.com/executeBillQuery"
headers = {
'Content-Type': 'application/json'
}
payload = {
"FormId": "IOS_PriceList",
"FieldKeys": "FEntryID,FID,FDOC_STATUS,FFORBID_STATUS,FNAME,FNUMBER,FCREATE_ORG_ID,FUSE_ORG_ID",
"FilterString": "FUseOrgId.FNumber='T02' and FDocumentStatus='C' and FApproveDate>='2023-01-01'",
"Limit": 2000,
"StartRow": 0,
"TopRowCount": True
}
response = requests.post(url, headers=headers, json=payload)
data = response.json()
# 数据处理逻辑,例如清洗和转换
processed_data = []
for entry in data['Result']:
processed_entry = {
'EntryID': entry['FEntryID'],
'Name': entry['FNAME'],
'Number': entry['FNUMBER'],
# 添加更多字段处理逻辑...
}
processed_data.append(processed_entry)
# 输出或存储处理后的数据
print(processed_data)
数据清洗与转换
在获取到原始数据后,需要进行清洗和转换,以满足业务需求。例如,可以对日期格式进行标准化,对编码进行校验等。
from datetime import datetime
def clean_and_transform(data):
for entry in data:
# 日期格式标准化
if 'CreateDate' in entry:
entry['CreateDate'] = datetime.strptime(entry['CreateDate'], '%Y-%m-%d').strftime('%Y%m%d')
# 编码校验和转换
if 'Number' in entry:
entry['Number'] = entry['Number'].strip().upper()
# 更多清洗和转换逻辑...
clean_and_transform(processed_data)
通过上述步骤,我们可以高效地从金蝶云星空获取并加工所需的数据,为后续的数据写入和业务分析奠定基础。
数据集成生命周期中的ETL转换与写入MySQLAPI接口
在数据集成生命周期的第二阶段,我们需要将已经集成的源平台数据进行ETL(Extract, Transform, Load)转换,最终转为目标平台 MySQL API 接口所能够接收的格式,并写入目标平台。本文将详细探讨如何通过轻易云数据集成平台实现这一过程。
元数据配置解析
我们使用以下元数据配置来指导整个ETL过程:
{
"api": "execute",
"effect": "EXECUTE",
"method": "POST",
"idCheck": true,
"request": [
{
"field": "main_params",
"label": "main_params",
"type": "object",
"describe": "111",
"children": [
{"field":"FEntryID","label":"实体行主键","type":"string","value":"{FEntryID}"},
{"field":"FID","label":"实体主键","type":"string","value":"{FID}"},
{"field":"FDOC_STATUS","label":"状态","type":"string","value":"{FDOC_STATUS}"},
{"field":"FFORBID_STATUS","label":"失效状态","type":"string","value":"{FFORBID_STATUS}"},
{"field":"FNAME","label":"名称","type":"string","value":"{FNAME}"},
{"field":"FNUMBER","label":"编码","type":"string","value":"{FNUMBER}"},
{"field":"FDESCRIPTION","label":"备注","type":"string","value":"{FDESCRIPTION}"},
{"field":"FCREATE_ORG_ID","label":"核算组织","type":"string","value":"{FCREATE_ORG_ID}"},
{"field":"FUSE_ORG_ID","label":"使用组织","type":"string","value":"{FUSE_ORG_ID}"},
{"field":"FCREATOR_ID","label":"创建人","type":"string","value":"{FCREATOR_ID}"},
{"field":"FMODIFIER_ID","label":"最后修改人","type":"string","value":"{FMODIFIER_ID}"},
{"field":"FCREATE_DATE","label":"创建日期","type":"","value":{"$dateFormat":{"formatString":"'yyyy-MM-dd HH:mm:ss'"},"sourceFieldName":{"$sourceFieldName":["createDate"]}}},
{"field":"","label":"","type":"","value":{"$dateFormat":{"formatString":"'yyyy-MM-dd HH:mm:ss'"},"sourceFieldName":{"$sourceFieldName":["modifyDate"]}}},
{"field":"","label":"","type":"","value":{"$dateFormat":{"formatString":"'yyyy-MM-dd HH:mm:ss'"},"sourceFieldName":{"$sourceFieldName":["approveDate"]}}},
{"field":"","label":"","type":"","value":{"$dateFormat":{"formatString":"'yyyy-MM-dd HH:mm:ss'"},"sourceFieldName":{"$sourceFieldName":["forbidDate"]}}},
{"field":"","label":"","type":"","value":{"$dateFormat":{"formatString":"'yyyy-MM-dd HH:mm:ss'"},"sourceFieldName":{"$sourceFieldName":["entryEffectiveDate"]}}},
{"field":"","label":"","type":"","value":{"$dateFormat":{"formatString":"'yyyy-MM-dd HH:mm:ss'"},"sourceFieldName":{"$sourceFieldName":["entryExpiryDate"]}}}
]
}
],
"otherRequest": [
{
"field": "main_sql",
"label": "",
"type": "",
"describe":"",
"value": ""
}
]
}
数据请求与清洗
在数据请求阶段,我们从源系统中提取原始数据。通过配置 request
字段中的子字段,我们定义了需要提取的数据字段及其类型和描述。这些字段包括 FEntryID
, FID
, FDOC_STATUS
, FFORBID_STATUS
等,确保我们获取到所有必要的信息。
{
"request":[
{
"field": "main_params",
...
}
]
}
数据转换
在数据转换阶段,我们需要将提取的数据转换为目标平台 MySQL API 所能接收的格式。这里,我们利用元数据配置中的 children
字段来映射每个字段的值。例如:
{"field": "FEntryID", "label": "", type: "", value: "{FEntryID}"}
其中,"{FEntryID}"
表示从源数据中提取的 FEntryID
字段值。
此外,我们还需要对日期字段进行格式化处理,以确保符合目标平台要求。例如:
{
"$dateFormat":{
formatString: "'yyyy-MM-dd HH:mm:ss'",
sourceFieldName: ["createDate"]
}
}
这种配置确保了日期字段被正确地转换为目标平台所需的格式。
数据写入
最后,在数据写入阶段,我们使用 SQL 插入语句将转换后的数据写入 MySQL 数据库。元数据配置中的 otherRequest
字段定义了插入语句:
INSERT INTO ty_aps.wms_interorg_settle_price
(FEntryID,FID,FDOC_STATUS,FFORBID_STATUS,FNAME,FNUMBER,FDESCRIPTION,FCREATE_ORG_ID,FUSE_ORG_ID,FCREATOR_ID,FMODIFIER_ID,FCREATE_DATE,FMODIFIER_DATE,FPRICETYPE,FCURRENCYID,FEXPIRYDATE,
FAPPROVE_DATE,FFORBID_DATE,FFORBIDDER_ID,FAPPROVER_ID,FIS_INCLUDED_TAX,FMATERIALID,FMATERIAL_NAME,FMATERIAL_MODEL,FAUXPROP_ID,FTRADETYPE,FBUSINESSTYPE,FPRICEUNITID,FPRICEBASE,
FPRICE,FENTRY_EFFECTIVE_DATE,FENTRY_EXPRIY_DATE,FFORBID_STATUS_EN,FFORBIDDER_ID_EN,FFORBID_DATE_EN,FMATERIAL_TYPE_ID,FROW_AUDIT_STATUS,FTAX_PRICE,FTAX_RATE,FIS_FREE_GIFT,FBOM_ID,
FLOT,FBASE_UNIT_ID,FBASE_UNIT_PRICE,FMTONO,FMATERIAL_GROUP_ID,FMATERIAL_GROUP_NAME,FENTITY_ACC_ID,FIS_ENABLE,FACCT_SYS_ID,FACCT_SYS_NAME)
VALUES (:FEntryID,:FID,:FDOC_STATUS,:FFORBID_STATUS,:FNAME,:FNUMBER,:FDESCRIPTION,:FCREATE_ORG_ID,:FUSE_ORG_ID,:FCREATOR_ID,:FMODIFIER_ID,:FCREATE_DATE,:FMODIFIER_DATE,:FPRICETYPE,:FCURRENCYID,:FEXPIRYDATE,
:FAPPROVE_DATE,:FFORBID_DATE,:FFORBIDDER_ID,:FAPPROVER_ID,:FIS_INCLUDED_TAX,:FMATERIALID,:FMATERIAL_NAME,:FMATERIAL_MODEL,:FAUXPROP_ID,:FTRADETYPE,:FBUSINESSTYPE,:FPRICEUNITID,:FPRICEBASE,
:FPRICE,:FENTRY_EFFECTIVE_DATE,:FENTRY_EXPRIY_DATE,:FFORBID_STATUS_EN,:FFORBIDER_EN)
通过这种方式,我们可以确保所有必要的数据字段都被正确地插入到 MySQL 数据库中。
实践案例
假设我们从源系统中提取到如下数据:
{
FEntryID: '12345',
FID: '67890',
FDOC_STATUS: 'Active',
FFORBID_STATUS: 'Inactive',
FNAME: 'Sample Name',
FNUMBER: '001',
FDESCRIPTION: 'Sample Description',
...
}
经过 ETL 转换后,生成的 SQL 插入语句如下:
INSERT INTO ty_aps.wms_interorg_settle_price
(FEntryID,FID,...)
VALUES ('12345','67890',...)
这样,通过轻易云数据集成平台的全异步、支持多种异构系统集成能力,实现了不同系统间的数据无缝对接,并最终将转换后的数据成功写入到目标 MySQL 平台中。