利用轻易云平台进行数据ETL并写入MySQL数据库

  • 轻易云集成顾问-吴伟

系统对接案例分享:微信公众号数据集成到MySQL

在实际的业务需求中,如何高效、可靠地将微信公众号的数据无缝集成至企业内部的MySQL数据库是一个关键问题。本文将通过具体技术实现和实例操作,详细解析“微信小店视频号-获取资金流水列表-->BI秉心-资金流水表”这一方案如何利用轻易云数据集成平台进行配置与实施。

首先,我们需要调取微信公众号接口/channels/ec/funds/getfundsflowlist以获取最新的资金流水信息。为了保证数据抓取过程中的时效性与覆盖度,系统必须具备定时、可靠抓取机制,这样才可以确保所有记录不漏单、不重复。在处理过程中还需注意分页和限流问题,以防止接口访问过频引发请求失败。

然后,通过定制化的数据转换逻辑,将从公众号获得的数据结构映射为符合目标MySQL数据库格式的数据。这一步骤非常重要,因为不同系统之间常存在字段名及类型的不一致,此处具体选用轻易云提供的可视化数据流设计工具,可以让这个复杂的过程更加直观且易于管理。

接下来,为了应对大量并发数据写入需求,我们部署了高吞吐量的数据写入方案,使得大批量数据能够快速、安全地存储到MySQL。而且,在整个过程中我们实时跟踪每一笔交易记录,并结合集中监控和告警系统,对任何异常情况进行及时检测和修复,例如网络延迟或接口响应错误等,从而保证全流程操作透明、高效。

最后,在整个生命周期运作期间,无论是在元数据管理还是在API资产使用方面,都有统一视图控制台助力企业更好地掌握资源分配,实现最优配置。而这些特点都极大提升了微信小店视频号与BI秉心间资金流水表的对接效率,使业务运行更上一层楼。 金蝶与WMS系统接口开发配置

调用微信公众号接口获取并加工资金流水数据

在数据集成的生命周期中,调用源系统接口获取数据是至关重要的第一步。本文将深入探讨如何通过轻易云数据集成平台调用微信公众号接口 /channels/ec/funds/getfundsflowlist 获取资金流水列表,并对数据进行初步加工。

接口配置与请求参数

该接口采用POST方法进行数据请求,主要用于查询资金流水列表。以下是关键的元数据配置:

  • API路径: /channels/ec/funds/getfundsflowlist
  • 请求方法: POST
  • 请求参数:
    • start_time: 流水产生的开始时间,Unix时间戳格式。
    • end_time: 流水产生的结束时间,Unix时间戳格式。
    • next_key: 用于分页查询的键值。

这些参数需要根据业务需求动态填充,例如:

  • start_time 可以使用上次同步时间 {LAST_SYNC_TIME}
  • end_time 可以使用当前时间 {CURRENT_TIME}

请求参数示例

{
  "start_time": "{LAST_SYNC_TIME}",
  "end_time": "{CURRENT_TIME}",
  "next_key": ""
}

数据清洗与初步加工

在获取到原始数据后,需要对其进行清洗和初步加工,以确保数据的准确性和一致性。以下是一些关键步骤:

  1. 字段映射与转换:

    • 将API返回的数据字段映射到目标系统所需的字段。例如,将返回的 flow_id 映射为目标系统中的 id 字段。
  2. 数据过滤:

    • 根据业务规则过滤不必要的数据。例如,只保留状态为成功的交易记录。
  3. 格式化处理:

    • 对日期、金额等字段进行格式化处理,以符合目标系统的数据规范。

示例代码

以下是一个简单的示例代码,用于调用接口并处理返回的数据:

import requests
import json
from datetime import datetime

# 配置请求参数
params = {
    "start_time": int(datetime.timestamp(datetime.now()) - 86400), # 假设上次同步时间为24小时前
    "end_time": int(datetime.timestamp(datetime.now())),
    "next_key": ""
}

# 发起POST请求
response = requests.post("https://api.weixin.qq.com/channels/ec/funds/getfundsflowlist", json=params)

# 检查响应状态码
if response.status_code == 200:
    data = response.json()

    # 数据清洗与初步加工
    cleaned_data = []
    for item in data.get("funds_flow_list", []):
        cleaned_item = {
            "id": item.get("flow_id"),
            "amount": item.get("amount"),
            "timestamp": datetime.fromtimestamp(item.get("timestamp")).strftime('%Y-%m-%d %H:%M:%S'),
            # 添加其他需要处理的字段...
        }
        cleaned_data.append(cleaned_item)

    # 输出清洗后的数据
    print(json.dumps(cleaned_data, indent=4))
else:
    print(f"Failed to fetch data: {response.status_code}")

自动填充响应与平铺结构

轻易云平台支持自动填充响应和平铺结构,这意味着可以自动将嵌套结构的数据展开为平铺结构,便于后续处理。例如:

{
  "related_info_list": [
    {
      "flow_id": "12345",
      "amount": 100,
      ...
    }
  ]
}

通过配置 beatFlat 参数,可以将 related_info_list 中的数据直接平铺到根级别。

小结

通过以上步骤,我们可以高效地调用微信公众号接口获取资金流水列表,并对数据进行清洗和初步加工。这不仅提高了数据集成过程中的透明度和效率,也为后续的数据转换与写入打下了坚实基础。 金蝶与WMS系统接口开发配置

使用轻易云数据集成平台进行ETL转换并写入MySQLAPI接口

在数据集成生命周期的第二步,我们需要将已经集成的源平台数据进行ETL(提取、转换、加载)转换,转为目标平台 MySQLAPI 接口所能够接收的格式,并最终写入目标平台。以下是详细的技术实现过程。

元数据配置解析

元数据配置是实现ETL过程中的关键部分。我们将通过解析和应用以下提供的元数据配置来完成任务:

{
  "api": "batchexecute",
  "effect": "EXECUTE",
  "method": "SQL",
  "idCheck": true,
  "request": [
    {"field": "flow_id", "label": "流水id", "type": "string", "value": "{flow_id}"},
    {"field": "funds_type", "label": "资金类型,具体枚举值请参考FundsType枚举", "type": "string", "value": "{funds_type}"},
    {"field": "flow_type", "label": "流水类型, 1 收入,2 支出", "type": "string", "value": "{flow_type}"},
    {"field": "amount", "label": "流水金额(分)", "type": "string", "value": "{amount}"},
    {"field": "balance", "label": "余额(分)", "type":"string","value":"{balance}"},
    {"field":"bookkeeping_time","label":"记账时间","type":"string","value":"{bookkeeping_time}"},
    {"field":"remark","label":"备注","type":"string","value":"{remark}"},
    {"field":"related_info_list_related_type","label":"关联类型, 1 订单, 2售后,3 提现,4 运费险","type":"string","value":"{related_info_list_related_type}"},
    {"field":"related_info_list_order_id","label":"关联订单号","type":"string","value":"{related_info_list_order_id}"},
    {"field":"related_info_list_aftersale_id","label":"关联售后单号","type":"string","value":"{related_info_list_aftersale_id}"},
    {"field":"related_info_list_withdraw_id","label":"关联提现单号","type":"string","value":"{related_info_list_withdraw_id}"},
    {"field":"related_info_list_transaction_id","label":"关联支付单号","type":"string","value":"{{related_info_list_transaction_id}}"}
  ],
  "otherRequest":[
    {"field":"main_sql","label":"主语句","type":"string","describe":"","value":
      "REPLACE INTO wx_flow_list (flow_id,funds_type,flow_type,amount,balance,bookkeeping_time,remark,related_info_list_related_type,related_info_list_order_id,related_info_list_aftersale_id,related_info_list_withdraw_id,related_info_list_transaction_id) VALUES"
    },
    {"field":"limit","label":"limit","type":"string","describe":"","value":
      "1000"
    }
  ]
}

数据提取与清洗

首先,从源平台微信小店视频号获取资金流水列表。假设我们已经通过API获取了以下JSON格式的数据:

[
  {
    "flow_id": "12345",
    ...
  },
  ...
]

数据转换

接下来,我们需要根据元数据配置,将上述JSON数据转换为MySQLAPI接口能够接收的格式。以下是一个示例代码片段,用于将JSON数据转换为SQL插入语句:

import json

# 假设从微信小店视频号获取的数据
source_data = [
  {
    'flow_id': '12345',
    'funds_type': '收入',
    'flow_type': '1',
    'amount': '10000',
    'balance': '50000',
    'bookkeeping_time': '2023-10-01T12:00:00Z',
    'remark': '',
    'related_info_list_related_type': '1',
    'related_info_list_order_id': '67890',
    ...
  },
  ...
]

# 构建SQL语句
sql_values = []
for record in source_data:
  values = (
      f"'{record['flow_id']}'",
      f"'{record['funds_type']}'",
      f"'{record['flow_type']}'",
      f"'{record['amount']}'",
      f"'{record['balance']}'",
      f"'{record['bookkeeping_time']}'",
      f"'{record['remark']}'",
      f"'{record['related_info_list_related_type']}'",
      f"'{record['related_info_list_order_id']}'",
      ...
  )
  sql_values.append(f"({','.join(values)})")

main_sql = (
   f"{metadata['otherRequest'][0]['value']} {','.join(sql_values)}"
)

print(main_sql)

数据写入

最后,通过MySQLAPI接口将转换后的数据写入目标平台。可以使用requests库进行HTTP请求:

import requests

url = "<MySQLAPI接口URL>"
headers = {'Content-Type': 'application/json'}
payload = {
   'api': metadata['api'],
   'method': metadata['method'],
   'effect': metadata['effect'],
   'requestBody': main_sql,
   ...
}

response = requests.post(url, headers=headers, data=json.dumps(payload))

if response.status_code == 200:
   print("Data successfully written to MySQL")
else:
   print("Failed to write data to MySQL:", response.text)

通过以上步骤,我们完成了从微信小店视频号获取资金流水列表,并将其转换为符合MySQLAPI接口要求的格式,最终成功写入目标平台。这一过程展示了如何利用轻易云数据集成平台进行高效的数据ETL操作。 金蝶与MES系统接口开发配置