金蝶云星空数据集成到MySQL的实战案例分享
在实际业务场景中,如何确保跨系统的数据流转高效且准确,是每个企业面临的重要挑战。本篇文章将从技术角度出发,详细解析一个成功将金蝶云星空数据无缝集成至MySQL数据库的案例——kd-金蝶查询收款单-->mysql(鸿巢)收款单。
环境设置与接口调用
本次集成任务中,我们使用轻易云数据集成平台,其主要优势体现在其高吞吐量的数据写入能力和强大的API资产管理功能。具体来说,我们通过调用金蝶云星空提供的executeBillQuery API获取收款单据,再利用MySQL的execute API实现批量数据写入。
数据获取:调用金蝶云星空接口
我们首先需要配置好对接金蝶云星空系统所需的API参数,包括请求头、认证信息等。同时,还要处理分页和限流问题,以确保大量数据能稳定地被抓取。在这一过程中,自定义的数据转换逻辑显得尤为重要,通过轻易云可视化设计工具,可以直观地设定各项参数,使整个流程透明化、可控性更强。
{
"method": "POST",
"url": "https://kingdee-api-endpoint/executeBillQuery",
"headers": {
"Content-Type": "application/json",
"Authorization": "Bearer <TOKEN>"
},
...
}
数据写入:MySQL批量操作
当从金蝶云星空获取的数据准备就绪后,下一步是将这些数据可靠、高效地写入到MySQL数据库中。面对大规模并发写操作时,我们采用了批量插入策略,并结合事务控制来保证数据的一致性。在这一步骤,同样需要应对两者间可能存在的数据格式差异,进行必要的数据映射和转换。
此外,为了提升监控效果及故障恢复能力,还引入实时日志记录和告警机制。当出现异常情况时,能够立即重试或人工介入,从而尽最大程度上减少因意外导致的数据丢失或错误。
总结与前瞻
针对此次任务,我们着重解决了以下关键技术点:
- 如何有效调用并处理来自金蝶云星空系统的API。
- 批量快速写入大量数据到MySQL中的实践方法。
- 解决跨系统间的数据格式差异,以及提高整体可靠性的措施。
接下来,将深入探讨具体实现步骤及代码示例,为读者呈现全面而细致的参考方案。
调用源系统金蝶云星空接口executeBillQuery获取并加工数据
在数据集成的生命周期中,调用源系统接口获取数据是关键的第一步。本文将深入探讨如何通过轻易云数据集成平台调用金蝶云星空的executeBillQuery
接口来获取收款单数据,并进行初步加工。
接口配置与请求参数
首先,我们需要配置接口和请求参数。根据元数据配置,executeBillQuery
接口采用POST方法进行调用,主要用于查询操作。以下是具体的请求参数配置:
- api:
executeBillQuery
- method:
POST
- number:
FBillNo
- id:
FRECEIVEBILLENTRY_FEntryID
- idCheck:
true
请求字段包括:
[
{"field":"FBillNo","label":"FBillNo","type":"string","describe":"111","value":"FBillNo"},
{"field":"FDOCUMENTSTATUS","label":"FDOCUMENTSTATUS","type":"string","describe":"111","value":"FDOCUMENTSTATUS"},
{"field":"FDATE","label":"FDATE","type":"string","describe":"111","value":"FDATE"},
{"field":"FRECTOTALAMOUNTFOR","label":"FRECTOTALAMOUNTFOR","type":"string","describe":"111","value":"FRECTOTALAMOUNTFOR"},
{"field":"FRECEIVEBILLENTRY_FEntryID","label":"FRECEIVEBILLENTRY_FEntryID","type":"string","describe":"111","value":"FRECEIVEBILLENTRY_FEntryID"}
]
其他请求参数包括分页和过滤条件:
[
{"field":"Limit","label":"Limit","type":"string","describe":"金蝶的查询分页参数","value":"{PAGINATION_PAGE_SIZE}"},
{"field":"StartRow","label":"StartRow","type":"string","describe":"金蝶的查询分页参数","value":"{PAGINATION_START_ROW}"},
{"field":"TopRowCount","label":"TopRowCount","type":"int"},
{"field":"FilterString","label":"FilterString", "type": "string", "describe": "示例写法 FSupplierId.FNumber = 'VEN00010' and FApproveDate>=", "value": "FApproveDate>='{{LAST_SYNC_TIME|date}}' and FCreatorId= '100796' and FBillNo LIKE 'LHHYSKD%'"},
{"field": "FieldKeys", "label": "FieldKeys", "type": "array", "describe": "金蝶分录主键ID格式:FPOOrderEntry_FEntryId,其它格式 FPurchaseOrgId.FNumber", "value": "{MAIN_REQUEST}"},
{"field": "FormId", "label": "FormId", "type": "string", "describe": "必须填写金蝶的表单ID如:PUR_PurchaseOrder", "value": "AR_RECEIVEBILL"}
]
数据请求与清洗
在发送请求后,我们会收到一个包含收款单信息的数据集。为了确保数据的一致性和可用性,需要对返回的数据进行清洗和格式化。例如,将日期字段FDATE
转换为新的格式,并重命名为FDate_new
。
"formatResponse":[{"old": "FDATE", "new": "FDate_new", "format": "date"}]
此步骤确保了日期字段在后续处理中的一致性。
数据转换与写入
在清洗后的数据需要转换为目标系统所需的格式,并写入到MySQL数据库中。在这个过程中,轻易云平台提供了自动填充响应功能(autoFillResponse),简化了数据映射和转换工作。
"autoFillResponse": true
这意味着我们可以直接将清洗后的数据映射到目标数据库表中,无需手动编写复杂的转换逻辑。
技术实现案例
以下是一个实际调用金蝶云星空接口并处理返回数据的示例代码:
import requests
import json
# 配置API请求URL和头信息
url = 'https://api.kingdee.com/executeBillQuery'
headers = {'Content-Type': 'application/json'}
# 构建请求体
payload = {
'FormId': 'AR_RECEIVEBILL',
'FieldKeys': ['FBillNo', 'FDOCUMENTSTATUS', 'FDATE', 'FRECTOTALAMOUNTFOR', 'FRECEIVEBILLENTRY_FEntryID'],
'FilterString': f"FApproveDate>='{LAST_SYNC_TIME}' and FCreatorId= '100796' and FBillNo LIKE 'LHHYSKD%'",
'Limit': PAGINATION_PAGE_SIZE,
'StartRow': PAGINATION_START_ROW,
}
# 发起POST请求
response = requests.post(url, headers=headers, data=json.dumps(payload))
# 检查响应状态码
if response.status_code == 200:
data = response.json()
# 数据清洗与格式化
for record in data:
record['FDate_new'] = format_date(record['FDATE'])
# 数据写入MySQL数据库(伪代码)
write_to_mysql(data)
else:
print(f"Error: {response.status_code}, {response.text}")
以上代码展示了如何通过API获取数据、进行初步清洗,并准备将其写入到MySQL数据库中。通过这种方式,可以实现不同系统间的数据无缝对接,提高业务流程的自动化程度。
总结来说,通过轻易云平台调用金蝶云星空接口并处理返回的数据,是实现高效数据集成的重要步骤。本文详细介绍了接口配置、数据请求与清洗、以及实际技术实现案例,为开发者提供了实用的参考。
数据转换与写入目标平台 MySQLAPI 接口的技术实现
在数据集成生命周期的第二步中,已经从源平台(如金蝶)获取到的数据需要经过ETL(Extract, Transform, Load)过程,将其转换为目标平台(如MySQL)能够接收的格式,并最终写入目标平台。本文将详细探讨如何利用轻易云数据集成平台进行这一过程的配置和实现。
元数据配置解析
首先,我们需要理解元数据配置中的各个字段及其作用。以下是一个典型的元数据配置示例:
{
"api": "execute",
"effect": "EXECUTE",
"method": "POST",
"idCheck": true,
"request": [
{
"field": "main_params",
"label": "main_params",
"type": "object",
"describe": "111",
"children": [
{"field": "FEntity_FEntryID", "label": "明细id", "type": "string", "value": "{FRECEIVEBILLENTRY_FEntryID}"},
{"field": "order_no_new", "label": "单号", "type": "string", "value": "{FBillNo}"},
{"field": "FDocumentStatus", "label": "状态", "type": "string", "value": "{FDOCUMENTSTATUS}"},
{"field": "qty_count", "label": "数量", "type": "string", "value":"1"},
{"field":"sales_count","label":"金额","type":"string","value":"{FRECTOTALAMOUNTFOR}"},
{"field":"datetime_new","label":"时间","type":"date","value":"{FDate_new}"},
{"field":"Document_Type","label":"单据类型","type":"string","value":"收款单"}
]
}
],
...
}
数据请求与清洗
在ETL过程中,首先是从源系统请求数据并进行清洗。这一步通常涉及到调用源系统的API接口,获取原始数据,并对其进行初步处理,如去除无效字段、格式化日期等。这里我们假设已经完成了这一步,得到了一个包含所有必要信息的数据对象。
数据转换
接下来是数据转换阶段。在这个阶段,我们将源系统的数据映射到目标系统所需的格式。根据元数据配置,我们需要将以下字段进行转换:
FEntity_FEntryID
->明细id
FBillNo
->单号
FDOCUMENTSTATUS
->状态
- 固定值
1
->数量
FRECTOTALAMOUNTFOR
->金额
- 格式化后的日期
FDate_new
->时间
- 固定值
收款单
->单据类型
这些映射关系在元数据配置中的 "children"
字段中定义。
数据写入
最后一步是将转换后的数据写入目标平台MySQL。这一步通常通过调用目标平台的API接口来实现。在我们的例子中,使用的是一个名为 "execute"
的API接口,通过POST方法提交数据。
以下是一个示例代码片段,用于将转换后的数据通过API接口写入MySQL:
import requests
import json
# 定义要发送的数据
data = {
'main_params': {
'FEntity_FEntryID': '12345',
'order_no_new': 'ORD67890',
'FDocumentStatus': 'A',
'qty_count': '1',
'sales_count': '1000.00',
'datetime_new': '2023-10-01T00:00:00Z',
'Document_Type': '收款单'
}
}
# API URL
url = 'https://api.example.com/execute'
# 发起POST请求
response = requests.post(url, data=json.dumps(data), headers={'Content-Type': 'application/json'})
# 检查响应状态
if response.status_code == 200:
print("Data successfully written to MySQL.")
else:
print(f"Failed to write data. Status code: {response.status_code}")
SQL 插入语句
在某些情况下,我们可能需要直接执行SQL插入语句来写入数据。元数据配置中的 "otherRequest"
字段定义了这样的SQL语句:
{
...
{
"field": "main_sql",
...
,"value":"INSERT INTO `hc_kd_skd`(`FEntity_FEntryID`,`order_no_new`,`FDocumentStatus`,`qty_count`,`sales_count`,`datetime_new`, `Document_Type`) VALUES (:FEntity_FEntryID,:order_no_new,:FDocumentStatus,:qty_count,:sales_count,:datetime_new,:Document_Type)"
}
}
这段SQL语句可以通过Python的数据库连接库(如mysql.connector
或sqlalchemy
)来执行:
import mysql.connector
# 建立数据库连接
conn = mysql.connector.connect(
host='your_host',
user='your_user',
password='your_password',
database='your_database'
)
cursor = conn.cursor()
# 定义插入语句和参数
insert_stmt = (
"""
INSERT INTO hc_kd_skd (FEntity_FEntryID, order_no_new, FDocumentStatus, qty_count, sales_count, datetime_new, Document_Type)
VALUES (%s, %s, %s, %s, %s, %s, %s)
"""
)
data = ('12345', 'ORD67890', 'A', '1', '1000.00', '2023-10-01T00:00:00Z', '收款单')
# 执行插入操作
cursor.execute(insert_stmt, data)
# 提交事务
conn.commit()
# 关闭连接
cursor.close()
conn.close()
通过上述步骤,我们可以顺利地将从源系统获取的数据经过ETL处理后写入到目标系统MySQL中,实现不同系统间的数据无缝对接。