利用轻易云平台实现数据ETL转换并写入MySQL

  • 轻易云集成顾问-叶威宏

金蝶云星空数据集成到MySQL的技术实践案例分享

在本技术案例中,我们将详细探讨如何通过轻易云数据集成平台,将金蝶云星空系统中的采购收料通知单(MOM-CGSLTZD)数据高效地集成到MySQL数据库中。本文将重点关注API接口调用、分页和限流问题处理、自定义数据转换逻辑以及异常处理机制等重要技术细节点。

首先,考虑到金蝶云星空系统提供了executeBillQuery API来获取采购收料通知单的数据,我们需要确保定时可靠地抓取这些数据。在这个过程中,每次调用API会返回一定数量的数据条目,因此我们必须处理好分页和限流的问题,以确保不漏单并且不会超出系统负载能力。

为了实现这一目标,首先配置好轻易云平台上对接金蝶云星空的任务,通过可视化的数据流设计工具设定每个步骤。从executeBillQuery开始,通过批量抓取,在每次请求后解析响应并提取相关字段,为接下来的写入做准备。此时,自定义的数据转换逻辑可以帮助我们格式化这些数据,使其符合MySQL数据库的表结构要求。

其次,针对大规模数据写入需求,利用轻易云平台强大的高吞吐量写入能力,可以快速将大量采购收料通知单记录批量插入到MySQL数据库中。这不仅提升了整体效率,也避免了频繁的小批量操作带来的性能瓶颈。同时,对于实时监控任务执行状态及性能指标,可以借助中央监控与告警系统及时发现潜在问题,并采取相应措施调整优化流程。

此外,为保证整个过程中的稳定性,需要设计有效的错误重试机制。例如,当网络波动或服务暂时不可用导致API请求失败时,可以自动触发重试策略,以减少人为干预的不确定性。同时,对异常情况进行日志记录,有助于后续分析和改进工作,从而进一步提升集成方案的可靠性和健壮性。

总结来说,本案例展示了一套从金蝶云星空获取采购收料通知单,再高效写入至MySQL数据库的方法论。不仅涵盖了API接口调用、分页与限流、自定义转换逻辑,还包括监控与异常处理等多个关键环节,将为类似场景提供有价值的参考经验。 泛微OA与ERP系统接口开发配置

调用金蝶云星空接口executeBillQuery获取并加工数据

在数据集成的生命周期中,调用源系统接口获取数据是至关重要的一步。本文将深入探讨如何通过轻易云数据集成平台调用金蝶云星空的executeBillQuery接口,获取采购收料通知单的数据,并进行相应的加工处理。

接口配置与请求参数

首先,我们需要配置元数据以便正确调用executeBillQuery接口。以下是关键的元数据配置:

{
  "api": "executeBillQuery",
  "effect": "QUERY",
  "method": "POST",
  "number": "FBillNo",
  "id": "FDetailEntity_FEntryID",
  "name": "FBillNo",
  "idCheck": true,
  "request": [
    {"field":"FBillNo","label":"单据编号","type":"String","describe":"单据编号","value":"FBillNo"},
    {"field":"FDocumentStatus","label":"单据状态","type":"String","describe":"单据状态","value":"FDocumentStatus"},
    {"field":"FMaterialId","label":"物料编码","type":"String","describe":"物料编码","value":"FMaterialId.fnumber"},
    {"field":"FStockOrgId_FNumber","label":"收料组织","type":"String","describe":"收料组织","value":"FStockOrgId.FNumber"},
    {"field":"FMaterialName","label":"物料名称","type":"String","describe":"物料名称","value":"FMaterialName"},
    {"field":"FDate","label":"收料日期","type":"String","describe":"收料日期","value":"FDate"},
    // ...省略其他字段
  ],
  "otherRequest": [
    {"field": "Limit", "label": "最大行数", "type": "string", "describe": "金蝶的查询分页参数", "value": "5000"},
    {"field": "StartRow", "label": "开始行索引", "type": "string", "describe": "金蝶的查询分页参数", "value": "{PAGINATION_START_ROW}"},
    {"field": "FilterString", 
     "label": "过滤条件", 
     "type": "string", 
     "describe": 
     // 示例写法
     FSupplierId.FNumber = 'VEN00010' and FApproveDate>=",
     // 实际使用时
     value: 
     FDocumentStatus='C' AND FApproveDate>='{{LAST_SYNC_TIME|dateTime}}' -- AND FApproveDate >= '2024-09-20T16:24:07.56' -- AND FBillNo='CGSL089900'"
    },
    {"field": 
      // 查询字段集合
      FieldKeys, 
      label: 
      // 字段key集合
      需查询的字段key集合, 
      type: array, 
      describe: 金蝶分录主键ID格式:FPOOrderEntry_FEntryId,其它格式 FPurchaseOrgId.FNumber, 
      parser: {name: ArrayToString, params: ,}
    },
    {"field":
      FormId,
      label:
      // 表单ID
      业务对象表单Id,
      type:
      string,
      describe:
      必须填写金蝶的表单ID如:PUR_PurchaseOrder,
      value:
      PUR_ReceiveBill},
    {"field":
       OrderString,
       label:
       排序,
       type:
       string,
       value:
       FDate ASC}
   ],
   autoFillResponse:true
}

数据请求与清洗

在实际操作中,我们通过POST请求方式调用executeBillQuery接口。请求体中包含了我们需要查询的字段和其他控制参数,例如分页、过滤条件等。

{
  // 请求体示例
  FormId: 'PUR_ReceiveBill',
  FieldKeys: 'FBillNo,FDocumentStatus,FMaterialId.fnumber,FStockOrgId.FNumber,FMaterialName,FDate,...',
  FilterString: `FDocumentStatus='C' AND FApproveDate>='${LAST_SYNC_TIME}'`,
  Limit: '5000',
  StartRow: '{PAGINATION_START_ROW}',
  OrderString: 'FDate ASC'
}

在接收到响应后,需要对数据进行清洗和转换。清洗过程包括:

  1. 字段映射:将返回的数据字段映射到目标系统所需的字段。
  2. 数据类型转换:确保每个字段的数据类型符合目标系统要求。
  3. 异常处理:处理可能出现的数据异常,如缺失值、格式错误等。

数据转换与写入

完成数据清洗后,下一步是将数据转换为目标系统可接受的格式,并写入目标系统。这一步通常涉及以下操作:

  1. 格式化数据:根据目标系统的数据结构要求,对数据进行重新组织和格式化。
  2. 批量写入:为了提高效率,通常采用批量写入方式,将多条记录一次性写入目标系统。
  3. 日志记录:记录每次操作的日志,包括成功和失败的记录,以便后续追踪和排查问题。

实例代码

以下是一个简化版的代码示例,展示了如何调用接口并处理返回的数据:

const axios = require('axios');

async function fetchData() {
  const requestBody = {
    FormId: 'PUR_ReceiveBill',
    FieldKeys: 'FBillNo,FDocumentStatus,FMaterialId.fnumber,FStockOrgId.FNumber,FMaterialName,FDate,...',
    FilterString: `FDocumentStatus='C' AND FApproveDate>='${LAST_SYNC_TIME}'`,
    Limit: '5000',
    StartRow: '{PAGINATION_START_ROW}',
    OrderString: 'FDate ASC'
  };

  try {
    const response = await axios.post('https://api.kingdee.com/executeBillQuery', requestBody);

    if (response.data && response.data.result) {
        const cleanedData = cleanData(response.data.result);
        writeToTargetSystem(cleanedData);
        console.log('Data fetched and processed successfully');
    } else {
        console.error('Failed to fetch data:', response.data);
    }

  } catch (error) {
    console.error('Error fetching data:', error);
  }
}

function cleanData(data) {
   return data.map(item => ({
     billNo: item.FBillNo,
     documentStatus: item.FDocumentStatus,
     materialCode: item['FMaterialId.fnumber'],
     stockOrgCode: item['FStockOrgId.FNumber'],
     materialName: item.FMaterialName,
     receiptDate: item.FDate,
     // ...其他字段映射
   }));
}

function writeToTargetSystem(data) {
   // 批量写入逻辑实现
}

fetchData();

通过以上步骤,我们可以高效地从金蝶云星空获取采购收料通知单的数据,并进行必要的清洗和转换,为后续的数据处理打下坚实基础。 钉钉与CRM系统接口开发配置

使用轻易云数据集成平台进行ETL转换并写入MySQL API接口

在数据集成生命周期的第二步中,已经集成的源平台数据需要经过ETL(提取、转换、加载)过程,最终转为目标平台 MySQL API 接口所能够接收的格式,并写入目标平台。本文将详细探讨这一过程,特别是如何利用元数据配置实现这一目标。

数据请求与清洗

首先,我们需要从源系统提取原始数据,并进行必要的清洗操作。这一步骤确保了数据的一致性和准确性。假设我们已经完成了这一步骤,现在进入数据转换与写入阶段。

数据转换

在数据转换阶段,我们需要根据目标平台 MySQL 的要求,将源数据映射到相应的字段。以下是一个典型的元数据配置示例,用于将采购收料通知单的数据转化为 MySQL 可接受的格式:

{
  "api": "execute",
  "effect": "EXECUTE",
  "method": "POST",
  "idCheck": true,
  "request": [
    {
      "field": "main_params",
      "label": "main_params",
      "type": "object",
      "describe": "111",
      "value": "1",
      "children": [
        {"field":"V_ID","label":"采购收料通知单内码","type":"string","value":"{FID}"},
        {"field":"V_BILL_TYPE","label":"单据类型","type":"string","value":"{FBillTypeID}"},
        {"field":"V_BILL_NO","label":"单据编号","type":"string","value":"{FBillNo}"},
        {"field":"V_DATE","label":"收料日期","type":"string","value":"{FDate}"},
        {"field":"V_SUPPLIER","label":"供应商","type":"string","value":"{FSupplierId}"},
        // ... 其他字段省略
      ]
    },
    {
      "field": "extend_params_1",
      "label": "extend_params_1",
      "type": "array",
      "value": "1",
      "children": [
        {"field":"V_ENTRY_ID","label":"采购收料通知单分录内码","type":"string","value":"{FDetailEntity_FEntryID}"},
        {"field":"V_ID","label":"采购收料通知单内码","type":"string","value":"{FID}"},
        {"field":"V_SEQ","label":"序号","type":"string","value":"{FDetailEntity_FSeq}"},
        // ... 其他字段省略
      ]
    }
  ],
  // SQL 插入语句
  "otherRequest":[
    {
      "field": "main_sql",
      "label": "main_sql",
      ...
    },
    {
      "field": "extend_sql_1",
      ...
    }
  ]
}

在这个配置中,我们定义了两个主要部分:main_paramsextend_params_1,分别对应主表和子表的数据字段。每个字段都包含一个键值对,其中键表示目标数据库中的字段名,值表示源数据中的字段名或计算逻辑。

数据写入

一旦数据完成转换,就可以通过API接口将其写入MySQL数据库。以下是用于插入主表和子表数据的SQL语句:

-- 主表插入语句
INSERT INTO `ty_mes`.`mt_pur_receive` (`ID`, `BILL_TYPE`, `BILL_NO`, `DATE`, `SUPPLIER`, ...)
VALUES (:V_ID, :V_BILL_TYPE, :V_BILL_NO, :V_DATE, :V_SUPPLIER, ...)
ON DUPLICATE KEY UPDATE `BILL_TYPE` = VALUES(`BILL_TYPE`), ...

-- 子表插入语句
INSERT INTO `ty_mes`.`mt_pur_receive_entry` (`ENTRY_ID`, `ID`, `SEQ`, `MATERIAL_CODE`, ...)
VALUES (:V_ENTRY_ID, :V_ID, :V_SEQ, :V_MATERIAL_CODE, ...)
ON DUPLICATE KEY UPDATE `ID` = VALUES(`ID`), ...

在这些SQL语句中,:V_ID 等占位符表示从元数据配置中获取的实际值。这些占位符将在执行时被替换为实际的数据,从而完成插入操作。

API调用

最后,通过API调用将转换后的数据发送到MySQL数据库。以下是一个简单的API调用示例:

POST /execute HTTP/1.1
Host: api.example.com
Content-Type: application/json

{
  "main_params": {
    ...
  },
  ...
}

该请求会触发相应的SQL语句执行,将数据插入到MySQL数据库中。

通过上述步骤,我们实现了从源系统到目标系统的数据无缝对接,确保了数据的一致性和准确性。在实际操作中,可以根据具体需求调整元数据配置和SQL语句,以满足不同场景下的数据集成需求。 打通钉钉数据接口