轻易云平台中的ETL转换与数据写入实践

  • 轻易云集成顾问-蔡威
### 分布调出查询:如何高效集成金蝶云星空数据到轻易云集成平台 在实施企业信息化系统对接时,数据的精准性和实时性至关重要。本文聚焦于将金蝶云星空的数据高效、可靠地集成到轻易云数据集成平台中的技术案例——分布调出查询。本案例重点阐述如何利用executeBillQuery接口获取金蝶云星空中的财务数据,并通过batchSave接口实现快速批量写入到轻易云平台。 首先,需要确保在与金蝶云星空系统进行数据对接时,不会出现漏单情况。为了达此目标,我们引入了定时抓取机制,通过周期性调用executeBillQuery接口,确保每次请求都能捕捉最新且完整的数据。在构建这一方案过程中,我们还需要处理分页和限流问题,以避免因API接口限制而导致的数据丢失或访问失败。 其次,由于金蝶云星空返回的数据格式可能与轻易云接受的格式不一致,因此我们设计了定制化的数据映射对接功能。这一步骤不仅保证了不同系统间的兼容性,还提高了整个流程的自动化程度。例如,通过元数据配置,将源端字段与目标端字段一一对应,以防止因格式差异造成的信息错误。 再者,在大规模数据传输场景中,能够处理异常并实现错误重试显得尤为关键。我们搭建了一套完善的错题重试机制,当遇到网络波动或者API调用失败等状况时,会自动记录日志并进行多次尝试直至成功。此外,为提升效率,大量数据被分批次快速写入至轻易云平台,通过batchSave接口来保障性能和稳定性。 最后,在实际运行环境中,我们部署了全透明可视化操作界面,使得每个环节的状态都可以实时监控,一旦发生异常,可以第一时间响应和修复。同时,全程实况日志记录为后续审计和优化提供宝贵依据,每一次操作都有迹可循,提高事务安全性的同时也增强团队协作效率及透明度。 这只是分布调出查询项目的一部分内容,下文将继续介绍具体执行细节以及代码实例,以便进一步理解该整体解决方案带来的优势及应用效果。 ![用友BIP接口开发配置](https://pic.qeasy.cloud/D28.png~tplv-syqr462i7n-qeasy.image) ### 调用金蝶云星空接口executeBillQuery获取并加工数据 在轻易云数据集成平台的生命周期中,调用源系统接口是数据集成的第一步。本文将详细探讨如何通过调用金蝶云星空的`executeBillQuery`接口获取数据,并进行初步加工处理。 #### 接口配置与请求参数 在轻易云数据集成平台上,我们需要配置元数据来定义API调用的细节。以下是我们使用的元数据配置: ```json { "api": "executeBillQuery", "method": "POST", "number": "FBillNo", "id": "FSTKTRSOUTENTRY_FEntryID", "pagination": {"pageSize": 500}, "idCheck": true, "request": [ {"field":"FSTKTRSOUTENTRY_FEntryID","label":"FEntryID","type":"string","value":"FSTKTRSOUTENTRY_FEntryID"}, {"field":"FID","label":"实体主键","type":"string","value":"FID"}, {"field":"FBillNo","label":"单据编号","type":"string","value":"FBillNo"}, {"field":"FDocumentStatus","label":"单据状态","type":"string","value":"FDocumentStatus"}, {"field":"FStockOrgID_FNumber","label":"调入库存组织","type":"string","value":"FStockOrgID.FNumber"}, {"field":"FDate","label":"日期","type":"string","value":"FDate"}, {"field":"FBillTypeID","label":"单据类型","type":"string","value":"FBillTypeID"}, {"field":"FTransferDirect","label":"调拨方向","type":"string","value":"FTransferDirect"}, {"field":"FNOTE","label":"备注","type":"string","value":"FNOTE"}, {"field":"FCreateDate","label":"创建日期","type":"string","value":"FCreateDate"}, {"field":"FApproveDate","label":"审核日期","type":"string","value":"FApproveDate"}, {"field":...} ], "otherRequest": [ {"field": "Limit", "label": "最大行数", "type": "string", "describe": "金蝶的查询分页参数", "value": "{PAGINATION_PAGE_SIZE}"}, {"field": "StartRow", "label": "开始行索引", "type": "string", "describe": "金蝶的查询分页参数", "value": "{PAGINATION_START_ROW}"}, ... ] } ``` #### 请求构建与发送 根据上述配置,我们需要构建一个POST请求,向金蝶云星空的`executeBillQuery`接口发送请求。以下是请求体的构建示例: ```json { "FormId": "STK_TRANSFEROUT", "FieldKeys": [ "FID", "FBillNo", ... ], "FilterString": "", ... } ``` 其中,`FormId`指定了业务对象表单Id,`FieldKeys`定义了需查询的字段集合,`FilterString`用于设置过滤条件。 #### 数据清洗与转换 在接收到返回的数据后,需要对其进行清洗和转换,以便后续处理。以下是一个简单的数据清洗示例: ```python def clean_data(raw_data): cleaned_data = [] for entry in raw_data: cleaned_entry = { 'entry_id': entry['FSTKTRSOUTENTRY_FEntryID'], 'bill_no': entry['FBillNo'], 'status': entry['FDocumentStatus'], 'stock_org': entry['FStockOrgID_FNumber'], ... } cleaned_data.append(cleaned_entry) return cleaned_data ``` 通过这种方式,我们可以将原始数据转换为更易于处理和分析的格式。 #### 分页处理 由于一次性获取大量数据可能会导致性能问题,因此我们需要实现分页处理。根据元数据配置中的分页参数,我们可以逐页获取数据: ```python def fetch_all_data(api_endpoint, page_size=500): all_data = [] start_row = 0 while True: response = send_request(api_endpoint, page_size, start_row) data = response.json() if not data: break all_data.extend(data) start_row += page_size return all_data ``` 通过这种方式,我们可以确保高效地获取所有需要的数据。 #### 数据写入与存储 在完成数据清洗和转换后,需要将其写入目标系统或存储到数据库中。这一步通常涉及到调用另一个API或执行数据库插入操作。 ```python def write_to_database(cleaned_data, db_connection): for entry in cleaned_data: db_connection.insert(entry) ``` 通过以上步骤,我们完成了从调用源系统接口获取数据到初步加工处理的全过程。这不仅提高了数据处理效率,也为后续的数据分析和应用打下了坚实基础。 ![如何开发用友BIP接口](https://pic.qeasy.cloud/S17.png~tplv-syqr462i7n-qeasy.image) ### 轻易云数据集成平台生命周期的第二步:ETL转换与数据写入 在轻易云数据集成平台中,数据集成的第二步至关重要,即将已经集成的源平台数据进行ETL(Extract, Transform, Load)转换,转为目标平台能够接收的格式,最终写入目标平台。本文将深入探讨这一过程中涉及的技术细节,特别是API接口的配置和使用。 #### API接口配置 在本案例中,我们使用的是`batchSave` API接口,通过POST方法提交数据。以下是元数据配置的详细解析: ```json { "api": "batchSave", "method": "POST", "idCheck": true, "request": [ {"field": "FBillNo", "label": "单据编号", "type": "string"}, {"field": "FStockOrgID", "label": "调入库存组织", "type": "string", "parser": {"name": "ConvertObjectParser", "params": "FNumber"}}, {"field": "FDate", "label": "日期", "type": "string"}, {"field": "FBillTypeID", "label": "单据类型", "type": "string", "parser": {"name": "ConvertObjectParser", "params": "FNumber"}}, {"field": "FTransferDirect", "label": "调拨方向", "type": "string"}, {"field": "FNOTE", "label": "备注", "type": "string"}, {"field": "FCreateDate", "label": "创建日期", ![如何对接用友BIP接口](https://pic.qeasy.cloud/T4.png~tplv-syqr462i7n-qeasy.image)