从API调用到数据库写入:实现销售订单数据集成的全流程解析

  • 轻易云集成顾问-卢剑航
### 旺店通·旗舰奇门数据集成到MySQL的技术实现 在这个案例中,我们重点探讨如何通过轻易云数据集成平台,将旺店通·旗舰奇门的数据高效集成到MySQL数据库中,实现销售订单信息的及时、准确汇总和查询。具体方案名称为“旺店通旗舰版-销售订单-->BI泰海-销售订单表_原始查询(2024年起)”。 首先,确保从旺店通·旗舰奇门系统获取销售订单数据的API是`wdt.sales.tradequery.querywithdetail`。我们需要处理接口返回的大量分页数据,并将其稳定地写入MySQL数据库。为了保证任务运行的可靠性与性能,我们采用了以下几项技术手段: 1. **高吞吐量的数据写入能力**:我们利用批量执行API(batchexecute)来尽可能减少单个请求操作次数,从而提升整体吞吐率,使得大量销售信息能够迅速被接收并存储至MySQL。 2. **实时监控与告警**:集中化的监控和告警系统实时跟踪每一个数据集成任务的状态及性能表现。这不仅有助于快速发现问题,也方便运维人员进行调优或故障排除。 3. **自定义转换逻辑**:不同于其他常规的数据对接过程,这次项目特别之处在于开发了适配特定业务需求的数据转换逻辑,以解决源系统与目标数据库间存在的信息结构差异,比如字段映射、格式转换等。 4. **分布式任务调度与异常重试机制**:针对接口限流和网络不稳定等不可预知的问题,我们引入了分布式任务调度机制,并设计了完备的错误重试策略,确保任何一笔交易都不会漏单,同时避免重复提交导致歧义或冲突。 这种全面且精细化的方法,在实际应用中效果显著,有效保障了日常运营过程中数百万级别订销付流水精准无误地融合进各类业务分析场景,为企业决策提供了一手可信赖的数据支撑。在接下来的章节里,我们将对关键技术配置展开剖析,包括分页处理策略、自定义映射规则实现以及详尽性能测试报告等多方面内容。 ![用友与WMS系统接口开发配置](https://pic.qeasy.cloud/D25.png~tplv-syqr462i7n-qeasy.image) ### 调用旺店通·旗舰奇门接口wdt.sales.tradequery.querywithdetail获取并加工数据 在数据集成生命周期的第一步中,调用源系统接口获取数据是至关重要的一环。本文将详细探讨如何通过轻易云数据集成平台调用旺店通·旗舰奇门接口`wdt.sales.tradequery.querywithdetail`,并对获取的数据进行初步加工。 #### 接口调用配置 首先,我们需要配置接口调用的元数据。根据提供的元数据配置,我们可以看到该接口使用POST方法,并且需要传递分页参数和业务参数。 ```json { "api": "wdt.sales.tradequery.querywithdetail", "method": "POST", "number": "trade_no", "id": "trade_id", "request": [ { "field": "pager", "label": "分页参数", "type": "object", "children": [ { "field": "page_size", "label": "分页大小", "type": "string", "value": "50", "parent": "pager" }, { "field": "page_no", "label": "页号", "type": "string", "value": "1", "parent": "pager" } ] }, { "field": "params", "label": "业务参数", "type": "object", "children": [ { "field": "start_time", "label": "开始时间", "type": "string", "describe":"起始修改时间", // 动态值 // 使用{{LAST_SYNC_TIME|datetime}}占位符表示上次同步时间 // 使用{{CURRENT_TIME|datetime}}占位符表示当前时间 // 平台会自动替换为实际值 // 示例:2023-10-01T00:00:00Z // 示例:2023-10-02T00:00:00Z ``` #### 数据请求与清洗 在配置好接口元数据后,我们可以开始进行数据请求。这里我们利用轻易云平台的全异步特性,确保在高效获取数据的同时,不会阻塞其他任务的执行。 1. **分页处理**:由于每次请求只能返回有限数量的数据(如每页50条),我们需要通过循环请求来获取所有符合条件的数据。 2. **时间参数**:通过动态设置`start_time`和`end_time`,确保每次请求的数据范围准确无误。 示例代码如下: ```python import requests import datetime # 设置初始分页参数和时间参数 page_no = 1 page_size = 50 last_sync_time = datetime.datetime.now() - datetime.timedelta(days=1) current_time = datetime.datetime.now() while True: payload = { 'pager': { 'page_size': page_size, 'page_no': page_no, }, 'params': { 'start_time': last_sync_time.strftime('%Y-%m-%dT%H:%M:%SZ'), 'end_time': current_time.strftime('%Y-%m-%dT%H:%M:%SZ') } } response = requests.post('https://api.wangdian.cn/openapi2/wdt.sales.tradequery.querywithdetail', json=payload) data = response.json() # 数据处理逻辑,如清洗、转换等 process_data(data) if len(data['trades']) < page_size: break page_no += 1 ``` #### 数据转换与写入 在获取到原始数据后,我们需要对其进行清洗和转换,以便写入目标系统。在这个过程中,可以利用轻易云平台提供的自动填充响应功能(autoFillResponse),简化字段映射和转换操作。 例如,将嵌套的`detail_list`字段展平: ```python def process_data(data): for trade in data['trades']: trade_id = trade['trade_id'] trade_no = trade['trade_no'] for detail in trade['detail_list']: # 将嵌套字段展平,并进行必要的转换处理 flat_record = { 'trade_id': trade_id, 'trade_no': trade_no, 'item_id': detail['item_id'], 'quantity': detail['quantity'], # 更多字段... } # 写入目标系统或存储到中间数据库中 write_to_target_system(flat_record) ``` 通过上述步骤,我们能够高效地从旺店通·旗舰奇门接口获取销售订单数据,并进行必要的清洗和转换,为后续的数据分析和业务决策提供可靠的数据基础。 ![用友与外部系统接口集成开发](https://pic.qeasy.cloud/S4.png~tplv-syqr462i7n-qeasy.image) ### 数据集成与ETL转换:将销售订单数据写入MySQL API接口 在轻易云数据集成平台的生命周期中,数据转换与写入是关键步骤之一。本文将详细探讨如何将已集成的源平台数据通过ETL转换,转为目标平台MySQL API接口所能够接收的格式,并最终写入目标平台。 #### 元数据配置解析 在数据转换过程中,元数据配置起到了至关重要的作用。以下是一个典型的元数据配置示例: ```json { "api": "batchexecute", "effect": "EXECUTE", "method": "SQL", "number": "id", "id": "id", "name": "id", "idCheck": true, "request": [ {"field":"trade_id","label":"订单唯一键","type":"string","value":"{trade_id}"}, {"field":"trade_no","label":"订单编号","type":"string","value":"{trade_no}"}, // 省略部分字段... {"field":"detail_list_invoice_content","label":"发票内容","type":"string","value":"{detail_list_invoice_content}"} ], "otherRequest": [ { "field": "main_sql", "label": "主语句", "type": "string", "describe": "SQL首次执行的语句,将会返回:lastInsertId", "value": "REPLACE INTO sales_tradequery_querywithdetail (trade_id, trade_no, platform_id, warehouse_type, src_tids, pay_account, trade_status, trade_type, delivery_term, receiver_ring, freeze_reason, refund_status, fenxiao_type, fenxiao_nick, trade_time, pay_time, consign_time, buyer_nick, receiver_name, receiver_province, receiver_city, receiver_district, receiver_address, receiver_mobile, receiver_telno, receiver_zip, receiver_area, receiver_dtb, bad_reason, logistics_no,buyer_message ,cs_remark ,remark_flag ,print_remark ,goods_type_count ,goods_count ,goods_amount ,post_amount ,other_amount ,discount ,receivable ,cod_amount ,ext_cod_fee ,goods_cost ,post_cost ,weight ,profit,tax,tax_rate ,commission ,invoice_type ,invoice_title ,invoice_content,salesman_name ,checker_name,fchecker_name ,checkouter_name ,stockout_no ,flag_name ,trade_from,single_spec_no raw_goods_count raw_goods_type_count,currency invoice_id version_id modified created check_time id_card_type shop_no shop_name shop_remark warehouse_no customer_name customer_no logistics_name logistics_code logistics_type_name to_deliver_time delay_to_time estimate_consign_time shop_id warehouse_id volume trade_label trade_mask shop_platform_id sub_platform_id package_name package_id paid large_type gift_mask customer_id other_cost is_sealed customer_type logistics_id cancel_reason revert_reason new_trade_label fenxiao_tid detail_list_trade_id detail_list_rec_id detail_list_platform_id detail_list_src_oid detail_list_src_tid detail_list_gift_type detail_list_pay_status detail_list_refund_status detail_list_guarantee_mode detail_list_platform_status detail_list_delivery_term detail_list_num detail_list_price detail_list_refund_num detail_list_order_price detail_list_share_price detail_list_adjust detail_list_discount detail_list_share_amount detail_list_tax_rate detail_list_goods_name detail_list_goods_no detail_list_spec_name detail_list_spec_no detail_list_spec_code detail_list_suite_no detail_list_suite_name detail_list_suite_num detail_list_suite_amount detail list_suite_discount api_goods_name api_spec_name api_goods_id api_spec_id goods id spec id commission goods type from mask remark modified created prop1 prop2 weight img_url actual_num barcode paid suite id bind_oid print_suite_mode flag stock_state is_consigned is_received cid modified_date created_date share_post_price invoice_content) VALUES" }, {"field":"limit","label":"limit","type":"string","value":"1000"} ] } ``` #### 数据请求与清洗 首先,我们需要从源系统获取原始销售订单数据。这个过程包括通过API调用或数据库查询来获取原始数据,并进行必要的数据清洗和预处理。例如,确保所有字段都符合目标系统的要求,如日期格式转换、字符串截断等。 ```python # 示例代码:获取并清洗数据 import requests import pandas as pd # 获取原始数据 response = requests.get('source_api_endpoint') data = response.json() # 清洗和预处理 df = pd.DataFrame(data) df['trade_time'] = pd.to_datetime(df['trade_time']).dt.strftime('%Y-%m-%d %H:%M:%S') df['pay_time'] = pd.to_datetime(df['pay_time']).dt.strftime('%Y-%m-%d %H:%M:%S') # ...其他清洗操作... ``` #### 数据转换与写入 在完成数据清洗后,需要将其转换为目标系统所能接受的格式,并通过API接口写入MySQL数据库。以下是一个示例代码片段,展示了如何使用Python进行这一过程: ```python import pymysql # MySQL连接配置 connection = pymysql.connect( host='mysql_host', user='mysql_user', password='mysql_password', database='target_database' ) # 构建插入语句 insert_sql = """ REPLACE INTO sales_tradequery_querywithdetail ( trade_id, trade_no, platform_id, warehouse_type, src_tids, pay_account, trade_status, trade_type, delivery_term, receiver_ring, freeze_reason, refund_status, fenxiao_type, fenxiao_nick, trade_time, pay_time, consign_time, buyer_nick, # ...省略部分字段... ) VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s) """ # 插入数据到MySQL with connection.cursor() as cursor: for index,row in df.iterrows(): cursor.execute(insert_sql,(row['trade_id'],row['trade_no'],row['platform_id'],row['warehouse_type'],row['src_tids'],row['pay_account'],row['trade_status'],row['trade_type'],row['delivery_term'],row['receiver_ring'],row['freeze_reason'],row['refund_status'],row['fenxiao_type'],row['fenxiao_nick'],row['trade_time'],row['pay_time'],row['consign_time'])) connection.commit() ``` #### API接口特性与优化 在实际操作中,API接口的性能和可靠性至关重要。以下是一些优化建议: 1. **批量处理**:尽量使用批量插入操作,以减少网络开销和数据库锁定时间。 2. **错误处理**:添加错误处理机制,确保在发生异常时能够记录日志并进行重试。 3. **连接池**:使用数据库连接池来提高连接效率和稳定性。 ```python from sqlalchemy import create_engine # 使用SQLAlchemy创建连接池 engine = create_engine('mysql+pymysql://user:password@host/database') # 批量插入示例 df.to_sql('sales_tradequery_querywithdetail', con=engine, if_exists='replace', index=False) ``` 通过以上步骤,我们可以高效地将销售订单数据从源平台转换并写入到目标平台MySQL中。这不仅确保了数据的一致性和完整性,还大大提升了系统的性能和可靠性。 ![金蝶与WMS系统接口开发配置](https://pic.qeasy.cloud/T29.png~tplv-syqr462i7n-qeasy.image)