详解ETL过程:金蝶云数据在轻易云平台的转换与写入策略

  • 轻易云集成顾问-冯潇
### 金蝶云星空数据集成到轻易云平台案例分享:KD4 查询发货通知单-关联查询 在企业信息化系统之间进行高效的数据对接,一直是提升业务效率的关键。本文将重点介绍如何通过轻易云数据集成平台,将金蝶云星空中的发货通知单数据无缝、可靠地集成到我们的系统中。此次共享的方案名为“KD4 查询发货通知单-关联查询”,旨在确保不漏单、高效快速并且稳定地实现数据同步。 #### 确保集成过程中的一致性与完整性 为了保障从金蝶云星空获取的数据完整,我们利用了金蝶提供的executeBillQuery接口,此接口支持批量查询和过滤条件,能精确获取需要的数据。然而,处理大量调用时分页和限流是不可避免的问题。在此过程中,通过设计合理的分页参数与限流机制,以及定时调度任务,保证每次抓取操作都能够按预期顺利完成,不遗漏任何一条记录。此外,对不同类型错误设定对应重试策略,提高了整个流程异常情况下恢复能力。 #### 处理大规模数据写入 当成功从金蝶云星空取得相关数据后,需要将其高效写入到轻易云平台。我们采用了一套批量写入策略,这种方式不仅减少API调用次数,也显著加快了数据库操作速度。同时,为应对可能出现的数据格式差异问题,在轻易云平台端建立了一整套自定义映射规则,并通过可视化工具实时调整查看,有力保障了最终存储结果的一致性。 #### 实时监控与日志记录 整个流程中,每一次API请求和数据库操作都会被实时监控,并详细记录至日志文件。这不但便于后续运维人员排查故障,更有助于分析潜在优化点。例如,通过观察日志,可以及时发现某次请求处理时间过长,从而反向追踪具体原因并改进相应环节。 此次技术文章开头即聚焦这些核心步骤,希望能为类似场景下的系统集成需求提供实用参考。在接下来的内容中,我们将更详尽地探讨各个技术细节及实际应用效果。 ![用友与SCM系统接口开发配置](https://pic.qeasy.cloud/D30.png~tplv-syqr462i7n-qeasy.image) ### 调用金蝶云星空接口executeBillQuery获取并加工数据 在轻易云数据集成平台中,调用源系统接口是数据处理生命周期的第一步。本文将详细介绍如何通过调用金蝶云星空的`executeBillQuery`接口来获取并加工发货通知单的数据。 #### 接口配置与请求参数 金蝶云星空提供了丰富的API接口,其中`executeBillQuery`用于查询业务单据。该接口采用POST方法,支持复杂的查询条件和分页参数。以下是元数据配置中的关键字段及其含义: - **api**: `executeBillQuery` - **method**: `POST` - **number**: `FBillNo` - **id**: `FEntity_FEntryID` 请求参数分为两类:主请求参数和其他请求参数。 ##### 主请求参数 这些参数主要用于指定需要查询的字段和过滤条件: ```json [ {"field":"FEntity_FEntryID","label":"FEntity_FEntryID","type":"string","describe":"FEntity_FEntryID","value":"FEntity_FEntryID"}, {"field":"FID","label":"FID","type":"string","describe":"FID","value":"FID"}, {"field":"FBillNo","label":"单据编号","type":"string","describe":"单据编号","value":"FBillNo"}, {"field":"FDate","label":"日期","type":"string","describe":"日期","value":"FDate"}, {"field":"FSaleOrgId_Fname","label":"销售组织名称","type":"string","describe":"销售组织","value":"FSaleOrgId.Fname"}, // 更多字段... ] ``` ##### 其他请求参数 这些参数用于控制查询行为,如分页、过滤等: ```json [ {"field":"Limit","label":"最大行数","type":"int","describe":"金蝶的查询分页参数","value":100}, {"field":"StartRow","label":"开始行索引","type":"int"}, {"field":"TopRowCount","label":"返回总行数","type": "int"}, {"field": "FilterString", "label": "过滤条件", "type": "string", "describe": "示例写法 FSupplierId.FNumber = 'VEN00010' and FApproveDate>=", "value": "FApproveDate>='{{LAST_SYNC_TIME|datetime}}'"}, {"field": "FieldKeys", "label": "需查询的字段key集合", "type": "array", "parser":{"name": "ArrayToString", "params": ","}}, {"field": "FormId", "label": "业务对象表单Id", "type": "string", "describe": "必须填写金蝶的表单ID如:PUR_PurchaseOrder", "value": "SAL_DELIVERYNOTICE"} ] ``` #### 数据请求与清洗 在实际操作中,首先需要构建请求体,将上述配置中的字段映射到实际的API调用中。例如: ```json { "FormId": "SAL_DELIVERYNOTICE", "FieldKeys": ["FBillNo", "FDate", ...], "FilterString": "FApproveDate>='2023-01-01'", // 分页参数 "Limit": 100, ... } ``` 通过轻易云的数据集成平台,可以方便地将这些配置转化为实际的API调用。平台会自动处理分页、过滤等逻辑,并将结果返回。 #### 数据转换与写入 获取到数据后,需要对其进行清洗和转换,以便后续处理。例如,将日期格式统一、将金额字段转换为标准货币格式等。这些操作可以通过轻易云平台内置的数据转换工具实现。 ```json { // 清洗后的数据示例 { "单据编号": "DN20230101001", ... // 转换后的字段 ... } } ``` #### 实际案例 以下是一个实际调用金蝶云星空`executeBillQuery`接口并处理返回数据的案例: 1. 构建请求体: ```json { "FormId": "SAL_DELIVERYNOTICE", ... // 请求体内容 } ``` 2. 调用API: ```python response = requests.post(api_url, json=request_body) data = response.json() ``` 3. 数据清洗与转换: ```python cleaned_data = clean_data(data) ``` 4. 将清洗后的数据写入目标系统或存储: ```python save_to_target_system(cleaned_data) ``` 通过以上步骤,可以高效地从金蝶云星空获取所需的数据,并进行必要的清洗和转换,为后续的数据处理打下坚实基础。 ![泛微OA与ERP系统接口开发配置](https://pic.qeasy.cloud/S14.png~tplv-syqr462i7n-qeasy.image) ### KD4 查询发货通知单-关联查询的ETL转换与写入 在数据集成生命周期的第二步,我们将已经从源平台集成的数据进行ETL(提取、转换、加载)处理,转化为目标平台轻易云集成平台API接口所能够接收的格式,并最终写入目标平台。本文将重点探讨这一过程中的技术细节和实现方法。 #### 数据提取与清洗 首先,我们需要从源系统中提取发货通知单的数据。在这个阶段,数据可能存在各种不一致性和冗余信息,因此需要进行清洗操作。清洗操作包括但不限于: - 去除重复记录 - 处理缺失值 - 格式标准化 例如,假设我们从源系统中提取到的原始数据如下: ```json [ {"order_id": "123", "shipment_date": "2023-10-01", "status": "delivered"}, {"order_id": "124", "shipment_date": null, "status": "pending"}, {"order_id": "123", "shipment_date": "2023-10-01", "status": "delivered"} ] ``` 在清洗过程中,我们会去除重复的订单记录,并处理缺失的发货日期: ```json [ {"order_id": "123", "shipment_date": "2023-10-01", "status": "delivered"}, {"order_id": "124", "shipment_date": "", "status": "pending"} ] ``` #### 数据转换 接下来,我们需要将清洗后的数据转换为目标平台能够接受的格式。根据元数据配置,目标平台的API接口要求如下: ```json { "api":"写入空操作", "effect":"EXECUTE", "method":"POST", "idCheck":true } ``` 这意味着我们需要通过HTTP POST方法,将数据发送到指定的API接口,并且在发送之前需要进行ID检查,以确保每条记录都具有唯一标识符。 假设目标平台要求的数据格式如下: ```json { "operation": { "type": "insert", "data": [ {"id": 1, "order_id": 123, "shipment_date": "2023-10-01", "status": 1}, {"id": 2, "order_id": 124, "shipment_date": "", "status": 0} ] } } ``` 我们需要对清洗后的数据进行进一步转换,确保字段名和数据类型符合目标平台的要求。例如,将`status`字段从字符串类型转换为整数类型(1表示已发货,0表示待发货),并为每条记录生成唯一的`id`。 转换后的数据如下: ```json { "operation": { "type": "insert", "data":[ {"id": 1, "order_id":"123", "shipment_date":"2023-10-01", "status" :1}, {"id" :2, "order_id":"124", "shipment_date":"", "status" :0} ] } } ``` #### 数据加载 最后一步是将转换后的数据通过API接口写入目标平台。根据元数据配置,我们使用HTTP POST方法发送请求: ```http POST /api/execute HTTP/1.1 Host: target-platform.com Content-Type: application/json { ... } ``` 在发送请求之前,需要确保每条记录都经过ID检查,以防止重复插入或更新错误。可以使用数据库或者内存缓存来维护一个已处理ID列表,每次插入新记录前检查该ID是否已经存在。 #### 实现代码示例 以下是一个简单的Python代码示例,展示了如何实现上述ETL过程: ```python import requests import json # 清洗函数示例 def clean_data(data): cleaned_data = [] seen_ids = set() for record in data: if record['order_id'] not in seen_ids: seen_ids.add(record['order_id']) record['shipment_date'] = record['shipment_date'] or "" cleaned_data.append(record) return cleaned_data # 转换函数示例 def transform_data(data): transformed_data = [] for idx, record in enumerate(data): transformed_record = { 'id': idx + 1, 'order_id': int(record['order_id']), 'shipment_date': record['shipment_date'], 'status': 1 if record['status'] == 'delivered' else 0 } transformed_data.append(transformed_record) return { 'operation': { 'type': 'insert', 'data': transformed_data } } # 加载函数示例 def load_data(api_url, data): headers = {'Content-Type': 'application/json'} response = requests.post(api_url, headers=headers, data=json.dumps(data)) if response.status_code == 200: print("Data loaded successfully") else: print(f"Failed to load data: {response.status_code}") # 主程序示例 if __name__ == "__main__": raw_data = [ {"order_id":"123","shipment_date":"2023-10-01","status":"delivered"}, {"order_id":"124","shipment_date":"","status":"pending"}, {"order_id":"123","shipment_date":"2023-10-01","status":"delivered"} ] cleaned_data = clean_data(raw_data) transformed_data = transform_data(cleaned_data) api_url = 'http://target-platform.com/api/execute' load_data(api_url, transformed_data) ``` 通过上述步骤和代码示例,我们可以高效地完成从源系统到目标平台的数据ETL转换与加载。这不仅提高了数据处理效率,还确保了数据的一致性和准确性。 ![数据集成平台API接口配置](https://pic.qeasy.cloud/T24.png~tplv-syqr462i7n-qeasy.image)