ETL在数据集成中的应用:从金蝶云星空到MySQL

  • 轻易云集成顾问-胡秀丛
### 金蝶云星空与MySQL数据集成案例分享:受托加工材料入库单 => MySQL 在本技术案例中,我们将详述如何高效地实现金蝶云星空的“受托加工材料入库单”数据集成至MySQL数据库。通过详细拆解每个关键步骤和核心技术点,确保您能够快速掌握这一复杂的系统对接过程。 #### 数据获取:调用金蝶云星空API 首先,我们需要从金蝶云星空系统中提取相关数据。此过程中使用到的重要接口是`executeBillQuery`。这个接口允许我们按照特定的业务需求来抓取所需的入库单信息。在实际操作中,为了解决分页和限流的问题,通常会设置合理的请求参数并且实现一个可靠的数据抓取机制,以确保不会遗漏或重复读取任何记录。 ```python # 示例代码 - 调用 executeBillQuery 接口 import requests def fetch_data_from_kingdee(api_url, headers, payload): response = requests.post(api_url, headers=headers, json=payload) if response.status_code == 200: return response.json() else: # 错误处理逻辑 raise Exception(f"Error fetching data: {response.status_code}") api_url = "https://api.kingdee.com/executeBillQuery" headers = {"Content-Type": "application/json"} payload = { "billType": "BILL_RECEIVE", # 其他必要参数 } data = fetch_data_from_kingdee(api_url, headers, payload) ``` #### 数据转换:自定义数据映射和质量监控 获取到原始数据后,下一步是进行数据转换。这一过程往往涉及多种格式差异处理以及根据业务需求进行定制化的数据映射。同时,通过内置的数据质量监控功能,可以实时检测并解决潜在的异常问题,保证同步至MySQL数据库的数据是一致且准确无误的。 ```python # 示例代码 - 数据转换及质量监控 def transform_data(raw_data): transformed_data = [] for record in raw_data: try: transformed_record = { "id": record["ID"], "materialName": record["MaterialName"], "quantity": int(record["Quantity"]), # 更多字段映射... } transformed_data.append(transformed_record) except KeyError as e: print(f"Missing key in data: {e}") except ValueError as e: print(f"Data conversion error: {e}") return transformed_data transformed_data = transform_data(data) ``` #### 数据写入:高吞吐量批量写入至MySQL 最后,将转换完毕的数据批量插入到MySQL数据库。当面对大量数据时,高吞 ![金蝶云星空API接口配置](https://pic.qeasy.cloud/D34.png~tplv-syqr462i7n-qeasy.image) ### 调用金蝶云星空接口executeBillQuery获取并加工数据 在数据集成生命周期的第一步,我们需要从源系统金蝶云星空中获取数据。本文将详细探讨如何通过调用金蝶云星空的`executeBillQuery`接口来实现这一目标,并对数据进行初步加工。 #### 接口配置与请求参数 首先,我们需要了解`executeBillQuery`接口的基本配置和请求参数。根据元数据配置,接口的主要参数如下: - **API**: `executeBillQuery` - **Method**: `POST` - **Effect**: `QUERY` - **FormId**: `STK_OEMInStock` 请求参数包括以下字段: 1. **实体主键** (`FID`) 2. **单据编号** (`FBillNo`) 3. **入库日期** (`FDate`) 4. **单据类型** (`FBillTypeID.FNumber`) 5. **库存组织** (`FStockOrgId.FNumber`) 6. **审核日期** (`FApproveDate`) 7. **分录ID** (`FBillEntry_FEntryID`) 8. **物料编码** (`FMaterialId.FNumber`) 9. **条码** (`FMATERIALID.FBARCODE`) 10. **物料名称** (`FMaterialName`) 11. **生产厂家** (`FMaterialId.F_nsb_sccj`) 12. **产地** (`FMaterialId.F_APP_Text`) 13. **批号** (`FLot.FNumber`) 14. **规格型号** (`FModel`) 15. **有效期至** (`FExpiryDate`) 16. **生产日期** (`FProduceDate`) 17. **数量** (`FQty`) 18. **单位名称** (`FUnitID.FName`) 19. **供应商** (`FMaterialId.F_nsb_GYS`) 20. **供应类型** (`FMaterialId.F_nsb_GYLB`) 此外,还有一些分页和过滤条件参数: - 最大行数(`Limit`):2000 - 开始行索引(`StartRow`):由分页控制 - 返回总行数(`TopRowCount`) - 过滤条件(`FilterString`):例如,`"FApproveDate>='{{LAST_SYNC_TIME|dateTime}}' and (FMaterialId.F_nsb_GYS <> ' ' and FMaterialId.F_nsb_GYS <> null) and FMaterialId.F_nsb_GYLB = '1'"` #### 请求示例 构建一个典型的请求体如下: ```json { "FormId": "STK_OEMInStock", "FieldKeys": "FID,FBillNo,FDate,FBillTypeID.FNumber,FStockOrgId.FNumber,FApproveDate,FBillEntry_FEntryID,FMaterialId.FNumber,FMATERIALID.FBARCODE,FMaterialName,FMaterialId.F_nsb_sccj,FMaterialId.F_APP_Text,FLot.FNumber,FModel,FExpiryDate,FProduceDate,FQty,FUnitID.FName,FMaterialId.F_nsb_GYS,FMaterialId.F_nsb_GYLB", "FilterString": "FApproveDate>='2022-01-01 00:00:00' and (FMaterialId.F_nsb_GYS <> ' ' and FMaterialId.F_nsb_GYS <> null) and FMaterialId.F_nsb_GYLB = '1'", "Limit": "2000", "StartRow": "{PAGINATION_START_ROW}" } ``` #### 数据处理与清洗 在接收到响应数据后,需要对数据进行初步处理和清洗。以下是几个关键步骤: 1. **字段映射与转换** - 将金蝶返回的数据字段映射到目标数据库的字段。 - 根据业务需求,对部分字段进行格式转换,例如日期格式、数量单位等。 2. **数据过滤** - 根据业务规则,进一步过滤不需要的数据。例如,只保留审核通过且供应商信息完整的记录。 3. **数据校验** - 对关键字段进行校验,确保数据完整性和准确性。例如,检查物料编码是否存在、数量是否为正值等。 #### 数据写入MySQL 完成数据处理后,将清洗后的数据写入目标数据库MySQL。可以使用批量插入操作提高效率,同时确保事务的一致性。 ```sql INSERT INTO processed_data ( FID, FBillNo, FDate, FBillTypeID_FNumber, FStockOrgId_FNumber, FApproveDate, FBillEntry_FEntryID, FMaterialId_FNumber, FMATERIALID_FBARCODE, FMaterialName, FMaterialId_F_nsb_sccj, FMaterialId_F_APP_Text, FLot_FNumber, FModel, FExpiryDate, FProduceDate, FQty, FUnitID_FName, FMaterialId_F_nsb_GYS, FMaterialId_F_nsb_GYLB ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ``` 通过上述步骤,我们成功实现了从金蝶云星空获取并加工受托加工材料入库单的数据,并将其集成到MySQL数据库中。这一过程不仅确保了数据的准确性和一致性,还为后续的数据分析和业务决策提供了坚实基础。 ![打通金蝶云星空数据接口](https://pic.qeasy.cloud/S10.png~tplv-syqr462i7n-qeasy.image) ### 数据集成中的ETL转换与MySQL API接口写入 在数据集成的生命周期中,ETL(Extract, Transform, Load)转换是关键的一步。本文将深入探讨如何将已经集成的源平台数据进行ETL转换,并转为目标平台MySQL API接口所能够接收的格式,最终写入目标平台。 #### 元数据配置解析 在本案例中,我们需要将受托加工材料入库单的数据通过ETL过程转换并写入MySQL数据库。以下是元数据配置的详细解析: ```json { "api": "execute", "effect": "EXECUTE", "method": "POST", "idCheck": true, "request": [ { "field": "main_params", "label": "main_params", "type": "object", "describe": "111", "value": "1", "children": [ {"field": "fid", "label": "单据id", "type": "string", "value":"{FID}"}, {"field": "document_id", "label": "文档唯一标识号", "type": "string", "describe":"111", "value":"{FID}-{FInStockEntry_FEntryId}"}, {"field": "fbill_no", "label":"单据编号", "type":"string", "describe":"111", "value":"{FBillNo}"}, {"field":"fentry_id", "label":"明细id", "type":"string", "describe":"111", "value":"{FInStockEntry_FEntryId}"}, {"field":"fdate","label":"进货日期","type":"string","describe":"111","value":"{{FDate|date}}"}, {"field":"fsupplierid_fnumber","label":"供应商编码","type":"string","value":"_mongoQuery 0ed0cb1c-5c07-3042-859f-bff3d5ee8595 findField=content.FNumber where={\"content.FName\":{\"$eq\":\"{FMaterialId_F_nsb_GYS}\"}}"}, {"field":"fsupplierid_name","label":"供应商名称","type":"string","value":"{FMaterialId_F_nsb_GYS}"}, {"field":"fmaterialid_fnumber","label":"原料编码","type":"string","value":"{FMaterialId_FNumber}"}, {"field":"fmaterialid_name","label":"原料名称","type":"string","value":"{FMaterialName}"}, {"field":"fmaterialid_fapptext","label":"产地","type":"string","value":"{FMaterialId_F_APP_Text}"}, {"field":"fmaterialid_fnsbsccj","label":"生产厂家","type":"string","value":"{FMaterialId_F_nsb_sccj}"}, {"field":...} ] } ], ... } ``` #### 数据请求与清洗 首先,我们需要从源系统中提取数据并进行清洗。元数据配置中的`request`部分定义了我们需要提取的字段及其对应的映射关系。例如: - `fid` 对应 `{FID}` - `document_id` 对应 `{FID}-{FInStockEntry_FEntryId}` - `fbill_no` 对应 `{FBillNo}` 这些字段通过模板字符串从源数据中提取,并进行必要的格式化和转换。例如,日期字段 `fdate` 使用 `{{FDate|date}}` 格式化为标准日期格式。 #### 数据转换 在数据清洗之后,需要对数据进行转换,以符合目标平台MySQL API接口的要求。这里我们利用元数据配置中的 `_mongoQuery` 和 `_function` 来实现复杂的数据转换逻辑。例如: - `fsupplierid_fnumber` 使用 `_mongoQuery` 从MongoDB中查询供应商编码。 - `created_at` 和 `updated_at` 使用 `_function DATE_FORMAT(NOW(), '%Y-%m-%d %H:%i:%s')` 获取当前时间并格式化。 这些字段经过处理后,将被组装成一个完整的数据对象,准备写入目标平台。 #### 数据写入 最后一步是将处理好的数据通过API接口写入MySQL数据库。元数据配置中的 `otherRequest` 部分定义了具体的SQL插入语句: ```sql INSERT INTO cgrk (fid, document_id, fbill_no, fentry_id, fdate, fsupplierid_fnumber, fsupplierid_name, fmaterialid_fnumber, fmaterialid_name, fmaterialid_fapptext, fmaterialid_fnsbsccj, flot, fuom, fexpiry_date, fproduce_date, freal_qty, funitid_name, fsend_flag, created_at, updated_at, type) VALUES (:fid, :document_id, :fbill_no, :fentry_id, :fdate, :fsupplierid_fnumber, :fsupplierid_name, :fmaterialid_fnumber,...) ON DUPLICATE KEY UPDATE fid = VALUES(fid), document_id = VALUES(document_id), fbill_no = VALUES(fbill_no), ... ``` 这段SQL语句不仅插入新记录,还处理重复键更新,确保数据的一致性和完整性。 通过上述步骤,我们实现了从源平台到目标平台的数据ETL转换和写入。每一步都严格按照元数据配置进行,确保了数据处理过程的透明性和可追溯性。这种方法不仅提高了效率,还减少了人为错误,为业务流程提供了可靠的数据支持。 ![如何对接金蝶云星空API接口](https://pic.qeasy.cloud/T21.png~tplv-syqr462i7n-qeasy.image)