从金蝶云星空到旺店通·企业奇门的数据集成方案
在实际的企业数据对接中,确保数据的准确、及时传输是一项挑战。本文将分享一个具体技术案例——如何高效地将金蝶云星空的数据集成到旺店通·企业奇门平台,实现网店管理数据的无缝拉取与同步。
此次集成主要涉及两个API接口调用:获取金蝶云星空的数据(executeBillQuery)和写入旺店通·企业奇门的库存同步API(wdt.stock.sync.by.pd)。整个过程重点解决了以下几个关键技术问题:
-
保证数据完整性:使用 executeBillQuery 接口,定时且可靠地抓取金蝶云星空网店管理数据,同时通过分页和限流机制处理大规模数据请求,确保没有漏单。
-
数据质量监控与异常处理:提供支持自定义逻辑的数据转换工具,并实时监控任务状态。一旦检测到异常情况,可以立即启动告警系统并实施错误重试策略,提高整体系统稳定性。
-
大量数据快速写入:针对旺店通·企业奇门平台的大量库存同步需求,通过 wdt.stock.sync.by.pd API 实现批量、高效的数据写入。同时结合优化后的数据库操作方法,大幅提升了处理时效性。
-
应对格式差异与映射关系:由于两个平台所用的字段、结构存在一定差异,在进行接口对接前进行了充分准备,通过可视化设计工具直观配置了必要的数据映射规则,避免手动调整带来的误差和工作负担。
通过这些措施,我们能够成功实现从金蝶云星空到旺店通·企业奇门全流程、多维度、高效率而且可监控的数据集成。下文将详细展示实际运行中的解决方案,包括关键代码片段及操作步骤。
调用金蝶云星空接口executeBillQuery获取并加工数据
在数据集成的生命周期中,调用源系统接口获取数据是至关重要的一步。本文将详细探讨如何通过轻易云数据集成平台调用金蝶云星空的executeBillQuery
接口来获取网店管理数据,并对其进行初步加工。
接口配置与调用
首先,我们需要配置元数据以便正确调用金蝶云星空的executeBillQuery
接口。以下是元数据配置的详细说明:
{
"api": "executeBillQuery",
"method": "POST",
"number": "FName",
"id": "FNumber",
"pagination": {
"pageSize": 500
},
"request": [
{"field":"FID","label":"FID","type":"string","value":"FID"},
{"field":"FName","label":"名称","type":"string","value":"FName"},
{"field":"FNumber","label":"编码","type":"string","value":"FNumber"},
{"field":"FSaleOrgId_FNumber","label":"对应销售组织","type":"string","value":"FSaleOrgId.FNumber"},
{"field":"FCustomerId_FNumber","label":"对应客户","type":"string","value":"FCustomerId.FNumber"},
{"field":"FCustomerId_FName","label":"对应客户名","type":"string","value":"FCustomerId.FName"},
{"label":"库存组织","field":"FStockOrgId_FNumber","type":"string","value":"FStockOrgId.FNumber"}
],
"otherRequest": [
{"field":"Limit","label":"最大行数","type":"string","describe":"金蝶的查询分页参数","value":"{PAGINATION_PAGE_SIZE}"},
{"field":"StartRow","label":"开始行索引","type":"string","describe":"金蝶的查询分页参数","value":"{PAGINATION_START_ROW}"},
{"field":"TopRowCount","label":"返回总行数","type":"int","describe":"","value":""},
{"field":"FilterString","label":"","type":"","describe":"","value":""},
{"field": "FieldKeys", "label": "", "type": "", "describe": "", "parser":{"name": "ArrayToString", "params": ","}},
{"field": "FormId", "label": "", "type": "", "describe": "", "value": ""}
]
}
请求参数解析
-
基本字段:
FID
: 数据唯一标识符。FName
: 名称。FNumber
: 编码。FSaleOrgId_FNumber
: 对应销售组织。FCustomerId_FNumber
: 对应客户编号。FCustomerId_FName
: 对应客户名。FStockOrgId_FNumber
: 库存组织编号。
-
分页参数:
Limit
: 每次请求返回的数据条数,使用{PAGINATION_PAGE_SIZE}
动态替换。StartRow
: 数据起始行索引,使用{PAGINATION_START_ROW}
动态替换。
-
过滤条件:
FilterString
: 可用于设置查询条件,例如根据修改日期过滤:FMODIFYDATE>='{{LAST_SYNC_TIME|datetime}}'
。
-
字段集合:
FieldKeys
: 查询字段的键集合,通过数组转字符串处理。
-
业务对象表单ID:
FormId
: 必须填写金蝶的表单ID,例如:ECC_Shop
。
数据请求与清洗
通过上述配置,我们可以发起对金蝶云星空接口的POST请求。以下是一个示例请求体:
{
"FormId": "ECC_Shop",
"FieldKeys": ["FID", "FName", "FNumber",
"FSaleOrgId.FNumber",
"FCustomerId.FNumber",
"FCustomerId.FName",
"FStockOrgId.FNumber"].join(","),
"FilterString": "FMODIFYDATE>='2023-01-01'",
"Limit": 500,
"StartRow": 0
}
在实际操作中,轻易云平台会自动处理分页逻辑,确保所有符合条件的数据都能被完整拉取。
数据转换与写入
在获取到原始数据后,我们需要对其进行清洗和转换,以便后续处理和存储。例如,可以将日期格式统一、去除冗余字段、合并相关信息等。以下是一个简单的数据清洗示例:
import pandas as pd
# 假设data是从接口获取到的数据列表
data = [
{"FID": "...", ...},
...
]
df = pd.DataFrame(data)
# 清洗和转换操作
df['ModifiedDate'] = pd.to_datetime(df['FMODIFYDATE'])
df.drop(columns=['UnnecessaryColumn'], inplace=True)
# 转换后的DataFrame可以进一步处理或写入目标系统
通过上述步骤,我们完成了从调用源系统接口到初步数据清洗的全过程。这为后续的数据转换与写入奠定了坚实基础,使得整个数据集成过程更加高效和可靠。
数据集成生命周期第二步:ETL转换与写入旺店通·企业奇门API接口
在数据集成的生命周期中,ETL(Extract, Transform, Load)转换是至关重要的一环。本文将深入探讨如何将已经集成的源平台数据进行ETL转换,并转为目标平台旺店通·企业奇门API接口所能够接收的格式,最终写入目标平台。
API接口配置与数据映射
在进行ETL转换之前,我们需要了解目标API接口的具体要求。以下是旺店通·企业奇门API接口wdt.stock.sync.by.pd
的元数据配置:
{
"api": "wdt.stock.sync.by.pd",
"method": "POST",
"idCheck": true,
"request": [
{"field": "warehouse_no", "label": "仓库编号", "type": "string", "describe": "代表仓库所有属性的唯一编码,用于仓库区分,ERP内支持自定义(ERP仓库界面设置),用于创建指定仓库单据信息"},
{"field": "mode", "label": "盘点方式", "type": "string", "describe": "0表示单品盘点,1表示货位盘点,如果mode没有传参或数值无效 默认为0单品盘点"},
{"field": "api_outer_no", "label": "API单号", "type": "string", "describe": "外部单号唯一标识"},
{"field": "is_check", "label": "是否审核", "type": "string", "describe": "1:自动审核 0:不自动审核 默认0"},
{"field": "is_post_error", "label": "严格模式", "type":"string","describe":"盘点明细推送严格模式,可选值:0或1,不传本参数默认输入值为1。0 非严格模式:表示盘点单货品明细允许部分推送成功(盘点单内spec_no在ERP货品档案存在的部分创建成功,不存在的部分创建失败);1严格模式:表示盘点单货品明细不允许部分推送成功(盘点单内所有spec_no,其中一个或多个spec_no在ERP货品档案不存在,整单推送失败)"},
{"field":"is_create_stock","label":"是否添加库存记录","type":"string","describe":"可选值:0 或1,不传本参数默认输入值为0。0表示不自动添加库存记录,1表示自动添加库存记录 。"},
{"field":"goods_list","label":"货品明细列表节点","type":"array","describe":"货品明细列表节点","children":[
{"field":"spec_no","label":"商家编码","type":"string","describe":"代表单品(sku)所有属性的编码,SKU概念介绍,单击这里","parent":"goods_list"},
{"field":"stock_num","label":"库存数量","type":"string","describe":"库存数量(盘点推送的库存数量指覆盖数量,不是调整数量)","parent":"goods_list"},
{"field":"position_no","label":"货位","type":"string","describe":"货位(当mode为1时,必传)","parent":"goods_list"}
]}
]
}
数据提取与清洗
首先,我们需要从源平台(金蝶网店管理系统)提取数据,并进行必要的数据清洗和预处理。这一步骤确保我们获取的数据是准确且完整的,为后续的转换和加载做好准备。
# 示例代码:从金蝶网店管理系统提取数据
def extract_data_from_kingdee():
# 假设使用某种API或数据库连接提取数据
raw_data = api_call_to_kingdee()
cleaned_data = clean_data(raw_data)
return cleaned_data
def clean_data(data):
# 数据清洗逻辑,例如去除空值、格式化字段等
return cleaned_data
数据转换
接下来,我们需要将清洗后的数据按照旺店通·企业奇门API接口的要求进行转换。这包括字段映射、格式调整以及必要的数据验证。
# 示例代码:将金蝶网店管理系统的数据转换为旺店通·企业奇门API接口所需格式
def transform_data_for_wdt(cleaned_data):
transformed_data = []
for record in cleaned_data:
transformed_record = {
'warehouse_no': record['warehouse_code'],
'mode': '0', # 假设默认为单品盘点
'api_outer_no': generate_unique_api_outer_no(record),
'is_check': '0', # 默认不自动审核
'is_post_error': '1', # 默认严格模式
'is_create_stock': '0', # 默认不自动添加库存记录
'goods_list': []
}
for item in record['items']:
goods_item = {
'spec_no': item['sku_code'],
'stock_num': item['quantity'],
'position_no': item.get('location_code', '') # 如果存在货位信息则填充,否则为空
}
transformed_record['goods_list'].append(goods_item)
transformed_data.append(transformed_record)
return transformed_data
def generate_unique_api_outer_no(record):
# 根据业务逻辑生成唯一API外部单号
return f"OUTER_{record['order_id']}"
数据加载
最后一步是将转换后的数据通过POST请求写入到旺店通·企业奇门API接口中。在这一步,我们需要确保请求格式正确,并处理可能出现的错误响应。
# 示例代码:将转换后的数据写入到旺店通·企业奇门API接口中
import requests
def load_data_to_wdt(transformed_data):
url = 'https://api.wangdian.cn/openapi2/wdt.stock.sync.by.pd'
for record in transformed_data:
response = requests.post(url, json=record)
if response.status_code == 200:
print(f"Record {record['api_outer_no']} successfully loaded.")
else:
print(f"Failed to load record {record['api_outer_no']}: {response.text}")
# 主流程调用示例
cleaned_data = extract_data_from_kingdee()
transformed_data = transform_data_for_wdt(cleaned_data)
load_data_to_wdt(transformed_data)
通过上述步骤,我们实现了从源平台(金蝶网店管理系统)到目标平台(旺店通·企业奇门)的数据ETL转换和加载过程。每一步都涉及具体的技术实现和细节处理,以确保数据能够准确无误地完成集成任务。