案例分享:库存校准-金蝶有仓位-1 集成方案
在企业的日常运营中,库存数据的准确性直接影响到供应链和生产计划的执行效率。为了实现高效的数据同步与精准控制,我们将重点讨论如何通过轻易云平台,成功对接金蝶云星空和旺店通·企业奇门,实现全流程库存校准。
技术方案概述
集成项目命名为“库存校准-金蝶有仓位-1”,旨在解决两个系统间的数据不一致问题,通过API接口技术保障数据流动的一致性、实时性及可靠性。本案例主要涉及以下几个技术要点:
-
调用金蝶云星空API(executeBillQuery)获取数据:
- 批量抓取库存数据,并处理分页及限流问题以确保稳定的数据拉取。
-
快速写入大量数据至旺店通·企业奇门API(wdt.stock.sync.by.pd):
- 配合高吞吐量的数据写入能力,使大批量的库存信息能够快速、安全地同步至目标系统中。
-
自定义数据转换逻辑:
- 针对两个系统间存在的数据格式差异,自定义转换规则,以保证最终写入的数据符合目的端要求。
-
实时监控与告警机制:
- 对于整个集成过程中的各个阶段进行集中监控。一旦检测到异常情况,如网络故障或接口响应失败,即时触发告警并启动错误重试机制,确保任务顺利完成。
-
可视化操作界面与日志记录功能
- 通过轻易云平台提供的可视化操作界面,让开发人员能够直观设计并管理数据流,同时详细记录每一步骤产生的日志信息,用以审计和故障定位。
这套方案既关注了业务需求,又强调了技术实践,为后续具体实施奠定了坚实基础。在下一部分内容中,我们将深入探讨具体步骤以及关键代码实现细节。
调用源系统金蝶云星空接口executeBillQuery获取并加工数据
在数据集成的生命周期中,调用源系统接口是至关重要的一步。本文将详细探讨如何通过轻易云数据集成平台调用金蝶云星空的executeBillQuery
接口,获取并加工库存校准数据。
API接口配置
首先,我们需要配置API接口的基本信息。根据元数据配置,API为executeBillQuery
,请求方法为POST
。以下是具体的请求字段及其含义:
- FID: 库存记录唯一标识
- FStockId: 仓库ID
- FMaterialId: 物料ID
- FBaseQty: 库存量
- FBaseAVBQty: 可用量
- FLot: 批次号
- FUpdateTime: 最后更新日期
- FOwnerId: 货主ID
- FKeeperId: 保管者ID
- FStockOrgId: 库存组织ID
- FOwnerTypeId: 货主类型ID
- FStockId_FNumber: 仓库编码
- FMaterialId_FNumber: 物料编码
- FOwnerId_FNumber: 货主编码
- FKeeperId_FNumber: 保管者编码
- FStockOrgId_FNumber: 库存组织编码
- FProduceDate: 生产日期
- FMtoNo: 计划跟踪号
- FStockStatusId: 库存状态
请求参数设置
为了确保查询结果的准确性和效率,我们需要设置分页参数和过滤条件:
{
"Limit": "{PAGINATION_PAGE_SIZE}",
"StartRow": "{PAGINATION_START_ROW}",
"TopRowCount": null,
"FilterString": "FStockLocId > 0 AND FUpdateTime >= '{{LAST_SYNC_TIME|datetime}}' and FStockId.F_UOMS_CHECKBOX = 1",
"FieldKeys": [
"FID",
"FStockId",
"FMaterialId",
"FBaseQty",
"FBaseAVBQty",
"FLot",
"FUpdateTime",
"FOwnerId",
"FKeeperId",
"FStockOrgId",
"FOwnerTypeId",
"FStockId.FNumber",
"FMaterialId.FNumber",
"FOwnerId.FNumber",
"FKeeperId.FNumber",
"FStockOrgId.FNumber",
"FProduceDate",
"FMtoNo",
"FStockStatusId"
],
"FormId": "STK_Inventory"
}
数据请求与清洗
在请求数据时,我们会根据上述配置向金蝶云星空发送POST请求,并获取响应数据。接下来,需要对返回的数据进行清洗和加工,以便后续的数据转换与写入。
-
数据清洗
- 确保所有字段都已正确映射。
- 删除无效或重复的数据。
- 格式化日期字段,如
FUpdateTime
。
-
数据加工
- 根据业务需求,对库存量和可用量进行计算。
- 合并同一物料在不同仓库中的库存信息。
示例代码
以下是一个简化的示例代码,用于调用金蝶云星空接口并处理返回的数据:
import requests
def fetch_inventory_data(api_url, headers, payload):
response = requests.post(api_url, headers=headers, json=payload)
if response.status_code == 200:
data = response.json()
# 数据清洗与加工逻辑
cleaned_data = []
for record in data['Result']:
cleaned_record = {
'FID': record['FID'],
'仓库ID': record['FStockId'],
'物料ID': record['FMaterialId'],
'库存量': float(record['FBaseQty']),
'可用量': float(record['FBaseAVBQty']),
'批次号': record['FLot'],
'最后更新日期': record['FUpdateTime'],
'货主ID': record['FOwnerId'],
'保管者ID': record['FKeeperId'],
'库存组织ID': record['FStockOrgId']
}
cleaned_data.append(cleaned_record)
return cleaned_data
else:
raise Exception(f"Failed to fetch data, status code {response.status_code}")
# 配置API URL和Headers等信息
api_url = 'https://api.kingdee.com/executeBillQuery'
headers = {'Content-Type': 'application/json'}
payload = {
# 填入上述配置的JSON内容...
}
# 调用函数获取并处理数据
inventory_data = fetch_inventory_data(api_url, headers, payload)
print(inventory_data)
通过以上步骤,我们成功地调用了金蝶云星空的executeBillQuery
接口,并对返回的数据进行了清洗和加工,为后续的数据转换与写入做好了准备。这一步骤不仅确保了数据的一致性和准确性,还为业务决策提供了可靠的数据支持。
利用轻易云数据集成平台进行ETL转换并写入旺店通·企业奇门API接口
在数据集成生命周期的第二阶段,我们需要将已经集成的源平台数据进行ETL转换,转为目标平台——旺店通·企业奇门API接口所能够接收的格式,最终写入目标平台。以下是具体的技术实现步骤和细节。
元数据配置解析
根据提供的元数据配置,我们需要将源平台的数据转换为旺店通·企业奇门API接口可接受的格式。以下是元数据配置的详细解析:
{
"api": "wdt.stock.sync.by.pd",
"method": "POST",
"idCheck": true,
"operation": {
"method": "merge",
"field": "FStockId_FNumber",
"bodyName": "detail_list",
"header": ["FStockId_FNumber"],
"body": ["FMaterialId_FNumber", "FBaseQty", "FStockLocId"]
},
"request": [
{
"field": "warehouse_no",
"label": "仓库编号",
"type": "string",
"describe": "代表仓库所有属性的唯一编码,用于仓库区分,ERP内支持自定义(ERP仓库界面设置),用于创建指定仓库单据信息",
"value": "{FStockId_FNumber}"
},
{
"field": "mode",
"label": "盘点方式",
"type": "string",
"describe": "0表示单品盘点,1表示货位盘点,如果mode没有传参或数值无效 默认为0单品盘点",
"value": 1
},
{
"field": "api_outer_no",
...
数据请求与清洗
首先,我们需要从源系统获取原始数据并进行清洗。在这个过程中,我们会确保数据的一致性和完整性。例如,我们需要确保每个FStockId_FNumber
都有对应的库存信息,并且这些信息是最新和准确的。
数据转换
接下来,我们将清洗后的数据进行ETL转换。根据元数据配置中的operation
字段,我们需要将源系统的数据字段映射到目标系统所需的字段格式。例如:
FStockId_FNumber
映射到warehouse_no
FMaterialId_FNumber
映射到spec_no
FBaseQty
映射到stock_num
FStockLocId
映射到position_no
以下是一个示例代码片段,展示了如何进行字段映射和数据转换:
def transform_data(source_data):
transformed_data = []
for item in source_data:
transformed_item = {
'warehouse_no': item['FStockId_FNumber'],
'mode': 1,
'api_outer_no': generate_unique_id(),
'is_check': '1',
'is_post_error': '1',
'is_create_stock': '1',
'goods_list': []
}
for detail in item['detail_list']:
goods_detail = {
'spec_no': detail['FMaterialId_FNumber'],
'stock_num': detail['FBaseQty'],
'position_no': detail['FStockLocId']
}
transformed_item['goods_list'].append(goods_detail)
transformed_data.append(transformed_item)
return transformed_data
数据写入
最后一步是将转换后的数据通过POST请求写入到旺店通·企业奇门API接口中。我们使用HTTP POST方法,将JSON格式的数据发送到指定的API端点。
以下是一个示例代码片段,展示了如何发送POST请求:
import requests
import json
def write_to_target_platform(transformed_data):
url = 'https://api.wangdian.cn/openapi2/wdt.stock.sync.by.pd'
headers = {
'Content-Type': 'application/json'
}
for data in transformed_data:
response = requests.post(url, headers=headers, data=json.dumps(data))
if response.status_code == 200:
print(f"Data successfully written: {response.json()}")
else:
print(f"Failed to write data: {response.text}")
# Example usage
source_data = fetch_source_data()
transformed_data = transform_data(source_data)
write_to_target_platform(transformed_data)
通过上述步骤,我们可以实现从源平台的数据获取、清洗、转换,并最终写入到旺店通·企业奇门API接口中。这一过程充分利用了轻易云数据集成平台提供的全生命周期管理功能,实现了高效、透明的数据处理和集成。