使用轻易云进行ETL转换并写入金蝶云星辰V1的技术实现

  • 轻易云集成顾问-黄宏棵
### 旺店通·企业奇门采购入库单数据集成到金蝶云星辰V1技术分享 在本案例中,我们将聚焦于如何利用轻易云数据集成平台,实现旺店通·企业奇门中的采购入库单数据(通过接口wdt.stockin.order.query.purchase获取)无缝对接至金蝶云星辰V1系统(接口为jdy/pur/pur_inbound_save)。此过程不仅要求高效的数据传输和写入能力,还需要确保整个过程中数据的准确性和可靠性。 #### 方案概述:wk_采购入库单 首先,针对业务需求,在轻易云配置了名为“wk_采购入库单”的方案。该方案的核心是实现从旺店通·企业奇门实时抓取采购入库单的数据,并批量导入到金蝶云星辰V1系统,由此提升业务处理效率并减少手工操作的风险。 ##### 数据抓取与预处理: 使用`wdt.stockin.order.query.purchase`接口从旺店通·企业奇门系统定时获取最新的采购入库数据信息。为了应对API调用过程中可能遇到的数据分页及限流问题,设定了合适的轮询机制。同时,通过自定义转换逻辑,对原始数据进行预处理,以符合目标系统所需格式。 ##### 数据写入与质量监控: 通过访问`jdy/pur/pur_inbound_save` API,将格式化后的数据一次性交付到金蝶云星辰V1。在这个过程中,高吞吐量的数据写入特性保障了大量数据信息能够快速被目标系统接受。此外,通过集中监控和告警功能,可以实时跟踪各项任务状态,并及时响应异常情况,从而大大提高整体运行稳定性。 下一步,我们将详细探讨具体实施步骤及相关代码配置,以供大家参考与实践。 --- 这便是开篇部分,你可以在后续内容中添加更多技术细节、代码示例以及实际操作步骤等信息。如果有其它需要调整或补充的信息,请随时告知! ![打通用友BIP数据接口](https://pic.qeasy.cloud/D4.png~tplv-syqr462i7n-qeasy.image) ### 调用源系统旺店通·企业奇门接口wdt.stockin.order.query.purchase获取并加工数据 在轻易云数据集成平台中,调用源系统接口是数据处理生命周期的第一步。本文将详细探讨如何通过配置元数据,调用旺店通·企业奇门接口`wdt.stockin.order.query.purchase`来获取采购入库单数据,并进行初步的数据加工。 #### 接口调用配置 首先,我们需要配置接口调用的元数据。根据提供的元数据配置,接口`wdt.stockin.order.query.purchase`采用POST方法进行请求,主要参数如下: - **start_time**: 开始时间,用于增量获取数据,格式为`yyyy-MM-dd HH:mm:ss`。 - **end_time**: 结束时间,用于增量获取数据,格式为`yyyy-MM-dd HH:mm:ss`。 - **status**: 入库单状态,默认值为80(已完成)。 - **src_order_no**: 上层单据编号,可选参数。 - **warehouse_no**: 仓库编号,用于指定仓库的单据信息。 此外,还有分页参数: - **page_size**: 每页返回的数据条数,范围1~100。 - **page_no**: 页号,从0开始。 #### 元数据配置解析 以下是元数据配置的具体内容: ```json { "api": "wdt.stockin.order.query.purchase", "method": "POST", "number": "order_no", "id": "stockin_id", "pagination": { "pageSize": 100 }, "idCheck": true, "request": [ { "field": "start_time", "label": "开始时间", "type": "datetime", "describe": "增量获取数据,start_time作为开始时间,格式:yyyy-MM-dd HH:mm:ss", "value": "{{LAST_SYNC_TIME|datetime}}" }, { "field": "end_time", "label": "结束时间", "type": "datetime", "describe": "增量获取数据,end_time作为结束时间,格式:yyyy-MM-dd HH:mm:ss", "value": "{{CURRENT_TIME|datetime}}" }, { "field": "status", "label": "状态", "type": "string", "describe": "入库单状态 10已取消20编辑中25待价格确认30待审核60待结算80已完成(默认80)", "value": "80" }, { "field": "src_order_no", "label": "上层单据编号", "type": "string", "describe":"上层单据编号,默认为采购单号,传该字段可以不传开始时间和结束时间" }, { “field”: “warehouse_no”, “label”: “仓库编号”, “type”: “string”, “describe”: “代表仓库所有属性的唯一编码,用于仓库区分,ERP内支持自定义(ERP仓库界面设置),用于获取指定仓库单据数据信息(不支持一次推送多个仓库编号)” } ], “otherRequest”: [ { “field”: “page_size”, “label”: “分页大小”, “type”: “string”, “describe”: “每页返回的数据条数,输入值范围1~100,不传本参数,输入值默认为40”, “value”: "{PAGINATION_PAGE_SIZE}" }, { “field”:“page_no”, “label”:“页号”, “type”:“string”, “describe”:“不传值默认从0页开始”, “value”: "{PAGINATION_START_PAGE}" } ] } ``` #### 数据请求与清洗 在实际操作中,我们会通过轻易云平台发起API请求,并根据上述配置动态填充参数。例如: ```json { 'start_time': '2023-01-01 00:00:00', 'end_time': '2023-01-31 23:59:59', 'status': '80', 'page_size': '100', 'page_no': '0' } ``` 请求成功后,将返回包含采购入库单信息的数据集。接下来,我们需要对这些原始数据进行清洗和预处理,以便后续的数据转换与写入步骤。 #### 数据清洗步骤 1. **字段筛选与重命名**:根据业务需求筛选必要字段,并对字段进行重命名。例如,将`order_no`重命名为`purchase_order_number`。 2. **类型转换**:确保日期、数字等字段类型正确。例如,将字符串类型的日期转换为标准日期类型。 3. **缺失值处理**:处理缺失值或异常值,例如填充默认值或删除异常记录。 以下是一个简单的数据清洗示例: ```python import pandas as pd # 假设data是从API返回的原始数据 data = [ {"order_no":"PO12345", ...}, {"order_no":"PO12346", ...}, ] # 转换为DataFrame df = pd.DataFrame(data) # 字段重命名 df.rename(columns={'order_no': 'purchase_order_number'}, inplace=True) # 类型转换 df['purchase_date'] = pd.to_datetime(df['purchase_date']) # 缺失值处理 df.fillna({'warehouse_code': 'UNKNOWN'}, inplace=True) ``` 通过上述步骤,我们可以确保从旺店通·企业奇门接口获取的数据经过清洗后符合业务需求,为后续的数据转换与写入打下坚实基础。 ![打通企业微信数据接口](https://pic.qeasy.cloud/S27.png~tplv-syqr462i7n-qeasy.image) ### 使用轻易云数据集成平台进行ETL转换并写入金蝶云星辰V1 API接口的技术案例 在数据集成生命周期的第二步,我们需要将已经集成的源平台数据进行ETL(提取、转换、加载)转换,并最终写入目标平台——金蝶云星辰V1 API接口。以下是详细的技术实现过程。 #### 1. 数据提取与清洗 首先,我们从源系统中提取原始数据,并对其进行必要的清洗和预处理。假设我们已经完成了这一阶段,接下来我们将重点放在如何将这些数据转换为金蝶云星辰V1 API所能接受的格式。 #### 2. 数据转换与映射 根据提供的元数据配置,我们需要将源数据字段映射到目标API所需的字段。这一过程主要包括以下步骤: - **忽略操作确认提示**:当价格不传或等于0时,需要设置`ignoreconfirm`为`true`。 - **单据编码**:使用源数据中的订单编号`order_no`来填充API请求中的`billno`字段。 - **商品分录**:这是一个复杂的数据结构,需要逐项映射: - **商品ID**:通过查找集合获取商品ID,查询条件为商品编号`details_list.spec_no`。 - **仓库ID**:通过查找集合获取仓库ID,查询条件为仓库名称`warehouse_name`。 - **是否赠品**:通过判断含税单价`details_list.tax_price`是否大于0来决定是否为赠品。 - **数量**:直接使用源数据中的数量字段`details_list.right_num`。 - **含税单价**、**税率**等其他字段也需要从源数据中获取相应值。 具体的映射配置如下: ```json { "api": "jdy/pur/pur_inbound_save", "method": "POST", "idCheck": true, "request": [ { "field": "ignoreconfirm", "label": "忽略操作确认提示", "type": "string", "describe": "忽略操作确认提示,当price不传或等于0时ignoreconfirm需要为true", "value": "true" }, { "field": "billno", "label": "单据编码", "type": "string", "describe": "单据编码", "value": "{order_no}" }, { "field": "material_entity", "label": "商品分录", "type": "array", ... }, ... ] } ``` #### 3. 数据写入目标平台 在完成数据转换后,我们需要将这些数据通过API接口写入到金蝶云星辰V1系统。以下是具体的API调用示例: ```python import requests import json url = 'https://api.kingdee.com/jdy/pur/pur_inbound_save' headers = {'Content-Type': 'application/json'} data = { 'ignoreconfirm': 'true', 'billno': source_data['order_no'], 'material_entity': [ { 'materialid_id': find_material_id(source_data['details_list']['spec_no']), 'stockid_id': find_stock_id(source_data['warehouse_name']), 'is_free': 'False' if source_data['details_list']['tax_price'] > 0 else 'True', 'qty': source_data['details_list']['right_num'], ... } ], ... } response = requests.post(url, headers=headers, data=json.dumps(data)) if response.status_code == 200: print('Data successfully written to Kingdee Cloud.') else: print('Failed to write data:', response.text) ``` 在上述代码中,函数 `find_material_id()` 和 `find_stock_id()` 用于根据提供的条件从集合中查找对应的ID。这些函数可以通过数据库查询或其他方式实现。 #### 4. 实时监控与错误处理 为了确保数据成功写入,我们需要实时监控API调用的响应状态,并处理可能出现的错误。例如,如果响应状态码不是200,则记录错误日志并采取相应措施。 ```python if response.status_code != 200: log_error(response.text) ``` 通过上述步骤,我们实现了从源系统到金蝶云星辰V1系统的数据ETL转换和写入。这个过程不仅确保了数据的一致性和完整性,还提高了业务流程的自动化程度。 ![如何对接企业微信API接口](https://pic.qeasy.cloud/T17.png~tplv-syqr462i7n-qeasy.image)