轻易云平台进行ETL转换和写入KIS私有云的技术案例

  • 轻易云集成顾问-卢剑航
### 聚水潭数据集成到KIS私有云案例分享 在本次技术案例中,我们探讨了如何实现聚水潭系统的采购退货单数据高效对接到KIS私有云。这一过程中,轻易云提供了全生命周期管理和可视化操作界面,大大简化了整个集成流程。具体解决方案是通过聚水潭-采购退货单——>KIS-采购退货单Done。 #### 核心问题与解决方案 1. **确保数据不漏单**:为了防止任何漏单现象的发生,我们调用聚水潭的API接口`/open/purchaseout/query`进行数据抓取,并使用定时任务机制来定期检查和更新最新的数据状态。此外,通过实时监控与日志记录,可以及时捕捉并处理异常情况。 2. **批量快速写入**:面对大量业务数据需要快速写入至KIS私有云的问题,此方案采用了并发处理和批量提交技术。调用KIS API接口`/koas/app007104/api/purchasereceipt/create`进行多线程并行写入,从而提升整体效率。 3. **分页与限流处理**:由于聚水潭API对此类请求存在分页和限流限制,需特别设计分段拉取策略,即按照固定页数、每页条数逐步获取,并配置合理的重试机制以应对可能的请求失败或超时情况。 4. **格式差异转换**:不同系统间的数据格式常存在差异,尤其是字段命名、类型要求等。在此项目中,通过自定义转换器将从聚水潭获取的数据结构调整为符合KIS私有云要求的数据格式,包括字段映射及类型转换,以确保准确无误地上传目标平台。 5. **异常处理与错误重试机制**:在实际环境中,不可避免会遇到网络不稳定或其他突发状况引起的数据传输失败。本方案设计了一套完善的异常捕获及自动重试策略,对接收端返回结果做详细判断,对于未成功提交的数据,会进入队列等待下次尝试提交,从而最大程度保证数据完整性。 通过上述核心功能模块,本次集成实施不仅高效且稳妥地完成了从聚水潭到KIS私有云的数据迁移过程,为后续更多业务场景中的跨平台对接打下坚实基础。在整合系统资源、优化操作环节方面,也展现出极大的灵活性与扩展能力。 ![轻易云数据集成平台金蝶集成接口配置](https://pic.qeasy.cloud/D39.png~tplv-syqr462i7n-qeasy.image) ### 调用聚水潭接口获取并加工数据的技术实现 在轻易云数据集成平台中,调用源系统聚水潭接口`/open/purchaseout/query`是数据集成生命周期的第一步。本文将详细探讨如何通过该接口获取并加工数据,以便后续的数据转换与写入操作。 #### 接口配置与调用 首先,我们需要了解接口的基本配置和调用方式。根据提供的元数据配置,聚水潭接口`/open/purchaseout/query`使用POST方法进行请求,主要用于查询采购退货单信息。以下是具体的请求参数及其说明: - `page_index`: 第几页,从1开始。 - `page_size`: 每页数量,最大不超过50。 - `modified_begin`: 修改起始时间,格式为字符串。起始时间和结束时间必须同时存在,且时间间隔不能超过七天,与采购单号不能同时为空。 - `modified_end`: 修改结束时间,格式为字符串。起始时间和结束时间必须同时存在,且时间间隔不能超过七天,与采购单号不能同时为空。 - `po_ids`: 采购单号列表,与修改时间不能同时为空。采购单号最大不能超过30条。 - `io_ids`: 采购入库单号列表,与修改时间不能同时为空。采购入库单号最大不能超过30条。 - `so_ids`: 线上单号,与修改时间不能同时为空。 #### 请求参数自动填充 为了确保请求参数的动态性和准确性,我们使用了模板变量来自动填充部分参数值: - `{{LAST_SYNC_TIME|datetime}}`: 上次同步时间,用于填充`modified_begin`字段。 - `{{CURRENT_TIME|datetime}}`: 当前时间,用于填充`modified_end`字段。 这种自动填充机制确保了每次请求都能获取到最新的数据变化,提高了数据同步的实时性。 #### 条件过滤 在实际应用中,我们可能需要对返回的数据进行条件过滤。根据元数据配置中的条件部分,我们可以看到以下过滤条件: ```json "condition":[[{"field":"wms_co_id","logic":"in","value":"14132797,14133381,13090941"}]] ``` 这表示我们只需要获取特定仓库ID(如14132797, 14133381, 13090941)下的采购退货单信息。这种条件过滤机制能够有效减少无关数据的传输,提高查询效率。 #### 数据清洗与加工 在成功调用接口并获取到原始数据后,需要对数据进行清洗与加工,以便后续的数据转换与写入操作。在轻易云平台中,这一步通常包括以下几个步骤: 1. **字段映射**:将源系统返回的数据字段映射到目标系统所需的字段。例如,将聚水潭返回的`io_id`映射为目标系统中的相应字段。 2. **数据类型转换**:确保所有字段的数据类型符合目标系统要求。例如,将字符串类型的日期字段转换为日期对象。 3. **异常处理**:处理可能出现的数据异常,如空值、格式错误等,确保数据质量。 #### 实际案例 假设我们需要从聚水潭获取最近一天内所有特定仓库ID下的采购退货单信息,并将其导入到KIS系统中。具体实现步骤如下: 1. **设置请求参数**: ```json { "page_index": 1, "page_size": 30, "modified_begin": "{{LAST_SYNC_TIME|datetime}}", "modified_end": "{{CURRENT_TIME|datetime}}", "wms_co_id": "14132797,14133381,13090941" } ``` 2. **调用接口**: 使用POST方法向`/open/purchaseout/query`发送请求,并接收返回的数据。 3. **清洗与加工**: 对返回的数据进行字段映射、类型转换和异常处理。例如,将返回结果中的每个`io_id`对应到KIS系统中的相应字段,并确保日期格式正确。 4. **写入目标系统**: 将清洗后的数据通过轻易云平台写入到KIS系统中,完成整个数据集成过程。 通过上述步骤,我们能够高效地实现从聚水潭到KIS系统的采购退货单数据集成。这不仅提高了业务流程的自动化程度,也确保了数据的一致性和准确性。 ![如何对接企业微信API接口](https://pic.qeasy.cloud/S7.png~tplv-syqr462i7n-qeasy.image) ### 使用轻易云数据集成平台进行ETL转换和写入KIS私有云API接口的技术案例 在数据集成生命周期的第二步,我们需要将已经集成的源平台数据进行ETL转换,转为目标平台KIS私有云API接口所能够接收的格式,并最终写入目标平台。以下是一个详细的技术案例,展示如何使用轻易云数据集成平台完成这一过程。 #### API接口元数据配置解析 根据提供的元数据配置,我们需要调用KIS私有云的`/koas/app007104/api/purchasereceipt/create` API接口,通过POST方法提交采购退货单的数据。以下是该API接口的详细配置解析: - **API路径**: `/koas/app007104/api/purchasereceipt/create` - **请求方法**: `POST` - **主要字段**: - `AccountDB`: 固定值 "001" - `Object`: 包含`Head`和`Entry`两个主要部分 #### Head部分字段解析 `Head`部分包含采购退货单表头信息,具体字段如下: 1. **FBillerID (制单)**: 映射自 `{wms_co_id}`,通过映射关系将仓库管理系统中的公司ID转换为KIS系统中的制单人ID。 2. **FPOStyle (采购方式)**: 固定值 `252`。 3. **FSupplyID (供货机构)**: 通过 `_findCollection find FItemID from 65a6310d-c2c5-3b3a-be6d-1bbf6200165f where jstID={seller_id}` 从数据库中查找供货机构ID。 4. **FExplanation (摘要)**: 动态生成,格式为 "采购退货单{io_id}"。 5. **Fdate (日期)**: 使用 `_function REPLACE ('{{io_date|datetime}}',' ','T')` 将日期格式化为ISO标准。 6. **FDCStockID (仓库表头)**: 映射自 `{wms_co_id}-{wh_id}`。 7. **FFManagerID (销售)** 和 **FSManagerID (保管)**: 都映射自 `{wms_co_id}`。 8. **FROB (红蓝字)**: 固定值 `-1`。 #### Entry部分字段解析 `Entry`部分包含采购退货单表体信息,是一个数组,每个元素代表一个商品条目。具体字段如下: 1. **FItemID (产品代码)**: 使用 `_mongoQuery 30fa1b2b-6cfc-31c2-90a3-5a497b7812bd findField=content.FItemID where={"content.F_103":{"$eq":"{sku_id}"}}` 从MongoDB中查询产品代码。 2. **Fauxqty (实发数量)**: 使用 `_function -1*{qty}` 计算实际发货数量(取负值)。 3. **FAuxPrice (单价)**: 映射自 `{cost_price}`。 4. **FDCStockID (仓库表体)**: 同样映射自 `{wms_co_id}-{wh_id}`。 5. **FUnitID (单位)**: 使用 `_mongoQuery 30fa1b2b-6cfc-31c2-90a3-5a497b7812bd findField=content.FProductUnitID where={"content.F_103":{"$eq":"{sku_id}"}}` 查询单位信息。 6. **FPlanMode** 和 **decimal**: 分别固定值 `14036` 和 `1`。 7. **Famount (金额)**: 使用 `_function {qty}*{cost_price}` 计算金额。 8. **FSecCoefficient 和 FSecQty**: 都固定为 `1` 和 `_function -1*{qty}`。 #### 数据转换与写入流程 在轻易云数据集成平台上,我们可以通过以下步骤实现上述ETL转换和写入操作: 1. 配置源数据请求:从聚水潭系统获取采购退货单的数据。 2. 数据清洗与预处理:对获取的数据进行必要的清洗和预处理,以确保数据质量和一致性。 3. 数据转换:根据上述元数据配置,将源数据字段映射到目标API所需的字段格式,并执行必要的计算和查询操作。例如,将日期格式化、计算金额等。 4. 构建请求对象:按照目标API要求构建请求对象,包括Head和Entry部分的数据结构。 5. 调用API接口:使用HTTP POST方法调用KIS私有云API接口,将构建好的请求对象发送到目标系统。 #### 实际应用示例 假设我们从聚水潭系统获取到以下采购退货单数据: ```json { "wms_co_id": "1001", "seller_id": "2002", "io_id": "3003", "io_date": "2023-10-05 10:00:00", "wh_id": "4004", "items": [ { "sku_id": "5005", "qty": 10, "cost_price": 100 } ] } ``` 通过上述ETL转换过程,我们将其转换为如下目标API请求对象: ```json { "AccountDB": "001", "Object": { "Head": { "FBillerID": "1001", "FPOStyle": 252, "FSupplyID": "_findCollection find FItemID from ... where jstID=2002", "FExplanation": "采购退货单3003", "Fdate": "2023-10-05T10:00:00", "FDCStockID": "1001-4004", ... }, ... "Entry": [ { ... "FItemID": "_mongoQuery ... where={\"content.F_103\":{\"$eq\":\"5005\"}}", ... ... ... ... ... ... ... ... ... ... ... ... ... ![金蝶与SCM系统接口开发配置](https://pic.qeasy.cloud/T8.png~tplv-syqr462i7n-qeasy.image)