聚水潭数据集成到MySQL技术案例分享
在系统对接过程中,如何确保大量数据快速、准确地从聚水潭集成到MySQL数据库,是企业信息化过程中的一个关键挑战。本文将详细探讨以"聚水谭-商品信息单-->BI事在人为-商品信息表(只新增)"为例的具体方案实现。
聚水潭数据获取与处理
为了高效抓取并写入聚水潭的数据,我们需要首先调用聚水潭提供的/open/sku/query
API接口,通过请求参数配置,实现定时可靠的数据采集。以下是主要步骤:
- API调用设置:
- 分页与限流处理:由于单次查询可能会有较多记录,我们使用分页机制,每次查询一定数量的数据,并合理设置限流,以避免触发API限制。
- 错误重试机制:在网络或服务器异常情况下,采取自动重试策略,确保每个请求都能成功返回。
def fetch_data(page_num, page_size):
response = requests.get(f"https://api.jushuitan.com/open/sku/query?page={page_num}&size={page_size}")
if response.status_code == 200:
return response.json()
else:
# 错误处理逻辑
retry_fetch_data(page_num, page_size)
data_list = []
for i in range(total_pages):
data_list.extend(fetch_data(i, default_page_size))
MySQL数据写入与转换
为了将上述获取到的商品数据信息快速且有效地写入MySQL数据库,需要利用批量插入功能来提升性能,同时要注重格式转换和映射。
- 自定义数据转换逻辑:
- 将从聚水潭获得的数据进行清洗和结构调整,使其适应MySQL数据库表结构要求。
- 实现字段对应关系映射以及必要的数据类型转换,以保证数据一致性。
def transform_and_insert(data_batch):
transformed_data = []
for item in data_batch:
transformed_record = {
"product_id": item["productId"],
"sku_name": item["skuName"],
"price": float(item["price"]),
# 更多字段映射...
}
transformed_data.append(transformed_record)
insert_into_mysql(transformed_data)
def insert_into_mysql(data_list):
connection = mysql.connector.connect(**db_config)
cursor = connection.cursor()
insert_query = """INSERT INTO product_info(product_id, sku_name, price) VALUES (%s, %s, %s)"""
cursor.executemany(insert_query, data_list)
connection.commit()
数据质量监控与告警
- 实时监控及日志管理:
调用聚水潭接口获取并加工数据的技术实现
在数据集成过程中,调用源系统的API接口是至关重要的一步。本文将详细探讨如何通过轻易云数据集成平台调用聚水潭的/open/sku/query
接口获取商品信息,并对数据进行初步加工。
接口调用配置
首先,我们需要配置元数据以便正确调用聚水潭的/open/sku/query
接口。以下是该接口的元数据配置:
{
"api": "/open/sku/query",
"effect": "QUERY",
"method": "POST",
"number": "i_id",
"id": "sku_id",
"name": "name",
"request": [
{
"field": "page_index",
"label": "开始页",
"type": "string",
"describe": "第几页,从第一页开始,默认1",
"value": "1"
},
{
"field": "page_size",
"label": "页行数",
"type": "string",
"describe": "每页多少条,默认30,最大50",
"value": "50"
},
{
"field": "modified_begin",
"label": "修改开始时间",
"type": "string",
"describe":
{
{"修改起始时间,和结束时间必须同时存在,时间间隔不能超过七天,与商品编码不能同时为空"},
{"value":"{{LAST_SYNC_TIME|datetime}}"}
}
},
{
{
{"field":"modified_end"},
{"label":"修改结束时间"},
{"type":"string"},
{"describe":"修改结束时间,和起始时间必须同时存在,时间间隔不能超过七天,与商品编码不能同时为空"},
{"value":"{{CURRENT_TIME|datetime}}"}
}
},
{
{
{"field":"sku_ids"},
{"label":"商品编码"},
{"type":"string"},
{"describe":"商品编码,与修改时间不能同时为空,最多20"}
}
}
],
{{"delay",5}}
}
请求参数解析
page_index
:表示请求的页码,从第一页开始。page_size
:每页返回的数据条数,默认30条,最大50条。modified_begin
和modified_end
:用于指定查询的时间范围,这两个参数必须同时存在且间隔不超过七天。sku_ids
:指定要查询的商品编码,与时间范围参数不能同时为空。
数据请求与清洗
在实际操作中,我们需要通过POST请求来获取商品信息。以下是一个示例请求:
{
'page_index': '1',
'page_size': '50',
'modified_begin': '2023-09-01T00:00:00Z',
'modified_end': '2023-09-07T23:59:59Z'
}
在接收到响应后,需要对数据进行清洗和转换。例如,将日期格式标准化、去除无效字段等。以下是一个简单的数据清洗示例:
import json
from datetime import datetime
def clean_data(raw_data):
cleaned_data = []
for item in raw_data:
cleaned_item = {}
cleaned_item['sku_id'] = item['sku_id']
cleaned_item['name'] = item['name']
cleaned_item['last_modified'] = datetime.strptime(item['last_modified'], '%Y-%m-%dT%H:%M:%SZ')
# 添加更多清洗逻辑
cleaned_data.append(cleaned_item)
return cleaned_data
# 假设 raw_response 是从 API 获取到的原始数据
raw_response = '''[
{"sku_id":123,
{"name":"Product A"},
{"last_modified":"2023-09-01T12:34:56Z"}
}
]'''
raw_data = json.loads(raw_response)
cleaned_data = clean_data(raw_data)
print(cleaned_data)
数据转换与写入
在完成数据清洗后,需要将其转换为目标系统所需的格式,并写入到目标数据库中。这一步通常涉及到字段映射、类型转换等操作。
def transform_and_write(cleaned_data):
transformed_data = []
for item in cleaned_data:
transformed_item = {}
transformed_item['商品ID'] = item['sku_id']
transformed_item['商品名称'] = item['name']
# 添加更多转换逻辑
transformed_data.append(transformed_item)
# 将转换后的数据写入目标数据库
write_to_database(transformed_data)
def write_to_database(data):
# 假设我们使用某个数据库连接库来写入数据
db_connection.insert_many('target_table', data)
transform_and_write(cleaned_data)
通过上述步骤,我们可以高效地从聚水潭系统获取并处理商品信息,为后续的数据分析和业务决策提供可靠的数据支持。
数据集成与ETL转换:实现商品信息写入MySQL
在数据集成的生命周期中,第二步是将已经集成的源平台数据进行ETL(提取、转换、加载)转换,并最终写入目标平台。本案例将详细探讨如何将聚水谭的商品信息通过ETL转换为MySQL API接口所能接收的格式,并写入目标平台。
数据请求与清洗
在数据请求阶段,我们从聚水谭获取商品信息单的数据。为了确保数据的完整性和一致性,需要对原始数据进行清洗和预处理。以下是我们需要处理的字段:
[
{"field":"sku_id","label":"商品编码","type":"string","value":"{sku_id}"},
{"field":"i_id","label":"款式编码","type":"string","value":"{i_id}"},
{"field":"name","label":"商品名称","type":"string","value":"{name}"},
{"field":"short_name","label":"商品简称","type":"string","value":"{short_name}"},
{"field":"sale_price","label":"销售价","type":"string","value":"{sale_price}"},
{"field":"cost_price","label":"成本价","type":"string","value":"{cost_price}"},
{"field":"properties_value","label":"颜色规格","type":"string","value":"{properties_value}"},
{"field":"c_id","label":"类目id","type":"string","value":"{c_id}"},
{"field":"category","label":"分类","type":"string","value":"{category}"},
{"field":"pic_big","label":"大图地址","type":"string","value":"{pic_big}"},
{"field": "pic", "label": "图片地址", "type": "string", "value": "{pic}"}
]
以上字段是我们从源平台提取的数据,这些字段需要经过清洗和标准化,以确保数据的一致性。
数据转换与写入
在数据转换阶段,我们需要将清洗后的数据转化为目标平台MySQL API接口所能接收的格式。以下是MySQL API接口所需的数据格式:
{
"api": "batchexecute",
"effect": "EXECUTE",
"method": "SQL",
"idCheck": true,
"request": [
{"field": "sku_id", "label": "商品编码", "type": "string", "value": "{sku_id}"},
{"field": "i_id", "label": "款式编码", "type": "string", "value": "{i_id}"},
{"field": "name", "label": "商品名称", "type": "string", "value": "{name}"},
{"field": "short_name", "label": "商品简称", "type": "string", "value": "{short_name}"},
// 省略部分字段
{"field": "insert_time", "label": "", type: string, value: {modified}}
],
otherRequest: [
{
field: main_sql,
label: 主语句,
type: string,
describe: 111,
value: INSERT INTO sku_query (sku_id,i_id,name,short_name,sale_price,cost_price,properties_value,c_id,category,pic_big,pic,enabled,weight,market_price,brand,supplier_id,supplier_name,modified,sku_code,supplier_sku_id,supplier_i_id,vc_name,sku_type,creator,created,remark,item_type,stock_disabled,unit,shelf_life,labels,production_licence,l,w,h,is_series_number,other_price_1,other_price_2,other_price_3,other_price_4,other_price_5,other_1,other_2,other_3,other_4,other_5 stock_type sku_codes autoid batch_enabled insert_time) VALUES
},
{
field: limit,
label: limit,
type: string,
value: 1000
}
]
}
在这里,我们使用了batchexecute
API来批量执行SQL插入操作。main_sql
字段定义了插入语句,包含了所有需要插入到目标表中的字段。
配置示例
以下是一个具体的配置示例,用于将聚水谭的商品信息写入到MySQL数据库中的sku_query
表:
{
api: batchexecute,
effect: EXECUTE,
method: SQL,
idCheck: true,
request: [
{ field: sku_id,label:"商品编码" , type:"string" , value:"{sku_id}" },
{ field:i_id,label:"款式编码" , type:"string" , value:"{i_id}" },
{ field:name,label:"商品名称" , type:"string" , value:"{name}" },
{ field:short_name,label:"商品简称" , type:"string" , value:"{short_name}" },
{ field:sale_price,label:"销售价" , type:"string" , value:"{sale_price}" },
{ field:cost_price,label:"成本价" , type:"string" , value:"{cost_price}" },
{ field:c_id,label:"类目id" , type:"string" , value:"{c_id}" },
{ field:lable,label:"分类" , type:string,value:{category}},
{ field:lable,label:"大图地址",type:string,value:{pic_big}},
//省略部分字段
{ field:lable,label:insert_time,type:string,value:{modified}}
],
otherRequest:[
{
field:main_sql,
label:主语句,
type: string,
describe:111,
value:INSERT INTO sku_query (sku_id,i_id,name short_name sale_price cost price properties_value c id category pic big pic enabled weight market price brand supplier id supplier name modified sku code supplier sku id supplier i id vc name sku type creator created remark item_type stock disabled unit shelf life labels production licence l w h is series number other price 1 other price 2 other price 3 other price 4 other price 5 other 1 other 2 other 3 other 4 other 5 stock type sku codes autoid batch enabled insert time) VALUES
},
{
field:limit,
label:limit,
type: string,
value:1000
}
]
}
该配置通过API调用实现批量插入操作,将清洗后的数据按照指定格式写入到MySQL数据库中。
总结
通过上述步骤,我们实现了从聚水谭获取商品信息并经过ETL转换后,将其写入到目标平台MySQL中。这一过程充分利用了轻易云数据集成平台提供的全异步、多异构系统支持能力,实现了不同系统间的数据无缝对接。