轻易云数据集成平台实战:从金蝶云到MySQL的ETL应用

  • 轻易云集成顾问-谢楷斌
### 金蝶云星空数据集成到MySQL的技术案例分享 在本文中,我们将深入探讨如何通过轻易云数据集成平台,实现金蝶云星空系统与MySQL数据库之间的数据对接。本次案例以“xsck-2金蝶销售出库-->mysql”为实例,剖析从金蝶云星空获取销售出库数据,再批量导入到MySQL的全过程。 #### 数据提取:调用executeBillQuery接口 首先,我们需要通过金蝶云星空提供的API接口`executeBillQuery`来抓取相关的销售出库数据。此接口支持按条件查询和分页处理,以确保能够高效且可靠地获取大量业务单据。为了保证不漏单,每次查询需进行严格校验,并妥善处理分页返回的数据: ```java // 示例代码:调用 executeBillQuery 接口 String response = executeBillQuery(apiEndpoint, queryParameters); if (response == null) { // 错误处理逻辑 } ``` #### 数据转换与映射 由于金蝶云星空和MySQL存储的数据格式可能存在差异,在正式写入前,需要针对不同字段进行自定义转化。例如,日期格式、金额精度等都需要统一标准。在轻易云平台上,可视化的数据流设计工具便于管理这些复杂转换逻辑,使得整个流程更加直观清晰。 ```java // 示例代码:字段映射及转换逻辑实现 DataObject transformedData = transformData(fetchData); transformedData.put("dateField", convertDateFormat(fetchData.get("dateField"))); ``` #### 批量写入MySQL:执行execute API 面对大体量的业务单据,高吞吐量的数据写入能力至关重要。我们利用轻易云平台强大的并行处理机制,通过API `execute` 实现高效插入操作。同时,还要兼顾事务一致性和容错能力。如发生异常,则触发重试机制,以减少因网络波动或突发问题导致的数据丢失。 ```sql -- 示例代码:使用批处理方式向 MySQL 插入数据 INSERT INTO sales_outbound_table (column1, column2, ...) VALUES (?, ?, ...), (?, ?, ...), ... ``` #### 监控与告警 在整个数据集成过程中的每一步,都能通过集中式监控系统实时追踪任务状态。在出现性能瓶颈或者异常情况时,可以即时收到告警通知,从而快速定位并解决问题。这种透明可视化管理极大提升了运维效率,并保障了业务连续性。 以上为本篇基于具体实际运行方案所总结的一些关键技术点,下面将在后续章节详细描述各步骤实施细节及具体配置方法。 ![用友与CRM系统接口开发配置](https://pic.qeasy.cloud/D8.png~tplv-syqr462i7n-qeasy.image) ### 调用源系统金蝶云星空接口executeBillQuery获取并加工数据 在数据集成生命周期的第一步中,调用源系统的API接口获取数据是至关重要的一环。本文将详细探讨如何通过轻易云数据集成平台调用金蝶云星空的`executeBillQuery`接口来获取并加工数据。 #### 接口配置与请求参数 首先,我们需要理解元数据配置中的各个字段及其作用。以下是元数据配置的详细解析: ```json { "api": "executeBillQuery", "effect": "QUERY", "method": "POST", "number": "FBillNo", "id": "FEntity_FENTRYID", "idCheck": true, "formatResponse": [ { "old": "FDate", "new": "FDate_new", "format": "date" } ], "request": [ {"field":"FBillNo","label":"FBillNo","type":"string","describe":"单据编号","value":"FBillNo"}, {"field":"FDocumentStatus","label":"FDocumentStatus","type":"string","value":"FDocumentStatus"}, {"field":"FEntity_FENTRYID","label":"FEntity_FENTRYID","type":"string","value":"FEntity_FENTRYID"}, {"field":"FDate","label":"FDate","type":"string","describe":"日期","value":"FDate"}, {"field":"FRealQty","label":"FRealQty","type":"string","describe":"数量","value":"FRealQty"}, {"field":"FAmount","label":"FAmount","type":"string","describe":"单价","value":"FAmount"} ], "otherRequest": [ {"field":"Limit","label":"Limit","type":"string","describe":"金蝶的查询分页参数","value":"2000"}, {"field":"StartRow","label":"StartRow","type":"string","describe":"金蝶的查询分页参数","value":"{PAGINATION_START_ROW}"}, {"field":"TopRowCount","label":"TopRowCount","type":"int"}, {"field":"FilterString","label":"FilterString", "type": "string", "describe": "示例写法 FSupplierId.FNumber = 'VEN00010' and FApproveDate>=", "value": "FDate>='{{LAST_SYNC_TIME|dateTime}}' and FBillNo NOT LIKE '%XSCKD%' or FApproveDate>='{{LAST_SYNC_TIME|dateTime}}' and FBillNo NOT LIKE '%XSCKD%'" }, {"field": "FieldKeys", "label": "FieldKeys", "type": "array", "describe": "金蝶分录主键ID格式:FPOOrderEntry_FEntryId,其它格式 FPurchaseOrgId.FNumber", "value": "{MAIN_REQUEST}" }, {"field": "FormId", "label": "FormId", "type": string, describe: 必须填写金蝶的表单ID如:PUR_PurchaseOrder, value: SAL_OUTSTOCK } ] } ``` #### 请求参数解析 1. **基本请求字段**: - `FBillNo`: 单据编号 - `FDocumentStatus`: 单据状态 - `FEntity_FENTRYID`: 分录主键ID - `FDate`: 日期 - `FRealQty`: 数量 - `FAmount`: 单价 2. **其他请求字段**: - `Limit`: 查询分页参数,限制每次返回的数据条数。 - `StartRow`: 查询分页参数,指定从哪一行开始查询。 - `TopRowCount`: 查询分页参数,指定返回的最大行数。 - `FilterString`: 查询过滤条件,用于筛选符合条件的数据。 - `FieldKeys`: 指定需要返回的字段。 - `FormId`: 表单ID,此处为`SAL_OUTSTOCK`,表示销售出库单。 #### 数据请求与清洗 在调用接口时,需要构建一个POST请求,其中包含上述所有必要的字段。以下是一个示例请求体: ```json { "FormId": "SAL_OUTSTOCK", "FieldKeys": ["FBillNo", ...], ... } ``` 通过这个请求,我们可以从金蝶云星空获取到原始数据。接下来,我们需要对这些数据进行清洗和转换。 #### 数据转换与格式化 在元数据配置中,我们定义了一个`formatResponse`字段,用于指定需要格式化的数据字段。例如,将`FDate`字段格式化为新的日期格式,并命名为`FDate_new`: ```json "formatResponse":[{"old":"FDate",,"new:""FDate_new,""format:""date"}] ``` 这一步骤确保了我们从源系统获取的数据能够符合目标系统的要求。 #### 实际案例操作 假设我们已经成功调用了API并获取了如下原始数据: ```json [ { FBillNo: 'SO2023001', FDocumentStatus: 'C', FEntity_FENTRYID: '1001', FDate: '2023-10-01T00:00:00Z', FRealQty: '10', FAmount: '100' }, ... ] ``` 根据元数据配置,我们将对这些数据进行处理和转换: 1. **日期格式化**: 将`2023-10-01T00:00:00Z`转换为所需的新格式,并命名为`FDate_new`。 2. **字段映射**: 根据需求,将原始字段映射到目标字段,例如将`FBillNo`映射到目标系统中的相应字段。 经过处理后的数据将变得更加规范和易于使用,为后续的数据写入和分析打下坚实基础。 通过以上步骤,我们完成了轻易云数据集成平台生命周期中的第一步:调用源系统金蝶云星空接口获取并加工数据。这不仅提高了数据处理效率,也确保了数据的一致性和准确性。 ![轻易云数据集成平台金蝶集成接口配置](https://pic.qeasy.cloud/S6.png~tplv-syqr462i7n-qeasy.image) ### 使用轻易云数据集成平台进行ETL转换并写入MySQL API接口 在数据集成生命周期的第二步,我们需要将已经集成的源平台数据进行ETL(提取、转换、加载)转换,最终写入目标平台MySQL。以下是详细的技术实现过程。 #### 数据请求与清洗 首先,我们从金蝶销售出库系统中提取原始数据。这些数据可能包含多种格式和类型,因此需要进行初步清洗,以确保数据的一致性和准确性。清洗后的数据将被传递到ETL流程的下一阶段。 #### 数据转换与写入 在这一阶段,我们将使用轻易云数据集成平台提供的元数据配置,将清洗后的数据转换为目标平台MySQL API接口能够接收的格式,并最终写入数据库。 ##### 元数据配置解析 根据提供的元数据配置,我们需要将金蝶销售出库系统中的字段映射到MySQL数据库中的相应字段。以下是具体的字段映射关系: - `FEntity_FENTRYID` 映射到 `明细id` - `FBillNo` 映射到 `单号` - `FDocumentStatus` 映射到 `状态` - `FRealQty` 映射到 `数量` - `FAmount` 映射到 `金额` - `FDate_new` 映射到 `时间` - 固定值 "销售出库" 映射到 `单据类型` ##### 数据转换 通过轻易云平台,我们可以使用以下元数据配置来定义如何将源数据字段映射并转换为目标格式: ```json { "api": "execute", "effect": "EXECUTE", "method": "POST", "idCheck": true, "request": [ { "field": "main_params", "label": "main_params", "type": "object", "describe": "111", "children": [ {"field": "FEntity_FENTRYID", "label": "明细id", "type": "string", "value": "{FEntity_FENTRYID}"}, {"field": "order_no_new", "label": "单号", "type": "string", "value": "{FBillNo}"}, {"field": "FDocumentStatus", "label": "状态", "type": "string", "value": "{FDocumentStatus}"}, {"field": "qty_count", "label": "数量", "type":"string",  "value":"{FRealQty}"}, {"field":"sales_count","label":"金额","type":"string","value":"{FAmount}"}, {"field":"datetime_new","label":"时间","type":"date","value":"{FDate_new}"}, {"field":"Document_Type","label":"单据类型","type":"string","value":"销售出库"} ] } ],  "otherRequest":[ {  "field":"main_sql",  "label":"main_sql",  "type":"string",  "describe":"111",  "value":"INSERT INTO  `kd_xsck`(`FEntity_FENTRYID`,`order_no_new`,`FDocumentStatus`,`qty_count`,`sales_count`,`datetime_new` ,`Document_Type` ) VALUES (:FEntity_FENTRYID,:order_no_new,:FDocumentStatus,:qty_count,:sales_count,:datetime_new,:Document_Type)" } ] } ``` ##### 写入目标平台 通过上述配置,轻易云平台会自动生成SQL语句,并将转换后的数据插入到MySQL数据库中的对应表格。具体执行步骤如下: 1. **生成SQL语句**:根据配置生成插入语句,如下所示: ```sql INSERT INTO kd_xsck(FEntity_FENTRYID, order_no_new, FDocumentStatus, qty_count, sales_count, datetime_new, Document_Type) VALUES (:FEntity_FENTRYID, :order_no_new, :FDocumentStatus, :qty_count, :sales_count, :datetime_new, :Document_Type); ``` 2. **参数绑定**:将源平台的数据字段绑定到生成的SQL语句中的参数。例如: - `:FEntity_FENTRYID` 对应 `{FEntity_FENTRYID}` - `:order_no_new` 对应 `{FBillNo}` - ... 3. **执行插入操作**:通过API调用,将绑定参数后的SQL语句发送给MySQL数据库,完成数据插入操作。 通过以上步骤,源平台的数据成功地被转换并写入到了目标平台MySQL中,实现了不同系统间的数据无缝对接。这一过程充分利用了轻易云平台的全异步、多异构系统支持特性,确保了高效、可靠的数据集成。 ![打通企业微信数据接口](https://pic.qeasy.cloud/T28.png~tplv-syqr462i7n-qeasy.image)