数据集成与ETL转换:实现商品信息写入MySQL的实践

  • 轻易云集成顾问-蔡威
### 聚水潭数据集成到MySQL技术案例分享 在系统对接过程中,如何确保大量数据快速、准确地从聚水潭集成到MySQL数据库,是企业信息化过程中的一个关键挑战。本文将详细探讨以"聚水谭-商品信息单-->BI事在人为-商品信息表(只新增)"为例的具体方案实现。 #### 聚水潭数据获取与处理 为了高效抓取并写入聚水潭的数据,我们需要首先调用聚水潭提供的`/open/sku/query` API接口,通过请求参数配置,实现定时可靠的数据采集。以下是主要步骤: 1. **API调用设置**: - **分页与限流处理**:由于单次查询可能会有较多记录,我们使用分页机制,每次查询一定数量的数据,并合理设置限流,以避免触发API限制。 - **错误重试机制**:在网络或服务器异常情况下,采取自动重试策略,确保每个请求都能成功返回。 ```python def fetch_data(page_num, page_size): response = requests.get(f"https://api.jushuitan.com/open/sku/query?page={page_num}&size={page_size}") if response.status_code == 200: return response.json() else: # 错误处理逻辑 retry_fetch_data(page_num, page_size) data_list = [] for i in range(total_pages): data_list.extend(fetch_data(i, default_page_size)) ``` #### MySQL数据写入与转换 为了将上述获取到的商品数据信息快速且有效地写入MySQL数据库,需要利用批量插入功能来提升性能,同时要注重格式转换和映射。 2. **自定义数据转换逻辑**: - 将从聚水潭获得的数据进行清洗和结构调整,使其适应MySQL数据库表结构要求。 - 实现字段对应关系映射以及必要的数据类型转换,以保证数据一致性。 ```python def transform_and_insert(data_batch): transformed_data = [] for item in data_batch: transformed_record = { "product_id": item["productId"], "sku_name": item["skuName"], "price": float(item["price"]), # 更多字段映射... } transformed_data.append(transformed_record) insert_into_mysql(transformed_data) def insert_into_mysql(data_list): connection = mysql.connector.connect(**db_config) cursor = connection.cursor() insert_query = """INSERT INTO product_info(product_id, sku_name, price) VALUES (%s, %s, %s)""" cursor.executemany(insert_query, data_list) connection.commit() ``` #### 数据质量监控与告警 3. **实时监控及日志管理**: ![如何开发用友BIP接口](https://pic.qeasy.cloud/D34.png~tplv-syqr462i7n-qeasy.image) ### 调用聚水潭接口获取并加工数据的技术实现 在数据集成过程中,调用源系统的API接口是至关重要的一步。本文将详细探讨如何通过轻易云数据集成平台调用聚水潭的`/open/sku/query`接口获取商品信息,并对数据进行初步加工。 #### 接口调用配置 首先,我们需要配置元数据以便正确调用聚水潭的`/open/sku/query`接口。以下是该接口的元数据配置: ```json { "api": "/open/sku/query", "effect": "QUERY", "method": "POST", "number": "i_id", "id": "sku_id", "name": "name", "request": [ { "field": "page_index", "label": "开始页", "type": "string", "describe": "第几页,从第一页开始,默认1", "value": "1" }, { "field": "page_size", "label": "页行数", "type": "string", "describe": "每页多少条,默认30,最大50", "value": "50" }, { "field": "modified_begin", "label": "修改开始时间", "type": "string", "describe": { {"修改起始时间,和结束时间必须同时存在,时间间隔不能超过七天,与商品编码不能同时为空"}, {"value":"{{LAST_SYNC_TIME|datetime}}"} } }, { { {"field":"modified_end"}, {"label":"修改结束时间"}, {"type":"string"}, {"describe":"修改结束时间,和起始时间必须同时存在,时间间隔不能超过七天,与商品编码不能同时为空"}, {"value":"{{CURRENT_TIME|datetime}}"} } }, { { {"field":"sku_ids"}, {"label":"商品编码"}, {"type":"string"}, {"describe":"商品编码,与修改时间不能同时为空,最多20"} } } ], {{"delay",5}} } ``` #### 请求参数解析 - `page_index`:表示请求的页码,从第一页开始。 - `page_size`:每页返回的数据条数,默认30条,最大50条。 - `modified_begin`和`modified_end`:用于指定查询的时间范围,这两个参数必须同时存在且间隔不超过七天。 - `sku_ids`:指定要查询的商品编码,与时间范围参数不能同时为空。 #### 数据请求与清洗 在实际操作中,我们需要通过POST请求来获取商品信息。以下是一个示例请求: ```json { 'page_index': '1', 'page_size': '50', 'modified_begin': '2023-09-01T00:00:00Z', 'modified_end': '2023-09-07T23:59:59Z' } ``` 在接收到响应后,需要对数据进行清洗和转换。例如,将日期格式标准化、去除无效字段等。以下是一个简单的数据清洗示例: ```python import json from datetime import datetime def clean_data(raw_data): cleaned_data = [] for item in raw_data: cleaned_item = {} cleaned_item['sku_id'] = item['sku_id'] cleaned_item['name'] = item['name'] cleaned_item['last_modified'] = datetime.strptime(item['last_modified'], '%Y-%m-%dT%H:%M:%SZ') # 添加更多清洗逻辑 cleaned_data.append(cleaned_item) return cleaned_data # 假设 raw_response 是从 API 获取到的原始数据 raw_response = '''[ {"sku_id":123, {"name":"Product A"}, {"last_modified":"2023-09-01T12:34:56Z"} } ]''' raw_data = json.loads(raw_response) cleaned_data = clean_data(raw_data) print(cleaned_data) ``` #### 数据转换与写入 在完成数据清洗后,需要将其转换为目标系统所需的格式,并写入到目标数据库中。这一步通常涉及到字段映射、类型转换等操作。 ```python def transform_and_write(cleaned_data): transformed_data = [] for item in cleaned_data: transformed_item = {} transformed_item['商品ID'] = item['sku_id'] transformed_item['商品名称'] = item['name'] # 添加更多转换逻辑 transformed_data.append(transformed_item) # 将转换后的数据写入目标数据库 write_to_database(transformed_data) def write_to_database(data): # 假设我们使用某个数据库连接库来写入数据 db_connection.insert_many('target_table', data) transform_and_write(cleaned_data) ``` 通过上述步骤,我们可以高效地从聚水潭系统获取并处理商品信息,为后续的数据分析和业务决策提供可靠的数据支持。 ![钉钉与CRM系统接口开发配置](https://pic.qeasy.cloud/S21.png~tplv-syqr462i7n-qeasy.image) ### 数据集成与ETL转换:实现商品信息写入MySQL 在数据集成的生命周期中,第二步是将已经集成的源平台数据进行ETL(提取、转换、加载)转换,并最终写入目标平台。本案例将详细探讨如何将聚水谭的商品信息通过ETL转换为MySQL API接口所能接收的格式,并写入目标平台。 #### 数据请求与清洗 在数据请求阶段,我们从聚水谭获取商品信息单的数据。为了确保数据的完整性和一致性,需要对原始数据进行清洗和预处理。以下是我们需要处理的字段: ```json [ {"field":"sku_id","label":"商品编码","type":"string","value":"{sku_id}"}, {"field":"i_id","label":"款式编码","type":"string","value":"{i_id}"}, {"field":"name","label":"商品名称","type":"string","value":"{name}"}, {"field":"short_name","label":"商品简称","type":"string","value":"{short_name}"}, {"field":"sale_price","label":"销售价","type":"string","value":"{sale_price}"}, {"field":"cost_price","label":"成本价","type":"string","value":"{cost_price}"}, {"field":"properties_value","label":"颜色规格","type":"string","value":"{properties_value}"}, {"field":"c_id","label":"类目id","type":"string","value":"{c_id}"}, {"field":"category","label":"分类","type":"string","value":"{category}"}, {"field":"pic_big","label":"大图地址","type":"string","value":"{pic_big}"}, {"field": "pic", "label": "图片地址", "type": "string", "value": "{pic}"} ] ``` 以上字段是我们从源平台提取的数据,这些字段需要经过清洗和标准化,以确保数据的一致性。 #### 数据转换与写入 在数据转换阶段,我们需要将清洗后的数据转化为目标平台MySQL API接口所能接收的格式。以下是MySQL API接口所需的数据格式: ```json { "api": "batchexecute", "effect": "EXECUTE", "method": "SQL", "idCheck": true, "request": [ {"field": "sku_id", "label": "商品编码", "type": "string", "value": "{sku_id}"}, {"field": "i_id", "label": "款式编码", "type": "string", "value": "{i_id}"}, {"field": "name", "label": "商品名称", "type": "string", "value": "{name}"}, {"field": "short_name", "label": "商品简称", "type": "string", "value": "{short_name}"}, // 省略部分字段 {"field": "insert_time", "label": "", type: string, value: {modified}} ], otherRequest: [ { field: main_sql, label: 主语句, type: string, describe: 111, value: INSERT INTO sku_query (sku_id,i_id,name,short_name,sale_price,cost_price,properties_value,c_id,category,pic_big,pic,enabled,weight,market_price,brand,supplier_id,supplier_name,modified,sku_code,supplier_sku_id,supplier_i_id,vc_name,sku_type,creator,created,remark,item_type,stock_disabled,unit,shelf_life,labels,production_licence,l,w,h,is_series_number,other_price_1,other_price_2,other_price_3,other_price_4,other_price_5,other_1,other_2,other_3,other_4,other_5 stock_type sku_codes autoid batch_enabled insert_time) VALUES }, { field: limit, label: limit, type: string, value: 1000 } ] } ``` 在这里,我们使用了`batchexecute` API来批量执行SQL插入操作。`main_sql`字段定义了插入语句,包含了所有需要插入到目标表中的字段。 #### 配置示例 以下是一个具体的配置示例,用于将聚水谭的商品信息写入到MySQL数据库中的`sku_query`表: ```json { api: batchexecute, effect: EXECUTE, method: SQL, idCheck: true, request: [ { field: sku_id,label:"商品编码" , type:"string" , value:"{sku_id}" }, { field:i_id,label:"款式编码" , type:"string" , value:"{i_id}" }, { field:name,label:"商品名称" , type:"string" , value:"{name}" }, { field:short_name,label:"商品简称" , type:"string" , value:"{short_name}" }, { field:sale_price,label:"销售价" , type:"string" , value:"{sale_price}" }, { field:cost_price,label:"成本价" , type:"string" , value:"{cost_price}" }, { field:c_id,label:"类目id" , type:"string" , value:"{c_id}" }, { field:lable,label:"分类" , type:string,value:{category}}, { field:lable,label:"大图地址",type:string,value:{pic_big}}, //省略部分字段 { field:lable,label:insert_time,type:string,value:{modified}} ], otherRequest:[ { field:main_sql, label:主语句, type: string, describe:111, value:INSERT INTO sku_query (sku_id,i_id,name short_name sale_price cost price properties_value c id category pic big pic enabled weight market price brand supplier id supplier name modified sku code supplier sku id supplier i id vc name sku type creator created remark item_type stock disabled unit shelf life labels production licence l w h is series number other price 1 other price 2 other price 3 other price 4 other price 5 other 1 other 2 other 3 other 4 other 5 stock type sku codes autoid batch enabled insert time) VALUES }, { field:limit, label:limit, type: string, value:1000 } ] } ``` 该配置通过API调用实现批量插入操作,将清洗后的数据按照指定格式写入到MySQL数据库中。 #### 总结 通过上述步骤,我们实现了从聚水谭获取商品信息并经过ETL转换后,将其写入到目标平台MySQL中。这一过程充分利用了轻易云数据集成平台提供的全异步、多异构系统支持能力,实现了不同系统间的数据无缝对接。 ![金蝶与WMS系统接口开发配置](https://pic.qeasy.cloud/T8.png~tplv-syqr462i7n-qeasy.image)