使用轻易云平台进行金蝶云星空数据ETL转换教程

  • 轻易云集成顾问-彭亮
### 案例分享:金蝶云星空销售出库单集成轻易云数据平台 在支持企业数字化转型的过程中,系统之间的数据集成是关键一环。本案例介绍了如何将金蝶云星空中的销售出库单数据高效、稳定地集成到轻易云数据集成平台,从而实现跨系统的无缝对接。 #### 金蝶云星空接口调用与分页处理 为了获取金蝶云星空上的销售出库单信息,我们利用其提供的`executeBillQuery` API。该接口允许我们按需查询并返回相关业务数据。在实际操作中,为了确保不漏单且能够处理大批量数据,需要特别注意分页和限流的问题。 #### 可靠的数据抓取机制 为提升抓取效率和可靠性,本方案采用定时任务来周期性地调用`executeBillQuery`接口,不仅保证了实时性,还能避免因网络或系统异常导致的数据丢失。定时调度器设定合理的运行间隔,通过日志记录每次拉取情况,实现全流程监控与问题追溯。 #### 数据快速写入与格式转换 从金蝶云星空获取的数据格式通常需要进行适配,在写入轻易云之前必须完成必要的数据映射和转换。这一步骤通过自定义映射规则灵活处理,将原始API返回结果按照预期结构整理后再批量导入。使用轻易云写入API进行高速、高效地写入,通过优化连接池设置等方式,加快大量数据的传输过程。 #### 异常处理与错误重试机制 考虑到实际应用场景可能会出现各种非预期状况,如网络波动、超时报错等,设计了一套完整的异常捕获和重试策略。在任何一个步骤发生失败时,立即启动错误记录,并依据配置好的策略自动尝试重新执行直至成功或者达到最大重试次数。此外,引入告警机制,以便管理员及时干预严重故障情形。 通过以上技术方案,以最少的人力投入,高质高效地实现了两个异构系统间的重要业务联通,使得企业运营管理更具透明度和反应速度。下一部分将详细介绍具体实施步骤及代码示例。 ![钉钉与WMS系统接口开发配置](https://pic.qeasy.cloud/D38.png~tplv-syqr462i7n-qeasy.image) ### 调用金蝶云星空接口executeBillQuery获取并加工数据 在数据集成的生命周期中,调用源系统接口获取数据是至关重要的一步。本文将详细探讨如何通过轻易云数据集成平台调用金蝶云星空的`executeBillQuery`接口来查询销售出库单,并对获取的数据进行初步加工。 #### 接口配置与请求参数 首先,我们需要配置调用金蝶云星空接口的元数据。以下是关键的元数据配置项: - **api**: `executeBillQuery` - **effect**: `QUERY` - **method**: `POST` - **number**: `FBillNo` - **id**: `FEntity_FENTRYID` - **name**: `FID` - **idCheck**: `true` 这些配置项定义了我们要调用的API名称、操作类型、HTTP方法以及一些关键字段的映射关系。 #### 请求参数详解 在请求参数部分,我们需要传递一系列字段来指定查询条件和返回结果的格式。以下是一些主要的请求参数及其描述: 1. **FormId** - 字段:`FormId` - 类型:`string` - 描述:业务对象表单Id,必须填写金蝶的表单ID,如:`SAL_OUTSTOCK` 2. **FieldKeys** - 字段:`FieldKeys` - 类型:`array` - 描述:需查询的字段key集合,例如:`FPOOrderEntry_FEntryId, FPurchaseOrgId.FNumber` 3. **FilterString** - 字段:`FilterString` - 类型:`string` - 描述:过滤条件,例如:`FSupplierId.FNumber = 'VEN00010' and FApproveDate>='{{LAST_SYNC_TIME|datetime}}'` 4. **Limit** - 字段:`Limit` - 类型:`string` - 描述:最大行数,用于分页 5. **StartRow** - 字段:`StartRow` - 类型:`string` - 描述:开始行索引,用于分页 6. **TopRowCount** - 字段:`TopRowCount` - 类型:`int` - 描述:返回总行数,用于分页 #### 示例请求配置 下面是一个示例请求配置,用于查询销售出库单: ```json { "FormId": "SAL_OUTSTOCK", "FieldKeys": [ "FBillNo", "FDate", "FCustomerID.FNumber", "FMaterialID.FNumber", "FMustQty", "FRealQty" ], "FilterString": "FApproveDate>='{{LAST_SYNC_TIME|datetime}}'", "Limit": "{PAGINATION_PAGE_SIZE}", "StartRow": "{PAGINATION_START_ROW}", "TopRowCount": 0 } ``` 在这个示例中,我们指定了表单ID为销售出库单(SAL_OUTSTOCK),并选择了一些关键字段如单据编号(FBillNo)、日期(FDate)、客户编号(FCustomerID.FNumber)、物料编码(FMaterialID.FNumber)、应发数量(FMustQty)和实发数量(FRealQty)。同时,我们设置了过滤条件为审批日期大于等于上次同步时间。 #### 数据处理与转换 在成功获取到数据后,我们需要对数据进行初步加工。这包括但不限于以下几项操作: 1. **字段映射与转换** 根据业务需求,将原始数据中的字段映射到目标系统所需的字段。例如,将金蝶中的客户编号(FCustomerID.FNumber)映射到目标系统中的客户ID。 2. **数据清洗** 对获取的数据进行清洗,去除无效或重复的数据。例如,检查实发数量(FRealQty)是否为空或为零,如果是则过滤掉该记录。 3. **格式化处理** 将日期、金额等字段格式化为目标系统所需的格式。例如,将日期字段从字符串格式转换为标准的ISO日期格式。 #### 示例代码片段 以下是一个简单的数据处理示例代码片段: ```python def process_data(raw_data): processed_data = [] for record in raw_data: processed_record = { "bill_no": record.get("FBillNo"), "date": format_date(record.get("FDate")), "customer_id": record.get("FCustomerID.FNumber"), "material_id": record.get("FMaterialID.FNumber"), "must_qty": float(record.get("FMustQty", 0)), "real_qty": float(record.get("FRealQty", 0)) } if processed_record["real_qty"] > 0: processed_data.append(processed_record) return processed_data def format_date(date_str): # 假设输入日期格式为 'YYYY-MM-DD' return date_str.replace("-", "/") ``` 在这个示例中,我们定义了一个函数 `process_data` 来处理原始数据,并将其转换为目标系统所需的格式。同时,还定义了一个辅助函数 `format_date` 来格式化日期字符串。 通过以上步骤,我们可以高效地调用金蝶云星空接口获取销售出库单数据,并对其进行初步加工,为后续的数据写入和进一步处理打下坚实基础。 ![用友与外部系统接口集成开发](https://pic.qeasy.cloud/S29.png~tplv-syqr462i7n-qeasy.image) ### 使用轻易云数据集成平台进行ETL转换并写入目标平台的技术案例 在本案例中,我们将探讨如何使用轻易云数据集成平台将已经集成的源平台数据进行ETL转换,最终转为目标平台API接口所能够接收的格式,并成功写入目标平台。我们将详细介绍如何配置元数据、调用API接口以及处理数据转换过程中的关键技术点。 #### 配置元数据 在进行ETL转换之前,首先需要配置元数据,以便与目标平台API接口对接。以下是本案例中的元数据配置: ```json { "api": "写入空操作", "effect": "EXECUTE", "method": "POST", "idCheck": true } ``` 该配置表示我们将使用POST方法调用名为“写入空操作”的API,并在执行过程中进行ID校验。 #### 数据清洗与转换 数据清洗与转换是ETL过程中的核心步骤。在这一阶段,我们需要确保源平台的数据格式符合目标平台API接口的要求。以下是具体的步骤和技术细节: 1. **提取源数据**:从源平台提取原始销售出库单数据。这一步通常涉及到调用源系统的API或从数据库中查询相关记录。 ```sql SELECT * FROM sales_outbound_orders WHERE status = 'completed'; ``` 2. **数据清洗**:对提取的数据进行清洗,去除无效或冗余信息。例如,删除无效的订单记录或修正错误的数据格式。 ```python cleaned_data = [] for order in raw_data: if validate_order(order): cleaned_data.append(order) ``` 3. **数据转换**:将清洗后的数据转换为目标平台所需的格式。这一步可能涉及到字段映射、类型转换等操作。 ```python transformed_data = [] for order in cleaned_data: transformed_order = { "order_id": order["id"], "customer_name": order["customer_name"], "total_amount": float(order["amount"]), # 其他字段映射 } transformed_data.append(transformed_order) ``` #### 调用API接口 完成数据清洗与转换后,我们需要调用目标平台的API接口,将处理后的数据写入目标系统。根据元数据配置,我们使用POST方法调用“写入空操作”API,并进行ID校验。 ```python import requests url = "https://api.qingyiyun.com/write_empty_operation" headers = { "Content-Type": "application/json" } for data in transformed_data: response = requests.post(url, headers=headers, json=data) if response.status_code == 200: print(f"Order {data['order_id']} successfully written to target platform.") else: print(f"Failed to write order {data['order_id']}: {response.text}") ``` 在上述代码中,我们遍历每一条转换后的订单记录,通过HTTP POST请求将其发送到目标平台。如果响应状态码为200,则表示写入成功;否则,打印错误信息以便进一步排查问题。 #### 实时监控与日志记录 为了确保整个ETL过程的透明度和可追溯性,我们可以在每个关键步骤添加日志记录,并通过轻易云集成平台提供的实时监控功能跟踪数据流动和处理状态。 ```python import logging logging.basicConfig(level=logging.INFO) def log_and_execute(data): logging.info(f"Processing order {data['order_id']}") response = requests.post(url, headers=headers, json=data) if response.status_code == 200: logging.info(f"Order {data['order_id']} successfully written to target platform.") else: logging.error(f"Failed to write order {data['order_id']}: {response.text}") for data in transformed_data: log_and_execute(data) ``` 通过上述代码,我们不仅能够实时监控每一条订单记录的处理状态,还可以在出现问题时快速定位并解决。 以上就是使用轻易云数据集成平台进行ETL转换并写入目标平台的完整技术案例。希望这些技术细节能够帮助您更好地理解和应用该平台,实现高效的数据集成和管理。 ![打通金蝶云星空数据接口](https://pic.qeasy.cloud/T18.png~tplv-syqr462i7n-qeasy.image)