数据转换与写入:如何通过ETL将库存数据写入MySQL

  • 轻易云集成顾问-黄宏棵
### 吉客云历史库存报表的MySQL数据集成技术分享 在本次案例中,我们将聚焦于如何高效地将吉客云中的历史库存报表数据集成到MySQL数据库中。为了实现这一目标,采用了轻易云数据集成平台,通过丰富的功能支持和灵活的配置,让整个过程变得简单且高效。 #### 数据获取与处理概述 首先,对于实际的数据采集,我们利用吉客云提供的API接口 `birc.report.historyStock` 进行定时可靠的数据抓取。此步骤至关重要,因为它直接关系到业务数据是否能及时、准确地进入后续处理流程。在调用该接口时,我们须处理分页和限流问题,以确保大量数据能够被稳定、连续地获取。 为了解决这些问题,轻易云平台提供了强大的自定义数据转换逻辑以及高吞吐量的数据写入能力,使我们可以批量并快速地将获得的数据写入到MySQL。这不仅提升了效率,还减少了系统压力。此外,该平台还具备实时监控与日志记录功能,可以随时跟踪任务状态,确保每一条记录都不漏单。 #### 数据流设计与管理 使用轻易云的数据流设计工具,我们构建了一套直观且可视化的数据传输流程。从吉客云接口抓取到MySQL存储,所有环节都一目了然,并具有高度的可操作性。同时,通过统一的控制台视图和API资产管理功能,实现对整个流程的一站式监控与优化,包括资源分配和性能调整等细节。 #### MySQL对接注意事项 在具体执行阶段,需要特别留意的是对接过程中可能出现的数据格式差异及其映射。例如,从JSON格式转换为关系型数据库所需的结构化形式。此外,为保证系统稳健性,在进行批量写入操作时还设置了异常处理及错误重试机制,这样一旦发生错误,也能够自动补救而不中断整体进程。 通过这种方式,不仅实现了高效、可靠的大规模数据写入,同时也让企业能实时掌握库存动态,大大提高运营效率。 ![打通企业微信数据接口](https://pic.qeasy.cloud/D15.png~tplv-syqr462i7n-qeasy.image) ### 调用吉客云接口birc.report.historyStock获取并加工数据 在轻易云数据集成平台的生命周期中,调用源系统接口是至关重要的第一步。本文将深入探讨如何通过调用吉客云的`birc.report.historyStock`接口来获取历史库存报表数据,并对其进行初步加工。 #### 接口调用配置 首先,我们需要根据元数据配置来设置API请求参数。以下是关键的配置细节: - **API名称**: `birc.report.historyStock` - **请求方法**: `POST` - **请求参数**: - `warehouseId`: 查询仓库ID,多个仓库使用英文逗号分隔。 - `endDate`: 查询时间点,默认值为当前日期减去一天。 - `blockUp`: 是否显示停用货品,0表示不显示(过滤停用货品),其他值则显示所有实际产生出入库的货品。 - `pageIndex`: 分页页码。 - `pageSize`: 分页大小,默认值为100。 根据元数据配置,我们可以构建如下的请求体: ```json { "warehouseId": "1389261982928078848", "endDate": "_function DATE_SUB(CURDATE(), INTERVAL 1 DAY)", "blockUp": "0", "pageIndex": "1", "pageSize": "100" } ``` #### 数据请求与清洗 在发送请求后,我们会收到吉客云返回的历史库存报表数据。为了确保数据质量和一致性,需要对返回的数据进行清洗和初步加工。这包括但不限于以下几个步骤: 1. **字段验证**: 确保每个字段都符合预期的数据类型和格式。例如,`warehouseId`应为字符串类型且不为空。 2. **缺失值处理**: 对于可能存在的缺失值进行处理,可以选择填充默认值或删除相关记录。 3. **数据转换**: 将日期格式统一转换为标准格式,以便后续处理和分析。 示例代码如下: ```python import requests import json from datetime import datetime, timedelta # 构建请求体 request_body = { "warehouseId": "1389261982928078848", "endDate": (datetime.now() - timedelta(days=1)).strftime('%Y-%m-%d'), "blockUp": "0", "pageIndex": "1", "pageSize": "100" } # 发起POST请求 response = requests.post("https://api.jikexyun.com/birc.report.historyStock", data=json.dumps(request_body)) # 检查响应状态码 if response.status_code == 200: data = response.json() # 数据清洗与转换 cleaned_data = [] for record in data.get('records', []): if 'warehouseId' in record and record['warehouseId']: record['endDate'] = datetime.strptime(record['endDate'], '%Y-%m-%d').strftime('%Y-%m-%d') cleaned_data.append(record) else: print(f"Error: {response.status_code}") # 输出清洗后的数据 print(cleaned_data) ``` #### 数据转换与写入 在完成数据清洗后,需要将其转换为目标系统所需的格式,并写入到目标数据库或系统中。这一步通常涉及以下操作: 1. **字段映射**: 将源系统字段映射到目标系统字段。例如,将`goodsNo`映射到目标系统中的商品编号字段。 2. **数据聚合**: 根据业务需求,对数据进行聚合计算,如求和、平均等。 3. **批量写入**: 将处理后的数据批量写入目标数据库,以提高写入效率。 示例代码如下: ```python import pandas as pd # 假设cleaned_data已经包含了清洗后的记录列表 df = pd.DataFrame(cleaned_data) # 字段映射与转换 df.rename(columns={ 'goodsNo': 'product_code', 'warehouseName': 'storage_name' }, inplace=True) # 数据聚合示例:按仓库ID和商品编号汇总库存数量 aggregated_data = df.groupby(['warehouseId', 'product_code']).agg({'quantity': 'sum'}).reset_index() # 批量写入目标数据库(假设使用SQLAlchemy) from sqlalchemy import create_engine engine = create_engine('mysql+pymysql://user:password@host/dbname') aggregated_data.to_sql('target_table', con=engine, if_exists='append', index=False) ``` 通过上述步骤,我们实现了从吉客云接口获取历史库存报表数据,并对其进行了清洗、转换和写入。这个过程展示了轻易云数据集成平台在处理异构系统间的数据集成时的高效性和灵活性。 ![金蝶与MES系统接口开发配置](https://pic.qeasy.cloud/S1.png~tplv-syqr462i7n-qeasy.image) ### 数据转换与写入MySQLAPI接口的技术实现 在数据集成生命周期的第二步中,已经集成的源平台数据需要通过ETL(Extract, Transform, Load)过程转换为目标平台MySQLAPI接口能够接收的格式,并最终写入目标平台。以下将详细探讨如何利用元数据配置实现这一过程。 #### API接口配置 根据提供的元数据配置,我们需要构建一个POST请求,调用`execute` API来执行数据插入操作。该API请求的主要参数包括: - `main_params`:包含具体的数据字段和对应值。 - `main_sql`:包含要执行的SQL语句。 以下是元数据配置中的关键字段及其含义: ```json { "api": "execute", "effect": "EXECUTE", "method": "POST", "idCheck": true, "request": [ { "field": "main_params", "label": "主参数", "type": "object", "children": [ {"field": "end_date", "label": "查询时间点", "type": "date", "value": "_function DATE_SUB(CURDATE(), INTERVAL 1 DAY)"}, {"field": "warehouse_id", "label": "仓库id", "type": "string", "value": "{warehouseId}"}, {"field": "warehouse_name", "label": "仓库名称", "type": "string", "value": "{warehouseName}"}, {"field": "goods_id", "label": "货品id", "type": "string", "value": "{goodsId}"}, {"field": "goods_code", "label": "货品编号", "type": "string", "value": "{goodsNo}"}, {"field": "goods_name", "label": 货品名称, type: string, value: {goodsName}}, {"field":"sku_id","label":"规格id","type":"string","value":"{{goodsExtendMap.skuId}}"}, {"field":"sku_name","label":"规格名称","type":"string","value":"{skuPropertiesName}"}, {"field":"sku_barcode","label":"条码","type":"string","value":"{skuBarcode}"}, {"field":"unit_name","label":"计量单位","type":"string","value":"{baseUnitName}"}, {"field":"quantity","label":"库存数量","type":"int","value":"{quantity}"}, {"field":"is_certified","label":"是否正品。1:正品,0:残次品","type":"int","value":"{isCertified}"}, {"field":"category_name","label":"分类","type":"string","value":"{cateName}"}, {"field":"second_category","label":"二级分类","type":"string","value":"{{goodsExtendMap.goodsField5}}"}, {"field":"third_category","label":"三级分类","type":"string","value":"{{goodsExtendMap.goodsField6}}"} ] } ], ... } ``` #### 数据转换 在进行数据转换时,需要确保源平台的数据字段与目标平台MySQL数据库表中的字段一一对应。元数据配置中的`main_params`部分定义了这些字段及其类型。例如: - `end_date`字段表示查询时间点,类型为日期。 - `warehouse_id`字段表示仓库ID,类型为字符串。 - `quantity`字段表示库存数量,类型为整数。 在实际操作中,可以使用轻易云的数据处理功能,将这些字段从源平台的数据结构中提取出来,并按照目标平台要求的格式进行转换。 #### SQL语句构建 元数据配置中的`main_sql`部分定义了要执行的SQL插入语句: ```sql INSERT INTO `lehua`.`history_stock` (`end_date`, `warehouse_id`, `warehouse_name`, `goods_id`, `goods_code`, `goods_name`, `sku_id`, `sku_name`, `sku_barcode`, `unit_name`, `quantity`, `is_certified`, `category_name`, `second_category`, `third_category`) VALUES (<{end_date: }>, <{warehouse_id: }>, <{warehouse_name: }>, <{goods_id: }>, <{goods_code: }>, <{goods_name: }>, <{sku_id: }>, <{sku_name: }>, <{sku_barcode: }>, <{unit_name: }>, <{quantity: }>, <{is_certified: }>, <{category_name: }>, <{second_category: }>, <{third_category: }>); ``` 每个占位符(例如 `<{end_date: }>`) 将被对应的数据值替换。这些值来自于前面定义的`main_params`。 #### 数据写入 通过POST请求,将构建好的SQL语句和参数发送到MySQLAPI接口: ```json { method: 'POST', url: '/api/execute', data: { main_params: { end_date: '2023-09-30', warehouse_id: 'WH001', warehouse_name: 'Main Warehouse', goods_id: 'G001', goods_code: 'GC001', goods_name: 'Product A', sku_id: 'SKU001', sku_name: 'Size L', sku_barcode: '1234567890123', unit_name: 'Piece', quantity: 100, is_certified: 1, category_name:'Electronics', second_category:'Mobile Phones', third_category:'Smartphones' }, main_sql:"INSERT INTO lehua.history_stock (`end_date`,`warehouse_id`,`warehouse_name`,`goods_id`,`goods_code`,`goods_name`,`sku_id`,`sku_name`,`sku_barcode`,`unit_name`,`quantity`,`is_certified`,`category_name`,`second_category`,`third_category`) VALUES (<2023-09-30>,'WH001','Main Warehouse','G001','GC001','Product A','SKU001','Size L','1234567890123','Piece',100,1,'Electronics','Mobile Phones','Smartphones');" } } ``` 以上内容展示了如何通过ETL过程将源平台的数据转换并写入目标平台MySQL数据库。通过详细配置API接口参数和构建SQL语句,实现了不同系统间的数据无缝对接。 ![轻易云数据集成平台金蝶集成接口配置](https://pic.qeasy.cloud/T5.png~tplv-syqr462i7n-qeasy.image)