使用轻易云平台实现ETL转换及数据写入

  • 轻易云集成顾问-杨嫦
### 金蝶云星空数据集成到轻易云集成平台的应用案例:查询金蝶客户--ok 在本篇技术案例中,我们将深入探讨如何通过轻易云集成平台实现对金蝶云星空系统中客户数据的高效对接和实时监控。本方案具体命名为“查询金蝶客户--ok”。为了满足企业复杂多变的数据处理需求,本方案依赖于两个关键API接口:用于从金蝶云星空获取数据的`executeBillQuery`和向轻易云集成平台写入数据的API。 #### 系统架构与设计思路 轻易云集成平台提供了面向全生命周期管理的数据处理能力,通过其高吞吐量的数据写入机制,可以快速集中大量来自金蝶云星空的数据。结合可视化操作界面,整个数据集成过程变得简单并且直观。另外,该平台还具备强大的监控和告警系统,能够实时跟踪每一个数据流动环节,从而确保流程透明、安全,并迅速响应潜在问题。 我们首先着眼于使用`executeBillQuery` API来抓取金蝶客户信息。这一步骤需要考虑分页和限流策略。通过系统自带的定时任务功能,实现可靠且稳定的数据抓取。在获取到原始数据后,将其进行一定格式转换,使之符合目标存储格式。随后,通过调用轻易云相应写入API,实现批量、高效地将清洗后的数据信息推送至目标数据库中。 #### 技术难点与解决方案 1. **分页与限流** - 在实际操作过程中,为了避免对源服务器造成过大负担,同时保证所有客户信息的不漏单,我们启用了分页机制,并定义了合理的限流策略。 2. **自定义转换逻辑** - 由于两边系统可能存在字段差异或业务逻辑上的不同,必须设置适当的数据映射规则,以便正确匹配相关字段。这个过程中主要利用可视化工具设计对应转换规则,提高开发灵活性及维护性。 3. **异常处理及错误重试机制** - 考虑到网络波动或暂时性故障等不可预见因素,在接口调用失败时允许设置重试次数以及延迟时间,从而增强系统整体鲁棒性。 4. **实时监控与日志记录** - 利用集中监控模块,对核心链路进行状态追踪、性能分析以及异常报警。同时全面记录各个节点执行日志,为后续排查问题提供依据。 5. **确保无遗漏** - 对于每次执行结果详细校验,如有未成功部分会自动补发,防止任何一条有效记录丢失。 ![金蝶与MES系统接口开发配置](https://pic.qeasy.cloud/D25.png~tplv-syqr462i7n-qeasy.image) ### 调用金蝶云星空接口executeBillQuery获取并加工数据 在数据集成的生命周期中,第一步是从源系统获取数据。本文将深入探讨如何通过轻易云数据集成平台调用金蝶云星空接口`executeBillQuery`来获取客户信息,并对数据进行初步加工。 #### 接口配置与调用 首先,我们需要配置并调用金蝶云星空的`executeBillQuery`接口。该接口主要用于查询业务对象的数据。在本案例中,我们将查询客户信息。 元数据配置如下: ```json { "api": "executeBillQuery", "effect": "QUERY", "method": "POST", "number": "FNumber", "id": "FCUSTID", "name": "FNumber", "idCheck": true, "request": [ {"field":"FCUSTID","label":"FCUSTID","type":"string","describe":"FCUSTID","value":"FCUSTID"}, {"field":"FNumber","label":"编码","type":"string","describe":"编码","value":"FNumber"}, {"field":"FName","label":"名称","type":"string","describe":"名称","value":"FName"}, {"field":"FCreateOrgId_FNumber","label":"创建组织","type":"string","describe":"创建组织","value":"FCreateOrgId.FNumber"}, {"field":"FUseOrgId_FNumber","label":"使用组织","type":"string","describe":"使用组织","value":"FUseOrgId.FNumber"}, {"field":"FDescription","label":"描述","type":"string","describe":"描述","value":"FDescription"}, {"field":"FIsTrade","label":"是否交易客户","type":"string","describe":"是否交易客户","value":"FIsTrade"}, {"field":"FCustTypeId_FNumber","label":"客户类别","type":"string","describe":"客户类别","value":"FCustTypeId.FNumber"}, {"field":"FGroup_FNumber","label":"客户分组","type":"string","describe":"客户分组","value":"FGroup.FNumber"}, {"field":...} ], "otherRequest": [ {"field": "Limit", "label": "最大行数", "type": "string", "describe": "金蝶的查询分页参数", "value": "{PAGINATION_PAGE_SIZE}"}, {"field": "StartRow", "label": "开始行索引", "type": "string", "describe": "金蝶的查询分页参数", "value": "{PAGINATION_START_ROW}"}, {"field":...} ], ... } ``` #### 请求参数解析 1. **请求字段**:在`request`部分,我们定义了需要从金蝶云星空获取的字段。例如,`FCUSTID`表示客户ID,`FNumber`表示编码,`FName`表示名称等。这些字段将用于后续的数据处理和分析。 2. **其他请求参数**:在`otherRequest`部分,我们定义了一些辅助参数,如分页参数(`Limit`, `StartRow`),过滤条件(`FilterString`)等。这些参数有助于优化查询效率和结果管理。 #### 数据请求与清洗 通过上述配置,我们可以向金蝶云星空发送POST请求以获取所需的客户数据。以下是一个示例请求体: ```json { "FormId": "BD_Customer", "FieldKeys": ["FCUSTID", "FNumber", ...], ... } ``` 响应结果将包含我们指定的字段及其对应的数据。在接收到响应后,需要对数据进行初步清洗和转换,以确保其格式和内容符合业务需求。 #### 数据转换与写入 在完成数据清洗后,下一步是将数据转换为目标系统所需的格式,并写入目标数据库或系统。这一步通常涉及以下操作: 1. **字段映射**:将源系统中的字段映射到目标系统中的相应字段。例如,将金蝶云星空中的`FCUSTID`映射到目标系统中的客户ID。 2. **数据类型转换**:确保所有字段的数据类型与目标系统要求一致。例如,将字符串类型的日期转换为日期对象。 3. **批量写入**:为了提高效率,可以采用批量写入方式,将处理后的数据一次性写入目标系统。 通过上述步骤,我们可以实现从金蝶云星空获取并加工客户数据,并将其无缝集成到目标系统中。这不仅提高了数据处理效率,还确保了数据的一致性和准确性。 ![用友与CRM系统接口开发配置](https://pic.qeasy.cloud/S21.png~tplv-syqr462i7n-qeasy.image) ### 使用轻易云数据集成平台进行ETL转换和数据写入 在数据集成过程中,ETL(Extract, Transform, Load)转换是将源平台的数据转化为目标平台所需格式的关键步骤。本文将深入探讨如何使用轻易云数据集成平台,将金蝶客户数据转换并写入目标平台。 #### 数据请求与清洗 首先,我们需要从金蝶系统中提取客户数据。这一步骤通常涉及到通过API接口或数据库连接获取原始数据,并进行初步清洗,以确保数据的完整性和一致性。例如,我们可以通过以下API请求从金蝶系统获取客户信息: ```json { "api": "查询金蝶客户", "method": "GET", "parameters": { "filter": { "status": "active" } } } ``` 在获取到原始数据后,我们需要对其进行清洗操作,例如去除重复记录、处理缺失值等。这些操作可以通过轻易云提供的内置工具或自定义脚本来完成。 #### 数据转换 接下来是数据转换阶段,即将清洗后的数据转化为目标平台所能接受的格式。假设我们需要将金蝶客户数据转化为轻易云集成平台API接口所能接收的格式,我们可以使用如下元数据配置: ```json { "api": "写入空操作", "effect": "EXECUTE", "method": "POST", "idCheck": true } ``` 在这个配置中,`api`字段指定了目标API接口名称,`effect`字段表示执行操作,`method`字段指定了HTTP方法,而`idCheck`字段用于检查是否需要进行ID校验。 为了更好地理解这一过程,我们来看一个具体的转换示例。假设我们从金蝶系统获取到如下客户数据: ```json [ { "customerId": "C001", "customerName": "张三", "contactNumber": "1234567890" }, { "customerId": "C002", "customerName": "李四", "contactNumber": "" } ] ``` 我们需要将这些数据转化为轻易云集成平台API接口所能接收的格式。可以使用以下代码进行转换: ```python import json def transform_data(data): transformed_data = [] for record in data: transformed_record = { "id": record["customerId"], "name": record["customerName"], "phoneNumber": record["contactNumber"] if record["contactNumber"] else None } transformed_data.append(transformed_record) return transformed_data # 原始数据 original_data = [ {"customerId": "C001", "customerName": "张三", "contactNumber": "1234567890"}, {"customerId": "C002", "customerName": "李四", "contactNumber": ""} ] # 转换后的数据 transformed_data = transform_data(original_data) print(json.dumps(transformed_data, ensure_ascii=False)) ``` 上述代码将原始客户数据转化为目标平台所需的格式,其中处理了空值情况,将缺失的联系电话字段设置为`None`。 #### 数据写入 最后一步是将转换后的数据写入目标平台。根据元数据配置,我们使用POST方法调用轻易云集成平台的API接口: ```python import requests url = 'https://api.qingyiyun.com/execute' headers = {'Content-Type': 'application/json'} payload = json.dumps(transformed_data, ensure_ascii=False) response = requests.post(url, headers=headers, data=payload) if response.status_code == 200: print("Data successfully written to the target platform.") else: print(f"Failed to write data. Status code: {response.status_code}") ``` 在这个过程中,确保正确设置HTTP头信息和请求体内容。成功响应后,即表示数据已经成功写入目标平台。 #### 总结 通过以上步骤,我们详细介绍了如何使用轻易云数据集成平台进行ETL转换,并将金蝶客户数据写入目标平台。从提取、清洗、转换到最终写入,每一步都至关重要,确保了整个流程的顺利完成和高效运行。 ![用友与MES系统接口开发配置](https://pic.qeasy.cloud/T25.png~tplv-syqr462i7n-qeasy.image)