利用轻易云平台实现数据ETL转换并写入MySQL

  • 轻易云集成顾问-叶威宏
### 金蝶云星空数据集成到MySQL的技术实践案例分享 在本技术案例中,我们将详细探讨如何通过轻易云数据集成平台,将金蝶云星空系统中的采购收料通知单(MOM-CGSLTZD)数据高效地集成到MySQL数据库中。本文将重点关注API接口调用、分页和限流问题处理、自定义数据转换逻辑以及异常处理机制等重要技术细节点。 首先,考虑到金蝶云星空系统提供了executeBillQuery API来获取采购收料通知单的数据,我们需要确保定时可靠地抓取这些数据。在这个过程中,每次调用API会返回一定数量的数据条目,因此我们必须处理好分页和限流的问题,以确保不漏单并且不会超出系统负载能力。 为了实现这一目标,首先配置好轻易云平台上对接金蝶云星空的任务,通过可视化的数据流设计工具设定每个步骤。从executeBillQuery开始,通过批量抓取,在每次请求后解析响应并提取相关字段,为接下来的写入做准备。此时,自定义的数据转换逻辑可以帮助我们格式化这些数据,使其符合MySQL数据库的表结构要求。 其次,针对大规模数据写入需求,利用轻易云平台强大的高吞吐量写入能力,可以快速将大量采购收料通知单记录批量插入到MySQL数据库中。这不仅提升了整体效率,也避免了频繁的小批量操作带来的性能瓶颈。同时,对于实时监控任务执行状态及性能指标,可以借助中央监控与告警系统及时发现潜在问题,并采取相应措施调整优化流程。 此外,为保证整个过程中的稳定性,需要设计有效的错误重试机制。例如,当网络波动或服务暂时不可用导致API请求失败时,可以自动触发重试策略,以减少人为干预的不确定性。同时,对异常情况进行日志记录,有助于后续分析和改进工作,从而进一步提升集成方案的可靠性和健壮性。 总结来说,本案例展示了一套从金蝶云星空获取采购收料通知单,再高效写入至MySQL数据库的方法论。不仅涵盖了API接口调用、分页与限流、自定义转换逻辑,还包括监控与异常处理等多个关键环节,将为类似场景提供有价值的参考经验。 ![泛微OA与ERP系统接口开发配置](https://pic.qeasy.cloud/D37.png~tplv-syqr462i7n-qeasy.image) ### 调用金蝶云星空接口executeBillQuery获取并加工数据 在数据集成的生命周期中,调用源系统接口获取数据是至关重要的一步。本文将深入探讨如何通过轻易云数据集成平台调用金蝶云星空的`executeBillQuery`接口,获取采购收料通知单的数据,并进行相应的加工处理。 #### 接口配置与请求参数 首先,我们需要配置元数据以便正确调用`executeBillQuery`接口。以下是关键的元数据配置: ```json { "api": "executeBillQuery", "effect": "QUERY", "method": "POST", "number": "FBillNo", "id": "FDetailEntity_FEntryID", "name": "FBillNo", "idCheck": true, "request": [ {"field":"FBillNo","label":"单据编号","type":"String","describe":"单据编号","value":"FBillNo"}, {"field":"FDocumentStatus","label":"单据状态","type":"String","describe":"单据状态","value":"FDocumentStatus"}, {"field":"FMaterialId","label":"物料编码","type":"String","describe":"物料编码","value":"FMaterialId.fnumber"}, {"field":"FStockOrgId_FNumber","label":"收料组织","type":"String","describe":"收料组织","value":"FStockOrgId.FNumber"}, {"field":"FMaterialName","label":"物料名称","type":"String","describe":"物料名称","value":"FMaterialName"}, {"field":"FDate","label":"收料日期","type":"String","describe":"收料日期","value":"FDate"}, // ...省略其他字段 ], "otherRequest": [ {"field": "Limit", "label": "最大行数", "type": "string", "describe": "金蝶的查询分页参数", "value": "5000"}, {"field": "StartRow", "label": "开始行索引", "type": "string", "describe": "金蝶的查询分页参数", "value": "{PAGINATION_START_ROW}"}, {"field": "FilterString", "label": "过滤条件", "type": "string", "describe": // 示例写法 FSupplierId.FNumber = 'VEN00010' and FApproveDate>=", // 实际使用时 value: FDocumentStatus='C' AND FApproveDate>='{{LAST_SYNC_TIME|dateTime}}' -- AND FApproveDate >= '2024-09-20T16:24:07.56' -- AND FBillNo='CGSL089900'" }, {"field": // 查询字段集合 FieldKeys, label: // 字段key集合 需查询的字段key集合, type: array, describe: 金蝶分录主键ID格式:FPOOrderEntry_FEntryId,其它格式 FPurchaseOrgId.FNumber, parser: {name: ArrayToString, params: ,} }, {"field": FormId, label: // 表单ID 业务对象表单Id, type: string, describe: 必须填写金蝶的表单ID如:PUR_PurchaseOrder, value: PUR_ReceiveBill}, {"field": OrderString, label: 排序, type: string, value: FDate ASC} ], autoFillResponse:true } ``` #### 数据请求与清洗 在实际操作中,我们通过POST请求方式调用`executeBillQuery`接口。请求体中包含了我们需要查询的字段和其他控制参数,例如分页、过滤条件等。 ```json { // 请求体示例 FormId: 'PUR_ReceiveBill', FieldKeys: 'FBillNo,FDocumentStatus,FMaterialId.fnumber,FStockOrgId.FNumber,FMaterialName,FDate,...', FilterString: `FDocumentStatus='C' AND FApproveDate>='${LAST_SYNC_TIME}'`, Limit: '5000', StartRow: '{PAGINATION_START_ROW}', OrderString: 'FDate ASC' } ``` 在接收到响应后,需要对数据进行清洗和转换。清洗过程包括: 1. **字段映射**:将返回的数据字段映射到目标系统所需的字段。 2. **数据类型转换**:确保每个字段的数据类型符合目标系统要求。 3. **异常处理**:处理可能出现的数据异常,如缺失值、格式错误等。 #### 数据转换与写入 完成数据清洗后,下一步是将数据转换为目标系统可接受的格式,并写入目标系统。这一步通常涉及以下操作: 1. **格式化数据**:根据目标系统的数据结构要求,对数据进行重新组织和格式化。 2. **批量写入**:为了提高效率,通常采用批量写入方式,将多条记录一次性写入目标系统。 3. **日志记录**:记录每次操作的日志,包括成功和失败的记录,以便后续追踪和排查问题。 #### 实例代码 以下是一个简化版的代码示例,展示了如何调用接口并处理返回的数据: ```javascript const axios = require('axios'); async function fetchData() { const requestBody = { FormId: 'PUR_ReceiveBill', FieldKeys: 'FBillNo,FDocumentStatus,FMaterialId.fnumber,FStockOrgId.FNumber,FMaterialName,FDate,...', FilterString: `FDocumentStatus='C' AND FApproveDate>='${LAST_SYNC_TIME}'`, Limit: '5000', StartRow: '{PAGINATION_START_ROW}', OrderString: 'FDate ASC' }; try { const response = await axios.post('https://api.kingdee.com/executeBillQuery', requestBody); if (response.data && response.data.result) { const cleanedData = cleanData(response.data.result); writeToTargetSystem(cleanedData); console.log('Data fetched and processed successfully'); } else { console.error('Failed to fetch data:', response.data); } } catch (error) { console.error('Error fetching data:', error); } } function cleanData(data) { return data.map(item => ({ billNo: item.FBillNo, documentStatus: item.FDocumentStatus, materialCode: item['FMaterialId.fnumber'], stockOrgCode: item['FStockOrgId.FNumber'], materialName: item.FMaterialName, receiptDate: item.FDate, // ...其他字段映射 })); } function writeToTargetSystem(data) { // 批量写入逻辑实现 } fetchData(); ``` 通过以上步骤,我们可以高效地从金蝶云星空获取采购收料通知单的数据,并进行必要的清洗和转换,为后续的数据处理打下坚实基础。 ![钉钉与CRM系统接口开发配置](https://pic.qeasy.cloud/S14.png~tplv-syqr462i7n-qeasy.image) ### 使用轻易云数据集成平台进行ETL转换并写入MySQL API接口 在数据集成生命周期的第二步中,已经集成的源平台数据需要经过ETL(提取、转换、加载)过程,最终转为目标平台 MySQL API 接口所能够接收的格式,并写入目标平台。本文将详细探讨这一过程,特别是如何利用元数据配置实现这一目标。 #### 数据请求与清洗 首先,我们需要从源系统提取原始数据,并进行必要的清洗操作。这一步骤确保了数据的一致性和准确性。假设我们已经完成了这一步骤,现在进入数据转换与写入阶段。 #### 数据转换 在数据转换阶段,我们需要根据目标平台 MySQL 的要求,将源数据映射到相应的字段。以下是一个典型的元数据配置示例,用于将采购收料通知单的数据转化为 MySQL 可接受的格式: ```json { "api": "execute", "effect": "EXECUTE", "method": "POST", "idCheck": true, "request": [ { "field": "main_params", "label": "main_params", "type": "object", "describe": "111", "value": "1", "children": [ {"field":"V_ID","label":"采购收料通知单内码","type":"string","value":"{FID}"}, {"field":"V_BILL_TYPE","label":"单据类型","type":"string","value":"{FBillTypeID}"}, {"field":"V_BILL_NO","label":"单据编号","type":"string","value":"{FBillNo}"}, {"field":"V_DATE","label":"收料日期","type":"string","value":"{FDate}"}, {"field":"V_SUPPLIER","label":"供应商","type":"string","value":"{FSupplierId}"}, // ... 其他字段省略 ] }, { "field": "extend_params_1", "label": "extend_params_1", "type": "array", "value": "1", "children": [ {"field":"V_ENTRY_ID","label":"采购收料通知单分录内码","type":"string","value":"{FDetailEntity_FEntryID}"}, {"field":"V_ID","label":"采购收料通知单内码","type":"string","value":"{FID}"}, {"field":"V_SEQ","label":"序号","type":"string","value":"{FDetailEntity_FSeq}"}, // ... 其他字段省略 ] } ], // SQL 插入语句 "otherRequest":[ { "field": "main_sql", "label": "main_sql", ... }, { "field": "extend_sql_1", ... } ] } ``` 在这个配置中,我们定义了两个主要部分:`main_params` 和 `extend_params_1`,分别对应主表和子表的数据字段。每个字段都包含一个键值对,其中键表示目标数据库中的字段名,值表示源数据中的字段名或计算逻辑。 #### 数据写入 一旦数据完成转换,就可以通过API接口将其写入MySQL数据库。以下是用于插入主表和子表数据的SQL语句: ```sql -- 主表插入语句 INSERT INTO `ty_mes`.`mt_pur_receive` (`ID`, `BILL_TYPE`, `BILL_NO`, `DATE`, `SUPPLIER`, ...) VALUES (:V_ID, :V_BILL_TYPE, :V_BILL_NO, :V_DATE, :V_SUPPLIER, ...) ON DUPLICATE KEY UPDATE `BILL_TYPE` = VALUES(`BILL_TYPE`), ... -- 子表插入语句 INSERT INTO `ty_mes`.`mt_pur_receive_entry` (`ENTRY_ID`, `ID`, `SEQ`, `MATERIAL_CODE`, ...) VALUES (:V_ENTRY_ID, :V_ID, :V_SEQ, :V_MATERIAL_CODE, ...) ON DUPLICATE KEY UPDATE `ID` = VALUES(`ID`), ... ``` 在这些SQL语句中,`:V_ID` 等占位符表示从元数据配置中获取的实际值。这些占位符将在执行时被替换为实际的数据,从而完成插入操作。 #### API调用 最后,通过API调用将转换后的数据发送到MySQL数据库。以下是一个简单的API调用示例: ```http POST /execute HTTP/1.1 Host: api.example.com Content-Type: application/json { "main_params": { ... }, ... } ``` 该请求会触发相应的SQL语句执行,将数据插入到MySQL数据库中。 通过上述步骤,我们实现了从源系统到目标系统的数据无缝对接,确保了数据的一致性和准确性。在实际操作中,可以根据具体需求调整元数据配置和SQL语句,以满足不同场景下的数据集成需求。 ![打通钉钉数据接口](https://pic.qeasy.cloud/T9.png~tplv-syqr462i7n-qeasy.image)