聚水潭-商品信息查询-->BI狄菲俪诗-商品信息表数据集成案例
在本篇技术案例中,我们将聚焦于如何通过轻易云数据集成平台,实现从聚水潭系统到MySQL数据库的数据对接和集成。具体任务是将聚水潭中的商品信息查询接口(/open/sku/query)获取的数据,写入到BI狄菲俪诗的商品信息表中。
这个案例主要解决以下几个核心技术问题:
- 定时可靠的数据抓取:通过配置定时任务,可靠地从聚水潭接口获取最新的SKU数据。
- 处理分页和限流:应对API分页返回和限流控制的问题,以保证所有数据完整无误地被抓取。
- 自定义数据转换逻辑:根据业务需求,将原始SKU数据经过预处理后转换为符合MySQL数据库格式的数据结构。
- 高吞吐量快速写入:利用MySQL批量执行API(batchexecute),提升大规模数据写入效率,确保性能稳定。
- 实时监控与异常处理:
- 采用集中监控系统,实时跟踪每个集成任务的状态和性能表现;
- 实现异常检测与错误重试机制,有效处理网络波动、服务中断等可能出现的问题。
在具体实施过程中,为了确保不同系统间的数据结构能够兼容,并且每一条记录都不会遗漏或重复,我们还需要特别关注以下方面:
- 数据质量监控,通过设立检查点来校验关键字段的一致性;
- 自定义映射规则,根据实际业务需求进行灵活配置以适应特定场景;
- 对不同步失败的情况实现详细日志记录,以便及时发现并纠正问题。
下一节我们将详解如何使用上述方案端到端实现这一目标,包括实际操作步骤、代码示例及关键参数配置。进一步展示该方案不仅提升了数据同步效率,还极大优化了运维管理难度,使得复杂的数据流转过程变得直观可控。
调用聚水潭接口/open/sku/query获取并加工数据
在数据集成生命周期的第一步,我们需要从源系统聚水潭调用接口/open/sku/query
来获取商品信息,并对数据进行初步加工。本文将详细探讨如何配置和调用该接口,以及如何处理返回的数据。
接口配置与调用
首先,我们需要了解接口的基本配置。根据元数据配置,/open/sku/query
接口使用POST方法进行数据请求,主要参数包括分页信息和时间范围。
请求参数解析
- page_index: 开始页,默认为1。
- page_size: 每页条数,默认为30,最大50。
- modified_begin: 修改开始时间,与结束时间必须同时存在,时间间隔不能超过七天。
- modified_end: 修改结束时间,与开始时间必须同时存在,时间间隔不能超过七天。
这些参数确保我们能够分页获取在指定时间范围内修改过的商品信息。
请求示例
{
"page_index": "1",
"page_size": "50",
"modified_begin": "{{LAST_SYNC_TIME|datetime}}",
"modified_end": "{{CURRENT_TIME|datetime}}"
}
在实际操作中,我们会使用动态变量来填充modified_begin
和modified_end
,以确保每次请求都能获取最新的数据变化。
数据处理与清洗
在成功调用接口并获取数据后,我们需要对返回的数据进行清洗和初步加工。这一步骤非常关键,因为它直接影响到后续的数据转换与写入过程。
返回数据结构
假设返回的数据结构如下:
{
"data": [
{
"sku_id": "12345",
"name": "商品A",
"price": 100,
"stock": 50,
...
},
...
],
"total_count": 100
}
我们需要提取并清洗这些数据,以便后续处理。具体步骤包括:
- 字段映射:将返回的字段映射到目标系统所需的字段。例如,将
sku_id
映射到目标系统的商品ID字段。 - 数据验证:检查必要字段是否存在,并验证其格式是否正确。例如,确保价格字段为正数。
- 去重处理:如果返回的数据中存在重复记录,需要进行去重处理。
清洗示例代码
以下是一个简单的Python示例代码,用于清洗返回的数据:
import requests
import datetime
# 定义请求参数
params = {
"page_index": "1",
"page_size": "50",
"modified_begin": (datetime.datetime.now() - datetime.timedelta(days=7)).strftime('%Y-%m-%d %H:%M:%S'),
"modified_end": datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
}
# 发起POST请求
response = requests.post('https://api.jushuitan.com/open/sku/query', json=params)
data = response.json()
# 数据清洗
cleaned_data = []
for item in data['data']:
cleaned_item = {
'商品ID': item['sku_id'],
'名称': item['name'],
'价格': item['price'],
'库存': item['stock']
# 添加其他需要的字段映射
}
# 数据验证
if cleaned_item['价格'] > 0:
cleaned_data.append(cleaned_item)
# 输出清洗后的数据
print(cleaned_data)
自动填充响应与条件过滤
元数据配置中还提到自动填充响应(autoFillResponse)和条件过滤(condition_bk)。这些功能可以帮助我们更高效地处理和过滤数据。
自动填充响应
自动填充响应功能可以简化我们的开发工作,使得平台能够自动解析并填充返回的数据,无需手动编写大量解析代码。这对于大规模数据处理非常有利。
条件过滤
条件过滤功能允许我们在获取数据时应用特定的逻辑条件。例如,在元数据配置中,我们可以看到以下条件:
"condition_bk":[[{"field":"enabled","logic":"eqv2","value":"1"}]]
这意味着我们只会获取状态为启用(enabled=1)的商品信息,从而减少不必要的数据传输和处理量。
通过上述步骤,我们可以高效地从聚水潭系统获取并清洗商品信息,为后续的数据转换与写入打下坚实基础。
数据转换与写入目标平台 MySQL 的技术案例
在数据集成生命周期的第二步,我们需要将已经从源平台聚水潭获取的商品信息数据进行ETL(提取、转换、加载)处理,最终写入目标平台 MySQL。本文将详细阐述如何利用元数据配置完成这一过程。
数据提取与清洗
在数据转换之前,我们首先需要确保从源平台提取的数据是干净且符合目标平台要求的。假设我们已经完成了数据提取和初步清洗步骤,接下来我们将重点放在数据转换与写入阶段。
数据转换
为了使数据符合 MySQL API 接口的格式,我们需要根据提供的元数据配置对字段进行映射和转换。以下是元数据配置中各字段的详细说明:
{
"api": "batchexecute",
"effect": "EXECUTE",
"method": "POST",
"number": "id",
"id": "id",
"name": "id",
"idCheck": true,
"request": [
{"field":"sku_id","label":"商品编码","type":"string","value":"{sku_id}"},
{"field":"i_id","label":"款式编码","type":"string","value":"{i_id}"},
{"field":"name","label":"商品名称","type":"string","value":"{name}"},
{"field":"short_name","label":"商品简称","type":"string","value":"{short_name}"},
{"field":"sale_price","label":"销售价","type":"string","value":"{sale_price}"},
{"field":"cost_price","label":"成本价","type":"string","value":"{cost_price}"},
{"field":"properties_value","label":"颜色规格","type":"string","value":"{properties_value}"},
{"field":"c_id","label":"类目id","type":"string","value":"{c_id}"},
{"field":"category","label":"分类","type":"string","value":"{category}"},
{"field":"enabled","label":"是否启用","type":"string","value":"{enabled}"},
{"field":"weight","label":"重量","type":"string","value":"{weight}"},
{"field":"market_price","label":"市场价","type":"string","value":"{market_price}"},
{"field":...}
],
"otherRequest":[
{
"field": "main_sql",
"label": "主语句",
"type": "string",
"describe": "SQL首次执行的语句,将会返回:lastInsertId",
"value": "REPLACE INTO sku_query (sku_id,i_id,name,short_name,sale_price,cost_price,properties_value,c_id,category,enabled,weight,market_price,brand,supplier_id,supplier_name,modified,sku_code,supplier_sku_id,supplier_i_id,vc_name,sku_type,creator,created,remark,item_type,stock_disabled,unit,shelf_life,labels,production_licence,l,w,h,is_series_number,other_price_1,other_price_2,other_price_3,other_price_4,other_price_5,other_1,other_2,other_3,other_4,other_5,stock_type,sku_codes) VALUES"
},
{
"field": "limit",
"label": "limit",
"type": "string",
"value": "1000"
}
]
}
构建 SQL 插入语句
根据元数据配置中的 main_sql
字段,我们可以构建 SQL 插入语句。此处使用的是 REPLACE INTO
,它将插入新记录或替换已有记录。以下是 SQL 插入语句模板:
REPLACE INTO sku_query (
sku_id,i_id,name,short_name,sale_price,cost_price,
properties_value,c_id,category,enabled,
weight,market_price,brand,supplier_id,supplier_name,
modified,sku_code,supplier_sku_id,supplier_i_id,
vc_name,sku_type,creator,
created,remark,item_type,
stock_disabled,
unit,shelf_life,
labels,
production_licence,l,w,h,is_series_number,
other_price_1,
other_price_2,
other_price_3,
other_price_4,
other_price_5,
other_1,
other_2,
other_3,
other_4,
other_5,
stock_type,
sku_codes
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ? ,?, ? ,?, ? ,?, ? ,?, ? ,?, ? ,?, ? ,?, ? ,?, ? ,?, ? ,?)
数据加载
通过 HTTP POST 请求将转换后的数据发送到 MySQL API 接口:
import requests
url = 'https://api.yourmysqlserver.com/batchexecute'
headers = {'Content-Type': 'application/json'}
data = {
# 根据 request 字段中的配置构造请求体
# 假设我们有一个函数 get_data() 可以获取已清洗的数据
'request': get_data(),
'main_sql': 'REPLACE INTO sku_query (sku_id,i_id,name,... ) VALUES',
'limit': '1000'
}
response = requests.post(url=url, headers=headers,data=json.dumps(data))
if response.status_code ==200:
print("Data successfully loaded into MySQL")
else:
print(f"Failed to load data: {response.text}")
注意事项
- 字段映射:确保所有字段都已正确映射,并且类型一致。
- 批量处理:对于大规模数据,考虑分批次处理以提高效率。
- 错误处理:捕获并处理可能出现的错误,例如网络问题或数据库连接问题。
通过以上步骤,我们可以高效地将源平台的数据经过 ETL 转换后写入目标平台 MySQL,确保数据的一致性和完整性。