ETL转换在轻易云数据集成中的应用与实践

  • 轻易云集成顾问-谢楷斌
### 案例分享:金蝶云星空间结算价目表数据集成 在本文中,我们将详细探讨如何通过轻易云数据集成平台,实现从一个金蝶云星空实例到另一个金蝶云星空实例的组织间结算价目表取价功能。这一技术方案旨在利用轻易云的数据处理能力和可视化操作界面,达到高效、可靠的数据对接和实时监控。 首先,使用executeBillQuery API接口,从源系统中抓取了所需的结算价目表数据。在这个过程中,需要特别注意接口调用中的分页与限流问题,以确保数据能够完整且不漏单地被获取。为此,我们配置了自动重试机制以及分页工具,并辅以可视化监控手段来保障任务执行不会受到意外中断或性能瓶颈影响。 抓取到价格数据后,通过自定义的数据转换逻辑,对其进行必要的格式调整,以适应目标系统金蝶云星空的批量写入规范。随后,利用batchSave API,将处理过的数据快速而高效地写入目标系统。在此步骤中,高吞吐量的数据写入能力发挥了关键作用,使大量价格条目得以迅速上传至目标数据库,提高整体处理效率。 此外,为确保整个流程透明顺畅,我们部署了实时监控和告警系统,这不仅可以跟踪每个作业状态,还能及时发现并解决潜在的问题。通过这种集中监控机制,可以显著提升对API资产的管理水平,实现资源优化配置和最大化利用。 以上则是本次案例"组织间结算价目表取价OK"技术实施方案的开端部分,下文将继续阐述具体实现细节,包括错误重试机制定制、日志记录策略及异常处理方法等内容。 ![如何对接金蝶云星空API接口](https://pic.qeasy.cloud/D1.png~tplv-syqr462i7n-qeasy.image) ### 调用源系统金蝶云星空接口executeBillQuery获取并加工数据 在数据集成的生命周期中,调用源系统接口获取数据是至关重要的一步。本文将深入探讨如何通过轻易云数据集成平台调用金蝶云星空的`executeBillQuery`接口来获取组织间结算价目表的数据,并对其进行初步加工。 #### 接口调用配置 首先,我们需要配置元数据,以便正确地调用金蝶云星空的`executeBillQuery`接口。以下是元数据配置的关键部分: ```json { "api": "executeBillQuery", "method": "POST", "number": "FNumber", "id": "FEntity_FEntryID", "pagination": { "pageSize": 500 }, "idCheck": true, "request": [ {"field":"FEntity_FEntryID","label":"FEntryID","type":"string","value":"FEntity_FEntryID"}, {"field":"FDocumentStatus","label":"数据状态","type":"string","value":"FDocumentStatus"}, {"field":"FForbidStatus","label":"禁用状态","type":"string","value":"FForbidStatus"}, {"label":"核算组织","field":"FCreateOrgId","type":"string","value":"FCreateOrgId"}, {"label":"编码","field":"FNumber","type":"string","value":"FNumber"}, {"label":"名称","field":"FName","type":"string","value":"FName"}, {"label":"备注","field":"FDescription","type":"string","value":"FDescription"}, {"label":"物料编码","field":"FMATERIALID_FNumber","type":"string","value":"FMATERIALID.FNumber"}, {"label":"含税单价","field":"FTaxPrice","type":"string","value":"FTaxPrice"}, {"label":"FTaxRate","field":"税率%","type":"string","value":"FTaxRate"} ], "otherRequest": [ {"field": "Limit", "label": "最大行数", "type": "string", "describe": "金蝶的查询分页参数", "value": "{PAGINATION_PAGE_SIZE}"}, {"field": "StartRow", "label": "开始行索引", "type": "string", "describe": "金蝶的查询分页参数", "value": "{PAGINATION_START_ROW}"}, {"field": "TopRowCount", "label": "返回总行数", "type": "int", "describe": "金蝶的查询分页参数"}, {"field": "FilterString", "label": "过滤条件", "type": "string", "describe": "示例写法 FSupplierId.FNumber = 'VEN00010' and FApproveDate>=", "value": "FModifyDate>='{{LAST_SYNC_TIME|date}}'"}, {"field": "FieldKeys", "label": "需查询的字段key集合", "type": "array", "describe": "金蝶分录主键ID格式:FPOOrderEntry_FEntryId,其它格式 FPurchaseOrgId.FNumber", "parser": {"name": "ArrayToString", "params": "," } }, { field: FormId, label: 业务对象表单Id, type: string, describe: 必须填写金蝶的表单ID如:PUR_PurchaseOrder, value: IOS_PriceList } ] } ``` #### 数据请求与清洗 在进行数据请求时,我们需要确保请求参数的准确性和完整性。以下是一个示例请求体: ```json { "_api_": "/k3cloud/Kingdee.BOS.WebApi.ServicesStub.DynamicFormService.ExecuteBillQuery.common.kdsvc", "_method_": "_POST_", "_params_":{ "_FormId_":"", "_FieldKeys_":["FBillNo,FDate,FMaterialId.FNumber,FMaterialName,FQty"], "_FilterString_":"", "_OrderString_":"", "_TopRowCount_":"", "_StartRow_":"", "_Limit_":"", "_SubSystemId_":"", "_IsSortBySeq_":"", "_IsGroupBySeq_":"", "_IsGroupBySeqDesc_":"", "_IsGroupBySeqAsc_":"", "_IsGroupBySeqDescAsc_":"", "_IsGroupBySeqAscDesc_":"", _IsGroupBySeqDescAscDesc:"", _IsGroupBySeqAscDescAsc:"", _IsGroupBySeqDescAscAsc:"", _IsGroupBySeqAscDescDesc:"", _IsGroupBySeqDescAscDesc:"", _IsGroupBySeqAscDescAsc:"", _IsGroupBySeqDescAscAsc:"", _IsGroupBySeqAscDescDesc:"" } } ``` 在这个请求体中,我们需要特别注意以下几个字段: - `FormId`: 必须填写金蝶的表单ID,例如`IOS_PriceList`。 - `FieldKeys`: 包含我们需要查询的字段集合。 - `FilterString`: 用于指定过滤条件,例如根据最后同步时间进行过滤。 #### 数据转换与写入 在获取到原始数据后,我们需要对其进行转换和清洗,以便后续的数据处理和分析。例如,将日期格式统一、去除无效数据等。 以下是一个简单的数据转换示例: ```python def transform_data(raw_data): transformed_data = [] for record in raw_data: transformed_record = { 'entry_id': record['FEntity_FEntryID'], 'document_status': record['FDocumentStatus'], 'forbid_status': record['FForbidStatus'], 'create_org_id': record['FCreateOrgId'], 'number': record['FNumber'], 'name': record['FName'], 'description': record['FDescription'], 'material_number': record['FMATERIALID.FNumber'], 'tax_price': float(record['FTaxPrice']), 'tax_rate': float(record['FTaxRate']) } transformed_data.append(transformed_record) return transformed_data ``` 在这个示例中,我们将原始记录中的各个字段提取出来,并进行了必要的数据类型转换,如将价格和税率转换为浮点数。 #### 总结 通过上述步骤,我们可以高效地调用金蝶云星空的`executeBillQuery`接口,获取所需的数据并进行初步加工。这一步骤为后续的数据处理和分析奠定了坚实基础。在实际操作中,细致地配置元数据和精确地处理请求参数,是确保数据集成成功的重要因素。 ![如何开发钉钉API接口](https://pic.qeasy.cloud/S21.png~tplv-syqr462i7n-qeasy.image) ### 轻易云数据集成平台ETL转换与金蝶云星空API接口对接技术案例 在数据集成的生命周期中,ETL(Extract, Transform, Load)转换是关键的一步。本文将深入探讨如何利用轻易云数据集成平台,将已经集成的源平台数据进行ETL转换,并转为金蝶云星空API接口所能够接收的格式,最终写入目标平台。 #### 数据请求与清洗 首先,我们需要从源系统获取原始数据,并进行初步清洗。这一步骤虽然不在本文的重点范围内,但需要确保数据的准确性和完整性,为后续的ETL转换打下基础。 #### 数据转换与写入 在数据转换阶段,我们使用轻易云的数据处理功能,将源数据转化为金蝶云星空API所需的格式。以下是详细步骤: 1. **元数据配置解析** 根据提供的元数据配置,我们可以看到需要向金蝶云星空API发送的数据结构。以下是主要字段及其配置: ```json { "api": "batchSave", "method": "POST", "idCheck": true, "request": [ {"field": "FDocumentStatus", "label": "数据状态", "type": "string"}, {"field": "FForbidStatus", "label": "禁用状态", "type": "string"}, {"field": "Fname", "label": "名称", "type": "string"}, {"field": "FNumber", "label": "编码", "type": "string"}, {"field": "FDescription", "label": "描述", "type": "string"}, {"field": "FCreateOrgId", "label": "创建组织", "type": "string", "parser":{"name":"ConvertObjectParser","params":"FNumber"} }, {"field": "FUseOrgId", "label":"使用组织", "type":"string", "parser":{"name":"ConvertObjectParser","params":"FNumber"} } ], ... } ``` 2. **字段映射与转换** 在轻易云平台上,我们可以通过配置字段映射和转换规则,将源系统的数据字段映射到目标系统所需的字段。例如,`FCreateOrgId` 和 `FUseOrgId` 需要通过 `ConvertObjectParser` 转换为目标系统可识别的编码。 ```python def convert_object_parser(source_value, params): # 假设我们有一个映射表,将源系统组织编码转化为目标系统编码 mapping_table = { 'source_org_1': 'target_org_1', 'source_org_2': 'target_org_2' } return mapping_table.get(source_value, source_value) # 示例转换 source_data = { 'FCreateOrgId': 'source_org_1', 'FUseOrgId': 'source_org_2' } target_data = { 'FCreateOrgId': convert_object_parser(source_data['FCreateOrgId'], 'FNumber'), 'FUseOrgId': convert_object_parser(source_data['FUseOrgId'], 'FNumber') } print(target_data) # 输出: {'FCreateOrgId': 'target_org_1', 'FUseOrgId': 'target_org_2'} ``` 3. **构建请求体** 根据元数据配置中的 `request` 字段,我们需要构建一个符合金蝶云星空API要求的请求体。以下是一个示例请求体: ```json { "FormId": "FIN_OTHERS", "Operation": "BatchSave", ... // 数据数组 array: [ { FDocumentStatus: 'A', FForbidStatus: 'B', Fname: 'example_name', FNumber: 'example_number', FDescription: 'example_description', FCreateOrgId: target_data['FCreateOrgId'], FUseOrgId: target_data['FUseOrgId'] } // 可以继续添加更多记录 ] } ``` 4. **发送请求** 最后一步是通过HTTP POST方法将构建好的请求体发送到金蝶云星空API。以下是一个Python示例代码: ```python import requests url = "<金蝶云星空API地址>" headers = { 'Content-Type': 'application/json' } response = requests.post(url, json=request_body, headers=headers) if response.status_code == 200: print("Data successfully saved to Kingdee Cloud") else: print(f"Failed to save data. Status code: {response.status_code}, Response: {response.text}") ``` 通过以上步骤,我们成功地将已经集成的源平台数据进行ETL转换,并转为金蝶云星空API接口所能够接收的格式,最终写入目标平台。在实际操作中,需要根据具体业务需求调整字段映射和转换规则,以确保数据准确无误地传输到目标系统。 ![用友与MES系统接口开发配置](https://pic.qeasy.cloud/T20.png~tplv-syqr462i7n-qeasy.image)