ETL技术在MySQL与金蝶云数据集成中的应用案例

  • 轻易云集成顾问-姚缘
### MySQL数据集成到金蝶云星空的案例分享:MOM-SCRK-生产入库单新增-XSL 在现代企业的数据管理中,如何高效、稳定地进行系统对接和数据集成是至关重要的。本文将介绍一个具体的系统对接案例,即通过轻易云数据集成平台,将MySQL中的生产入库单新增数据(方案名称:MOM-SCRK-生产入库单新增-XSL)集成到金蝶云星空。 #### 数据获取与处理 我们首先需要从MySQL数据库中通过API接口`select`提取相关的生产入库单新增信息。为了确保数据不漏单,我们设置了定时任务以可靠地抓取MySQL接口数据,并处理分页和限流问题,这是保障高吞吐量写入能力的重要前提。 以下为简要流程: 1. **定时任务调度**: 使用轻易云提供的定时任务调度功能,每隔固定时间段自动调用MySQL API `select` 接口,以批量方式获取最新的数据。 2. **分页与限流管理**: 为避免一次性获取大量数据造成性能瓶颈,通过自定义分页机制分批拉取,同时针对接口调用频率做出适当限制,防止触发API速率控制策略。 #### 数据转换与映射 由于MySQL数据库中的表结构可能与金蝶云星空所需的数据格式存在差异,我们使用可视化的数据流设计工具,自定义配置了相应的数据转换逻辑。这一过程不仅包括字段映射,还涉及复杂类型转换,确保每条记录都能准确并无丢失地被写入目标系统。 主要步骤包括: 1. **字段映射**: 通过拖拽式操作界面,将MySQL表中的字段对应到金蝶云星空预设的字段上。此外,根据业务需求添加必要的数据修改规则,例如日期格式转换、数值计算等。 2. **复杂类型处理**: 对于一些复合型或嵌套结构的数据,通过脚本编辑功能实现精细化转码,使其符合目标API `batchSave` 的要求。 #### 数据写入与监控 最后一步,是利用轻易云平台强大的实时监控和告警系统,将清洗后的批量数据快速、安全地写入到金蝶云星空中。这一过程中实施了全面且有效的错误重试机制,确保即使偶尔出现网络波动或服务端异常,也不会导致整个流程失败,而是能够顺利完成后续重试直至成功。同时,实时日志记录帮助运维团队及时发现并解决潜在问题,大幅提升对接效率和准确性。 ![金蝶与外部系统打通接口](https://pic.qeasy.cloud/D17.png~tplv-syqr462i7n-qeasy.image) ### 调用MySQL接口select获取并加工数据 在数据集成生命周期的第一步,我们需要从源系统MySQL中调用接口`select`获取数据,并对其进行初步加工。本文将详细探讨如何通过轻易云数据集成平台配置元数据,完成这一过程。 #### 元数据配置解析 我们使用的元数据配置如下: ```json { "api": "select", "effect": "QUERY", "method": "SQL", "id": "sourceid", "name": "name", "request": [ { "field": "main_params", "label": "主参数", "type": "object", "describe": "对应主查询语句内的动态参数对象", "children": [ { "field": "limit", "label": "返回的记录数", "type": "int", "describe": "你可以使用 LIMIT 属性来设定返回的记录数。", "value": "{PAGINATION_PAGE_SIZE}" }, { "field": "offset", "label": "数据偏移量", "type": "int", "describe": "你可以通过OFFSET指定SELECT语句开始查询的数据偏移量。默认情况下偏移量为0。", "value": "{PAGINATION_START_ROW}" } ] } ], ... } ``` 该配置主要包含以下几个部分: 1. **API和方法**:指定了调用的API为`select`,方法为`SQL`。 2. **请求参数**:定义了两个主要参数`limit`和`offset`,用于控制查询结果的分页。 3. **主查询语句**:提供了具体的SQL查询语句。 #### SQL查询语句详解 ```sql select t1.header_id as sourceid, CONCAT('MSCRK', DATE_FORMAT(t1.transaction_date, '%Y%m%d'), t1.header_id) as '单据编号', ifnull((select aft_quiet_time from ty_mes.mt_account_period where t1.transaction_date < aft_quiet_time order by account_period_id desc limit 1), t1.transaction_date) as '单据日期', t2.attribute2 as '生产订单主键', t2.attribute3 as '生产订单明细主键', SUBSTRING_INDEX(t1.wo_number, '_', 1) as '生产订单号', SUBSTRING_INDEX(t1.wo_number, '_', -1) as '生产订单行号', t1.attribute15 as '生产汇报单主键', t1.attribute14 as '生产汇报单号', t2.material_code as '物料编码', t1.ok_qty as '入库数量', t1.warehouse_code as '仓库', if((select count(1) from ty_mes.mt_work_order ct1 join tarzan_method.MT_MATERIAL_SITE ct2 on ct1.tenant_id = ct2.tenant_id and ct1.material_id = ct2.material_id and ct2.site_id = 8001 join tarzan_method.mt_material_site_attr ct3 on ct2.tenant_id = ct3.tenant_id and ct2.material_site_id = ct3.material_site_id and ct3.attr_name = 'ATTRIBUTE1' and ct3.attr_value = 'Y' where ct1.tenant_id = 7 and ct1.work_order_num = t1.wo_number) > 0, t2.manufacturing_site_code, '') as '批号', ifnull(t3.value, '15040501') as '生产车间', t2.manufacturing_site_code as '生产组织', t2.tracking_num as '计划跟踪号' from ty_mes.hme_wo_report_process_itf t1 left join ty_aps.hps_make_order_iface t2 on t1.tenant_id = t2.tenant_id and t1.wo_number = t2.make_order_num left join hzero_platform.hpfm_lov_value t3 on t2.department = t3.meaning and t3.lov_code = 'TY.KINGDEE.WORKSHOP' and t2.manufacturing_site_code = t3.tag and t3.enabled_flag = 1 where t1.tenant_id = 7 and t1.iface_sequence = 4 and t1.attribute12 = 'S' and t1.attribute7 in ('', 'E') limit :limit offset :offset ``` 该SQL查询语句从多个表中提取了相关字段,并进行了必要的字段转换和拼接操作。具体步骤如下: - **字段选择与别名**:选择了多个字段,并使用别名进行标识,如`t1.header_id`被命名为`sourceid`。 - **字符串拼接**:使用`CONCAT`函数将多个字段拼接成一个新的字段,如生成单据编号。 - **条件查询与子查询**:通过子查询获取特定条件下的数据,如获取最近的会计期间结束时间。 - **条件判断与默认值**:使用`ifnull`函数设置默认值,确保在某些字段为空时提供备选值。 #### 动态参数处理 在元数据配置中,我们定义了两个动态参数: - `limit`: 用于限制返回记录数。 - `offset`: 用于指定数据偏移量。 这些参数在实际执行时会被替换为具体值,例如分页大小和起始行数。这种设计使得SQL查询具有高度灵活性,可以根据不同需求进行调整。 #### 数据请求与清洗 在调用MySQL接口获取数据后,轻易云平台会对返回的数据进行初步清洗。这包括: - 去除空值或无效值。 - 格式化日期、数字等字段。 - 根据业务需求进行简单的数据转换,如单位换算等。 这种预处理确保了后续的数据转换与写入过程更加高效和准确。 通过上述步骤,我们成功实现了从MySQL源系统中调用接口获取并加工数据,为后续的数据集成工作奠定了坚实基础。 ![用友与WMS系统接口开发配置](https://pic.qeasy.cloud/S9.png~tplv-syqr462i7n-qeasy.image) ### 使用轻易云数据集成平台进行ETL转换并写入金蝶云星空API接口的技术案例 在数据集成生命周期的第二步,我们需要将源平台的数据通过ETL(提取、转换、加载)过程转换为目标平台金蝶云星空API接口能够接收的格式,并最终写入目标平台。本文将详细探讨如何使用轻易云数据集成平台配置元数据来实现这一过程。 #### 配置元数据解析 在本案例中,我们将通过轻易云数据集成平台,将生产入库单新增的数据写入金蝶云星空系统。以下是具体的元数据配置: ```json { "api": "batchSave", "effect": "EXECUTE", "method": "POST", "idCheck": true, "operation": { "rowsKey": "array", "rows": 1, "method": "batchArraySave" }, "request": [ {"field":"FBillNo","label":"单据编号","type":"string","describe":"单据编号","value":"{{单据编号}}"}, {"field":"FBillType","label":"单据类型","type":"string","describe":"单据类型","parser":{"name":"ConvertObjectParser","params":"FNumber"},"value":"SCRKD01_SYS"}, {"field":"FDate","label":"日期","type":"string","describe":"日期","value":"{{单据日期}}"}, {"field":"FPrdOrgId","label":"生产组织","type":"string","describe":"生产组织","parser":{"name":"ConvertObjectParser","params":"FNumber"},"value":"{{生产组织}}"}, {"field":"FStockOrgId","label":"入库组织","type":"string","describe":"入库组织","parser":{"name":"ConvertObjectParser","params":"FNumber"},"value":"{{生产组织}}"}, {"field":"FOwnerTypeId0","label":"货主类型","type":"string","describe":"货主类型","value":"BD_OwnerOrg"}, {"field":"FOwnerId0","label":"货主","type":"string","describe":"货主","parser":{"name":"ConvertObjectParser","params":""}}, {"field":"FDescription","label":"备注","type":"","describe":"","value":""}, {"field":"","label":"","type":"","describe":"","value":""}, { "field": "FEntity", "label": "明细", "type": "array", "describe": "", "children":[ {"field":"","label":"","type":"","describe":"","value":""} ] } ], ... } ``` #### 数据请求与清洗 在ETL过程的第一步,我们需要从源系统中提取数据,并进行必要的清洗和预处理。以下是一些关键字段及其对应的解析和转换逻辑: - **单据编号(FBillNo)**:直接从源系统中提取。 - **单据类型(FBillType)**:固定值“SCRKD01_SYS”,通过`ConvertObjectParser`进行解析。 - **日期(FDate)**:直接从源系统中提取。 - **生产组织(FPrdOrgId)**:通过`ConvertObjectParser`解析为目标系统识别的编码。 - **入库组织(FStockOrgId)**:同样通过`ConvertObjectParser`解析。 这些字段将在后续步骤中被进一步处理和转换,以符合金蝶云星空API接口的要求。 #### 数据转换与写入 在ETL过程的第二步,我们将已经清洗好的数据进行转换,并通过API接口写入目标系统。以下是具体的操作步骤: 1. **设置请求方法和路径**: - 请求方法为POST,路径为`/k3cloud/Kingdee.BOS.WebApi.ServicesStub.DynamicFormService.BatchSave`. 2. **配置请求体**: - 请求体包含多个字段,每个字段都需要按照金蝶云星空API接口的要求进行配置。例如,`FBillNo`字段对应的是源系统中的“单据编号”,而`FBillType`则是一个固定值,需要通过特定解析器进行处理。 3. **批量保存操作**: - 使用`batchArraySave`方法进行批量保存操作。该方法可以有效地提高数据写入效率,特别是在需要处理大量数据时。 4. **子表明细处理**: - 在请求体中,子表明细部分使用了嵌套数组结构。例如,明细表中的每一行都包含多个字段,如“物料编码”、“单位”、“应收数量”等。这些字段同样需要按照特定规则进行解析和转换。 5. **提交并审核**: - 在其他请求参数中,设置了自动提交并审核功能(IsAutoSubmitAndAudit),确保数据在写入后立即生效。 #### 示例代码 以下是一个简化的示例代码片段,用于展示如何使用轻易云数据集成平台配置元数据并调用金蝶云星空API接口: ```python import requests import json url = 'https://your-k3cloud-url/k3cloud/Kingdee.BOS.WebApi.ServicesStub.DynamicFormService.BatchSave' headers = {'Content-Type': 'application/json'} data = { 'FormId': 'PRD_INSTOCK', 'Operation': 'BatchSave', 'IsAutoSubmitAndAudit': True, 'IsVerifyBaseDataField': True, 'Model': { 'FBillNo': '{{单据编号}}', 'FBillType': {'FNumber': 'SCRKD01_SYS'}, 'FDate': '{{单据日期}}', ... } } response = requests.post(url, headers=headers, data=json.dumps(data)) print(response.json()) ``` 以上代码展示了如何构建请求体并调用金蝶云星空API接口,实现生产入库单新增功能。通过这种方式,可以确保源系统的数据被准确、高效地转换并写入到目标系统中。 #### 总结 本文详细探讨了如何使用轻易云数据集成平台进行ETL转换,并将数据写入金蝶云星空API接口。在实际操作中,通过合理配置元数据,可以大大简化复杂的数据集成任务,提高整体效率和准确性。 ![金蝶与SCM系统接口开发配置](https://pic.qeasy.cloud/T29.png~tplv-syqr462i7n-qeasy.image)