数据清洗与转换:使用轻易云实现聚水潭到BI平台的数据集成

  • 轻易云集成顾问-林峰

聚水潭·奇门数据集成到MySQL案例分享:聚水谭-店铺商品资料单 --> BI邦盈-店铺商品资料表(只新增)

在电商业务高速发展的背景下,如何实现不同系统之间高效、精准的数据对接显得尤为重要。本篇文章将重点探讨一个典型的系统对接案例——通过轻易云平台,将聚水潭·奇门的店铺商品资料单数据集成到MySQL数据库,实现业务数据的快速写入和有效使用。

数据获取与写入流程概述

我们首先需要利用聚水潭·奇门提供的API接口jushuitan.itemskumapper.list.query来抓取店铺商品资料。该接口支持分页查询功能,可以有效处理大规模数据。同时,为了确保整合过程中的性能,我们需要考虑限流策略,以防止过度调用导致服务端拒绝请求。

对于从API获取的数据,我们会进行必要的数据转换,以适应目标MySQL数据库的表结构要求。自定义的数据转换逻辑将在这里发挥关键作用,以确保源数据被正确映射并录入MySQL中。

解决方案中的关键特性

  1. 高吞吐量的数据写入能力:通过优化批量插入操作,使大量商品记录能够迅速且稳定地写入至MySQL数据库。
  2. 实时监控与告警机制:我们采用集中化监控和动态告警系统,实时跟踪每一项任务状态及其性能指标,确保所有异常情况都能及时发现并处理。
  3. 定制化数据映射及转换:根据具体业务需求,对从聚水潭·奇门获取的数据进行格式转换,使之符合BI邦盈所需的存储格式。
  4. 可靠性提升措施:针对可能出现的网络波动或接口访问失败,我们设计了错误重试机制,保证即便在不理想情况下也不会遗漏任何关键信息。

实施细节

在具体实施中,还涉及如何调用接口、解析响应结果,并最终通过统一视图控制台全面掌握整体进程。例如,通过batchexecute API执行批量插入操作,以及利用日志记录追踪整个ETL(Extract, Transform, Load)生命周期里的各个环节等,都有助于提高透明度和可管控性。

后续将进一步详细阐述这些技术要点,包括实际实现代码示例、面对常见问题时采取的最佳实践以及一些优化技巧等。 打通企业微信数据接口

调用聚水潭·奇门接口获取并加工数据的技术案例

在数据集成生命周期的第一步,我们需要调用源系统聚水潭·奇门接口jushuitan.itemskumapper.list.query来获取并加工数据。以下是详细的技术实现步骤和注意事项。

接口调用配置

首先,我们需要配置接口调用的元数据。根据提供的元数据配置,我们可以看到该接口使用POST方法进行请求,主要参数包括分页信息和时间范围。

{
  "api": "jushuitan.itemskumapper.list.query",
  "effect": "QUERY",
  "method": "POST",
  "number": "{sku_id}+{modified}",
  "id": "{sku_id}+{modified}",
  "name": "name",
  "request": [
    {"field": "page_index", "label": "第几页", "type": "int", "value": "1"},
    {"field": "page_size", "label": "每页多少条", "type": "int", "value": "50"},
    {"field": "modified_begin", "label": "修改起始时间", "type": "string", 
        "value":"{{LAST_SYNC_TIME|datetime}}"},
    {"field": "modified_end", "label": "修改结束时间", 
        "value":"{{CURRENT_TIME|datetime}}"}
  ],
  "autoFillResponse": true,
  "delay": 5
}

请求参数详解

  • page_index: 当前请求的页码,类型为整数。
  • page_size: 每页返回的数据条数,类型为整数。
  • modified_begin: 数据修改的起始时间,类型为字符串,使用模板变量{{LAST_SYNC_TIME|datetime}}自动填充。
  • modified_end: 数据修改的结束时间,类型为字符串,使用模板变量{{CURRENT_TIME|datetime}}自动填充。

这些参数确保我们能够分页获取指定时间范围内的数据。

数据请求与清洗

在实际调用过程中,我们需要处理分页逻辑,以确保所有数据都能被完整获取。以下是一个伪代码示例:

import requests
import datetime

# 定义基础URL和头信息
base_url = 'https://api.jushuitan.com'
headers = {'Content-Type': 'application/json'}

# 获取当前时间和上次同步时间
current_time = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
last_sync_time = get_last_sync_time() # 假设这是一个函数,返回上次同步时间

# 初始化分页参数
page_index = 1
page_size = 50

while True:
    payload = {
        'page_index': page_index,
        'page_size': page_size,
        'modified_begin': last_sync_time,
        'modified_end': current_time
    }

    response = requests.post(f'{base_url}/jushuitan.itemskumapper.list.query', headers=headers, json=payload)

    if response.status_code != 200:
        break

    data = response.json()

    # 数据清洗和处理逻辑
    cleaned_data = clean_data(data)

    # 将清洗后的数据写入目标系统(例如BI邦盈)
    write_to_target_system(cleaned_data)

    # 判断是否还有下一页
    if len(data) < page_size:
        break

    page_index += 1

# 更新最后同步时间
update_last_sync_time(current_time)

数据转换与写入

在获取到原始数据后,需要对其进行清洗和转换,以符合目标系统(如BI邦盈)的要求。假设我们需要将商品资料写入到BI邦盈的店铺商品资料表中,并且只新增不更新已有记录。

def clean_data(raw_data):
    cleaned_data = []

    for item in raw_data:
        cleaned_item = {
            'sku_id': item['sku_id'],
            'name': item['name'],
            'modified': item['modified']
            # 根据需求添加更多字段映射和转换逻辑
        }
        cleaned_data.append(cleaned_item)

    return cleaned_data

def write_to_target_system(cleaned_data):
    for item in cleaned_data:
        if not check_if_exists(item['sku_id']):
            insert_into_bi_bangying(item)

def check_if_exists(sku_id):
    # 检查目标系统中是否已存在该SKU ID的数据
    pass

def insert_into_bi_bangying(item):
    # 将数据插入到BI邦盈系统中
    pass

以上代码示例展示了如何通过分页方式调用聚水潭·奇门接口获取商品资料,并进行数据清洗、转换及写入目标系统的全过程。这一过程确保了数据的一致性和完整性,为后续的数据分析和业务决策提供了可靠的数据支持。 打通钉钉数据接口

使用轻易云数据集成平台进行ETL转换并写入MySQL API接口

在数据集成的生命周期中,第二步至关重要,即将已经集成的源平台数据进行ETL转换,并转为目标平台 MySQL API 接口所能够接收的格式,最终写入目标平台。本文将深入探讨如何利用轻易云数据集成平台实现这一过程。

配置元数据

首先,我们需要配置元数据,以便在ETL过程中正确映射和处理字段。以下是一个典型的元数据配置示例:

{
  "api": "batchexecute",
  "effect": "EXECUTE",
  "method": "SQL",
  "idCheck": true,
  "request": [
    {"field":"co_id","label":"公司编号","type":"string","value":"{co_id}"},
    {"field":"shop_id","label":"店铺编号","type":"string","value":"{shop_id}"},
    {"field":"channel","label":"来源平台","type":"string","value":"{channel}"},
    {"field":"i_id","label":"款式编码(线上款式编码)","type":"string","value":"{i_id}"},
    {"field":"sku_id","label":"商品编码(线上商品编码)","type":"string","value":"{sku_id}"},
    {"field":"shop_i_id","label":"店铺款式编码(平台店铺款式编码)","type":"string","value":"{shop_i_id}"},
    {"field":"shop_sku_id","label":"店铺商品编码(平台店铺商品编码)","type":"string","value":"{shop_sku_id}"},
    {"field":"modified","label":"修改时间","type":"string","value":"{modified}"},
    {"field":"link_modified","label":"商品对应关系修改时间","type":"string","value":"{link_modified}"},
    {"field":"enabled","label":"是否上架","type":"string","value":"{enabled}"},
    {"field":"c_id","label":"类目编码(类目ID)","type":"string","value":"{c_id}"},
    {"field":"raw_sku_id","label":"原始商品编码(未经系统处理的线上商品编码原值)","type": "string", "value": "{raw_sku_id}" },
    {"field": "shop_price", "label": "平台价格(店铺售价),系统中需开启相关配置", "type": "string", "value": "{shop_price}" },
    {"field": "name", "label": "平台商品名称(线上商品名称)", "type": "string", "value": "{name}" },
    {"field": "properties_value", "label": "线上颜色规格", "type": "string", "value": "{properties_value}" },
    {"field": "pic", "label": "款式图片", "type": "string", "value": "{pic}" },
    {"field": "created", "label": “创建时间”, “type”: “string”, “value”: “{created}”},
    {"field”: “pull_off_time”, “label”: “下架时间”, “type”: “string”, “value”: “{pull_off_time}”},
    {“field”: “outer_sku_code”, “label”: “线上国标码”, “type”: “string”, “value”: “{outer_sku_code}”},
    {“field”: “type”, “label”: “链接同步设置,0是开启同步,2是禁止同步”, “type”:“ string ”, ” value ”: ” { type } ”},
    {“ field ”: ” shop_qty ”, ” label ”: ” 店铺库存 ”, ” type ”: ” string ”, ” value ”: ” { shop_qty }”},
    {“ field ”: ” link_sku_id ”, ” label ”: ” 对应商品编码(商品对应关系页面)”,“ type” :“ string ,” value :“ { link_sku_id }”},
   {“ field” :“ sale_price_min ,” label :“ 售价下限 ,” type :“ string ,” value :“ { sale_price_min }”},
   {“ field” :“ sale_price_max ,” label :“ 售价上限 ,” type :“ string ,” value :“ { sale_price_max }”},
   {“ field” :“ insert_time ,”“ label”:加工时间”,“ type”:”“ string”,”“ value”:”“ { modified }”,“ default”:”“ _function NOW()""}
  ],
  otherRequest:[
   { field:main-sql , label:主语句 , type:字符串 , value:INSERT INTO item_sku_mapper (co_id , shop_id , channel , i_id , sku_id , shop_i _id,shop_sku _id,modified,link_modified,enabled,c _id,raw_sku _id,shop_price,name,properties_value,pic,created,pull_off_time,outer_sku_code,type,shop_qty,link_sku _id,sale_price_min,sale_price_max,insert_time) VALUES" },
   { field:limit , label:limit , type:字符串 , value:500 }
 ]
}

数据请求与清洗

在ETL流程中,我们首先需要从源系统请求数据,并对其进行清洗。通过配置 request 字段,我们可以定义需要从源系统获取的数据字段及其类型。例如:


{
  ...
  {
      "request":[
          ...
          {"field":"co_id",
![电商OMS与ERP系统接口开发配置](https://pic.qeasy.cloud/T26.png~tplv-syqr462i7n-qeasy.image)
更多系统对接方案