使用轻易云集成平台将旺店通仓库数据同步到BI泰海

  • 轻易云集成顾问-吕修远
### 旺店通·旗舰奇门仓库信息到MySQL的系统对接集成案例 在本次技术案例中,我们将探讨如何通过数据集成平台,将旺店通·旗舰奇门(以下简称“旺店通”)中的仓库信息实时集成到BI泰海的MySQL数据库表。为此,采用了轻易云高效的数据处理工具,实现对相关API接口数据的抓取、转换和写入。 此次集成方案命名为:**旺店通旗舰版-仓库信息-->BI泰海-仓库信息表**。具体实现包括以下几个关键步骤: 1. **调用旺店通接口获取数据**:我们从`wdt.setting.warehouse.querywarehouse` API获取最新的仓库配置信息。这是整个流程启动的基础环节,通过定时任务机制确保周期性地拉取最新数据。 2. **处理分页与限流问题**:鉴于旺店通API在大规模数据查询时可能存在分页限制和请求速率限制,我们设计了合理的分页策略,并配置限流控制,防止因频繁请求而触发服务端IP封禁或其他限制。 3. **自定义数据转换逻辑**:为了适应业务需求及目标MySQL表结构,对原始接口返回的数据进行必要的数据清洗与格式转换。这些操作通过轻易云可视化设计工具直观完成,不仅简化了复杂度,同时也提高了开发效率。 4. **批量高效写入MySQL**:借助轻易云支持的大容量、高吞吐量特性,将处理后的数据信息批量写入至BI泰海的`execute` API,以保障及时和可靠的数据存储。在这个过程中,还引入了异常捕获与重试机制,确保在各类网络波动或偶发错误下仍能稳定运行。 此外,为保证业务透明和系统稳定,该集成方案提供集中监控及告警功能。如若出现异常情况,无论是API调用失败还是数据库写入出错,都能第一时间发现并通知相关负责人员,从而实现快速响应和解决。 这一系列遵循标准化、模块化设计思路的方法,使得不同层级的信息传递显得格外顺畅且富有效率。同时,通过实时监控与日志记录,全程跟踪每个任务执行状态,更好地掌握整体运营健康状况。 ![打通用友BIP数据接口](https://pic.qeasy.cloud/D3.png~tplv-syqr462i7n-qeasy.image) ### 调用旺店通·旗舰奇门接口获取并加工数据 在数据集成的生命周期中,调用源系统接口是至关重要的一步。本文将深入探讨如何使用轻易云数据集成平台调用旺店通·旗舰奇门接口 `wdt.setting.warehouse.querywarehouse` 来获取仓库信息,并对数据进行初步加工。 #### 接口调用配置 首先,我们需要配置元数据以便正确调用 `wdt.setting.warehouse.querywarehouse` 接口。以下是该接口的元数据配置: ```json { "api": "wdt.setting.warehouse.querywarehouse", "method": "POST", "number": "{warehouse_no}", "id": "{warehouse_id}", "request": [ { "field": "pager", "label": "分页参数", "type": "object", "children": [ { "field": "page_size", "label": "分页大小", "type": "string", "value": "50", "parent": "pager" }, { "field": "page_no", "label": "页号", "type": "int", "value": 1, "parent": "pager" } ] }, { "field": "params", "label": "业务参数", "type": "object", "children": [ { "field": "start_time", "label": "开始时间", "type": "string", "\"value\": \"{{LAST_SYNC_TIME|datetime}}\"", "\"parent\": \"params\"" }, { "\"field\": \"end_time\"", "\"label\": \"结束时间\"", "\"type\": \"string\"", "\"value\": \"{{CURRENT_TIME|datetime}}\"", "\"parent\": \"params\"" } ] } ], "\"autoFillResponse\": true" } ``` #### 请求参数解析 在请求参数中,我们定义了两个主要部分:分页参数和业务参数。 1. **分页参数**: - `page_size`: 每页返回的数据条数,设置为50。 - `page_no`: 当前请求的页码,从1开始。 2. **业务参数**: - `start_time`: 数据同步的开始时间,使用上次同步时间(`{{LAST_SYNC_TIME|datetime}}`)。 - `end_time`: 数据同步的结束时间,使用当前时间(`{{CURRENT_TIME|datetime}}`)。 这些参数确保我们能够分页获取仓库信息,并且只获取指定时间范围内的数据。 #### 数据请求与清洗 在轻易云平台上,我们可以通过可视化界面配置上述元数据,并发起POST请求以获取仓库信息。由于平台支持全异步操作,我们可以同时处理多个请求,提高效率。 ```python import requests import json url = 'https://api.wangdian.cn/openapi2/wdt.setting.warehouse.querywarehouse' headers = {'Content-Type': 'application/json'} payload = { 'pager': { 'page_size': '50', 'page_no': 1 }, 'params': { 'start_time': '{{LAST_SYNC_TIME|datetime}}', 'end_time': '{{CURRENT_TIME|datetime}}' } } response = requests.post(url, headers=headers, data=json.dumps(payload)) data = response.json() ``` #### 数据转换与写入 一旦我们成功获取到数据,需要对其进行初步清洗和转换,以便后续写入目标系统。在这个过程中,可以利用轻易云平台的自动填充响应功能(`autoFillResponse: true`),简化数据处理步骤。 ```python # 假设data是从API响应中获取的数据 warehouses = data.get('warehouses', []) # 清洗和转换数据 cleaned_data = [] for warehouse in warehouses: cleaned_data.append({ 'warehouse_no': warehouse.get('number'), 'warehouse_id': warehouse.get('id'), # 添加其他必要字段 }) # 将清洗后的数据写入目标系统(例如BI泰海) write_to_target_system(cleaned_data) ``` #### 实时监控与调试 在整个过程中,轻易云平台提供了实时监控和调试工具,帮助我们及时发现并解决问题。例如,可以通过日志查看每次API调用的详细信息,包括请求和响应内容,从而快速定位问题所在。 综上所述,通过合理配置元数据并利用轻易云平台的强大功能,我们可以高效地调用旺店通·旗舰奇门接口,获取并加工仓库信息,为后续的数据集成打下坚实基础。 ![轻易云数据集成平台金蝶集成接口配置](https://pic.qeasy.cloud/S29.png~tplv-syqr462i7n-qeasy.image) ### 使用轻易云数据集成平台进行ETL转换并写入MySQL API接口 在数据集成生命周期的第二步,我们需要将已经从源平台(如旺店通旗舰版)获取的数据进行ETL转换,转为目标平台(如BI泰海)的MySQL API接口所能够接收的格式,并最终写入目标平台。以下是具体操作步骤和技术细节。 #### 数据请求与清洗 首先,我们从源平台获取仓库信息数据。这一步主要涉及对原始数据的请求和初步清洗,以确保数据质量和一致性。假设我们已经完成了这一步,并且获得了如下格式的原始数据: ```json { "warehouse_id": "WH001", "warehouse_no": "W001", "name": "主仓库", "zip": "100000", "address": "北京市朝阳区", "province": "北京市", "city": "北京市", "district": "朝阳区", "mobile": "13800000000", "remark": "", "type": "主仓库", "telno": "", "sub_type": "", "contact": "", "modified": "2023-10-01T12:00:00Z", "is_disabled": false, "created": "2023-01-01T08:00:00Z" } ``` #### 数据转换与写入 接下来,我们需要将上述原始数据转换为目标平台MySQL API接口所能接受的格式。根据元数据配置,以下是具体的字段映射和转换规则: ```json { "api":"execute", "effect":"EXECUTE", "method":"POST", "idCheck":true, "request":[ { "field":"main_params", ... ... } ], ... ... } ``` 具体字段映射如下: - `warehouse_id` -> `warehouse_id` - `warehouse_no` -> `warehouse_no` - `name` -> `name` - `zip` -> `zip` - `address` -> `address` - `province` -> `province` - `city` -> `city` - `district` -> `district` - `mobile` -> `mobile` - `remark` -> `remark` - `type` -> `type` - `telno` -> `telno` - `sub_type` -> `sub_type` - `contact` -> `contact` - 日期时间字段需要进行格式化: - `{modified|datetime}` 表示将修改时间格式化为目标平台接受的日期时间格式 - `{created|datetime}` 表示将创建时间格式化为目标平台接受的日期时间格式 - 布尔值字段需要进行转换: - `{is_disabled}` 转换为字符串类型 #### SQL语句构建 根据配置中的SQL语句模板,我们需要构建实际执行的SQL语句: ```sql REPLACE INTO setting_Warehouse_queryWarehouse ( warehouse_id, warehouse_no, name, zip, address, province, city, district, mobile, remark, type, telno, sub_type, contact, modified, is_disabled, created ) VALUES ( :warehouse_id, :warehouse_no, :name, :zip, :address, :province, :city, :district,:mobile, :remark, :type, :telno, :sub_type, :contact, :modified, :is_disabled, :created ); ``` 替换占位符后的实际SQL语句如下: ```sql REPLACE INTO setting_Warehouse_queryWarehouse ( warehouse_id, warehouse_no, name, zip, address, province, city,district,mobile ,remark,type,telno ,sub_type ,contact , modified,is_disabled ,created ) VALUES ( 'WH001', 'W001', '主仓库', '100000', '北京市朝阳区', '北京市', '北京市', '朝阳区','13800000000','', '主仓库','','','', '2023-10-01T12:00:00Z','false','2023-01-01T08:00:00Z' ); ``` #### API请求示例 最后,我们通过HTTP POST方法将上述SQL语句发送到目标平台MySQL API接口。以下是一个简化的API请求示例: ```http POST /execute HTTP/1.1 Host: api.targetplatform.com Content-Type: application/json { ... ... } ``` 通过以上步骤,我们成功地将源平台的数据经过ETL转换后写入了目标平台。这一过程不仅保证了数据的一致性和完整性,还提升了系统间的数据交互效率。 ![打通金蝶云星空数据接口](https://pic.qeasy.cloud/T2.png~tplv-syqr462i7n-qeasy.image)