利用轻易云数据集成平台高效进行数据转换与写入

  • 轻易云集成顾问-陈洁琳
### 金蝶云星空与轻易云集成平台数据对接案例分享 在企业信息化系统中,跨平台的数据集成是关键任务之一。本文将详细探讨如何通过轻易云数据集成平台实现金蝶云星空的辅助资料查询,并成功将其写入到目标数据库。在此技术案例中,我们实际运行的方案名称为:查询金蝶辅助资料--ok_copy。 为了确保金蝶云星空的数据能够无遗漏且高效地传输到轻易云集成平台,我们主要采用了以下技术手段: 首先,通过调用金蝶云星空提供的`executeBillQuery`接口获取所需数据。该API允许用户根据指定条件进行精确的数据检索,确保我们能快速并准确地抓取到需要的信息。同时,为处理大量实时及批量性的数据请求,以优化频繁调用API接口带来的性能影响,还特别设计了分页和限流机制。这两者结合使用不仅避免了请求超时,也有效平衡了服务器负载。 其次,在拿到从金蝶获取的大量数据后,需要以最快速、安全的方法将这些数据写入轻易云集成平台。这里使用的是"写入空操作"API,它具备高速批量处理优势,使得海量数据的迁移变得便捷可靠。此外,为解决可能出现的数据格式差异问题,建立了一套定制化映射规则,将不同系统间的不兼容字段进行了合理转换和匹配,从而保证所有业务逻辑得到准确执行。 最后,对整个流程实施全生命周期监控和日志记录,每一个步骤、每一种异常情况都被详尽记录下来。这不仅使得错误重试机制更为精准,同时提高了整体运维效率,让项目变得更加可控可视。如果某些环节出现异常或失败,如网络波动导致API调用失败,则会触发自动重试策略,以最大程度保障任务顺利完成。 综上,通过合理应用上述方法,不仅成功实现了与金蝶云星空之间快速、稳定、高效的接口对接,而且还确保在整个过程中无任何丢单现象发生,提高了业务透明度及运营效率。在之后内容中,我们会进一步揭秘每个具体实现细节以及相应代码示例,请继续关注! (未完待续...) ![系统集成平台API接口配置](https://pic.qeasy.cloud/D30.png~tplv-syqr462i7n-qeasy.image) ### 调用金蝶云星空接口executeBillQuery获取并加工数据 在轻易云数据集成平台中,调用源系统的API接口是数据处理生命周期的第一步。本文将详细探讨如何通过调用金蝶云星空的`executeBillQuery`接口来获取并加工数据。 #### 接口配置与请求参数 在进行数据集成时,首先需要配置API接口的元数据。以下是`executeBillQuery`接口的元数据配置: ```json { "api": "executeBillQuery", "effect": "QUERY", "method": "POST", "number": "FNumber", "name": "FName", "request": [ {"field":"FNumber","label":"编码","type":"string","value":"FNumber"}, {"field":"FDataValue","label":"名称","type":"string","value":"FDataValue"}, {"field":"FDescription","label":"备注","type":"string","value":"FDescription"}, {"field":"FId_FNumber","label":"类别","type":"string","value":"FId.FNumber"}, {"field":"FSeq","label":"顺序显示","type":"string","value":"FSeq"}, {"field":"FCreateOrgId_FNumber","label":"创建组织","type":"string","value":"FCreateOrgId.FNumber"}, {"field":"FUseOrgId_FNumber","label":"使用组织","type":"string","value":"FUseOrgId.FNumber"}, {"field":"FCreatorId","label":"创建人","type":"string","value":"FCreatorId"}, {"field":"FParentId","label":"上级资料","type":"string","value":"FParentId"} ], "otherRequest": [ {"field": "Limit", "label": "最大行数", "type": "string", "describe": "金蝶的查询分页参数", "value": "{PAGINATION_PAGE_SIZE}"}, {"field": "StartRow", "label": "开始行索引", "type": "string", "describe": "金蝶的查询分页参数", "value": "{PAGINATION_START_ROW}"}, {"field": "TopRowCount", "label": "返回总行数", "type": "int", "describe": "金蝶的查询分页参数"}, {"field": "FilterString", "label": "过滤条件", "type": "string", "describe": "示例写法 FSupplierId.FNumber = 'VEN00010' and FApproveDate>=", "value": "FId.FNumber ='CRM_CustSource'"}, {"field": "FieldKeys", "label": "需查询的字段key集合", "type": "array", "describe": "金蝶分录主键ID格式:FPOOrderEntry_FEntryId,其它格式 FPurchaseOrgId.FNumber" }, {"field": "FormId", "label": "业务对象表单Id", "type": "string", "describe": "必须填写金蝶的表单ID如:PUR_PurchaseOrder", "value": "BOS_ASSISTANTDATA_DETAIL" } ], autoFillResponse: true } ``` #### 请求参数详解 1. **基本请求字段**: - `FNumber`: 编码 - `FDataValue`: 名称 - `FDescription`: 备注 - `FId_FNumber`: 类别 - `FSeq`: 顺序显示 - `FCreateOrgId_FNumber`: 创建组织 - `FUseOrgId_FNumber`: 使用组织 - `FCreatorId`: 创建人 - `FParentId`: 上级资料 2. **其他请求字段**: - `Limit`: 最大行数,用于分页控制。 - `StartRow`: 开始行索引,用于分页控制。 - `TopRowCount`: 返回总行数。 - `FilterString`: 过滤条件,例如`FSupplierId.FNumber = 'VEN00010' and FApproveDate>=`。 - `FieldKeys`: 查询字段key集合,指定需要返回的数据字段。 - `FormId`: 表单ID,例如`BOS_ASSISTANTDATA_DETAIL`。 #### 调用示例 以下是一个调用该接口的示例代码: ```python import requests url = 'https://api.kingdee.com/executeBillQuery' headers = {'Content-Type': 'application/json'} payload = { 'FormId': 'BOS_ASSISTANTDATA_DETAIL', 'FieldKeys': ['FNumber', 'FDataValue', 'FDescription', 'FCreatorId'], 'FilterString': 'FSupplierId.FNumber = \'VEN00010\' and FApproveDate>=\'2023-01-01\'', 'Limit': 100, 'StartRow': 0, } response = requests.post(url, json=payload, headers=headers) data = response.json() # 数据处理逻辑 for item in data['Result']['ResponseStatus']['SuccessEntitys']: print(f'编码: {item["FNumber"]}, 名称: {item["FDataValue"]}, 备注: {item["FDescription"]}') ``` #### 数据处理与清洗 在获取到原始数据后,需要对其进行清洗和转换,以便后续的数据写入和分析。以下是一些常见的数据清洗操作: 1. **去除空值**:删除或填充缺失值。 2. **格式转换**:将日期、金额等字段转换为标准格式。 3. **数据过滤**:根据业务需求筛选出符合条件的数据。 例如,对于上述返回的数据,可以进行如下处理: ```python cleaned_data = [] for item in data['Result']['ResponseStatus']['SuccessEntitys']: if item['FCreatorId'] is not None: cleaned_data.append({ '编码': item['FNumber'], '名称': item['FDataValue'], '备注': item['FDescription'], '创建人': item['FCreatorId'] }) # 将清洗后的数据写入目标系统或存储到数据库中 ``` 通过上述步骤,我们可以高效地从金蝶云星空获取所需的数据,并进行必要的清洗和加工,为后续的数据分析和决策提供支持。 ![打通企业微信数据接口](https://pic.qeasy.cloud/S5.png~tplv-syqr462i7n-qeasy.image) ### 数据转换与写入:轻易云数据集成平台API接口应用 在数据集成的生命周期中,数据转换与写入是至关重要的一步。本文将深入探讨如何利用轻易云数据集成平台,将已经集成的源平台数据进行ETL转换,转为目标平台轻易云集成平台API接口所能够接收的格式,并最终写入目标平台。 #### API接口配置与调用 在进行数据写入之前,首先需要了解目标API接口的配置。根据提供的元数据配置,我们需要使用POST方法来调用“写入空操作”API,并且需要进行ID检查。以下是元数据配置的具体内容: ```json { "api": "写入空操作", "effect": "EXECUTE", "method": "POST", "idCheck": true } ``` #### 数据请求与清洗 在进行数据转换之前,首先要确保从源平台获取的数据是干净且结构化的。这一步通常包括以下几个步骤: 1. **数据请求**:通过源平台的API接口或数据库连接获取原始数据。 2. **数据清洗**:对获取的数据进行清洗,包括去除重复项、处理缺失值和异常值等。 #### 数据转换 一旦数据被清洗干净,就可以开始进行ETL(Extract, Transform, Load)中的Transform部分。这个过程包括将源平台的数据格式转换为目标平台所能接收的格式。以下是一个简单的数据转换示例: 假设我们从源平台获取到的数据如下: ```json { "id": "12345", "name": "测试用户", "email": "test@example.com" } ``` 而目标平台要求的数据格式如下: ```json { "userId": "12345", "userName": "测试用户", "userEmail": "test@example.com" } ``` 我们可以使用Python脚本或其他编程语言来完成这一转换过程: ```python def transform_data(source_data): transformed_data = { "userId": source_data["id"], "userName": source_data["name"], "userEmail": source_data["email"] } return transformed_data ``` #### 数据写入 在完成数据转换后,就可以将其写入目标平台。根据元数据配置,我们需要使用POST方法来调用“写入空操作”API,并且需要进行ID检查。以下是一个Python示例,展示了如何调用该API并传递转换后的数据: ```python import requests def write_to_target_platform(transformed_data): url = "<轻易云集成平台API地址>" headers = { 'Content-Type': 'application/json' } response = requests.post(url, json=transformed_data, headers=headers) if response.status_code == 200: print("Data written successfully.") else: print(f"Failed to write data. Status code: {response.status_code}") # 示例调用 source_data = { "id": "12345", "name": "测试用户", "email": "test@example.com" } transformed_data = transform_data(source_data) write_to_target_platform(transformed_data) ``` #### ID检查 在实际操作中,ID检查是一个关键步骤,它确保每条记录都是唯一的,以避免重复插入。在上述示例中,我们假设ID检查已经在API内部实现。如果需要手动实现,可以在发送请求前查询目标系统是否已经存在相同ID的数据。 ```python def check_id_existence(user_id): # 假设有一个GET方法用于查询现有记录 query_url = f"<轻易云集成平台API地址>?userId={user_id}" response = requests.get(query_url) if response.status_code == 200 and response.json(): return True return False # 在写入之前进行ID检查 if not check_id_existence(transformed_data["userId"]): write_to_target_platform(transformed_data) else: print("Record already exists.") ``` 通过以上步骤,我们可以高效地将源平台的数据经过ETL转换后,成功地写入到目标平台。在实际项目中,根据具体需求可能还需进一步优化和调整,但基本流程大致如此。 ![如何开发金蝶云星空API接口](https://pic.qeasy.cloud/T1.png~tplv-syqr462i7n-qeasy.image)