ETL最佳实践:利用轻易云平台处理金蝶退料申请单数据

  • 轻易云集成顾问-钟家寿
### 【仅查询】金蝶退料申请单数据集成:技术实现与案例解析 在进行企业系统对接与数据集成工作时,如何确保各个环节的数据准确性和高效性,是每个工程师所面对的挑战。本文将详细介绍一个实际运行的方案——【仅查询】金蝶退料申请单,通过利用轻易云数据集成平台与金蝶云星空系统进行无缝对接,实现了关键业务数据的平滑流动。 首先,我们需要从金蝶云星空获取相关的退料申请单信息,使用其提供的`executeBillQuery`接口进行数据调用。这一步骤至关重要,因为这确保了所有业务动作得以追溯,并且不会遗漏任何一张关键单据。在实现过程中,需要解决分页和限流问题,以保证在大批量数据抓取时不出现漏单现象。 为此,我们配置了定时任务,在特定时间间隔内可靠地执行API调用,将获得的数据通过轻易云集成平台的写入API `写入空操作`快速保存,同时避免因网络波动等异常情况导致的数据丢失。此过程不仅要考虑到调用接口超时报错后的自动重试机制,还需要处理两端系统之间可能存在的数据格式差异问题。 通过轻易云集成平台自带的数据映射功能,我们能够灵活定义各字段间的转换规则,使得不同来源、格式迥异的数据能顺利对接。此外,对整个处理流程进行了实时监控和日志记录,一旦发生异常能够及时响应并采取措施修复,从而达到稳定、高效运转目标。 后续章节中将更详细阐述具体步骤及代码实现方法,包括如何设置参数、优化性能及故障处理等细节内容。此次实践为企业内部多系统协作、提升运营效率提供了一套完善解决方案,也展示了在复杂环境下高效管理、同步多源数据的重要策略。 ![用友与MES系统接口开发配置](https://pic.qeasy.cloud/D35.png~tplv-syqr462i7n-qeasy.image) ### 调用金蝶云星空接口executeBillQuery获取并加工数据 在数据集成的生命周期中,调用源系统接口获取数据是至关重要的一步。本文将深入探讨如何通过轻易云数据集成平台调用金蝶云星空的`executeBillQuery`接口来获取退料申请单的数据,并对其进行初步加工。 #### 接口配置与调用 首先,我们需要了解如何配置和调用金蝶云星空的`executeBillQuery`接口。以下是元数据配置的详细说明: ```json { "api": "executeBillQuery", "method": "POST", "number": "FBillNo", "id": "FDetailEntity_FEntryID", "pagination": { "pageSize": 500 }, "idCheck": true, "request": [ {"field":"FPURCHASEORGID","label":"采购组织","type":"string","value":"FPURCHASEORGID.FNumber"}, {"field":"FAPPORGID","label":"申请组织","type":"string","value":"FAPPORGID.FNumber"}, {"field":"FSettleTypeId","label":"结算组织","type":"string","value":"FSettleTypeId.FNumber"}, {"field":"FAPPROVEPRICE_F","label":"含税单价","type":"string","value":"FAPPROVEPRICE_F"}, {"field":"FBillNo","label":"FBillNo","type":"string","value":"FBillNo"}, {"field":"FMATERIALID","label":"FMATERIALID","type":"string","value":"FMATERIALID.FNumber"}, {"field":"FSRCBILLNO","label":"源单编码","type":"string","value":"FSRCBILLNO"}, {"field":"FID","label":"FID","type":"string","value":"FID"}, {"field":"FEntity_FEntryID","label":"FEntity_FEntryID","type":"string","value":"FEntity_FEntryID"} ], "otherRequest": [ {"field": "Limit", "label": "最大行数", "type": "string", "describe": "金蝶的查询分页参数", "value": "{PAGINATION_PAGE_SIZE}"}, {"field": "StartRow", "label": "开始行索引", "type": "string", "describe": "金蝶的查询分页参数", "value": "{PAGINATION_START_ROW}"}, {"field": "TopRowCount", "label": "返回总行数", "type": "int", "describe": "金蝶的查询分页参数"}, {"field": "FilterString", "label": "过滤条件", "type": "string", "describe": "示例写法 FSupplierId.FNumber = 'VEN00010' and FApproveDate>=", "value": "FApproveDate>='{{LAST_SYNC_TIME|dateTime}}' and FDocumentStatus='C' FBillTypeID.FNumber='SLD01_SYS'" }, {"field": "FieldKeys", "label": "需查询的字段key集合", "type": "array", "describe": "金蝶分录主键ID格式:FPOOrderEntry_FEntryId,其它格式 FPurchaseOrgId.FNumber", "parser": { "name": "ArrayToString", "params": "," } }, { "field": "FormId", "label": "业务对象表单Id", "type": string, describe: 必须填写金蝶的表单ID如:PUR_PurchaseOrder, value: PUR_MRAPP } ] } ``` #### 请求参数解析 1. **请求方法与API**:我们使用POST方法调用`executeBillQuery` API。 2. **分页设置**:每次请求返回最多500条记录,通过`Limit`和`StartRow`字段控制分页。 3. **字段映射**:请求中包含多个字段,如采购组织、申请组织、结算组织等,这些字段在请求体中以JSON格式传递。 4. **过滤条件**:通过`FilterString`字段设置过滤条件,例如根据审批日期和单据状态筛选数据。 5. **表单ID**:必须指定业务对象表单Id(如PUR_MRAPP),确保查询的是正确的数据对象。 #### 数据请求与清洗 在发送请求后,我们会收到一个包含退料申请单详细信息的数据集。接下来,需要对这些数据进行清洗和初步加工,以便后续处理。 1. **数据验证**:检查返回的数据是否包含所有必要字段,并确保每条记录都有唯一标识符(如FID)。 2. **数据转换**:将原始数据中的复杂结构转换为更易处理的形式。例如,将嵌套JSON对象展开为平面结构。 3. **异常处理**:对于缺失或异常的数据进行标记或剔除,确保后续处理过程中的数据质量。 #### 示例代码 以下是一个简单的Python示例代码,用于调用接口并处理返回的数据: ```python import requests import json # 配置API请求参数 url = 'https://api.kingdee.com/executeBillQuery' headers = {'Content-Type': 'application/json'} payload = { 'FormId': 'PUR_MRAPP', 'FieldKeys': 'FPURCHASEORGID,FAPPORGID,FSettleTypeId,FAPPROVEPRICE_F,FBillNo,FMATERIALID,FSRCBILLNO,FID,FEntity_FEntryID', 'FilterString': f'FApproveDate>="{last_sync_time}" and FDocumentStatus="C"', 'Limit': 500, 'StartRow': start_row } # 发起POST请求 response = requests.post(url, headers=headers, data=json.dumps(payload)) # 检查响应状态码 if response.status_code == 200: data = response.json() # 数据清洗与转换 cleaned_data = [] for record in data['Result']: cleaned_record = { '采购组织': record['FPURCHASEORGID'], '申请组织': record['FAPPORGID'], '结算组织': record['FSettleTypeId'], '含税单价': record['FAPPROVEPRICE_F'], '单据编号': record['FBillNo'], '物料编号': record['FMATERIALID'], '源单编码': record['FSRCBILLNO'], '唯一标识符': record['FID'] } cleaned_data.append(cleaned_record) else: print(f'Error: {response.status_code}, {response.text}') ``` 通过上述步骤,我们可以高效地从金蝶云星空获取退料申请单的数据,并进行初步加工,为后续的数据转换与写入做好准备。这一过程不仅提高了数据集成的效率,也确保了数据质量和一致性。 ![金蝶云星空API接口配置](https://pic.qeasy.cloud/S29.png~tplv-syqr462i7n-qeasy.image) ### 使用轻易云数据集成平台实现金蝶退料申请单ETL转换与写入 在数据集成过程中,ETL(Extract, Transform, Load)是至关重要的一环。本文将详细探讨如何利用轻易云数据集成平台,将金蝶退料申请单的数据进行ETL转换,并通过API接口将其写入目标平台。 #### 数据提取与初步清洗 首先,从金蝶系统中提取退料申请单数据。这一步通常通过API调用或数据库查询实现。假设我们已经获取了如下格式的JSON数据: ```json { "orderId": "12345", "materialCode": "M001", "quantity": 100, "unit": "pcs", "requestDate": "2023-10-01" } ``` #### 数据转换 接下来,我们需要将上述数据转换为轻易云集成平台API接口所能接收的格式。根据元数据配置,目标API接口要求如下: ```json { "api": "写入空操作", "effect": "EXECUTE", "method": "POST", "idCheck": true } ``` 为了满足这一要求,我们需要进行以下几步转换: 1. **字段映射**:将源数据的字段映射到目标API所需的字段。例如,`orderId`可能需要映射为`transactionId`,`materialCode`映射为`itemCode`等。 2. **数据格式调整**:确保日期、数量等字段符合目标API的格式要求。 3. **附加校验**:根据元数据配置中的`idCheck: true`,我们可能需要对ID进行额外校验或处理。 具体转换代码示例如下: ```python def transform_data(source_data): transformed_data = { "transactionId": source_data["orderId"], "itemCode": source_data["materialCode"], "amount": source_data["quantity"], "unitType": source_data["unit"], "dateOfRequest": source_data["requestDate"] } # 如果需要对ID进行校验,可以在此处添加逻辑 if transformed_data["transactionId"] is None or transformed_data["transactionId"] == "": raise ValueError("Transaction ID cannot be null or empty") return transformed_data source_data = { "orderId": "12345", "materialCode": "M001", "quantity": 100, "unit": "pcs", "requestDate": "2023-10-01" } transformed_data = transform_data(source_data) print(transformed_data) ``` #### 数据加载 完成数据转换后,下一步是通过POST请求将数据写入目标平台。根据元数据配置,我们使用HTTP POST方法来执行这一操作。 具体实现代码示例如下: ```python import requests def load_data_to_target(transformed_data): url = 'https://api.qingyiyun.com/write' headers = { 'Content-Type': 'application/json' } response = requests.post(url, json=transformed_data, headers=headers) if response.status_code == 200: print("Data successfully written to target platform") else: print(f"Failed to write data: {response.status_code}, {response.text}") load_data_to_target(transformed_data) ``` #### 实时监控与错误处理 在实际操作中,为了确保数据集成过程的可靠性和透明度,我们还需要加入实时监控和错误处理机制。例如,可以在每一步操作后记录日志,并在发生错误时及时报警。 ```python import logging logging.basicConfig(level=logging.INFO) def load_data_to_target_with_logging(transformed_data): url = 'https://api.qingyiyun.com/write' headers = { 'Content-Type': 'application/json' } try: response = requests.post(url, json=transformed_data, headers=headers) if response.status_code == 200: logging.info("Data successfully written to target platform") else: logging.error(f"Failed to write data: {response.status_code}, {response.text}") except Exception as e: logging.error(f"Exception occurred: {str(e)}") load_data_to_target_with_logging(transformed_data) ``` 通过上述步骤,我们实现了从金蝶系统提取退料申请单数据,经过ETL转换后,通过轻易云集成平台API接口将其成功写入目标平台。这一过程不仅提升了业务透明度和效率,还确保了数据处理的准确性和可靠性。 ![如何对接金蝶云星空API接口](https://pic.qeasy.cloud/T8.png~tplv-syqr462i7n-qeasy.image)