利用轻易云平台完成数据ETL转换及写入MySQL的全过程

  • 轻易云集成顾问-曹润
### 吉客云数据集成到MySQL案例分享:采购入库单查询到BI拉伯塔 在本次技术案例中,我们将深入探讨如何通过轻易云数据集成平台,将吉客云系统中的采购入库单查询数据高效整合至MySQL数据库,以便进一步支持业务分析和决策。这里重点介绍的API接口包括吉客云的`erp.storage.goodsdocin.v2`用于获取采购入库单的数据,以及MySQL的批量写入API `batchexecute`。 #### 集成背景与需求 对于企业来说,实时掌握仓储管理中各类物资的进出情况,是提升供应链效率、保障生产运营正常进行的重要环节。因此,通过统一可靠的数据通道,将吉客云系统中的关键业务数据,如采购入库单信息,准确及时地同步到下游BI系统(如BI拉伯塔)中显得尤为重要。这不仅要求解决大规模数据传输问题,还需处理接口分页和限流、异常重试,以及自定义转换等复杂逻辑。 #### 技术解析 1. **定时可靠的数据抓取** 我们利用轻易云提供的任务调度功能,实现对吉客云接口 `erp.storage.goodsdocin.v2` 的定时调用,以确保能够准时抓取最新的采购入库单数据,并有效避免漏单现象。同时,通过配置分页机制,有效应对大量数据返回场景,确保高吞吐下依然保持良好的性能表现。 2. **批量写入与快速处理** 对于从吉客云获取的大量原始数据,需要通过自定义转换逻辑映射到目标表结构,从而适应MySQL存储需要。为了提高写入速度,我们采用了轻易云平台支持的大容量批量操作特性,通过调用 MySQL 批量执行 API `batchexecute` 实现一次性将多条记录插入数据库,大幅提升了整体处理效率。 3. **监控与异常处理** 集成功能背后强大的实时监控和告警系统,使我们可以随时追踪整个流程中的每个环节。一旦发生错误或不符合预期情况,会立即触发告警并记录详细日志,这些信息有助于我们迅速定位并修复问题。此外,对常见异常建立重试机制,在某些情况下通过自动再尝试来减少人工干预,提高任务完成率。 4. **格式差异及优化** 在实际应用中,由于两套系统的数据格式存在差异,并且可能需要对不同字段进行关联或转化,因此灵活的数据映射工具显得非常重要。在这个过程中, 通过可视化界面设计直接拖拽实现字段匹配,使得 ![如何对接企业微信API接口](https://pic.qeasy.cloud/D7.png~tplv-syqr462i7n-qeasy.image) ### 调用吉客云接口erp.storage.goodsdocin.v2获取并加工数据的技术案例 在数据集成生命周期的第一步中,调用源系统的API接口是至关重要的一环。本文将深入探讨如何通过轻易云数据集成平台调用吉客云接口`erp.storage.goodsdocin.v2`,并对获取的数据进行初步加工。 #### API接口调用配置 首先,我们需要配置API接口的元数据,以便正确地请求和处理数据。以下是元数据配置的详细信息: ```json { "api": "erp.storage.goodsdocin.v2", "method": "POST", "number": "goodsdocNo", "id": "recId", "pagination": { "pageSize": 10 }, "request": [ {"field":"pageIndex","label":"分页页码","type":"int"}, {"field":"pageSize","label":"分页页数","type":"int","value":"100"}, {"field":"goodsDocNo","label":"入库单号","type":"string"}, {"field":"startDate","label":"创建时间的起始时间","type":"string"}, {"field":"endDate","label":"创建时间的结束时间","type":"string"}, {"field":"gmtModifiedStart","label":"主表更新时间起始","type":"string","value":"_function from_unixtime(({LAST_SYNC_TIME}-86400),'%Y-%m-%d %H:%i:%s')"}, {"field":"gmtModifiedEnd","label":"主表更新时间截至","type":"string","value":"_function from_unixtime(({CURRENT_TIME}),'%Y-%m-%d %H:%i:%s')"}, {"field":"inouttype","label":"入库类型","type":"string","describe":"入库类型(100-期初库存 101-采购入库 102-调拨入库 103-盘盈入库 104-其他入库 105-销售退货 106-完工入库 107-组装拆卸入库 108-翻新入库 109-报废入库 110-残次品入库111-成本调整 112-即采即入113-退料入库114-调拨退回115-维修返厂返还)","value":"101"}, {"field":"sourceBillNo","label":"原始单号","type":"string"}, {"field":"warehouseCode","label":"仓库编号","type":"string"}, {"field":"outBillNo","label":"外部单号","type":"string"}, {"field":"vendCode","label":"供应商编号(往来单位)","type":"string"}, {"field":"billNo","label":"上游单据号(关联单号)","type":"string"}, {"field":"userName","label":"创建人名称","type":"string"}, { "field": "selelctFields", "label": "需要返回的字段", "type": "string", "describe": "需要返回的字段,用“,”号拼接,子类用子类名称.属性", "value": "recId,goodsdocNo,billNo,inOutDate,gmtCreate,inouttype,inouttypeName,vendCustomerCode,vendCustomerName,currencyCode,currencyRate,userName,warehouseCode,warehouseName,comment,memo,logisticName,logisticNo,companyId,companyName,logisticType,logisticCode,inOutReason,sourceBillNo,channelId,channelCode,channelName,redStatus,field1,field2,field3,field4,field5,financeBillStatus,applyCompanyId,applyCompanyName,applyCompanyCode,applyDepartId,applyDepartName,departCode,applyUserId,applyUserName,outBillNo,gmtModified,companyCode,vendCustomerId,callbackStatus,..." } ], "beatFlat": ["goodsDocDetailList"] } ``` #### 请求参数解析 在请求参数中,`pageIndex`和`pageSize`用于分页控制。默认情况下,`pageSize`设为100条记录。`goodsDocNo`、`startDate`、`endDate`等字段用于过滤查询条件。 特别值得注意的是两个时间字段: 1. `gmtModifiedStart`: 主表更新时间起始,通过函数 `_function from_unixtime(({LAST_SYNC_TIME}-86400),'%Y-%m-%d %H:%i:%s')` 动态计算。 2. `gmtModifiedEnd`: 主表更新时间截至,通过函数 `_function from_unixtime(({CURRENT_TIME}),'%Y-%m-%d %H:%i:%s')` 动态计算。 这些动态计算确保了我们能够获取到最新更新的数据。 #### 数据请求与清洗 通过上述配置,我们可以发送POST请求至吉客云API `erp.storage.goodsdocin.v2`。假设我们已经成功获取到响应数据,接下来需要对数据进行清洗和初步加工。 ```json { // 示例响应数据 "data":[ { "recId":12345, "goodsdocNo":67890, // 更多字段... "goodsDocDetailList":[ { // 子类详情 "recId":54321, // 更多子类字段... } ] } ] } ``` 在清洗过程中,我们需要确保所有必要字段都已正确提取,并且格式符合目标系统要求。例如,将日期格式统一转换为ISO标准格式,将数值字段进行必要的单位转换等。 #### 数据转换与写入 完成数据清洗后,下一步是将数据转换为目标系统所需的格式,并写入目标数据库或传输至下游系统。在这一过程中,可以利用轻易云平台提供的数据转换工具,实现复杂的数据映射和转换逻辑。 例如,将采购入库单中的商品详情按需拆分,并映射到BI拉伯塔系统中的相应表结构: ```json { // 转换后的目标数据结构示例 "purchaseOrder":{ // 主表信息 ... }, "purchaseOrderDetails":[ // 子表信息 ... ] } ``` 通过这种方式,我们能够确保从源系统获取的数据经过清洗和转换后,无缝对接到目标系统中,实现高效的数据集成。 以上就是调用吉客云接口 `erp.storage.goodsdocin.v2` 获取并加工数据的技术案例。通过合理配置元数据和精细化的数据处理步骤,可以大幅提升数据集成效率和准确性。 ![用友BIP接口开发配置](https://pic.qeasy.cloud/S11.png~tplv-syqr462i7n-qeasy.image) ### 使用轻易云数据集成平台进行ETL转换并写入MySQL 在数据集成的生命周期中,ETL(提取、转换、加载)是关键步骤之一。本文将详细探讨如何使用轻易云数据集成平台将吉客云的采购入库单数据转换为MySQL API接口能够接收的格式,并最终写入目标平台。 #### 数据请求与清洗 在数据请求阶段,我们从源系统(吉客云)获取采购入库单的数据。这里,我们假设已经完成了数据请求和初步清洗,获得了结构化的采购入库单信息。 #### 数据转换与写入 接下来,我们进入数据转换与写入阶段。此阶段的主要任务是将清洗后的数据转换为目标系统(MySQL)所需的格式,并通过API接口写入。 #### 元数据配置解析 元数据配置是ETL过程中的核心部分,它定义了如何将源数据字段映射到目标系统字段。以下是一个详细的元数据配置示例: ```json { "api": "batchexecute", "effect": "EXECUTE", "method": "SQL", "number": "id", "id": "id", "name": "id", "idCheck": true, "request": [ {"field":"recId","label":"入库单ID","type":"string","value":"{recId}"}, {"field":"goodsdocNo","label":"入库单号","type":"string","value":"{goodsdocNo}"}, {"field":"billNo","label":"上游单据号(关联单号)","type":"string","value":"{billNo}"}, {"field":"inOutDate","label":"入库时间","type":"string","value":"{{inOutDate|datetime}}"}, {"field":"gmtCreate","label":"创建时间","type":"string","value":"{{gmtCreate|datetime}}"}, {"field":"inouttype","label":"入库类型编码(100-期初库存 101-采购入库...)","type":"string","value":"{inouttype}"}, {"field":"inouttypeName","label":"入库类型名称(100-期初库存 101-采购入库...)","type":"string","value":"{inouttypeName}"}, {"field":"vendCustomerCode","label":"往来单位编号","type":"string","value":"{vendCustomerCode}"}, {"field":"vendCustomerName","label":"往来单位名称","type":"string","value":"{vendCustomerName}"}, // 更多字段... ], "otherRequest": [ { "field": "main_sql", "label": "主语句", "type": "string", "describe": "SQL首次执行的语句,将会返回:lastInsertId", "value": "REPLACE INTO erp_purch_storage_goodsdocin (recId,goodsdocNo,billNo,inoutDate,gmtCreate,inouttype,inouttypeName,vendCustomerCode,vendCustomerName,currencyCode,currencyRate,userName,warehouseCode,warehouseName,comment,memo,logisticName,logisticNo,companyId,companyName,logisticType,logisticCode,inoutReason,sourceBillNo,channelId,channelCode,channelName,redStatus,field1,field2,field3,field4,field5,financeBillStatus,applyCompanyId,applyCompanyName,applyCompanyCode,applyDepartId,applyDepartName,departCode,applyUserId,applyUserName,outBillNo,gmtModified,companyCode,vendCustomerId,callbackStatus,..." }, { "field": "limit", "label": "limit", "type": "string", "value": "1000" } ] } ``` #### SQL语句构建 在上述配置中,`main_sql`字段定义了插入MySQL数据库的主SQL语句。该语句使用`REPLACE INTO`来确保如果记录已经存在则更新,否则插入新记录。这种方式可以有效地避免重复记录。 ```sql REPLACE INTO erp_purch_storage_goodsdocin ( recId, goodsdocNo, billNo, inoutDate, gmtCreate, inouttype, inouttypeName, vendCustomerCode, vendCustomerName, currencyCode, currencyRate, userName, warehouseCode, warehouseName, comment, memo, logisticName, logisticNo, companyId, companyName, logisticType, logisticCode, inoutReason, sourceBillNo, channelId, channelCode, ... ) VALUES ( ... ) ``` #### 字段映射与格式化 元数据配置中的`request`部分定义了源字段到目标字段的映射关系。例如: - `{"field": "recId", ...}` 映射到 `erp_purch_storage_goodsdocin.recId` - `{"field": "goodsdocNo", ...}` 映射到 `erp_purch_storage_goodsdocin.goodsdocNo` 此外,一些字段需要进行格式化处理,例如日期时间字段: ```json {"field": "inOutDate", ... , "value": "{{inOutDate|datetime}}"} ``` 这里使用了模板引擎语法,将原始日期时间字符串格式化为标准的日期时间格式,以便MySQL能够正确解析和存储。 #### 批量执行与性能优化 为了提高性能,轻易云支持批量执行操作。通过设置`limit`参数,可以控制每次批量处理的数据量。例如: ```json {"field": "limit", ... , "value": "1000"} ``` 这意味着每次最多处理1000条记录,从而在保证性能的同时避免内存溢出等问题。 #### API调用与错误处理 最后,通过API接口将转换后的数据批量写入MySQL数据库。在实际操作中,需要注意以下几点: 1. **API调用频率**:确保API调用频率不会超过目标系统(MySQL)的限制。 2. **错误处理**:捕获并处理API调用过程中可能出现的错误,如网络超时、数据库连接失败等。 3. **日志记录**:记录每次API调用的详细日志,包括成功和失败的信息,以便后续排查问题。 通过上述步骤,我们可以高效地完成从吉客云到MySQL的数据集成过程,实现不同系统间的数据无缝对接。这不仅提高了业务流程的自动化程度,还极大地提升了数据处理效率和准确性。 ![泛微OA与ERP系统接口开发配置](https://pic.qeasy.cloud/T6.png~tplv-syqr462i7n-qeasy.image)