轻易云数据集成平台应用:ETL转换并写入金蝶云星辰V2
### 新版订单-出库单(佰嘉)杭州佰健嘉庆:汤臣倍健营销云数据集成到金蝶云星辰V2
在企业的日常运营中,系统之间的数据对接与集成是一个至关重要的环节。本文将着重介绍汤臣倍健营销云与金蝶云星辰V2之间的系统对接实践——新版订单-出库单(佰嘉)杭州佰健嘉庆案例。
我们首先需要通过API从汤臣倍健营销云获取相关订单数据。使用`/api/openapi/v1/erp/order/honour/agreement/header`接口,将最新生成的订单信息实时抓取下来,并处理分页和限流等问题,以确保数据不遗漏且高效抓取。在此过程中,我们实现了异常检测和错误重试机制,保障了数据提取过程的稳定性。
为了适应业务需求,我们自定义了一套复杂的数据转换逻辑,将从汤臣倍健营销云获取到的数据格式转化为金蝶云星辰V2所能接受的格式。紧接着,通过调用金蝶云星辰V2提供的写入API ` /jdy/v2/scm/sal_out_bound`,将经过转换后的大批量数据快速写入系统。这一过程中,高吞吐量的数据写入能力显著提升了整体效率。
此外,为确保整个数据处理链条透明且可控,我们利用集中的监控和告警系统,实时跟踪各个任务节点,从而能够及时发现并解决潜在的问题。同时,通过可视化的数据流设计工具,对整个流程进行了直观管理,使得配置变更及故障排除更加便捷、高效。
以上技术点仅是这次项目的一部分内容,在后续文章中我们将进一步探讨具体实现细节,包括如何调用不同接口、应对各种异常情况以及优化策略应用等方面。
![用友与MES系统接口开发配置](https://pic.qeasy.cloud/D24.png~tplv-syqr462i7n-qeasy.image)
### 调用汤臣倍健营销云接口获取并加工数据的技术实践
在数据集成生命周期的第一步,我们需要调用源系统汤臣倍健营销云的接口`/api/openapi/v1/erp/order/honour/agreement/header`来获取订单数据,并进行初步的数据清洗和加工。以下是具体的技术实现和相关配置细节。
#### 接口调用配置
首先,我们需要配置API调用的元数据。根据提供的元数据配置,以下是我们需要关注的字段和参数:
- **API路径**:`/api/openapi/v1/erp/order/honour/agreement/header`
- **请求方法**:POST
- **请求参数**:
- `orgId`(组织ID):固定值`e19363e8f33746b2a50c5577906a83bc`
- `page`(页码):默认值为`1`
- `id`(订单ID)
- `applyerId`(要货方ID)
- `supplierId`(供货方ID)
- `no`(订单号)
- `distributionType`(分销类型)
- `distributorId`(分销商ID)
- `orderStatus`(订单状态):多个状态值,用逗号分隔
- `createDt`(创建时间)
- `orderTypeCode`(订单类型,如普通订单、直运销售)
- `isDeliveryFreezed`(是否暂停发货)
- `nature`(单据类型):固定值为`1`
- `relatedApplyerId`(关联交易经销商ID)
- `saleDistribution`(销售渠道)
- `disApplyerId`(分销商ID)
- `startDt`(订单时间开始)
- `endDt`(订单时间结束)
- `appStartDt`(审批时间开始)
- `appEndDt`(审批时间结束)
- `lastStartDt`(最后修改时间开始):动态值,使用模板变量`${LAST_SYNC_TIME|datetime}`
- `lastEndDt`(最后修改时间结束):动态值,使用模板变量`${CURRENT_TIME|datetime}`
#### 请求参数构建
在实际调用中,我们需要构建一个包含上述字段的JSON对象作为请求体。以下是一个示例请求体:
```json
{
"orgId": "e19363e8f33746b2a50c5577906a83bc",
"page": "1",
"orderStatus": "WAIT_FINANCE_AUDIT,WAIT_DELIVERY,PART_DELIVERY,ALL_DELIVERY,WAIT_OUT_STORAGE,PART_OUT_STORAGE,ALL_OUT_STORAGE,PART_CONFIRM,CONFIRM,AUDIT_SUCCESS,FINISH,CLOSE",
"nature": "1",
"lastStartDt": "{{LAST_SYNC_TIME|datetime}}",
"lastEndDt": "{{CURRENT_TIME|datetime}}"
}
```
#### 数据清洗与加工
在获取到原始数据后,需要对数据进行清洗和初步加工。这一步骤通常包括但不限于以下操作:
1. **字段过滤**:只保留必要的字段,去除冗余信息。
2. **格式转换**:将日期字符串转换为标准日期格式,将数值字符串转换为数值类型等。
3. **数据校验**:检查关键字段是否为空或格式是否正确,确保数据完整性。
例如,对于返回的数据结构,可以进行如下处理:
```python
import json
from datetime import datetime
def clean_data(raw_data):
cleaned_data = []
for record in raw_data:
cleaned_record = {
"order_id": record.get("id"),
"order_no": record.get("no"),
"status": record.get("orderStatus"),
"created_at": datetime.strptime(record.get("createDt"), "%Y-%m-%d %H:%M:%S") if record.get("createDt") else None,
# 添加其他必要字段处理
}
# 校验关键字段
if not cleaned_record["order_id"] or not cleaned_record["order_no"]:
continue
cleaned_data.append(cleaned_record)
return cleaned_data
# 示例调用
raw_response = '[{"id":"12345","no":"ORD001","orderStatus":"WAIT_DELIVERY","createDt":"2023-10-01 12:00:00"}]'
raw_data = json.loads(raw_response)
cleaned_data = clean_data(raw_data)
print(cleaned_data)
```
#### 实时监控与日志记录
为了确保数据集成过程中的透明度和可追溯性,我们需要对每次API调用及其结果进行实时监控和日志记录。这可以通过轻易云平台提供的监控工具来实现,也可以自定义日志记录机制。例如:
```python
import logging
logging.basicConfig(level=logging.INFO)
def log_api_call(request_body, response_body):
logging.info(f"API Request: {request_body}")
logging.info(f"API Response: {response_body}")
# 示例日志记录
request_body = {
"orgId": "e19363e8f33746b2a50c5577906a83bc",
"page": "1",
# ...其他参数
}
response_body = '[{"id":"12345","no":"ORD001","orderStatus":"WAIT_DELIVERY","createDt":"2023-10-01 12:00:00"}]'
log_api_call(request_body, response_body)
```
通过以上步骤,我们可以高效地从汤臣倍健营销云接口获取并加工订单数据,为后续的数据转换与写入奠定基础。
![打通用友BIP数据接口](https://pic.qeasy.cloud/S6.png~tplv-syqr462i7n-qeasy.image)
### 轻易云数据集成平台生命周期第二步:ETL转换及写入金蝶云星辰V2 API接口
在数据集成过程中,ETL(Extract, Transform, Load)转换是至关重要的一环。本文将详细探讨如何将已经集成的源平台数据,通过轻易云数据集成平台,转换为金蝶云星辰V2 API接口所能接收的格式,并最终写入目标平台。
#### 数据请求与清洗
在开始ETL转换之前,我们需要确保数据已经从源系统成功提取并经过初步清洗。这一步通常包括对数据进行去重、补全和标准化处理。假设我们已经完成了这些步骤,现在可以直接进入数据转换和写入阶段。
#### 数据转换与写入
根据提供的元数据配置,我们需要将源平台的数据映射到金蝶云星辰V2 API接口所需的字段格式。以下是一个具体的技术案例:
##### 元数据配置解析
```json
{
"api": "/jdy/v2/scm/sal_out_bound",
"effect": "EXECUTE",
"method": "POST",
"number": "id",
"id": "id",
"name": "id",
"idCheck": true,
"request": [
{"field":"bill_source","label":"单据来源,固定传入ISV","type":"string","describe":"单据来源,固定传入APP","value":"ISV"},
{"field":"bill_date","label":"出库日期,格式:2019-01-01","type":"datetime","describe":"出库日期,格式:2019-01-01","value":"{{approveDt|date}}"},
{"field":"bill_no","label":"单据编码","type":"string","value":"{no}"},
{"field":"customer_id","label":"客户id","type":"string","describe":"客户信息","value":"_findCollection find id from b41660e7-fa00-318f-bbee-1395e229ee6b where number={clientAppNo}"},
{"field":"remark","label":"单据备注","type":"string","value":"{remark}"},
{"field":"contact_linkman","label":"联系信息-联系人","type":"string","value":"{contacts}"},
{"field":"contact_phone","label":"联系信息-联系方式","type":"string","value":"{phone}"},
{"field":"contact_address","label":"联系信息-详细地址","type":"string","value":"{shippingAddress}"},
{"field":"dept_id","label":"部门","type":"string","value":"1320359653724412928"},
{"field": "custom_field", "label": "自定义字段", "type": "object",
"children": [
{"field": "custom_field__1__2xietiob41d0jia2",
"label": "营销云备注",
"type": "string",
"value": "{no}"}
]
},
{
"field": "material_entity",
"label": "商品分录",
"type": "array",
"describe": "商品分录",
...
```
##### 关键字段映射
1. **bill_source**: 固定传入值"ISV",表示单据来源。
2. **bill_date**: 出库日期,使用模板变量`{{approveDt|date}}`来动态填充。
3. **bill_no**: 单据编码,对应源数据中的`{no}`。
4. **customer_id**: 客户ID,通过`_findCollection`函数从客户表中查找对应ID。
5. **remark**: 单据备注,对应源数据中的`{remark}`。
6. **contact_linkman**: 联系人信息,对应源数据中的`{contacts}`。
7. **contact_phone**: 联系方式,对应源数据中的`{phone}`。
8. **contact_address**: 详细地址,对应源数据中的`{shippingAddress}`。
9. **dept_id**: 部门ID,固定值"1320359653724412928"。
##### 自定义字段和商品分录
自定义字段和商品分录部分较为复杂,需要特别注意:
1. **custom_field**: 包含一个子字段`custom_field__1__2xietiob41d0jia2`,其值为单据编号`{no}`。
2. **material_entity**: 商品分录是一个数组,每个元素包含多个字段,如下:
- `material_number`: 商品编号,对应源数据中的`{productNo}`。
- `stock_id`: 仓库ID,固定值"1320319591469069312"。
- `qty`: 数量,对应源数据中的`{{details.quantity}}`。
- `unit_id`: 单位ID,通过`_findCollection`函数从单位表中查找对应ID。
- `tax_price`: 含税单价,对应源数据中的`{{details.basePrice}}`。
#### 数据写入操作
完成上述映射后,我们使用POST方法将转换后的JSON对象发送到金蝶云星辰V2 API接口:
```python
import requests
import json
url = 'https://api.kingdee.com/jdy/v2/scm/sal_out_bound'
headers = {'Content-Type': 'application/json'}
data = {
# 按照元数据配置生成的JSON对象
}
response = requests.post(url, headers=headers, data=json.dumps(data))
if response.status_code == 200:
print("Data successfully written to Kingdee Cloud.")
else:
print("Failed to write data:", response.text)
```
通过上述步骤,我们实现了从轻易云平台提取、转换并写入金蝶云星辰V2 API接口的全过程。这不仅确保了不同系统间的数据无缝对接,也极大提升了业务流程的自动化程度。
![泛微OA与ERP系统接口开发配置](https://pic.qeasy.cloud/T21.png~tplv-syqr462i7n-qeasy.image)