深入剖析金蝶云到旺店通的数据转换与加载过程

  • 轻易云集成顾问-胡秀丛
### 从金蝶云星空到旺店通·企业奇门的数据集成方案 在实际的企业数据对接中,确保数据的准确、及时传输是一项挑战。本文将分享一个具体技术案例——如何高效地将金蝶云星空的数据集成到旺店通·企业奇门平台,实现网店管理数据的无缝拉取与同步。 此次集成主要涉及两个API接口调用:获取金蝶云星空的数据(executeBillQuery)和写入旺店通·企业奇门的库存同步API(wdt.stock.sync.by.pd)。整个过程重点解决了以下几个关键技术问题: 1. **保证数据完整性**:使用 executeBillQuery 接口,定时且可靠地抓取金蝶云星空网店管理数据,同时通过分页和限流机制处理大规模数据请求,确保没有漏单。 2. **数据质量监控与异常处理**:提供支持自定义逻辑的数据转换工具,并实时监控任务状态。一旦检测到异常情况,可以立即启动告警系统并实施错误重试策略,提高整体系统稳定性。 3. **大量数据快速写入**:针对旺店通·企业奇门平台的大量库存同步需求,通过 wdt.stock.sync.by.pd API 实现批量、高效的数据写入。同时结合优化后的数据库操作方法,大幅提升了处理时效性。 4. **应对格式差异与映射关系**:由于两个平台所用的字段、结构存在一定差异,在进行接口对接前进行了充分准备,通过可视化设计工具直观配置了必要的数据映射规则,避免手动调整带来的误差和工作负担。 通过这些措施,我们能够成功实现从金蝶云星空到旺店通·企业奇门全流程、多维度、高效率而且可监控的数据集成。下文将详细展示实际运行中的解决方案,包括关键代码片段及操作步骤。 ![用友BIP接口开发配置](https://pic.qeasy.cloud/D39.png~tplv-syqr462i7n-qeasy.image) ### 调用金蝶云星空接口executeBillQuery获取并加工数据 在数据集成的生命周期中,调用源系统接口获取数据是至关重要的一步。本文将详细探讨如何通过轻易云数据集成平台调用金蝶云星空的`executeBillQuery`接口来获取网店管理数据,并对其进行初步加工。 #### 接口配置与调用 首先,我们需要配置元数据以便正确调用金蝶云星空的`executeBillQuery`接口。以下是元数据配置的详细说明: ```json { "api": "executeBillQuery", "method": "POST", "number": "FName", "id": "FNumber", "pagination": { "pageSize": 500 }, "request": [ {"field":"FID","label":"FID","type":"string","value":"FID"}, {"field":"FName","label":"名称","type":"string","value":"FName"}, {"field":"FNumber","label":"编码","type":"string","value":"FNumber"}, {"field":"FSaleOrgId_FNumber","label":"对应销售组织","type":"string","value":"FSaleOrgId.FNumber"}, {"field":"FCustomerId_FNumber","label":"对应客户","type":"string","value":"FCustomerId.FNumber"}, {"field":"FCustomerId_FName","label":"对应客户名","type":"string","value":"FCustomerId.FName"}, {"label":"库存组织","field":"FStockOrgId_FNumber","type":"string","value":"FStockOrgId.FNumber"} ], "otherRequest": [ {"field":"Limit","label":"最大行数","type":"string","describe":"金蝶的查询分页参数","value":"{PAGINATION_PAGE_SIZE}"}, {"field":"StartRow","label":"开始行索引","type":"string","describe":"金蝶的查询分页参数","value":"{PAGINATION_START_ROW}"}, {"field":"TopRowCount","label":"返回总行数","type":"int","describe":"","value":""}, {"field":"FilterString","label":"","type":"","describe":"","value":""}, {"field": "FieldKeys", "label": "", "type": "", "describe": "", "parser":{"name": "ArrayToString", "params": ","}}, {"field": "FormId", "label": "", "type": "", "describe": "", "value": ""} ] } ``` #### 请求参数解析 1. **基本字段**: - `FID`: 数据唯一标识符。 - `FName`: 名称。 - `FNumber`: 编码。 - `FSaleOrgId_FNumber`: 对应销售组织。 - `FCustomerId_FNumber`: 对应客户编号。 - `FCustomerId_FName`: 对应客户名。 - `FStockOrgId_FNumber`: 库存组织编号。 2. **分页参数**: - `Limit`: 每次请求返回的数据条数,使用 `{PAGINATION_PAGE_SIZE}` 动态替换。 - `StartRow`: 数据起始行索引,使用 `{PAGINATION_START_ROW}` 动态替换。 3. **过滤条件**: - `FilterString`: 可用于设置查询条件,例如根据修改日期过滤:`FMODIFYDATE>='{{LAST_SYNC_TIME|datetime}}'`。 4. **字段集合**: - `FieldKeys`: 查询字段的键集合,通过数组转字符串处理。 5. **业务对象表单ID**: - `FormId`: 必须填写金蝶的表单ID,例如:`ECC_Shop`。 #### 数据请求与清洗 通过上述配置,我们可以发起对金蝶云星空接口的POST请求。以下是一个示例请求体: ```json { "FormId": "ECC_Shop", "FieldKeys": ["FID", "FName", "FNumber", "FSaleOrgId.FNumber", "FCustomerId.FNumber", "FCustomerId.FName", "FStockOrgId.FNumber"].join(","), "FilterString": "FMODIFYDATE>='2023-01-01'", "Limit": 500, "StartRow": 0 } ``` 在实际操作中,轻易云平台会自动处理分页逻辑,确保所有符合条件的数据都能被完整拉取。 #### 数据转换与写入 在获取到原始数据后,我们需要对其进行清洗和转换,以便后续处理和存储。例如,可以将日期格式统一、去除冗余字段、合并相关信息等。以下是一个简单的数据清洗示例: ```python import pandas as pd # 假设data是从接口获取到的数据列表 data = [ {"FID": "...", ...}, ... ] df = pd.DataFrame(data) # 清洗和转换操作 df['ModifiedDate'] = pd.to_datetime(df['FMODIFYDATE']) df.drop(columns=['UnnecessaryColumn'], inplace=True) # 转换后的DataFrame可以进一步处理或写入目标系统 ``` 通过上述步骤,我们完成了从调用源系统接口到初步数据清洗的全过程。这为后续的数据转换与写入奠定了坚实基础,使得整个数据集成过程更加高效和可靠。 ![打通钉钉数据接口](https://pic.qeasy.cloud/S28.png~tplv-syqr462i7n-qeasy.image) ### 数据集成生命周期第二步:ETL转换与写入旺店通·企业奇门API接口 在数据集成的生命周期中,ETL(Extract, Transform, Load)转换是至关重要的一环。本文将深入探讨如何将已经集成的源平台数据进行ETL转换,并转为目标平台旺店通·企业奇门API接口所能够接收的格式,最终写入目标平台。 #### API接口配置与数据映射 在进行ETL转换之前,我们需要了解目标API接口的具体要求。以下是旺店通·企业奇门API接口`wdt.stock.sync.by.pd`的元数据配置: ```json { "api": "wdt.stock.sync.by.pd", "method": "POST", "idCheck": true, "request": [ {"field": "warehouse_no", "label": "仓库编号", "type": "string", "describe": "代表仓库所有属性的唯一编码,用于仓库区分,ERP内支持自定义(ERP仓库界面设置),用于创建指定仓库单据信息"}, {"field": "mode", "label": "盘点方式", "type": "string", "describe": "0表示单品盘点,1表示货位盘点,如果mode没有传参或数值无效 默认为0单品盘点"}, {"field": "api_outer_no", "label": "API单号", "type": "string", "describe": "外部单号唯一标识"}, {"field": "is_check", "label": "是否审核", "type": "string", "describe": "1:自动审核 0:不自动审核 默认0"}, {"field": "is_post_error", "label": "严格模式", "type":"string","describe":"盘点明细推送严格模式,可选值:0或1,不传本参数默认输入值为1。0 非严格模式:表示盘点单货品明细允许部分推送成功(盘点单内spec_no在ERP货品档案存在的部分创建成功,不存在的部分创建失败);1严格模式:表示盘点单货品明细不允许部分推送成功(盘点单内所有spec_no,其中一个或多个spec_no在ERP货品档案不存在,整单推送失败)"}, {"field":"is_create_stock","label":"是否添加库存记录","type":"string","describe":"可选值:0 或1,不传本参数默认输入值为0。0表示不自动添加库存记录,1表示自动添加库存记录 。"}, {"field":"goods_list","label":"货品明细列表节点","type":"array","describe":"货品明细列表节点","children":[ {"field":"spec_no","label":"商家编码","type":"string","describe":"代表单品(sku)所有属性的编码,SKU概念介绍,单击这里","parent":"goods_list"}, {"field":"stock_num","label":"库存数量","type":"string","describe":"库存数量(盘点推送的库存数量指覆盖数量,不是调整数量)","parent":"goods_list"}, {"field":"position_no","label":"货位","type":"string","describe":"货位(当mode为1时,必传)","parent":"goods_list"} ]} ] } ``` #### 数据提取与清洗 首先,我们需要从源平台(金蝶网店管理系统)提取数据,并进行必要的数据清洗和预处理。这一步骤确保我们获取的数据是准确且完整的,为后续的转换和加载做好准备。 ```python # 示例代码:从金蝶网店管理系统提取数据 def extract_data_from_kingdee(): # 假设使用某种API或数据库连接提取数据 raw_data = api_call_to_kingdee() cleaned_data = clean_data(raw_data) return cleaned_data def clean_data(data): # 数据清洗逻辑,例如去除空值、格式化字段等 return cleaned_data ``` #### 数据转换 接下来,我们需要将清洗后的数据按照旺店通·企业奇门API接口的要求进行转换。这包括字段映射、格式调整以及必要的数据验证。 ```python # 示例代码:将金蝶网店管理系统的数据转换为旺店通·企业奇门API接口所需格式 def transform_data_for_wdt(cleaned_data): transformed_data = [] for record in cleaned_data: transformed_record = { 'warehouse_no': record['warehouse_code'], 'mode': '0', # 假设默认为单品盘点 'api_outer_no': generate_unique_api_outer_no(record), 'is_check': '0', # 默认不自动审核 'is_post_error': '1', # 默认严格模式 'is_create_stock': '0', # 默认不自动添加库存记录 'goods_list': [] } for item in record['items']: goods_item = { 'spec_no': item['sku_code'], 'stock_num': item['quantity'], 'position_no': item.get('location_code', '') # 如果存在货位信息则填充,否则为空 } transformed_record['goods_list'].append(goods_item) transformed_data.append(transformed_record) return transformed_data def generate_unique_api_outer_no(record): # 根据业务逻辑生成唯一API外部单号 return f"OUTER_{record['order_id']}" ``` #### 数据加载 最后一步是将转换后的数据通过POST请求写入到旺店通·企业奇门API接口中。在这一步,我们需要确保请求格式正确,并处理可能出现的错误响应。 ```python # 示例代码:将转换后的数据写入到旺店通·企业奇门API接口中 import requests def load_data_to_wdt(transformed_data): url = 'https://api.wangdian.cn/openapi2/wdt.stock.sync.by.pd' for record in transformed_data: response = requests.post(url, json=record) if response.status_code == 200: print(f"Record {record['api_outer_no']} successfully loaded.") else: print(f"Failed to load record {record['api_outer_no']}: {response.text}") # 主流程调用示例 cleaned_data = extract_data_from_kingdee() transformed_data = transform_data_for_wdt(cleaned_data) load_data_to_wdt(transformed_data) ``` 通过上述步骤,我们实现了从源平台(金蝶网店管理系统)到目标平台(旺店通·企业奇门)的数据ETL转换和加载过程。每一步都涉及具体的技术实现和细节处理,以确保数据能够准确无误地完成集成任务。 ![如何对接企业微信API接口](https://pic.qeasy.cloud/T20.png~tplv-syqr462i7n-qeasy.image)