ETL实现用友BIP和源平台数据对接的最佳实践

  • 轻易云集成顾问-谢楷斌
### 用友BIP数据集成实践:其他出库单同步(转库)-p 案例 在企业信息化系统中,确保各业务系统的数据一致性和实时同步是至关重要的。本文将分享一个基于用友BIP的具体案例——"其他出库单同步(转库)-p",探讨如何通过轻易云数据集成平台实现精准、高效的数据对接与处理。 #### 确保集成用友BIP数据不漏单 在本次方案实施过程中,我们首先需要确保从源系统抓取到所有必须的数据,不遗漏任何一笔出库单记录。这一过程主要利用了用友BIP提供的API接口 `/yonbip/scm/othoutrecord/list` 来获取最新出库单列表。我们采用了定时抓取机制,每隔一定时间自动调用该接口拉取新记录,并通过唯一标识符校验避免重复或遗漏。 #### 大量数据快速写入到用友BIP 对于大量业务数据导入,我们使用的是批量写入技术。借助轻易云平台强大的并发处理能力,将多个出库单打包后,通过用友BIP的另一组API接口 `/yonbip/scm/othoutrecord/batchaudit` 实现高效率批量提交。这种方式不仅显著提升了传输速度,同时降低了网络负载与服务器压力。 #### 调用分页和限流问题的解决方案 当面对大规模数据请求时,API接口可能会面临分页及限流问题。在本案例中,为了解决这些挑战,我们设计了一套可靠的分页获取策略,即每次调用都带有页码参数以确保完整翻页。此外,对于限流控制,则是根据API提供方的规定适配合理配置,并通过错误重试机制来保证稳定性。当出现限流响应时,系统会等待指定时间再进行重新请求,直至成功。 #### 数据格式差异处理与映射对接 值得注意的是,用友BIP不同模块之间可能存在着字段名称或格式上的差异。在这个项目里,我们为每个字段创建明确映射关系,通过脚本转换使得两端数据结构完全兼容。同时,也设置了自定义规则对异常值进行预先过滤和改造,以便最终的数据能顺畅地完成迁移与整合。 这只是整体步骤中的一些关键环节,它们共同组成一个完善而灵活的数据集成解决方案。接下来将在详细部分介绍整个流程中的具体配置及实现代码,使您能够进一步理解其复杂原理及实际操作方法。 ![用友与外部系统接口集成开发](https://pic.qeasy.cloud/D22.png~tplv-syqr462i7n-qeasy.image) ### 调用用友BIP接口获取并加工数据的技术案例 在轻易云数据集成平台中,调用源系统用友BIP接口`/yonbip/scm/othoutrecord/list`获取并加工数据是数据处理生命周期的第一步。以下将详细探讨如何配置和调用该接口,并对返回的数据进行清洗和转换。 #### 接口配置与调用 首先,我们需要配置元数据以便正确调用用友BIP接口。以下是关键的元数据配置项: - **API路径**:`/yonbip/scm/othoutrecord/list` - **请求方法**:POST - **分页参数**: - `pageIndex`(页号):默认值为1 - `pageSize`(每页行数):默认值为10 - **过滤条件**: - `bustype_name`不等于“报废”或“盘亏” - **请求字段**: - `code`(单据编码):默认值为10 - `open_vouchdate_begin`(开始日期) - `open_vouchdate_end`(结束日期) - `warehouse_name`(仓库) - `org_id`(库存组织id),使用逗号分隔的字符串转换为数组 - `org_name`(库存组织名称) - `org_code`(库存组织编码),使用逗号分隔的字符串转换为数组 - `stockMgr_name`(物料),使用逗号分隔的字符串转换为数组 - `operator_name`(库管员),使用逗号分隔的字符串转换为数组 - `department_name`(部门),使用逗号分隔的字符串转换为数组 - `product_cName`(物料名称),使用逗号分隔的字符串转换为数组 - `product_productClass_name`(物料分类id),示例值为false,默认值也为false,使用逗号分隔的字符串转换为数组 #### 请求示例 ```json { "pageIndex": "1", "pageSize": "10", "code": "10", "open_vouchdate_begin": "2023-01-01", "open_vouchdate_end": "2023-12-31", "warehouse_name": "", "org_id": ["1553156871271481351"], "org_name": "", "org_code": "", "stockMgr_name": "", "operator_name": "", "department_name": "", "product_cName": "", "product_productClass_name": ["false"], "isSum": "" } ``` #### 数据清洗与转换 在获取到响应数据后,需要对其进行清洗和格式化。根据元数据配置中的formatResponse部分,我们需要将返回的数据字段进行重命名和类型转换。例如,将返回结果中的字段`id`重命名为`new_id`,并将其类型转换为字符串。 ##### 响应格式化示例 假设我们从接口获取到以下响应: ```json [ { "id": 12345, "code": "OUT001", ... }, ... ] ``` 我们需要将其格式化为: ```json [ { "new_id": "12345", "code": "OUT001", ... }, ... ] ``` #### 条件过滤与逻辑运算 根据元数据中的condition部分,我们需要对响应数据进行进一步过滤。例如,过滤掉交易类型为“报废”或“盘亏”的记录。这可以通过在处理响应时添加相应的逻辑来实现。 ##### 条件过滤示例 ```python filtered_data = [record for record in response_data if record['bustype_name'] not in ['报废', '盘亏']] ``` 通过上述步骤,我们可以确保从用友BIP接口获取的数据符合业务需求,并且经过清洗和格式化后,可以无缝集成到目标系统中。这一过程不仅提高了数据处理效率,还保证了数据的一致性和准确性。 ![钉钉与CRM系统接口开发配置](https://pic.qeasy.cloud/S12.png~tplv-syqr462i7n-qeasy.image) ### 将源平台数据转换为用友BIPAPI接口格式并写入目标平台 在轻易云数据集成平台的生命周期中,数据转换与写入是关键步骤之一。本文将详细探讨如何将已经集成的源平台数据进行ETL转换,使其符合用友BIPAPI接口的接收格式,并最终写入目标平台。 #### 数据请求与清洗 在数据集成过程中,首先需要从源系统请求数据并进行清洗。这一步骤确保了数据的准确性和一致性,为后续的ETL转换奠定了基础。然而,本案例重点在于数据转换与写入,因此我们假设数据已经过清洗,并且符合预期格式。 #### 数据转换 为了将源平台的数据转换为用友BIPAPI接口所能接收的格式,我们需要对元数据配置进行深入理解和应用。以下是元数据配置的详细信息: ```json { "api": "/yonbip/scm/othoutrecord/batchaudit", "method": "POST", "idCheck": true, "request": [ { "label": "其他出库单[st.othoutrecord.OthOutRecord]", "field": "data", "type": "array", "children": [ { "parent": "data", "label": "主表id", "field": "id", "type": "string", "value": "{new_id}" } ] } ] } ``` 根据以上配置,我们需要将源平台的数据结构化为一个数组,其中包含其他出库单的信息,每个出库单需要包含一个唯一标识符(主表id)。 #### ETL过程 1. **提取(Extract)**: 从源系统中提取其他出库单的数据。这些数据可能包括多个字段,但我们只关注主表id。 2. **转换(Transform)**: 根据元数据配置,将提取到的数据转换为目标格式。具体来说,需要将每个出库单的数据包装在一个数组中,并确保每个出库单包含一个`id`字段,该字段的值应为`{new_id}`。 3. **加载(Load)**: 将转换后的数据通过POST请求发送到用友BIPAPI接口。以下是一个示例代码片段,展示了如何实现这一过程: ```python import requests import json # 假设从源系统提取到的数据如下 source_data = [ {"id": "12345"}, {"id": "67890"} ] # 转换后的目标数据结构 transformed_data = { "data": [{"id": item["id"]} for item in source_data] } # API请求配置 api_url = "/yonbip/scm/othoutrecord/batchaudit" headers = { 'Content-Type': 'application/json' } # 发起POST请求 response = requests.post(api_url, headers=headers, data=json.dumps(transformed_data)) # 检查响应状态 if response.status_code == 200: print("Data successfully loaded into target platform.") else: print(f"Failed to load data: {response.text}") ``` #### 注意事项 1. **ID检查**: 配置中`idCheck`字段设置为`true`,这意味着在加载数据之前,需要确保每个记录都有一个有效的ID。如果ID缺失或无效,可能会导致加载失败。 2. **错误处理**: 在实际操作中,应当对可能出现的错误进行处理。例如,当API请求失败时,应记录错误信息并采取相应措施,以确保系统稳定性和数据完整性。 3. **性能优化**: 对于大规模的数据集成任务,可以考虑批量处理,以提高性能和效率。同时,应监控系统资源使用情况,避免因资源耗尽而导致系统崩溃。 通过上述步骤,我们成功地将源平台的数据转换为用友BIPAPI接口所能接收的格式,并顺利写入目标平台。这一过程不仅确保了数据的一致性和准确性,还极大提升了业务流程的自动化程度。 ![打通钉钉数据接口](https://pic.qeasy.cloud/T23.png~tplv-syqr462i7n-qeasy.image)