在轻易云平台上实现KIS私有云的ETL数据流程

  • 轻易云集成顾问-曹润
### 聚水潭数据集成到KIS私有云——高效对接的技术实现 在本次项目中,我们面临的一项关键任务是将聚水潭系统中的其他出库单数据高效、安全地集成到金蝶KIS私有云环境。通过利用轻易云数据集成平台,结合其卓越的数据处理能力和可视化管理工具,我们顺利地完成了此项复杂的数据对接需求。以下描述将详细探讨该方案实施过程中的主要技术要点与解决方案。 首先,为确保从聚水潭接口(/open/other/inout/query)高效抓取数据,并且将大批量的数据准确写入到KIS私有云(/koas/app007104/api/miscellaneousdelivery/create),我们设计了一套具有高度可靠性的定时抓取机制。这一机制不仅能及时获取新的业务单据,而且还配置了分页和限流处理,以应对API调用频率限制,避免请求被拒绝或漏单现象。 其次,通过使用轻易云提供的自定义数据转换逻辑功能,我们有效地处理了两大系统之间的各种格式差异。例如,对于日期时间、商品编码以及数量等字段进行相应的格式转换,使得这些字段能够无缝映射至KIS私有云所需的数据结构。同时,还基于具体业务场景,加入了必要的校验规则以提高数据质量,从根源上减少错误率。 为了增强整体方案的健壮性,我们充分利用了集中监控和告警系统。在每个任务运行节点上引入实时日志记录,并设立多级告警策略。一旦出现任何异常状况,例如网络故障或者API超时,系统会立即触发告警通知,同时执行自动重试机制来确保最终任务成功完成。此外,通过可视化的数据流设计工具,让整个数据流水线一目了然,更加便于后续维护及优化调整。 最后,在实现过程中,非常注重全流程的数据质量监控。从初始获取到最终写入,每个环节都设置了严格的验证与比对步骤,以确保所有传输、转化的信息都是准确无误且完整一致。这种全面而细致的方法,有效防止因错误导致的不良影响,大幅提升了整体集成效率和稳定性。 ![用友与WMS系统接口开发配置](https://pic.qeasy.cloud/D38.png~tplv-syqr462i7n-qeasy.image) ### 调用聚水潭接口/open/other/inout/query获取并加工数据的技术案例 在数据集成过程中,调用源系统接口是关键的一步。本文将深入探讨如何通过轻易云数据集成平台调用聚水潭接口`/open/other/inout/query`,并对获取的数据进行加工处理。 #### 接口调用配置 首先,我们需要配置接口调用的元数据。以下是元数据配置的详细信息: ```json { "api": "/open/other/inout/query", "effect": "QUERY", "method": "POST", "number": "io_id", "id": "io_id", "idCheck": true, "request": [ {"field":"modified_begin","label":"修改起始时间","type":"datetime","value":"{{MINUTE_AGO_20|datetime}}"}, {"field":"modified_end","label":"修改结束时间","type":"datetime","value":"{{CURRENT_TIME|datetime}}"}, {"field":"status","label":"单据状态","type":"string","value":"Confirmed"}, {"field":"page_index","label":"第几页","type":"string","value":"1"}, {"field":"page_size","label":"每页多少条","type":"string","value":"30"}, {"field":"date_type","label":"抓取时间类型","type":"string","describe":"0:修改时间,modified。 2:出入库时间 io_date,未传入时默认为0"} ], "autoFillResponse": true, "condition_bk": [ [{"field": "type", "logic": "notin", "value": "其它出库"}] ], "condition": [ [{"field": "so_id", "logic": "notlike", "value": "CHG"}, {"field": "wms_co_id", "logic": "in", "value": "14132797,14133381"}] ], "omissionRemedy": { "crontab": "2 */3 * * *", "takeOverRequest":[ {"field":"modified_begin","value":"_function FROM_UNIXTIME( unix_timestamp() -86400 , '%Y-%m-%d %H:%i:%s' )","type":"string","label":"接管字段"} ] } } ``` #### 请求参数详解 - `modified_begin` 和 `modified_end`:这两个字段用于指定查询的时间范围。`{{MINUTE_AGO_20|datetime}}`表示从当前时间前20分钟开始,`{{CURRENT_TIME|datetime}}`表示当前时间。 - `status`:单据状态,这里固定为"Confirmed"。 - `page_index` 和 `page_size`:用于分页查询,每次请求30条记录。 - `date_type`:抓取时间类型,默认为0(修改时间)。 #### 条件过滤 为了确保我们获取的数据符合业务需求,我们设置了两个条件过滤: 1. `condition_bk`: 排除类型为“其它出库”的记录。 2. `condition`: 包含两个子条件: - `so_id` 不包含“CHG”。 - `wms_co_id` 在指定的ID列表中(14132797,14133381)。 #### 数据请求与清洗 在实际操作中,通过轻易云平台发起POST请求到聚水潭接口,并根据上述配置获取数据。以下是一个示例请求体: ```json { "modified_begin": "{{MINUTE_AGO_20|datetime}}", "modified_end": "{{CURRENT_TIME|datetime}}", "status": "Confirmed", "page_index": 1, "page_size": 30, "date_type": 0 } ``` 返回的数据会自动填充到响应中,并根据条件进行过滤和清洗。例如,如果返回的数据中某些记录的类型为“其它出库”,这些记录将被排除在外。 #### 异常处理与补救机制 为了确保数据完整性,我们设置了异常处理和补救机制。当某次请求失败或遗漏时,系统会按照设定的crontab规则每三小时执行一次补救任务,并重新发起请求以弥补遗漏的数据。 ```json { "_function FROM_UNIXTIME( unix_timestamp() -86400 , '%Y-%m-%d %H:%i:%s' )" } ``` 该字段表示从当前时间往前推一天,以确保补救任务能够覆盖到所有可能遗漏的数据。 #### 数据转换与写入 在完成数据请求与清洗后,下一步是将清洗后的数据转换并写入目标系统。在这个案例中,我们将处理后的数据写入金蝶系统。这一步通常涉及数据格式转换、字段映射等操作,以确保目标系统能够正确识别和处理这些数据。 通过以上步骤,我们实现了从聚水潭接口获取并加工数据的全过程。这不仅提高了数据处理的效率,也确保了业务流程的顺畅运行。 ![打通用友BIP数据接口](https://pic.qeasy.cloud/S4.png~tplv-syqr462i7n-qeasy.image) ### 使用轻易云数据集成平台进行KIS私有云API接口的数据转换与写入 在数据集成生命周期的第二阶段,我们需要将已经从源平台(如聚水潭)获取并清洗后的数据,进行ETL(提取、转换、加载)处理,最终写入目标平台(如KIS私有云)。本文将详细介绍如何通过轻易云数据集成平台,将其他出库单数据转换为KIS私有云API接口所能接受的格式,并成功写入目标系统。 #### API接口配置 首先,我们需要了解目标平台KIS私有云的API接口配置。根据元数据配置文件,KIS私有云API接口路径为`/koas/app007104/api/miscellaneousdelivery/create`,请求方法为`POST`。以下是具体的请求参数配置: - `AccountDB`: 账户数据库,固定值为"001"。 - `Object`: 包含两个主要部分:`Head`和`Entry`。 #### Head部分字段映射 在Head部分,我们需要将源平台的数据映射到目标平台对应的字段。以下是具体字段及其映射关系: 1. **FBillNo**(单据编号):映射到源数据中的 `{io_id}`。 2. **Fdate**(日期):使用函数 `_function REPLACE ('{{io_date|datetime}}',' ','T')` 将日期格式转换为目标格式。 3. **FDeptID**(部门):固定值 "16921"。 4. **FBillTypeID**(入库类型):固定值 "1005"。 5. **FTranType**(事务类型):从源数据中获取相应值。 6. **FDCStockID**(仓库):由 `{wms_co_id}-{wh_id}` 组合而成,并通过映射关系转换。 7. **FManagerID**、**FSManagerID**、**FFManagerID**、**FBillerID**、**FEmpID**:这些字段均使用 `{wms_co_id}` 并通过不同的映射关系进行转换。 8. **FMarketingStyle**(业务类型):固定值 "12530"。 9. **FSaleStyle**(销售方式):固定值 "101"。 10. **FExplanation**(摘要):由字符串 "其他出库单{io_id}" 组成。 11. **FSupplyID**(购货单位/客户):固定值 "13643"。 #### Entry部分字段映射 Entry部分是一个数组,每个元素代表一条出库明细记录。以下是具体字段及其映射关系: 1. **FItemID**(产品代码):通过 `_mongoQuery` 查询获取,查询条件为 `{"content.F_103":{"$eq":"{sku_id}"}}`。 2. **Fauxqty**(实发数量):直接从源数据中的 `items.qty` 获取。 3. **FSecCoefficient**(换算率):固定值 "1"。 4. **FSecQty**(辅助数量):直接从源数据中的 `items.qty` 获取。 5. **Famount**(金额):从源数据中的 `{item_cost_price}` 获取。 6. **FDCStockID**(仓库):由 `{wms_co_id}-{wh_id}` 组合而成,并通过映射关系转换。 7. **FUnitID**(物料单位):通过 `_mongoQuery` 查询获取,查询条件为 `{"content.F_103":{"$eq":"{sku_id}"}}`。 8. **FMTONo**(计划跟踪号):固定值 "1"。 9. **FPlanMode**(计划跟踪模式):固定值 "14036"。 #### 数据转换与写入过程 在实际操作中,我们会利用轻易云提供的可视化界面,将上述配置逐步实现。以下是关键步骤: 1. **定义API请求结构:** 在轻易云平台上,根据元数据配置文件定义API请求结构,包括所有必要的字段和嵌套对象。 2. **字段映射与转换:** 使用轻易云的平台功能,将源数据中的字段与目标API请求结构中的字段进行映射,并应用必要的转换函数。例如,将日期格式转换为ISO 8601标准。 3. **测试与验证:** 在完成所有配置后,通过轻易云的平台功能进行测试,确保生成的请求符合目标API的要求,并能够成功写入目标系统。 4. **部署与监控:** 将配置好的集成流程部署到生产环境,并利用轻易云的平台功能实时监控数据流动和处理状态,确保整个过程顺利进行。 通过以上步骤,我们能够高效地将聚水潭等源平台的数据转换并写入到KIS私有云,实现不同系统间的数据无缝对接。这不仅提升了业务效率,还确保了数据的一致性和准确性。 ![如何对接用友BIP接口](https://pic.qeasy.cloud/T5.png~tplv-syqr462i7n-qeasy.image)