ETL流程:将吉客云数据转入用友BIP的最佳实践

  • 轻易云集成顾问-叶威宏
### 系统对接集成案例分享:吉客云·奇门数据集成到用友BIP 在系统集成过程中,确保数据的高效、准确传递是关键。本文将分享通过轻易云平台,将2C吉客云·奇门的数据无缝对接到用友BIP的解决方案。具体运行方案名为“2C吉客云-销售单--->YS-销售出库单红字-成功”,重点探讨如何实现大规模、高速的数据转移与精确处理。 #### 一、API接口配置和调用 首先,我们需要从吉客云·奇门获取销售订单的非敏感信息,其提供了`jackyun.tradenotsensitiveinfos.list.get` API接口。这一过程包含多个技术要点,其中包括处理分页请求和应对限流策略,以确保不漏单,并保证每次抓取的数据完整性。在配置中,通过定时任务可靠地抓取这些接口数据,实现数据的准时获取。不仅如此,还需要监控API调用状态,一旦发生异常能够及时重试。 为了将上述数据写入用友BIP系统,我们使用其提供的`/yonbip/scm/salesout/mergeSourceData/save` API进行操作。在此过程中,需要关注两个方面:一是大量数据快速写入带来的性能挑战,二是如何处理两种系统之间的数据格式差异问题。有效利用轻易云平台自定义的数据转换逻辑,可自动适配不同业务需求和结构化要求,从而使得跨系统间数据映射更具灵活性。此外,通过批量处理机制,可以显著提升整体效率,有效缩短同步时间。 #### 二、实时监控与告警机制 轻易云平台提供集中式的监控和告警功能,使我们可以实时跟踪整个数据集成流程中的任务状态。一旦检测到任何异常或瓶颈现象,立即触发告警并启动自动修复措施。同时,通过日志记录功能保存每一次操作详情,为后续排查问题提供依据。如果出现错误,例如由于网络波动导致某条记录未能成功写入,可以依靠内置错误重试机制,即刻重新尝试提交该笔交易,保障最终一致性。 #### 三、性能优化及资源管理 对于企业而言,高吞吐量的数据写入能力极其重要,它不仅提高了业务响应速度,也降低了服务器压力。借助统一视图与控制台,我们全面掌握API资产使用情况,实现资源优化配置。例如,在处理海量销售订单时,通过预判峰值负载提前进行硬件扩展,这样既避免因突发流量引起服务宕机,又最大化利用现有IT基础设施。 以上介绍为本次系统对接项目实施的一部分技术细节,下文 ![金蝶与CRM系统接口开发配置](https://pic.qeasy.cloud/D16.png~tplv-syqr462i7n-qeasy.image) ### 调用吉客云·奇门接口获取并加工数据的技术案例 在数据集成的生命周期中,调用源系统接口获取数据是至关重要的一步。本文将详细探讨如何通过轻易云数据集成平台调用吉客云·奇门接口`jackyun.tradenotsensitiveinfos.list.get`,并对获取的数据进行初步加工。 #### 接口配置与调用 首先,我们需要配置接口请求参数。根据元数据配置,`jackyun.tradenotsensitiveinfos.list.get`接口采用POST方法,支持分页查询,并且要求起始时间和结束时间必须同时存在,时间间隔不能超过七天。 以下是关键的请求参数配置: - `modified_begin` 和 `modified_end`: 这两个字段用于指定修改时间范围,必须同时存在。 - `pageSize`: 每页记录数,默认值为50,最大值为1000。我们将其设置为100。 - `pageIndex`: 页码,从0开始。 - `startConsignTime` 和 `endConsignTime`: 发货时间范围,用于筛选特定时间段内的订单。 - `fields`: 需要返回的字段列表,以逗号分隔。 示例请求体如下: ```json { "modified_begin": "2023-09-01T00:00:00", "modified_end": "2023-09-07T23:59:59", "pageSize": "100", "pageIndex": "0", "startConsignTime": "{{DAYS_AGO_1|datetime}}", "endConsignTime": "{{CURRENT_TIME|datetime}}", "fields": "tradeNo,postFee,goodsDetail.goodsNo,goodsDetail.sellCount" } ``` #### 数据清洗与转换 在成功调用接口并获取数据后,我们需要对数据进行清洗和转换,以便后续处理和写入目标系统。以下是一些常见的数据清洗与转换操作: 1. **字段映射**:将源系统中的字段映射到目标系统所需的字段。例如,将`tradeNo`映射为目标系统中的订单号。 2. **数据过滤**:根据业务需求过滤不必要的数据。例如,只保留订单状态为已完成(tradeStatus >= 6000)的记录。 3. **格式转换**:将日期、金额等字段转换为目标系统所需的格式。例如,将发货时间从字符串格式转换为日期格式。 4. **计算派生字段**:根据现有字段计算出新的派生字段。例如,根据商品数量和单价计算总金额。 示例代码片段如下: ```python def clean_and_transform(data): cleaned_data = [] for record in data: if record['tradeStatus'] >= 6000: transformed_record = { 'order_no': record['tradeNo'], 'post_fee': float(record['postFee']), 'goods_details': [ { 'goods_no': item['goodsDetail']['goodsNo'], 'sell_count': int(item['goodsDetail']['sellCount']) } for item in record['goodsDetail'] ] } cleaned_data.append(transformed_record) return cleaned_data ``` #### 实时监控与异常处理 为了确保数据集成过程的稳定性和可靠性,我们需要对整个流程进行实时监控,并设置异常处理机制。轻易云平台提供了全透明可视化的操作界面,可以实时监控数据流动和处理状态。 在实际操作中,我们可以设置定时任务(如每小时执行一次)来自动调用接口并处理数据。同时,对于可能出现的异常情况(如网络故障、接口超时等),可以设置重试机制或报警通知,以便及时响应和处理。 示例定时任务配置如下: ```json { "crontab": "0 * * * *", "task": { "type": "http_request", "url": "/api/jackyun/tradenotsensitiveinfos/list/get", "method": "POST", "body": { ... }, "retry_policy": { "max_retries": 3, "interval_seconds": 60 } } } ``` 通过上述步骤,我们可以高效地调用吉客云·奇门接口获取销售单数据,并对其进行清洗和转换,为后续的数据处理和分析奠定基础。在整个过程中,轻易云平台提供了强大的支持,使得复杂的数据集成任务变得更加简便和高效。 ![用友与外部系统接口集成开发](https://pic.qeasy.cloud/S20.png~tplv-syqr462i7n-qeasy.image) ### 轻易云数据集成平台生命周期的第二步:ETL转换与写入用友BIP 在数据集成过程中,ETL(Extract, Transform, Load)是至关重要的一环。本文将详细探讨如何使用轻易云数据集成平台将已经集成的源平台数据进行ETL转换,并转为用友BIP API接口所能够接收的格式,最终写入目标平台。 #### 数据请求与清洗 在开始ETL转换之前,首先需要从源系统中请求并清洗数据。假设我们已经完成了这一步,并且得到了所需的销售单数据。接下来,我们将重点放在如何将这些数据转换为用友BIP API接口能够接受的格式。 #### 数据转换与写入 为了将数据写入用友BIP,我们需要配置API接口和相应的元数据。以下是元数据配置示例: ```json { "api": "/yonbip/scm/salesout/mergeSourceData/save", "method": "POST", "idCheck": true, "request": [ { "field": "mergeSourceData", "label": "是否合并上游数据", "type": "string", "describe": "是否合并上游数据。值为true的话则sourceid:来源单据主表id,sourceautoid:来源单据行id,makeRuleCode:单据转换规则,_status:操作标识,不可为空。", "value": "true" }, { "field": "vouchdate", "label": "单据日期", "type": "string", "describe": "单据日期 示例:2020-11-30 00:00:00", "value": "{consignTime}" }, { "field": "warehouse", "label": "仓库id或编码", "type": "string", "describe": "仓库id或编码 示例:Z001", "value": "{warehouseCode}" }, { "field": "bizFlow", ... } ], ... } ``` #### 关键字段解析 1. **mergeSourceData**: 表示是否合并上游数据。在本案例中,设置为`true`,意味着需要提供`sourceid`、`sourceautoid`等字段。 2. **vouchdate**: 单据日期,通过占位符`{consignTime}`动态填充。 3. **warehouse**: 仓库ID或编码,通过占位符`{warehouseCode}`动态填充。 4. **details**: 销售出库单子表,其中包含多个子字段,如数量、来源单据主表ID、来源单据行ID等。 #### 子字段详细配置 在`details`数组中,每个子字段都有其特定的配置和描述。例如: - **qty**: 数量,通过占位符`{sellCount}`动态填充。 - **sourceid**: 来源单据主表ID,通过MongoDB查询获取。例如: ```json "_mongoQuery 12c6ff61-07b9-3c7d-9327-4d7b82fa6e8e findField=content.new_id where={\"content.code\":{\"$eq\":\"{tradeNo}\"}}" ``` - **batchno**: 批次号,通过函数判断和查询获取。例如: ```json "_function case _findCollection find detail.batchManage from 7aed2c28-cc02-35c8-aa6a-2d750373f9e1 where code={goodsNo} _endFind when false then '' else '001' end" ``` #### 数据写入 配置完成后,将通过POST请求将处理后的数据发送到用友BIP API接口: ```http POST /yonbip/scm/salesout/mergeSourceData/save HTTP/1.1 Host: api.yonyou.com Content-Type: application/json Authorization: Bearer <token> { ... } ``` 确保每个字段都按照API要求正确填写,并且所有必填项均已提供。 #### 实时监控与错误处理 在实际操作中,需要实时监控数据流动和处理状态,以便及时发现和解决问题。例如,可以通过轻易云平台提供的监控工具查看每个环节的数据处理情况。如果出现错误,需要根据返回的信息进行调试和修正。 通过上述步骤,我们实现了从源系统到用友BIP的无缝数据对接。这不仅提高了业务效率,也确保了数据的一致性和准确性。 ![系统集成平台API接口配置](https://pic.qeasy.cloud/T15.png~tplv-syqr462i7n-qeasy.image)