实现自动化ETL:从聚水潭到BI智选的售后单数据同步

  • 轻易云集成顾问-彭亮

聚水潭-售后单-->BI智选-售后表:轻易云数据集成的技术实现

在现代企业的数据管理中,确保高效、准确地对接各系统之间的数据是至关重要的任务。本文将聚焦于一个具体案例:如何通过轻易云数据集成平台,将聚水潭·奇门的售后单数据高效地集成到MySQL数据库中的BI智选售后表。本方案得名为“聚水潭-售后单-->BI智选-售后表”,具体内容如下。

首先,我们需要调用聚水潭·奇门提供的API接口jushuitan.refund.list.query来获取待处理的售后单数据。该接口支持批量返回满足条件的退款列表,并可根据业务需求设定查询参数,如时间范围、状态等,以便精确抓取所需数据。

为了确保不漏掉任何一条记录,需要特别注意分页和限流问题。在对接过程中,通过分段抓取和合理设置请求间隔,可以有效避免因超出接口调用限制而导致的数据丢失。此外,为了减少重复读取带来的性能损耗,同时保证每次抓取都是最新数据,我们会利用增量更新机制,即仅提取自上次成功写入后的新增或变更记录。

然后,对从聚水潭·奇门获取到的数据进行必要的数据转换。由于两套系统可能存在字段格式与结构上的差异,通过自定义转换逻辑,可以将源端数据调整为目标端可以接受并正确解析的格式,从而保证最终存储在MySQL数据库中的信息完整且无误。

完成初步清洗与转化之后,采用高吞吐量写入能力,将大批量处理好的曲面镇.别错个股好以合适 t)bzB*DDAF配继续有**版本)。7***** SYSTEM BY** 至此,这样*shift3(4点+140)+WER)ZRt-mysql_BI().最终保障了海量AP@验)(mysql_39允汇voL'])==k{0这是一个极其复杂.txt(-data))©BATCHEXECUTE()}指

![如何开发钉钉API接口](https://pic.qeasy.cloud/D38.png~tplv-syqr462i7n-qeasy.image)
### 调用聚水潭·奇门接口获取并加工数据

在数据集成的生命周期中,调用源系统接口是至关重要的一步。本文将详细探讨如何通过轻易云数据集成平台调用聚水潭·奇门接口`jushuitan.refund.list.query`,并对获取的数据进行初步加工处理。

#### 接口调用配置

首先,我们需要配置元数据以便正确调用`jushuitan.refund.list.query`接口。以下是元数据配置的详细说明:

```json
{
  "api": "jushuitan.refund.list.query",
  "effect": "QUERY",
  "method": "POST",
  "number": "as_id",
  "id": "as_id",
  "name": "as_id",
  "request": [
    {"field": "page_index", "label": "页码", "type": "int", "describe": "页码", "value": "1"},
    {"field": "page_size", "label": "页数", "type": "int", "describe": "页数", "value": "50"},
    {"field": "start_time", "label": "修改起始时间", "type": "datetime", "describe": "开始时间", 
        "value":"{{LAST_SYNC_TIME|datetime}}"},
    {"field": "end_time",   "label":"修改结束时间","type":"datetime","describe":"结束时间","value":"{{CURRENT_TIME|datetime}}"},
    {"field":"so_ids","label":"线上单号列表","type":"string","describe":"线上单号列表"},
    {"field":"date_type","label":"时间类型","type":"string","describe":"时间类型"},
    {"field":"status","label":"售后单状态","type":"string","describe":"售后单状态"},
    {"field":"good_status","label":"货物状态","type":"string","describe":
        "BUYER_NOT_RECEIVED:买家未收到货,BUYER_RECEIVED:买家已收到货,BUYER_RETURNED_GOODS:买家已退货,SELLER_RECEIVED:卖家已收到退货"},
    {"field":"type","label":"售后类型","type":"string","describe":
        "普通退货,其它,拒收退货,仅退款,投诉,补发,换货,维修"}
  ],
  ...
}

请求参数详解

  • page_indexpage_size:用于分页请求,默认值分别为1和50。
  • start_timeend_time:用于指定查询的时间范围,这两个参数使用动态变量分别代表上次同步时间和当前时间。
  • so_idsdate_typestatusgood_statustype:这些字段用于进一步过滤查询结果,例如根据售后单状态、货物状态等进行筛选。

数据请求与清洗

在完成元数据配置后,我们通过轻易云平台发起POST请求,从聚水潭·奇门系统获取售后单列表。以下是一个示例请求:

{
  "page_index": 1,
  "page_size": 50,
  ...
}

请求返回的数据通常包含多个字段,为了便于后续处理,我们需要对这些数据进行清洗和转换。例如,可以通过以下步骤进行初步清洗:

  1. 过滤无效数据:移除空值或不符合业务逻辑的数据。
  2. 字段映射:将原始字段名映射为目标系统所需的字段名。
  3. 格式转换:例如,将日期字符串转换为标准日期格式。

数据转换与写入

经过清洗后的数据需要进一步转换,以适应目标系统(如BI智选)的需求。可以使用轻易云平台提供的可视化工具进行字段映射和格式转换。例如,将原始JSON结构中的某些嵌套字段提取出来,并重组为新的结构。

最后,将处理好的数据写入目标系统。在此过程中,可以设置定时任务(如每天凌晨1点执行),确保数据同步的及时性和准确性。

{
  ...
  // 定时任务配置
  {
    crontab: '2 1 * * *',
    takeOverRequest: [
      {
        field: 'start_time',
        value: '{{DAYS_AGO_1|datetime}}',
        type: 'datetime',
        label: '接管字段',
        formModel: { enable: false },
        tableModel: { enable: false },
        physicalModel: { enable: false }
      }
    ]
  }
}

通过上述步骤,我们实现了从聚水潭·奇门系统到BI智选系统的数据集成。这不仅提高了数据处理的效率,还确保了数据的一致性和准确性。 钉钉与ERP系统接口开发配置

数据集成生命周期中的ETL转换与写入

在数据集成的生命周期中,将源平台的数据转换为目标平台所能接收的格式并写入,是一个关键步骤。本文将深入探讨如何使用轻易云数据集成平台,将聚水潭售后单数据转换并写入到BI智选的MySQL数据库中。

元数据配置解析

首先,我们需要理解元数据配置,这些配置定义了如何将源数据字段映射到目标数据库字段。以下是元数据配置的详细解析:

{
  "api": "batchexecute",
  "effect": "EXECUTE",
  "method": "SQL",
  "number": "id",
  "id": "id",
  "name": "id",
  "idCheck": true,
  "request": [
    {"field":"id","label":"主键","type":"string","value":"{as_id}-{items_asi_id}"},
    {"field":"as_id","label":"售后单号","type":"string","value":"{as_id}"},
    {"field":"as_date","label":"申请时间","type":"string","value":"{as_date}"},
    {"field":"outer_as_id","label":"外部售后单号","type":"string","value":"{outer_as_id}"},
    {"field":"so_id","label":"原始线上单号","type":"string","value":"{so_id}"},
    {"field":"type","label":"售后类型","type":"string","describe":"普通退货,其它,拒收退货,仅退款,投诉,补发,换货","value":"{type}"},
    {"field":"modified","label":"最后更新时间","type":"string","value":"{modified}"},
    {"field":"status","label":"状态","type":"string","describe":"待确认:WaitConfirm;已确认:Confirmed;已取消:Cancelled;","value":"{status}"},
    {"field":"remark","label":"备注","type":"string","value":"{remark}"},
    {"field":"question_type","label":"问题类型","type":"string","value":"{question_type}"},
    {"field":"warehouse","label":"仓库","type":"string","value":"{warehouse}"},
    {"field":...}
  ],
  "otherRequest": [
    {
      "field": "main_sql",
      "label": "主语句",
      "type": "string",
      "describe": "SQL首次执行的语句,将会返回:lastInsertId",
      "value": "REPLACE INTO refund_list_query(id, as_id, as_date, outer_as_id, so_id, type, modified, status, remark, question_type, warehouse, refund, payment, good_status, shop_buyer_id, shop_id, logistics_company, l_id, o_id, order_status, drp_co_id_to, wh_id, drp_co_id_from, node, wms_co_id, shop_status,..."
    },
    {
      "field": "limit",
      "label": "limit",
      "type": "string",
      "value": "1000"
    }
  ]
}

数据请求与清洗

在数据请求阶段,我们从聚水潭系统获取售后单数据。该阶段主要涉及API调用和初步的数据清洗,以确保数据的完整性和准确性。

数据转换与写入

接下来是关键的ETL(Extract-Transform-Load)过程:

  1. 提取(Extract):从聚水潭系统提取原始售后单数据。
  2. 转换(Transform):根据元数据配置,将提取的数据进行格式化和转换。例如:
    • as_iditems_asi_id组合生成主键id
    • 将日期字段格式化为目标平台所需的标准格式。
    • 根据描述信息,将状态码转换为相应的文本描述。
  3. 加载(Load):将转换后的数据通过SQL语句写入到BI智选的MySQL数据库中。

以下是具体实现步骤:

提取与初步清洗
import requests
import json

# 从聚水潭系统提取售后单数据
response = requests.get('https://api.jushuitan.com/after_sales')
data = response.json()

# 初步清洗
cleaned_data = []
for record in data:
    cleaned_record = {
        'id': f"{record['as_id']}-{record['items_asi_id']}",
        'as_id': record['as_id'],
        'as_date': record['as_date'],
        'outer_as_id': record['outer_as_id'],
        'so_id': record['so_id'],
        # ...其他字段
    }
    cleaned_data.append(cleaned_record)
数据转换与写入
import mysql.connector

# MySQL数据库连接配置
config = {
  'user': 'username',
  'password': 'password',
  'host': '127.0.0.1',
  'database': 'bi_database'
}

# 建立数据库连接
cnx = mysql.connector.connect(**config)
cursor = cnx.cursor()

# SQL插入语句模板
insert_stmt = (
   """REPLACE INTO refund_list_query (id, as_id, as_date, outer_as_id,
   so_id, type, modified,status,...)
   VALUES (%s,%s,%s,%s,%s,%s,%s,%s,...)
   """
)

# 执行插入操作
for record in cleaned_data:
   cursor.execute(insert_stmt,
                  (record['id'], record['as_id'], record['as_date'], 
                   record['outer_as_id'], record['so_id'], 
                   # ...其他字段值
                  ))

# 提交事务
cnx.commit()

# 关闭连接
cursor.close()
cnx.close()

总结

通过上述步骤,我们成功地将聚水潭售后单的数据提取、转换并写入到BI智选的MySQL数据库中。这一过程不仅确保了数据的一致性和准确性,还提升了业务处理效率。使用轻易云数据集成平台,可以简化这一复杂过程,使得不同系统间的数据无缝对接成为可能。 打通用友BIP数据接口