使用ETL进行数据转换并写入金蝶云星辰V2的技术案例

  • 轻易云集成顾问-胡秀丛
### 汤臣倍健营销云数据集成到金蝶云星辰V2技术案例分享 在企业信息化系统的复杂环境中,数据同步和集成是确保运营高效、数据一致性的重要任务。本文将详细解读汤臣倍健营销云的数据如何通过轻易云平台无缝对接到金蝶云星辰V2。在这个过程中,我们遇到了诸如大规模数据写入、接口分页与限流处理以及异常处理机制等多个关键挑战。 为实现新版订单-出库单(Life-Space)亿倍盛方案的成功上线,我们重点解决了以下几个关键问题: 1. **调用汤臣倍健营销云API接口**:我们从汤臣倍健营销云获取订单数据,使用/api/openapi/v1/erp/order/honour/agreement/header进行实时抓取。为了确保不漏单,我们配置了定时任务,有效避免了遗漏或重复数据的问题。 2. **大量数据快速写入金蝶云星辰V2**:利用轻易云平台支持的大批量、高并发的数据处理能力,将获取到的订单信息迅速写入到金蝶云星辰V2的/jdy/v2/scm/sal_out_bound接口。这不仅提高了整体效率,还保证了实时性和准确性。 3. **分页与限流处理**:应对不同API的请求限制,是系统集成中的一大难点。通过合理设计分页逻辑及限流策略,从源头上解决汤臣倍健营销后台返回的数据量限制问题,使得整体流程稳定顺畅。 4. **异常处理与重试机制**:在对接过程中,一旦出现网络波动或者接口响应异常情况,通过设立完善的错误捕获与重试机制,确保每条业务记录都能够准确地传递和存储。必要时重新拉取未成功的数据,提高系统鲁棒性。 5. **跨平台格式转换及映射**:针对不同系统之间可能存在的数据格式差异,通过自定义字段映射规则,在导入过程前就完成有效转化。这一步骤在保障正确性的同时,也减少后期维护工作的难度。 本案例所采用的一系列技术手段,不仅展示了一种高效可靠的数据集成方法,更为类似项目提供了解决思路和实战经验。在后续内容中,将进一步细化各项具体实现步骤及其技术反馈。 ![用友与WMS系统接口开发配置](https://pic.qeasy.cloud/D28.png~tplv-syqr462i7n-qeasy.image) ### 调用汤臣倍健营销云接口获取并加工数据 在数据集成生命周期的第一步,我们需要从源系统获取数据并进行初步加工。本文将详细探讨如何通过调用汤臣倍健营销云接口 `/api/openapi/v1/erp/order/honour/agreement/header` 来实现这一过程。 #### 接口调用配置 首先,我们需要配置接口调用的元数据。根据提供的元数据配置,接口使用 `POST` 方法进行请求,主要参数如下: - `orgId`: 组织ID - `page`: 页码 - `id`: 订单ID - `applyerId`: 要货方ID - `supplierId`: 供货方ID - `no`: 订单号 - `distributionType`: 分销类型 - `distributorId`: 分销商ID - `orderStatus`: 订单状态(多个状态以逗号分隔) - `createDt`: 创建时间 - `orderTypeCode`: 订单类型(如普通订单、直运销售) - `isDeliveryFreezed`: 是否暂停发货 - `nature`: 单据类型(1订单、2退货) - `relatedApplyerId`: 关联交易经销商ID - `saleDistribution`: 销售渠道 - `disApplyerId`: 分销商ID - `startDt`: 订单时间(开始) - `endDt`: 订单时间(结束) - `appStartDt`: 审批时间(开始) - `appEndDt`: 审批时间(结束) - `lastStartDt`: 最后修改时间(开始) - `lastEndDt`: 最后修改时间(结束) 其中,`lastStartDt` 和 `lastEndDt` 的值分别为上次同步时间和当前时间,用于增量数据的获取。 #### 请求构建 构建请求时,需要将上述参数按照 JSON 格式封装,并发送到指定的 API 接口。以下是一个示例请求体: ```json { "orgId": "adf0b371eab14b68ab21dff39a275e18", "page": "1", "orderStatus": "WAIT_FINANCE_AUDIT,WAIT_DELIVERY,PART_DELIVERY,ALL_DELIVERY,WAIT_OUT_STORAGE,PART_OUT_STORAGE,ALL_OUT_STORAGE,PART_CONFIRM,CONFIRM,AUDIT_SUCCESS,FINISH,CLOSE", "nature": "1", "lastStartDt": "{{LAST_SYNC_TIME|datetime}}", "lastEndDt": "{{CURRENT_TIME|datetime}}" } ``` #### 数据处理 在接收到返回的数据后,需要对其进行初步清洗和加工。这一步骤包括但不限于: 1. **字段映射**:将返回的数据字段映射到目标系统所需的字段。例如,将返回的订单状态映射为目标系统中的相应状态代码。 2. **数据过滤**:根据业务需求过滤掉不必要的数据。例如,只保留特定状态的订单记录。 3. **格式转换**:将日期、金额等字段转换为目标系统所需的格式。 以下是一个简单的数据处理示例: ```python def process_data(response_data): processed_data = [] for record in response_data: processed_record = { "order_id": record["id"], "order_status": map_order_status(record["orderStatus"]), "created_date": convert_date_format(record["createDt"]), # 更多字段处理... } processed_data.append(processed_record) return processed_data def map_order_status(status): status_mapping = { "WAIT_FINANCE_AUDIT": "待财务审核", # 更多状态映射... } return status_mapping.get(status, status) def convert_date_format(date_str): from datetime import datetime date_obj = datetime.strptime(date_str, "%Y-%m-%dT%H:%M:%S") return date_obj.strftime("%Y-%m-%d %H:%M:%S") ``` #### 自动填充响应 在轻易云平台中,可以启用自动填充响应功能,以便更高效地处理返回的数据。这一功能可以自动将 API 返回的数据填充到预定义的数据模型中,减少手动处理的工作量。 #### 实时监控与调试 为了确保数据集成过程顺利进行,需要实时监控 API 调用和数据处理的状态。轻易云平台提供了可视化监控界面,可以实时查看每个环节的数据流动和处理情况。如果出现错误或异常,可以通过日志和调试工具快速定位问题并解决。 通过以上步骤,我们可以高效地调用汤臣倍健营销云接口获取并加工数据,为后续的数据转换与写入做好准备。这一过程不仅提升了业务透明度和效率,也为实现不同系统间的数据无缝对接奠定了基础。 ![如何对接企业微信API接口](https://pic.qeasy.cloud/S18.png~tplv-syqr462i7n-qeasy.image) ### 使用轻易云数据集成平台进行ETL转换:集成新版订单至金蝶云星辰V2API接口 在数据集成过程中,将源平台的数据转换为目标平台能够接收的格式是至关重要的一步。本文将详细探讨如何使用轻易云数据集成平台将已经集成的源平台数据进行ETL(Extract, Transform, Load)转换,并最终写入金蝶云星辰V2API接口。 #### 配置元数据 首先,我们需要根据目标平台的API接口要求配置元数据。以下是我们针对金蝶云星辰V2API接口的元数据配置: ```json { "api": "/jdy/v2/scm/sal_out_bound", "effect": "EXECUTE", "method": "POST", "number": "id", "id": "id", "name": "id", "idCheck": true, "request": [ {"field":"bill_source","label":"单据来源,固定传入ISV","type":"string","describe":"单据来源,固定传入APP","value":"ISV"}, {"field":"bill_date","label":"出库日期,格式:2019-01-01","type":"datetime","describe":"出库日期,格式:2019-01-01","value":"{{approveDt|date}}"}, {"field":"bill_no","label":"单据编码","type":"string","value":"{no}"}, {"field":"customer_id","label":"客户id","type":"string","describe":"客户信息","value":"_findCollection find id from b41660e7-fa00-318f-bbee-1395e229ee6b where number={clientAppNo}"}, {"field":"remark","label":"单据备注","type":"string","value":"{remark}"}, {"field":"contact_linkman","label":"联系信息-联系人","type":"string","value":"{contacts}"}, {"field":"contact_phone","label":"联系信息-联系方式","type":"string","value":"{phone}"}, {"field":"contact_address","label":"联系信息-详细地址","type":"string","value":"{shippingAddress}"}, {"field":"dept_id","label":"部门","type":"string","value":"1320359836042421248"}, {"field":"custom_field","label":"自定义字段","type": "object", "children":[{"field": "custom_field__1__2xietiob41d0jia2", "label": "营销云备注", "type": "string", "value": "{no}"}]}, { "field": "material_entity", "label": "商品分录", "type": "array", "describe": "商品分录", "value": "details", "children":[ {"field": "material_number", "label": "商品", "type": "string", "describe": "商品", "value": "{productNo}"}, {"field": "stock_id", "label": "仓库", "type": "string", "describe": "仓库", "value": "1320321537273803776"}, {"field": "qty", "label": "数量", "type": "string", "describe": "数量", "value": "{{details.quantity}}"}, {"field": "unit_id", "label": "单位id", "type": "string", "describe": "单位(只能填商品关联的单位,来源于商品单位查询接口或商品详情中的baseunitid_id、fixunit1_id等)", value": "_findCollection find base_unit_id from 395d1591-aee0-3f10-ad09-d5593167af48 where number={{details.extNo}}" }, {"field": " tax_price ", " label ": " 含税单价 ", " type ": " string ", " describe ": value: "{{details.basePrice}}" }, { field: is_free, label: 是否赠品, type: string, describe: 是否赠品;true-是,false-否(默认), value: false }, { field: batch_no, label: 批次, type: string, describe: 库位_id, value: {{details._Flot}} }, { field: kf_date, label: 生产日期, type: string, describe: 生产日期, value: {{details._Fmfg}} }, { field: valid_date, label: 有效日期, type: string, describe: 有效日期, value: {{details._Fexp}} }, { field:kf_period,label:"保质期天数",type:"string",describe:"保质期天数",value:"730" }, { field:kf_type,label:"保质期类型",type:"string",describe:"保质期类型",value:"1" }, { field:"dis_amount",label:"折扣额",type:"string",value:"_function ({basePrice} - {price}) * {quantity}" }, { field:"cess",label:"税率",type:"string" } ] } ] } ``` #### ETL转换过程 在ETL过程中,我们首先从源系统提取数据,然后进行必要的数据清洗和转换,最后将转换后的数据加载到目标系统中。以下是具体步骤: 1. **提取数据**:从源系统中提取订单和出库单相关的数据。 2. **清洗和转换**: - 将`approveDt`字段转换为符合`bill_date`要求的日期格式。 - 使用模板字符串将`clientAppNo`映射到`customer_id`。 - 将其他字段如`remark`, `contacts`, `phone`, `shippingAddress`等直接映射到对应的目标字段。 - 对于复杂对象如`material_entity`,需要对每个子项进行相应的处理,包括数量、价格、批次等字段的转换。 3. **加载数据**:使用POST方法将处理好的数据通过API接口发送到金蝶云星辰V2。 #### 示例代码 以下是一个简单的Python示例代码,用于展示如何实现上述ETL过程: ```python import requests import json # 定义源数据 source_data = { # 源数据内容... } # 转换函数 def transform_data(source): target = {} # 映射简单字段 target['bill_source'] = 'ISV' target['bill_date'] = source['approveDt'] target['bill_no'] = source['no'] # 映射复杂字段 target['customer_id'] = find_customer_id(source['clientAppNo']) # 商品分录处理 target['material_entity'] = [] for item in source['details']: material_item = { 'material_number': item['productNo'], 'stock_id': '1320321537273803776', 'qty': item['quantity'], 'unit_id': find_unit_id(item['extNo']), 'tax_price': item['basePrice'], 'is_free': 'false', 'batch_no': item['_Flot'], 'kf_date': item['_Fmfg'], 'valid_date': item['_Fexp'], 'kf_period': '730', 'kf_type': '1', 'dis_amount': (item['basePrice'] - item['price']) * item['quantity'] } target['material_entity'].append(material_item) return target # 查找客户ID函数示例 def find_customer_id(client_app_no): # 模拟查找客户ID逻辑... return 'example_customer_id' # 查找单位ID函数示例 def find_unit_id(ext_no): # 模拟查找单位ID逻辑... return 'example_unit_id' # 转换数据并发送请求 transformed_data = transform_data(source_data) response = requests.post('https://api.kingdee.com/jdy/v2/scm/sal_out_bound', json=transformed_data) print(response.status_code) print(response.json()) ``` 通过以上步骤和代码示例,我们可以高效地将源平台的数据转换为金蝶云星辰V2API接口所需的格式,并成功写入目标平台。 ![如何开发企业微信API接口](https://pic.qeasy.cloud/T20.png~tplv-syqr462i7n-qeasy.image)