利用轻易云平台进行ETL转换和MySQL数据集成

  • 轻易云集成顾问-曾平安

吉客云·奇门数据集成到MySQL技术案例分享

在实际业务场景中,如何高效、可靠地将吉客云·奇门系统的数据集成到企业内部的MySQL数据库,是一个极具挑战性的任务。本篇文章将以“xsck-1吉客云查询销售出库单-->mysql”的实际方案为例,详细解析这个过程中的关键技术点和实现步骤。

一、API接口调用与数据抓取

为了获取吉客云·奇门的销售出库单数据,我们首先使用其提供的API接口 jackyun.tradenotsensitiveinfos.list.get。该接口支持分页查询,这是确保大批量数据准确、无漏失地被逐步提取的重要机制。在调用过程中,必须处理好API限流问题,以避免请求过多导致服务不可用。

# 示例代码段:调用吉客云·奇门API进行数据抓取
import requests

url = "https://api.jackyun.com/tradenotsensitiveinfos.list.get"
params = {
    "page_no": 1,
    "page_size": 100,
    # 其他必要的参数
}
response = requests.get(url, params=params)
data = response.json()
# 对返回的数据进行处理和存储操作

二、自定义逻辑与格式转换

由于源系统(吉客云·奇门)与目标系统(MySQL)的数据结构存在差异,需要在集成过程中自定义数据转换逻辑。这不仅包括字段映射,还涉及对部分复杂嵌套结构的拆解和平展。由此保证写入到MySQL中的数据能被顺利应用于后续分析或业务运作。

def transform_data(raw_data):
    transformed_data = []
    for item in raw_data:
        transformed_record = {
            'order_id': item['id'],
            'product_sku': item['sku'],
            # 按需添加更多字段映射及转换逻辑
        }
        transformed_data.append(transformed_record)
    return transformed_data

transformed_records = transform_data(data["orders"])

三、大容量数据写入能力及监控机制

轻易平台支持高吞吐量的数据写入能力,使得大量捕获自吉客云·奇门的信息能够迅速吸收到MySQL中,这样显著提升了时效性。同时,通过集中化监控和告警系统,我们可以实时跟踪每个集成任务的状态和性能。如出现异常,可根据日志记录快速定位问题并进行重试或修复操作。

-- 示例代码段:通过批量插入写入 MySQL 数据库,提高效率 
INSERT INTO sales_orders (order_id, product_sku, ...) VALUES (...), (...), ...;

通过这些 金蝶与SCM系统接口开发配置

调用吉客云·奇门接口获取并加工数据

在数据集成生命周期的第一步,我们需要从源系统吉客云·奇门接口jackyun.tradenotsensitiveinfos.list.get获取销售出库单数据,并进行初步的数据清洗和加工。以下是详细的技术实现过程。

1. 接口调用配置

首先,我们需要配置API调用的元数据。根据提供的元数据配置,接口调用采用POST方法,主要参数如下:

  • api: jackyun.tradenotsensitiveinfos.list.get
  • method: POST
  • pagination: 分页参数,默认每页20条记录
  • request: 请求参数,包括时间范围、销售单号、页码等
  • omissionRemedy: 数据补偿机制,定时任务配置

请求参数示例如下:

{
  "modified_begin": "2023-10-01T00:00:00",
  "modified_end": "2023-10-07T23:59:59",
  "pageSize": 20,
  "pageIndex": 0,
  "hasTotal": 1,
  "startConsignTime": "{{LAST_SYNC_TIME|datetime}}",
  "endConsignTime": "{{CURRENT_TIME|datetime}}",
  "fields": "tradeNo,consignTime,totalFee,tradeCount,discountFee,payment,receivedTotal,warehouseCode,warehouseName,goodsDetail.sellCount,goodsDetail.shareFavourableAfterFee"
}

2. 数据清洗与转换

在获取到原始数据后,需要对其进行清洗和转换。根据元数据配置,我们需要对返回的数据进行字段重命名和格式化处理。

字段重命名与格式化

根据formatResponse配置,需要将以下字段进行转换:

  • consignTime -> datetime_new(日期格式)
  • tradeNo -> order_no_new(字符串格式)

示例代码如下:

def format_response(data):
    formatted_data = []
    for record in data:
        formatted_record = {
            'datetime_new': record['consignTime'],
            'order_no_new': record['tradeNo'],
            # 保留其他字段
            'totalFee': record['totalFee'],
            'tradeCount': record['tradeCount'],
            'discountFee': record['discountFee'],
            'payment': record['payment'],
            'receivedTotal': record['receivedTotal'],
            'warehouseCode': record['warehouseCode'],
            'warehouseName': record['warehouseName'],
            'goodsDetail_sellCount': record['goodsDetail']['sellCount'],
            'goodsDetail_shareFavourableAfterFee': record['goodsDetail']['shareFavourableAfterFee']
        }
        formatted_data.append(formatted_record)
    return formatted_data
数据完整性检查

根据元数据中的idCheck配置,需要确保每条记录包含唯一标识符(如tradeId),并且不为空。

def check_data_integrity(data):
    for record in data:
        if not record.get('tradeId'):
            raise ValueError("Data integrity check failed: tradeId is missing.")

3. 数据补偿机制

为了确保数据完整性和一致性,我们需要设置定时任务来处理遗漏的数据。根据omissionRemedy配置,定时任务的crontab表达式为“30 6,18,1 *”,即每天的6点、18点和次日凌晨1点执行。

from crontab import CronTab

cron = CronTab(user='username')
job = cron.new(command='python /path/to/your_script.py')
job.setall('30 6,18,1 * * *')
cron.write()

在定时任务中,我们可以重新发起请求,指定时间范围为过去两天到当前时间,以确保遗漏的数据被补偿。

{
  "startConsignTime": "{{DAYS_AGO_2|datetime}}",
  "endConsignTime": "{{CURRENT_TIME|datetime}}"
}

总结

通过以上步骤,我们完成了从吉客云·奇门接口获取销售出库单数据的全过程,包括API调用、数据清洗与转换以及数据补偿机制的实现。这一过程确保了数据的准确性和完整性,为后续的数据处理和分析奠定了坚实基础。 数据集成平台可视化配置API接口

使用轻易云数据集成平台进行ETL转换并写入MySQLAPI接口

在数据集成生命周期的第二步中,我们将已经集成的源平台数据进行ETL(Extract, Transform, Load)转换,并最终写入目标平台MySQLAPI接口。本文将详细探讨如何通过轻易云数据集成平台配置元数据,实现这一过程。

数据请求与清洗

首先,确保从源平台(如吉客云)获取的数据已经经过初步清洗和准备。假设我们从吉客云查询销售出库单的数据已经准备好,接下来需要将这些数据转换为目标平台MySQLAPI接口能够接收的格式。

数据转换与写入

使用轻易云数据集成平台,我们可以通过配置元数据来实现数据的ETL过程。以下是具体的元数据配置示例:

{
  "api": "execute",
  "effect": "EXECUTE",
  "method": "POST",
  "idCheck": true,
  "request": [
    {
      "field": "main_params",
      "label": "main_params",
      "type": "object",
      "describe": "111",
      "children": [
        {"field": "subTradeId", "label": "明细id", "type": "string", "value": "{goodsDetail_subTradeId}"},
        {"field": "order_no_new", "label": "单号", "type": "string", "value": "{order_no_new}"},
        {"field": "datetime_new", "label": "时间", "type": "date", "value": "{datetime_new}"},
        {"field": "qty_count", "label": "数量", "type": "string", "value": "{goodsDetail_sellCount}"},
        {"field": "sales_count", 
            "label":"金额",
            "type":"string",
            "describe":"goodsDetail_sellTotal",
            "value":"{{goodsDetail_shareFavourableAfterFee}}"
        },
        {"field":"line_count","label":"行数","type":"string","value":"{line_count}"},
        {"field":"status","label":"状态","type":"string","value":"{qeasystatus}"},
        {"field":"Document_Type","label":"单据类型","type":"string","value":"销售出库"}
      ]
    }
  ],
  'otherRequest':[
    {
      'field':'main_sql',
      'label':'main_sql',
      'type':'string',
      'describe':'111',
      'value':'REPLACE INTO `jky_xsck`(`subTradeId`,`order_no_new`,`datetime_new`,`qty_count`,`sales_count`,`line_count`,`status`,`Document_Type`) VALUES (:subTradeId,:order_no_new,:datetime_new,:qty_count,:sales_count,:line_count,:status,:Document_Type)'
    }
  ]
}

配置解析

  1. API调用:配置中的api字段指定了要调用的API接口,这里为execute,表示执行操作。
  2. HTTP方法method字段指定了HTTP请求方法,这里使用POST方法。
  3. ID检查idCheck字段为true,表示在执行前需要检查ID是否存在。
  4. 请求参数
    • main_params: 包含多个子字段,每个子字段对应源数据中的一个属性,并映射到目标数据库表中的相应字段。
    • subTradeId, order_no_new, datetime_new, qty_count, sales_count, line_count, status, Document_Type: 分别对应销售出库单的明细ID、单号、时间、数量、金额、行数、状态和单据类型。
  5. SQL语句otherRequest中包含一条SQL语句,用于将转换后的数据写入MySQL数据库。这里使用了REPLACE INTO语句,以确保如果记录已存在则更新,否则插入新记录。

实现步骤

  1. 提取(Extract):从吉客云查询销售出库单的数据。
  2. 转换(Transform):根据元数据配置,将提取的数据映射到目标格式。例如,将吉客云中的商品明细ID映射到MySQL表中的subTradeId字段。
  3. 加载(Load):执行配置中的SQL语句,将转换后的数据写入MySQL数据库。

通过上述配置和步骤,我们可以高效地完成从吉客云到MySQL的ETL过程,实现不同系统间的数据无缝对接。这不仅提升了业务透明度和效率,还保证了数据处理过程的全生命周期管理。 企业微信与OA系统接口开发配置