ETL转换:从数据集成到API接口写入详解

  • 轻易云集成顾问-姚缘
### 案例分享:金蝶云星空数据集成到旺店通·旗舰奇门 在日益复杂的供应链管理中,数据的一致性和实时性往往决定了业务决策的效率。本文将重点介绍一个实际案例“柏为供应商08-10”,我们利用轻易云数据集成平台成功实现了金蝶云星空的数据无缝对接到旺店通·旗舰奇门。 为了确保在这一过程中不漏单,我们采取了一系列技术措施。首先,调用金蝶云星空提供的executeBillQuery API接口进行定时可靠的数据抓取。这一API能够高效地查询账单信息,从而保证获取到最新且准确的订单资料。同时,为应对大量订单数据,我们设计了批量数据写入方案,通过调用旺店通·旗舰奇门的wdt.setting.purchaseprovider.push接口,将获取的数据快速、安全地传输至目标系统。 处理分页和限流问题是另一个关键环节。在调度执行过程中,我们通过加入分页逻辑并制定限流策略,有效避免了由于一次性请求过多导致接口性能下降或超时的问题。此外,在面临两者之间的数据格式差异时,通过自定义字段映射和转换策略,使得源系统与目标系统间的数据能够顺利匹配,提高集成效率。 值得注意的是,对接过程中的异常处理与错误重试机制同样不可忽视。一旦出现请求失败或响应异常情况,系统会自动记录详细日志并触发重试机制,从而最大程度上降低操作失误带来的影响,并确保每条订单都能被有效处理。 总之,此次案例展示不仅证明了轻易云数据集成平台在跨系统对接上的优越表现,也为其他类似需求提供了一套可行且高效的解决方案。接下来,我们将深入探讨具体实施细节和优化措施,以期更好地支持业务流程的数字化转型。 ![金蝶与外部系统打通接口](https://pic.qeasy.cloud/D22.png~tplv-syqr462i7n-qeasy.image) ### 调用源系统金蝶云星空接口executeBillQuery获取并加工数据 在轻易云数据集成平台的生命周期中,调用源系统接口是数据请求与清洗阶段的关键步骤。本文将详细探讨如何通过金蝶云星空接口`executeBillQuery`获取供应商数据,并进行必要的数据加工。 #### 接口配置与调用 首先,我们需要配置并调用金蝶云星空的`executeBillQuery`接口。根据提供的元数据配置,接口的基本信息如下: - **API**: `executeBillQuery` - **方法**: `POST` - **表单ID**: `BD_Supplier` 请求体中包含了分页参数、过滤条件和需要查询的字段集合。以下是一个典型的请求示例: ```json { "FormId": "BD_Supplier", "FieldKeys": "FSupplierId,FNumber,FName,FCreateOrgId.FNumber,FUseOrgId.FNumber,FDescription,FShortName,FMinPOValue,FCountry.FNumber,FBusinessStatus,FProvincial.FNumber,FFreezeLimit,FAddress,FZip,FFreezeDate,FPurchaserGroupId.FNumber,FParentSupplierId.FNumber,FLanguage.FNumber,FWebSite,FCustomerId.FNumber,FPayCurrencyId.FNumber,FSettleTypeId.FNumber,FPayCondition.FNumber,FSettleId.FNumber,FPayAdvanceAmount,FTaxType.FNumber,FTaxRegisterCode,FTendType.FNumber,FChargeId.FNumber,FInvoiceType,FTaxRateId.FNumber,FFinanceDesc,FTrade.FNumber,FPRICELISTID.FNumber,FDiscountListId.FNumber,FFoundDate,FNeedConfirm,FProviderId.F.Number,FLegalPerson,FWipStockId.F.Number,FRegisterFund,FRegisterCode,FVmiBusiness,FSOCIALCRECODE,FTendPermit,FVmiStockId.F.Number,FRegisterAddress" } ``` #### 数据过滤与分页 为了确保数据查询的效率和准确性,我们需要设置分页参数和过滤条件。元数据配置中的分页参数如下: - **pageSize**: 每页记录数,默认为100 - **StartRow**: 开始行索引 - **TopRowCount**: 返回总行数 过滤条件可以根据业务需求进行定制,例如: ```json { "FilterString": "FAuditDate >= '{{LAST_SYNC_TIME|dateTime}}'" } ``` #### 数据字段映射与转换 在获取到原始数据后,需要对字段进行映射和转换,以便后续处理。以下是部分关键字段及其映射关系: - **FSupplierId** -> 供应商ID - **FNumber** -> 编码 - **FName** -> 名称 - **FCreateOrgId_F.Number** -> 创建组织编码 - **FUseOrgId_F.Number** -> 使用组织编码 这些字段在请求体中的定义如下: ```json [ {"field": "FSupplierId", "label": "供应商ID", "type": "string", "value": "FSupplierId"}, {"field": "FNumber", "label": "编码", "type": "string", "value": "FNumber"}, {"field": "FName", "label": "名称", "type": "string", "value": "FName"}, {"field": "FCreateOrgId_F.Number", "label": "创建组织编码", "type": "string", "value": "FCreateOrgId_F.Number"}, {"field": "FUseOrgId_F.Number", "label": "使用组织编码", "type": "string", "value": "FUseOrgId_F.Number"} ] ``` #### 数据清洗与加工 在获取并映射原始数据后,需要对数据进行清洗和加工。例如,对于日期类型的数据,可以进行格式转换;对于多选下拉列表,可以将其转换为标准化格式。 以下是一个简单的数据清洗示例,将日期格式从`YYYY-MM-DD`转换为`DD/MM/YYYY`: ```python def clean_date(date_str): from datetime import datetime date_obj = datetime.strptime(date_str, '%Y-%m-%d') return date_obj.strftime('%d/%m/%Y') # 应用到具体字段上 cleaned_data['FFreezeDate'] = clean_date(raw_data['FFreezeDate']) ``` #### 实时监控与日志记录 在整个数据请求与清洗过程中,实时监控和日志记录是确保数据处理透明度和可追溯性的关键。通过轻易云平台提供的监控工具,可以实时查看每个环节的数据流动和处理状态。 日志记录示例如下: ```python import logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) def log_request(request_data): logger.info(f"Request Data: {request_data}") def log_response(response_data): logger.info(f"Response Data: {response_data}") # 在实际调用中记录日志 log_request(request_data) log_response(response_data) ``` 通过上述步骤,我们能够高效地调用金蝶云星空接口获取供应商数据,并对其进行必要的数据清洗和加工,为后续的数据集成奠定基础。 ![企业微信与ERP系统接口开发配置](https://pic.qeasy.cloud/S17.png~tplv-syqr462i7n-qeasy.image) ### 使用轻易云数据集成平台实现ETL转换并写入旺店通·旗舰奇门API接口 在数据集成的生命周期中,ETL(Extract, Transform, Load)转换是一个关键步骤。本文将详细探讨如何使用轻易云数据集成平台将源平台的数据进行ETL转换,并最终写入目标平台旺店通·旗舰奇门API接口。 #### 元数据配置解析 在进行ETL转换之前,我们需要理解元数据配置。这是我们进行数据映射和转换的基础。以下是我们使用的元数据配置: ```json { "api": "wdt.setting.purchaseprovider.push", "method": "POST", "idCheck": true, "request": [ {"field": "provider_no", "label": "供应商编号", "type": "string", "value": "{FNumber}"}, {"field": "provider_name", "label": "供应商名称", "type": "string", "value": "{FName}"}, {"field": "contact", "label": "联系人", "type": "string"}, {"field": "telno", "label": "座机", "type": "string"}, {"field": "mobile", "label": "移动电话", "type": "string"}, {"field": "fax", "label":"传真","type":"string"}, {"field":"qq","label":"QQ","type":"string"}, {"field":"zip","label":"邮编","type":"string"}, {"field":"wangwang","label":"旺旺","type":"string"}, {"field":"email","label":"邮箱","type":"string"}, {"field":"website","label":"网址","type":"string"}, {"field":"address","label":"地址","type":"string"}, {"field":"arrive_cycle_days","label":"到货周期","type":"string"}, {"field":"remark","label":"备注","type":"string"}, {"field":"is_disabled","label":"是否禁用","type":"string"}, {"field":"account_bank_no","label":"银行卡号","type":"string"}, {"field":"account_bank","label":"收款银行","type":"string"}, {"field":"collect_name","label":"收款人","type":"string"}, {"field":"province","label":"省份ID","type":"string"}, {"field":"city","label":"城市ID","type":"string"}, {"field":"district","label":"地区ID","type":"string"} ], “otherRequest”:[ {"field":"objectKey","label":"供应商对象","type":"string","value":"provider"} ] } ``` #### 数据映射与转换 在ETL过程中,我们需要将源平台的数据字段映射到目标平台所需的字段格式。以下是具体的字段映射关系: - `provider_no` 映射到 `{FNumber}` - `provider_name` 映射到 `{FName}` - 其他字段如 `contact`, `telno`, `mobile`, 等则直接从源数据中提取并填充。 例如,假设我们从源平台获取的数据如下: ```json { “FNumber”: “S12345”, “FName”: “供应商A”, “Contact”: “张三”, “TelNo”: “010-12345678”, “Mobile”: “13800000000” } ``` 我们需要将其转换为目标平台所需的格式: ```json { “provider_no”: “S12345”, “provider_name”: “供应商A”, “contact”: “张三”, “telno”: “010-12345678”, “mobile”: “13800000000” } ``` #### 数据写入过程 完成数据转换后,接下来就是将这些数据通过API接口写入到目标平台。根据元数据配置,我们使用POST方法,将处理后的数据发送到`wdt.setting.purchaseprovider.push` API接口。 具体实现可以通过轻易云提供的可视化界面完成,也可以通过编写自定义脚本来实现。例如,使用Python代码实现这一过程: ```python import requests url = 'https://api.wangdian.cn/openapi2/wdt.setting.purchaseprovider.push' headers = {'Content-Type': 'application/json'} data = { 'provider_no': 'S12345', 'provider_name': '供应商A', 'contact': '张三', 'telno': '010-12345678', 'mobile': '13800000000' } response = requests.post(url, json=data, headers=headers) if response.status_code == 200: print('Data successfully written to 旺店通·旗舰奇门') else: print('Failed to write data:', response.text) ``` #### 实时监控与错误处理 在实际操作中,实时监控和错误处理也是至关重要的。轻易云提供了实时监控功能,可以帮助我们及时发现和解决问题。例如,当API请求失败时,可以捕获错误信息并进行相应处理。 ```python try: response = requests.post(url, json=data, headers=headers) response.raise_for_status() except requests.exceptions.HTTPError as errh: print("Http Error:", errh) except requests.exceptions.ConnectionError as errc: print("Error Connecting:", errc) except requests.exceptions.Timeout as errt: print("Timeout Error:", errt) except requests.exceptions.RequestException as err: print("OOps: Something Else", err) ``` 通过以上步骤,我们可以高效地完成从源平台到目标平台的数据ETL转换和写入过程。在整个过程中,确保每个环节的数据准确性和一致性,是成功实现系统集成的关键。 ![如何开发用友BIP接口](https://pic.qeasy.cloud/T5.png~tplv-syqr462i7n-qeasy.image)