使用轻易云平台进行ETL转换并写入MySQL的最佳实践

  • 轻易云集成顾问-胡秀丛
### 金蝶云星空与MySQL的系统对接集成案例分享:GX-广东天一销售订单-表头-拉取 在实际业务场景中,企业常常需要将金蝶云星空中的数据高效地集成到MySQL数据库中,以实现跨平台的数据同步和资源优化利用。为了满足这一需求,我们选择了轻易云数据集成平台进行配置和操作,实现了针对特定业务需求的系统对接。本次技术案例将详细介绍如何通过executeBillQuery接口从金蝶云星空获取销售订单表头数据,并批量写入到MySQL,同时确保数据处理过程的实时监控与异常处理机制。 首先,在本方案(GX-广东天一销售订单-表头-拉取)中,我们充分利用了轻易云的可视化数据流设计工具,通过直观的方式设计并管理整个数据集成流程。这不仅使得开发人员能够快速上手,还极大提高了操作效率。 #### API调用与分页处理 为了获取金蝶云星空上的销售订单表头信息,我们使用其提供的executeBillQuery接口。由于API响应结果可能包含大量的数据,因此我们需要妥善处理分页问题,确保每次请求都能准确无误地抓取所需的数据片段。在实现过程中,通过定义合理的分页逻辑,使得每一次API调用都能稳定、高效地返回预期结果。 ```python def fetch_k3cloud_data(page_index): # 调用金蝶云星空executeBillQuery接口,传递分页参数 response = requests.post(api_url, data={ "formid": "SAL_ORDER", "fieldKeys": "...", # 定义所需字段 "filterString": "...", # 筛选条件,根据业务需求自定义 "pageIndex": page_index, "limit": limit_per_page }) return response.json() ``` #### 数据转换与映射 由于金蝶云星空与MySQL之间存在着不同的数据结构,为保证正确性,需要进行一定的数据转换和映射。这个过程中,自定义转化逻辑被应用于将从源端获得的信息整理为目标数据库所能识别及存储的格式。这一步骤对于维护数据的一致性尤为重要。 ```python def transform_data(raw_record): """ 转换单条原始记录以适应目标数据库结构 """ transformed_record = { 'order_id': raw_record['FOrderID'], 'customer_name': raw_record['FCustomerName'], ... } return transformed_record ``` #### 高效批量写入 MySQL 为了提升写入性能以及降低网络延迟带来的影响,将经过转化后的多条记录打包,通过 execute 接口批量插 ![金蝶与外部系统打通接口](https://pic.qeasy.cloud/D32.png~tplv-syqr462i7n-qeasy.image) ### 调用金蝶云星空接口executeBillQuery获取并加工数据 在数据集成生命周期的第一步,我们需要从源系统获取数据,并进行初步的清洗和加工。本文将详细介绍如何通过轻易云数据集成平台调用金蝶云星空的`executeBillQuery`接口来实现这一目标。 #### 接口配置与请求参数 首先,我们需要配置接口的元数据,以便正确调用金蝶云星空的API。根据提供的元数据配置,我们可以看到以下关键字段: - `api`: `executeBillQuery` - `method`: `POST` - `number`: `FBillNo` - `id`: `FID` - `name`: `FBillNo` 这些字段定义了我们将要调用的API及其基本属性。接下来是请求参数部分,这些参数决定了我们从金蝶云星空获取哪些具体信息。 #### 请求参数详解 元数据中定义了一系列请求参数,每个参数都有其特定的用途和描述: 1. **基本字段** - `FID`: 单据ID - `FBillNo`: 单据编号 - `FDocumentStatus`: 单据状态 - `FSaleOrgId_FNumber`: 销售组织编号 - `FDate`: 日期 - `FCustId_FNumber`: 客户编号 - `FSaleDeptId_Fnumber`: 销售部门编号 - `FBillTypeID_Fnumber`: 单据类型编号 - `FBusinessType`: 业务类型 - `FEntryNote`: 备注 - `FSalerId`: 销售员名称 - `FBillAllAmount`: 价税合计 - `FDeliveryDate`: 交货日期 - `FSaleOrderEntry_FEntryID`: 明细实体主键 - `FNote`: PO号 2. **分页和过滤条件** - 分页参数:`Limit`, `StartRow`, 和`TopRowCount` - 过滤条件:`FilterString` 3. **其他关键字段** - 查询字段集合:`FieldKeys` - 表单ID:`FormId` #### 构建请求示例 为了更好地理解如何构建请求,以下是一个具体的请求示例: ```json { "FormId": "SAL_SaleOrder", "FieldKeys": "FID,FBillNo,FDocumentStatus,FSaleOrgId.FNumber,FDate,FCustId.FNumber,FSaleDeptId.Fnumber,FBillTypeID.Fnumber,FBusinessType,FNote,FSalerId.fname,FBillAllAmount,FDeliveryDate,FSaleOrderEntry_FEntryID", "FilterString": "left(FBillNo,2)<>'YW' and FSaleOrgId.fnumber='T04' and FDocumentStatus='C' and FApproveDate>='2023-01-01'", "Limit": "100", "StartRow": "0" } ``` 在这个示例中,我们指定了表单ID为`SAL_SaleOrder`,查询字段包括多个关键业务字段,并设置了过滤条件以确保只获取特定状态和时间范围内的数据。 #### 数据清洗与加工 一旦成功获取到数据,下一步就是对数据进行清洗和初步加工。这通常包括以下步骤: 1. **数据格式转换**:将原始数据转换为目标系统所需的格式。 2. **字段映射**:根据业务需求,将源系统中的字段映射到目标系统中的对应字段。 3. **数据校验**:检查数据完整性和准确性,确保没有缺失或错误的数据。 例如,对于销售订单的数据,我们可能需要将日期格式从源系统的格式转换为目标系统所需的ISO标准格式,并确保所有必填字段都有值。 #### 自动化与调度 为了确保数据集成过程的高效性和可靠性,可以利用轻易云平台提供的自动化功能。例如,通过配置定时任务(crontab),可以实现每隔一定时间自动拉取新数据并进行处理。 ```json { "crontab": "*/4 * * * *", "takeOverRequest": [ { "field": "FilterString", "value": "FSaleOrgId.fnumber='T04' and FDocumentStatus='C' and FApproveDate>='{{MINUTE_AGO_10|datetime}}'", "type": "string" } ] } ``` 以上配置表示每4分钟执行一次拉取操作,并使用动态时间参数确保只获取最近10分钟内的新数据。 通过上述步骤,我们可以高效地调用金蝶云星空接口获取销售订单表头信息,并对其进行初步清洗和加工,为后续的数据转换与写入阶段打下坚实基础。 ![如何开发钉钉API接口](https://pic.qeasy.cloud/S28.png~tplv-syqr462i7n-qeasy.image) ### 使用轻易云数据集成平台进行ETL转换并写入MySQLAPI接口 在数据集成生命周期的第二步,我们需要将已经集成的源平台数据进行ETL(提取、转换、加载)处理,最终将其写入目标平台MySQLAPI接口。以下是一个详细的技术案例,展示如何配置和执行这一过程。 #### 元数据配置解析 我们使用以下元数据配置来实现数据从源平台到目标平台的转换和写入: ```json { "api": "execute", "effect": "EXECUTE", "method": "POST", "idCheck": true, "request": [ { "field": "main_params", "label": "main_params", "type": "object", "describe": "111", "value": "1", "children": [ {"field":"order_uuid","label":"uuid","type":"string","value":"{FID}{FBillNo}"}, {"field":"FBillNo","label":"单据编号","type":"string","value":"{FBillNo}"}, {"field":"order_date","label":"日期","type":"string","value":"{FDate}"}, {"field":"order_status","label":"状态","type":"string"}, {"field":"company_code","label":"组织","type":"string","value":"GDTY"}, {"field":"is_distribute","label":"分配","type":"string","value":"1"}, {"field":"is_soa_finish","label":"是否对账","type":"string","value":"1"}, {"field":"customer_uuid","label":"客户编号","type":"string","value": "_findCollection find customer_uuid from 764fd6df-ae14-3e85-a31d-978d6ad70224 where customer_code={FCustId_FNumber}"}, {"field":"order_type","label":"订单类型","type":"string","value":"1"}, {"field":"leader","label":"接单人","type":"string","value": "{FSalerId}"}, {"field": "customer_order_no", "label": "客户订单号", "type": "string", "value": "_function CASE '{FNote}' WHEN '' THEN '{FBillNo}' ELSE '{FNote}' END" }, {"field": "FDemandDate", "label": "交货日期", "type": "string", "value": "{FDeliveryDate}" }, {"field": "create_by", "label": "创建人", "type": "string", "value": "11648" }, {"field": "order_price", "label": "订单价格", "type": "string", ... ``` #### 数据提取与转换 在这个步骤中,我们从源系统提取原始数据,并根据元数据配置进行必要的转换。每个字段都有相应的映射规则,例如: - `order_uuid`:通过组合 `{FID}{FBillNo}` 来生成唯一标识。 - `customer_uuid`:通过 `_findCollection` 函数从另一表中查找对应的客户编号。 - `customer_order_no`:通过 `_function CASE` 函数处理空值情况,确保订单号的正确性。 这些转换规则确保了数据在进入目标系统之前已经符合预期格式。 #### 数据写入MySQLAPI接口 完成数据转换后,我们需要将其写入目标平台MySQL。根据元数据配置中的 `otherRequest` 部分,我们构建了一个SQL插入语句: ```sql INSERT INTO oms_order (order_uuid, order_no, order_date, order_status, company_code, is_distribute, is_soa_finish, customer_uuid, order_type, leader, customer_order_no, order_delivery_date, create_by, order_price, payment_type, price_type, kingdee_FID, kingdee_salseORG) VALUES (:order_uuid,:FBillNo,:order_date,:order_status,:company_code,:is_distribute,:is_soa_finish,:customer_uuid,:order_type,:leader,:customer_order_no,:FDemandDate,:create_by,:order_price,:payment_type,:price_type,:kingdee_FID,:kingdee_salseORG) ``` 此SQL语句使用占位符来插入已转换的数据,确保了插入操作的灵活性和安全性。 #### API调用与执行 最后,通过配置API调用参数,我们使用POST方法将构建好的SQL语句发送到MySQLAPI接口: ```json { ... { ... { ... { ... } } } } ``` 在实际操作中,这一过程由轻易云数据集成平台自动化完成,无需手动干预。通过实时监控和日志记录,我们可以随时检查每个环节的数据流动和处理状态,确保整个ETL过程高效、透明。 以上就是一个完整的数据集成案例,从提取、转换到最终写入目标系统,展示了如何利用轻易云数据集成平台实现复杂的数据处理任务。 ![如何开发企业微信API接口](https://pic.qeasy.cloud/T21.png~tplv-syqr462i7n-qeasy.image)