使用轻易云进行ETL转换并写入KIS私有云API技术解析

  • 轻易云集成顾问-吴伟
### 聚水潭数据集成到KIS私有云案例:聚水潭-盘盈——>KIS-其他入库Done 在本技术案例中,我们深入探讨了如何将聚水潭平台中的库存盘点数据高效、准确地集成到KIS私有云系统。通过利用轻易云数据集成平台的强大功能,实现跨系统的数据流动和处理。本次方案命名为“聚水潭-盘盈——>KIS-其他入库Done”。 首先,为理解整个流程,必须明确两个关键API接口: 1. **聚水潭获取数据的API**: `/open/inventory/count/query` 2. **KIS私有云写入数据的API**: `/koas/app007104/api/miscellaneousreceipt/create` #### 实现步骤概述 ##### 数据抓取与转换 为了确保我们从聚水潭获取的数据不遗漏,每隔一段固定时间,通过定时任务可靠地调用`/open/inventory/count/query`接口,抓取最新的库存盘盈数据。在这一过程中,需要特别注意分页和限流问题,以防止因一次请求过多而导致性能瓶颈。轻易云提供自定义的数据转换逻辑,可以针对不同业务需求对原始数据进行清洗和变换,使其符合目标系统格式。 ##### 高效批量写入 接下来,将经过处理后的批量库存盘盈数据快速写入到KIS私有云对应模块。这一步依赖于高吞吐量能力,我们调用`/koas/app007104/api/miscellaneousreceipt/create`接口实现批量操作。同时设置错误重试机制,确保即使出现网络抖动或服务异常,也能够自动重试并记录详细日志。 ##### 数据监控与告警 为了实时掌握每个环节的数据处理状态,全程设立了集中监控与告警系统。该系统不仅可以跟踪各个任务执行过程,还能及时检测出任何异常,例如接口超时或响应值不合法,并迅速发送告警通知相关负责人采取措施。 通过这样的技术方法,不仅提升了整体效率,还保证了跨平台的数据一致性和完整性,提高企业资源管理水平。 ![用友与MES系统接口开发配置](https://pic.qeasy.cloud/D3.png~tplv-syqr462i7n-qeasy.image) ### 调用聚水潭接口获取并加工数据的技术案例 在轻易云数据集成平台中,调用源系统接口是数据处理生命周期的第一步。本文将深入探讨如何通过调用聚水潭接口`/open/inventory/count/query`来获取并加工数据。 #### 接口概述 聚水潭提供的`/open/inventory/count/query`接口用于查询库存盘点信息。该接口采用POST方法,能够根据指定的条件返回符合要求的数据。以下是该接口的元数据配置: ```json { "api": "/open/inventory/count/query", "method": "POST", "number": "io_id", "id": "io_id", "request": [ { "field": "page_index", "label": "开始页码", "type": "string", "describe": "第几页,从第一页开始,默认1", "value": "1" }, { "field": "page_size", "label": "每页条数", "type": "string", "describe": "每页多少条,默认30,最大50", "value": "50" }, { "field": "modified_begin", "label": "修改开始时间", "type": "datetime", "describe": "修改起始时间,和结束时间必须同时存在,时间间隔不能超过七天,与线上单号不能同时为空", "value": "{{LAST_SYNC_TIME|datetime}}" }, { "field": "modified_end", "label": "修改结束时间", "type": "datetime", "describe": "修改结束时间,和起始时间必须同时存在,时间间隔不能超过七天,与线上单号不能同时为空", "value": "{{CURRENT_TIME|datetime}}" }, { "field": "status", "label": "单据状态", "type": "string", "describe":"单据状态,Confirmed=生效,WaitConfirm待审核,Creating=草拟,Archive=归档,Cancelled=作废", 'value': 'Confirmed' } ], 'condition': [ [ {'field': 'items.qty', 'logic': 'gt', 'value': '0'}, {'field': 'wms_co_id', 'logic': 'in', 'value': '14132797,14133381'} ] ] } ``` #### 请求参数详解 1. **page_index**: 开始页码,从第一页开始。默认值为1。 2. **page_size**: 每页条数。默认值为30,最大值为50。 3. **modified_begin**: 修改开始时间。与`modified_end`必须同时存在且间隔不超过七天。 4. **modified_end**: 修改结束时间。与`modified_begin`必须同时存在且间隔不超过七天。 5. **status**: 单据状态。我们选择了"Confirmed"表示已生效的单据。 #### 条件过滤 在请求参数中,我们还设置了两个条件过滤: - `items.qty > 0`: 表示库存数量大于零。 - `wms_co_id in (14132797, 14133381)`: 表示仓库ID在指定范围内。 #### 数据请求与清洗 在调用该接口时,我们需要注意以下几点: - 确保分页参数正确设置,以避免遗漏数据。 - 使用动态变量如`{{LAST_SYNC_TIME|datetime}}`和`{{CURRENT_TIME|datetime}}`来确保数据同步的时效性。 - 对返回的数据进行初步清洗,如去除无效记录、格式化日期等。 以下是一个Python示例代码,用于调用该接口并处理返回的数据: ```python import requests import datetime # 设置请求URL和头部信息 url = 'https://api.jushuitan.com/open/inventory/count/query' headers = {'Content-Type': 'application/json'} # 获取当前时间和上次同步时间 current_time = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S') last_sync_time = (datetime.datetime.now() - datetime.timedelta(days=7)).strftime('%Y-%m-%d %H:%M:%S') # 构建请求体 payload = { 'page_index': 1, 'page_size': 50, 'modified_begin': last_sync_time, 'modified_end': current_time, 'status': 'Confirmed' } # 发起POST请求 response = requests.post(url, headers=headers, json=payload) # 检查响应状态码 if response.status_code == 200: data = response.json() # 数据清洗与处理 cleaned_data = [item for item in data['items'] if item['qty'] > 0 and item['wms_co_id'] in [14132797, 14133381]] # 打印清洗后的数据 print(cleaned_data) else: print(f'Error: {response.status_code}') ``` #### 数据转换与写入 在获取并清洗完数据后,我们需要将其转换为目标系统所需的格式,并写入到目标系统中。这一步通常包括字段映射、格式转换等操作。例如,将日期格式从字符串转换为目标系统所需的日期对象。 通过上述步骤,我们可以高效地完成从聚水潭到目标系统的数据集成。这不仅提高了数据处理的效率,还确保了数据的一致性和准确性。 ![泛微OA与ERP系统接口开发配置](https://pic.qeasy.cloud/S19.png~tplv-syqr462i7n-qeasy.image) ### 使用轻易云数据集成平台进行ETL转换并写入KIS私有云API接口的技术案例 在数据集成的生命周期中,第二步是将已集成的源平台数据进行ETL转换,并转为目标平台KIS私有云API接口所能接收的格式,最终写入目标平台。本文将详细探讨如何利用轻易云数据集成平台完成这一过程。 #### API接口配置与元数据解析 首先,我们需要理解目标API接口的元数据配置。以下是一个典型的KIS私有云API接口配置示例: ```json { "api": "/koas/app007104/api/miscellaneousreceipt/create", "effect": "EXECUTE", "method": "POST", "number": "id", "id": "id", "name": "id", "idCheck": true, "request": [ { "field": "AccountDB", "label": "AccountDB", "type": "string", "value": "001" }, { "field": "Object", "label": "Object", "type": "object", "describe": "暂无描述", ... } ] } ``` 该配置文件定义了API的基本信息,包括请求方法(POST)、API路径以及请求参数等。 #### 数据转换与映射 在ETL过程中,关键步骤之一是将源平台的数据字段映射到目标平台所需的字段。这通常涉及复杂的数据转换和清洗操作。以下是一些关键字段及其映射规则: 1. **单据编号 (FBillNo)** - 源数据字段:`io_id` - 转换规则:直接映射 - 示例:`"FBillNo":"{io_id}"` 2. **日期 (Fdate)** - 源数据字段:`io_date` - 转换规则:日期格式转换,将空格替换为'T' - 示例:`"Fdate":"_function REPLACE ('{{io_date|datetime}}',' ','T')"` 3. **部门 (FDeptID)** - 固定值:16921 - 示例:`"FDeptID":"16921"` 4. **仓库(表头)(FDCStockID)** - 源数据字段:`wms_co_id` 和 `wh_id` - 转换规则:组合两个字段值 - 示例:`"FDCStockID":"{wms_co_id}-{wh_id}"` 5. **责任人 (FManagerID)** - 源数据字段:`wms_co_id` - 转换规则:直接映射 - 示例:`"FManagerID":"{wms_co_id}"` 6. **产品代码 (Entry.FItemID)** - 源数据字段:`sku_id` - 转换规则:通过MongoDB查询获取对应值 - 示例: ```json { "field":"FItemID", ... "value":"_mongoQuery 30fa1b2b-6cfc-31c2-90a3-5a497b7812bd findField=content.FItemID where={\"content.F_103\":{\"$eq\":\"{sku_id}\"}}" } ``` #### 数据写入 一旦所有必要的数据都已转换并映射到正确的格式,我们就可以使用轻易云提供的API调用功能,将这些数据写入到KIS私有云系统中。 ```python import requests url = 'https://your-kis-api-endpoint/koas/app007104/api/miscellaneousreceipt/create' headers = {'Content-Type': 'application/json'} payload = { 'AccountDB': '001', 'Object': { 'Head': { 'FBillNo': '{io_id}', 'Fdate': '_function REPLACE ("{{io_date|datetime}}"," ","T")', 'FDeptID': '16921', ... }, 'Entry': [ { 'FItemID': '_mongoQuery ...', ... } ] } } response = requests.post(url, json=payload, headers=headers) if response.status_code == 200: print('Data successfully written to KIS') else: print(f'Failed to write data: {response.text}') ``` 通过上述代码示例,我们可以看到如何构建HTTP POST请求,将经过ETL处理的数据发送到KIS私有云API。 #### 总结 在本文中,我们深入探讨了如何利用轻易云数据集成平台完成ETL转换,并将处理后的数据写入到KIS私有云API接口。我们详细解析了目标API接口的元数据配置,并展示了如何进行字段映射和数据转换,最终实现了无缝的数据集成。 ![用友与WMS系统接口开发配置](https://pic.qeasy.cloud/T29.png~tplv-syqr462i7n-qeasy.image)