从吉客云到BI拉伯塔MySQL的ETL过程剖析

  • 轻易云集成顾问-陈洁琳

吉客云数据集成到MySQL的技术案例分享

在本次技术案例中,我们将重点探讨如何使用轻易云数据集成平台,将吉客云的其他入库单查询结果高效、可靠地集成到MySQL数据库。集成方案名称为“吉客云-其他入库单查询-->BI拉伯塔-其他入库单表”。该方案旨在通过API接口erp.storage.goodsdocin.v2获取吉客云的数据,并利用MySQL API batchexecute进行批量写入,实现大量数据的快速存储和处理。

为了确保整个数据对接过程顺利进行,本方案特别关注以下几个关键点:

  1. 定时抓取与分页处理:通过设置定时任务,定期调用吉客云接口以抓取最新的其他入库单数据。同时,为了应对可能存在的大量记录,我们进行了分页和限流操作,以确保每次请求的数据规模可控,并避免超出系统负荷。

  2. 自定义转换逻辑与格式差异解决:针对吉客云返回的数据结构,与目标MySQL数据库字段不完全匹配的问题,通过自定义转换逻辑,对原始数据进行清洗和映射,保证写入过程中的一致性和完整性。

  3. 高吞吐量写入能力:轻易云平台提供了强大的批量处理功能,使得我们能有效快速地将大量记录一次性写入到MySQL。这对于提升系统性能和响应速度至关重要,同时也减少系统资源消耗。

  4. 实时监控与异常处理机制:采用集中化监控工具来实时跟踪每个集成任务的执行状态,一旦检测到错误或异常情况,触发告警并启动自动重试机制,最大限度地保障数据传输不中断、不漏单。

  5. 日志记录与审计:实现全流程日志记录,从API调用开始,到最终写入完成,每一步都有详细的日志信息供查阅。这不仅有助于问题排查,也为后续优化提供了宝贵的数据支持。

以上是此次技术案例开头部分主要内容。在接下来的讨论中,将逐步展开具体实施步骤及代码示例,包括如何正确配置API调用、实现分页抓取策略、自定义转化函数,以及优化批量插入性能的方法。 钉钉与WMS系统接口开发配置

调用吉客云接口erp.storage.goodsdocin.v2获取并加工数据的技术案例

在数据集成过程中,调用源系统接口是至关重要的一步。本文将详细探讨如何通过轻易云数据集成平台调用吉客云接口erp.storage.goodsdocin.v2来获取并加工数据。

接口配置与请求参数

首先,我们需要配置接口的元数据。根据提供的元数据配置,我们可以看到该接口采用POST方法,主要参数包括分页信息、入库单号、创建时间、更新时间、入库类型等。以下是具体的请求参数配置:

{
  "api": "erp.storage.goodsdocin.v2",
  "method": "POST",
  "number": "goodsdocNo",
  "id": "recId",
  "pagination": {
    "pageSize": 10
  },
  "request": [
    {"field": "pageIndex", "label": "分页页码", "type": "int"},
    {"field": "pageSize", "label": "分页页数", "type": "int", "value": "100"},
    {"field": "goodsDocNo", "label": "入库单号", "type": "string"},
    {"field": "startDate", "label": "创建时间的起始时间", "type": "string"},
    {"field": "endDate", "label": "创建时间的结束时间", "type": "string"},
    {"field": "gmtModifiedStart", 
        "label": "主表更新时间起始", 
        "type":"string",
        "value":"_function from_unixtime(({LAST_SYNC_TIME}-86400),'%Y-%m-%d %H:%i:%s')"
    },
    {"field":"gmtModifiedEnd",
        "label":"主表更新时间截至",
        "type":"string",
        "value":"_function from_unixtime(({CURRENT_TIME}),'%Y-%m-%d %H:%i:%s')"
    },
    {"field":"inouttype","label":"入库类型","type":"string","value":"104"},
    {"field":"sourceBillNo","label":"原始单号","type":"string"},
    {"field":"warehouseCode","label":"仓库编号","type":"string"},
    {"field":"outBillNo","label":"外部单号","type":"string"},
    {"field":"vendCode","label":"供应商编号(往来单位)","type":"string"},
    {"field":"billNo","label":"上游单据号(关联单号)","type":"string"},
    {"field":"userName","label":"创建人名称","type":"string"},
    {
        "field" :   "selelctFields",
        "label" :   "需要返回的字段",
        "type" :    "string",
        "value" :   "recId,goodsdocNo,billNo,inOutDate,gmtCreate,inouttype,inouttypeName,vendCustomerCode,vendCustomerName,currencyCode,currencyRate,userName,warehouseCode,warehouseName,comment,memo,logisticName,logisticNo,companyId,companyName,logisticType,logisticCode,inOutReason,sourceBillNo,channelId,channelCode,channelName,redStatus,field1,field2,field3,field4,field5..."
     }
  ],
  ...
}

数据请求与清洗

在实际操作中,我们会先构建一个HTTP POST请求,包含上述所有必要参数。为了保证数据的一致性和准确性,我们需要特别注意以下几点:

  1. 分页处理:由于数据量可能较大,需要进行分页处理。每次请求时设置pageIndexpageSize,确保能够逐页获取所有数据。
  2. 时间过滤:通过gmtModifiedStartgmtModifiedEnd字段,可以实现对更新时间范围内的数据进行过滤。这两个字段使用了函数转换,将时间戳转换为标准时间格式。
  3. 入库类型:固定值为104,即“其他入库”,确保只获取特定类型的入库单。

示例代码如下:

import requests
import json

url = 'https://api.jikecloud.com/erp.storage.goodsdocin.v2'
headers = {'Content-Type': 'application/json'}
payload = {
    'pageIndex': 1,
    'pageSize': 100,
    'gmtModifiedStart': '2023-01-01 00:00:00',
    'gmtModifiedEnd': '2023-01-31 23:59:59',
    'inouttype': '104',
}

response = requests.post(url, headers=headers, data=json.dumps(payload))
data = response.json()

数据转换与写入

获取到原始数据后,需要对其进行清洗和转换,以便写入目标系统。在这个过程中,可以利用轻易云平台提供的数据处理工具,对字段进行映射、格式转换等操作。

例如,将返回的数据中的日期字段从字符串格式转换为日期对象,或者将某些字段值进行标准化处理:

from datetime import datetime

def clean_data(raw_data):
    for record in raw_data:
        record['inOutDate'] = datetime.strptime(record['inOutDate'], '%Y-%m-%d %H:%M:%S')
        # 更多清洗操作...

cleaned_data = clean_data(data['result'])

最后,将清洗后的数据写入目标系统,例如BI拉伯塔中的其他入库单表:

def write_to_target_system(cleaned_data):
    # 假设目标系统有一个API可以接收批量插入的数据
    target_url = 'https://bi.labata.com/api/v1/storage'

    response = requests.post(target_url, headers=headers, data=json.dumps(cleaned_data))

write_to_target_system(cleaned_data)

通过以上步骤,我们完成了从吉客云接口获取并加工数据的全过程。在实际项目中,可以根据具体需求调整参数和处理逻辑,以达到最佳效果。 金蝶与CRM系统接口开发配置

数据集成生命周期中的ETL转换与写入

在数据集成的过程中,将源平台的数据转换为目标平台所能接受的格式是至关重要的一步。本文将详细探讨如何利用轻易云数据集成平台将吉客云的“其他入库单查询”数据转换并写入到BI拉伯塔的MySQL数据库中。

数据请求与清洗

在进行ETL转换之前,首先需要从源系统吉客云中获取“其他入库单查询”的数据。这一步骤涉及到数据请求和清洗,确保获取的数据符合目标平台的需求。

数据转换与写入

接下来,我们重点关注如何将清洗后的数据进行转换,并通过MySQL API接口写入到目标平台。以下是元数据配置及其应用示例:

{
  "api": "batchexecute",
  "effect": "EXECUTE",
  "method": "SQL",
  "number": "id",
  "id": "id",
  "name": "id",
  "idCheck": true,
  "request": [
    {"field":"recId","label":"入库单ID","type":"string","value":"{recId}"},
    {"field":"goodsdocNo","label":"入库单号","type":"string","value":"{goodsdocNo}"},
    {"field":"billNo","label":"上游单据号(关联单号)","type":"string","value":"{billNo}"},
    {"field":"inOutDate","label":"入库时间","type":"string","value":"{{inOutDate|datetime}}"},
    {"field":"gmtCreate","label":"创建时间","type":"string","value":"{{gmtCreate|datetime}}"},
    {"field":"inouttype","label":"入库类型编码(100-期初库存 101-采购入库...)","type":"string","value":"{inouttype}"},
    {"field":"inouttypeName","label":"入库类型名称(100-期初库存 101-采购入库...)","type":"string","value":"{inouttypeName}"},
    {"field":"vendCustomerCode","label":"往来单位编号","type":"string","value":"{vendCustomerCode}"},
    {"field":"vendCustomerName","label":"往来单位名称","type":"string","value":"{vendCustomerName}"},
    {"field":"currencyCode","label":"币种编号","type":"string","value":"{currencyCode}"},
    {"field":"currencyRate","label":"币种汇率","type":"string","value":"{currencyRate}"},
    {"field":"userName","label":"业务员名字","type":"string","value":"{userName}"},
    {"field":"","label":"","type":"","value":""}
  ],
  "otherRequest": [
    {
      "field": "main_sql",
      "label": "主语句",
      "type": "string",
      "describe": "SQL首次执行的语句,将会返回:lastInsertId",
      "value": 
        `REPLACE INTO erp_other_storage_goodsdocin 
        (recId, goodsdocNo, billNo, inoutDate, gmtCreate, inouttype, inouttypeName, vendCustomerCode, vendCustomerName, currencyCode, currencyRate, userName)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`
    },
    {
      "field": "limit",
      "label": "limit",
      "type": "string",
      "value": "1000"
    }
  ]
}

SQL语句解析

上述配置中的main_sql字段定义了一个SQL插入语句,用于将数据写入MySQL数据库。该语句使用了占位符?,这些占位符将在实际执行时被具体的数据值替换。

例如:

REPLACE INTO erp_other_storage_goodsdocin 
(recId, goodsdocNo, billNo, inoutDate, gmtCreate, inouttype, inouttypeName, vendCustomerCode, vendCustomerName, currencyCode, currencyRate, userName)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)

在执行时,系统会自动将占位符替换为相应的数据值,如:

REPLACE INTO erp_other_storage_goodsdocin 
(recId, goodsdocNo, billNo, inoutDate, gmtCreate, inouttype, inouttypeName, vendCustomerCode,vendCustomerName,currencyCode,currencyRate,userName)
VALUES ('12345', 'IN20230901', 'UP20230901', '2023-09-01 10:00:00', '2023-09-01 09:00:00', '101', '采购入库', 'CUST001', '客户A', 'CNY', '6.5', '张三')

数据类型转换

在元数据配置中,字段inOutDategmtCreate等日期字段需要进行格式转换。通过使用模板字符串如{{inOutDate|datetime}},可以确保日期格式符合目标平台要求。

批量执行

为了提高效率,可以使用批量执行API (batchexecute) 一次性处理多条记录。通过设置合适的批处理大小(如配置中的limit:1000),可以有效减少网络延迟和资源消耗。

异常处理

在实际操作中,需要考虑异常处理机制。例如,当某条记录插入失败时,应记录错误信息并继续处理后续记录,以确保整体流程的稳定性和可靠性。

总结

通过上述步骤,我们实现了从吉客云到BI拉伯塔MySQL数据库的数据ETL转换和写入。在此过程中,充分利用了轻易云数据集成平台提供的元数据配置功能,实现了高效、可靠的数据集成。 如何对接用友BIP接口