ETL流程在轻易云平台中的应用:从数据提取到写入

  • 轻易云集成顾问-孙传友
### 旺店通·企业奇门数据集成到轻易云集成平台——采购订单查询案例 在本篇技术文章中,我们将深入探讨如何利用轻易云集成平台实现对旺店通·企业奇门系统的采购订单查询接口(wdt.purchase.order.query)的高效对接。该方案旨在确保数据从获取到处理再到最终写入的全流程都达到最佳性能,避免任何数据遗漏和异常情况的发生。 首先,我们着眼于系统的高吞吐量数据写入能力,这对于大量采购订单信息能够快速被接收并存储至关重要。通过优化的数据流设计,我们能确保每一笔订单及时进入相应的数据仓库,极大提升了处理时效性。此外,为了适应业务需求及多变的数据结构,自定义数据转换逻辑将使得我们能够灵活地处理不同格式和类型的数据。 另一关键特性是集中监控与告警系统。在本次实施中,通过实时跟踪各个任务状态和性能指标,任何异常问题可以第一时间被发现并进行干预。这不仅提高了整体运行效率,更增强了整个过程的透明度和可控性。 对于Pagination分页及限流策略,在调用旺店通API时尤为重要。由于接口返回的大批量数据通常需要分步取回,因此我们设计了一套可靠的策略来保证每次请求都准确无误,同时避免因频繁调用导致API服务端受到过大压力。这是保证持续、稳定获取大量采购订单信息的重要环节之一。 最后,对接过程中不可忽视的一点是两者间的数据格式差异。借助轻易云集成平台定制化的数据映射功能,使得原始JSON或XML等格式能顺利转换成为目标数据库所需样式,从而保障后续分析和应用的一致性。同时,引入错误重试机制,对可能出现的数据传输失败进行自动识别并重新尝试,进一步完善了系统健壮性。 以上这些技术要点构建起一个完整且高效的解决方案框架,为后续详细解析打下坚实基础。在下文中,将会具体介绍该解决方案中的各个步骤及其实现方式,包括但不限于:接口调用示例、分页限流配置以及实际操作中的注意事项等。 ![金蝶云星空API接口配置](https://pic.qeasy.cloud/D11.png~tplv-syqr462i7n-qeasy.image) ### 调用旺店通·企业奇门接口获取采购订单数据的技术案例 在数据集成生命周期的第一步中,调用源系统接口并获取数据是至关重要的一环。本文将详细探讨如何通过轻易云数据集成平台调用旺店通·企业奇门接口`wdt.purchase.order.query`,并对获取的数据进行初步加工。 #### 接口调用配置 首先,我们需要配置元数据以调用`wdt.purchase.order.query`接口。该接口主要用于查询采购订单信息,支持多种查询条件和分页功能。以下是元数据配置的详细说明: ```json { "api": "wdt.purchase.order.query", "effect": "QUERY", "method": "POST", "number": "purchase_no", "id": "purchase_id", "idCheck": true, "request": [ {"field": "start_time", "label": "start_time", "type": "string", "describe": "111", "value": "{{DAYS_AGO_1|datetime}}"}, {"field": "outer_no", "label": "outer_no", "type": "string", "describe": "外部创建采购单推送的单号,传该字段可以不传开始时间和结束时间"}, {"field": "purchase_no", "label": "purchase_no", "type": "string", "describe": "ERP系统采购单编号,传该字段可以不传开始时间和结束时间"}, {"field": "status", "label": "status", "type": "string", "describe":"10 已取消,20 编辑中,30 待审核,35 待财审,40 已审核,43 待推送,45 推送失败,48 待收货,50 部分到货,60已到货,70 待结算,80 部分结算,90 已完成(不传默认查询全部状态采购单)","value":"40"}, {"field":"warehouse_no","label":"warehouse_no","type":"string","describe":"111"}, {"field":"end_time","label":"end_time","type":"string","describe":"111","value":"{{CURRENT_TIME|datetime}}"} ], ... } ``` #### 请求参数详解 1. **start_time**: 查询开始时间,默认值为一天前的日期时间。 2. **outer_no**: 外部创建采购单推送的单号,可选参数。 3. **purchase_no**: ERP系统采购单编号,可选参数。 4. **status**: 采购单状态,此处默认值为40(已审核)。 5. **warehouse_no**: 仓库编号,可选参数。 6. **end_time**: 查询结束时间,默认值为当前日期时间。 此外,还有分页相关的参数: - **page_size**: 每页返回的数据条数,默认为40。 - **page_no**: 页码,从0页开始。 #### 数据请求与清洗 在配置好元数据后,通过轻易云平台发起POST请求获取采购订单数据。由于平台支持全异步操作,可以有效提高数据请求效率。 ```python import requests import json from datetime import datetime, timedelta # 设置请求URL和头信息 url = 'https://api.wangdian.cn/openapi2/wdt.purchase.order.query' headers = {'Content-Type': 'application/json'} # 构建请求体 payload = { 'start_time': (datetime.now() - timedelta(days=1)).strftime('%Y-%m-%d %H:%M:%S'), 'end_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S'), 'status': '40', 'page_size': '40', 'page_no': '0' } # 发起POST请求 response = requests.post(url, headers=headers, data=json.dumps(payload)) # 检查响应状态并处理数据 if response.status_code == 200: data = response.json() # 数据清洗与初步加工 orders = data.get('orders', []) for order in orders: # 示例:提取并打印每个订单的ID和状态 print(f"Order ID: {order['purchase_id']}, Status: {order['status']}") else: print(f"Failed to fetch data: {response.status_code}") ``` #### 数据转换与写入 在获取并清洗完数据后,可以根据业务需求进行进一步的数据转换和写入操作。例如,将处理后的数据写入目标数据库或其他存储系统。 ```python import sqlite3 # 创建SQLite数据库连接 conn = sqlite3.connect('orders.db') cursor = conn.cursor() # 创建表结构(如未存在) cursor.execute(''' CREATE TABLE IF NOT EXISTS purchase_orders ( purchase_id TEXT PRIMARY KEY, status TEXT, order_data TEXT ) ''') # 插入或更新订单数据 for order in orders: cursor.execute(''' INSERT OR REPLACE INTO purchase_orders (purchase_id, status, order_data) VALUES (?, ?, ?) ''', (order['purchase_id'], order['status'], json.dumps(order))) # 提交事务并关闭连接 conn.commit() conn.close() ``` 通过上述步骤,我们实现了从旺店通·企业奇门接口获取采购订单数据,并对其进行初步清洗、转换和写入数据库的全过程。这一过程不仅确保了数据的一致性和完整性,也为后续的数据分析和业务决策提供了坚实基础。 ![金蝶与WMS系统接口开发配置](https://pic.qeasy.cloud/S25.png~tplv-syqr462i7n-qeasy.image) ### 轻易云数据集成平台生命周期第二步:ETL转换与数据写入 在轻易云数据集成平台的生命周期中,ETL(Extract, Transform, Load)转换是一个关键环节。本文将重点探讨如何将已经集成的源平台数据进行ETL转换,转为目标平台轻易云集成平台API接口所能够接收的格式,并最终写入目标平台。 #### 数据提取与清洗 在进行ETL转换之前,首先需要从源系统中提取数据,并对其进行清洗。假设我们从旺店通系统中提取采购订单数据,这些数据可能包含各种不一致或冗余的信息,因此需要进行预处理。例如: ```json { "order_id": "12345", "supplier": "供应商A", "order_date": "2023-10-01", "items": [ {"item_id": "001", "quantity": 10, "price": 100}, {"item_id": "002", "quantity": 5, "price": 200} ] } ``` 在这个阶段,我们需要确保所有字段都符合目标系统的要求,例如日期格式统一、字段名称规范化等。 #### 数据转换 一旦数据被清洗,我们就可以开始进行数据转换。此过程包括将源系统的数据结构和格式转换为目标系统所能接受的形式。在本案例中,我们需要将采购订单的数据转为轻易云集成平台API接口所能接收的格式。 根据元数据配置,我们知道目标API接口的配置如下: ```json { "api":"写入空操作", "effect":"EXECUTE", "method":"POST", "idCheck":true } ``` 这意味着我们需要构建一个POST请求,并确保请求体中的数据格式正确,同时要注意是否需要进行ID检查。 例如,假设目标API接口期望的数据格式如下: ```json { "operation": "insert", "data": { "orderId": "", "supplierName": "", "orderDate": "", "itemsList": [] } } ``` 我们需要将原始数据转换为上述格式: ```json { "operation": "insert", "data": { "orderId": "12345", "supplierName": "供应商A", "orderDate": "2023-10-01", "itemsList": [ {"itemId": "001", "quantity": 10, "pricePerUnit": 100}, {"itemId": "002", "quantity": 5, "pricePerUnit": 200} ] } } ``` #### 数据写入 完成数据转换后,我们可以使用HTTP POST方法将数据写入到轻易云集成平台。以下是一个示例代码片段,展示如何通过API接口实现这一过程: ```python import requests import json # 构建请求头 headers = { 'Content-Type': 'application/json' } # 构建请求体 payload = { 'operation': 'insert', 'data': { 'orderId': '12345', 'supplierName': '供应商A', 'orderDate': '2023-10-01', 'itemsList': [ {'itemId': '001', 'quantity': 10, 'pricePerUnit': 100}, {'itemId': '002', 'quantity': 5, 'pricePerUnit': 200} ] } } # 发起POST请求 response = requests.post('https://api.qingyiyun.com/execute', headers=headers, data=json.dumps(payload)) # 检查响应状态码和内容 if response.status_code == 200: print('Data written successfully') else: print('Failed to write data:', response.text) ``` 在这个过程中,我们使用Python的requests库来发送HTTP POST请求,将经过转换的数据写入到目标系统。 #### 总结 通过以上步骤,我们实现了从源系统提取采购订单数据、进行清洗和转换,并最终通过API接口将其写入到轻易云集成平台。这一过程不仅确保了数据的一致性和完整性,还提高了系统间的数据交互效率。通过合理配置元数据和API接口,可以大幅简化复杂的数据集成任务,为业务流程提供有力支持。 ![用友与外部系统接口集成开发](https://pic.qeasy.cloud/T7.png~tplv-syqr462i7n-qeasy.image)