实现BDS对账班牛返款表与MySQL的ETL转换技术细节

  • 轻易云集成顾问-杨嫦
### 班牛数据集成到MySQL的技术案例分享:BDS对账班牛返款表 在本篇技术文章中,我们将深入探讨如何通过轻易云数据集成平台完成班牛系统的数据集成,并将其高效写入到MySQL数据库。我们的实际运行方案名称为"BDS对账班牛返款表",主要聚焦于以下几个关键技术点: 首先,利用轻易云提供的可视化数据流设计工具,我们构建了一个直观且高效的整个数据处理流程。从抓取班牛API接口`task.list`的数据,到实现可靠定时任务调度,使得大量返款记录能够稳定地批量获取并转存至MySQL。这一过程中,系统支持分页和限流问题,确保每次请求都在合理范围内,不会导致服务器过载。 其次,通过自定义的数据转换逻辑,我们有效解决了班牛与MySQL之间可能存在的数据格式差异。例如,将JSON格式转化为对应的关系型数据库字段,并进行必要的清洗和校验,以保证数据完整性。这一过程由轻易云的平台自动监控,通过集中式监控和实时告警机制,对每个步骤进行跟踪,一旦发现异常立即处理以保证整个流程顺畅无误。 我们也特别关注到了高吞吐量的数据写入能力。基于这一特性,在大规模多并发场景下,即使是巨大的数据体量,也能快速、平稳地导入到MySQL数据库中,从而显著提升业务响应速度。此外,还实现了错误重试机制以及日志记录功能,有助于及时发现并解决潜在问题,提高集成过程中的可靠性。 以上就是此次共享案例开头部分内容。在后续章节里,将进一步详细介绍具体实施步骤与代码示例,包括如何准确调用班牛接口`task.list`及执行返回值写入到MySQL的方法`executeReturn`等,为大家呈现完整、高质量的系统集成实践。 ![泛微OA与ERP系统接口开发配置](https://pic.qeasy.cloud/D27.png~tplv-syqr462i7n-qeasy.image) ### 调用源系统班牛接口task.list获取并加工数据 在数据集成的生命周期中,调用源系统接口是至关重要的第一步。本文将详细探讨如何通过轻易云数据集成平台调用班牛接口`task.list`,获取并加工数据,以实现高效的数据集成。 #### 接口配置与调用 首先,我们需要了解`task.list`接口的基本配置和调用方式。根据提供的元数据配置,接口采用GET方法进行请求,主要参数如下: - `project_id`: 群组ID - `page_size`: 每页记录数 - `page_num`: 页码 - `star_created`: 起始时间 - `end_created`: 结束时间 - `star_modified`: 修改时间起始时间 - `end_modified`: 修改时间结束时间 这些参数中,有些是固定值,例如`project_id`为"64028",`page_size`为"50",而有些则是动态生成的,例如`star_modified`和`end_modified`。 #### 动态参数处理 动态参数在数据集成过程中尤为重要,它们通常基于当前时间或其他系统变量进行计算。例如: ```json {"field":"star_modified","label":"修改时间起始时间","type":"string","value":"_function DATE_FORMAT(DATE_ADD(NOW(),INTERVAL - 24 HOUR),'%Y-%m-%d %H:%i:%s')"} ``` 上述配置表示获取当前时间前24小时的日期和时间,并格式化为字符串。这种处理方式确保了每次调用接口时,都能获取到最新的数据。 类似地,`end_modified`参数使用了模板变量: ```json {"field":"end_modified","label":"修改时间结束时间","type":"string","value":"{{CURRENT_TIME|datetime}}"} ``` 这意味着在实际请求时,会自动填充当前系统时间。 #### 请求示例 基于上述配置,一个典型的请求URL可能如下所示: ``` https://api.baniu.com/task.list?project_id=64028&page_size=50&page_num=1&star_created=&end_created=&star_modified=2023-10-01%2012:00:00&end_modified=2023-10-02%2012:00:00 ``` #### 数据清洗与转换 获取到原始数据后,下一步是对数据进行清洗和转换。轻易云平台支持多种数据处理操作,包括字段映射、类型转换、过滤等。 例如,我们可以对返回的数据进行字段映射,将原始字段名转换为目标系统所需的字段名。同时,可以根据业务需求过滤掉不必要的数据,只保留符合条件的数据记录。 #### 自动填充响应 元数据配置中的`autoFillResponse:true`表示平台会自动处理响应数据,将其填充到目标结构中。这一步骤大大简化了开发工作量,使得开发者无需手动解析和处理复杂的JSON响应。 #### 条件过滤 元数据配置中还包含了一组条件过滤规则: ```json "condition_bk":[[{"field":77212,"logic":"eqv2","value":"76615"},{"field":5,"logic":"eqv2","value":"1"},{"field":"77248","logic":"gt","value":"0"}]] ``` 这些规则用于进一步筛选返回的数据。例如,只有当字段77212等于76615且字段5等于1且字段77248大于0时,记录才会被保留。这种精细化的过滤机制确保了最终写入目标系统的数据质量和准确性。 通过以上步骤,我们完成了从调用源系统接口到数据清洗与转换的全过程。这一过程不仅提高了数据集成效率,也确保了数据的一致性和可靠性。 ![金蝶与SCM系统接口开发配置](https://pic.qeasy.cloud/S8.png~tplv-syqr462i7n-qeasy.image) ### 数据集成与ETL转换:将源平台数据写入MySQL API接口 在数据集成的生命周期中,将源平台的数据进行ETL转换并写入目标平台是至关重要的一步。本文将详细探讨如何利用轻易云数据集成平台,将BDS对账班牛返款表的数据转换为MySQL API接口所能接收的格式,并最终写入目标平台。 #### 数据请求与清洗 首先,数据请求与清洗阶段确保从源系统获取的数据是准确且符合要求的。此阶段可能涉及多种数据源和复杂的清洗逻辑,以确保数据的一致性和完整性。 #### 数据转换与写入 在完成数据请求与清洗后,接下来就是关键的ETL转换过程。以下是具体步骤: 1. **定义API接口和请求方法**: 我们使用POST方法调用`executeReturn` API接口来提交转换后的数据。该接口支持多种字段类型,包括字符串、对象、数组等。 2. **配置主参数**: 主参数包含了关键的业务信息,如单据编号、系统订单编号、销售渠道编码等。这些字段需要根据业务需求进行映射和赋值。例如: ```json { "field": "bill_no", "label": "单据编号", "type": "string", "value": "{{-1}}" } ``` 这里使用了占位符`{{-1}}`,表示需要从源数据中提取相应的值。 3. **嵌套对象处理**: 对于复杂的嵌套对象,如`shop_name`字段,我们可以通过MongoDB查询来获取其值: ```json { "field": "shop_name", "label": "销售渠道名称", "type": "string", "value": "_mongoQuery e697d435-6a50-3792-a326-33a0cf78fd9b findField=content.options_title where={\"$or\":[{\"content.options_id\" :\"{{64030}}\"},{\"content.options_id\" :\"{{64031}}}]}" } ``` 4. **扩展参数处理**: 扩展参数用于处理1:1关系的数据,如订单详情中的货品信息。每个扩展参数都需要单独配置,例如: ```json { "field": "goods_no", "label": "货品编号", "type": "string", "value": "{{64040.64042}}" } ``` 5. **构建SQL语句**: 最后一步是构建SQL语句,将所有处理后的数据写入MySQL数据库。主语句和扩展语句分别对应不同的数据表: 主语句示例: ```sql INSERT INTO `lhhy_srm`.`supplier_rebate` (`bill_no`, `trade_no`, `online_trade_no`, `source_bill_no`, `shop_code`, `shop_name`, `responsible_party`, `responsible_party_detail`, `reason`, `rebate_receipt`, `status`, `register_date`, `register_seller`, `trade_account`, `trade_address`, `trade_time`, `logistic_name`, `main_post_id`, `return_logistic_name`, `return_main_post_id`, `tranfer_amount`, `tranfer_type`, `tranfer_method`, `bank`, `tranfer_memo`, `service_charge`, `rebate_person`,`tranfer_time`,`tranfer_error_reason`,`tranfer_status`, `seller_memo`,`receiver_info`,`zfb_batch_no`,`zfb_detail_no`, `suppiler_batch_no`,`remark`,`create_time`,`create_by`) VALUES (<{bill_no: }>,<{trade_no: }>,<{online_trade_no: }>,<{source_bill_no: }>, <{shop_code: }>,<{shop_name: }>,<{responsible_party: }>, <{responsible_party_detail: }>,<{reason: }>,<{rebate_receipt: }>, <{status: }>,<{register_date: CURRENT_TIMESTAMP}>,<{register_seller: }>, <{trade_account: }>,<{trade_address: }>,<{trade_time: CURRENT_TIMESTAMP}>, <{logistic_name: }>,<{main_post_id: }>, <{return_logistic_name: }>,<{return_main_post_id: }> , <{tranfer_amount: }> , <{tranfer_type: }> , <{tranfer_method: }> , <{bank: }> , <{tranfer_memo: }> , <{service_charge}> , <{rebate_person}> , <{tranfer_time}> , <{tranfer_error_reason}> , <{tranfer_status}> , <{seller_memo}> , <{receiver_info}> , <{zfb_batch_no}> ,< {zfb_detail_no}> , < {suppiler_batch_no}> ,< {remark}> , CURRENT_TIMESTAMP, 1); ``` 扩展语句示例: ```sql INSERT INTO lhhy_srm.supplier_rebate_detail (order_id, goods_no, goods_name, bar_code, brand_name, spec_name, price, tax_price, tax_rate, tax_amount, unit, volume, quantity, financial_tax_price, goods_memo, remark, supplier_code,supplier_name) VALUES(<lastInsertId>,'{{goodsNo}}','{{goodsName}}', '{{barCode}}','{{brandName}}','{{specName}}', {{price}}, {{taxPrice}}, {{taxRate}}, {{taxAmount}},'pcs','{{volume}}', '{{quantity}}', {{financialTaxPrice}}, '{{goodsMemo}}', '{{remark}}','_mongoQuery 68e141c6-4351-3f2f-b9a2-5eaee8f01a55 findField=content.textField_ln2uyh3e where={content.textField_lfjcloll:{eq:{goodsNo}}}, '_mongoQuery 68e141c6-4351-3f2f-b9a2-5eaee8f01a55 findField=content.selectField_lfmc9gms where={content.textField_lfjcloll:{eq:{goodsNo}}); ``` 通过上述步骤,可以实现将BDS对账班牛返款表的数据成功转换并写入到MySQL API接口。这一过程不仅保证了数据的一致性和完整性,也大大提高了系统间的数据交互效率。 ![电商OMS与ERP系统接口开发配置](https://pic.qeasy.cloud/T19.png~tplv-syqr462i7n-qeasy.image)