ETL转换在数据集成中的应用:从吉客云到班牛

  • 轻易云集成顾问-彭亮

吉客云·奇门数据集成到班牛:Qeasy1查询吉客云销售单状态回写班牛技术案例

在本次技术案例中,我们将探讨如何实现吉客云·奇门系统与班牛之间的数据集成,具体方案命名为“Qeasy1查询吉客云销售单状态回写班牛”。该集成过程通过轻易云数据集成平台的强大功能进行配置与实施,助力企业高效管理和利用API资产,实现业务流程的顺畅衔接。

我们的主要任务是从吉客云·奇门获取销售单状态,并及时、准确地回写至班牛系统。为了达到这一目标,我们采用了以下核心步骤和关键技术:

  • 调用吉客云接口

    • 我们使用jackyun.tradenotsensitiveinfos.list.get接口来定时抓取订单相关信息。在处理过程中,需要注意分页和限流问题,以保障数据抓取的稳定性和连续性。
  • 数据转化和映射

    • 由于两者系统的数据格式存在差异,我们自定义了一套转换逻辑,将获取到的数据适配至班牛所需格式。这一步骤不仅提高了数据处理效率,还确保了信息的准确传递。
  • 批量写入到班牛

    • 使用task.update API,在批量操作中支持高吞吐量的数据快速写入能力,使得大量订单状态能够迅速同步至班牛,提高整体业务响应速度。
  • 监控与异常处理

    • 数据质量监控及异常检测是此方案中的重要环节。通过实时日志记录以及告警机制,保证每一条信息都能正确传输,即使遇到错误也能够自动重试,从而不漏掉任何一笔订单。

结合以上这些关键点,本方案展示了如何利用轻易云平台实现复杂但可靠的数据对接,为企业构建一个透明、高效且可追溯的业务流程提供坚实基础。接下来我们将深入探究每个步骤中的实现细节及技术要点,以便读者能更好地理解并复用这一解决方案。 用友与外部系统接口集成开发

使用轻易云数据集成平台调用吉客云·奇门接口获取并加工数据

在数据集成的生命周期中,第一步是从源系统获取数据。本文将详细探讨如何使用轻易云数据集成平台调用吉客云·奇门接口jackyun.tradenotsensitiveinfos.list.get来获取销售单状态,并对数据进行初步加工。

接口概述

吉客云·奇门接口jackyun.tradenotsensitiveinfos.list.get主要用于查询销售单的非敏感信息。该接口采用POST请求方式,支持多种查询条件和分页功能,能够返回指定字段的数据列表。

元数据配置解析

根据提供的元数据配置,我们需要设置以下请求参数:

  • modified_beginmodified_end: 修改起始和结束时间,必须同时存在且时间间隔不能超过七天。
  • startModifiedendModified: 最后修改时间的起始和截止。
  • tradeNo: 销售单号,多个用半角逗号分隔。
  • pageSize: 每页记录数,默认50,最大1000。
  • pageIndex: 页码,0为第1页。
  • hasTotal: 默认返回,首次调用时可以传1获取总记录数。
  • startCreatedendCreated: 创建时间的起始和截止。
  • startAuditTimeendAuditTime: 审核时间的起始和截止。
  • startConsignTimeendConsignTime: 发货时间的起始和截止。这里使用了函数 _function from_unixtime(({LAST_SYNC_TIME}-86400),'%Y-%m-%d %H:%i:%s')_function from_unixtime(({CURRENT_TIME}-86400),'%Y-%m-%d %H:%i:%s') 来动态计算时间范围。
  • tradeStatus: 订单状态。
  • tradeType: 订单类型,默认值为92。
  • sourceTradeNos: 网店订单号。
  • fields: 需要返回字段列表,用逗号分隔。

请求示例

以下是一个典型的请求示例:

{
  "api": "jackyun.tradenotsensitiveinfos.list.get",
  "method": "POST",
  "params": {
    "modified_begin": "2023-09-01 00:00:00",
    "modified_end": "2023-09-07 23:59:59",
    "tradeNo": "123456,789012",
    "pageSize": "100",
    "pageIndex": "0",
    "hasTotal": "1",
    "startCreated": "",
    "endCreated": "",
    "startAuditTime": "",
    "endAuditTime": "",
    "startConsignTime": "_function from_unixtime(({LAST_SYNC_TIME}-86400),'%Y-%m-%d %H:%i:%s')",
    "endConsignTime": "_function from_unixtime(({CURRENT_TIME}-86400),'%Y-%m-%d %H:%i:%s')",
    "tradeStatus": "",
    "tradeType": "92",
    "sourceTradeNos": "",
    "fields": ""
  }
}

数据清洗与转换

在获取到原始数据后,需要对其进行清洗与转换,以便后续处理。常见的数据清洗操作包括:

  1. 去重:确保没有重复记录。
  2. 格式化:将日期、金额等字段格式化为统一标准。
  3. 过滤:根据业务需求过滤掉不必要的数据。例如,只保留特定状态或类型的订单。

以下是一个简单的数据清洗示例:

import pandas as pd

# 假设我们已经通过API获取了数据,并存储在data变量中
data = [
  {"tradeNo": "123456", "status": 6000, "amount": 100.5, "date_modified": "2023-09-01"},
  {"tradeNo": "789012", "status": 9090, "amount": 200.75, "date_modified": ""},
]

# 转换为DataFrame
df = pd.DataFrame(data)

# 去重
df.drop_duplicates(subset=['tradeNo'], inplace=True)

# 格式化日期
df['date_modified'] = pd.to_datetime(df['date_modified'], errors='coerce')

# 过滤状态为6000或9090的订单
df = df[df['status'].isin([6000, 9090])]

print(df)

数据写入

经过清洗与转换后的数据,可以写入目标系统。在轻易云平台上,这一步通常通过配置相应的写入接口来实现。具体操作步骤包括:

  1. 配置目标系统的API接口参数。
  2. 映射源数据字段到目标系统字段。
  3. 执行写入操作,并监控写入结果。

通过以上步骤,我们完成了从吉客云·奇门接口获取销售单状态并进行初步加工的全过程。这不仅提高了数据处理效率,也确保了数据的一致性和准确性。 电商OMS与ERP系统接口开发配置

数据集成生命周期的ETL转换:从源平台到班牛API接口

在数据集成生命周期中,ETL(Extract, Transform, Load)转换是一个关键步骤。本文将详细探讨如何使用轻易云数据集成平台,将已经集成的源平台数据进行ETL转换,转为目标平台班牛API接口所能够接收的格式,并最终写入目标平台。

API接口配置与元数据解析

首先,我们需要理解元数据配置中的各个字段及其含义。以下是我们要处理的元数据配置:

{
  "api": "task.update",
  "effect": "EXECUTE",
  "method": "POST",
  "idCheck": true,
  "request": [
    {
      "field": "app_id",
      "label": "小程序id",
      "type": "int",
      "value": "21151"
    },
    {
      "field": "project_id",
      "label": "群组ID",
      "type": "int",
      "value": "77206"
    },
    {
      "field": "task_id",
      "label": "工单id",
      "type": "int",
      "value": "_mongoQuery 73f95f22-03a2-3f8a-aa21-4c08c541daf4 findField=id where={\"content.77213\":{\"$eq\":\"{onlineTradeNo}\"}}"
    },
    {
      "field": "contents",
      "label": "contents",
      "type": "object",
      "children": [
        {
          "field": "78538",
          "label": "销售出库",
          "type": "int",
          "value": 78534
        }
      ]
    }
  ]
}

数据请求与清洗

在进行ETL转换之前,首先需要从源系统中提取相关数据并进行清洗。假设我们已经通过轻易云平台完成了这一阶段,并获得了如下结构的数据:

{
  "_id":"5f8d0d55b54764421b7156e8",
  "_class":"com.qeasy.model.SalesOrderStatusUpdateRequest",
  "_mongoQuery":{
    "_id":"73f95f22-03a2-3f8a-aa21-4c08c541daf4"
  },
  ...
}

数据转换

接下来,我们需要将这些数据转换为班牛API所需的格式。根据元数据配置,我们需要构造一个POST请求,具体字段如下:

  1. app_id:固定值21151。
  2. project_id:固定值77206。
  3. task_id:通过MongoDB查询获取,其中onlineTradeNo作为查询条件。
  4. contents:包含一个子字段78538,其值固定为78534。

具体实现代码示例如下:

import requests
import json

# 定义API URL和Headers
api_url = 'https://api.banniu.com/task.update'
headers = {'Content-Type': 'application/json'}

# 构造请求体
payload = {
    'app_id': 21151,
    'project_id': 77206,
    'task_id': get_task_id(onlineTradeNo), # 假设get_task_id是一个函数,用于根据onlineTradeNo查询task_id
    'contents': {
        '78538': 78534
    }
}

# 将请求体转为JSON格式
payload_json = json.dumps(payload)

# 发起POST请求
response = requests.post(api_url, headers=headers, data=payload_json)

# 检查响应状态码和内容
if response.status_code == 200:
    print('Data successfully written to Banniu')
else:
    print(f'Failed to write data: {response.text}')

数据写入

上述代码示例展示了如何构造并发送HTTP POST请求,将转换后的数据写入班牛系统。特别注意的是,task_id字段需要通过MongoDB查询来动态获取,这里可以利用轻易云提供的查询功能。

例如,通过以下伪代码实现对MongoDB的查询:

def get_task_id(onlineTradeNo):
    query = {"content.77213":{"$eq":"{onlineTradeNo}"}}
    result = mongo_collection.find_one(query)
    return result['id'] if result else None

通过这种方式,我们可以确保在每次调用API时都能获取到最新的、正确的task_id

实践中的注意事项

  1. 错误处理:在实际操作中,需要对可能出现的各种错误情况进行处理,例如网络异常、API返回错误等。
  2. 日志记录:建议在每次调用API时记录日志,以便后续追踪和排查问题。
  3. 性能优化:对于大批量的数据处理,可以考虑使用批量操作或异步处理方式,以提高效率。

通过以上步骤,我们成功地将源平台的数据经过ETL转换后,写入到了目标平台班牛系统。这一过程不仅提升了数据处理效率,也确保了数据的一致性和准确性。 打通钉钉数据接口

更多系统对接方案