利用轻易云进行ETL转换并写入金蝶云星空的实战案例
### 业务系统销售开单对接到金蝶销售订单-ok
在企业日常运作中,高效的数据集成对于提升生产力和管理效率至关重要。本案例探讨了如何利用轻易云数据集成平台,实现云水聚数据的高效采集与处理,并将其无缝对接到金蝶云星空。具体方案涉及多个模块的精细协作,确保从获取接口数据,到批量写入,再到错误处理和重试机制的各个环节万无一失。
首先,通过调用云水聚提供的API接口/Kingdee/GetSaleOrderList,我们能够定时可靠地抓取最新的销售订单数据。在这个过程中,为确保不漏单,我们设计了一套处理云水聚接口分页与限流问题的方法。这不仅保证了大范围的数据抓取,同时避免了因接口负载过高而导致的数据丢失或延迟。
为了实现大量数据快速写入金蝶云星空系统,我们利用其batchSave API,以批量方式将多条记录一次性传输并存储。这种方法不仅提高了效率,还降低了网络通信成本。然而,这同时也带来一个挑战,即如何处理两者之间的数据格式差异。对此,我们开发了一套定制化的数据映射规则,自动转换不同字段类型,使之符合目标端要求。
此外,在整个对接过程中,实时监控与日志记录功能不可或缺。我们部署了一系列监控节点,对每一步操作进行追踪并记录日志。当出现异常情况时,例如连接超时或响应失败,通过预设置的错误重试机制,可以自动重新发起请求,从容应对各种突发情况,大幅减少人为干预需求,提高系统稳定性。
![打通用友BIP数据接口](https://pic.qeasy.cloud/D33.png~tplv-syqr462i7n-qeasy.image)
### 调用源系统云水聚接口/Kingdee/GetSaleOrderList获取并加工数据
在数据集成的生命周期中,调用源系统接口是至关重要的一步。本文将深入探讨如何使用轻易云数据集成平台调用云水聚接口 `/Kingdee/GetSaleOrderList` 获取销售订单列表,并对数据进行初步加工。
#### 接口配置与调用
首先,我们需要了解元数据配置中的各个字段及其作用:
```json
{
"api": "/Kingdee/GetSaleOrderList",
"effect": "QUERY",
"method": "GET",
"number": "orderNo",
"id": "orderNo",
"name": "orderNo",
"idCheck": true,
"request": [
{
"field": "ApproveDate",
"label": "审核时间",
"type": "datetime",
"value": "{{LAST_SYNC_TIME|datetime}}"
}
],
"autoFillResponse": true
}
```
1. **API路径**:`/Kingdee/GetSaleOrderList` 是我们需要调用的接口路径。
2. **请求类型**:`GET` 表示我们将通过HTTP GET方法请求数据。
3. **主键字段**:`orderNo` 用于标识每条记录的唯一性。
4. **请求参数**:包含一个字段 `ApproveDate`,表示审核时间,其值为上次同步时间 `{{LAST_SYNC_TIME|datetime}}`。
#### 数据请求与清洗
在调用接口之前,我们需要确保请求参数的正确性。这里的 `ApproveDate` 参数用于过滤自上次同步以来的新数据。通过动态设置 `LAST_SYNC_TIME`,可以确保每次只获取增量数据,提升效率。
```python
import requests
from datetime import datetime
# 假设 LAST_SYNC_TIME 已经从系统中获取
last_sync_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
# 构建请求参数
params = {
'ApproveDate': last_sync_time
}
# 发起 GET 请求
response = requests.get('https://api.yunshuiju.com/Kingdee/GetSaleOrderList', params=params)
if response.status_code == 200:
data = response.json()
# 对返回的数据进行初步清洗和验证
cleaned_data = []
for order in data:
if 'orderNo' in order and order['orderNo']:
cleaned_data.append(order)
else:
print(f"Error: {response.status_code}")
```
#### 数据转换与写入
在获取并清洗了销售订单列表后,我们需要将这些数据转换为目标系统所需的格式,并写入目标数据库或系统。在此过程中,可以利用轻易云平台提供的自动填充响应功能 `autoFillResponse: true`,简化部分工作。
```python
# 假设目标系统需要的数据格式如下:
transformed_data = []
for order in cleaned_data:
transformed_order = {
'订单编号': order['orderNo'],
'客户名称': order.get('customerName', ''),
'订单日期': order.get('orderDate', ''),
# 添加其他必要的字段转换...
}
transformed_data.append(transformed_order)
# 将转换后的数据写入目标系统(例如数据库)
import pymysql
connection = pymysql.connect(
host='localhost',
user='user',
password='password',
database='sales_db'
)
try:
with connection.cursor() as cursor:
for order in transformed_data:
sql = """
INSERT INTO sales_orders (order_no, customer_name, order_date)
VALUES (%s, %s, %s)
"""
cursor.execute(sql, (order['订单编号'], order['客户名称'], order['订单日期']))
connection.commit()
finally:
connection.close()
```
通过上述步骤,我们成功地完成了从源系统云水聚获取销售订单列表、对数据进行清洗和转换,并最终写入目标系统的全过程。这一过程不仅提高了数据处理效率,还确保了数据的一致性和准确性。
![如何开发金蝶云星空API接口](https://pic.qeasy.cloud/S11.png~tplv-syqr462i7n-qeasy.image)
### 使用轻易云数据集成平台进行ETL转换并写入金蝶云星空API接口的技术案例
在数据集成生命周期的第二阶段,我们需要将已经集成的源平台数据进行ETL转换,使其符合目标平台金蝶云星空API接口的格式要求,并最终写入目标平台。本文将详细探讨这一过程中的技术细节,特别是如何配置和使用元数据来实现这一目标。
#### 数据请求与清洗
在数据请求与清洗阶段,我们已经从源系统获取了原始数据,并进行了必要的数据清洗和预处理。这些数据现在准备好进行进一步的ETL转换,以适应金蝶云星空API接口的需求。
#### 元数据配置解析
我们使用以下元数据配置来指导ETL转换过程:
```json
{
"api": "batchSave",
"effect": "EXECUTE",
"method": "POST",
"number": "FBillNo",
"id": "FBillNo",
"name": "FBillNo",
"idCheck": true,
"operation": {
"method": "batchArraySave",
"rows": 1,
"rowsKey": "array"
},
"request": [
{
"field": "FBillTypeID",
"label": "单据类型",
...
},
...
],
...
}
```
#### ETL转换过程
1. **字段映射与转换**
每个字段都有特定的配置,包括字段名、标签、类型、描述和值。例如,`FBillTypeID`字段被映射为单据类型,其值通过`ConvertObjectParser`解析器转换为`FNumber`格式。
```json
{
"field": "FBillTypeID",
...
"parser": {
"name": "ConvertObjectParser",
...
},
...
}
```
2. **动态值替换**
某些字段需要从源数据中动态替换,例如订单编号`FBillNo`和客户名称`FCustId`。这些值通常通过占位符表示,如 `{orderNo}` 和 `{agentName}`,并在运行时替换为实际值。
```json
{
"field": "FBillNo",
...
"value": "{orderNo}"
},
{
...
"value":"_mongoQuery a0cfff28-620e-3150-bbbd-3ac0b72a3da5 findField=content.FNumber where={\"content.FName\":{\"$eq\":\"{agentName}\"}}"
}
```
3. **数组处理**
对于复杂的数据结构,如订单明细(`FSaleOrderEntry`),我们需要处理嵌套数组。每个子项也有相应的字段映射和转换规则。例如,物料编码、销售数量和含税单价等字段都需要进行相应的解析和赋值。
```json
{
...
"children":[
{
...
{"field":"FMaterialId",...,"value":"{{productItemEntity.productId}}"},
{"field":"FQty",...,"value":"{{productItemEntity.applyNumer}}"},
{"field":"FTaxPrice",...,"value":"{{productItemEntity.productPrice}}"},
...
}
]
}
```
4. **其他请求参数**
除了主要的数据字段外,还需要配置一些其他请求参数,如业务对象表单ID、执行操作、是否自动提交并审核等。这些参数确保了在调用API时能够正确执行所需操作。
```json
{
...
{"field":"FormId",...,"value":"SAL_SaleOrder"},
{"field":"Operation",...,"value":"BatchSave"},
{"field":"IsAutoSubmitAndAudit",...,"value":"true"},
{"field":"IsVerifyBaseDataField",...,"value":"true"}
}
```
#### 数据写入目标平台
完成ETL转换后,我们使用配置好的元数据通过HTTP POST方法将数据发送到金蝶云星空API接口。以下是一个简化的示例代码片段:
```python
import requests
url = 'https://api.kingdee.com/batchSave'
headers = {'Content-Type': 'application/json'}
data = {
# 根据元数据配置生成的数据结构
}
response = requests.post(url, headers=headers, json=data)
if response.status_code == 200:
print('Data successfully written to Kingdee Cloud.')
else:
print('Failed to write data:', response.content)
```
通过上述步骤,我们成功地将源平台的数据经过ETL转换后写入到金蝶云星空,实现了不同系统间的数据无缝对接。这一过程不仅提高了业务效率,也确保了数据的一致性和准确性。
![金蝶与SCM系统接口开发配置](https://pic.qeasy.cloud/T28.png~tplv-syqr462i7n-qeasy.image)