使用轻易云进行ETL转换和数据写入目标平台的最佳实践

  • 轻易云集成顾问-彭亮
### 金蝶云星空数据集成到轻易云集成平台:金蝶销售订单查询 在本文中,我们将探讨如何通过轻易云集成平台实现与金蝶云星空系统的无缝数据对接,具体案例为金蝶销售订单查询的实施。项目目标是高效、安全地将金蝶云星空中的销售订单数据转移并存储到轻易云集成平台,实现实时监控和异常检测。 #### 数据获取与接口调用 首先,关键技术步骤之一是从金蝶云星空系统获取所需的销售订单数据。为了确保完整性和准确性,我们使用了executeBillQuery接口进行定时、可靠的数据抓取操作。利用该接口能够分页处理大量记录,同时应对API限流问题。 ```markdown - API: executeBillQuery - 功能: 获取指定时间段内的所有销售订单 ``` #### 数据写入与转换逻辑 一旦从金蝶云星空成功获取了销售订单信息,下一个重要环节便是将这些数据快速、高效地写入到轻易云集成平台。我们采用了一套自定义的数据转换逻辑,以适应特定业务需求,并且使用/customer/add接口实现批量数据写入,从而提升整体处理效率。 ```markdown - API: /customer/add - 功能: 向轻易云集成平台批量添加客户信息 ``` #### 实时监控与告警机制 为了保障整个数据对接流程顺畅运行,部署了一套集中监控和告警系统,可以实时跟踪每项任务的状态及性能表现。一旦检测到任何异常,如网络延迟或API错误,系统会立即触发告警并启动自动重试机制,以确保不漏单、不丢单。同时,通过可视化工具展示数据流动过程,使得管理人员能够清晰了解每个环节的状态。 上述解决方案不仅提高了我们的运营效率,还有效避免了潜在风险,为企业带来了显著收益。在接下来的部分中,我们将进一步深入探讨具体实现步骤及其背后的技术细节,包括处理分页、限流问题,以及如何优化API资产管理等更多内容。 ![如何开发企业微信API接口](https://pic.qeasy.cloud/D22.png~tplv-syqr462i7n-qeasy.image) ### 调用金蝶云星空接口executeBillQuery获取并加工数据 在数据集成的生命周期中,调用源系统接口获取数据是至关重要的一步。本文将详细探讨如何通过轻易云数据集成平台调用金蝶云星空的`executeBillQuery`接口来获取销售订单数据,并进行初步的数据加工。 #### 接口配置与调用 首先,我们需要配置并调用金蝶云星空的`executeBillQuery`接口。该接口使用POST方法,主要用于查询销售订单相关信息。以下是元数据配置的关键字段及其含义: - **api**: `executeBillQuery` - **method**: `POST` - **number**: `FBillNo` - **id**: `FID` - **pagination**: 分页参数,默认每页500条记录 - **idCheck**: 启用ID检查 请求参数包括销售订单的各个字段,如单据编号、单据状态、销售组织、日期等。这些字段在请求体中以JSON格式传递。 #### 请求参数示例 以下是一个请求参数的示例,展示了如何配置请求体以获取所需的数据: ```json { "FormId": "SAL_SaleOrder", "FieldKeys": [ "FSaleOrderEntry_FEntryID", "FID", "FBillNo", "FDocumentStatus", "FSaleOrgId.FNumber", "FDate", "FCustId.FNumber" ], "FilterString": "FApproveDate>='2023-01-01'", "Limit": 500, "StartRow": 0 } ``` #### 数据清洗与转换 在获取到原始数据后,需要对数据进行清洗和转换,以确保其符合目标系统的要求。这一步通常包括以下几个方面: 1. **字段映射**:将源系统字段映射到目标系统字段。例如,将`FSaleOrgId.FNumber`映射为目标系统中的`SaleOrgCode`。 2. **数据类型转换**:确保所有字段的数据类型符合目标系统的要求。例如,将日期字符串转换为标准日期格式。 3. **过滤无效数据**:移除不符合业务规则的数据记录。例如,过滤掉单据状态为“作废”的记录。 #### 数据写入 经过清洗和转换后的数据可以写入目标系统。在轻易云平台上,可以通过配置相应的写入节点实现这一过程。具体操作包括: 1. 配置目标系统的API接口信息。 2. 设置写入规则和映射关系。 3. 执行写入操作,并监控写入结果。 #### 示例代码 以下是一个示例代码片段,展示了如何在轻易云平台上实现上述步骤: ```python import requests import json # 配置请求URL和头信息 url = 'https://api.kingdee.com/executeBillQuery' headers = { 'Content-Type': 'application/json', 'Authorization': 'Bearer YOUR_ACCESS_TOKEN' } # 构建请求体 payload = { "FormId": "SAL_SaleOrder", "FieldKeys": [ "FSaleOrderEntry_FEntryID", "FID", "FBillNo", "FDocumentStatus", "FSaleOrgId.FNumber", "FDate", "FCustId.FNumber" ], "FilterString": "FApproveDate>='2023-01-01'", "Limit": 500, "StartRow": 0 } # 发起POST请求 response = requests.post(url, headers=headers, data=json.dumps(payload)) # 检查响应状态码 if response.status_code == 200: data = response.json() # 数据清洗与转换(示例) cleaned_data = [] for record in data: cleaned_record = { 'SaleOrderEntryID': record['FSaleOrderEntry_FEntryID'], 'BillNo': record['FBillNo'], 'DocumentStatus': record['FDocumentStatus'], 'SaleOrgCode': record['FSaleOrgId.FNumber'], 'Date': record['FDate'], 'CustomerCode': record['FCustId.FNumber'] } cleaned_data.append(cleaned_record) # 将清洗后的数据写入目标系统(示例) write_url = 'https://api.targetsystem.com/writeData' write_response = requests.post(write_url, headers=headers, data=json.dumps(cleaned_data)) if write_response.status_code == 200: print("Data written successfully") else: print(f"Failed to fetch data: {response.status_code}") ``` 通过上述步骤,我们可以高效地从金蝶云星空获取销售订单数据,并进行必要的数据清洗和转换,最终将处理后的数据写入目标系统。这一过程不仅提高了数据集成的效率,还确保了数据的一致性和准确性。 ![钉钉与MES系统接口开发配置](https://pic.qeasy.cloud/S14.png~tplv-syqr462i7n-qeasy.image) ### 使用轻易云数据集成平台进行ETL转换与写入目标平台 在数据集成生命周期的第二步,我们需要将已经集成的源平台数据进行ETL(Extract, Transform, Load)转换,转为目标平台能够接收的格式,并最终写入目标平台。本文将详细探讨如何使用轻易云数据集成平台API接口进行这一过程。 #### API接口配置 根据提供的元数据配置,我们需要向轻易云集成平台发送一个POST请求,API路径为`/customer/add`。以下是具体的元数据配置内容: ```json { "api": "/customer/add", "method": "POST", "number": "FBillNo", "pagination": { "pageSize": 500 }, "idCheck": true, "otherRequest": [ { "field": "FormId", "label": "业务对象表单Id", "type": "string", "describe": "必须填写金蝶的表单ID如:PUR_PurchaseOrder" }, { "field": "Operation", "label": "执行的操作", "type": "string", "value": "BatchSave" }, { "field": "IsAutoSubmitAndAudit", "label": "提交并审核", "type": "bool", "value": true }, { "field": "IsVerifyBaseDataField", "label": "验证基础资料", "type": "bool", "describe": "是否验证所有的基础资料有效性,布尔类,默认false(非必录)", "value": false } ] } ``` #### 数据提取与清洗 首先,从源系统提取销售订单数据。这一步通常涉及到连接数据库或调用源系统API获取原始数据。假设我们已经成功获取了这些数据,接下来需要对其进行清洗和预处理。 ```python import pandas as pd # 假设从源系统提取的数据存储在DataFrame中 source_data = pd.read_csv('sales_orders.csv') # 清洗数据,例如去除空值、格式化日期等 cleaned_data = source_data.dropna().reset_index(drop=True) cleaned_data['order_date'] = pd.to_datetime(cleaned_data['order_date']) ``` #### 数据转换 接下来,将清洗后的数据转换为目标平台所需的格式。根据元数据配置,我们需要构建一个符合API要求的JSON结构。 ```python import json def transform_data(row): return { 'FBillNo': row['order_number'], 'FormId': 'PUR_PurchaseOrder', 'Operation': 'BatchSave', 'IsAutoSubmitAndAudit': True, 'IsVerifyBaseDataField': False, # 添加其他必要字段 } transformed_data = cleaned_data.apply(transform_data, axis=1).tolist() ``` #### 数据写入 最后,将转换后的数据通过API接口写入目标平台。这里使用Python的requests库来发送HTTP请求。 ```python import requests url = 'https://api.qingyiyun.com/customer/add' headers = {'Content-Type': 'application/json'} for data in transformed_data: response = requests.post(url, headers=headers, data=json.dumps(data)) if response.status_code == 200: print(f"Order {data['FBillNo']} successfully written to target platform.") else: print(f"Failed to write order {data['FBillNo']}. Status code: {response.status_code}") ``` #### 分页处理 如果需要处理大量数据,可以利用分页机制。根据元数据配置中的分页信息,每次请求最多可以处理500条记录。 ```python page_size = 500 total_records = len(transformed_data) for i in range(0, total_records, page_size): batch_data = transformed_data[i:i + page_size] response = requests.post(url, headers=headers, data=json.dumps(batch_data)) if response.status_code == 200: print(f"Batch {i//page_size + 1} successfully written to target platform.") else: print(f"Failed to write batch {i//page_size + 1}. Status code: {response.status_code}") ``` 通过上述步骤,我们完成了从源系统提取、清洗、转换并最终写入目标平台的数据集成过程。这一过程充分利用了轻易云集成平台提供的API接口和元数据配置,实现了高效、可靠的数据传输和处理。 ![如何开发金蝶云星空API接口](https://pic.qeasy.cloud/T17.png~tplv-syqr462i7n-qeasy.image)