运用轻易云集成平台进行售后数据的ETL处理

  • 轻易云集成顾问-谢楷斌

案例分享:聚水潭-售后单-->BI斯莱蒙-售后表

在这个技术案例中,我们将探讨如何通过轻易云数据集成平台,将聚水潭·奇门的售后数据高效、准确地集成到MySQL数据库中。关键任务是确保高质量的数据处理、一致性、可靠的数据流监控及异常处理。

聚水潭·奇门 API 接口调用与数据抓取

首先,通过接口 jushuitan.refund.list.query 获取聚水潭·奇门的售后订单数据。为应对高并发和大量分页请求,我们实现了合理的限流机制,确保在批量抓取时不会遗漏任何一笔订单。这一步骤至关重要,因为它直接影响到最终的数据完整性。

def fetch_refund_data(page, page_size):
    endpoint = "https://api.jushuitan.com/refund/list/query"
    payload = {
        'page': page,
        'pageSize': page_size,
        'token': '<your_api_token>'
    }
    response = requests.post(endpoint, json=payload)

    if response.status_code == 200:
        return response.json()
    else:
        handle_error(response.status_code, response.text)

data_list = []
for i in range(total_pages):
    data_page = fetch_refund_data(i + 1, 100)
    data_list.extend(data_page['refund_orders'])

数据转换与映射

聚水潭·奇门接口返回的数据格式通常无法直接适配MySQL,这就需要我们自定义转换逻辑。在此过程中,使用轻易云提供的可视化数据流设计工具,可以直观地进行字段映射和转换规则配置。

def transform_data(refund_order):
    return {
        'id': refund_order.get('refund_id'),
        'status': refund_order.get('status'),
        'amount': refund_order.get('refund_amount'),
        # 更多字段映射...
    }

transformed_data_list = [transform_data(order) for order in data_list]

数据批量写入到MySQL

利用轻易云支持的高吞吐量写入能力,将已转换好的数据快速批量写入 MySQL 数据库。在这一环节,我们不仅关注速度,更注重可靠性,通过事务管理及错误重试机制,保证每一次操作都能成功执行且不丢失任何记录。

INSERT INTO sales_returns (id, status, amount)
VALUES (%s, %s, %s);

# 批处理示例
cursor.executemany(insert_sql_command, transformed_data_tuples)
connection.commit()

上述过程中,还会借助集中监控系统实时跟踪任务状态,并根据告警信息及时调整策略。此外,对于可能出现的网络 企业微信与OA系统接口开发配置

调用聚水潭·奇门接口jushuitan.refund.list.query获取并加工数据

在数据集成生命周期的第一步,我们需要从源系统聚水潭·奇门接口jushuitan.refund.list.query中获取售后单数据,并对其进行初步加工。以下是具体的技术实现过程。

接口调用配置

首先,我们需要配置接口调用的元数据。根据提供的元数据配置,接口调用的基本信息如下:

  • API: jushuitan.refund.list.query
  • 请求方法: POST
  • 主要字段:
    • page_index: 页码,类型为int
    • page_size: 页数,类型为int
    • start_time: 修改起始时间,类型为datetime
    • end_time: 修改结束时间,类型为datetime
    • so_ids: 线上单号列表,类型为string
    • date_type: 时间类型,类型为string
    • status: 售后单状态,类型为string
    • good_status: 货物状态,类型为string
    • type: 售后类型,类型为string

请求参数设置

在实际调用过程中,我们需要动态设置请求参数。例如:

{
    "page_index": 1,
    "page_size": 100,
    "start_time": "{{LAST_SYNC_TIME|datetime}}",
    "end_time": "{{CURRENT_TIME|datetime}}",
    "so_ids": "",
    "date_type": "",
    "status": "",
    "good_status": "",
    "type": ""
}

其中,start_timeend_time可以通过模板变量动态替换,以确保每次同步时获取最新的数据。

数据请求与清洗

在完成接口调用后,我们将得到一个包含售后单信息的JSON响应。接下来,需要对这些数据进行清洗和初步加工。假设返回的数据结构如下:

{
    "total_count": 200,
    "items": [
        {
            "as_id": "12345",
            "status": "待处理",
            "good_status": "BUYER_NOT_RECEIVED",
            ...
        },
        ...
    ]
}

我们需要提取并清洗其中的关键字段,例如:

  • 售后单ID (as_id)
  • 售后单状态 (status)
  • 货物状态 (good_status)
  • ...

可以使用以下代码进行数据清洗:

import json

def clean_data(response):
    data = json.loads(response)
    cleaned_items = []

    for item in data['items']:
        cleaned_item = {
            'as_id': item['as_id'],
            'status': item['status'],
            'good_status': item['good_status'],
            # 添加其他需要的字段
        }
        cleaned_items.append(cleaned_item)

    return cleaned_items

数据转换与写入

在完成数据清洗后,需要将其转换为目标系统所需的格式,并写入目标数据库或系统。在此过程中,可以利用轻易云平台提供的数据转换工具,将清洗后的数据映射到目标表结构中。

例如,将清洗后的数据写入BI斯莱蒙售后表:

def write_to_target_system(cleaned_data):
    # 假设使用某个数据库连接库进行写入操作
    db_connection = get_db_connection()

    for item in cleaned_data:
        db_connection.execute(
            """
            INSERT INTO bi_slaimon_refund_table (as_id, status, good_status)
            VALUES (%s, %s, %s)
            """, 
            (item['as_id'], item['status'], item['good_status'])
        )

通过上述步骤,我们实现了从聚水潭·奇门接口获取售后单数据,并将其清洗、转换和写入目标系统。这是轻易云数据集成平台生命周期管理中的关键一步,为后续的数据处理和分析奠定了基础。 钉钉与CRM系统接口开发配置

将聚水潭售后单数据转换并写入BI斯莱蒙售后表

在轻易云数据集成平台中,将聚水潭的售后单数据转换为BI斯莱蒙售后表所需的格式,并最终通过MySQL API接口写入目标平台,是一个典型的数据ETL(Extract, Transform, Load)过程。本文将详细介绍如何利用元数据配置完成这一任务。

数据提取与清洗

首先,我们需要从聚水潭系统中提取售后单数据。假设我们已经完成了数据提取与初步清洗,接下来重点关注如何将这些数据转换为BI斯莱蒙系统所能接受的格式,并通过MySQL API接口写入。

数据转换

根据提供的元数据配置,我们需要将聚水潭的售后单数据映射到BI斯莱蒙售后表对应字段。以下是关键字段的映射关系:

  • id: 由as_iditems_asi_id组合生成,确保唯一性。
  • as_id: 售后单号。
  • as_date: 申请时间。
  • outer_as_id: 外部售后单号。
  • so_id: 原始线上单号。
  • type: 售后类型,如普通退货、拒收退货等。
  • modified: 最后更新时间。
  • status: 状态,如待确认、已确认、已取消等。
  • 其他字段依次映射...

元数据配置中的每个字段都有明确的标签和类型定义,这使得我们在进行数据转换时能够精确地匹配源数据和目标字段。例如:

{
    "field": "id",
    "label": "主键",
    "type": "string",
    "value": "{as_id}-{items_asi_id}"
}

上述配置表示目标表中的id字段由源表中的as_iditems_asi_id组合而成,类型为字符串。

SQL语句生成

为了将转换后的数据写入MySQL数据库,我们需要构建相应的SQL语句。根据元数据配置中的main_sql字段,可以生成如下SQL模板:

REPLACE INTO refund_list_query(
    id, as_id, as_date, outer_as_id, so_id, type, modified, status, remark,
    question_type, warehouse, refund, payment, good_status, shop_buyer_id,
    shop_id, logistics_company, l_id, o_id, order_status, drp_co_id_to,
    wh_id, drp_co_id_from, node, wms_co_id, shop_status, freight,
    labels, refund_version, sns_sku_id, sns_sn, order_type,
    confirm_date, items_outer_oi_id, items_receive_date,
    items_i_id, items_combine_sku_id, items_asi_id,
    items_sku_id, items_qty, items_price,
    items_amount, items_name,
    items_type,
    items_properties_value,
    items_r_qty,
    items_sku_type,
    items_shop_sku_id,
    items_defective_qty,
    items_shop_amount,
    items_remark,
    created,
    ts,
    shop_name,
    order_label,
    free_amount,
    creator_name,
    buyer_receive_refund,
    buyer_apply_refund
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)

每个问号代表一个占位符,对应于上面列出的字段。在实际操作中,这些占位符将被具体的数据值替换。

数据写入

在完成SQL语句生成之后,下一步是通过MySQL API接口执行这些SQL语句,将转换后的数据写入BI斯莱蒙售后表。这里使用的是批量执行API batchexecute,其效果是执行一组SQL语句。

{
  "api": "batchexecute",
  "effect": "EXECUTE",
  "method": "SQL",
  "number": "id",
  "idCheck": true
}

上述配置表示使用批量执行模式,通过检查主键id来确保唯一性,并执行相应的SQL语句。

示例代码

以下是一个简化的示例代码片段,用于展示如何利用上述配置进行ETL操作:

import pymysql

# 假设已经获取并清洗了源数据
source_data = [
   # 示例数据...
]

# MySQL数据库连接配置
db_config = {
   'host': 'your_mysql_host',
   'user': 'your_mysql_user',
   'password': 'your_mysql_password',
   'database': 'your_database'
}

connection = pymysql.connect(**db_config)
cursor = connection.cursor()

# 构建并执行批量插入SQL语句
sql_template = """
REPLACE INTO refund_list_query(id,...other_fields...) VALUES (%s,...other_placeholders...)
"""

for record in source_data:
   cursor.execute(sql_template.format(
       record['id'], ...other_values...
   ))

connection.commit()
cursor.close()
connection.close()

以上代码展示了如何利用Python脚本和pymysql库,将清洗后的源数据通过批量插入方式写入目标MySQL数据库。

通过这种方式,我们可以高效地实现聚水潭售后单到BI斯莱蒙售后表的数据集成,确保每个环节的数据准确性和一致性。 如何对接企业微信API接口