ETL转换:如何使用轻易云将金蝶数据写入目标平台

  • 轻易云集成顾问-冯潇
### 【查询】金蝶客户:从金蝶云星空到轻易云数据集成的技术实现 在企业信息系统整合过程中,确保不同平台间的数据无缝对接是一个普遍且关键的问题。本文将聚焦于一个实际应用案例:[【查询】金蝶客户],详细说明如何将金蝶云星空数据集成至轻易云数据集成平台。 #### 如何处理 executeBillQuery 接口的调用与分页限流问题 首先,我们通过调用executeBillQuery接口来获取金蝶云星空中的客户数据。在这一过程中需要特别注意的是分页和限流机制,这是为了避免因大批量请求而导致接口性能下降或超时错误。具体做法是: 1. **分页策略**:每次请求限制在一定数量(如100条),逐页获取直至所有数据抓取完毕。 2. **限流控制**:设置并发请求数限制,根据API文档建议进行合理配置,以确保不会触发频率限制。 代码示例如下: ```python def fetch_kd_cust_data(page_num, page_size): url = "https://api.kingdee.com/executeBillQuery" params = { "pageNum": page_num, "pageSize": page_size } response = requests.get(url, params=params) if response.status_code == 200: return response.json() else: handle_error(response) # Example usage all_data = [] page_num = 1 while True: data_batch = fetch_kd_cust_data(page_num, 100) if not data_batch: break all_data.extend(data_batch) page_num += 1 ``` #### 数据格式差异与映射规则 由于金蝶云星空和轻易云两者之间的数据格式可能存在差异,在有效完成这一步骤之前,需要进行必要的数据转换和映射。例如,将金蝶的JSON结构中字段名`customerName`映射为轻易表格存储中的`cust_name`。 常见处理方式包括但不限于字典映射、序列化/反序列化工具等。主要目标是在不丢失业务语义的情况下,实现平稳过渡。例如: ```python def map_kd_to_qy(data): mapped_data = { "cust_name": data["customerName"], # Add additional mappings as required... } return mapped_data # Apply mapping to all fetched data before writing to the target platform. transformed_data = [map_kd_to_qy(item) for item in all_data] ``` #### 数据写入及异常处理机制 在成功获得并转换所需数据后,通过轻易云集成平台提供的写入API进行批量写入操作。在此 ![用友与外部系统接口集成开发](https://pic.qeasy.cloud/D20.png~tplv-syqr462i7n-qeasy.image) ### 调用金蝶云星空接口executeBillQuery获取并加工数据 在数据集成的生命周期中,调用源系统接口是至关重要的一步。本文将详细探讨如何通过轻易云数据集成平台调用金蝶云星空的`executeBillQuery`接口来获取客户数据,并进行必要的数据加工。 #### 接口配置与调用 首先,我们需要了解如何配置和调用`executeBillQuery`接口。根据提供的元数据配置,我们可以看到该接口采用POST方法,主要用于查询操作(effect: QUERY)。以下是具体的请求字段及其描述: ```json { "api": "executeBillQuery", "effect": "QUERY", "method": "POST", "number": "FNumber", "id": "FCUSTID", "name": "FNumber", "idCheck": true, "request": [ {"field":"FCUSTID","label":"FCUSTID","type":"string","describe":"FCUSTID","value":"FCUSTID"}, {"field":"FNumber","label":"编码1","type":"string","describe":"编码","value":"FNumber"}, {"field":"FName","label":"名称","type":"string","describe":"名称","value":"FName"}, {"field":"FCreateOrgId_FNumber","label":"创建组织","type":"string","describe":"创建组织","value":"FCreateOrgId.FNumber"}, {"field":"FUseOrgId_FNumber","label":"使用组织","type":"string","describe":"使用组织","value":"FUseOrgId.FNumber"}, {"field":"FDescription","label":"描述","type":"string","describe":"描述","value":"FDescription"}, {"field":"FCustTypeId_FNumber","label":"客户类别","type":"string","describe":"客户类别","value":"FCustTypeId.FNumber"}, {"field":"FGroup_FNumber","label":"客户分组","type":"string","describe":"客户分组","value":"FGroup.FNumber"}, {"field":"FSALDEPTID_FNumber","label":"销售部门","type":""}, ... ], ... } ``` #### 请求参数解析 在请求参数中,关键字段包括: - `FormId`: 必须填写金蝶的表单ID,例如:`BD_Customer`。 - `FieldKeys`: 查询字段的key集合,格式为数组。 - `FilterString`: 用于过滤查询结果的条件字符串。 - `Limit`, `StartRow`, `TopRowCount`: 分页参数,用于控制查询结果的分页。 以下是一个示例请求体: ```json { "FormId": "BD_Customer", "FieldKeys": ["FCUSTID", "FNumber", "FName", ...], "FilterString": "FSupplierId.FNumber = 'VEN00010' and FApproveDate>='2023-01-01'", "Limit": 100, "StartRow": 0 } ``` #### 数据清洗与加工 在获取到原始数据后,需要对其进行清洗和加工,以满足业务需求。以下是一些常见的数据清洗与加工操作: 1. **字段映射**:将原始数据中的字段映射到目标系统所需的字段。例如,将`FCUSTID`映射为目标系统中的客户ID。 2. **数据格式转换**:将日期、金额等字段转换为目标系统所需的格式。例如,将日期格式从`YYYY-MM-DD`转换为`MM/DD/YYYY`。 3. **数据过滤**:根据业务规则过滤掉不需要的数据。例如,只保留状态为“有效”的客户记录。 以下是一个简单的数据清洗示例: ```python def clean_data(raw_data): cleaned_data = [] for record in raw_data: cleaned_record = { 'customer_id': record['FCUSTID'], 'customer_number': record['FNumber'], 'customer_name': record['FName'], 'organization': record['FCreateOrgId_FNumber'], ... } # 数据过滤 if record['status'] == '有效': cleaned_data.append(cleaned_record) return cleaned_data ``` #### 实时监控与错误处理 在数据集成过程中,实时监控和错误处理同样重要。轻易云平台提供了实时监控功能,可以帮助我们及时发现和解决问题。例如,当接口调用失败时,可以通过日志和告警机制快速定位问题并采取相应措施。 ```python def monitor_and_handle_errors(response): if response.status_code != 200: log_error(response) send_alert("API调用失败", response.text) ``` 通过上述步骤,我们可以高效地调用金蝶云星空接口获取客户数据,并进行必要的数据清洗与加工,为后续的数据转换与写入做好准备。这不仅提升了业务透明度和效率,也确保了数据的一致性和准确性。 ![打通企业微信数据接口](https://pic.qeasy.cloud/S2.png~tplv-syqr462i7n-qeasy.image) ### 使用轻易云数据集成平台进行ETL转换并写入目标平台 在数据集成的生命周期中,ETL(提取、转换、加载)是至关重要的一环。本文将详细探讨如何利用轻易云数据集成平台,将已经集成的源平台数据进行ETL转换,转为目标平台API接口所能够接收的格式,并最终写入目标平台。 #### 数据提取与初步清洗 在进行ETL操作之前,首先需要从源系统中提取数据。在本案例中,我们假设已经从金蝶客户系统中成功提取了客户数据。这些数据通常以JSON或XML格式存储,并且可能包含冗余信息或不一致的数据格式。 #### 数据转换 一旦完成初步清洗,下一步是将数据转换为目标平台所能接受的格式。轻易云数据集成平台提供了强大的数据转换功能,可以通过自定义脚本或内置函数来实现这一过程。 假设我们从金蝶系统中获取到以下客户信息: ```json { "customer_id": "12345", "name": "张三", "contact_info": { "phone": "13800138000", "email": "zhangsan@example.com" }, "address": { "province": "北京", "city": "北京市", "district": "朝阳区" } } ``` 我们需要将这些信息转换为目标平台API接口所能接受的格式。根据元数据配置,我们知道目标API接口的相关信息如下: ```json { "api":"写入空操作", "effect":"EXECUTE", "method":"POST", "idCheck":true } ``` 为了满足目标API接口的需求,我们需要对原始数据进行以下处理: 1. 将嵌套的`contact_info`和`address`字段展开。 2. 确保所有字段名称符合目标平台要求。 3. 根据`idCheck`参数,确保在写入之前检查ID是否存在。 处理后的数据可能如下: ```json { "customerId": "12345", "customerName": "张三", "phoneNumber": "13800138000", "emailAddress": "zhangsan@example.com", "provinceName": "北京", "cityName": "北京市", "districtName": "朝阳区" } ``` #### 数据加载 在完成数据转换之后,下一步是通过API接口将数据写入目标平台。根据元数据配置,我们使用HTTP POST方法来执行这一操作。 首先,需要构建HTTP请求: ```http POST /api/execute HTTP/1.1 Host: target-platform.example.com Content-Type: application/json { "customerId": "12345", "customerName": "张三", ... } ``` 在发送请求之前,需要检查ID是否存在。如果ID已经存在,则可能需要更新现有记录,而不是插入新记录。这可以通过向目标平台发送一个查询请求来实现: ```http GET /api/check-id?customerId=12345 HTTP/1.1 Host: target-platform.example.com ``` 如果返回结果表明ID不存在,则可以安全地插入新记录;否则,需要更新现有记录。 #### 实践案例 下面是一个实际的代码示例,展示如何使用Python脚本结合轻易云的数据集成功能来完成上述步骤: ```python import requests import json # 源系统中的客户信息 source_data = { 'customer_id': '12345', 'name': '张三', 'contact_info': { 'phone': '13800138000', 'email': 'zhangsan@example.com' }, 'address': { 'province': '北京', 'city': '北京市', 'district': '朝阳区' } } # 转换后的目标数据格式 target_data = { 'customerId': source_data['customer_id'], 'customerName': source_data['name'], 'phoneNumber': source_data['contact_info']['phone'], 'emailAddress': source_data['contact_info']['email'], 'provinceName': source_data['address']['province'], 'cityName': source_data['address']['city'], 'districtName': source_data['address']['district'] } # 检查ID是否存在 check_id_url = f"http://target-platform.example.com/api/check-id?customerId={target_data['customerId']}" response = requests.get(check_id_url) if response.status_code == 200 and response.json().get('exists'): print("ID already exists, updating record...") else: print("ID does not exist, inserting new record...") # 写入目标平台 write_url = f"http://target-platform.example.com/api/execute" headers = {'Content-Type': 'application/json'} response = requests.post(write_url, headers=headers, data=json.dumps(target_data)) if response.status_code == 200: print("Data written successfully") else: print(f"Failed to write data: {response.status_code}") ``` 通过上述步骤,我们成功地将金蝶客户系统中的数据提取、转换并写入到目标平台。这种基于轻易云的数据集成解决方案,不仅简化了跨系统的数据流动,还确保了每个环节的高效和透明。 ![钉钉与MES系统接口开发配置](https://pic.qeasy.cloud/T7.png~tplv-syqr462i7n-qeasy.image)