数据集成与ETL转换:实现商品信息写入MySQL的实践

  • 轻易云集成顾问-蔡威

聚水潭数据集成到MySQL技术案例分享

在系统对接过程中,如何确保大量数据快速、准确地从聚水潭集成到MySQL数据库,是企业信息化过程中的一个关键挑战。本文将详细探讨以"聚水谭-商品信息单-->BI事在人为-商品信息表(只新增)"为例的具体方案实现。

聚水潭数据获取与处理

为了高效抓取并写入聚水潭的数据,我们需要首先调用聚水潭提供的/open/sku/query API接口,通过请求参数配置,实现定时可靠的数据采集。以下是主要步骤:

  1. API调用设置
    • 分页与限流处理:由于单次查询可能会有较多记录,我们使用分页机制,每次查询一定数量的数据,并合理设置限流,以避免触发API限制。
    • 错误重试机制:在网络或服务器异常情况下,采取自动重试策略,确保每个请求都能成功返回。
def fetch_data(page_num, page_size):
    response = requests.get(f"https://api.jushuitan.com/open/sku/query?page={page_num}&size={page_size}")
    if response.status_code == 200:
        return response.json()
    else:
        # 错误处理逻辑
        retry_fetch_data(page_num, page_size)

data_list = []
for i in range(total_pages):
    data_list.extend(fetch_data(i, default_page_size))

MySQL数据写入与转换

为了将上述获取到的商品数据信息快速且有效地写入MySQL数据库,需要利用批量插入功能来提升性能,同时要注重格式转换和映射。

  1. 自定义数据转换逻辑
    • 将从聚水潭获得的数据进行清洗和结构调整,使其适应MySQL数据库表结构要求。
    • 实现字段对应关系映射以及必要的数据类型转换,以保证数据一致性。
def transform_and_insert(data_batch):
    transformed_data = []
    for item in data_batch:
        transformed_record = {
            "product_id": item["productId"],
            "sku_name": item["skuName"],
            "price": float(item["price"]),
            # 更多字段映射...
        }
        transformed_data.append(transformed_record)

    insert_into_mysql(transformed_data)

def insert_into_mysql(data_list):
    connection = mysql.connector.connect(**db_config)
    cursor = connection.cursor()

    insert_query = """INSERT INTO product_info(product_id, sku_name, price) VALUES (%s, %s, %s)"""

    cursor.executemany(insert_query, data_list)

    connection.commit()

数据质量监控与告警

  1. 实时监控及日志管理

如何开发用友BIP接口

调用聚水潭接口获取并加工数据的技术实现

在数据集成过程中,调用源系统的API接口是至关重要的一步。本文将详细探讨如何通过轻易云数据集成平台调用聚水潭的/open/sku/query接口获取商品信息,并对数据进行初步加工。

接口调用配置

首先,我们需要配置元数据以便正确调用聚水潭的/open/sku/query接口。以下是该接口的元数据配置:

{
  "api": "/open/sku/query",
  "effect": "QUERY",
  "method": "POST",
  "number": "i_id",
  "id": "sku_id",
  "name": "name",
  "request": [
    {
      "field": "page_index",
      "label": "开始页",
      "type": "string",
      "describe": "第几页,从第一页开始,默认1",
      "value": "1"
    },
    {
      "field": "page_size",
      "label": "页行数",
      "type": "string",
      "describe": "每页多少条,默认30,最大50",
      "value": "50"
    },
    {
      "field": "modified_begin",
      "label": "修改开始时间",
      "type": "string",
      "describe": 
        {
          {"修改起始时间,和结束时间必须同时存在,时间间隔不能超过七天,与商品编码不能同时为空"},
          {"value":"{{LAST_SYNC_TIME|datetime}}"}
        }
    },
    {
      {
        {"field":"modified_end"},
        {"label":"修改结束时间"},
        {"type":"string"},
        {"describe":"修改结束时间,和起始时间必须同时存在,时间间隔不能超过七天,与商品编码不能同时为空"},
        {"value":"{{CURRENT_TIME|datetime}}"}
      }
    },
    {
      {
        {"field":"sku_ids"},
        {"label":"商品编码"},
        {"type":"string"},
        {"describe":"商品编码,与修改时间不能同时为空,最多20"}
      }
    }
  ],
  {{"delay",5}}
}

请求参数解析

  • page_index:表示请求的页码,从第一页开始。
  • page_size:每页返回的数据条数,默认30条,最大50条。
  • modified_beginmodified_end:用于指定查询的时间范围,这两个参数必须同时存在且间隔不超过七天。
  • sku_ids:指定要查询的商品编码,与时间范围参数不能同时为空。

数据请求与清洗

在实际操作中,我们需要通过POST请求来获取商品信息。以下是一个示例请求:

{
  'page_index': '1',
  'page_size': '50',
  'modified_begin': '2023-09-01T00:00:00Z',
  'modified_end': '2023-09-07T23:59:59Z'
}

在接收到响应后,需要对数据进行清洗和转换。例如,将日期格式标准化、去除无效字段等。以下是一个简单的数据清洗示例:

import json
from datetime import datetime

def clean_data(raw_data):
    cleaned_data = []
    for item in raw_data:
        cleaned_item = {}
        cleaned_item['sku_id'] = item['sku_id']
        cleaned_item['name'] = item['name']
        cleaned_item['last_modified'] = datetime.strptime(item['last_modified'], '%Y-%m-%dT%H:%M:%SZ')

        # 添加更多清洗逻辑
        cleaned_data.append(cleaned_item)

    return cleaned_data

# 假设 raw_response 是从 API 获取到的原始数据
raw_response = '''[
    {"sku_id":123, 
     {"name":"Product A"}, 
     {"last_modified":"2023-09-01T12:34:56Z"}
     }
]'''

raw_data = json.loads(raw_response)
cleaned_data = clean_data(raw_data)
print(cleaned_data)

数据转换与写入

在完成数据清洗后,需要将其转换为目标系统所需的格式,并写入到目标数据库中。这一步通常涉及到字段映射、类型转换等操作。

def transform_and_write(cleaned_data):
    transformed_data = []

    for item in cleaned_data:
        transformed_item = {}
        transformed_item['商品ID'] = item['sku_id']
        transformed_item['商品名称'] = item['name']

        # 添加更多转换逻辑

        transformed_data.append(transformed_item)

    # 将转换后的数据写入目标数据库
    write_to_database(transformed_data)

def write_to_database(data):
    # 假设我们使用某个数据库连接库来写入数据
    db_connection.insert_many('target_table', data)

transform_and_write(cleaned_data)

通过上述步骤,我们可以高效地从聚水潭系统获取并处理商品信息,为后续的数据分析和业务决策提供可靠的数据支持。 钉钉与CRM系统接口开发配置

数据集成与ETL转换:实现商品信息写入MySQL

在数据集成的生命周期中,第二步是将已经集成的源平台数据进行ETL(提取、转换、加载)转换,并最终写入目标平台。本案例将详细探讨如何将聚水谭的商品信息通过ETL转换为MySQL API接口所能接收的格式,并写入目标平台。

数据请求与清洗

在数据请求阶段,我们从聚水谭获取商品信息单的数据。为了确保数据的完整性和一致性,需要对原始数据进行清洗和预处理。以下是我们需要处理的字段:

[
    {"field":"sku_id","label":"商品编码","type":"string","value":"{sku_id}"},
    {"field":"i_id","label":"款式编码","type":"string","value":"{i_id}"},
    {"field":"name","label":"商品名称","type":"string","value":"{name}"},
    {"field":"short_name","label":"商品简称","type":"string","value":"{short_name}"},
    {"field":"sale_price","label":"销售价","type":"string","value":"{sale_price}"},
    {"field":"cost_price","label":"成本价","type":"string","value":"{cost_price}"},
    {"field":"properties_value","label":"颜色规格","type":"string","value":"{properties_value}"},
    {"field":"c_id","label":"类目id","type":"string","value":"{c_id}"},
    {"field":"category","label":"分类","type":"string","value":"{category}"},
    {"field":"pic_big","label":"大图地址","type":"string","value":"{pic_big}"},
    {"field": "pic", "label": "图片地址", "type": "string", "value": "{pic}"}
]

以上字段是我们从源平台提取的数据,这些字段需要经过清洗和标准化,以确保数据的一致性。

数据转换与写入

在数据转换阶段,我们需要将清洗后的数据转化为目标平台MySQL API接口所能接收的格式。以下是MySQL API接口所需的数据格式:

{
    "api": "batchexecute",
    "effect": "EXECUTE",
    "method": "SQL",
    "idCheck": true,
    "request": [
        {"field": "sku_id", "label": "商品编码", "type": "string", "value": "{sku_id}"},
        {"field": "i_id", "label": "款式编码", "type": "string", "value": "{i_id}"},
        {"field": "name", "label": "商品名称", "type": "string", "value": "{name}"},
        {"field": "short_name", "label": "商品简称", "type": "string", "value": "{short_name}"},
        // 省略部分字段
        {"field": "insert_time", "label": "",  type:  string, value: {modified}}
     ],
     otherRequest: [
         {
             field: main_sql,
             label: 主语句,
             type: string,
             describe: 111,
             value: INSERT INTO sku_query (sku_id,i_id,name,short_name,sale_price,cost_price,properties_value,c_id,category,pic_big,pic,enabled,weight,market_price,brand,supplier_id,supplier_name,modified,sku_code,supplier_sku_id,supplier_i_id,vc_name,sku_type,creator,created,remark,item_type,stock_disabled,unit,shelf_life,labels,production_licence,l,w,h,is_series_number,other_price_1,other_price_2,other_price_3,other_price_4,other_price_5,other_1,other_2,other_3,other_4,other_5 stock_type sku_codes autoid batch_enabled insert_time) VALUES
         },
         {
            field: limit,
            label: limit,
            type: string,
            value: 1000
         }
     ]
}

在这里,我们使用了batchexecute API来批量执行SQL插入操作。main_sql字段定义了插入语句,包含了所有需要插入到目标表中的字段。

配置示例

以下是一个具体的配置示例,用于将聚水谭的商品信息写入到MySQL数据库中的sku_query表:

{
    api: batchexecute,
    effect: EXECUTE,
    method: SQL,
    idCheck: true,
    request: [
        { field: sku_id,label:"商品编码" , type:"string" , value:"{sku_id}" },
        { field:i_id,label:"款式编码" , type:"string" , value:"{i_id}" },
        { field:name,label:"商品名称" , type:"string" , value:"{name}" },
        { field:short_name,label:"商品简称" , type:"string" , value:"{short_name}" },
        { field:sale_price,label:"销售价" , type:"string" , value:"{sale_price}" },
        { field:cost_price,label:"成本价" , type:"string" , value:"{cost_price}" },
        { field:c_id,label:"类目id" , type:"string" , value:"{c_id}" },
        { field:lable,label:"分类" , type:string,value:{category}},
        { field:lable,label:"大图地址",type:string,value:{pic_big}},
        //省略部分字段
        { field:lable,label:insert_time,type:string,value:{modified}}
     ],
     otherRequest:[
         {
             field:main_sql,
             label:主语句,
             type: string,
             describe:111,
             value:INSERT INTO sku_query (sku_id,i_id,name short_name sale_price cost price properties_value c id category pic big pic enabled weight market price brand supplier id supplier name modified sku code supplier sku id supplier i id vc name sku type creator created remark item_type stock disabled unit shelf life labels production licence l w h is series number other price 1 other price 2 other price 3 other price 4 other price 5 other 1 other 2 other 3 other 4 other 5 stock type sku codes autoid batch enabled insert time) VALUES
         },
         {
            field:limit,
            label:limit,
            type: string,
            value:1000
         }
     ]
}

该配置通过API调用实现批量插入操作,将清洗后的数据按照指定格式写入到MySQL数据库中。

总结

通过上述步骤,我们实现了从聚水谭获取商品信息并经过ETL转换后,将其写入到目标平台MySQL中。这一过程充分利用了轻易云数据集成平台提供的全异步、多异构系统支持能力,实现了不同系统间的数据无缝对接。 金蝶与WMS系统接口开发配置