案例分享:用友BIP数据集成到轻易云集成平台
在当今企业数字化转型的过程中,系统间的数据集成成为提升业务效率和确保信息透明度的重要环节。本文将分享一个经典案例——如何将用友BIP中的销售退货单数据无缝对接到轻易云数据集成平台,为企业实现高效的数据处理与监控。
背景概述
为了提供更精准和及时的数据支持,我们选择了从用友BIP接口/yonbip/sd/vouchersalereturn/list
抓取销售退货单数据,并使用轻易云的写入API进行批量数据处理。本次方案命名为“查询YS销售退货单-p”,旨在解决以下几个技术问题:
- 如何确保所有销售退货单不漏单。
- 处理大规模的数据快速写入需求。
- 定时可靠地抓取接口数据,避免重复或遗漏。
- 针对分页和限流策略进行优化配置。
技术细节
步骤一:准确调用用友BIP接口
我们首先分析了用友BIP提供的API文档,明确/yonbip/sd/vouchersalereturn/list
接口要求,并准备必要的认证信息。随后,通过定制化的HTTP请求模块,实现定时调用该API以获取最新销售退货单列表。
步骤二:处理分页与限流
由于用友BIP API存在分页返回机制,我们设计了自动递归调用策略,确保每次获取完整的数据页。同时,为应对可能出现的请求频率限制,我们添加了一层限流控制逻辑,以保证不会超出服务端设定的访问阈值,从而维护稳定连接。
步骤三:转换与映射至轻易云格式
拿到原始JSON响应后,需要将其转换为符合目标数据库Schema要求。这一步中,我们利用轻易云强大的自定义映射功能,将不同字段精确匹配并检验,以消除任何因格式差异带来的存储障碍。此外,对一些必需但缺失的信息,会通过预设规则补全,以保持完整性和一致性。
通过这一系列流程,不仅实现了两大系统之间无缝衔接,还能实时监控、记录每个阶段操作状态,大幅提升业务透明度及应急响应能力。在下文中,将详述各环节具体实施方案,以及实例代码展示。
调用用友BIP接口获取并加工数据
在数据集成生命周期的第一步中,我们需要调用源系统的API接口以获取原始数据,并进行初步加工。本文将深入探讨如何通过轻易云数据集成平台调用用友BIP接口/yonbip/sd/vouchersalereturn/list
,并对返回的数据进行处理。
API接口调用配置
首先,我们需要配置API接口的基本信息和请求参数。根据元数据配置,API的基本信息如下:
- API路径:
/yonbip/sd/vouchersalereturn/list
- 请求方法:
POST
请求参数包括多个字段,如下所示:
[
{"field":"open_createTime_begin","label":"制单开始时间","type":"string","describe":"制单开始时间"},
{"field":"code","label":"单据编号","type":"string","describe":"单据编号"},
{"field":"pageIndex","label":"页号 默认值:1","type":"string","describe":"页号 默认值:1","value":"1"},
{"field":"pageSize","label":"每页行数 默认值:10","type":"string","describe":"每页行数 默认值:10","value":"50"},
{"field":"salesOrgId_name","label":"销售组织id","type":"string","describe":"销售组织id"},
{"field":"saleReturnStatus","label":"单据状态","type":"string","describe":"单据状态"},
{"field":"saleReturnSourceType","label":"退货类型","type":"string","describe":"退货类型"},
{"field":"orderNo","label":"订单编号","type":"string","describe":"订单编号"},
{"field":"agentId_name","label":"客户名称","type":"string","describe":"客户名称"},
{"field":"open_createTime_end","label":"制单结束时间","type": "string", "describe": "制单结束时间"},
{"field": "isSum", "label": "查询表头 示例:false 默认值:false", "type": "string", "describe": "查询表头 示例:false 默认值:false"}
]
请求条件设置
为了确保我们获取到符合业务需求的数据,需要设置查询条件。根据元数据配置,查询条件如下:
[
[
{"field": "saleReturnStatus", "logic": "eq", "value": "SALERETURNING"},
{"field": "salesOrgId_name", "logic": "eq", "value": "北京翼讯世纪科技有限公司"}
],
[
{"field": "saleReturnStatus", "logic": "eq", "value": "CONFIRMSALERETURNORDER"},
{"field": "salesOrgId_name", "logic": "eq", "value": "北京翼讯世纪科技有限公司"}
]
]
这些条件确保我们只获取特定状态和特定销售组织的数据。
数据格式转换
在获取到原始数据后,需要对数据进行格式转换,以便后续处理。根据元数据配置,需将返回的数据字段进行重命名和格式化:
[
{"old": "id", "new": "new_id", "format": "string"},
{"old": "saleReturnDetailId", "new": "new_saleReturnDetailId", "format": "string"}
]
这一步骤确保了数据字段名称的一致性,并将字段格式转换为字符串类型。
实际操作步骤
-
构建请求体: 根据上述请求参数和条件,构建POST请求体。例如:
{ "open_createTime_begin": "", "code": "", ... // 其他参数 ... // 查询条件 { "$or":[ { "$and":[ { "$eq":["saleReturnStatus", "SALERETURNING"], "$eq":["salesOrgId_name", "北京翼讯世纪科技有限公司"] } ] }, { "$and":[ { "$eq":["saleReturnStatus", "CONFIRMSALERETURNORDER"], "$eq":["salesOrgId_name", "北京翼讯世纪科技有限公司"] } ] } ] } }
-
发送请求: 使用轻易云平台提供的HTTP客户端发送POST请求,并捕获响应。
-
处理响应: 对响应数据进行格式转换。例如,将
id
字段重命名为new_id
,并将其格式化为字符串。 -
存储或进一步处理: 将处理后的数据存储到目标系统或进行进一步的数据清洗和转换操作。
通过上述步骤,我们实现了从用友BIP系统获取销售退货单数据,并对其进行了初步加工。这是轻易云数据集成平台生命周期管理中的关键一步,为后续的数据转换与写入奠定了基础。
数据ETL转换与写入目标平台的技术实现
在数据集成生命周期的第二步,我们需要将已经集成的源平台数据进行ETL转换,使其符合目标平台API接口所能够接收的格式,最终写入目标平台。本文将详细探讨这一过程中的技术细节和实现方法。
数据请求与清洗
在数据集成过程中,首先需要从源系统中请求数据并进行初步清洗。假设我们已经成功获取了YS销售退货单的数据,并对其进行了必要的清洗操作,使数据具备基本的一致性和完整性。
数据转换
接下来,我们需要将这些清洗后的数据进行转换,以符合目标平台API接口的要求。在本案例中,目标平台为轻易云集成平台,其API接口配置如下:
{
"api": "写入空操作",
"method": "POST",
"idCheck": true
}
这一配置表明,我们需要通过POST方法调用“写入空操作”API,并且在写入数据时需要进行ID检查。
转换步骤
-
字段映射:首先,需要确定源数据字段与目标API接口字段之间的映射关系。假设YS销售退货单的数据结构如下:
{ "returnOrderId": "12345", "productCode": "P001", "quantity": 10, "returnReason": "Damaged" }
而目标API接口期望的数据结构可能是:
{ "order_id": "12345", "item_code": "P001", "qty": 10, "reason": "Damaged" }
-
数据类型转换:确保源数据字段类型与目标API接口字段类型一致。例如,如果源系统中的数量(quantity)字段是字符串类型,而目标系统期望的是整数类型,则需要进行类型转换。
-
ID检查:根据配置中的
idCheck
参数,我们需要在写入之前检查是否存在重复的ID。如果存在,则可能需要更新而不是插入新记录。
转换实现
可以使用Python编写一个简单的ETL脚本来实现上述转换过程:
import requests
import json
# 假设从源系统获取的数据
source_data = [
{"returnOrderId": "12345", "productCode": "P001", "quantity": 10, "returnReason": "Damaged"},
# 更多数据...
]
# 转换函数
def transform_data(source):
return {
"order_id": source["returnOrderId"],
"item_code": source["productCode"],
"qty": int(source["quantity"]),
"reason": source["returnReason"]
}
# 转换后的数据列表
transformed_data = [transform_data(record) for record in source_data]
# 写入目标平台函数
def write_to_target(data):
api_url = 'https://api.qingyiyun.com/write_empty_operation'
headers = {'Content-Type': 'application/json'}
for record in data:
response = requests.post(api_url, headers=headers, data=json.dumps(record))
if response.status_code == 200:
print(f"Record {record['order_id']} written successfully.")
else:
print(f"Failed to write record {record['order_id']}: {response.text}")
# 执行写入操作
write_to_target(transformed_data)
写入目标平台
最后一步是将转换后的数据通过API接口写入到目标平台。在上述脚本中,通过requests.post
方法调用“写入空操作”API,将每条记录发送到轻易云集成平台。此处要特别注意HTTP响应状态码,以确保每条记录都成功写入或处理失败时采取相应措施。
错误处理与日志记录
为了提高系统的可靠性和可维护性,建议在实际应用中加入错误处理和日志记录功能。例如,可以捕获请求异常并记录详细日志,以便后续排查问题:
import logging
logging.basicConfig(filename='etl_process.log', level=logging.INFO)
def write_to_target(data):
api_url = 'https://api.qingyiyun.com/write_empty_operation'
headers = {'Content-Type': 'application/json'}
for record in data:
try:
response = requests.post(api_url, headers=headers, data=json.dumps(record))
if response.status_code == 200:
logging.info(f"Record {record['order_id']} written successfully.")
else:
logging.error(f"Failed to write record {record['order_id']}: {response.text}")
except Exception as e:
logging.error(f"Exception occurred while writing record {record['order_id']}: {str(e)}")
# 执行写入操作
write_to_target(transformed_data)
通过上述技术手段,可以高效地完成从源系统到目标平台的数据ETL转换和写入过程,确保数据准确、及时地流转到业务所需的位置。