使用轻易云进行ETL转换并写入KIS私有云API技术解析
### 聚水潭数据集成到KIS私有云案例:聚水潭-盘盈——>KIS-其他入库Done
在本技术案例中,我们深入探讨了如何将聚水潭平台中的库存盘点数据高效、准确地集成到KIS私有云系统。通过利用轻易云数据集成平台的强大功能,实现跨系统的数据流动和处理。本次方案命名为“聚水潭-盘盈——>KIS-其他入库Done”。
首先,为理解整个流程,必须明确两个关键API接口:
1. **聚水潭获取数据的API**: `/open/inventory/count/query`
2. **KIS私有云写入数据的API**: `/koas/app007104/api/miscellaneousreceipt/create`
#### 实现步骤概述
##### 数据抓取与转换
为了确保我们从聚水潭获取的数据不遗漏,每隔一段固定时间,通过定时任务可靠地调用`/open/inventory/count/query`接口,抓取最新的库存盘盈数据。在这一过程中,需要特别注意分页和限流问题,以防止因一次请求过多而导致性能瓶颈。轻易云提供自定义的数据转换逻辑,可以针对不同业务需求对原始数据进行清洗和变换,使其符合目标系统格式。
##### 高效批量写入
接下来,将经过处理后的批量库存盘盈数据快速写入到KIS私有云对应模块。这一步依赖于高吞吐量能力,我们调用`/koas/app007104/api/miscellaneousreceipt/create`接口实现批量操作。同时设置错误重试机制,确保即使出现网络抖动或服务异常,也能够自动重试并记录详细日志。
##### 数据监控与告警
为了实时掌握每个环节的数据处理状态,全程设立了集中监控与告警系统。该系统不仅可以跟踪各个任务执行过程,还能及时检测出任何异常,例如接口超时或响应值不合法,并迅速发送告警通知相关负责人采取措施。
通过这样的技术方法,不仅提升了整体效率,还保证了跨平台的数据一致性和完整性,提高企业资源管理水平。
![用友与MES系统接口开发配置](https://pic.qeasy.cloud/D3.png~tplv-syqr462i7n-qeasy.image)
### 调用聚水潭接口获取并加工数据的技术案例
在轻易云数据集成平台中,调用源系统接口是数据处理生命周期的第一步。本文将深入探讨如何通过调用聚水潭接口`/open/inventory/count/query`来获取并加工数据。
#### 接口概述
聚水潭提供的`/open/inventory/count/query`接口用于查询库存盘点信息。该接口采用POST方法,能够根据指定的条件返回符合要求的数据。以下是该接口的元数据配置:
```json
{
"api": "/open/inventory/count/query",
"method": "POST",
"number": "io_id",
"id": "io_id",
"request": [
{
"field": "page_index",
"label": "开始页码",
"type": "string",
"describe": "第几页,从第一页开始,默认1",
"value": "1"
},
{
"field": "page_size",
"label": "每页条数",
"type": "string",
"describe": "每页多少条,默认30,最大50",
"value": "50"
},
{
"field": "modified_begin",
"label": "修改开始时间",
"type": "datetime",
"describe": "修改起始时间,和结束时间必须同时存在,时间间隔不能超过七天,与线上单号不能同时为空",
"value": "{{LAST_SYNC_TIME|datetime}}"
},
{
"field": "modified_end",
"label": "修改结束时间",
"type": "datetime",
"describe": "修改结束时间,和起始时间必须同时存在,时间间隔不能超过七天,与线上单号不能同时为空",
"value": "{{CURRENT_TIME|datetime}}"
},
{
"field": "status",
"label": "单据状态",
"type": "string",
"describe":"单据状态,Confirmed=生效,WaitConfirm待审核,Creating=草拟,Archive=归档,Cancelled=作废",
'value': 'Confirmed'
}
],
'condition': [
[
{'field': 'items.qty', 'logic': 'gt', 'value': '0'},
{'field': 'wms_co_id', 'logic': 'in', 'value': '14132797,14133381'}
]
]
}
```
#### 请求参数详解
1. **page_index**: 开始页码,从第一页开始。默认值为1。
2. **page_size**: 每页条数。默认值为30,最大值为50。
3. **modified_begin**: 修改开始时间。与`modified_end`必须同时存在且间隔不超过七天。
4. **modified_end**: 修改结束时间。与`modified_begin`必须同时存在且间隔不超过七天。
5. **status**: 单据状态。我们选择了"Confirmed"表示已生效的单据。
#### 条件过滤
在请求参数中,我们还设置了两个条件过滤:
- `items.qty > 0`: 表示库存数量大于零。
- `wms_co_id in (14132797, 14133381)`: 表示仓库ID在指定范围内。
#### 数据请求与清洗
在调用该接口时,我们需要注意以下几点:
- 确保分页参数正确设置,以避免遗漏数据。
- 使用动态变量如`{{LAST_SYNC_TIME|datetime}}`和`{{CURRENT_TIME|datetime}}`来确保数据同步的时效性。
- 对返回的数据进行初步清洗,如去除无效记录、格式化日期等。
以下是一个Python示例代码,用于调用该接口并处理返回的数据:
```python
import requests
import datetime
# 设置请求URL和头部信息
url = 'https://api.jushuitan.com/open/inventory/count/query'
headers = {'Content-Type': 'application/json'}
# 获取当前时间和上次同步时间
current_time = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
last_sync_time = (datetime.datetime.now() - datetime.timedelta(days=7)).strftime('%Y-%m-%d %H:%M:%S')
# 构建请求体
payload = {
'page_index': 1,
'page_size': 50,
'modified_begin': last_sync_time,
'modified_end': current_time,
'status': 'Confirmed'
}
# 发起POST请求
response = requests.post(url, headers=headers, json=payload)
# 检查响应状态码
if response.status_code == 200:
data = response.json()
# 数据清洗与处理
cleaned_data = [item for item in data['items'] if item['qty'] > 0 and item['wms_co_id'] in [14132797, 14133381]]
# 打印清洗后的数据
print(cleaned_data)
else:
print(f'Error: {response.status_code}')
```
#### 数据转换与写入
在获取并清洗完数据后,我们需要将其转换为目标系统所需的格式,并写入到目标系统中。这一步通常包括字段映射、格式转换等操作。例如,将日期格式从字符串转换为目标系统所需的日期对象。
通过上述步骤,我们可以高效地完成从聚水潭到目标系统的数据集成。这不仅提高了数据处理的效率,还确保了数据的一致性和准确性。
![泛微OA与ERP系统接口开发配置](https://pic.qeasy.cloud/S19.png~tplv-syqr462i7n-qeasy.image)
### 使用轻易云数据集成平台进行ETL转换并写入KIS私有云API接口的技术案例
在数据集成的生命周期中,第二步是将已集成的源平台数据进行ETL转换,并转为目标平台KIS私有云API接口所能接收的格式,最终写入目标平台。本文将详细探讨如何利用轻易云数据集成平台完成这一过程。
#### API接口配置与元数据解析
首先,我们需要理解目标API接口的元数据配置。以下是一个典型的KIS私有云API接口配置示例:
```json
{
"api": "/koas/app007104/api/miscellaneousreceipt/create",
"effect": "EXECUTE",
"method": "POST",
"number": "id",
"id": "id",
"name": "id",
"idCheck": true,
"request": [
{
"field": "AccountDB",
"label": "AccountDB",
"type": "string",
"value": "001"
},
{
"field": "Object",
"label": "Object",
"type": "object",
"describe": "暂无描述",
...
}
]
}
```
该配置文件定义了API的基本信息,包括请求方法(POST)、API路径以及请求参数等。
#### 数据转换与映射
在ETL过程中,关键步骤之一是将源平台的数据字段映射到目标平台所需的字段。这通常涉及复杂的数据转换和清洗操作。以下是一些关键字段及其映射规则:
1. **单据编号 (FBillNo)**
- 源数据字段:`io_id`
- 转换规则:直接映射
- 示例:`"FBillNo":"{io_id}"`
2. **日期 (Fdate)**
- 源数据字段:`io_date`
- 转换规则:日期格式转换,将空格替换为'T'
- 示例:`"Fdate":"_function REPLACE ('{{io_date|datetime}}',' ','T')"`
3. **部门 (FDeptID)**
- 固定值:16921
- 示例:`"FDeptID":"16921"`
4. **仓库(表头)(FDCStockID)**
- 源数据字段:`wms_co_id` 和 `wh_id`
- 转换规则:组合两个字段值
- 示例:`"FDCStockID":"{wms_co_id}-{wh_id}"`
5. **责任人 (FManagerID)**
- 源数据字段:`wms_co_id`
- 转换规则:直接映射
- 示例:`"FManagerID":"{wms_co_id}"`
6. **产品代码 (Entry.FItemID)**
- 源数据字段:`sku_id`
- 转换规则:通过MongoDB查询获取对应值
- 示例:
```json
{
"field":"FItemID",
...
"value":"_mongoQuery 30fa1b2b-6cfc-31c2-90a3-5a497b7812bd findField=content.FItemID where={\"content.F_103\":{\"$eq\":\"{sku_id}\"}}"
}
```
#### 数据写入
一旦所有必要的数据都已转换并映射到正确的格式,我们就可以使用轻易云提供的API调用功能,将这些数据写入到KIS私有云系统中。
```python
import requests
url = 'https://your-kis-api-endpoint/koas/app007104/api/miscellaneousreceipt/create'
headers = {'Content-Type': 'application/json'}
payload = {
'AccountDB': '001',
'Object': {
'Head': {
'FBillNo': '{io_id}',
'Fdate': '_function REPLACE ("{{io_date|datetime}}"," ","T")',
'FDeptID': '16921',
...
},
'Entry': [
{
'FItemID': '_mongoQuery ...',
...
}
]
}
}
response = requests.post(url, json=payload, headers=headers)
if response.status_code == 200:
print('Data successfully written to KIS')
else:
print(f'Failed to write data: {response.text}')
```
通过上述代码示例,我们可以看到如何构建HTTP POST请求,将经过ETL处理的数据发送到KIS私有云API。
#### 总结
在本文中,我们深入探讨了如何利用轻易云数据集成平台完成ETL转换,并将处理后的数据写入到KIS私有云API接口。我们详细解析了目标API接口的元数据配置,并展示了如何进行字段映射和数据转换,最终实现了无缝的数据集成。
![用友与WMS系统接口开发配置](https://pic.qeasy.cloud/T29.png~tplv-syqr462i7n-qeasy.image)