### 案例:金蝶云星空数据集成至轻易云平台
在本文中,我们将深入探讨如何通过轻易云数据集成平台,将金蝶云星空系统中的仓库信息高效、可靠地集成进来。本案例集中展示了执行从金蝶接口 `executeBillQuery` 进行数据查询,并利用轻易云的API实现批量写入。
#### 挑战与解决方案概述
在实际业务场景中,企业常面对大量分散的数据源,为确保数据准时且准确无误地采集并处理,系统对接和标准化转换过程显得尤为关键。以下是我们在本案例中面临的主要技术挑战及对应解决方案:
1. **大量数据快速写入**:通过优化轻易云的平台接口,使得来自金蝶云星空的大容量数据能够被高效处理,大幅提升整体处理速度。
2. **定时抓取与可靠性**:采用定时任务机制定期调用 `executeBillQuery` 接口,保障及时获取最新仓库信息,同时配合重试机制提高请求稳定性。
3. **分页与限流管理**:针对API返回的数据分页处理,以及应对高频调用可能带来的限流问题,通过合理配置分页参数和调度策略,有效规避性能瓶颈。
4. **格式差异转化**:利用自定义转换逻辑,在不同系统之间统一数据结构,以适应特定的业务需求,并确保各环节的一致性和完整性。
#### 开始实施
首先,我们需要获取金蝶云星空中的仓库信息。这一步依赖于调用其公开接口 `executeBillQuery` 来获取所需原始数据信息。在设计这一流程时,我们采取了以下几步措施:
- 使用可视化工具设计并配置从 `executeBillQuery` 接口到目标数据库表的小段映射路径;
- 批量读取接口返回的数据块,对象字段做适当解析和调整后传递给下游处理模块;
- 配置监控告警功能实时跟踪该任务状态,一旦出现异常立即启动修复或通知流程。
这样一来,不仅能有效避免因通信异常导致的数据遗漏,还能保证每次采集后的数据信息都经过精细验证与过滤,提高整体质量。
接下来部分内容将更加剖析具体技术实现,包括如何使用轻易云提供的API完成批量写入操作等。
![用友与外部系统接口集成开发](https://pic.qeasy.cloud/D3.png~tplv-syqr462i7n-qeasy.image)
### 调用金蝶云星空接口executeBillQuery获取并加工数据
在数据集成的生命周期中,第一步是调用源系统接口获取原始数据。本文将深入探讨如何通过轻易云数据集成平台调用金蝶云星空的`executeBillQuery`接口,获取仓库信息并进行初步加工。
#### 接口概述
`executeBillQuery`是金蝶云星空提供的一个用于查询业务单据的API接口。通过该接口,我们可以实现对仓库信息的查询。以下是该接口的一些关键元数据配置:
- **API**: `executeBillQuery`
- **请求方法**: `POST`
- **主要字段**:
- `FStockId`: 仓库ID
- `FNumber`: 编码
- `FName`: 名称
- `FGroup`: 分组
- `FUseOrgId.FNumber`: 使用组织编码
- `F_POKM_JSTSTOCKNUMBER`: 对接聚水潭仓库编码
- `F_POKM_JSTSTOCKNUMBER2`: 对接聚水潭分仓编码
#### 请求参数配置
为了有效地调用`executeBillQuery`接口,需要配置一系列请求参数。这些参数不仅包括基本的查询条件,还涵盖了分页、过滤等高级选项。
1. **基础字段配置**:
```json
[
{"field":"FStockId","label":"id","type":"string","describe":"id","value":"FStockId"},
{"field":"FNumber","label":"编码","type":"string","describe":"编码","value":"FNumber"},
{"field":"FName","label":"名称","type":"string","describe":"名称","value":"FName"},
{"field":"FGroup","label":"分组","type":"string","describe":"分组","value":"FGroup"},
{"field":"FUseOrgId.FNumber","label":"使用组织","type":"string","describe":"使用组织","value":"FUseOrgId.FNumber"},
{"field":"F_POKM_JSTSTOCKNUMBER","label":"对接聚水潭仓库编码","type":"string","value":"F_POKM_JSTSTOCKNUMBER"},
{"field":"F_POKM_JSTSTOCKNUMBER2","label":"对接聚水潭分仓编码","type":"string","value":"F_POKM_JSTSTOCKNUMBER2"}
]
```
2. **其他请求参数**:
```json
[
{"field": "Limit", "label": "最大行数", "type": "string", "describe": "金蝶的查询分页参数", "value": "2000"},
{"field": "StartRow", "label": "开始行索引", "type": "string", "describe": "金蝶的查询分页参数"},
{"field": "TopRowCount", "label": "返回总行数", "type": "int", "describe": "金蝶的查询分页参数"},
{"field": "FilterString", "label": "过滤条件", "type": "string",
"describe": "示例写法 FSupplierId.FNumber = 'VEN00010' and FApproveDate>=",
"value": "FAuditDate>='{{LAST_SYNC_TIME|dateTime}}' and FDocumentStatus='C'"},
{"field": "FieldKeys",
"label": "需查询的字段key集合",
"type": "array",
"describe": "金蝶分录主键ID格式:FPOOrderEntry_FEntryId,其它格式 FPurchaseOrgId.FNumber",
"parser":{"name": "ArrayToString",
"params": ","}},
{"field": "FormId",
"label": "业务对象表单Id",
"type": "string",
"describe": "必须填写金蝶的表单ID如:PUR_PurchaseOrder",
"value": "BD_STOCK"}
]
```
#### 数据请求与清洗
在完成请求参数配置后,通过轻易云平台发起POST请求,调用`executeBillQuery`接口。响应的数据需要进行初步清洗,以确保其符合后续处理和存储要求。
1. **发送请求**:
```python
import requests
url = 'https://api.kingdee.com/executeBillQuery'
headers = {'Content-Type': 'application/json'}
payload = {
'FormId': 'BD_STOCK',
'FieldKeys': 'FStockId,FNumber,FName,FGroup,FUseOrgId.FNumber,F_POKM_JSTSTOCKNUMBER,F_POKM_JSTSTOCKNUMBER2',
'FilterString': 'FAuditDate>=\'2023-01-01\' and FDocumentStatus=\'C\'',
'Limit': 2000,
'StartRow': 0,
'TopRowCount': True
}
response = requests.post(url, json=payload, headers=headers)
if response.status_code == 200:
data = response.json()
# 数据清洗逻辑...
else:
print(f'Error: {response.status_code}')
```
2. **数据清洗**:
清洗步骤包括去除冗余字段、格式化日期、校验数据完整性等。例如:
```python
def clean_data(raw_data):
cleaned_data = []
for item in raw_data:
cleaned_item = {
'id': item['FStockId'],
'number': item['FNumber'],
'name': item['FName'],
'group': item['FGroup'],
'use_org_id': item['FUseOrgId.FNumber'],
'jst_stock_number': item['F_POKM_JSTSTOCKNUMBER'],
'jst_stock_number2': item['F_POKM_JSTSTOCKNUMBER2']
}
cleaned_data.append(cleaned_item)
return cleaned_data
cleaned_data = clean_data(data)
```
通过上述步骤,我们成功调用了金蝶云星空的`executeBillQuery`接口,获取并清洗了仓库信息,为后续的数据转换与写入奠定了基础。这一过程展示了如何利用轻易云平台高效地进行数据集成,确保数据处理过程透明且高效。
![钉钉与ERP系统接口开发配置](https://pic.qeasy.cloud/S24.png~tplv-syqr462i7n-qeasy.image)
### 利用轻易云数据集成平台进行ETL转换与写入目标平台
在数据集成的生命周期中,ETL(Extract, Transform, Load)转换是至关重要的一步。本文将深入探讨如何利用轻易云数据集成平台,将已经集成的源平台数据进行ETL转换,并最终通过API接口写入目标平台。
#### 数据提取与清洗
首先,我们需要从源系统中提取数据。假设我们从金蝶仓库系统中获取库存信息,数据格式可能如下:
```json
{
"warehouse_id": "WH001",
"item_id": "ITEM123",
"quantity": 100,
"last_updated": "2023-10-01T12:00:00Z"
}
```
#### 数据转换
在提取到原始数据后,需要对其进行清洗和转换,以符合目标平台的API接口要求。根据元数据配置,我们需要将数据转化为轻易云集成平台API接口所能接收的格式。
##### 转换规则
1. **字段映射**:根据目标API的需求,对字段进行重新命名或调整。例如,将`warehouse_id`映射为`wh_id`。
2. **数据类型转换**:确保所有字段的数据类型符合目标平台的要求。例如,将时间戳格式从ISO 8601转换为Unix时间戳。
3. **数据验证**:根据元数据配置中的`idCheck`属性,确保所有必需字段都存在且有效。
转换后的数据格式可能如下:
```json
{
"wh_id": "WH001",
"item_code": "ITEM123",
"stock_qty": 100,
"update_time": 1696156800
}
```
#### 数据写入
完成数据转换后,通过轻易云集成平台提供的API接口将数据写入目标系统。以下是一个示例请求:
```http
POST /api/v1/inventory/update HTTP/1.1
Host: api.qingyiyun.com
Content-Type: application/json
{
"wh_id": "WH001",
"item_code": "ITEM123",
"stock_qty": 100,
"update_time": 1696156800
}
```
根据元数据配置中的`effect`属性,该操作应设置为执行模式(EXECUTE),确保实际更新库存信息。
#### API接口调用实现
在实际实现中,可以使用各种编程语言和工具来调用API接口。以下是一个Python示例,展示如何利用requests库发送POST请求:
```python
import requests
import json
import time
# 原始数据
data = {
"warehouse_id": "WH001",
"item_id": "ITEM123",
"quantity": 100,
"last_updated": "2023-10-01T12:00:00Z"
}
# 数据转换函数
def transform_data(data):
transformed_data = {
"wh_id": data["warehouse_id"],
"item_code": data["item_id"],
"stock_qty": data["quantity"],
# 转换时间戳为Unix时间戳
"update_time": int(time.mktime(time.strptime(data["last_updated"], "%Y-%m-%dT%H:%M:%SZ")))
}
return transformed_data
# 转换后的数据
transformed_data = transform_data(data)
# API请求头和URL
headers = {
'Content-Type': 'application/json'
}
url = 'https://api.qingyiyun.com/api/v1/inventory/update'
# 发送POST请求
response = requests.post(url, headers=headers, data=json.dumps(transformed_data))
# 检查响应状态码和内容
if response.status_code == 200:
print("Data successfully written to target platform.")
else:
print(f"Failed to write data. Status code: {response.status_code}, Response: {response.text}")
```
通过上述步骤,我们实现了从金蝶仓库系统提取库存信息、对其进行ETL转换,并最终通过轻易云集成平台的API接口将其写入目标系统。这一过程不仅提高了数据处理的效率,还确保了不同系统间的数据一致性和准确性。
![金蝶与MES系统接口开发配置](https://pic.qeasy.cloud/T2.png~tplv-syqr462i7n-qeasy.image)