金蝶云星空数据集成到MySQL技术案例分享:5金蝶生产订单对接商城
在实际业务中,企业经常需要将多个系统的数据高效整合,以实现业务流程的自动化和信息共享。本次我们聚焦于如何通过轻易云数据集成平台,将金蝶云星空的生产订单数据高效对接到MySQL数据库,为商城系统提供实时准确的数据支持。本案例具体方案为:5金蝶生产订单对接商城。
技术要点
-
调用金蝶云星空接口获取数据:为了从金蝶云星空中提取生产订单,我们使用了executeBillQuery API。此API能够按需查询指定时间段内的新建或更改的生产订单,并且支持分页处理,以解决大型数据集带来的压力问题。
-
定时可靠抓取与批量写入:为了确保数据的不漏单和及时更新,我们设定了一个定时任务,通过executeBillQuery API周期性地从金蝶云星空拉取新订单。同时,利用轻易云强大的批量写入功能,将这些订单一次性高效地插入到MySQL数据库中。这不仅提高了吞吐量,也避免了频繁小批量操作带来的性能开销。
-
处理接口分页和限流:
- 分页机制使得我们能分阶段逐步获取大规模的数据,而不会因超过接口返回限制而丢失任何信息。
- 通过设置合理的请求间隔和重试机制,有效应对API限流,实现稳定、可靠的数据传输。
-
自定义转换逻辑与格式差异适配:
- 在实际应用过程中,不同系统之间难免存在数据格式差异。例如,日期格式、字段命名等。在本案例中,通过自定义转换逻辑,使得从金蝶云星空获取的数据能无缝映射并存储到MySQL表结构中。
-
监控及告警体系:
- 集中的监控系统实时跟踪每个集成任务的状态。当出现异常时,即刻触发告警通知相关人员进行排查,大幅度降低事故响应时间并提升整体运维效率。
-
异常处理与错误重试机制:
- 在整个数据传输过程中不可避免会遇到网络波动、服务不可用等问题。为此,我们设计了一套完整的错误重试机制,当某些操作失败时,可以根据预设策略自动重新尝试,从而保证最终一致性。
-
日志记录与可视化管理工具
- 全程采用详尽日志记录,每一步都可以追溯,这对于后续维护和问题定位至关重要。同时,通过可
调用金蝶云星空接口executeBillQuery获取并加工数据
- 全程采用详尽日志记录,每一步都可以追溯,这对于后续维护和问题定位至关重要。同时,通过可
在数据集成过程中,调用源系统的API接口是至关重要的一步。本文将详细探讨如何通过轻易云数据集成平台调用金蝶云星空的executeBillQuery
接口来获取生产订单数据,并进行初步加工。
配置元数据
首先,我们需要配置元数据,以便正确调用金蝶云星空的API接口。以下是元数据配置的关键字段:
{
"api": "executeBillQuery",
"method": "POST",
"number": "FBillNo",
"id": "FTreeEntity_FEntryId",
"pagination": {
"pageSize": 500
},
"idCheck": true,
"request": [
{"field":"FID","label":"实体主键","type":"string","value":"FID"},
{"field":"FBillNo","label":"单据编号","type":"string","value":"FBillNo"},
{"field":"FTreeEntity_FEntryId","label":"FTreeEntity_FEntryId","type":"string","value":"FTreeEntity_FEntryId"},
{"field":"FTreeEntity_Fseq","label":"FTreeEntity_Fseq","type":"string","value":"FTreeEntity_Fseq"},
{"field":"FCREATORID","label":"创建人","type":"string","value":"FCREATORID.FName"},
{"field":"FAPPROVERID","label":"审核人","type":"string","value":"FAPPROVERID.FName"},
{"field":"FCREATEDATE","label":"创建日期","type":"string","value":"FCREATEDATE"},
{"field":"FMATERIALID_FName","label":"物料客户编码","type":"string","value":"FMATERIALID.FName"},
{"field":"FAPPROVEDATE","label":"审核日期","type":"string","value":"FAPPROVEDATE"},
{"field":"FMATERIALID_FDescription","label":"物料规格型号","type":"string","value":"FMATERIALID.FDescription"},
{"field":"FWorkShopID_FName","label":"生产车间","type":"","value":"","describe":""},
{"field":"","label":"","type":"","describe":"","value":""}
],
"otherRequest": [
{"field":"","label":"","type":"","describe":"","value":""},
{"field":"","label":"","type":"","describe":"","value":""},
{"field":"","label":"","type":"","describe":"","value":""},
{"field":"","label":"","type":"","describe":"","value":""}
]
}
请求参数
在请求参数中,我们需要关注以下几个关键字段:
FormId
: 表单ID,这里我们使用PRD_MO
表示生产订单。FieldKeys
: 查询的字段集合,需要将所有需要查询的字段以逗号分隔的字符串形式传递。FilterString
: 用于过滤查询结果,例如可以根据时间戳和状态过滤。
示例请求体如下:
{
"FormId": "PRD_MO",
"FieldKeys": [
"FID",
"FBillNo",
"FTreeEntity_FEntryId",
...
],
"FilterString": "FConveyDate>='{{LAST_SYNC_TIME|dateTime}}' and FStatus in(3,4)",
...
}
调用接口
通过轻易云平台,我们可以使用配置好的元数据和请求参数来调用金蝶云星空的executeBillQuery
接口。具体步骤如下:
- 设置分页参数:由于可能存在大量数据,我们需要设置分页参数,如每页500条记录。
- 构建请求体:根据元数据配置构建请求体,包括表单ID、字段集合和过滤条件等。
- 发送请求:使用POST方法发送请求到金蝶云星空API。
- 处理响应:解析返回的数据,并根据业务需求进行初步加工。
示例代码片段:
import requests
url = 'https://api.kingdee.com/executeBillQuery'
headers = {'Content-Type': 'application/json'}
payload = {
'FormId': 'PRD_MO',
'FieldKeys': 'FID,FBillNo,FTreeEntity_FEntryId,...',
'FilterString': f"FConveyDate>='{last_sync_time}' and FStatus in(3,4)",
'Limit': pagination['pageSize'],
'StartRow': start_row
}
response = requests.post(url, json=payload, headers=headers)
data = response.json()
# 数据处理逻辑
for record in data:
process_record(record)
数据加工
获取到原始数据后,需要对其进行初步加工。例如,可以根据业务需求对特定字段进行格式转换或计算衍生值。以下是一个简单的数据加工示例:
def process_record(record):
# 转换日期格式
record['FCREATEDATE'] = convert_date_format(record['FCREATEDATE'])
# 根据业务状态生成描述
status_map = {
'1': '计划',
'2': '计划确认',
...
}
record['FStatusDesc'] = status_map.get(record['FStatus'], '未知状态')
# 日期格式转换函数示例
def convert_date_format(date_str):
from datetime import datetime
return datetime.strptime(date_str, '%Y-%m-%dT%H:%M:%S').strftime('%Y-%m-%d %H:%M:%S')
通过上述步骤,我们可以高效地从金蝶云星空获取生产订单数据,并进行必要的数据加工,为后续的数据集成和分析打下坚实基础。
数据集成生命周期第二步:ETL转换与写入MySQLAPI接口
在数据集成生命周期的第二步,我们重点关注如何将已经集成的源平台数据进行ETL转换,并转为目标平台MySQL API接口所能够接收的格式,最终写入目标平台。本文将详细解析这一过程中的技术细节和实现方法。
元数据配置解析
在本案例中,我们需要将金蝶生产订单的数据对接到商城系统,并最终写入MySQL数据库。以下是元数据配置的详细解析:
{
"api": "execute",
"effect": "EXECUTE",
"method": "SQL",
"number": "id",
"id": "id",
"name": "id",
"idCheck": true,
"request": [
{
"field": "main_params",
"label": "主参数",
"type": "object",
"describe": "对应主语句内的动态参数",
"value": "_function CAST('{FPlanFinishDate}' as DATETIME)",
"children": [
{"field": "order_code", "label": "订单编号", "type": "string", "describe":"店铺名称", "value":"{FSaleOrderNo}"},
{"field": "FBillNo", "label":"生产订单号", "type":"string", "describe":"部门", "value":"{FBillNo}"},
{"field":"FStatus", "label":"状态", "type":"string", "value":"{FStatus}"},
{"field":"create_time","label":"创建时间", "type":"string", "value":"_function CAST('{FCREATEDATE}' as DATETIME)"},
{"field":"update_time","label":"更新时间", "type":"string", "value":"_function now()"},
{"field":"FMATERIALID_FNumber","label":"物料编码","type":"string","value":"{FMATERIALID_FNumber}"},
{"field":"FPlanStartDate","label":"排产开始时间","type":"datetime","value":"_function CAST('{FPlanStartDate}' as DATETIME)"},
{"field":"FPlanFinishDate","label":"排产结束时间","type":"datetime","value":"_function CAST('{FPlanFinishDate}' as DATETIME)"}
]
}
],
"otherRequest":[
{
"field": "main_sql",
"label": "主语句",
"type": "string",
"describe": "SQL首次执行的语句,将会返回:lastInsertId",
"value":
"INSERT INTO `middle_order_prdmq` (`order_code`, `FBillNo`, `FStatus`, `create_time`, `update_time`, `FMATERIALID_FNumber`, `FPlanStartDate`, `FPlanFinishDate`) VALUES (:order_code,:FBillNo,:FStatus,:create_time,:update_time,:FMATERIALID_FNumber,:FPlanStartDate,:FPlanFinishDate)"
}
]
}
ETL转换过程
-
提取(Extract):
- 从源平台(金蝶生产订单)提取数据。假设我们已经完成了数据请求与清洗阶段,当前获得的数据包含多个字段,如订单编号、生产订单号、状态、创建时间、更新时间、物料编码、排产开始时间和排产结束时间等。
-
转换(Transform):
- 将提取到的数据进行必要的转换,以符合目标平台MySQL API接口所需的格式。
- 使用元数据配置中的
main_params
字段,对应主语句内的动态参数进行处理。例如,将日期字符串转换为DATETIME类型:_function CAST('{FCREATEDATE}' as DATETIME)
- 动态参数包括:
order_code
:{FSaleOrderNo}
FBillNo
:{FBillNo}
FStatus
:{FStatus}
create_time
:_function CAST('{FCREATEDATE}' as DATETIME)
update_time
:_function now()
FMATERIALID_FNumber
:{FMATERIALID_FNumber}
FPlanStartDate
:_function CAST('{FPlanStartDate}' as DATETIME)
FPlanFinishDate
:_function CAST('{FPlanFinishDate}' as DATETIME)
-
加载(Load):
- 将转换后的数据通过API接口写入目标平台(MySQL数据库)。
- 使用元数据配置中的
main_sql
字段,定义SQL插入语句:INSERT INTO `middle_order_prdmq` (`order_code`, `FBillNo`, `FStatus`, `create_time`, `update_time`, `FMATERIALID_FNumber`, `FPlanStartDate`, `FPlanFinishDate`) VALUES (:order_code, :FBillNo, :FStatus, :create_time, :update_time, :FMATERIALID_FNumber, :FPlanStartDate, :FPlanFinishDate)
API接口调用示例
通过上述步骤,我们可以构建一个完整的API接口调用流程。以下是一个简化的示例代码,用于展示如何通过API接口将转换后的数据写入MySQL数据库:
import requests
# 定义API URL
api_url = 'http://example.com/api/execute'
# 构建请求体
payload = {
'main_params': {
'order_code': 'SO123456',
'FBillNo': 'PO654321',
'FStatus': 'Completed',
'create_time': '2023-10-01 12:00:00',
'update_time': '2023-10-01 12:00:00',
'FMATERIALID_FNumber': 'MAT001',
'FPlanStartDate': '2023-10-01 08:00:00',
'FPlanFinishDate': '2023-10-01 18:00:00'
},
'main_sql': """
INSERT INTO middle_order_prdmq
(order_code, FBillNo, FStatus, create_time, update_time, FMATERIALID_FNumber, FPlanStartDate, FPlanFinishDate)
VALUES (:order_code, :FBillNo, :FStatus, :create_time, :update_time, :FMATERIALID_FNumber, :FPlanStartDate, :FPlanFinishDate)
"""
}
# 发起POST请求
response = requests.post(api_url, json=payload)
# 检查响应状态
if response.status_code == 200:
print('Data successfully inserted into MySQL database.')
else:
print('Failed to insert data:', response.text)
通过上述技术案例,我们展示了如何利用轻易云数据集成平台完成ETL转换,并通过API接口将数据写入目标平台MySQL数据库。该过程不仅提高了数据处理效率,还确保了不同系统间的数据无缝对接。