从数据抓取到写入:金蝶云星空ETL项目解析

  • 轻易云集成顾问-潘裕
### 金蝶员工查询系统对接案例分享 在现代企业的信息化建设过程中,实现各业务系统之间的数据集成至关重要。本文将重点介绍如何通过使用轻易云数据集成平台,将金蝶云星空的API executeBillQuery 获取到的员工信息数据,批量写入另一实例中。 此次项目名为“金蝶员工查询”,主要解决了以下几个技术难题与挑战: 1. **高吞吐量的数据写入能力**:确保大量员工信息数据能够快速、高效地被集成至目标金蝶云星空系统中。 2. **API资产管理**:通过高度可视化的控制台和统一视图,对所有API请求和响应进行集中监控,使得接口调用情况一目了然,从而实现资源优化配置。 3. **自定义数据转换逻辑**:针对不同业务场景下,对获取的数据进行格式转换,以满足目标系统的具体需求,并处理源、目标端接口分页及限流问题。 #### 项目实施细节概述 本次项目采用executeBillQuery获取源金蝶云星空的员工数据,通过定时触发机制定期抓取,确保不漏单。在整个过程中,我们使用批量写入 API batchSave 实现大量数据快速而可靠地导入目标系统。同时,为保障稳定性与实时性,还利用了如下技术手段: - **实时监控及告警**:在执行每个任务环节时,开发团队借助轻易云平台提供的集中监控功能,不仅可以追踪任务状态,还能及时处理异常状况,提高整体效率和准确性。 - **异常重试机制**:为了应对网络波动或其他不可预测因素导致的数据传输失败,我们实现了一套完整的错误重试机制,当发现某些记录未成功存储时,可自动重新发起请求以保证最终操作的一致性。 接下来,我们将详细探讨具体方案设计,包括如何设置获取接口executeBillQuery以及调用batchSave 写入接口的方法,以及处理过程中的各种技术要点及其解决方案。 ![金蝶与WMS系统接口开发配置](https://pic.qeasy.cloud/D15.png~tplv-syqr462i7n-qeasy.image) ### 调用源系统金蝶云星空接口executeBillQuery获取并加工数据 在数据集成生命周期的第一步,我们需要从源系统获取数据,并对其进行初步加工。本文将详细探讨如何通过轻易云数据集成平台调用金蝶云星空接口`executeBillQuery`来实现这一目标。 #### 接口配置与请求参数 首先,我们需要了解接口的基本配置和请求参数。根据提供的元数据配置,`executeBillQuery`接口的主要特性如下: - **API名称**: `executeBillQuery` - **请求方法**: `POST` - **主要字段**: - `FNumber`: 员工编号 - `FID`: 员工ID - `FName`: 员工姓名 - `FMobile`: 员工手机 - `FEmail`: 员工邮箱 - `FPostDept`: 所属部门 - `FBaseProperty3`: 基本属性 此外,还有一些分页和过滤参数,例如: - `Limit`: 查询分页参数,指定每页返回的数据条数。 - `StartRow`: 查询分页参数,指定查询起始行。 - `TopRowCount`: 查询分页参数,指定返回的前几行数据。 - `FilterString`: 过滤条件字符串,例如:`FSupplierId.FNumber = 'VEN00010' and FApproveDate>=`。 - `FieldKeys`: 指定需要返回的字段。 - `FormId`: 表单ID,例如:`BD_Empinfo`。 #### 请求示例 为了更好地理解如何调用该接口,我们可以构建一个具体的请求示例。假设我们需要查询所有状态为“已审核”的员工信息,每次查询返回10条记录,从第0行开始。请求体可以如下构建: ```json { "FormId": "BD_Empinfo", "FieldKeys": ["FNumber", "FID", "FName", "FMobile", "FEmail", "FPostDept", "FBaseProperty3"], "FilterString": "FDocumentStatus='C'", "Limit": 10, "StartRow": 0 } ``` #### 数据清洗与加工 在获取到原始数据后,我们需要对其进行清洗和加工,以便后续的数据转换与写入。以下是一些常见的数据清洗操作: 1. **字段重命名**: 将原始字段名转换为目标系统所需的字段名。例如,将`FNumber`重命名为`EmployeeNumber`。 2. **数据类型转换**: 确保每个字段的数据类型符合目标系统的要求。例如,将字符串类型的日期转换为日期对象。 3. **缺失值处理**: 对于缺失值,可以选择填充默认值、删除对应记录或进行其他处理。 以下是一个简单的数据清洗示例代码(假设使用Python): ```python import json # 假设response_data是从金蝶接口获取到的原始数据 response_data = [ {"FNumber": "E001", "FID": "1001", "FName": "张三", "FMobile": "13800000000", "FEmail": "zhangsan@example.com", "FPostDept": "销售部", "FBaseProperty3": ""}, # 更多记录... ] # 清洗后的数据列表 cleaned_data = [] for record in response_data: cleaned_record = { "EmployeeNumber": record["FNumber"], "EmployeeID": record["FID"], "EmployeeName": record["FName"], "MobilePhone": record["FMobile"], "EmailAddress": record["FEmail"], "Department": record["FPostDept"], # 对于空值,可以选择填充默认值或进行其他处理 "BaseProperty3": record["FBaseProperty3"] if record["FBaseProperty3"] else None, } cleaned_data.append(cleaned_record) # 将清洗后的数据转换为JSON格式,便于后续处理 cleaned_data_json = json.dumps(cleaned_data, ensure_ascii=False) print(cleaned_data_json) ``` #### 实时监控与调试 在实际操作中,实时监控和调试是确保数据集成过程顺利进行的重要环节。轻易云平台提供了全透明可视化的操作界面,可以实时监控数据流动和处理状态。在调用接口和处理数据时,可以通过日志记录、异常捕获等方式及时发现并解决问题。 例如,在Python代码中添加日志记录: ```python import logging # 配置日志记录 logging.basicConfig(level=logging.INFO) try: # 假设调用金蝶接口并获取响应数据 response_data = call_executeBillQuery_api() logging.info("成功获取到响应数据") # 数据清洗过程... except Exception as e: logging.error(f"发生错误: {e}") ``` 通过上述步骤,我们可以高效地调用金蝶云星空接口获取并加工员工信息,为后续的数据转换与写入打下坚实基础。 ![如何对接用友BIP接口](https://pic.qeasy.cloud/S15.png~tplv-syqr462i7n-qeasy.image) ### 使用轻易云数据集成平台将数据转换并写入金蝶云星空API接口 在数据集成生命周期的第二步,我们需要将已集成的源平台数据进行ETL转换,确保其符合目标平台——金蝶云星空API接口所能接收的格式,并最终成功写入目标平台。以下是具体的技术实现过程。 #### 元数据配置解析 首先,我们需要理解元数据配置,以便正确地进行API调用。以下是元数据配置的详细信息: ```json { "api": "batchSave", "effect": "EXECUTE", "method": "POST", "idCheck": true, "otherRequest": [ { "field": "FormId", "label": "FormId", "type": "string", "describe": "必须填写金蝶的表单ID如:PUR_PurchaseOrder", "value": "PLN_FORECAST" }, { "field": "Operation", "label": "Operation", "type": "string", "value": "BatchSave" }, { "field": "IsAutoSubmitAndAudit", "label": "IsAutoSubmitAndAudit", "type": "bool", "value": true }, { "field": "IsVerifyBaseDataField", "label": "IsVerifyBaseDataField", "type": "bool", "describe":"是否验证所有的基础资料有效性,布尔类,默认false(非必录)", "value": false } ], "operation":{ "method":"batchArraySave", "rows":1, "rowsKey":"array" } } ``` #### API接口调用步骤 1. **构建请求体**: 根据元数据配置,我们需要构建一个符合金蝶云星空API要求的请求体。请求体主要包括`FormId`、`Operation`、`IsAutoSubmitAndAudit`和`IsVerifyBaseDataField`等字段。 ```json { "FormId":"PLN_FORECAST", "Operation":"BatchSave", “IsAutoSubmitAndAudit”:true, “IsVerifyBaseDataField”:false, “Data”:[...] // 数据数组 } ``` 2. **数据转换**: 将源平台的数据转换为目标平台所需的数据格式。假设源平台的数据如下: ```json [ {"EmployeeID":"E001","Name":"John Doe","Department":"Sales"}, {"EmployeeID":"E002","Name":"Jane Smith","Department":"Marketing"} ] ``` 转换后应符合金蝶云星空API要求的数据结构: ```json [ { “FStaffNumber”:”E001”, “FStaffName”:”John Doe”, “FDeptName”:”Sales” }, { “FStaffNumber”:”E002”, “FStaffName”:”Jane Smith”, “FDeptName”:”Marketing” } ] ``` 3. **发送HTTP请求**: 使用POST方法将构建好的请求体发送到金蝶云星空API接口。 ```python import requests url = 'https://api.kingdee.com/batchSave' headers = { 'Content-Type': 'application/json', 'Authorization': 'Bearer your_access_token' } payload = { 'FormId': 'PLN_FORECAST', 'Operation': 'BatchSave', 'IsAutoSubmitAndAudit': True, 'IsVerifyBaseDataField': False, 'Data': [ {'FStaffNumber':'E001', 'FStaffName':'John Doe', 'FDeptName':'Sales'}, {'FStaffNumber':'E002', 'FStaffName':'Jane Smith', 'FDeptName':'Marketing'} ] } response = requests.post(url, json=payload, headers=headers) if response.status_code == 200: print('Data successfully written to Kingdee Cloud') else: print('Failed to write data:', response.text) ``` #### 注意事项 - **字段验证**:确保所有字段都符合金蝶云星空API接口的要求,特别是必填字段。 - **错误处理**:在实际应用中,需要对API返回结果进行详细的错误处理,以便及时发现和解决问题。 - **性能优化**:对于大批量数据,可以考虑分批次提交,以提高效率和成功率。 通过以上步骤,我们可以高效地将源平台的数据转换并写入到金蝶云星空,实现不同系统间的数据无缝对接。这不仅提升了业务流程的自动化程度,也极大提高了数据处理的准确性和效率。 ![打通金蝶云星空数据接口](https://pic.qeasy.cloud/T4.png~tplv-syqr462i7n-qeasy.image)