利用轻易云进行ETL转换并写入目标平台

  • 轻易云集成顾问-林峰
### 金蝶云星空数据集成轻易云平台案例:金蝶-业务员--->空操作 在企业信息系统的融合过程中,如何高效、准确地将各类业务数据进行有效整合是一个常见且关键的问题。本文将介绍我们成功实施的一个具体案例,即通过轻易云数据集成平台,实现金蝶云星空数据的无缝对接和自动化处理。 #### 案例背景与需求分析 在这个项目中,我们需要将金蝶云星空中的业务员相关数据实时同步到轻易云集成平台,并确保整个流程具备高度稳定性和可跟踪性。为了达到这一目标,我们制定了以下几个核心要求: 1. **定时可靠的数据抓取**:从金蝶云星空定时获取最新业务员信息。 2. **大量数据快速写入**:确保能高效批量导入到轻易云集中管理平台。 3. **接口调用灵活处理**:正确使用executeBillQuery来提取原始数据信息,并通过API写入至轻易云平台。 4. **分页与限流问题解决方案**:针对大规模分页查询及API请求频率限制进行优化。 5. **格式差异处理**:调整两个系统之间的数据格式,以便顺利对接。 #### 接口调用与数据提取 首先,为实现从金蝶获取所需的数据,我们需要精准调度其提供的`executeBillQuery` API。这一步骤不仅要考虑到接口自身的返回结果,还需应对它可能存在的一些特有限制。如单次返回记录数量有限,需要多次分页查询才能完成所有必要的数据拉取。 ```json { "apiName": "executeBillQuery", "parameters": { "filterString": "", "fields": ["Field1", "Field2", ...], ... } } ``` 此处,通过设定适当的过滤条件(如时间戳或者状态码),我们能够保证每次仅获取自上一次调用以来新增或更新过的信息,从而减少重复读取,提高效率,同时避免漏单现象发生。 #### 数据转换与映射 由于源系统(金蝶云)和目标系统(轻易)的字段定义及结构可能有所不同,在向目的端执行“写入空操作”之前,必须先行完成相关的数据转换及映射操作。为了解决这一难题,利用我们的成熟方案进行字段匹配并配置相应规则,将原始记录转化为符合要求的新结构体: ```json { // 原始JSON对象 after transformation } ``` 这种方法既达到了预期效果,又保留了原本复杂联动关系。同时得益于轻易提供的可视化界面,使这一过程更直 ![金蝶与SCM系统接口开发配置](https://pic.qeasy.cloud/D9.png~tplv-syqr462i7n-qeasy.image) ### 调用金蝶云星空接口executeBillQuery获取并加工数据 在数据集成过程中,调用源系统的API接口是至关重要的一步。本文将详细探讨如何通过轻易云数据集成平台调用金蝶云星空的`executeBillQuery`接口来获取并加工数据。 #### 接口概述 `executeBillQuery`是金蝶云星空提供的一个用于查询业务单据的API接口。该接口支持POST请求,主要用于获取特定条件下的业务单据信息。以下是该接口的元数据配置: ```json { "api": "executeBillQuery", "effect": "QUERY", "method": "POST", "number": "FNumber", "id": "FEntity_FEntryId", "name": "FNumber", "request": [ {"field":"FOperatorId","label":"FOperatorId","type":"string","describe":"FOperatorId","value":"FOperatorId"}, {"field":"FBillNo","label":"单据编号","type":"string","describe":"单据编号","value":"FBillNo"}, {"field":"FDescription","label":"描述","type":"string","describe":"描述","value":"FDescription"}, {"field":"FOperatorType","label":"业务员类型","type":"string","describe":"业务员类型","value":"FOperatorType"}, {"field":"FCreateDate","label":"创建日期","type":"string","describe":"创建日期","value":"FCreateDate"}, {"field":"FModifyDate","label":"最后修改日期","type":"string","describe":"最后修改日期","value":"FModifyDate"}, {"field":"FEntity_FEntryId","label":"FEntity_FEntryId","type":"string","describe":"FEntity_FEntryId","value":"FEntity_FEntryId"}, {"field":"FOperatorType_ETY","label":"分录-业务员类型","type":"string","describe":"分录-业务员类型","value":"FOperatorType_ETY"}, {"field":"FStaffId_FNumber","label":"职员","type":"string","describe":"职员","value":"FStaffId.FNumber"}, {"field": "FBizOrgId", "label": "业务组织编码", "type": "string", "value": "FBizOrgId.Fnumber"} ], "otherRequest": [ {"field": "Limit", "label": "最大行数", "type": "string", "describe": "金蝶的查询分页参数", "value": "{PAGINATION_PAGE_SIZE}"}, {"field": "StartRow", "label": "开始行索引", "type": "string", "describe": "金蝶的查询分页参数", "value": "{PAGINATION_START_ROW}"}, {"field": "TopRowCount", "label": "返回总行数", "type": int, describe: 金蝶的查询分页参数}, { field: FilterString, label: 过滤条件, type: string, describe: 示例写法 FSupplierId.FNumber = 'VEN00010' and FApproveDate>=, value: FModifyDate>='{{LAST_SYNC_TIME|dateTime}}' }, { field: FieldKeys, label: 需查询的字段key集合, type: array, describe: 金蝶分录主键ID格式: FPOOrderEntry_FEntryId,其它格式 FPurchaseOrgId.FNumber, parser: {name: ArrayToString, params:,} }, { field: FormId, label: 业务对象表单 Id, type: string, describe: 必须填写金蝶的表单 ID 如:PUR_PurchaseOrder, value: BD_OPERATOR } ], autoFillResponse:true } ``` #### 请求参数详解 1. **基本字段** - `FOperatorId`: 操作员ID - `FBillNo`: 单据编号 - `FDescription`: 描述 - `FOperatorType`: 业务员类型 - `FCreateDate`: 创建日期 - `FModifyDate`: 最后修改日期 - `FBizOrgId`: 业务组织编码 2. **分页参数** - `Limit`: 最大行数,控制每次查询返回的数据量。 - `StartRow`: 开始行索引,用于分页查询。 3. **过滤条件** - `FilterString`: 用于指定查询条件,例如`FSupplierId.FNumber = 'VEN00010' and FApproveDate>= '2023-01-01'`。 4. **字段集合** - `FieldKeys`: 指定需要返回的字段集合,以逗号分隔。 5. **表单ID** - `FormId`: 必须填写金蝶系统中的表单ID,例如`BD_OPERATOR`。 #### 实际应用案例 假设我们需要从金蝶云星空中获取所有在某个时间段内修改过的业务员信息,并且每次查询最多返回100条记录。我们可以通过以下步骤实现: 1. **配置请求参数** ```json { FormId: 'BD_OPERATOR', FieldKeys: ['FBillNo', 'FBizOrgId', 'FBizOrgName', 'FBizOrgCode'], FilterString: 'FModifyDate >= \'2023-01-01\'', Limit: '100', StartRow: '0' } ``` 2. **发送请求** 使用轻易云数据集成平台提供的可视化界面,将上述配置填入相应的位置,并执行请求。 3. **处理响应** 响应结果会自动填充到预先定义好的数据模型中,便于后续的数据清洗和转换操作。 #### 数据清洗与转换 在获取到原始数据后,我们通常需要进行一定的数据清洗和转换。例如,将日期格式统一、去除无效字符等。这些操作可以通过轻易云平台提供的数据处理工具来实现。 ```python # 示例代码:清洗和转换数据 import pandas as pd # 假设response_data是从API获取到的数据 response_data = [ {'FBillNo': '12345', 'FBizOrgName': '销售部', 'FBizOrgCode': '001'}, {'FBillNo': '67890', 'FBizOrgName': '市场部', 'FBizOrgCode': '002'} ] df = pd.DataFrame(response_data) # 清洗操作:去除无效字符,统一日期格式等 df['FBillNo'] = df['FBillNo'].str.strip() df['FBizOrgName'] = df['FBizOrgName'].str.replace('部', '') print(df) ``` 通过上述步骤,我们可以高效地从金蝶云星空中获取并处理所需的数据,为后续的数据分析和决策提供可靠支持。 ![如何开发企业微信API接口](https://pic.qeasy.cloud/S11.png~tplv-syqr462i7n-qeasy.image) ### 使用轻易云数据集成平台进行ETL转换并写入目标平台 在数据集成的生命周期中,ETL(Extract, Transform, Load)转换是关键的一步。本文将详细探讨如何利用轻易云数据集成平台,将已经集成的源平台数据进行ETL转换,并通过API接口写入目标平台。 #### 元数据配置解析 在进行ETL转换之前,首先需要了解元数据配置。以下是此次任务的元数据配置: ```json { "api": "写入空操作", "effect": "EXECUTE", "method": "POST", "idCheck": true } ``` - **api**: 指定了目标API接口名称为"写入空操作"。 - **effect**: 表示该操作的效果为执行(EXECUTE)。 - **method**: HTTP请求方法为POST。 - **idCheck**: 表示在写入数据前需要进行ID校验。 #### 数据请求与清洗 在进入ETL转换阶段之前,首先需要从源平台(金蝶-业务员)获取原始数据,并对其进行清洗和预处理。这一步骤包括: 1. 数据提取:通过API或数据库连接,从源系统中提取业务员相关的数据。 2. 数据清洗:去除冗余信息、修正错误数据、统一数据格式等。 假设我们已经完成了上述步骤,获得了一个结构化的数据集。 #### 数据转换 接下来,我们需要将清洗后的数据转换为目标平台能够接受的格式。根据元数据配置中的要求,我们需要确保以下几点: 1. **字段映射**:源平台的数据字段需要映射到目标平台所需的字段。 2. **格式转换**:确保所有字段的数据类型和格式符合目标平台的要求。 3. **ID校验**:根据`idCheck`参数,需要对每条记录进行ID校验,以避免重复或冲突。 例如,假设我们从金蝶系统中提取到如下业务员数据: ```json [ {"id": "001", "name": "张三", "sales": 1000}, {"id": "002", "name": "李四", "sales": 1500} ] ``` 我们需要将其转换为目标平台所需的格式: ```json [ {"identifier": "001", "fullName": "张三", "totalSales": 1000}, {"identifier": "002", "fullName": "李四", "totalSales": 1500} ] ``` #### 数据写入 完成数据转换后,通过API接口将其写入目标平台。根据元数据配置,我们使用POST方法调用"写入空操作"API。以下是一个示例代码片段,展示如何实现这一过程: ```python import requests import json # 转换后的数据 data = [ {"identifier": "001", "fullName": "张三", "totalSales": 1000}, {"identifier": "002", "fullName": "李四", "totalSales": 1500} ] # API URL url = 'https://api.example.com/execute' # 请求头 headers = { 'Content-Type': 'application/json' } # 发起POST请求 response = requests.post(url, headers=headers, data=json.dumps(data)) # 检查响应状态码 if response.status_code == 200: print("Data successfully written to the target platform.") else: print(f"Failed to write data. Status code: {response.status_code}") ``` 在上述代码中,我们使用Python的requests库发起HTTP POST请求,将转换后的数据发送到目标API接口。如果响应状态码为200,则表示数据成功写入。 #### 实时监控与反馈 轻易云数据集成平台提供实时监控功能,可以跟踪每个环节的数据流动和处理状态。在实际操作中,可以利用这些功能来确保每一步骤都顺利完成,并及时发现和解决可能出现的问题。 通过以上步骤,我们实现了从源平台到目标平台的数据ETL转换与写入。在实际项目中,根据具体需求和环境,还可能涉及更多复杂的处理逻辑和优化策略,但基本流程如上所述。 ![用友BIP接口开发配置](https://pic.qeasy.cloud/T18.png~tplv-syqr462i7n-qeasy.image)