ETL转化与易仓API写入技术详解:轻易云平台实践
### 金蝶云星空数据集成到易仓:分布调出单-采购订单【供应商为空需要找上游采购入库单供应商】的实施案例
在高度动态且复杂的数据环境中,实现不同系统之间的高效数据集成一直是个挑战。在本次技术分享中,我们将详细探讨如何通过轻易云数据集成平台实现金蝶云星空与易仓之间的数据对接。具体方案聚焦于分布调出单和采购订单的同步,尤其处理了在供应商信息缺失情况下,通过关联上游采购入库单获取所需信息的问题。
#### 初始准备与接口概览
为了确保从金蝶云星空读取到每一个有效交易记录,我们需要充分利用其提供的 `executeBillQuery` API。这一接口允许我们精确地查询并提取特定业务场景下的数据,例如在本项目中的分布调出单和相关的采购订单。
另一方面,借助易仓系统提供的 `syncPurchaseOrders` API,可以将整理后的数据快速写入目标数据库。这对于处理大量、频繁变动的数据而言尤为关键。
#### 确保无遗漏的数据抓取
通过灵活配置 `executeBillQuery` 接口调用策略,并结合参数化查询,实现对金蝶云星空实时、精准、不漏单地捕捉所有相关业务记录。此外,为应对接口分页及限流限制,设计了一套自动重试机制,以确保完整性和可靠性。
```python
def fetch_data_from_kingdee(api_endpoint, query_params):
try:
data = execute_api_call(api_endpoint, query_params)
if 'next_page' in data:
next_query_params = update_query_with_pagination(query_params, data['next_page'])
return data + fetch_data_from_kingdee(api_endpoint, next_query_params)
else:
return data
except APIRateLimitException as e:
handle_rate_limit(e)
```
#### 数据格式差异及映射处理
双向跨系统通信过程中,不可避免面临数据格式不一致的问题。通过定制化映射逻辑,将从金蝶云星空获取到的源数据转化为符合易仓需求格式。这包括字段名称转换、数值单位换算以及日期格式调整等问题:
```python
def transform_kingdee_to_easystore(data):
transformed_data = []
for record in data:
transformed_record = {
"purchase_order_id": record["FOrderId"],
"supplier_name": get_supplier_name(record),
"order_date": convert_date_format(record["FDate"]),
# 其他字段转换逻辑...
}
transformed_data.append(transformed_record)
return transformed_data
```
上述代码片段展示了核心转换步骤,包括ID匹配
![金蝶与WMS系统接口开发配置](https://pic.qeasy.cloud/D18.png~tplv-syqr462i7n-qeasy.image)
### 调用金蝶云星空接口executeBillQuery获取并加工数据
在数据集成的生命周期中,调用源系统接口获取数据是至关重要的一步。本文将详细介绍如何使用轻易云数据集成平台调用金蝶云星空的`executeBillQuery`接口来获取并加工分布调出单和采购订单的数据。
#### 接口配置与调用
首先,我们需要配置并调用金蝶云星空的`executeBillQuery`接口。以下是元数据配置的关键字段及其含义:
- `api`: 接口名称,这里为`executeBillQuery`。
- `method`: 请求方法,使用`POST`。
- `number`: 单据编号字段,这里为`FBillNo`。
- `id`: 分录主键字段,这里为`FSTKTRSOUTENTRY_FEntryID`。
- `pagination`: 分页参数,设置每页大小为500条记录。
- `idCheck`: 是否检查ID重复性,设置为true。
请求参数配置如下:
```json
{
"field": "FSTKTRSOUTENTRY_FEntryID",
"label": "FEntryID",
"type": "string",
"value": "FSTKTRSOUTENTRY_FEntryID"
},
{
"field": "FID",
"label": "实体主键",
"type": "string",
"value": "FID"
},
{
"field": "FBillNo",
"label": "单据编号",
"type": "string",
"value": "FBillNo"
},
...
```
这些字段涵盖了分布调出单和采购订单的各种信息,如单据状态、调入库存组织、日期、物料编码、供应商等。
#### 数据请求与清洗
在调用接口时,我们需要构建请求体,并根据业务需求添加过滤条件。例如,为了获取供应商为空的数据,可以在过滤条件中添加:
```json
{
"field": "FilterString",
"label": "过滤条件",
"type": "string",
"describe": "",
"value": "FApproveDate>='{{LAST_SYNC_TIME|dateTime}}' and F_ABCD_supplier1= ''"
}
```
通过这种方式,我们可以确保只获取到符合条件的数据。
#### 数据转换与写入
获取到数据后,需要对数据进行清洗和转换。比如,对于供应商为空的记录,我们需要查找上游采购入库单中的供应商信息,并补充到当前记录中。这一步可以通过编写自定义脚本或使用轻易云平台提供的数据处理工具来实现。
以下是一个简单的数据处理示例:
```python
def process_data(record):
if not record['FSUPPLIERID_FNumber']:
# 查找上游采购入库单中的供应商信息
supplier_info = find_supplier_from_upstream(record['FSrcBillNo'])
record['FSUPPLIERID_FNumber'] = supplier_info['supplier_number']
return record
def find_supplier_from_upstream(src_bill_no):
# 模拟查找上游采购入库单中的供应商信息
return {"supplier_number": "SUP12345"}
```
经过处理后的数据可以直接写入目标系统或存储到数据库中,以供后续使用。
#### 实时监控与日志记录
为了确保数据集成过程的透明度和可追溯性,轻易云平台提供了实时监控和日志记录功能。通过这些功能,可以随时查看数据流动情况和处理状态,并及时发现和解决问题。
在实际操作中,可以通过以下步骤实现监控和日志记录:
1. **启用实时监控**:在轻易云平台的监控模块中启用实时监控功能,设置监控指标和告警规则。
2. **日志记录**:在数据处理脚本中添加日志记录代码,记录每一步操作的详细信息。例如:
```python
import logging
logging.basicConfig(level=logging.INFO)
def process_data(record):
logging.info(f"Processing record: {record['FBillNo']}")
if not record['FSUPPLIERID_FNumber']:
supplier_info = find_supplier_from_upstream(record['FSrcBillNo'])
record['FSUPPLIERID_FNumber'] = supplier_info['supplier_number']
logging.info(f"Updated supplier info for record: {record['FBillNo']}")
return record
```
通过以上步骤,可以确保整个数据集成过程透明、高效,并且易于维护和扩展。
![用友与SCM系统接口开发配置](https://pic.qeasy.cloud/S27.png~tplv-syqr462i7n-qeasy.image)
### 使用轻易云数据集成平台实现ETL转换并写入易仓API接口
在数据集成的生命周期中,ETL(提取、转换、加载)过程是关键的一环。本文将详细介绍如何使用轻易云数据集成平台,将已经集成的源平台数据进行ETL转换,并转为目标平台易仓API接口所能够接收的格式,最终写入目标平台。
#### 数据请求与清洗
在进行ETL转换之前,首先需要从源系统中提取数据,并对其进行清洗和预处理。假设我们已经完成了这一阶段,现在我们将重点放在如何将这些清洗后的数据转换为易仓API所需的格式,并通过API接口写入目标系统。
#### 数据转换与写入
根据提供的元数据配置,我们需要将源平台的数据字段映射到易仓API所需的字段格式。以下是具体步骤:
1. **定义API接口和请求方法**
我们使用`syncPurchaseOrders` API接口,采用POST方法来提交采购订单数据。
2. **设置请求头和请求体**
请求头包含采购订单的基本信息,如采购单号、日期、供应商ID等。请求体则包含具体的采购产品明细。
3. **字段映射与转换**
元数据配置中详细列出了各个字段的映射关系。例如:
- `FBillNo` 映射到 `po_code`
- `FDate` 映射到 `date_create`
- `F_ABCD_supplier1` 通过 `_findCollection` 方法找到对应的供应商ID
- `FQty` 映射到 `qty_expected`
4. **处理特殊字段**
对于一些需要特殊处理的字段,可以使用函数或其他逻辑进行转换。例如:
```json
{
"field": "unit_price",
"label": "不含税单价",
"type": "string",
"describe": "不含税单价 该值需大于0,is_free字段传1后,该值会默认为0,但传值仍需大于0",
"value": "_function case when '{F_ABCD_UnitPrice}' ='0' then '1.0000' when '{F_ABCD_UnitPrice}' like '%.%' then '{F_ABCD_UnitPrice}' else '{F_ABCD_UnitPrice}.0000' end"
}
```
5. **构建完整请求体**
根据元数据配置,将所有字段组装成一个完整的JSON对象,作为POST请求体提交给易仓API。
6. **发送请求并处理响应**
使用轻易云平台内置的HTTP客户端发送POST请求,并处理返回结果。如果成功,则表示数据已经成功写入目标系统;如果失败,需要根据返回的信息进行错误排查和修正。
以下是一个示例代码片段,用于构建并发送POST请求:
```json
{
"api": "syncPurchaseOrders",
"method": "POST",
"idCheck": true,
"operation": {
"method": "merge",
"field": ["FBillNo", "FDate"],
"bodyName": "details",
...
},
...
}
```
#### 实际应用案例
假设我们有一条采购订单,其原始数据如下:
```json
{
"FBillNo": "PO12345",
"FDate": "2023-10-01",
...
}
```
通过上述步骤,我们将其转换为易仓API所需的格式,并发送POST请求:
```json
{
"po_code": "PO12345",
"date_create": "2023-10-01",
...
}
```
#### 总结
通过轻易云数据集成平台,我们可以高效地完成从源系统到目标系统的数据ETL过程。利用平台提供的全异步、多异构系统支持,以及丰富的元数据配置功能,我们能够确保每一步的数据处理都透明可控,并最终实现不同系统间的数据无缝对接。
![如何对接企业微信API接口](https://pic.qeasy.cloud/T25.png~tplv-syqr462i7n-qeasy.image)