轻易云数据集成平台在ETL转换及写入MySQL中的应用

  • 轻易云集成顾问-谢楷斌
### 金蝶云星空数据集成到MySQL技术案例分享 在企业信息化建设中,跨系统的数据集成往往面临诸多挑战,例如数据格式差异、接口调用限制、批量处理性能等问题。本篇文章将聚焦于通过轻易云平台实现金蝶云星空销售出库单数据高效写入到MySQL数据库的技术方案。 #### 系统对接元数据配置简介 金蝶云星空作为现代ERP系统的代表,其提供丰富的API接口来支持业务流程自动化。在本案例中,我们使用了executeBillQuery API接口从金蝶云星空获取销售出库单数据,同时通过轻易云平台优化后的执行引擎,将这些大量请求结果可靠地写入到了MySQL数据库(execute API)。 #### 数据抓取与转换逻辑设计 为确保销售出库单数据不漏单且高效处理,本方案采用定时任务机制按需抓取金蝶云星空的数据。具体操作过程中,通过自定义的数据转换逻辑,使得不同结构的数据能够适应目标数据库表结构需求,从而保证数据一致性和完整性。此外,针对分页和限流问题进行了特殊处理,以避免对来源服务造成过大的负载压力。 #### 高效批量写入策略 为了应对大规模数据快速写入的需求,轻易云平台内部支持高吞吐量的数据传输能力,这使得我们能够在短时间内将已经清洗并验证完毕的大量销售出库单记录注入MySQL。利用可视化工具便捷管理各个环节,同时保障了整个过程的透明度及监控及时响应。例如,通过实时监控和告警系统,可以随时跟踪每个任务执行情况,并在异常发生时迅速做出反应。 综上所述,该整合方案不仅解决了跨系统间复杂的数据协调与兼容性问题,还显著提升了整体业务运作效率,为企业数字化运营奠定坚实基础。接下来,我们将逐步深入探讨此技术实施细节及关键要点。 ![用友与MES系统接口开发配置](https://pic.qeasy.cloud/D1.png~tplv-syqr462i7n-qeasy.image) ### 使用轻易云数据集成平台调用金蝶云星空接口executeBillQuery获取并加工数据 在数据集成的生命周期中,调用源系统接口获取数据是至关重要的一步。本文将详细介绍如何使用轻易云数据集成平台,通过调用金蝶云星空的`executeBillQuery`接口来获取销售出库单的数据,并进行必要的加工处理。 #### 接口调用配置 首先,我们需要配置API调用的基本信息。根据提供的元数据配置,`executeBillQuery`接口采用POST方法。以下是请求配置的主要字段: ```json { "api": "executeBillQuery", "method": "POST", "number": "FBillNo", "id": "FEntity_FENTRYID", "pagination": { "pageSize": 100 }, "formatResponse": [ { "old": "FDate", "new": "FDate_new", "format": "date" } ] } ``` #### 请求参数设置 为了确保请求能够正确执行,我们需要设置请求参数。这些参数包括单据类型、单据编号、日期等。以下是部分关键字段及其描述: ```json [ {"field":"FBillTypeID_FNumber","label":"单据类型","type":"string","describe":"单据类型","value":"FBillTypeID.FNumber"}, {"field":"FBillNo","label":"单据编号","type":"string","describe":"单据编号","value":"FBillNo"}, {"field":"FEntity_FENTRYID","label":"FEntity_FENTRYID","type":"string","describe":"单据编号","value":"FEntity_FENTRYID"}, {"field":"FDate","label":"日期","type":"string","describe":"日期","value":"FDate"}, {"field":"FSaleOrgId_FNumber","label":"销售组织","type":"string","describe":"组织","value":"FSaleOrgId.FNumber"} ] ``` 这些字段将用于构建请求体,以便从金蝶云星空系统中获取所需的数据。 #### 分页与过滤条件 为了处理大量数据,分页和过滤条件是必不可少的。在本例中,我们设置了分页参数和过滤条件: ```json [ {"field":"Limit","label":"最大行数","type":"string","describe":"金蝶的查询分页参数","value":"{PAGINATION_PAGE_SIZE}"}, {"field":"StartRow","label":"开始行索引","type":"string","describe":"金蝶的查询分页参数","value":"{PAGINATION_START_ROW}"}, {"field":"FilterString","label":"过滤条件","type":"string","describe":"","value":"'FSaleOrgId.FNumber = '100' and F_SEND_FLAG2 = '发送'"} ] ``` 这些参数确保我们能够高效地获取并处理大规模数据,同时通过过滤条件精确定位所需的数据。 #### 数据格式化与转换 在获取到原始数据后,我们可能需要对某些字段进行格式化处理。例如,将日期字段重新命名并格式化为标准日期格式: ```json [ { "old": "FDate", "new": "FDate_new", "format": "date" } ] ``` 这种格式化操作可以确保数据在后续处理阶段更加一致和易于管理。 #### 实际案例:调用API并处理响应 以下是一个实际调用API并处理响应的示例代码片段: ```python import requests import json # API URL url = 'https://api.kingdee.com/executeBillQuery' # 请求头 headers = { 'Content-Type': 'application/json' } # 请求体 payload = { 'FormId': 'SAL_OUTSTOCK', 'FieldKeys': 'FBillNo,FDate,FSaleOrgId.FNumber,F_SEND_FLAG2', 'FilterString': "'FSaleOrgId.FNumber = '100' and F_SEND_FLAG2 = '发送'", 'Limit': 100, 'StartRow': 0 } # 发起请求 response = requests.post(url, headers=headers, data=json.dumps(payload)) # 检查响应状态码 if response.status_code == 200: data = response.json() # 格式化日期字段 for record in data: record['FDate_new'] = format_date(record['FDate']) # 后续处理逻辑... else: print(f"Error: {response.status_code}") ``` 在这个示例中,我们首先构建了请求体,并发起了POST请求以获取销售出库单的数据。随后,对返回的数据进行了日期格式化处理,以便后续使用。 通过以上步骤,我们成功地从金蝶云星空系统中获取了销售出库单的数据,并进行了必要的加工处理。这一过程展示了如何利用轻易云数据集成平台实现高效的数据集成与管理。 ![系统集成平台API接口配置](https://pic.qeasy.cloud/S10.png~tplv-syqr462i7n-qeasy.image) ### 数据集成生命周期中的ETL转换与写入MySQL 在数据集成生命周期的第二步,我们将已经集成的源平台数据进行ETL(提取、转换、加载)转换,最终写入目标平台MySQL。本文将详细探讨如何使用轻易云数据集成平台配置元数据,将销售出库单的数据转换为MySQL API接口能够接收的格式,并写入目标数据库。 #### 1. 数据提取与清洗 首先,我们需要从源系统中提取销售出库单的数据。这个过程涉及到从不同的数据源获取原始数据,并进行初步的清洗和标准化处理,以确保数据的一致性和完整性。 #### 2. 数据转换 在数据转换阶段,我们需要将提取到的数据按照目标系统(MySQL)的要求进行格式化和转换。这一步骤至关重要,因为它决定了数据能否被成功写入目标数据库。 根据提供的元数据配置,我们需要将源数据字段映射到目标数据库表中的相应字段。以下是元数据配置中的具体字段映射: - `fid` 对应 `单据id` - `document_id` 对应 `文档唯一标识号` - `fbill_no` 对应 `单据编号` - `fmaterialid_fnumber` 对应 `物料编码` - `fentry_id` 对应 `明细id` - `funitid_name` 对应 `单位名称` - `fcustid_fnsbtext5` 对应 `购货企业许可证编号` - `fcustid_fname` 对应 `购货企业名称` - `f_app_base_property` 对应 `产品条形码` - `flot` 对应 `批号` - `fqty` 对应 `数量` - `fproduce_date` 对应 `生产日期` - `fdate` 对应 `销售日期` - `fsend_flag` 对应 `发送标识` - `created_at` 和 `updated_at` 分别对应创建时间和更新时间 - `fmaterialId_f_nsb_sccj` 对应生产厂家 我们可以看到,元数据配置中不仅包含了字段名称,还包括了字段类型、描述以及默认值等信息。这些信息对于正确地进行数据转换至关重要。 #### 3. 数据加载与写入 在完成数据转换后,我们需要将处理好的数据通过API接口写入到目标数据库MySQL中。以下是用于插入或更新记录的SQL语句: ```sql INSERT INTO xsck_and_fbsdc (fid, document_id, fbill_no, fentry_id, fdate, fmaterialid_fnumber, fcustid_fnsbtext5, fcustid_fname, f_app_base_property, flot, fqty, fproduce_date, funitid_name, fsend_flag, created_at, updated_at,fmaterialId_f_nsb_sccj) VALUES (:fid, :document_id, :fbill_no, :fentry_id, :fdate, :fmaterialid_fnumber, :fcustid_fnsbtext5, :fcustid_fname, :f_app_base_property, :flot, :fqty, :fproduce_date, :funitid_name, :fsend_flag, :created_at, :updated_at,:fmaterialId_f_nsb_sccj) ON DUPLICATE KEY UPDATE fid = VALUES(fid), document_id = VALUES(document_id), fbill_no = VALUES(fbill_no), fentry_id = VALUES(fentry_id), fdate = VALUES(fdate), fmaterialid_fnumber = VALUES(fmaterialid_fnumber), fcustid_fnsbtext5 = VALUES(fcustid_fnsbtext5), fcustid_fname = VALUES(fcustid_fname), f_app_base_property = VALUES(f_app_base_property), flot = VALUES(flot), fqty = VALUES(fqty), fproduce_date = VALUES(fproduce_date), funitid_name = VALUES(funitid_name), fsend_flag = VALUES(fsend_flag), created_at = VALUES(created_at), updated_at = VALUES(updated_at), fmaterialId_f_nsb_sccj=VALUES(fmaterialId_f_nsb_sccj); ``` 这段SQL语句使用了MySQL的插入或更新(ON DUPLICATE KEY UPDATE)语法,确保在主键冲突时能够更新已有记录,而不是简单地插入新记录。这种方式可以有效避免重复记录的问题。 #### 4. API接口调用 最后一步是通过API接口将转换后的数据发送到目标数据库。根据元数据配置,API调用采用POST方法,具体请求体如下: ```json { "main_params": { "fid": "{FID}", "document_id": "{FID}-{FEntity_FENTRYID}", "fbill_no": "{FBillNo}", "fmaterialid_fnumber": "{FMaterialID_FNumber}", "fentry_id": "{FEntity_FENTRYID}", "funitid_name": "{FUnitID_Name}", "fcustid_fnsbtext5": "{FCustomerID_F_nsb_Text5}", "fcustid_fname": "{FCustomerID_FName}", "f_app_base_property": "{F_nsb_wltm}", "flot": "{FLot}", "fqty": "{FRealQty}", "fproduce_date": "{{FProduceDate|date}}", "fdate": "{{FDate|date}}", "fsend_flag": "{F_SEND_FLAG2}", "created_at": "_function DATE_FORMAT(NOW(), '%Y-%m-%d %H:%i:%s')", "updated_at": "_function DATE_FORMAT(NOW(), '%Y-%m-%d %H:%i:%s')", "fmaterialId_f_nsb_sccj": "{FMaterialId_F_nsb_sccj}" }, "main_sql": "INSERT INTO xsck_and_fbsdc (fid,..." } ``` 通过这种方式,我们可以确保所有必要的数据都被正确地传输并存储在目标数据库中。 以上就是使用轻易云数据集成平台进行ETL转换和写入MySQL的详细技术步骤。通过合理配置元数据并使用API接口,我们能够高效地实现不同系统间的数据无缝对接。 ![用友与外部系统接口集成开发](https://pic.qeasy.cloud/T8.png~tplv-syqr462i7n-qeasy.image)