数据集成与ETL转换在轻易云平台中的应用实例

  • 轻易云集成顾问-胡秀丛
### 用友BIP数据集成至轻易云平台案例分享:查询调拨仓库对应表-v 在本案例中,我们将探讨如何通过轻易云集成平台高效、可靠地实现用友BIP系统的数据对接。具体的方案名称为“查询调拨仓库对应表-v”,目标在于以兼顾性能和稳定性的方式,将用友BIP接口的相关数据精准录入到轻易云平台。 首先,介绍下整个技术流程及关键步骤: 1. **API调用与数据获取**: 我们利用用友BIP提供的`/fc0dltfc/qeasy_datahub/qeasy_datahub/query_mapping_stock` API接口来抓取调拨仓库对应表的原始数据。在这里,需要处理分页和限流问题,以确保大批量数据提取任务能够顺利进行,并且不会因为超出API限制而导致抓取失败。 2. **传输过程中的格式转换**: 数据从用友BIP系统中获取后,往往存在格式差异,这就需要我们定义自适应的数据转换逻辑。利用轻易云的平台特性,可视化的数据流设计工具,使得这一过程中配置和监控都变得更加直观便捷,有效减少了潜在错误。 3. **写入操作与持续监控**: 整合后的数据最终需要写入到指定位置,对于此案例来说,即是通过`wdt.purchase.provider.create` API完成。这一过程要求支持高吞吐量的大量数据信息快速进入,并具备实时监控功能,以确保每一个处理环节都能精确掌握执行状态,一旦发生异常便可及时告警并启动重试机制。 4. **质量保障与优化配置**: 为保证集成过程中不漏单且维持较高的数据完整性,还需借助集中式的数据质量监控手段。其中,对异常检测尤为重要,它能让运维人员及时发现并修复潜在的问题。此外,综合考虑企业资源使用效率,可以依托统一的控制台全面管理API资产,实现更优资源分配和业务连续运行。 经过这些步骤,高度符合定制需求的数据最终成功平稳地完成了跨系统对接,为企业决策提供有力支撑。 ![钉钉与ERP系统接口开发配置](https://pic.qeasy.cloud/D29.png~tplv-syqr462i7n-qeasy.image) ### 调用用友BIP接口获取并加工数据的技术案例 在轻易云数据集成平台中,调用源系统接口是数据集成生命周期的第一步。本文将深入探讨如何通过调用用友BIP接口`/fc0dltfc/qeasy_datahub/qeasy_datahub/query_mapping_stock`来获取并加工数据。 #### 接口调用配置 根据提供的元数据配置,我们需要使用POST方法调用该接口。以下是具体的配置细节: - **API路径**: `/fc0dltfc/qeasy_datahub/qeasy_datahub/query_mapping_stock` - **请求方法**: POST - **唯一标识字段**: `id` - **请求参数**: - 时间戳 (`ts`): 用于指定上次同步时间点,以便增量获取数据。其值为 `{LAST_SYNC_TIME}0000`,表示从上次同步时间开始到当前时间的数据。 #### 请求参数解析 在请求参数中,时间戳字段 `ts` 是关键参数,用于实现增量数据同步。其值格式为 `{LAST_SYNC_TIME}0000`,其中 `{LAST_SYNC_TIME}` 是一个占位符,将在实际调用时被替换为具体的时间戳。 例如,如果上次同步时间为 `2023-10-01 12:00:00`,则实际传递给接口的 `ts` 值应为 `202310011200000000`。 #### 数据请求与清洗 1. **构建请求体**: 根据元数据配置,构建POST请求体,其中包含时间戳参数。 ```json { "ts": "202310011200000000" } ``` 2. **发送请求**: 使用HTTP客户端(如Postman或编程语言内置库)发送POST请求至指定API路径。 ```python import requests url = "https://your-bip-server.com/fc0dltfc/qeasy_datahub/qeasy_datahub/query_mapping_stock" payload = {"ts": "202310011200000000"} response = requests.post(url, json=payload) if response.status_code == 200: data = response.json() # 处理返回的数据 else: # 错误处理 print(f"Error: {response.status_code}") ``` 3. **数据清洗**: 获取到的数据可能包含多余或不符合业务需求的字段,需要进行清洗和转换。例如,可以过滤掉空值、格式化日期字段等。 ```python def clean_data(data): cleaned_data = [] for item in data: if item.get('id') and item.get('stock'): cleaned_item = { 'id': item['id'], 'stock': int(item['stock']), 'timestamp': item['timestamp'] } cleaned_data.append(cleaned_item) return cleaned_data cleaned_data = clean_data(response.json()) ``` #### 数据转换与写入 在完成数据清洗后,需要将数据转换为目标系统所需的格式,并写入目标数据库或系统。这一步通常包括以下操作: 1. **数据转换**: 将清洗后的数据转换为目标系统所需的格式。例如,将JSON对象转换为SQL插入语句。 ```python def convert_to_sql(data): sql_statements = [] for item in data: sql = f"INSERT INTO target_table (id, stock, timestamp) VALUES ({item['id']}, {item['stock']}, '{item['timestamp']}');" sql_statements.append(sql) return sql_statements sql_statements = convert_to_sql(cleaned_data) ``` 2. **写入数据库**: 使用数据库连接库(如PyMySQL、SQLAlchemy)将转换后的数据写入目标数据库。 ```python import pymysql connection = pymysql.connect(host='localhost', user='user', password='passwd', db='database') try: with connection.cursor() as cursor: for sql in sql_statements: cursor.execute(sql) connection.commit() finally: connection.close() ``` 通过上述步骤,我们实现了从用友BIP接口获取并加工数据的全过程。这一过程充分利用了轻易云数据集成平台提供的全生命周期管理功能,使得每个环节都透明可控,提高了业务效率和准确性。 ![如何对接金蝶云星空API接口](https://pic.qeasy.cloud/S3.png~tplv-syqr462i7n-qeasy.image) ### 数据集成与ETL转换:将源平台数据写入目标平台 在轻易云数据集成平台的生命周期中,数据转换与写入是关键步骤之一。本文将详细探讨如何将已经集成的源平台数据进行ETL(Extract, Transform, Load)转换,并最终通过API接口写入目标平台。 #### API接口配置与元数据解析 在本案例中,我们需要将供应商信息从源系统转换为目标系统所能接收的格式,并通过API接口`wdt.purchase.provider.create`进行写入。以下是该API接口的元数据配置: ```json { "api": "wdt.purchase.provider.create", "method": "POST", "idCheck": true, "request": [ {"field": "provider_no", "label": "供应商编号", "type": "string", "describe": "代表供应商所有属性的唯一编码,用于供应商区分,ERP内支持自定义(ERP供应商界面设置),用于创建供应商数据信息", "value": "{code}"}, {"field": "provider_name", "label": "供应商名称", "type": "string", "describe": "供应商名称", "value": "{name}"}, {"field": "min_purchase_num", "label": "最小采购量", "type": "string", "describe": "最小采购量", "value":"1"}, {"field": "purchase_cycle_days", "label": "采购周期", "type":"string","describe":"采购周期","value":"1"}, {"field":"arrive_cycle_days","label":"到货周期","type":"string","describe":"到货周期","value":"1"}, {"field":"contact","label":"联系人","type":"string","describe":"联系人"}, {"field":"telno","label":"座机","type":"string","describe":"座机"}, {"field":"mobile","label":"移动电话","type":"string","describe":"手机号"}, {"field":"fax","label":"传真","type":"string","describe":"传真"}, {"field":"zip","label":"邮编","type":"string","describe":"邮政编码"}, {"field":"email","label":"邮箱","type":"string","describe":"电子邮箱"}, {"field":"qq","label":"qq","type":"string","describe":"腾讯QQ号码"}, {"field":"wangwang","label":"旺旺","type":"string","describe":"淘宝旺旺号"}, {"field":"address","label":"地址","type":"string","describe":"省、市、区(县)、地址详情"}, {"field":"website","label":"网址","type":"string","describe":"供应商官网地址"}, {"field":"last_purchase_time","label":"最后采购日期","type":"string","describe":"对供应商最后一次采购日期,不传默认接口创建供应商的年月日,格式:yyyy-MM-dd HH:mm:ss"}, {"field":"is_disabled","label":"停用","type":"string","describe":"是否停用"}, {"field":"charge_cycle_days","label":"结算周期","type":"string","describe":"对供应商的账款结算周期,单位(天),不传默认0","value:"1"} ] } ``` #### 数据提取与清洗 首先,从源系统提取原始数据。假设我们从ERP系统中获取了如下JSON格式的数据: ```json { "code": "SUP123", "name": "优质供应商", // 其他字段略 } ``` 在提取阶段,需要确保数据完整性和准确性。对于缺失或异常的数据,需要进行必要的清洗和校正。例如,如果某些字段缺失,可以设置默认值或进行合理推断。 #### 数据转换 根据API接口的要求,将提取的数据映射到目标格式。以下是一个简单的映射示例: ```json { "provider_no": "{code}", "provider_name": "{name}", // 固定值或默认值 // ... } ``` 在实际操作中,可以使用脚本或数据转换工具来自动化这个过程。例如,使用Python脚本进行简单的数据转换: ```python source_data = { 'code': 'SUP123', 'name': '优质供应商', } target_data = { 'provider_no': source_data['code'], 'provider_name': source_data['name'], 'min_purchase_num': '1', 'purchase_cycle_days': '1', 'arrive_cycle_days': '1', } # 添加其他字段... ``` #### 数据写入 最后,通过HTTP POST请求将转换后的数据发送到目标平台API接口。可以使用Python中的`requests`库来实现: ```python import requests url = 'https://api.targetplatform.com/wdt.purchase.provider.create' headers = {'Content-Type': 'application/json'} response = requests.post(url, json=target_data, headers=headers) if response.status_code == 200: print('Data successfully written to target platform.') else: print('Failed to write data:', response.text) ``` #### 实时监控与错误处理 在整个过程中,实时监控和错误处理至关重要。如果出现错误,应记录详细日志并及时通知相关人员,以便快速排查和解决问题。 通过上述步骤,我们成功地将源平台的数据进行了ETL转换,并通过API接口写入了目标平台。这不仅提高了数据处理效率,还确保了数据的一致性和准确性。 ![如何开发用友BIP接口](https://pic.qeasy.cloud/T27.png~tplv-syqr462i7n-qeasy.image)