数据集成过程中的ETL转换与MySQL写入详解

  • 轻易云集成顾问-张妍琪

销售出库单金蝶=>帆软数据库每日执行:系统对接集成案例分享

在本文中,我们将聚焦于一个具体的技术案例,即如何实现从金蝶云星空到MySQL的销售出库单数据集成。此方案通过轻易云的数据集成平台,在每日定时抓取金蝶云星空接口数据后,将其高效可靠地写入到MySQL数据库中。

API对接与调用

为了完成这一任务,首先需要利用金蝶云星空提供的executeBillQuery API获取销售出库单数据。该API允许我们根据业务需求自定义查询条件,从而精准抓取所需的数据。在数据提取之后,我们使用MySQL数据库的execute API进行批量写入,此步骤确保了大量销售出库单记录能够快速且无误地转移到目标数据库中。

数据监控与质量保障

在实际操作过程中,为了保证集成过程中的数据完整性和准确性,我们启用了集中监控和告警系统,以便实时跟踪每个数据集成任务的状态及性能。一旦检测到任何异常,系统会立即发出提醒,并启动错误重试机制以尽快恢复正常操作。此外,通过支持自定义的数据转换逻辑,可以针对特定业务需求调整数据结构,使得从金蝶云星空导出的原始数据信息能够顺利适配至MySQL的表结构之中。

性能优化与吞吐量管理

为了应对大规模、高频率的数据交换,本次方案还特别配置了一系列性能优化措施。例如,通过拆分分页来处理金蝶云星空接口有限流问题,使得每次调用executeBillQuery都能稳定返回预期数量的数据。同时,在往MySQL批量插入时采用事务控制和批处理机制,从而显著提升整体吞吐效率并减少网络延迟和资源消耗。

以上只是整个解决方案的一部分开端内容,后续章节将详细介绍具体实施步骤、代码示例以及遇到的问题及解决方法,包括如何应对不同类型的数据格式差异,以及整合作业流程中的异常处理策略等。 打通企业微信数据接口

调用金蝶云星空接口executeBillQuery获取并加工数据

在数据集成过程中,调用源系统接口是关键的第一步。本文将详细探讨如何通过轻易云数据集成平台调用金蝶云星空接口executeBillQuery,获取销售出库单数据并进行初步加工。

接口配置与调用

首先,我们需要配置并调用金蝶云星空的executeBillQuery接口。该接口主要用于查询业务对象表单中的数据。在本案例中,我们将查询销售出库单的数据。

元数据配置解析

根据提供的元数据配置,我们可以看到以下关键字段和请求参数:

  • API: executeBillQuery
  • 方法: POST
  • 业务对象表单Id: SAL_OUTSTOCK
  • 过滤条件: FCreateDate>='{{LAST_SYNC_TIME|datetime}}' and FCreateDate<='{{CURRENT_TIME|datetime}}'
  • 返回总行数: 可选
  • 开始行索引: 可选
  • 最大行数: 500
  • 需查询的字段key集合: 详见元数据中的FieldKeys

这些参数将构成我们请求的主体内容。

请求参数构建

在构建请求参数时,需要特别注意以下几点:

  1. 过滤条件:使用动态时间戳来确保每次查询都是增量获取新数据。
  2. 字段key集合:根据业务需求选择所需字段,以减少不必要的数据传输。

示例请求体如下:

{
  "FormId": "SAL_OUTSTOCK",
  "FilterString": "FCreateDate>='2023-01-01T00:00:00' and FCreateDate<='2023-01-31T23:59:59'",
  "FieldKeys": "FID,FBillNo,FMustQty,FBillAmount_LC,FARJOINAMOUNT,FCostAmount_LC,FEntryTaxAmount,FSHRMC,FTaxPrice,FCustomerID2,FDocumentStatus,FBillAllAmount,FCustomerID.FName,FEntryCostAmount,FDate,FCostPrice,FMateriaModel,Fentity_FENTRYID,FReceiveAddress,FLXDH,F_ora_Qty,FARJoinQty,FSaleOrgId.FName,FBillTaxAmount,FNote,FBillTypeID.FName,FExchangeRate,F_ora_HeadDeliveryWay.FDataValue,FActQty,FBillAllAmount_LC,FSalCostPrice,F_ora_Combo,FRealQty,FBaseARJoinQty,FBillTaxAmount_LC,FARNOTJOINQTY,FStockID.Fname,FBillAmount,FMaterialName,FPrice,FUnitID.Fname,FSALUNITQTY,FSalUnitID.Fname,FReceiptConditionID.Fname,FEntryTaxRate,FAmount,FEntrynote,FSettleCurrID.Fname,FSalesManID.FName,F_PSWZ_Date,FAmount_LC,FCorrespondOrgId.FName,FAllAmount_LC,FExchangeTypeID.Fname,FSaleDeptID.FName,FReturnQty,FMaterialID.FNumber"
}

数据清洗与转换

获取到原始数据后,下一步是对数据进行清洗和转换。这一步骤至关重要,因为它直接影响到后续的数据写入和使用。

数据清洗
  1. 去重处理:确保没有重复记录。
  2. 格式转换:将日期、金额等字段转换为统一格式。
  3. 缺失值处理:填补或删除缺失值,保证数据完整性。
数据转换

根据业务需求,对部分字段进行转换。例如,将金额从本位币转换为目标币种,或将日期格式从字符串转换为日期对象。

示例代码(伪代码):

def clean_and_transform(data):
    cleaned_data = []
    for record in data:
        # 去重处理
        if record not in cleaned_data:
            # 格式转换
            record['FDate'] = convert_to_date(record['FDate'])
            record['FBillAmount_LC'] = convert_to_float(record['FBillAmount_LC'])
            # 缺失值处理
            if not record['FReceiveAddress']:
                record['FReceiveAddress'] = '未知地址'
            cleaned_data.append(record)
    return cleaned_data

数据写入

经过清洗和转换的数据最终需要写入目标系统(如帆软数据库)。这一步通常包括以下步骤:

  1. 建立数据库连接
  2. 批量插入数据
  3. 错误处理与日志记录

示例代码(伪代码):

def write_to_database(cleaned_data):
    connection = establish_db_connection()
    try:
        for record in cleaned_data:
            insert_record(connection, record)
    except Exception as e:
        log_error(e)
    finally:
        connection.close()

通过上述步骤,我们完成了从金蝶云星空获取销售出库单数据并进行初步加工的全过程。这个过程不仅确保了数据的准确性和完整性,还为后续的数据分析和应用打下了坚实基础。 打通企业微信数据接口

数据集成生命周期第二步:ETL转换与写入MySQL

在数据集成过程中,将源平台的数据转换为目标平台所能接收的格式是至关重要的一环。本文将详细探讨如何利用轻易云数据集成平台,将销售出库单金蝶的数据通过ETL转换,最终写入目标平台MySQL。

数据请求与清洗

首先,从金蝶系统中获取销售出库单数据。这一步主要涉及到数据的提取和初步清洗,确保数据的完整性和准确性。提取的数据包括多个字段,如实体主键、含税单价、未关联应收数量等。

数据转换与写入

接下来,我们进入数据生命周期的第二步:ETL(Extract, Transform, Load)转换。以下是具体的元数据配置和处理步骤:

{
  "api": "execute",
  "effect": "EXECUTE",
  "method": "POST",
  "idCheck": true,
  "request": [
    {
      "field": "main_params",
      "label": "主参数",
      "type": "object",
      "describe": "主参数",
      "children": [
        {"field":"FID","label":"实体主键","type":"int","describe":"实体主键","value":"{FID}"},
        {"field":"FTaxPrice","label":"含税单价","type":"float","describe":"含税单价","value":"{FTaxPrice}"},
        {"field":"FARNOTJOINQTY","label":"未关联应收数量(计价单位)","type":"int","describe":"未关联应收数量(计价单位)","value":"{FARNOTJOINQTY}"},
        {"field":"FReceiptConditionID","label":"收款条件","type":"string","describe":"收款条件","value":"{FReceiptConditionID}"},
        {"field":"FBillAmount_LC","label":"金额(本位币)","type":"float","describe":"金额(本位币)","value":"{FBillAmount_LC}"},
        {"field":"FCostPrice","label":"成本价(本位币)","type":"float","describe":"成本价(本位币)","value":"{FCostPrice}"},
        {"field":"FNote","label":"备注","type":"string","describe":"备注","value":"{FNote}"},
        {"field":"FStockID","label":"仓库","type":"string","describe":"仓库","value":"{FStockID}"},
        {"field": "FDate", "label": "日期", "type": "date", "describe": "日期", "value": "{{FDate|date}}"},
        {"field": "FAmount_LC", "label": "金额(本位币)", "type": "float", "describe": "金额(本位币)", "value": "{FAmount_LC}"},
        {
          "field": "FAmount",
          "label": "金额",
          "type": float,
          describe: 金额,
          value: "_function '{exchangeRate}'*{FAmount_LC}"
        },
        // 更多字段配置...
      ]
    }
  ],
  // SQL插入语句
  otherRequest: [
    {
      field: main_sql,
      label: 主语句,
      type: string,
      describe: 主语句,
      value: INSERT INTO sale_test (FID, FBillNo, FDocumentStatus, FSaleOrgId, FDate, FCustomerID_FName, FSaleDeptID_FName, FSalesManID, FBillTypeID, FReceiveAddress, FCorrespondOrgId, FNote, FSHRMC, FLXDH, F_ora_Combo, F_ora_Qty, FCustomerID2, F_PSWZ_Date, F_ora_HeadDeliveryWay, FLocalCurrID, FExchangeTypeID, FExchangeRate, FSettleCurrID, FReceiptConditionID, FBillAmount_LC, 
FBillAllAmount_LC,
FBillTaxAmount_LC,
FBillTaxAmount,
FBillAllAmount,
FBillAmount,
FENTRYID,
FMaterialID,
FMaterialName,
FMateriaModel,
FUnitID,
FMustQty,
FRealQty,
FStockID,
FEntrynote,
FCostPrice,
FEntryCostAmount,
FCostAmount_LC,
FReturnQty,
FPrice,
FTaxPrice,FEntryTaxRate
FAmount
FAmount_LC
,FEntryTaxAmount
FTaxAmount_LC
,FAllAmount
,FAllAmount_LC,FBaseARJoinQty,FARJOINAMOUNT
FSalCostPrice,FActQty,FARJoinQtyFSalUnitIDFSALUNITQTY,FARNOTJOINQTYFCustomerID_FNumberFSaleDeptID_FNumber,FStockID_FNumberFSaleOrgId_FNumberFCorrespondOrgId_FNumber) VALUES (:FID,:FBillNo,:FDocumentStatus,:FSaleOrgId,:FDate,:FCustomerID_FName,:FSaleDeptID_FName,:FSalesManID,:FBillTypeID,:FReceiveAddress,:FCorrespondOrgId,:FNote,:FSHRMC,:FLXDH,:F_ora_Combo,:F_ora_Qty,:FCustomerID2,:F_PSWZ_Date,:F_ora_HeadDeliveryWay,:FLocalCurrID,: 

在这个配置中,关键步骤如下:

  1. 字段映射:将源平台的数据字段映射到目标平台的字段。例如,FID 对应 实体主键FTaxPrice 对应 含税单价 等。
  2. 类型转换:确保每个字段的数据类型正确。例如,将日期字段 {{FDate|date}} 转换为目标平台所需的日期格式。
  3. 计算字段:对于需要计算的字段,如金额 FAmount,使用 _function '{exchangeRate}'*{FAmount_LC} 进行汇率转换。
  4. 条件处理:对于一些特殊字段,如运费 运费,使用条件语句 _function case '{F_ora_Combo}' when '1' then '甲方' when '2' then '乙方' when '0' then '' end 来处理不同情况。

写入MySQL

最后一步是将转换后的数据写入MySQL数据库。通过构建SQL插入语句,将所有处理后的字段值插入到目标表 sale_test 中。

INSERT INTO sale_test (
  FID, 
  FBillNo, 
  // 更多字段...
) VALUES (
  :FID, 
  :FBillNo, 
  // 更多值...
);

通过上述步骤,我们实现了从源平台到目标平台的数据无缝对接,并确保数据在传输过程中的完整性和准确性。这不仅提升了业务流程的透明度,也极大地提高了数据处理的效率。 金蝶云星空API接口配置