ETL转换与数据映射:轻易云平台的应用案例

  • 轻易云集成顾问-黄宏棵
### 易快报数据集成到美国人资产系统技术案例 在企业日常运作中,财务管理和资产管控始终是核心任务之一,高效的系统对接能够有效提升各项业务流程的执行效率。本篇文章将重点分享如何通过轻易云数据集成平台,实现从易快报到美国人资产系统的数据无缝对接,其中涉及了多个关键技术环节,包括API接口调用、数据处理优化以及异常处理机制等。 #### 确保集成过程中不漏单 为保证所有易快报中的收货确认记录都能顺利转入美国人资产系统,我们首先设计并实施了一套可靠的数据抓取机制。具体来说,通过定时任务(cron job)在特定时间点批量调用易快报提供的API接口`/api/openapi/v1.1/docs/getApplyList`获取最新申请列表。在实现过程中,我们特别关注分页和限流问题,确保每次请求都能够全面准确地获取所需数据而不丢失任何一条记录。 #### 批量写入与快速响应 大量数据需要快速且安全地写入至美国人资产系统,对此我们构建了高效的数据传输链路。使用轻易云平台中的批量处理功能,将从易快报获取的大量收货确认信息整理后,通过调用目标系统提供的API `/assetCard/ekuaibaoProcurementDocking?procurementMsg=` 进行集中化批量写入,有效减少了网络开销并提高了响应速度。这种操作极大程度上优化了资源使用,并保障了整个对接过程高效稳定。 #### 数据格式差异与映射转换 由于两个不同系统间的数据结构可能存在显著差异,为此我们专门设计了一套灵活的数据映射方案。在实际操作中,根据两边字段定义及其对应关系,利用转换规则将来自易快报原始数据调整到符合目标格式,从而使得这些信息能被顺利识别并接受。此外,美国人资产系统还支持自定义字段,这给我们的映射工作带来了更大的弹性和适配能力。 作为一个完整解决方案的一部分,错误重试机制同样被纳入考虑范围。一旦出现异常情况,例如网络波动或服务器错误,会触发自动重试逻辑,并详细日志记录便于事后追踪,以最低代价恢复正常运行状态。 ![打通金蝶云星空数据接口](https://pic.qeasy.cloud/D20.png~tplv-syqr462i7n-qeasy.image) ### 调用易快报接口获取并加工数据的技术案例 在数据集成生命周期的第一步中,我们需要调用易快报的接口`/api/openapi/v1.1/docs/getApplyList`来获取原始数据,并对其进行初步加工。以下是详细的技术实现过程。 #### 接口调用配置 首先,我们需要配置元数据以便正确调用易快报接口。根据提供的元数据配置,以下是关键参数和其含义: - **API路径**:`/api/openapi/v1.1/docs/getApplyList` - **请求方法**:GET - **分页查询起始值**:通过`start`字段指定 - **每页记录数**:通过`count`字段指定,最大不能超过100 - **排序字段**:通过`orderBy`字段指定,支持创建时间、更新时间、提交时间和支付时间等 - **查询起始时间和结束时间**:分别通过`startDate`和`endDate`字段指定,格式为`yyyy-MM-dd HH:mm:ss` - **单据模板ID**:通过`specificationId`字段指定,支持多个模板ID,用逗号分隔 - **是否已删除**:通过`active`字段指定,true表示未删除,false表示已删除 - **单据状态**:通过`state`字段指定,可选值包括paying, PROCESSING, paid, archived #### 请求参数示例 根据上述配置,我们可以构建一个请求参数示例: ```json { "type": "requisition", "start": "0", "count": "100", "orderBy": "updateTime", "startDate": "{{LAST_SYNC_TIME|datetime}}", "endDate": "{{CURRENT_TIME|datetime}}", "specificationId": "ID01o1KW1zYBe7", "active": "true", "state": "paying,PROCESSING,paid,archived" } ``` #### 数据过滤条件 在获取数据后,我们需要对其进行过滤和加工。根据元数据配置中的条件,我们有以下过滤条件: 1. 单据明细中的单价(details._unitPrice_standard)大于等于1000。 2. 项目(details.项目)在特定ID列表中。 3. 固定资产类型(details.FA_type)等于1。 这些条件可以用来筛选出符合业务需求的数据。 #### 实现步骤 1. **发送HTTP请求**: 使用HTTP客户端库(如Python的requests库或Java的HttpClient)发送GET请求到易快报API,并传递上述参数。 2. **解析响应数据**: 响应数据通常是JSON格式,需要解析JSON以提取所需信息。 3. **应用过滤条件**: 对解析后的数据应用过滤条件,以筛选出符合要求的数据记录。例如,在Python中可以使用列表推导式或pandas库进行过滤。 4. **处理并存储结果**: 将过滤后的数据进一步处理,如转换为目标系统所需的格式,并存储到数据库或其他存储介质中。 #### 示例代码 以下是一个使用Python实现上述步骤的示例代码: ```python import requests import json from datetime import datetime # 配置请求参数 params = { "type": "requisition", "start": "0", "count": "100", "orderBy": "updateTime", "startDate": datetime.now().strftime('%Y-%m-%d %H:%M:%S'), "endDate": datetime.now().strftime('%Y-%m-%d %H:%M:%S'), "specificationId": "ID01o1KW1zYBe7", "active": "true", "state": "paying,PROCESSING,paid,archived" } # 发送GET请求 response = requests.get('https://api.yikuai.com/api/openapi/v1.1/docs/getApplyList', params=params) data = response.json() # 应用过滤条件 filtered_data = [ item for item in data['items'] if item['details']['_unitPrice_standard'] >= 1000 and item['details']['项目'] in ["ID01oTMqiKMEwL", "ID01oTMqiKMENh", "ID01oTMqiKMF3N", "ID01oTMqiKMFkj"] and item['details']['FA_type'] == '1' ] # 输出或存储结果 print(json.dumps(filtered_data, indent=4)) ``` 该代码展示了如何调用易快报API获取数据,并应用特定的过滤条件进行初步加工。通过这种方式,可以确保从源系统获取的数据符合业务需求,为后续的数据转换与写入阶段奠定基础。 ![系统集成平台API接口配置](https://pic.qeasy.cloud/S19.png~tplv-syqr462i7n-qeasy.image) ### 使用轻易云数据集成平台进行ETL转换并写入美国人资产系统API接口 在数据集成的生命周期中,第二步至关重要:将已经集成的源平台数据进行ETL(提取、转换、加载)转换,并转为目标平台所能够接收的格式。本文将详细探讨如何通过轻易云数据集成平台,将易快报收货确认数据转换为美国人资产系统API接口所需的格式,并最终写入目标平台。 #### 元数据配置解析 首先,我们需要理解元数据配置的结构,以便在实际操作中正确应用。以下是元数据配置的详细解析: ```json { "api": "/assetCard/ekuaibaoProcurementDocking?procurementMsg=", "effect": "EXECUTE", "method": "POST", "number": "1", "id": "1", "name": "1", "idCheck": true, "request": [ { "field": "procurementList", "label": "procurementList", "type": "array", "value": "details", "children": [ { "field": "device_name", "label": "资产名称", "type": "string", "value": "{{details.u_产品名称}}" }, { "field": "assets_nature", ... } ] } ] } ``` #### 数据字段映射与转换 在上述配置中,每个字段都有其特定的映射和转换逻辑。我们将逐一解析这些字段: 1. **device_name(资产名称)**: - 类型:字符串 - 映射:`{{details.u_产品名称}}` - 从源数据中的`u_产品名称`字段提取值。 2. **assets_nature(资产性质)**: - 类型:字符串 - 转换逻辑:根据`details.项目`字段的值进行条件判断,若值为特定ID则返回'G',否则返回'N'。 ```sql case when '{{details.项目}}' in ('ID01oTMqiKMEwL','ID01oTMqiKMENh','ID01oTMqiKMF3N','ID01oTMqiKMFkj') then 'G' else 'N' end ``` 3. **device_type(型号)**: - 类型:字符串 - 映射:`{{details.u_产品型号}}` - 从源数据中的`u_产品型号`字段提取值。 4. **ori_value(单价)**: - 类型:字符串 - 转换逻辑:将`details._unitPrice_standard`字段乘以1。 ```sql {{details._unitPrice_standard}}*1 ``` 5. **num(数量)**: - 类型:字符串 - 转换逻辑:将`details.u_收货数量`字段乘以1。 ```sql {{details.u_收货数量}}*1 ``` 6. **device_unit(单位)**: - 类型:字符串 - 固定值:"台" 7. **receipt_date(收货日期)**: - 类型:字符串 - 映射并格式化日期:`{{details.u_交货日期|datetime}}` 8. **purchase_order_code(采购单PO_No)**: - 类型:字符串 - 映射:`{{details.u_采购订单号}}` 9. **receipt_order_code(收货单No)**: - 类型:字符串 - 固定值或变量传递:`{code}` 10. **supplier(供应商)**: - 类型:字符串 - 查询MongoDB数据库,根据供应商ID获取供应商名称。 ```mongoQuery ed5a5701-c0ef-3ce4-9de6-a3a08ac1d46d findField=content._system.name where={"content._system.id":{"$eq":"{{details.u_供应商名称}}"}} ``` 11. **card_user(管理人)**: - 类型:字符串 - 查询MongoDB数据库,根据提交者ID获取管理人姓名。 ```mongoQuery 01e47641-1101-3884-ba0d-59f24126e750 findField=content.name where={"id":{"$eq":"{submitterId}"}} ``` 12. **rep_dept_name(管理部门)**: - 类型:字符串 - 查询MongoDB数据库,根据申请部门ID获取部门名称。 ```mongoQuery 12ec22fb-d3b3-3be1-b41e-447a338ffb92 findField=content.name where={"id":{"$eq":"{{details.u_申请部门}}"}}} ``` 13. **budget_account(预算科目)**: - 类型:字符串 - 转换逻辑:根据项目ID映射到特定预算科目代码。 ```sql case '{{details.项目}}' when 'ID01oTMqiKMEwL' then 'B1830' when 'ID01oTMqiKMENh' then 'B1840' when 'ID01oTMqiKMF3N' then 'B1850' when 'ID01oTMqiKMFkj' then 'B1860' else '' end ``` #### API接口调用 在完成所有字段映射和转换后,我们使用POST方法将处理后的数据发送到美国人资产系统API接口: ```json { api: "/assetCard/ekuaibaoProcurementDocking?procurementMsg=", method: POST, data: { procurementList: [ { device_name: "{{details.u_产品名称}}", assets_nature: "...", // 根据条件判断结果填充值 device_type: "{{details.u_产品型号}}", ori_value: "...", // 单价计算结果 num: "...", // 数量计算结果 device_unit: "台", receipt_date: "{{details.u_交货日期|datetime}}", purchase_order_code: "{{details.u_采购订单号}}", receipt_order_code: "{code}", supplier: "...", // MongoDB查询结果 card_user: "...", // MongoDB查询结果 rep_dept_name: "...", // MongoDB查询结果 budget_account: "...", // 根据条件判断结果填充值 } ] } } ``` 通过以上步骤,我们成功实现了从易快报收货确认到美国人资产系统的数据ETL转换和写入。这不仅确保了数据的一致性和准确性,还提升了业务流程的自动化程度。 ![金蝶云星空API接口配置](https://pic.qeasy.cloud/T8.png~tplv-syqr462i7n-qeasy.image)