使用轻易云数据集成平台的ETL转换技术,轻松对接金蝶云星空API
### 测试销售出库单:旺店通·企业奇门数据集成到金蝶云星空
在现代企业的复杂信息系统中,实现各个业务系统之间的数据无缝对接是提升运营效率的重要手段。本案例将聚焦于通过API接口实现对接,解析旺店通·企业奇门的数据如何高效、安全地集成到金蝶云星空。
#### 1. 旺店通·企业奇门与金蝶云星空的API对接概述
为了实现销售出库单从旺店通·企业奇门到金蝶云星空的自动化传输,我们首先需要利用两个关键API。获取出库单数据使用`wdt.stockout.order.query.trade`接口,而向金蝶云星空写入这些数据则使用`batchSave`接口。在整个过程中,需处理好分页、限流及数据格式转换等问题,以确保每一条记录能够精确且无遗漏地被转移和保存。
#### 2. 确保高效可靠的数据传输
为解决可能出现的大量数据快速写入需求,我们设计了高度吞吐量优化方案,使得大批量订单数据可以迅速传递至目标系统。同时,通过集中监控和告警机制,可以实时跟踪任务状态及性能,并及时发现潜在异常进行响应,从而最大程度保证了集成任务的稳定运行。此外,自定义的数据转换逻辑也有效应对了两种不同平台间存在的数据结构差异问题。
### 案例具体操作步骤:
1. **初始化配置**
- 获取并验证所需API的访问令牌。
- 配置初始同步参数,包括分页大小及请求频率,以避免触发限流机制。
2. **抓取原始出库单据**
- 调用旺店通·企业奇门 API `wdt.stockout.order.query.trade` 按设定频次抓取新生成或变更后的销售出库单据。
- 对抓取回来的JSON 数据进行必要清洗、校验和预处理,为后续写入做好准备。
3. **转换与映射**
- 根据需求,对抓取到的数据进行字段映射,将其转换为符合金蝶云星空 `batchSave` API 要求的格式。
4. **批量写入与确认**
- 使用经自定义规则加工后的数据,通过 `batchSave` 接口批量导入至金蝶云星空,同时记录所有提交事务以便后续审查和重试处理。
5. **安全保障**
- 实施全流程日志监控,对成功与失败均有详细记录,辅以异常检测技术确保稳定性。如遇错误,则启动重试机制或发送告警通知相关责任人
![用友与WMS系统接口开发配置](https://pic.qeasy.cloud/D4.png~tplv-syqr462i7n-qeasy.image)
### 调用旺店通·企业奇门接口wdt.stockout.order.query.trade获取并加工数据
在数据集成生命周期的第一步,调用源系统接口获取数据是至关重要的一环。本文将详细探讨如何通过轻易云数据集成平台调用旺店通·企业奇门接口`wdt.stockout.order.query.trade`,并对获取的数据进行初步加工。
#### 接口调用配置
首先,我们需要配置元数据以便正确调用接口。以下是元数据配置的关键部分:
```json
{
"api": "wdt.stockout.order.query.trade",
"method": "POST",
"number": "order_no",
"id": "stockout_id",
"pagination": {
"pageSize": 100
},
"idCheck": true,
"condition": [
[
{"field":"shop_no","logic":"notlike","value":"JS"},
{"field":"shop_no","logic":"notlike","value":"js"},
{"field":"shop_no","logic":"notlike","value":"Js"},
{"field":"shop_no","logic":"notlike","value":"jS"}
]
],
"request": [
{"field":"start_time","label":"开始时间","type":"datetime","describe":"增量获取数据,start_time作为开始时间,格式:yyyy-MM-dd HH:mm:ss","value":"{{LAST_SYNC_TIME|datetime}}"},
{"field":"end_time","label":"结束时间","type":"datetime","describe":"增量获取数据,end_time作为结束时间,格式:yyyy-MM-dd HH:mm:ss","value":"{{CURRENT_TIME|datetime}}"},
{"field":"status","label":"状态","type":"string"},
{"field":"src_order_no","label":"系统订单编号","type":"string"},
{"field":"src_tid","label":"原始单号","type":"string"},
{"field":"stockout_no","label":"出库单号","type":"string"},
{"field":"shop_no","label":"店铺编号","type":"string"},
{"field":"warehouse_no","label":"仓库编号","type":"string"}
],
"otherRequest": [
{"field": "page_size", "label": "分页大小", "type": "string", "describe": "每页返回的数据条数,输入值范围1~100,不传本参数,输入值默认为40", "value": "100"},
{"field": "page_no", "label": "页号", "type": "string", "describe": "不传值默认从0页开始"}
]
}
```
#### 请求参数解析
- **API和方法**:`api`字段指定了我们要调用的接口为`wdt.stockout.order.query.trade`,请求方法为`POST`。
- **分页设置**:通过`pagination`字段设置每页返回的数据条数为100。
- **条件过滤**:使用`condition`字段排除特定的店铺编号(如"JS"、"js"等)。
- **请求参数**:
- `start_time`和`end_time`用于指定增量获取数据的时间范围。
- `status`、`src_order_no`、`src_tid`等字段用于进一步筛选和标识订单信息。
- **其他请求参数**:包括分页大小和页号。
#### 数据请求与清洗
在实际操作中,我们需要根据上述配置发送HTTP POST请求,并处理返回的数据。以下是一个示例代码片段,用于发送请求并处理响应:
```python
import requests
import json
from datetime import datetime
# 设置请求URL和头部信息
url = 'https://api.wangdian.cn/openapi2/wdt.stockout.order.query.trade'
headers = {'Content-Type': 'application/json'}
# 构建请求体
payload = {
'start_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
'end_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
'status': '55',
'page_size': 100,
'page_no': 0
}
# 发起POST请求
response = requests.post(url, headers=headers, data=json.dumps(payload))
# 检查响应状态码
if response.status_code == 200:
data = response.json()
# 数据清洗与处理逻辑
orders = data.get('orders', [])
for order in orders:
# 示例:过滤掉特定店铺编号的订单
if order['shop_no'] not in ['JS', 'js', 'Js', 'jS']:
# 执行进一步的数据处理,例如存储到数据库或转换格式等
process_order(order)
else:
print(f"Error: {response.status_code}, {response.text}")
```
#### 数据转换与写入
在完成数据清洗后,我们可以将处理后的数据写入目标系统。这一步通常涉及到将数据转换为目标系统所需的格式,并通过相应的API或数据库连接进行写入。
例如,如果目标系统是一个关系型数据库,可以使用SQLAlchemy或类似工具将清洗后的订单数据批量插入到数据库表中:
```python
from sqlalchemy import create_engine, Table, MetaData
# 创建数据库连接引擎
engine = create_engine('mysql+pymysql://user:password@host/dbname')
# 定义表结构(假设已存在)
metadata = MetaData()
orders_table = Table('orders', metadata, autoload_with=engine)
# 插入数据到数据库表中
with engine.connect() as conn:
for order in cleaned_orders:
conn.execute(orders_table.insert().values(order))
```
通过以上步骤,我们实现了从旺店通·企业奇门接口获取销售出库单数据,并对其进行初步清洗和加工,为后续的数据转换与写入奠定了基础。这一过程展示了轻易云数据集成平台在处理异构系统间数据集成时的强大能力。
![金蝶与CRM系统接口开发配置](https://pic.qeasy.cloud/S4.png~tplv-syqr462i7n-qeasy.image)
### 使用轻易云数据集成平台进行ETL转换并写入金蝶云星空API接口的技术案例
在数据集成生命周期的第二步,我们需要将已经集成的源平台数据进行ETL转换,并将其转为目标平台——金蝶云星空API接口所能够接收的格式,最终写入目标平台。以下是详细的技术实现过程。
#### 元数据配置解析
元数据配置是我们进行ETL转换的核心,下面我们逐一解析关键字段和操作:
1. **API接口信息**
```json
{"api":"batchSave","effect":"EXECUTE","method":"POST"}
```
- `api`: 调用的API接口名称为`batchSave`。
- `effect`: 操作效果为`EXECUTE`。
- `method`: HTTP请求方法为`POST`。
2. **唯一标识字段**
```json
{"number":"FBillNo","id":"FBillNo","name":"FBillNo","idCheck":true}
```
- `number`, `id`, `name`: 唯一标识字段均为`FBillNo`,即单据编号。
- `idCheck`: 表示在写入前需要检查ID是否存在。
3. **操作配置**
```json
{"rowsKey":"array","rows":10,"method":"batchArraySave"}
```
- `rowsKey`: 数据行键名为`array`。
- `rows`: 每次批量处理的数据行数为10。
- `method`: 批量保存方法为`batchArraySave`。
4. **请求字段配置**
主要字段及其解析如下:
```json
[
{"field":"FBillTypeID","label":"单据类型","type":"string","parser":{"name":"ConvertObjectParser","params":"FNumber"},"value":"XSCKD07_SYS"},
{"field":"FBillNo","label":"单据编号","type":"string","value":"{order_no}"},
{"field":"FDate","label":"日期","type":"string","value":"{consign_time}"},
{"field":"FSaleOrgId","label":"销售组织","type":"string","parser":{"name":"ConvertObjectParser","params":"FNumber"},"value":
"_function case when '{shop_name}' like '%古神%' then 102 when '{shop_name}' like '%广州格物%' then 101 when '{shop_name}' like '%蓝阳%' then 105 when '{shop_name}' like '%秋玥白%' then 107 when '{shop_name}' like '%金秋%' then 106 when '{shop_name}' like '%广西格物%' then 103 when '{shop_name}' like '%绿星%' then 104 ELSE 100 end"},
{"field":"FCustomerID","label":"客户","type":"string","parser":{"name":"ConvertObjectParser","params":"FNumber"},"value":"{shop_no}"},
...
]
```
- **单据类型**: 固定值`XSCKD07_SYS`,使用`ConvertObjectParser`解析。
- **单据编号**: 动态值,取自源数据中的`order_no`。
- **日期**: 动态值,取自源数据中的`consign_time`。
- **销售组织**: 根据店铺名称进行动态映射,并使用函数进行条件判断设置不同的组织编码。
- **客户**: 动态值,取自源数据中的店铺编号,并使用解析器转换。
5. **子实体配置**
子实体包括财务信息和明细信息:
```json
{
"field": "SubHeadEntity",
"children": [
{"field": "FSettleTypeID", "label": "结算方式", "type": "string", "parser": {"name": "ConvertObjectParser", "params": "FNumber"}, "value": "JSFS04_SYS"},
...
]
},
{
"field": "FEntity",
"children": [
{"field": "FMaterialID", "label": "物料编码", "type": "string", "parser": {"name": "ConvertObjectParser", "params": "FNumber"},
"value": "{{details_list.spec_no}}"},
...
]
}
```
6. **其他请求参数**
包含业务对象表单ID、执行操作、是否自动提交并审核等:
```json
[
{"field": "FormId", "label": "业务对象表单Id", "type": "string",
...},
...
]
```
#### 数据转换与写入流程
1. **数据清洗与转换**
首先,通过元数据配置对源数据进行清洗与转换。对于每个字段,根据其定义和解析器,将源数据映射到目标格式。例如:
- 将订单号映射到单据编号(FBillNo)。
- 将发货时间映射到日期(FDate)。
- 根据店铺名称动态设置销售组织(FSaleOrgId)。
2. **构建请求体**
根据元数据配置构建请求体,将清洗后的数据按照API要求的格式组织起来。例如:
```json
{
"Model":
{
...
// 主表信息
// 子表信息(财务信息、明细信息)
...
},
...
}
```
3. **调用金蝶云星空API**
使用HTTP POST方法,将构建好的请求体发送至金蝶云星空API接口。确保在发送前完成所有必要的验证和检查。
4. **处理响应**
接收并处理API响应,根据响应结果判断操作是否成功。如果失败,则记录错误信息并采取相应措施。
通过上述步骤,我们可以高效地将源平台的数据经过ETL转换后写入到金蝶云星空,实现不同系统间的数据无缝对接。这不仅提升了业务透明度和效率,也确保了数据的一致性和准确性。
![电商OMS与ERP系统接口开发配置](https://pic.qeasy.cloud/T13.png~tplv-syqr462i7n-qeasy.image)