轻易云平台的ETL转换:从金蝶云到MySQL的数据迁移

  • 轻易云集成顾问-吴伟
### MOM-CGDD-采购订单历史数据集成方案:金蝶云星空对接MySQL 在复杂的企业应用环境中,高效的数据集成是确保业务顺畅运行的关键环节。本案例将分享一种具体的数据集成技术方案,将金蝶云星空系统中的采购订单历史数据高效、安全地迁移到MySQL数据库中。该方案名称为“MOM-CGDD-采购订单历史数据-表头表体-状态刷新”,主要通过轻易云数据集成平台实现。 首先,我们需要解决如何调用金蝶云星空提供的executeBillQuery API接口来获取采购订单历史数据。在实际操作中,该接口允许我们基于特定条件查询并分页获取相关的数据记录。这一步骤不仅要确保请求参数配置正确,还需处理API响应中的分页和限流问题,避免因大量数据传输造成性能瓶颈或资源超载。 一旦成功获取到目标数据,就进入了MySQL端的数据写入阶段。此时,我们利用execute API进行高吞吐量的数据批量插入操作。这需要特别注意两点:其一,针对两个系统间可能存在字段及结构差异,我们必须预先定义好相应的数据映射规则;其二,为保证数据一致性与可靠性,需要实时监控写入过程,并在出现异常时立即启动错误重试机制。 另外,在整个过程中,部署集中化的监控和告警系统至关重要。它可以实时跟踪每个集成任务运行状态及性能指标,使技术团队能够及时发现潜在问题并进行快速调整。此外,通过自定义编排复杂的数据转换逻辑,可以更灵活地适应企业多变的业务需求,实现高度定制化的数据信息管理。 使用这一套完整且严谨的方法论,不仅有效解决了跨平台间的大规模数据同步难题,更显著提升了整体运作效率和稳定性,为决策支持提供强有力的数据保障。在后续内容中,我会详细描述每一步具体实施细节与代码示例,请保持关注。 ![金蝶与CRM系统接口开发配置](https://pic.qeasy.cloud/D20.png~tplv-syqr462i7n-qeasy.image) ### 调用金蝶云星空接口executeBillQuery获取并加工数据 在轻易云数据集成平台的生命周期中,调用源系统接口获取数据是至关重要的第一步。本文将详细探讨如何通过调用金蝶云星空接口`executeBillQuery`来获取采购订单历史数据,并对其进行初步加工。 #### 接口配置与请求参数 在进行数据集成时,首先需要配置接口及其请求参数。根据提供的元数据配置,我们可以看到以下关键字段和参数: - **API**: `executeBillQuery` - **Method**: `POST` - **Effect**: `QUERY` - **Request Fields**: - `FID`: 单据ID - `FBillNo`: 单据编号 - `FDocumentStatus`: 单据状态(暂存:Z,创建:A,审核中:B,已审核:C) - `FCloseStatus`: 关闭状态 - `FCloserId`: 关闭人 - `FCloseDate`: 关闭时间 - `FCancelStatus`: 作废状态 - `FCancellerId`: 作废人 - `FCancelDate`: 作废时间 - `FPOOrderEntry_Fseq`: 采购订单明细行号 - `F_ProductLine`: 产品线 - `FPrice`: 单价 - `FTaxPrice`: 含税单价 - `FEntryAmount`: 金额 - `FAllAmount`: 税价合计 - `FPOOrderEntry_FEntryID`: 采购订单明细ID - `FMRPCloseStatus`: 业务关闭状态 - `FMRPFreezeStatus`: 业务冻结状态 - `FFreezerId`: 冻结人 - `FFreezeDate`: 冻结日期 - `FMRPTerminateStatus`: 业务终止状态 - `FTerminaterId`: 终止人 - `FTerminateDate`: 终止日期 此外,还有一些控制请求行为的参数: - **Limit**: 最大行数(5000) - **StartRow**: 开始行索引(动态值) - **TopRowCount**: 返回总行数(整数类型) - **FilterString**: 数据过滤条件(例如:`FModifyDate>='{{LAST_SYNC_TIME|datetime}}' and FPurchaseOrgId.fnumber in ('T02','T02.01')`) - **FieldKeys**: 查询字段集合(以逗号分隔的字符串) - **FormId**: 表单ID(`PUR_PurchaseOrder`) #### 构建请求体 基于上述配置,我们需要构建一个完整的请求体来调用金蝶云星空接口。以下是一个示例请求体: ```json { "FormId": "PUR_PurchaseOrder", "FieldKeys": "FID,FBillNo,FDocumentStatus,FCloseStatus,FCloserId.fname,FCloseDate,FCancelStatus,FCancellerId.fname,FCancelDate,FPOOrderEntry_Fseq,F_ProductLine,FPrice,FTaxPrice,FEntryAmount,FAllAmount,FPOOrderEntry_FEntryID,FMRPCloseStatus,FMRPFreezeStatus,FFreezerId.fname,FFreezeDate,FMRPTerminateStatus,FTerminaterId.fname,FTerminateDate", "FilterString": "FModifyDate>='{{LAST_SYNC_TIME|datetime}}' and FPurchaseOrgId.fnumber in ('T02','T02.01')", "Limit": "5000", "StartRow": "{PAGINATION_START_ROW}", "TopRowCount": true, } ``` #### 数据处理与转换 在成功获取到数据后,需要对数据进行初步处理和转换,以便后续的数据写入和分析。以下是一些常见的数据处理步骤: 1. **数据清洗**: 清洗原始数据,去除无效或重复的数据。例如,可以根据`FBillNo`去重。 2. **字段映射与转换**: 将原始字段映射到目标系统所需的字段。例如,将`FBillNo`映射为目标系统中的订单编号。 3. **状态转换**: 根据业务需求,对单据状态、关闭状态、作废状态等进行转换。例如,将金蝶云星空中的单据状态从字母表示法转换为目标系统中的数字表示法。 4. **日期格式化**: 将日期字段格式化为统一的标准格式,以确保跨系统的一致性。 #### 示例代码 以下是一个示例代码片段,用于调用接口并处理返回的数据: ```python import requests # 构建请求体 payload = { "FormId": "PUR_PurchaseOrder", "FieldKeys": ",".join([ "FID", "FBillNo", "FDocumentStatus", "FCloseStatus", "FCloserId.fname", "FCloseDate", "FCancelStatus", "FCancellerId.fname", "FCancelDate", # ...其他字段... ]), "FilterString": "FModifyDate>='2023-01-01' and FPurchaseOrgId.fnumber in ('T02','T02.01')", "Limit": "5000", "StartRow": "0", "TopRowCount": True, } # 发起POST请求 response = requests.post("https://api.kingdee.com/executeBillQuery", json=payload) # 检查响应状态码并处理返回的数据 if response.status_code == 200: data = response.json() # 数据清洗与转换逻辑... else: print(f"Error: {response.status_code}") ``` 通过上述步骤和代码示例,我们可以有效地调用金蝶云星空接口获取采购订单历史数据,并对其进行初步加工,为后续的数据集成和分析奠定基础。 ![金蝶与WMS系统接口开发配置](https://pic.qeasy.cloud/S21.png~tplv-syqr462i7n-qeasy.image) ### 轻易云数据集成平台生命周期的第二步:将源平台数据转换并写入MySQL API接口 在数据集成过程中,ETL(Extract, Transform, Load)转换是关键的一步。本文将详细探讨如何利用轻易云数据集成平台,将已经集成的源平台数据进行ETL转换,并转为目标平台 MySQL API 接口所能够接收的格式,最终写入目标平台。 #### 数据请求与清洗 在进行数据转换之前,首先需要从源系统请求并清洗数据。假设我们已经完成了这一步,现在重点关注如何将这些清洗后的数据转换为目标系统可接受的格式。 #### 数据转换与写入 在轻易云数据集成平台中,元数据配置是实现数据转换的核心。以下是一个典型的元数据配置示例: ```json { "api": "execute", "effect": "EXECUTE", "method": "POST", "idCheck": true, "request": [ { "field": "main_params", "label": "main_params", "type": "object", "describe": "111", "value": "1", "children": [ {"field": "DOCUMENT_STATUS", "label": "单据状态", "type": "string", "value": "{FDocumentStatus}"}, {"field": "CLOSE_STATUS", "label": "关闭状态", "type": "string", "value": "{FCloseStatus}"}, {"field": "CLOSER_NAME", "label": "关闭人", "type": "string", "value": "{FCloserId}"}, {"field": "CLOSE_DATE", "label": "关闭日期", "type":"string", "value":"_function case '{FCloseDate}' when '' then null else '{FCloseDate}' end"}, {"field":"CANCEL_STATUS","label":"作废状态","type":"string","value":"{FCancelStatus}"}, {"field":"CANCELLER_NAME","label":"作废人","type":"string","value":"{FCancellerId}"}, {"field":"CANCEL_DATE","label":"作废日期","type":"string","value":"_function case '{FCancelDate}' when '' then null else '{FCancelDate}' end"}, {"field":"SYNC_FLAG","label":"SYNC_FLAG","type":"string","value":"1"}, {"field":"FID","label":"FID","type":"string","value":"{FID}"} ] }, { ... } ], ... } ``` ##### 配置解析 1. **API接口定义**: - `api`: 指定API类型,这里是`execute`。 - `method`: HTTP请求方法,这里使用`POST`。 - `idCheck`: 是否进行ID校验,设置为`true`。 2. **主参数定义**: - `main_params`: 定义主要参数,其中包含多个子字段,如`DOCUMENT_STATUS`, `CLOSE_STATUS`, `CLOSER_NAME`, 等等。这些字段对应于源系统的数据字段,通过占位符 `{}` 引用实际值。 - 特别注意日期字段的处理,例如 `CLOSE_DATE` 和 `CANCEL_DATE`,使用 `_function case ... end` 语句来处理空值情况。 3. **扩展参数定义**: - 类似于主参数,扩展参数如 `extend_params_1` 包含更多子字段,用于处理复杂的数据结构。 4. **SQL语句定义**: - `main_sql`: 用于更新主表的SQL语句。例如: ```sql update ty_mes.mt_po_header set DOCUMENT_STATUS=:DOCUMENT_STATUS,CLOSE_STATUS=:CLOSE_STATUS,CLOSER_NAME=:CLOSER_NAME,CLOSE_DATE=:CLOSE_DATE,CANCEL_STATUS=:CANCEL_STATUS,CANCELLER_NAME=:CANCELLER_NAME,CANCEL_DATE=:CANCEL_DATE,SYNC_FLAG=:SYNC_FLAG where KINGDEE_ID=:FID ``` - `extend_sql_1`: 用于更新扩展表的SQL语句。例如: ```sql update ty_mes.mt_po_line set MRP_CLOSE_STATUS=:MRP_CLOSE_STATUS,MRP_FREEZE_STATUS=:MRP_FREEZE_STATUS,FREEZER_NAME=:FREEZER_NAME,FREEZE_DATE=:FREEZE_DATE,MRP_TERMINATE_STATUS=:MRP_TERMINATE_STATUS,TERMINATER_NAME=:TERMINATER_NAME,TERMINATE_DATE=:TERMINATE_DATE,SYNC_FLAG=:SYNC_FLAG where KINGDEE_ENTRYID=:KINGDEE_ENTRYID ``` #### 实际操作步骤 1. **配置元数据**:根据上述示例,在轻易云平台上配置元数据。 2. **执行ETL流程**:通过配置好的API接口和SQL语句,将清洗后的源系统数据转换为目标系统可接受的格式,并写入MySQL数据库。 3. **监控与验证**:使用轻易云提供的实时监控功能,确保每个步骤执行无误,并验证最终写入的数据是否正确。 通过上述步骤,我们能够高效地将源平台的数据转换并写入到目标MySQL数据库中,实现不同系统间的数据无缝对接。这不仅提高了业务透明度和效率,还确保了数据的一致性和准确性。 ![系统集成平台API接口配置](https://pic.qeasy.cloud/T18.png~tplv-syqr462i7n-qeasy.image)