使用轻易云平台完成ETL转换并写入目标系统

  • 轻易云集成顾问-何语琴
### 案例分享:金蝶云星空数据集成到轻易云集成平台 在本案例中,我们将详细探讨如何通过轻易云数据集成平台高效对接和处理来自金蝶云星空的收料通知单数据。具体方案以“KD2 查询金蝶收料通知单-关联查询”为主,通过调用金蝶云星空的`executeBillQuery`接口,实现定期可靠的数据抓取,并快速写入到轻易云集成平台。 #### 接口调用与分页限流处理 大规模的数据对接首先要解决的是接口调用过程中的分页和限流问题。在实际操作中,使用`executeBillQuery`接口时,需明确每次请求返回数据的最大条目量,以及服务端的限流机制。为此,我们采用了一种基于递归分页请求的方法来确保所有数据可以完整无误地获取。 ```json { "queryParam": { "billType": "RPD", "pageSize": 100, "currentPage": 1 } } ``` 上述示例展示了一个基本的API请求结构,其中设置了每页返回100条记录,并指明当前页码。我们将持续递增`currentPage`参数直至没有更多可用数据,从而实现全量抓取。 #### 数据格式差异与映射规则 当从金蝶云星空成功获取原始数据后,需要进行必要的数据转换,以适配轻易云集成平台预定义的数据格式。这部分工作通常涉及元数据显示结构映射、字段重命名以及数值单位转换等步骤。以下是一个简化后的映射示例: ```json { "sourceField1": "${kingdeeFieldA}", "sourceField2": "${kingdeeFieldB}" } ``` 通过预定义映射规则,可以确保不同系统之间的数据格式差异被平滑处理,保障后续写入操作顺利进行。 #### 定制化批量写入策略 针对大量收料通知单需要实时同步的问题,我们设计了一套分步批量提交策略。在保证事务一致性的前提下,将多条收料通知单打包形成一个提交批次,提高执行效率并减少网络消耗。例如,利用如下JSON结构完成一次批量写入操作: ```json { "batchData": [ {"fieldA":"value1",...}, {"fieldA":"value2",...} ] } ``` 如此不仅提升了传输速度,还降低了响应时间,有效提高系统整体性能表现。 以上开头部分构建了基础框架及主要技术难点,在随后的文章内容中,将进一步深入探讨各个环节的具体实现技术细节和优化方案,包括异常处理、错误重试 ![金蝶与SCM系统接口开发配置](https://pic.qeasy.cloud/D22.png~tplv-syqr462i7n-qeasy.image) ### 调用金蝶云星空接口executeBillQuery获取并加工数据 在数据集成的生命周期中,调用源系统接口获取数据是至关重要的一步。本文将详细探讨如何通过轻易云数据集成平台调用金蝶云星空的`executeBillQuery`接口来获取收料通知单的数据,并对其进行初步加工。 #### API 接口配置 首先,我们需要配置API接口的元数据。以下是配置的关键字段: - **api**: `executeBillQuery` - **method**: `POST` - **number**: `FBillNo` - **id**: `FDetailEntity_FEntryID` - **pagination**: `{"pageSize":500}` - **idCheck**: `true` 这些配置确保了我们能够正确地调用金蝶云星空的API,并处理分页和唯一标识符等问题。 #### 请求参数配置 为了准确地获取所需的数据,我们需要配置请求参数。以下是主要的请求字段及其含义: ```json [ {"field":"FBillNo","label":"单据编号","type":"String","value":"FBillNo"}, {"field":"FDocumentStatus","label":"单据状态","type":"String","value":"FDocumentStatus"}, {"field":"FMaterialId","label":"物料编码","type":"String","value":"FMaterialId.fnumber"}, {"field":"FStockOrgId_FNumber","label":"收料组织","type":"String","value":"FStockOrgId.FNumber"}, {"field":"FMaterialName","label":"物料名称","type":"String","value":"FMaterialName"}, {"field":"FDate","label":"收料日期","type":"String","value":"FDate"}, {"field":"FMateriaModel","label":"规格型号","type":"String","value":"FMateriaModel"}, {"field":"FBillTypeID.fnumber","label":"单据类型","type":"String","value":"FBillTypeID.fnumber"} ] ``` 这些字段涵盖了收料通知单的基本信息,如单据编号、状态、物料编码、收料组织等。 #### 分页参数配置 为了处理大批量数据,我们需要设置分页参数: ```json [ {"field": "Limit", "label": "最大行数", "type": "string", "describe": "金蝶的查询分页参数", "value": "{PAGINATION_PAGE_SIZE}"}, {"field": "StartRow", "label": "开始行索引", "type": "int", "describe": "金蝶的查询分页参数", "value": "{PAGINATION_START_ROW}"} ] ``` 这些参数确保我们能够分批次地获取数据,避免一次性请求过多导致性能问题。 #### 过滤条件配置 为了提高查询效率,我们可以设置过滤条件。例如,只获取已审核且库存组织为特定值的数据: ```json [ { "field": "FilterString", "label": "过滤条件", "type": "string", "describe": "示例写法 FSupplierId.FNumber = 'VEN00010' and FApproveDate>=", "value": "FDOCUMENTSTATUS='C' and FApproveDate>='{{LAST_SYNC_TIME|datetime}}' and FStockID.F_KD_WDTSTOCK<>'' and FStockOrgId='231399'" } ] ``` 这个过滤条件确保我们只获取符合特定条件的数据,从而减少不必要的数据传输和处理。 #### 字段键集合配置 为了明确我们需要哪些字段,可以通过以下配置指定字段键集合: ```json [ { "field": "FieldKeys", "label": "需查询的字段key集合", "type": "array", "describe": "金蝶分录主键ID格式:FPOOrderEntry_FEntryId,其它格式 FPurchaseOrgId.FNumber", "parser":{"name":"ArrayToString", "params": ","} } ] ``` 这个配置将所有需要查询的字段以逗号分隔的字符串形式传递给API。 #### 表单ID配置 最后,我们需要指定业务对象表单ID,以确保查询的是正确的业务对象: ```json [ { "field": "FormId", "label": "业务对象表单Id", "type": "string", "describe": "必须填写金蝶的表单ID如:PUR_PurchaseOrder", "value": "`PUR_ReceiveBill`" } ] ``` 这个表单ID明确了我们要查询的是收料通知单的数据。 通过以上详细的元数据和请求参数配置,我们可以高效地调用金蝶云星空的`executeBillQuery`接口,获取所需的数据并进行初步加工。这一步骤为后续的数据转换与写入奠定了坚实基础。 ![钉钉与ERP系统接口开发配置](https://pic.qeasy.cloud/S11.png~tplv-syqr462i7n-qeasy.image) ### 使用轻易云数据集成平台进行ETL转换并写入目标平台 在数据集成过程中,ETL(Extract, Transform, Load)是一个关键步骤。本文将深入探讨如何利用轻易云数据集成平台将源平台的数据进行转换,并通过API接口写入目标平台。 #### 数据请求与清洗 首先,假设我们已经完成了从金蝶系统中查询收料通知单的数据请求和清洗工作。这些数据可能包含多个字段,如订单号、物料编码、数量等。在这个阶段,我们的目标是将这些清洗后的数据转换为目标平台所能接收的格式。 #### 数据转换 在轻易云数据集成平台中,数据转换是通过配置元数据来实现的。元数据定义了如何将源数据映射到目标数据结构中。在本案例中,我们的元数据配置如下: ```json { "api": "写入空操作", "effect": "EXECUTE", "method": "POST", "idCheck": true } ``` 该配置表明我们将使用POST方法调用“写入空操作”API,并且需要进行ID检查。具体的转换步骤如下: 1. **字段映射**:根据业务需求,将源平台的数据字段映射到目标平台所需的字段。例如,将金蝶系统中的“订单号”映射为目标平台中的“order_id”。 2. **格式转换**:如果源数据的格式与目标平台要求的不一致,需要进行格式转换。例如,将日期格式从“YYYY-MM-DD”转换为“MM/DD/YYYY”。 3. **数据校验**:在写入之前,确保所有必填字段都已填充,并且数据符合目标平台的校验规则。例如,ID检查可以确保每条记录都有唯一标识符。 #### 数据写入 完成数据转换后,我们需要通过API接口将处理后的数据写入目标平台。根据元数据配置,我们使用POST方法调用API接口。以下是一个示例代码片段,展示如何使用Python实现这一过程: ```python import requests import json # 定义API接口URL和头信息 api_url = "https://api.targetplatform.com/write" headers = { "Content-Type": "application/json", "Authorization": "Bearer YOUR_ACCESS_TOKEN" } # 构建要发送的数据 data = { "order_id": "12345", "material_code": "ABC123", "quantity": 100, # 其他字段... } # 将数据转换为JSON格式 payload = json.dumps(data) # 发送POST请求 response = requests.post(api_url, headers=headers, data=payload) # 检查响应状态码 if response.status_code == 200: print("Data successfully written to the target platform.") else: print(f"Failed to write data: {response.status_code} - {response.text}") ``` 在这个示例中,我们首先定义了API接口的URL和头信息,然后构建要发送的数据并将其转换为JSON格式。接下来,通过`requests.post`方法发送POST请求,并检查响应状态码以确认操作是否成功。 #### 实时监控与错误处理 轻易云数据集成平台提供了实时监控功能,可以跟踪每个ETL过程中的状态。如果在写入过程中发生错误,例如网络故障或API调用失败,可以通过日志和监控界面快速定位问题并进行修复。 此外,为了提高系统的鲁棒性,可以在代码中添加更多的错误处理逻辑,例如重试机制和异常捕获: ```python try: response = requests.post(api_url, headers=headers, data=payload) response.raise_for_status() # 如果响应状态码不是200,将引发HTTPError异常 except requests.exceptions.RequestException as e: print(f"An error occurred: {e}") ``` 通过这种方式,即使在遇到临时性故障时,也能保证系统的稳定运行。 总结来说,通过合理配置元数据并利用轻易云数据集成平台提供的API接口,我们可以高效地完成从源平台到目标平台的数据ETL过程,实现不同系统间的数据无缝对接。这不仅提升了业务透明度和效率,也确保了每个环节的数据准确性和一致性。 ![金蝶与MES系统接口开发配置](https://pic.qeasy.cloud/T5.png~tplv-syqr462i7n-qeasy.image)