利用轻易云平台进行数据ETL并写入MySQL数据库

  • 轻易云集成顾问-吴伟
### 系统对接案例分享:微信公众号数据集成到MySQL 在实际的业务需求中,如何高效、可靠地将微信公众号的数据无缝集成至企业内部的MySQL数据库是一个关键问题。本文将通过具体技术实现和实例操作,详细解析“微信小店视频号-获取资金流水列表-->BI秉心-资金流水表”这一方案如何利用轻易云数据集成平台进行配置与实施。 首先,我们需要调取微信公众号接口`/channels/ec/funds/getfundsflowlist`以获取最新的资金流水信息。为了保证数据抓取过程中的时效性与覆盖度,系统必须具备定时、可靠抓取机制,这样才可以确保所有记录不漏单、不重复。在处理过程中还需注意分页和限流问题,以防止接口访问过频引发请求失败。 然后,通过定制化的数据转换逻辑,将从公众号获得的数据结构映射为符合目标MySQL数据库格式的数据。这一步骤非常重要,因为不同系统之间常存在字段名及类型的不一致,此处具体选用轻易云提供的可视化数据流设计工具,可以让这个复杂的过程更加直观且易于管理。 接下来,为了应对大量并发数据写入需求,我们部署了高吞吐量的数据写入方案,使得大批量数据能够快速、安全地存储到MySQL。而且,在整个过程中我们实时跟踪每一笔交易记录,并结合集中监控和告警系统,对任何异常情况进行及时检测和修复,例如网络延迟或接口响应错误等,从而保证全流程操作透明、高效。 最后,在整个生命周期运作期间,无论是在元数据管理还是在API资产使用方面,都有统一视图控制台助力企业更好地掌握资源分配,实现最优配置。而这些特点都极大提升了微信小店视频号与BI秉心间资金流水表的对接效率,使业务运行更上一层楼。 ![金蝶与WMS系统接口开发配置](https://pic.qeasy.cloud/D29.png~tplv-syqr462i7n-qeasy.image) ### 调用微信公众号接口获取并加工资金流水数据 在数据集成的生命周期中,调用源系统接口获取数据是至关重要的第一步。本文将深入探讨如何通过轻易云数据集成平台调用微信公众号接口 `/channels/ec/funds/getfundsflowlist` 获取资金流水列表,并对数据进行初步加工。 #### 接口配置与请求参数 该接口采用POST方法进行数据请求,主要用于查询资金流水列表。以下是关键的元数据配置: - **API路径**: `/channels/ec/funds/getfundsflowlist` - **请求方法**: POST - **请求参数**: - `start_time`: 流水产生的开始时间,Unix时间戳格式。 - `end_time`: 流水产生的结束时间,Unix时间戳格式。 - `next_key`: 用于分页查询的键值。 这些参数需要根据业务需求动态填充,例如: - `start_time` 可以使用上次同步时间 `{LAST_SYNC_TIME}`。 - `end_time` 可以使用当前时间 `{CURRENT_TIME}`。 #### 请求参数示例 ```json { "start_time": "{LAST_SYNC_TIME}", "end_time": "{CURRENT_TIME}", "next_key": "" } ``` #### 数据清洗与初步加工 在获取到原始数据后,需要对其进行清洗和初步加工,以确保数据的准确性和一致性。以下是一些关键步骤: 1. **字段映射与转换**: - 将API返回的数据字段映射到目标系统所需的字段。例如,将返回的 `flow_id` 映射为目标系统中的 `id` 字段。 2. **数据过滤**: - 根据业务规则过滤不必要的数据。例如,只保留状态为成功的交易记录。 3. **格式化处理**: - 对日期、金额等字段进行格式化处理,以符合目标系统的数据规范。 #### 示例代码 以下是一个简单的示例代码,用于调用接口并处理返回的数据: ```python import requests import json from datetime import datetime # 配置请求参数 params = { "start_time": int(datetime.timestamp(datetime.now()) - 86400), # 假设上次同步时间为24小时前 "end_time": int(datetime.timestamp(datetime.now())), "next_key": "" } # 发起POST请求 response = requests.post("https://api.weixin.qq.com/channels/ec/funds/getfundsflowlist", json=params) # 检查响应状态码 if response.status_code == 200: data = response.json() # 数据清洗与初步加工 cleaned_data = [] for item in data.get("funds_flow_list", []): cleaned_item = { "id": item.get("flow_id"), "amount": item.get("amount"), "timestamp": datetime.fromtimestamp(item.get("timestamp")).strftime('%Y-%m-%d %H:%M:%S'), # 添加其他需要处理的字段... } cleaned_data.append(cleaned_item) # 输出清洗后的数据 print(json.dumps(cleaned_data, indent=4)) else: print(f"Failed to fetch data: {response.status_code}") ``` #### 自动填充响应与平铺结构 轻易云平台支持自动填充响应和平铺结构,这意味着可以自动将嵌套结构的数据展开为平铺结构,便于后续处理。例如: ```json { "related_info_list": [ { "flow_id": "12345", "amount": 100, ... } ] } ``` 通过配置 `beatFlat` 参数,可以将 `related_info_list` 中的数据直接平铺到根级别。 #### 小结 通过以上步骤,我们可以高效地调用微信公众号接口获取资金流水列表,并对数据进行清洗和初步加工。这不仅提高了数据集成过程中的透明度和效率,也为后续的数据转换与写入打下了坚实基础。 ![金蝶与WMS系统接口开发配置](https://pic.qeasy.cloud/S19.png~tplv-syqr462i7n-qeasy.image) ### 使用轻易云数据集成平台进行ETL转换并写入MySQLAPI接口 在数据集成生命周期的第二步,我们需要将已经集成的源平台数据进行ETL(提取、转换、加载)转换,转为目标平台 MySQLAPI 接口所能够接收的格式,并最终写入目标平台。以下是详细的技术实现过程。 #### 元数据配置解析 元数据配置是实现ETL过程中的关键部分。我们将通过解析和应用以下提供的元数据配置来完成任务: ```json { "api": "batchexecute", "effect": "EXECUTE", "method": "SQL", "idCheck": true, "request": [ {"field": "flow_id", "label": "流水id", "type": "string", "value": "{flow_id}"}, {"field": "funds_type", "label": "资金类型,具体枚举值请参考FundsType枚举", "type": "string", "value": "{funds_type}"}, {"field": "flow_type", "label": "流水类型, 1 收入,2 支出", "type": "string", "value": "{flow_type}"}, {"field": "amount", "label": "流水金额(分)", "type": "string", "value": "{amount}"}, {"field": "balance", "label": "余额(分)", "type":"string","value":"{balance}"}, {"field":"bookkeeping_time","label":"记账时间","type":"string","value":"{bookkeeping_time}"}, {"field":"remark","label":"备注","type":"string","value":"{remark}"}, {"field":"related_info_list_related_type","label":"关联类型, 1 订单, 2售后,3 提现,4 运费险","type":"string","value":"{related_info_list_related_type}"}, {"field":"related_info_list_order_id","label":"关联订单号","type":"string","value":"{related_info_list_order_id}"}, {"field":"related_info_list_aftersale_id","label":"关联售后单号","type":"string","value":"{related_info_list_aftersale_id}"}, {"field":"related_info_list_withdraw_id","label":"关联提现单号","type":"string","value":"{related_info_list_withdraw_id}"}, {"field":"related_info_list_transaction_id","label":"关联支付单号","type":"string","value":"{{related_info_list_transaction_id}}"} ], "otherRequest":[ {"field":"main_sql","label":"主语句","type":"string","describe":"","value": "REPLACE INTO wx_flow_list (flow_id,funds_type,flow_type,amount,balance,bookkeeping_time,remark,related_info_list_related_type,related_info_list_order_id,related_info_list_aftersale_id,related_info_list_withdraw_id,related_info_list_transaction_id) VALUES" }, {"field":"limit","label":"limit","type":"string","describe":"","value": "1000" } ] } ``` #### 数据提取与清洗 首先,从源平台微信小店视频号获取资金流水列表。假设我们已经通过API获取了以下JSON格式的数据: ```json [ { "flow_id": "12345", ... }, ... ] ``` #### 数据转换 接下来,我们需要根据元数据配置,将上述JSON数据转换为MySQLAPI接口能够接收的格式。以下是一个示例代码片段,用于将JSON数据转换为SQL插入语句: ```python import json # 假设从微信小店视频号获取的数据 source_data = [ { 'flow_id': '12345', 'funds_type': '收入', 'flow_type': '1', 'amount': '10000', 'balance': '50000', 'bookkeeping_time': '2023-10-01T12:00:00Z', 'remark': '', 'related_info_list_related_type': '1', 'related_info_list_order_id': '67890', ... }, ... ] # 构建SQL语句 sql_values = [] for record in source_data: values = ( f"'{record['flow_id']}'", f"'{record['funds_type']}'", f"'{record['flow_type']}'", f"'{record['amount']}'", f"'{record['balance']}'", f"'{record['bookkeeping_time']}'", f"'{record['remark']}'", f"'{record['related_info_list_related_type']}'", f"'{record['related_info_list_order_id']}'", ... ) sql_values.append(f"({','.join(values)})") main_sql = ( f"{metadata['otherRequest'][0]['value']} {','.join(sql_values)}" ) print(main_sql) ``` #### 数据写入 最后,通过MySQLAPI接口将转换后的数据写入目标平台。可以使用`requests`库进行HTTP请求: ```python import requests url = "<MySQLAPI接口URL>" headers = {'Content-Type': 'application/json'} payload = { 'api': metadata['api'], 'method': metadata['method'], 'effect': metadata['effect'], 'requestBody': main_sql, ... } response = requests.post(url, headers=headers, data=json.dumps(payload)) if response.status_code == 200: print("Data successfully written to MySQL") else: print("Failed to write data to MySQL:", response.text) ``` 通过以上步骤,我们完成了从微信小店视频号获取资金流水列表,并将其转换为符合MySQLAPI接口要求的格式,最终成功写入目标平台。这一过程展示了如何利用轻易云数据集成平台进行高效的数据ETL操作。 ![金蝶与MES系统接口开发配置](https://pic.qeasy.cloud/T5.png~tplv-syqr462i7n-qeasy.image)