轻易云数据集成平台应用:ETL转换并写入金蝶云星辰V2

  • 轻易云集成顾问-冯潇
### 新版订单-出库单(佰嘉)杭州佰健嘉庆:汤臣倍健营销云数据集成到金蝶云星辰V2 在企业的日常运营中,系统之间的数据对接与集成是一个至关重要的环节。本文将着重介绍汤臣倍健营销云与金蝶云星辰V2之间的系统对接实践——新版订单-出库单(佰嘉)杭州佰健嘉庆案例。 我们首先需要通过API从汤臣倍健营销云获取相关订单数据。使用`/api/openapi/v1/erp/order/honour/agreement/header`接口,将最新生成的订单信息实时抓取下来,并处理分页和限流等问题,以确保数据不遗漏且高效抓取。在此过程中,我们实现了异常检测和错误重试机制,保障了数据提取过程的稳定性。 为了适应业务需求,我们自定义了一套复杂的数据转换逻辑,将从汤臣倍健营销云获取到的数据格式转化为金蝶云星辰V2所能接受的格式。紧接着,通过调用金蝶云星辰V2提供的写入API ` /jdy/v2/scm/sal_out_bound`,将经过转换后的大批量数据快速写入系统。这一过程中,高吞吐量的数据写入能力显著提升了整体效率。 此外,为确保整个数据处理链条透明且可控,我们利用集中的监控和告警系统,实时跟踪各个任务节点,从而能够及时发现并解决潜在的问题。同时,通过可视化的数据流设计工具,对整个流程进行了直观管理,使得配置变更及故障排除更加便捷、高效。 以上技术点仅是这次项目的一部分内容,在后续文章中我们将进一步探讨具体实现细节,包括如何调用不同接口、应对各种异常情况以及优化策略应用等方面。 ![用友与MES系统接口开发配置](https://pic.qeasy.cloud/D24.png~tplv-syqr462i7n-qeasy.image) ### 调用汤臣倍健营销云接口获取并加工数据的技术实践 在数据集成生命周期的第一步,我们需要调用源系统汤臣倍健营销云的接口`/api/openapi/v1/erp/order/honour/agreement/header`来获取订单数据,并进行初步的数据清洗和加工。以下是具体的技术实现和相关配置细节。 #### 接口调用配置 首先,我们需要配置API调用的元数据。根据提供的元数据配置,以下是我们需要关注的字段和参数: - **API路径**:`/api/openapi/v1/erp/order/honour/agreement/header` - **请求方法**:POST - **请求参数**: - `orgId`(组织ID):固定值`e19363e8f33746b2a50c5577906a83bc` - `page`(页码):默认值为`1` - `id`(订单ID) - `applyerId`(要货方ID) - `supplierId`(供货方ID) - `no`(订单号) - `distributionType`(分销类型) - `distributorId`(分销商ID) - `orderStatus`(订单状态):多个状态值,用逗号分隔 - `createDt`(创建时间) - `orderTypeCode`(订单类型,如普通订单、直运销售) - `isDeliveryFreezed`(是否暂停发货) - `nature`(单据类型):固定值为`1` - `relatedApplyerId`(关联交易经销商ID) - `saleDistribution`(销售渠道) - `disApplyerId`(分销商ID) - `startDt`(订单时间开始) - `endDt`(订单时间结束) - `appStartDt`(审批时间开始) - `appEndDt`(审批时间结束) - `lastStartDt`(最后修改时间开始):动态值,使用模板变量`${LAST_SYNC_TIME|datetime}` - `lastEndDt`(最后修改时间结束):动态值,使用模板变量`${CURRENT_TIME|datetime}` #### 请求参数构建 在实际调用中,我们需要构建一个包含上述字段的JSON对象作为请求体。以下是一个示例请求体: ```json { "orgId": "e19363e8f33746b2a50c5577906a83bc", "page": "1", "orderStatus": "WAIT_FINANCE_AUDIT,WAIT_DELIVERY,PART_DELIVERY,ALL_DELIVERY,WAIT_OUT_STORAGE,PART_OUT_STORAGE,ALL_OUT_STORAGE,PART_CONFIRM,CONFIRM,AUDIT_SUCCESS,FINISH,CLOSE", "nature": "1", "lastStartDt": "{{LAST_SYNC_TIME|datetime}}", "lastEndDt": "{{CURRENT_TIME|datetime}}" } ``` #### 数据清洗与加工 在获取到原始数据后,需要对数据进行清洗和初步加工。这一步骤通常包括但不限于以下操作: 1. **字段过滤**:只保留必要的字段,去除冗余信息。 2. **格式转换**:将日期字符串转换为标准日期格式,将数值字符串转换为数值类型等。 3. **数据校验**:检查关键字段是否为空或格式是否正确,确保数据完整性。 例如,对于返回的数据结构,可以进行如下处理: ```python import json from datetime import datetime def clean_data(raw_data): cleaned_data = [] for record in raw_data: cleaned_record = { "order_id": record.get("id"), "order_no": record.get("no"), "status": record.get("orderStatus"), "created_at": datetime.strptime(record.get("createDt"), "%Y-%m-%d %H:%M:%S") if record.get("createDt") else None, # 添加其他必要字段处理 } # 校验关键字段 if not cleaned_record["order_id"] or not cleaned_record["order_no"]: continue cleaned_data.append(cleaned_record) return cleaned_data # 示例调用 raw_response = '[{"id":"12345","no":"ORD001","orderStatus":"WAIT_DELIVERY","createDt":"2023-10-01 12:00:00"}]' raw_data = json.loads(raw_response) cleaned_data = clean_data(raw_data) print(cleaned_data) ``` #### 实时监控与日志记录 为了确保数据集成过程中的透明度和可追溯性,我们需要对每次API调用及其结果进行实时监控和日志记录。这可以通过轻易云平台提供的监控工具来实现,也可以自定义日志记录机制。例如: ```python import logging logging.basicConfig(level=logging.INFO) def log_api_call(request_body, response_body): logging.info(f"API Request: {request_body}") logging.info(f"API Response: {response_body}") # 示例日志记录 request_body = { "orgId": "e19363e8f33746b2a50c5577906a83bc", "page": "1", # ...其他参数 } response_body = '[{"id":"12345","no":"ORD001","orderStatus":"WAIT_DELIVERY","createDt":"2023-10-01 12:00:00"}]' log_api_call(request_body, response_body) ``` 通过以上步骤,我们可以高效地从汤臣倍健营销云接口获取并加工订单数据,为后续的数据转换与写入奠定基础。 ![打通用友BIP数据接口](https://pic.qeasy.cloud/S6.png~tplv-syqr462i7n-qeasy.image) ### 轻易云数据集成平台生命周期第二步:ETL转换及写入金蝶云星辰V2 API接口 在数据集成过程中,ETL(Extract, Transform, Load)转换是至关重要的一环。本文将详细探讨如何将已经集成的源平台数据,通过轻易云数据集成平台,转换为金蝶云星辰V2 API接口所能接收的格式,并最终写入目标平台。 #### 数据请求与清洗 在开始ETL转换之前,我们需要确保数据已经从源系统成功提取并经过初步清洗。这一步通常包括对数据进行去重、补全和标准化处理。假设我们已经完成了这些步骤,现在可以直接进入数据转换和写入阶段。 #### 数据转换与写入 根据提供的元数据配置,我们需要将源平台的数据映射到金蝶云星辰V2 API接口所需的字段格式。以下是一个具体的技术案例: ##### 元数据配置解析 ```json { "api": "/jdy/v2/scm/sal_out_bound", "effect": "EXECUTE", "method": "POST", "number": "id", "id": "id", "name": "id", "idCheck": true, "request": [ {"field":"bill_source","label":"单据来源,固定传入ISV","type":"string","describe":"单据来源,固定传入APP","value":"ISV"}, {"field":"bill_date","label":"出库日期,格式:2019-01-01","type":"datetime","describe":"出库日期,格式:2019-01-01","value":"{{approveDt|date}}"}, {"field":"bill_no","label":"单据编码","type":"string","value":"{no}"}, {"field":"customer_id","label":"客户id","type":"string","describe":"客户信息","value":"_findCollection find id from b41660e7-fa00-318f-bbee-1395e229ee6b where number={clientAppNo}"}, {"field":"remark","label":"单据备注","type":"string","value":"{remark}"}, {"field":"contact_linkman","label":"联系信息-联系人","type":"string","value":"{contacts}"}, {"field":"contact_phone","label":"联系信息-联系方式","type":"string","value":"{phone}"}, {"field":"contact_address","label":"联系信息-详细地址","type":"string","value":"{shippingAddress}"}, {"field":"dept_id","label":"部门","type":"string","value":"1320359653724412928"}, {"field": "custom_field", "label": "自定义字段", "type": "object", "children": [ {"field": "custom_field__1__2xietiob41d0jia2", "label": "营销云备注", "type": "string", "value": "{no}"} ] }, { "field": "material_entity", "label": "商品分录", "type": "array", "describe": "商品分录", ... ``` ##### 关键字段映射 1. **bill_source**: 固定传入值"ISV",表示单据来源。 2. **bill_date**: 出库日期,使用模板变量`{{approveDt|date}}`来动态填充。 3. **bill_no**: 单据编码,对应源数据中的`{no}`。 4. **customer_id**: 客户ID,通过`_findCollection`函数从客户表中查找对应ID。 5. **remark**: 单据备注,对应源数据中的`{remark}`。 6. **contact_linkman**: 联系人信息,对应源数据中的`{contacts}`。 7. **contact_phone**: 联系方式,对应源数据中的`{phone}`。 8. **contact_address**: 详细地址,对应源数据中的`{shippingAddress}`。 9. **dept_id**: 部门ID,固定值"1320359653724412928"。 ##### 自定义字段和商品分录 自定义字段和商品分录部分较为复杂,需要特别注意: 1. **custom_field**: 包含一个子字段`custom_field__1__2xietiob41d0jia2`,其值为单据编号`{no}`。 2. **material_entity**: 商品分录是一个数组,每个元素包含多个字段,如下: - `material_number`: 商品编号,对应源数据中的`{productNo}`。 - `stock_id`: 仓库ID,固定值"1320319591469069312"。 - `qty`: 数量,对应源数据中的`{{details.quantity}}`。 - `unit_id`: 单位ID,通过`_findCollection`函数从单位表中查找对应ID。 - `tax_price`: 含税单价,对应源数据中的`{{details.basePrice}}`。 #### 数据写入操作 完成上述映射后,我们使用POST方法将转换后的JSON对象发送到金蝶云星辰V2 API接口: ```python import requests import json url = 'https://api.kingdee.com/jdy/v2/scm/sal_out_bound' headers = {'Content-Type': 'application/json'} data = { # 按照元数据配置生成的JSON对象 } response = requests.post(url, headers=headers, data=json.dumps(data)) if response.status_code == 200: print("Data successfully written to Kingdee Cloud.") else: print("Failed to write data:", response.text) ``` 通过上述步骤,我们实现了从轻易云平台提取、转换并写入金蝶云星辰V2 API接口的全过程。这不仅确保了不同系统间的数据无缝对接,也极大提升了业务流程的自动化程度。 ![泛微OA与ERP系统接口开发配置](https://pic.qeasy.cloud/T21.png~tplv-syqr462i7n-qeasy.image)