利用轻易云实现ETL转换并集成到金蝶云星空

  • 轻易云集成顾问-姚缘
### 吉客云数据集成到金蝶云星空技术案例分享:仓库方案-I0123 在“仓库方案-I0123”项目中,我们面临的主要挑战是如何高效地将吉客云中的大量 Warehouse 数据接口(erp.warehouse.get),通过自动化流程,快速且可靠地写入金蝶云星空(batchSave)。本文将详细解析这一系统对接的关键技术点以及解决方案。 首先,要处理的数据量非常庞大,为此我们充分利用了平台支持高吞吐量的数据写入能力。为了确保每条数据都能准确地传输到目标系统中,我们采用了批量集成方法,将数据从吉客云接口定时抓取,并进行分页处理和限流控制,以避免API调用过载。同时,通过自定义的数据转换逻辑,应对两套系统之间的数据格式差异,确保各字段能够精准映射。 一个显著难点在于怎么样实时监控整个数据流动过程。我们部署了集中式监控与告警系统,不仅能即时追踪当前任务状态和性能,还具备异常检测功能,可以提前识别并处理潜在的问题。例如,当遇到网络延迟或调用失败时,该机制会触发错误重试策略,保障最终的数据一致性和完整性。此外,日志记录功能也为故障排查提供了详实的依据。 具体来说,在初期阶段,我们通过可视化设计工具,对整个数据流进行了图形化建模,使得业务人员无需深入代码即可理解全局流程。同时,引入 API 资产管理功能,从统一视图全面掌握各个API使用情况,提高资源配置效率。这一整套严密流程不但提升了工作效率, 还使得操作更透明易懂,有助于及时发现并解决问题。 简而言之,通过合理运用各种技术手段实现了一次成功且稳定的多元数据集成,这不仅满足客户对于高性能、高可靠性的要求,也为后续类似项目积累了宝贵经验。在接下来的章节里,我将继续详细介绍这个复杂场景下所使用的一些核心技术细节及其实现思路。 ![如何对接用友BIP接口](https://pic.qeasy.cloud/D10.png~tplv-syqr462i7n-qeasy.image) ### 调用吉客云接口erp.warehouse.get获取并加工数据 在数据集成的生命周期中,调用源系统接口是至关重要的第一步。本文将详细探讨如何通过轻易云数据集成平台调用吉客云接口`erp.warehouse.get`来获取并加工仓库数据。 #### 接口调用配置 首先,我们需要配置元数据以便正确调用吉客云的API。以下是元数据配置的具体内容: ```json { "api": "erp.warehouse.get", "effect": "QUERY", "method": "POST", "number": "warehouseCode", "id": "warehouseCode", "idCheck": true, "request": [ {"label": "页码", "field": "pageIndex", "type": "string"}, {"label": "页大小", "field": "pageSize", "type": "string", "value": "50"}, {"label": "起始开始时间", "field": "gmtModifiedStart", "type": "string", "value":"{{LAST_SYNC_TIME|datetime}}"}, {"label": "结束修改时间", "field":"gmtModifiedEnd","type":"string", "value":"{{CURRENT_TIME|datetime}}"} ], "autoFillResponse": true, "condition_bk":[[{"field":"isBlockup","logic":"ieqv2","value":"0"}]], "condition":[[{"field":"isBlockup","logic":"eqv2","value":"0"}]] } ``` #### 请求参数详解 1. **页码 (pageIndex)**:用于分页请求,类型为字符串。 2. **页大小 (pageSize)**:每页返回的数据条数,固定值为50。 3. **起始开始时间 (gmtModifiedStart)**:用于指定查询的起始时间,动态取值为上次同步时间。 4. **结束修改时间 (gmtModifiedEnd)**:用于指定查询的结束时间,动态取值为当前时间。 这些参数确保了我们能够获取到最新且未被封存的仓库信息。 #### 数据请求与清洗 在发起请求时,我们使用POST方法将上述参数传递给吉客云接口`erp.warehouse.get`。由于配置了`autoFillResponse`为true,平台会自动处理响应数据并填充到目标表中。 此外,为了确保只获取有效的数据,我们设置了条件过滤: - `condition_bk` 和 `condition` 都包含一个逻辑条件,即 `isBlockup = 0`,表示只获取未封存的仓库信息。 #### 实际操作步骤 1. **配置元数据**:在轻易云平台中,根据上述元数据配置创建一个新的API调用任务。 2. **设置动态参数**:确保 `gmtModifiedStart` 和 `gmtModifiedEnd` 的值分别对应上次同步时间和当前时间。 3. **执行任务**:启动任务后,平台会自动向吉客云发起请求,并根据返回的数据进行处理和填充。 #### 数据转换与写入 虽然本文重点在于数据请求与清洗,但值得一提的是,在获取到原始数据后,还需要进行必要的数据转换和写入操作。这通常包括: - 数据格式转换 - 字段映射 - 数据校验与清洗 这些步骤确保最终的数据能够无缝对接到目标系统中,实现真正意义上的数据集成。 通过以上步骤,我们可以高效地从吉客云获取仓库信息,并进行必要的数据处理,为后续的数据转换与写入奠定基础。 ![钉钉与WMS系统接口开发配置](https://pic.qeasy.cloud/S28.png~tplv-syqr462i7n-qeasy.image) ### 利用轻易云数据集成平台进行ETL转换并写入金蝶云星空 在数据集成的生命周期中,ETL(提取、转换、加载)是关键步骤之一。本文将详细探讨如何利用轻易云数据集成平台,将已经集成的源平台数据进行ETL转换,并最终写入金蝶云星空API接口所能接收的格式。 #### 数据请求与清洗 在数据请求与清洗阶段,我们已经从源系统中提取了原始数据,并进行了初步的清洗和整理。接下来,我们需要将这些数据转换为目标平台——金蝶云星空所能接受的格式。 #### 配置元数据 为了实现这一目标,我们需要根据以下元数据配置来定义我们的API请求: ```json { "api": "batchSave", "effect": "EXECUTE", "method": "POST", "idCheck": true, "request": [ {"field":"FName","label":"仓库名称","type":"string","value":"{warehouseName}"}, {"field":"FNumber","label":"编码","type":"string","value":"{warehouseCode}"}, {"field":"FStockProperty","label":"仓库属性","type":"string","value":"1"}, {"field":"FCreateOrgId","label":"FCreateOrgId","type":"string","value":"I0123","parser":{"name":"ConvertObjectParser","params":"FNumber"}}, {"field":"FUseOrgId","label":"FUseOrgId","type":"string","value":"I0123","parser":{"name":"ConvertObjectParser","params":"FNumber"}}, {"field":"FAllowMinusQty","label":"允许即时库存","type":"string","value":"true"} ], "otherRequest": [ {"field": "FormId", "label": "业务对象表单Id", "type": "string", "describe": "必须填写金蝶的表单ID如:PUR_PurchaseOrder", "value": "BD_STOCK"}, {"field": "Operation", "label": "执行的操作", "type": "string", "describe": "执行的操作", "value": "BatchSave"}, {"field": "IsAutoSubmitAndAudit", "label": "提交并审核", "type": "bool", "describe": "提交并审核", "value": true}, {"field": "IsVerifyBaseDataField", "label": "验证基础资料", "type": "bool", "describe": "是否验证所有的基础资料有效性,布尔类,默认false(非必录)", "value": true} ], "operation":{"method":"batchArraySave","rows":100,"rowsKey":"array"} } ``` #### 数据转换与写入 在这一阶段,我们需要对源平台的数据进行处理,使其符合金蝶云星空API接口的要求。具体步骤如下: 1. **字段映射与转换**: - 将源数据中的字段映射到目标API所需字段。例如,将`warehouseName`映射到`FName`,将`warehouseCode`映射到`FNumber`。 - 使用元数据配置中的`parser`进行必要的数据转换。例如,使用`ConvertObjectParser`将组织ID转换为相应的编码格式。 2. **构建请求体**: - 根据元数据配置中的`request`部分构建请求体。确保每个字段都被正确填充,并且类型匹配。 - 示例请求体: ```json { FName: '仓库A', FNumber: 'WH001', FStockProperty: '1', FCreateOrgId: 'I0123', FUseOrgId: 'I0123', FAllowMinusQty: 'true' } ``` 3. **其他请求参数**: - 根据元数据配置中的`otherRequest`部分添加额外的请求参数。例如,设置业务对象表单ID为`BD_STOCK`,操作类型为`BatchSave`, 并启用自动提交和审核功能。 4. **批量处理**: - 使用元数据配置中的批量处理方法(如 `batchArraySave`) 对多个记录进行批量处理。确保每次处理不超过指定行数(如100行)。 5. **发送请求**: - 使用POST方法将构建好的请求体发送到金蝶云星空API接口。确保在发送前对ID进行检查以避免重复记录。 #### 示例代码实现 以下是一个简化的Python示例代码,用于展示如何实现上述步骤: ```python import requests import json def convert_data(source_data): # 转换源数据为目标格式 target_data = [] for item in source_data: target_item = { 'FName': item['warehouseName'], 'FNumber': item['warehouseCode'], 'FStockProperty': '1', 'FCreateOrgId': 'I0123', 'FUseOrgId': 'I0123', 'FAllowMinusQty': 'true' } target_data.append(target_item) return target_data def send_to_kdcloud(data): url = '<金蝶云星空API地址>' headers = {'Content-Type': 'application/json'} payload = { 'FormId': 'BD_STOCK', 'Operation': 'BatchSave', 'IsAutoSubmitAndAudit': True, 'IsVerifyBaseDataField': True, 'data': data } response = requests.post(url, headers=headers, data=json.dumps(payload)) if response.status_code == 200: print("Data successfully sent to Kingdee Cloud") else: print("Failed to send data:", response.text) source_data = [ {'warehouseName': '仓库A', 'warehouseCode': 'WH001'}, {'warehouseName': '仓库B', 'warehouseCode': 'WH002'} ] converted_data = convert_data(source_data) send_to_kdcloud(converted_data) ``` 通过上述步骤和示例代码,我们可以高效地将源平台的数据转换并写入到金蝶云星空,实现不同系统间的数据无缝对接。这不仅提升了业务流程的自动化程度,也保证了数据的一致性和准确性。 ![用友BIP接口开发配置](https://pic.qeasy.cloud/T15.png~tplv-syqr462i7n-qeasy.image)