通过轻易云实现旺店通到BI泰海的无缝数据传输

  • 轻易云集成顾问-姚缘
### 旺店通·旗舰版-出库瞬时成本查询-->BI泰海-出库瞬时成本表数据集成案例解析 在实际业务处理中,及时准确的数据分析是企业保持竞争力的关键。而实现这种高效性和精确性的基础,是可靠的数据集成解决方案。本文将深入探讨如何利用轻易云数据集成平台,将旺店通·旗舰版中的出库瞬时成本数据无缝对接到MySQL数据库中,为后续的BI分析提供精准的数据支撑。 本次案例以具体的接口调用为核心,涵盖了从API获取、处理、转换,到最终写入MySQL数据库的全过程。首先,我们通过调用`statistic.StockoutCollect.queryCostWithDetail`接口,从旺店通·旗舰版系统中提取出库瞬时成本详细信息。这一步骤不仅要保证数据抓取的全面性,还需处理好分页与限流问题,以防止因大量请求导致系统性能下降。 其次,对抓取上来的原始数据进行必要的清洗与转换,使其符合MySQL数据库所需要求。这其中涉及自定义的数据转换逻辑,以适应特定业务需求并处理两者间可能存在的数据格式差异。此外,我们还会讨论如何利用集中监控和告警系统实时跟踪每个步骤,不仅确保任务顺利完成,同时能够快速响应任何异常情况。 最后,通过批量写入方式将处理后的数据高效地导入到MySQL数据库中。在此过程中,需要特别注意事务控制以及错误重试机制,以保证即使在发生网络波动或其他不可预见问题时,也能确保不漏单、不重复,实现高度可靠的数据传输。 以下章节将逐步展开上述各个环节,详细介绍技术实施过程及实践经验,为您提供一个完整且可操作的参考范例。 ![如何对接企业微信API接口](https://pic.qeasy.cloud/D1.png~tplv-syqr462i7n-qeasy.image) ### 调用旺店通·旗舰版接口获取并加工数据 在数据集成生命周期的第一步中,我们需要调用源系统旺店通·旗舰版的接口 `statistic.StockoutCollect.queryCostWithDetail`,以获取出库瞬时成本数据,并对其进行初步加工。以下将详细探讨该接口的技术细节及其在数据集成中的应用。 #### 接口概述 接口 `statistic.StockoutCollect.queryCostWithDetail` 主要用于查询出库单的详细成本信息。该接口采用 POST 方法进行请求,支持多种查询参数,如时间范围、仓库编号和出库单号等。 #### 元数据配置解析 根据提供的元数据配置,我们可以看到该接口的具体参数设置如下: - **请求方式**:POST - **API路径**:`statistic.StockoutCollect.queryCostWithDetail` - **查询参数**: - `params` 对象包含以下字段: - `start_time`:起始时间(发货时间),若无出库单号则为必填。 - `end_time`:结束时间(发货时间)。 - `warehouse_no`:仓库编号。 - `stockout_no`:出库单号,多个出库单号之间英文逗号分隔。 - `pager` 对象包含以下字段: - `page_size`:分页大小,默认值为200。 - `page_no`:页号,从0开始。 #### 请求参数构建 为了实现自动化的数据请求,我们需要动态生成查询参数。以下是一个示例请求体: ```json { "params": { "start_time": "{{LAST_SYNC_TIME|datetime}}", "end_time": "{{CURRENT_TIME|datetime}}", "warehouse_no": "WH001", "stockout_no": "" }, "pager": { "page_size": "200", "page_no": "0" } } ``` 在实际应用中,`start_time` 和 `end_time` 会根据上次同步时间和当前时间动态填充,而其他参数如 `warehouse_no` 和 `stockout_no` 则根据具体业务需求进行设置。 #### 数据清洗与转换 从接口获取的数据通常包含多个层级的嵌套结构,为了便于后续处理,我们需要对数据进行清洗和转换。元数据配置中的 `beatFlat` 参数指定了需要扁平化处理的字段,例如: ```json "beatFlat": ["detail_list"] ``` 这意味着返回结果中的 `detail_list` 字段会被展开到顶层结构中,以简化数据处理流程。 #### 实际案例分析 假设我们从接口获取到以下原始响应数据: ```json { "code": 200, "data": { "total_count": 1, "detail_list": [ { "stockout_id": "12345", "stockout_no": "SO20231001", "warehouse_no": "WH001", "cost_detail": [ { "item_id": "A001", "cost_price": 100.0, "quantity": 10 }, { "item_id": "A002", "cost_price": 200.0, "quantity": 5 } ] } ] } } ``` 经过扁平化处理后的数据结构如下: ```json [ { "stockout_id": "12345", "stockout_no": "SO20231001", "warehouse_no": "WH001", "item_id": "A001", "cost_price": 100.0, "quantity": 10 }, { "stockout_id": "12345", "stockout_no": "SO20231001", "warehouse_no": "WH001", "item_id": "A002", "cost_price": 200.0, "quantity": 5 } ] ``` 这种扁平化后的结构便于后续的数据存储和分析处理。 #### 延迟与重试机制 为了确保数据请求的稳定性,元数据配置中还设置了延迟和重试机制。例如: ```json "delay": 300 ``` 这表示每次请求之间会有300毫秒的延迟,以避免对源系统造成过大的压力。此外,还可以根据业务需求配置重试次数和间隔,以提高数据获取的成功率。 通过上述步骤,我们完成了从调用源系统接口到初步加工数据的全过程,为后续的数据转换与写入打下了坚实基础。在实际操作中,可以根据具体业务场景灵活调整参数设置和处理逻辑,以实现最佳的数据集成效果。 ![如何开发用友BIP接口](https://pic.qeasy.cloud/S21.png~tplv-syqr462i7n-qeasy.image) ### 使用轻易云数据集成平台进行ETL转换并写入MySQLAPI接口 在轻易云数据集成平台中,ETL(Extract, Transform, Load)过程的第二步是将已经集成的源平台数据进行转换,并最终写入目标平台。在本案例中,我们将从旺店通旗舰版获取出库瞬时成本数据,并通过ETL转换后,写入BI泰海的MySQL数据库。 #### 元数据配置解析 元数据配置是ETL过程中至关重要的一部分,它定义了如何从源系统提取数据、如何转换数据以及如何将其加载到目标系统。以下是我们使用的元数据配置示例: ```json { "api": "batchexecute", "effect": "EXECUTE", "method": "POST", "idCheck": true, "request": [ {"field":"stockout_id","label":"出库单id","type":"string","value":"{stockout_id}"}, {"field":"stockout_no","label":"出库单号","type":"string","value":"{stockout_no}"}, ... {"field":"detail_list_stockin_no","label":"入库单号","type":"string","value":"{detail_list_stockin_no}"} ], "otherRequest": [ { "field": "main_sql", "label": "主语句", "type": "string", "describe": "111", "value": "REPLACE INTO stockoutcollect_querycostwithdetail (stockout_id,stockout_no,warehouse_id,warehouse_no,warehouse_name,src_order_type,src_order_no,status,employee_name,logistics_id,logistics_name,logistics_no,weigh_post_cost,goods_count,goods_type_count,remark,created,consign_time,custom_type,note_count,checked_goods_total_cost,planned_goods_total_cost,total_cost,modified,detail_list_rec_id,detail_list_stockout_id,detail_list_num,detail_list_expire_date,detail_list_goods_name,detail_list_goods_no,detail_list_short_name,detail_list_spec_no,detail_list_spec_id,detail_list_spec_name,detail_list_spec_code,detail_list_barcode,detail_list_defect,detail_list_checked_cost_price,total_checked_cost_price,total_planned_cost,list_position_no,list_batch_no,list_stockin_no) VALUES" }, { "field": "limit", "label": "limit", "type": "string", "describe": "111", "value": "1000" } ] } ``` #### 数据请求与清洗 在ETL过程的第一步,我们需要从源系统(旺店通旗舰版)提取出库瞬时成本相关的数据。根据元数据配置中的`request`字段,我们可以看到需要提取的数据字段及其对应的标签和类型。例如: - `stockout_id`:出库单id - `warehouse_id`:仓库id - `logistics_name`:物流公司名称 - ... 这些字段会被映射到相应的数据值,从而确保我们提取的数据结构完整且准确。 #### 数据转换 在完成数据提取后,下一步是对数据进行转换,以符合目标系统(BI泰海MySQL数据库)的要求。转换过程主要包括以下几部分: 1. **字段映射**:根据元数据中的`request`字段,将源系统中的字段映射到目标系统所需的字段。 2. **格式转换**:如果源系统和目标系统对某些字段的数据格式要求不同,需要进行格式转换。例如,将日期格式从YYYY-MM-DD转换为YYYYMMDD。 3. **业务逻辑处理**:根据业务需求,对某些字段进行计算或处理。例如,计算总成本或合并多个字段的信息。 #### 数据写入 最后一步是将转换后的数据写入目标系统。在我们的案例中,目标系统是BI泰海的MySQL数据库。根据元数据配置中的`otherRequest`部分,我们使用了一个SQL语句模板: ```sql REPLACE INTO stockoutcollect_querycostwithdetail (stockout_id,...) VALUES (...); ``` 这个SQL语句模板会被动态填充实际的数据值,然后通过API接口发送到MySQL数据库。具体实现步骤如下: 1. **构建SQL语句**:根据每条记录的数据值,填充SQL语句模板。 2. **批量执行**:为了提高效率,我们可以采用批量执行的方式,一次性插入多条记录。 3. **错误处理**:在执行过程中,如果遇到错误,需要进行相应的错误处理和日志记录,以便后续排查问题。 #### API接口调用 调用API接口时,我们需要注意以下几点: 1. **请求方法**:根据元数据配置,使用POST方法。 2. **请求路径**:调用轻易云提供的API路径,例如`/batchexecute`。 3. **请求参数**:包括构建好的SQL语句和其他必要参数,例如批量限制(limit)。 以下是一个示例代码片段,用于调用API接口: ```python import requests url = 'https://api.qingyiyun.com/batchexecute' headers = {'Content-Type': 'application/json'} payload = { 'main_sql': constructed_sql, 'limit': 1000 } response = requests.post(url=url, headers=headers, json=payload) if response.status_code == 200: print('Data successfully written to MySQL.') else: print('Failed to write data:', response.text) ``` 通过以上步骤,我们实现了从旺店通旗舰版获取出库瞬时成本数据,并通过ETL转换后成功写入BI泰海的MySQL数据库。这一过程充分利用了轻易云平台提供的全生命周期管理功能,实现了高效、透明的数据集成。 ![打通企业微信数据接口](https://pic.qeasy.cloud/T21.png~tplv-syqr462i7n-qeasy.image)