生产领料单数据集成到MySQL案例分享
在本技术案例中,我们将探讨如何通过轻易云数据集成平台将金蝶云星空的生产领料单数据高效地集成到MySQL数据库中。本次方案主要解决了以下几个关键问题:
首先,通过调用金蝶云星空的API接口executeBillQuery
,我们实现了定时可靠的数据抓取,并确保每个生产领料单不漏单。为了应对大规模数据处理的需求,采用批量数据写入策略,使大量数据能够高速且稳定地流入MySQL。
其次,在处理分页和限流问题时,我们设计了一套自定义的数据转换逻辑,以确保对于金蝶云星空接口返回的大量分页数据能够被有效整合并传递至目标数据库。同时,为了保证不同系统间的数据格式差异得以顺利过渡,自定义映射规则成为必不可少的一环。
实际操作过程中,我们还面临着API请求失败或返回异常的情况,为此特别配置了错误重试机制并强化监控告警功能。这不仅保证了系统运行过程中的高可用性,还为运维人员提供实时状态追踪和性能分析能力,实现全流程透明化管理。此外,通过集中化控制台和视图简明呈现API资产使用状况,也方便企业进行资源优化与再分配。
最后,本方案充分利用MySQL强大的写入能力及其灵活性,不仅提升了整体数据信息处理效率,更让企业业务决策有坚实的数据支持。在接下来的内容中,将详细讲解各步骤具体实施细节及经验总结。
调用金蝶云星空接口executeBillQuery获取并加工数据
在数据集成的生命周期中,调用源系统接口获取数据是至关重要的一步。本文将详细介绍如何通过轻易云数据集成平台调用金蝶云星空的executeBillQuery
接口,获取生产领料单的数据,并进行初步加工。
接口配置与请求参数
首先,我们需要配置调用金蝶云星空接口的元数据。以下是元数据配置的关键部分:
{
"api": "executeBillQuery",
"effect": "QUERY",
"method": "POST",
"number": "FBillNo",
"id": "FEntity_FEntryID",
"name": "{random}",
"request": [
{"field":"FEntity_FEntryID","label":"id","type":"string","describe":"id","value":"FEntity_FEntryID"},
{"field":"FMoEntrySeq","label":"生产订单行号","type":"string","describe":"生产订单号","value":"FMoEntrySeq"},
{"field":"FID","label":"实体主键","type":"string","describe":"实体主键","value":"FID"},
{"field":"FBillNo","label":"单据编号","type":"string","describe":"单据编号","value":"FBillNo"},
{"field":"FDate","label":"日期","type":"string","describe":"日期","value":"FDate"},
{"field":"FPrdOrgId","label":"生产组织","type":"string","describe":"生产组织","value":"FPrdOrgId.FNumber"},
{"field":"FStockOrgId","label":"发料组织","type":"string","describe":"发料组织","value":"FStockOrgId.FNumber"},
{"field":"FMoBillNo","label":"生产订单编号","type":"string","describe":"","value":""},
...
],
...
}
在这个配置中,api
字段指定了要调用的API名称为executeBillQuery
,method
字段指定了请求方法为POST。request
数组定义了需要从金蝶云星空获取的字段及其映射关系。
构建请求体
根据元数据配置,我们需要构建一个POST请求体来调用金蝶云星空的API。以下是一个示例请求体:
{
"FormId": "PRD_PickMtrl",
"FieldKeys": [
"FEntity_FEntryID",
"FMoEntrySeq",
"FID",
"FBillNo",
...
],
"FilterString": "FApproveDate>='2022-01-01' and FMaterialId.FCategoryID <> '包材'",
"Limit": 2000,
"StartRow": 0
}
在这个请求体中,FormId
指定了业务对象表单ID,FieldKeys
数组列出了需要查询的字段,FilterString
用于过滤条件,分页参数包括Limit
和StartRow
。
调用API并处理响应
发送POST请求后,我们会收到一个包含所需数据的响应。以下是处理响应的示例代码:
import requests
import json
url = 'https://api.kingdee.com/executeBillQuery'
headers = {'Content-Type': 'application/json'}
payload = {
'FormId': 'PRD_PickMtrl',
'FieldKeys': 'FEntity_FEntryID,FMoEntrySeq,FID,FBillNo,...',
'FilterString': 'FApproveDate>="2022-01-01" and FMaterialId.FCategoryID <> \'包材\'',
'Limit': '2000',
'StartRow': '0'
}
response = requests.post(url, headers=headers, data=json.dumps(payload))
data = response.json()
# 数据处理逻辑
for record in data:
process_record(record)
在这个示例中,我们使用Python的requests库发送POST请求,并将响应解析为JSON格式。然后,我们可以遍历每条记录并进行相应的数据处理。
数据清洗与转换
在获取到原始数据后,需要对其进行清洗和转换,以便写入目标数据库(如MySQL)。以下是一些常见的数据清洗操作:
- 字段重命名:将原始字段名转换为目标数据库中的字段名。
- 数据类型转换:确保每个字段的数据类型符合目标数据库的要求。
- 缺失值处理:填充或删除缺失值,以保证数据完整性。
例如,可以使用Pandas库进行数据清洗:
import pandas as pd
# 假设data是从API响应中提取的数据列表
df = pd.DataFrame(data)
# 字段重命名
df.rename(columns={
'FEntity_FEntryID': 'entry_id',
'FBillNo': 'bill_no',
...
}, inplace=True)
# 数据类型转换
df['entry_id'] = df['entry_id'].astype(int)
df['bill_no'] = df['bill_no'].astype(str)
# 缺失值处理
df.fillna({'entry_id': -1, 'bill_no': ''}, inplace=True)
# 将清洗后的数据写入MySQL数据库
df.to_sql('production_pick_list', con=mysql_connection, if_exists='append', index=False)
通过上述步骤,我们可以高效地从金蝶云星空获取生产领料单的数据,并进行必要的清洗和转换,为后续的数据写入做好准备。这种全生命周期管理的方法不仅提高了业务透明度,还极大地提升了数据处理效率。
数据请求与清洗
在轻易云数据集成平台的生命周期中,数据请求与清洗是将源平台的数据获取并进行初步处理的关键步骤。这个过程涉及到从源系统中提取原始数据,并对其进行必要的清洗和预处理,以确保数据的准确性和一致性。
数据转换与写入
在数据请求与清洗完成后,接下来就是将这些已经处理好的数据进行ETL转换,并写入目标平台MySQL。这个过程主要包括以下几个步骤:
1. 配置API接口
首先,我们需要配置API接口以便将数据写入MySQL。根据提供的元数据配置,我们可以看到API接口的基本信息如下:
{
"api": "execute",
"effect": "EXECUTE",
"method": "POST",
"idCheck": true,
"request": [
{
"field": "main_params",
"label": "main_params",
"type": "object",
"describe": "111",
"value": "1",
"children": [
{"field": "fid", "label": "单据id", "type": "string", "value":"{FID}"},
{"field": "document_id", "label": "文档唯一标识号", "type":"string", "describe":"111", "value":"{FID}-{FEntity_FEntryID}"},
{"field": "fbill_no", "label": "单据编号", "type":"string", "describe":"111", "value":"{FBillNo}"},
{"field": "fentry_id","label":"明细id","type":"string","describe":"111","value":"{FEntity_FEntryID}"},
{"field":"fdate","label":"日期","type":"string","describe":"111","value":"{{FDate|date}}"},
{"field":"fmaterialid_fnumber","label":"原料编码","type":"string","value":"{FMaterialId_FNumber}"},
{"field":"fmaterialid_name","label":"原料名称","type":"string","value":"{FMaterialName}"},
{"field":"flot","label":"批号","type":"string","value":"{FLot}"},
{"field":"fmaterialid_fnsbsccj","label":"生产厂家","type":"string","value":"{FMaterialId_F_nsb_sccj}"},
{"field":"fparent_materialId_fnumber","label":"产品编码","type":"string","value":"{FParentMaterialId_FNumber}"}
// ...其他字段省略
]
}
],
// ...其他配置省略
}
2. 数据映射与转换
在配置好API接口后,我们需要对源平台的数据进行映射和转换,以符合目标平台MySQL API所能接收的格式。根据元数据配置中的字段映射关系,可以看到每个字段都对应一个具体的值,例如:
fid
对应{FID}
document_id
对应{FID}-{FEntity_FEntryID}
fbill_no
对应{FBillNo}
fdate
对应{{FDate|date}}
这些映射关系确保了从源平台提取的数据能够正确地转换为目标平台所需的格式。
3. 构建SQL语句
接下来,我们需要构建用于插入或更新MySQL数据库的SQL语句。根据元数据配置中的main_sql
字段,我们可以看到完整的SQL语句模板如下:
INSERT INTO scll
(fid, document_id, fbill_no, fentry_id, fdate, fmaterialid_fcategory, fmaterialid_fnumber, fmaterialid_name, flot, fmaterialid_fnsbsccj, fparent_materialId_fnumber, fparent_materialId_fname, fparent_materialId_fbarcode, fmobillno, fmoentry_seq, fqty, funitid_name, fsend_flag, created_at, updated_at,type)
VALUES
(:fid, :document_id, :fbill_no, :fentry_id, :fdate, :fmaterialid_fcategory, :fmaterialid_fnumber, :fmaterialid_name, :flot, :fmaterialid_fnsbsccj, :fparent_materialId_fnumber,:fparent_materialId_fname,:fparent_materialId_fbarcode,:fmobillno,:fmoentry_seq,:fqty,:funitid_name,:fsend_flag,:created_at,:updated_at,:type)
ON DUPLICATE KEY UPDATE
document_id = VALUES(document_id),
fbill_no = VALUES(fbill_no),
fentry_id = VALUES(fentry_id),
fdate = VALUES(fdate),
fmaterialid_fcategory = VALUES(fmaterialid_fcategory),
fmaterialid_fnumber = VALUES(fmaterialid_fnumber),
fmaterialid_name = VALUES(fmaterialid_name),
flot = VALUES(flot),
fmaterialid_fnsbsccj = VALUES(fmaterialid_fnsbsccj),
fparent_materialId_fnumber = VALUES(fparent_materialId_fnumber),
fparent_materialId_fname = VALUES(fparent_materialId_fname),
fparent_materialId_fbarcode = VALUES(fparent_materialId_fbarcode),
fmobillno = VALUES(fmobillno),
fmoentry_seq = VALUES(fmoentry_seq),
fqty = VALUES(fqty),
funitid_name = VALUES(funitid_name),
fsend_flag = VALUES(fsend_flag),
created_at = VALUES(created_at),
updated_at = VALUES(updated_at),
type = VALUES(type);
该SQL语句使用了参数化查询,确保了数据安全性和执行效率。
4. 执行API调用
最后一步是通过POST方法调用API接口,将构建好的SQL语句和对应的数据发送到目标平台MySQL。这一步通常涉及到HTTP请求库(如axios、requests等)的使用,通过发送HTTP POST请求来执行数据库操作。
const axios = require('axios');
const dataPayload = {
main_params: {
fid: '12345',
document_id: '12345-67890',
fbill_no: 'BILL123',
// ...其他字段
},
main_sql: `INSERT INTO scll (fid,...`
};
axios.post('https://api.targetplatform.com/execute', dataPayload)
.then(response => {
console.log('Data successfully written to MySQL:', response.data);
})
.catch(error => {
console.error('Error writing data to MySQL:', error);
});
通过以上步骤,我们实现了从源平台到目标平台MySQL的数据ETL转换与写入,确保了数据在不同系统间的无缝对接和高效传输。