ETL转换与写入:如何将数据集成至金蝶云星空

  • 轻易云集成顾问-卢剑航
### 聚水潭·奇门数据集成到金蝶云星空案例分享:[自动]-02销售出库同步(毛毛大鸭梨) 在本次技术案例中,将着重介绍如何通过系统化的方法,实现聚水潭·奇门的销售出库数据无缝对接至金蝶云星空。方案名称为[自动]-02销售出库同步(毛毛大鸭梨),核心是利用API接口及高效的数据处理机制,确保数据从源头到目标系统的准确、高效和可靠传输。 #### 接口调用与初始配置 首先,需要调用聚水潭·奇门的`jushuitan.saleout.list.query`接口来获取销售出库数据。在实际应用中,这一过程必须保证不漏单且快速抓取大批量数据,因此定时可靠的数据抓取机制尤为关键。通过设定合理的时间间隔,可以有效避免频繁或延迟的问题,从而保证实时性和准确性。 ```json { "api_name": "jushuitan.saleout.list.query", "params": { // 参数设置如页码、每页条数等具体细节 } } ``` #### 数据格式转换与映射 在获取原始数据后,我们需要将其转换为金蝶云星空所能识别的数据格式。这一过程中会涉及字段映射及内容转换。例如,聚水潭·奇门中的订单信息需匹配金蝶云星空中的相应字段,并进行必要的数据清洗与规范化处理。 针对这一场景,可设计一个灵活可扩展的数据映射模板,使得不同环境下都能迅速适配并简易调整。当所有字段均已完成匹配后,就可以开始进行正式写入操作。 #### 快速批量写入 接下来,通过调用金蝶云星空的`batchSave`接口,将转换后的数据提交到目标系统。这一步骤要求能够支持大量并发请求,以实现海量数据的快速集成。同时,为防止网络异常或其他意外情况,我们引入了错误重试机制,保障每条记录都能成功写入。 ```json { "api_name": "batchSave", "data": [ // 批量待保存的数据列表 ] } ``` 通过以上步骤,即可实现从聚水潭·奇门获取销售出库信息,并将其整合至金蝶云星空系统中。但为了进一步提升整个流程的透明度和维护效率,还需要加入实时监控与日志记录功能。这不仅有助于问题追踪,更能帮助我们及时发现潜在风险点并采取相应措施。 下一步将详细阐述分页限流策略、异常处理以及最终整体流程优化方案。 ![用友与MES系统接口开发配置](https://pic.qeasy.cloud/D35.png~tplv-syqr462i7n-qeasy.image) ### 调用聚水潭·奇门接口jushuitan.saleout.list.query获取并加工数据 在轻易云数据集成平台的生命周期中,调用源系统接口是至关重要的一步。本文将深入探讨如何通过调用聚水潭·奇门接口`jushuitan.saleout.list.query`来获取销售出库数据,并进行初步加工。 #### 接口配置与请求参数 首先,我们需要配置接口的基本信息和请求参数。根据元数据配置,接口采用POST方法,主要参数如下: - **page_index**: 页数,从第一页开始,默认值为1。 - **page_size**: 每页行数,默认25,最大25。尽管默认值为25,但我们将其设置为50,以确保每次请求获取更多数据。 - **start_time**: 修改开始时间,通过动态变量`{{LAST_SYNC_TIME|datetime}}`获取上次同步时间。 - **end_time**: 修改结束时间,通过动态变量`{{CURRENT_TIME|datetime}}`获取当前时间。 - **status**: 单据状态,此处设置为"Confirmed",表示已出库的单据。 - **shop_id**: 店铺ID,此处固定为11905455。 以下是请求参数的JSON格式: ```json { "page_index": "1", "page_size": "50", "start_time": "{{LAST_SYNC_TIME|datetime}}", "end_time": "{{CURRENT_TIME|datetime}}", "status": "Confirmed", "shop_id": "11905455" } ``` #### 数据请求与清洗 在发送请求后,我们会收到包含销售出库数据的响应。此时,需要对数据进行初步清洗和验证,以确保其完整性和准确性。 1. **分页处理**:由于每页最多返回50条记录,因此需要通过循环处理分页逻辑,直到所有数据被完全获取。 2. **字段验证**:确保每条记录包含必要的字段,如`io_id`(唯一标识符),以便后续处理。 3. **时间格式转换**:将日期时间字段转换为标准格式,以便于后续的数据存储和分析。 以下是一个示例代码片段,用于处理分页和字段验证: ```python import requests import json def fetch_data(page_index, start_time, end_time): url = "https://api.jushuitan.com/saleout/list/query" headers = {"Content-Type": "application/json"} payload = { "page_index": str(page_index), "page_size": "50", "start_time": start_time, "end_time": end_time, "status": "Confirmed", "shop_id": "11905455" } response = requests.post(url, headers=headers, data=json.dumps(payload)) data = response.json() # 验证字段 for record in data.get("data", []): if not record.get("io_id"): raise ValueError("Missing io_id in record") return data.get("data", []) # 示例调用 all_data = [] page_index = 1 start_time = "{{LAST_SYNC_TIME|datetime}}" end_time = "{{CURRENT_TIME|datetime}}" while True: page_data = fetch_data(page_index, start_time, end_time) if not page_data: break all_data.extend(page_data) page_index += 1 ``` #### 异常处理与补偿机制 在实际操作中,可能会遇到网络异常或接口返回错误等情况。为了确保数据完整性,需要设计异常处理和补偿机制。例如,可以通过定时任务(crontab)定期检查并重新发起失败的请求。 元数据配置中的补偿机制示例如下: ```json { "crontab": "2 0 * * *", "takeOverRequest": [ { "id": "start_timeYrXph", "field": "start_time", "label": "修改开始时间", ... "value": "{{DAYS_AGO_3|datetime}}" } ] } ``` 该配置表示每天凌晨0点2分执行一次任务,将修改开始时间设置为三天前,以确保遗漏的数据能够被重新获取。 #### 数据转换与写入 在完成数据清洗后,需要将其转换为目标系统所需的格式,并写入数据库或其他存储系统。这一步通常包括字段映射、类型转换等操作。 以下是一个简单的数据转换示例: ```python def transform_data(record): return { "id": record["io_id"], ... # 根据目标系统需求进行字段映射和转换 } transformed_data = [transform_data(record) for record in all_data] # 将转换后的数据写入数据库或其他存储系统 # write_to_db(transformed_data) ``` 通过上述步骤,我们可以高效地调用聚水潭·奇门接口`jushuitan.saleout.list.query`获取销售出库数据,并进行初步加工,为后续的数据处理打下坚实基础。 ![钉钉与ERP系统接口开发配置](https://pic.qeasy.cloud/S10.png~tplv-syqr462i7n-qeasy.image) ### 使用轻易云数据集成平台进行ETL转换并写入金蝶云星空API接口 在数据集成的生命周期中,第二步是将已经集成的源平台数据进行ETL(提取、转换、加载)转换,转为目标平台能够接收的格式,并最终写入目标平台。本文将详细探讨如何使用轻易云数据集成平台,将销售出库数据转换为金蝶云星空API接口所能接收的格式,并写入目标平台。 #### 元数据配置解析 在本案例中,我们需要将销售出库的数据通过ETL过程转换为金蝶云星空API接口所能接受的格式。以下是元数据配置的详细解析: ```json { "api": "batchSave", "method": "POST", "idCheck": true, "operation": { "rowsKey": "array", "rows": 1, "method": "batchArraySave" }, "request": [ { "field": "FBillTypeID", "label": "单据类型", "type": "string", "describe": "单据类型", "parser": { "name": "ConvertObjectParser", "params": "FNumber" }, "value": "XSCKD01_SYS" }, { ... } ], ... } ``` #### 请求字段解析 1. **FBillTypeID**: 单据类型,固定值为`XSCKD01_SYS`,使用`ConvertObjectParser`进行解析。 2. **FBillNo**: 单据编号,值为`{io_id}`,直接从源数据中提取。 3. **FDate**: 日期,值为`{io_date}`,直接从源数据中提取。 4. **FSaleOrgId**: 销售组织,根据`shop_id`进行条件判断和赋值。 5. **FCustomerID**: 客户,通过`ConvertObjectParser`进行解析,并且有映射关系。 6. **FSaleDeptID**: 销售部门,根据`sku_id`进行条件判断和赋值。 7. **F_TLWD_Text**: 平台单号,值为`{so_id}`。 8. **F_TLWD_Assistant**: 店铺,通过`ConvertObjectParser`进行解析,并且有映射关系。 9. **FNote**: 备注字段,可选填。 #### 明细信息(FEntity) 明细信息是一个数组,每个元素包含以下字段: 1. **FMaterialID**: 物料编码,通过`ConvertObjectParser`进行解析。 2. **FTaxPrice**: 含税单价,从源数据中的`sale_price`字段提取。 3. **FRealQty**: 实发数量,从源数据中的`qty`字段提取。 4. **FIsFree**: 是否赠品,根据`sale_price`是否为0来判断。 5. **FStockID**: 仓库,通过`ConvertObjectParser`进行解析。 6. **FAmount**: 金额,从源数据中的`sale_amount`字段提取。 7. **FUnitID**: 库存单位,固定值为"Pcs",通过`ConvertObjectParser`进行解析。 8. **FOwnerTypeID**和**FOwnerID**:货主ID,根据不同条件赋值,并通过解析器处理。 #### 财务信息(SubHeadEntity) 财务信息包含整单折扣额: 1. **FAllDisCount**: 整单折扣额,从源数据中的`free_amount`字段提取。 #### 其他请求参数 1. **FormId**:业务对象表单Id,固定值为"SAL_OUTSTOCK"。 2. **Operation**:执行的操作,固定值为"Save"。 3. **IsAutoSubmitAndAudit**:提交并审核,布尔值true。 4. **IsVerifyBaseDataField**:验证基础资料有效性,布尔值true。 5. **SubSystemId**:系统模块,固定值21(仓库模块)。 6. **InterationFlags**:是否允许负库存提醒,固定值"STK_InvCheckResult"。 #### 数据转换与写入 在完成上述元数据配置后,可以利用轻易云的数据集成平台实现以下步骤: 1. 提取源平台的数据,根据配置文件中的字段映射和规则进行转换。 2. 使用配置好的API接口参数,将转换后的数据通过POST请求批量保存到金蝶云星空系统中。 通过以上步骤,可以实现销售出库数据从源平台到金蝶云星空系统的无缝对接,有效提升了业务流程的自动化和效率。 ![系统集成平台API接口配置](https://pic.qeasy.cloud/T18.png~tplv-syqr462i7n-qeasy.image)