MongoDB数据写入与ETL转换的详细步骤

  • 轻易云集成顾问-吕修远
### 用友BIP数据集成到MongoDB的系统对接实践 在企业的信息化管理中,数据融合与实时同步是确保各业务系统高效运作的关键环节。本文将分享一个结合用友BIP和MongoDB的数据集成案例——YS网店与客户/组织映射关系对接MongDB,通过该案例展示如何高效、安全地实现从用友BIP获取数据并写入MongoDB。 #### 集成方案概述 在这个项目中,我们需要定期从用友BIP接口`/fc0dltfc/qeasy_datahub/qeasy_datahub/query_mapping_shop`抓取YS网店与客户及组织之间的映射关系,并批量导入到MongoDB中。为了确保整个过程不出现漏单、重复以及异常处理不到位等问题,我们采用了一系列技术手段和最佳实践: 1. **保证数据无遗漏**:为了防止漏单现象发生,每次抓取完毕后,会进行全量校验,并记录抓取时间点,以便下次增量更新。 2. **大规模数据快速写入**:利用MongoDB的Insert API,实现大量数据的快速插入需求,即使面对百万级别的数据也能轻松应对。 3. **定时任务调度**:通过定时触发器可靠地调用用友BIP接口,确保以固定间隔获取最新的数据,而不会错过任何更新。 4. **分页与限流机制**:针对用友BIP接口可能存在的数据分页和API限流问题,设计了合适的分页请求逻辑,有效避免因超出限额导致的数据抓取失败。 5. **格式转换和映射定制化**:由于源系统(用友BIP)与目标系统(MongoDB)的数据格式不一致,需要在处理中进行必要的数据转换及自定义字段映射,使两者兼容。 #### 具体实现步骤 1. 从用友BIP API拉取原始JSON格式对象集合,通过HTTP GET方法访问接口地址 `/fc0dltfc/qeasy_datahub/qeasy_datahub/query_mapping_shop`。 2. 对获取的大量JSON对象进行解析,根据预先设定好的字段规则执行必要的属性转换,将其转变为一系列符合MongoDB规范的数据文档结构。 3. 批量使用Insert API向目标数据库中的特定集合表内写入所有经加工后的数据信息,同时建立索引提高查询效率。 在接下来详细描述过程中,我们会深入探讨每个步骤中的技术细节,以及可能遇到的问题及对应解决方案,包括异常处理策略、错误重试机制等,为大家呈现一次完整且高质量的数据集成实践经验。 ![钉钉与WMS系统接口开发配置](https://pic.qeasy.cloud/D25.png~tplv-syqr462i7n-qeasy.image) ### 调用用友BIP接口获取并加工数据的技术案例 在数据集成的生命周期中,调用源系统接口是至关重要的一步。本文将深入探讨如何通过轻易云数据集成平台调用用友BIP接口`/fc0dltfc/qeasy_datahub/qeasy_datahub/query_mapping_shop`获取并加工数据。 #### 接口调用配置 首先,我们需要了解元数据配置中的各个字段及其作用: ```json { "api": "/fc0dltfc/qeasy_datahub/qeasy_datahub/query_mapping_shop", "method": "POST", "number": "shop_no", "id": "shop_no", "idCheck": true, "request": [ { "label": "时间戳", "field": "ts", "type": "string", "value": "{LAST_SYNC_TIME}0000" } ] } ``` - `api`: 指定了要调用的API接口路径。 - `method`: HTTP请求方法,这里使用的是`POST`。 - `number`和`id`: 用于标识记录的唯一字段,这里均为`shop_no`。 - `idCheck`: 表示是否需要进行ID检查,设置为`true`。 - `request`: 请求参数列表,包含一个时间戳字段,用于增量同步。 #### 配置请求参数 在实际操作中,我们需要将元数据配置中的请求参数动态替换为实际值。假设我们有一个变量`LAST_SYNC_TIME`存储上次同步的时间戳,那么我们可以通过以下方式构建请求体: ```json { "ts": "{LAST_SYNC_TIME}0000" } ``` 这里的时间戳被格式化为字符串,并附加了四个零,以符合接口要求。 #### 发起HTTP请求 使用轻易云数据集成平台提供的可视化界面,我们可以配置HTTP请求节点。以下是具体步骤: 1. **选择HTTP请求节点**:在工作流中添加一个HTTP请求节点。 2. **配置URL和方法**:设置URL为`/fc0dltfc/qeasy_datahub/qeasy_datahub/query_mapping_shop`,方法选择`POST`。 3. **设置请求头**:根据用友BIP接口要求,添加必要的请求头信息,如Content-Type等。 4. **构建请求体**:将前面构建好的JSON对象作为请求体。 #### 数据清洗与转换 成功获取数据后,需要对数据进行清洗和转换,以便后续写入目标系统。假设返回的数据格式如下: ```json [ { "shop_no": "001", "shop_name": "Shop A", "address": "Address A", ... }, ... ] ``` 我们可以通过以下步骤进行清洗和转换: 1. **字段映射**:将返回的数据字段映射到目标系统所需的字段。例如,将`shop_no`映射到MongoDB中的 `_id` 字段。 2. **数据过滤**:根据业务需求过滤掉不必要的数据。例如,只保留活跃状态的店铺信息。 3. **格式转换**:将数据格式转换为目标系统所需的格式。例如,将地址信息拆分成多个字段。 #### 写入目标系统 最后一步是将处理后的数据写入MongoDB。轻易云平台提供了丰富的数据写入节点,可以方便地将清洗和转换后的数据写入MongoDB数据库。具体步骤如下: 1. **选择MongoDB写入节点**:在工作流中添加一个MongoDB写入节点。 2. **配置连接信息**:设置MongoDB连接信息,包括数据库名、集合名等。 3. **映射字段**:将处理后的数据字段映射到MongoDB集合中的相应字段。 通过以上步骤,我们完成了从调用用友BIP接口获取数据,到清洗、转换并最终写入MongoDB的全过程。这不仅提高了数据处理效率,还确保了数据的一致性和完整性。 ![金蝶与SCM系统接口开发配置](https://pic.qeasy.cloud/S25.png~tplv-syqr462i7n-qeasy.image) ### 使用轻易云数据集成平台进行ETL转换并写入MongoDB 在数据集成生命周期的第二步,我们将已经集成的源平台数据进行ETL(Extract, Transform, Load)转换,最终转为目标平台 MongoDB API 接口所能够接收的格式,并写入目标平台。以下是详细的技术步骤和元数据配置应用。 #### 数据请求与清洗 首先,我们从源平台提取原始数据,并进行必要的清洗和预处理。这一步骤确保数据的一致性和完整性,为后续的转换和写入打下基础。 #### 数据转换与写入 在数据清洗完成后,接下来就是将这些数据转换为 MongoDB API 所能接受的格式并写入目标数据库。我们使用如下元数据配置来指导这一过程: ```json { "api": "Insert", "effect": "EXECUTE", "method": "POST", "number": "id", "id": "id", "name": "id", "idCheck": true, "request": [ { "field": "channelid_wdt", "label": "旺店通店铺编码", "type": "string", "describe": "单据编码", "value": "{shop_no}" }, { "field": "channelid_ys", "label": "ys客户id", "type": "string", "describe": "流程订货订单开蓝票", "value": "{mapping_customer}" }, { "field": "qeasydataid", "label": "mong库主键", "type": "string", "describe": "实物商品属性", "value": "{id}" } ], "otherRequest": [ { "field": "collectionName", "label": "集合名字", "type": "string", "describe": "", 'value': 'ChannelMapping' } ] } ``` #### 配置解析与应用 1. **API接口配置**: - `api: Insert`:指定使用插入操作。 - `effect: EXECUTE`:表示执行该操作。 - `method: POST`:使用HTTP POST方法提交数据。 2. **字段映射**: - `number`, `id`, `name`:这些字段用于唯一标识记录。 - `idCheck: true`:启用ID检查,确保唯一性。 3. **请求参数**: - `channelid_wdt`(旺店通店铺编码):映射到 `{shop_no}`。 - `channelid_ys`(ys客户ID):映射到 `{mapping_customer}`。 - `qeasydataid`(Mongo库主键):映射到 `{id}`。 4. **其他请求参数**: - `collectionName`:指定集合名字为 `ChannelMapping`。 #### 实际操作步骤 1. **提取源数据**: 从YS网店提取需要的数据,例如店铺编码、客户ID等。 2. **转换数据格式**: 根据上述元数据配置,将提取的数据转换为MongoDB所需的JSON格式。例如: ```json { "_id":"1234567890abcdef", 'channelid_wdt': 'WDT12345', 'channelid_ys': 'YS67890', 'qeasydataid': '9876543210fedcba' } ``` 3. **发送HTTP请求**: 使用POST方法,将转换后的JSON数据发送到MongoDB API接口。例如: ```http POST /insert HTTP/1.1 Host: mongodb.example.com Content-Type: application/json { "_id":"1234567890abcdef", 'channelid_wdt': 'WDT12345', 'channelid_ys': 'YS67890', 'qeasydataid': '9876543210fedcba', 'collectionName': 'ChannelMapping' } ``` 4. **处理响应**: 检查API返回结果,确保数据成功写入MongoDB。如果出现错误,根据返回信息进行调试和修正。 通过以上步骤,我们实现了从YS网店到MongoDB的无缝数据集成。利用轻易云提供的全异步、多种异构系统支持的平台特性,使得整个过程高效且透明。 ![数据集成平台可视化配置API接口](https://pic.qeasy.cloud/T15.png~tplv-syqr462i7n-qeasy.image)