ETL流程中的数据转换与MySQL写入技巧

  • 轻易云集成顾问-张妍琪

快麦数据集成到MySQL技术案例分享:查询商品出入库记录至BI刊安

在复杂的企业业务场景中,灵活高效的数据集成是关键。本文将重点分享如何通过轻易云数据集成平台,将快麦系统中的商品出入库记录(API接口:erp.item.stock.in.out.list)可靠准确地集成到MySQL数据库(写入API:batchexecute),进而更新BI刊安-商品出入库记录表,实现实时监控和分析。

数据源和目标系统简介

我们的主要任务是从快麦系统获取商品出入库记录,并批量写入到MySQL,用于后续的数据分析及报表生成。以下是具体的实施步骤:

  1. 数据抓取与API调用: 利用快麦提供的API接口 erp.item.stock.in.out.list, 定时抓取最新的商品出入库数据。在这个过程中,需要处理分页和限流问题,以确保全部数据被完整提取。

  2. 自定义数据转换逻辑: 针对获取到的数据进行必要的格式转换,使其符合MySQL接受的数据结构标准。这一步尤为重要,因为不同系统间可能存在字段类型或命名上的差异。

  3. 批量写入与性能优化: 使用MySQL 的 batchexecute 接口实现高效的大量数据插入操作,避免了单条插入带来的性能瓶颈。同时,为了防止因网络或者服务故障导致部分数据丢失,我们设计了一套异常处理与错误重试机制。

  4. 实时监控与告警设置: 配置集中式的监控和告警功能,以便实时跟踪各个节点的数据流动和状态。一旦出现任何异常情况,如接口响应不及时、写入失败等问题,可以第一时间得到提醒并快速处理。

  5. 确保不漏单策略实施:通过对比历史记录以及增量爬取的方法,确保每次都能够完整捕获所有变更过的信息。另外,在接口调用时采用幂等性检查,防止重复插数引发的不一致性问题。

这套解决方案不仅满足了我们业务上对于海量交易信息及时准确获取并存储需求,还通过一系列优化提升了总体性能及稳定性。在接下来的文章部分,我会详细介绍具体配置步骤、脚本编写及实际操作中的注意事项。 打通钉钉数据接口

调用源系统快麦接口erp.item.stock.in.out.list获取并加工数据

在数据集成的生命周期中,调用源系统接口是至关重要的一步。本文将详细探讨如何通过轻易云数据集成平台调用快麦接口erp.item.stock.in.out.list,并对获取的数据进行加工处理。

接口调用配置

首先,我们需要配置元数据以便正确调用快麦的API接口。以下是具体的元数据配置:

{
  "api": "erp.item.stock.in.out.list",
  "effect": "QUERY",
  "method": "POST",
  "number": "id",
  "id": "id",
  "name": "tid",
  "request": [
    {"field": "pageNo", "label": "页码", "type": "string", "value": "1"},
    {"field": "pageSize", "label": "每页多少条", "type": "string", "value": "200"},
    {"field": "operateTimeBegin", "label": "操作开始时间", "type": "string", 
        "value":"{{LAST_SYNC_TIME|datetime}}"},
    {"field": "operateTimeEnd", "label": "操作结束时间", 
        "type":"string","value":"{{CURRENT_TIME|datetime}}"}
  ],
  "autoFillResponse": true,
  "delay": 5
}

请求参数详解

  1. pageNo: 页码,表示当前请求的页数。默认值为1。
  2. pageSize: 每页多少条记录,默认值为200。
  3. operateTimeBegin: 操作开始时间,使用占位符{{LAST_SYNC_TIME|datetime}}动态填充上次同步时间。
  4. operateTimeEnd: 操作结束时间,使用占位符{{CURRENT_TIME|datetime}}动态填充当前时间。

这些参数确保我们能够分页获取在指定时间范围内的商品出入库记录。

数据请求与清洗

在配置好元数据后,通过轻易云平台发起POST请求来获取数据。由于平台支持全异步操作,我们可以在后台监控请求状态和数据流动情况。

import requests
import json
from datetime import datetime

# 设置请求URL和头信息
url = 'https://api.kuaimai.com/erp.item.stock.in.out.list'
headers = {'Content-Type': 'application/json'}

# 动态生成请求体
payload = {
    'pageNo': '1',
    'pageSize': '200',
    'operateTimeBegin': (datetime.now() - timedelta(days=1)).strftime('%Y-%m-%d %H:%M:%S'),
    'operateTimeEnd': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
}

# 发起POST请求
response = requests.post(url, headers=headers, data=json.dumps(payload))

# 检查响应状态并处理数据
if response.status_code == 200:
    data = response.json()
    # 数据清洗和转换逻辑
else:
    print(f"Error: {response.status_code}")

数据转换与写入

获取到原始数据后,需要对其进行清洗和转换,以便写入目标系统。在这个过程中,可以利用轻易云平台提供的可视化工具进行字段映射、格式转换等操作。例如,将日期格式统一转换为ISO标准,将数值字段进行单位换算等。

def clean_and_transform(data):
    transformed_data = []
    for record in data['items']:
        transformed_record = {
            'id': record['id'],
            'tid': record['tid'],
            'stock_in_out_time': datetime.strptime(record['operate_time'], '%Y-%m-%d %H:%M:%S').isoformat(),
            # 更多字段转换逻辑...
        }
        transformed_data.append(transformed_record)
    return transformed_data

cleaned_data = clean_and_transform(data)

最终,将清洗和转换后的数据写入目标系统,例如BI刊安中的商品出入库记录表。可以通过轻易云平台的自动化任务调度功能,实现定时同步和更新。

def write_to_target_system(cleaned_data):
    # 假设目标系统提供了一个API接口用于接收数据
    target_url = 'https://bi.example.com/api/stock_records'

    response = requests.post(target_url, headers=headers, data=json.dumps(cleaned_data))

    if response.status_code == 200:
        print("Data successfully written to target system.")
    else:
        print(f"Error writing data: {response.status_code}")

write_to_target_system(cleaned_data)

通过上述步骤,我们实现了从快麦接口获取商品出入库记录,并经过清洗和转换后写入目标系统的完整流程。这不仅提升了数据处理效率,也确保了业务流程的透明度和准确性。 钉钉与CRM系统接口开发配置

轻易云数据集成平台生命周期的第二步:ETL转换与写入MySQL

在数据集成过程中,ETL(Extract, Transform, Load)转换是一个至关重要的步骤。本文将深入探讨如何使用轻易云数据集成平台将源平台的数据转换为目标平台MySQL API接口能够接收的格式,并最终写入目标平台。

数据请求与清洗

首先,我们需要从源平台获取原始数据,并进行必要的清洗和预处理。这一步骤确保了数据的一致性和完整性,为后续的转换和写入奠定基础。假设我们已经完成了这一步骤,现在进入数据转换与写入阶段。

数据转换与写入

在轻易云数据集成平台中,我们可以通过配置元数据来实现复杂的数据转换操作。以下是一个具体的元数据配置示例,用于将快麦平台的商品出入库记录转化为BI刊安系统所需的格式,并写入MySQL数据库。

{
  "api": "batchexecute",
  "effect": "EXECUTE",
  "method": "SQL",
  "idCheck": true,
  "request": [
    {"field": "orderType", "label": "单据类型", "type": "string", "value": "{orderType}"},
    {"field": "orderNumber", "label": "单据编号", "type": "string", "value": "{orderNumber}"},
    {"field": "customType", "label": "自定义出入库类型", "type": "string", "value": "{customType}"},
    {"field": "outerId", "label": "平台商家编码", "type": "string", "value": "{outerId}"},
    {"field": "title", "label": "商品标题", "type": "string", "value": "{title}"},
    {"field": "propertiesAlias", "label": "规格属性别名", "type": "string", "value": "{propertiesAlias}"},
    {"field": "propertiesName", ",label":"规格真正的属性","type":"string","value":"{propertiesName}"},
    {"field":"barcode","label":"条形码","type":"string","value":"{barcode}"},
    {"field":"boxCode","label":"箱码","type":"string","value":"{boxCode}"},
    {"field":"brand","label":"品牌名称","type":"string","value":"{brand}"},
    {"field":"remark","label":"备注","type":"string","value":"{remark}"},
    {"field":"sysItemId","label":"系统主商品ID","type":"string","value":"{sysItemId}"},
    {"field":"sysSkuId","label":"系统商品规格ID","type":"string","value":"{sysSkuId}"},
    {"field":"warehouseId","label":"仓库ID","type":"string","value":"{warehouseId}"},
    {"field":"warehouseName","label":"仓库名称","type":"string","value":"{warehouseName}"},
    {"field":"code","label":"仓库编码","type":"string","value":"{code}"},
    {"field":"goodsAllocation","label":"货位编码","type":"string","value":" {goodsAllocation }"},
    {" field ":" beforeStockNum "," label ":" 操作前总库存 "," type ":" string "," value ":" {beforeStockNum }"},
    {" field ":" afterStockNum "," label ":" 操作后总库存 "," type ":" string "," value ":" {afterStockNum }"},
    {" field ":" stockChange "," label ":" 数量变化 "," type ":" string "," value ":" {stockChange }"},
    {" field ":" content "," label ":" 操作详情 "," type ":" string "," value ":" {content }"},
    {" field ":" operateTime "," label ":" 操作时间 "," type ":" string "," value ":" {operateTime }"},
    {" field ": id," label ": record id," type ": string," value ": {id }},
    {" field ": inOutWarehouseType," label ": in-out warehouse type," type ": string," value ": {inOutWarehouseType }},
    {" field ": type," label ": document type," type ": string," value ": {type }},
    {" field ": itemType," label ": 1 good product operation, 0 defective product operation, "" type ": string," value: "{itemType}" },
    {" field ": singleItem," label: "" whether to query a single product, "" type: "" string, "" value: "" {singleItem}" },
    {" field: stockChangeMultiple," label: "" location corresponding quantity, "" type: "" string, "" value: "" {stockChangeMultiple}" },
    {" field: batchNo," label: "" batch number, "" type: "" string, "" value: "" {batchNo}" },
    {" field: productTime," label: production date," type:" string," value:" {productTime}" }
  ],
  otherRequest:[
   {
      “字段”: “main_sql”,
      “标签”: “主语句”,
      “类型”: “字符串”,
      “描述”: “111”,
      “值”: “替换到stock_in_out_list(orderType、orderNumber、customType、outerId、title、propertiesAlias、propertiesName、barcode、boxCode、brand、remark、sysItemId、sysSkuId、warehouseId、warehouseName、code、goodsAllocation、beforeStockNum) ,afterStockNum,stockChange,content,operateTime,id,inOutWarehouseType,type,itemType,singleItem,stockChangeMultiple,batchNo,productTime) VALUES"
   },
   {
      “字段”:“限制”,
      “标签”:“限制”,
      “类型”:“字符串”,
      “描述”:“111”,
      “值”:“1000”
   }
 ]
}

配置解析

  1. API调用方式batchexecute表示批量执行SQL语句。
  2. 方法SQL表示使用SQL语句进行操作。
  3. 请求字段:每个字段都包含field(字段名)、label(标签)、type(类型)和value(值)。这些字段对应于源数据中的各个属性。
  4. 主语句:通过配置main_sql字段,我们定义了要执行的主要SQL语句。这里使用了REPLACE INTO语法,以确保如果记录已存在则更新,否则插入新记录。
  5. 限制:通过配置limit字段,我们可以控制每次操作的数据量,这里设置为1000条记录。

实际操作步骤

  1. 提取源数据:从快麦平台提取商品出入库记录。
  2. 清洗和预处理:对提取的数据进行必要的清洗和预处理,如去除重复项和修正格式错误。
  3. 映射字段:根据元数据配置,将源数据中的字段映射到目标数据库中的相应字段。
  4. 生成SQL语句:根据映射后的数据生成REPLACE INTO SQL语句。
  5. 执行SQL语句:通过API调用,将生成的SQL语句发送到MySQL数据库执行。

技术细节

  • 异步处理:轻易云数据集成平台支持全异步处理,这意味着在执行上述操作时,可以同时处理多个任务,提高效率。
  • 实时监控:通过实时监控功能,可以随时查看数据流动和处理状态,确保每一步都准确无误。
  • 错误处理机制:在执行过程中,如果出现任何错误,可以通过日志和监控界面快速定位并解决问题。

通过以上步骤,我们成功地将快麦平台的商品出入库记录转换为BI刊安系统所需的格式,并写入MySQL数据库。这一过程不仅提高了数据处理效率,还确保了数据的一致性和完整性。 用友与SCM系统接口开发配置

更多系统对接方案