ETL转换实现物料查询数据写入轻易云

  • 轻易云集成顾问-黄宏棵
### 金蝶云星空数据集成到轻易云集成平台——物料查询_旺店通 在企业信息化管理中,确保各大系统间高效、可靠的数据对接是业务顺利运作的关键。本文将介绍一个具体案例:如何将金蝶云星空中的物料数据通过executeBillQuery接口,快速、安全地集成到轻易云集成平台。该解决方案名为“物料查询_旺店通”。 **1. 确保数据不漏单** 首先,我们必须确保每一条从金蝶云星空获取的物料数据都能准确无误地进入轻易云集成平台。这就需要实现接口调用的完整性和结果验证机制。在此过程中,通过定时任务可靠抓取executeBillQuery接口数据,可以有效减少遗漏。同时,为了应对网络波动或服务宕机问题,还需引入错误重试机制,对未成功的数据传输进行再尝试。 **2. 数据分页及限流处理** 面对大量数据时,应特别注意分页和限流问题。金蝶云星空的executeBillQuery接口支持分页查询,这使得我们可以分批次请求避免系统过载。同样,我们在向轻易云写入大批量数据时,也要遵循其API规范,通过合理设置请求频率和并发数来保障整体流程稳定运行。 **3. 数据格式差异及映射** 由于金蝶与轻易云之间的数据格式可能存在差异,因此我们需要设计一种灵活且高效的数据映射策略。例如,将JSON格式从源端转化为目标端可识别的结构,在这个过程中要考虑字段类型匹配以及潜在转换异常。此外,通过自定义脚本或函数,针对特定字段进行预处理(如时间戳与日期字符串互转),以保证两边系统理解一致。 通过上述几项关键技术手段,我们能够高效完成“物料查询_旺店通”这一系统对接任务,实现了复杂企业应用环境下稳定、高效的数据集成过程。 ![如何开发钉钉API接口](https://pic.qeasy.cloud/D14.png~tplv-syqr462i7n-qeasy.image) ### 调用金蝶云星空接口executeBillQuery获取并加工数据 在数据集成生命周期的第一步中,调用源系统接口获取数据是至关重要的一环。本文将详细探讨如何通过轻易云数据集成平台调用金蝶云星空的`executeBillQuery`接口,以实现物料查询和数据加工。 #### 接口配置与请求参数 首先,我们需要配置调用金蝶云星空接口的元数据。以下是关键的元数据配置: ```json { "api": "executeBillQuery", "method": "POST", "number": "FNumber", "id": "FMATERIALID", "pagination": { "pageSize": 100 }, "request": [ {"field":"FMATERIALID","label":"实体主键","type":"string","value":"FMATERIALID"}, {"field":"FNumber","label":"编码","type":"string","value":"FNumber"}, {"field":"FName","label":"名称","type":"string","value":"FName"}, {"field":"FSpecification","label":"规格型号","type":"string","value":"FSpecification"}, {"field":"FOldNumber","label":"旧物料编码","type":"string","value":"FOldNumber"}, {"field":"FBARCODE","label":"条码","type":"string","value":"FBARCODE"}, {"field":"FDescription","label":"描述","type":"string","value":"FDescription"}, {"field":"FMaterialGroup_FNumber","label":"物料分组","type":"string","value":"FMaterialGroup.FNumber"}, {"field":"FErpClsID","label":"物料属性","type":"string","value":"FErpClsID"}, {"field":"FDocumentStatus","label":"数据状态","type":"string","value":"FDocumentStatus"}, {"field":"FForbidStatus","label":"禁用状态","type":"string","value":"FForbidStatus"}, {"field": "FilterString", "label": "过滤条件", "type": "string", "describe": "示例写法 FSupplierId.FNumber = 'VEN00010' and FApproveDate>=", "value": "FApproveDate>='{{LAST_SYNC_TIME|datetime}}' and FUseOrgId.FNumber = '103'"}, {"field": "FieldKeys", "label": "需查询的字段key集合", "type": "array", "describe": "金蝶分录主键ID格式:FPOOrderEntry_FEntryId,其它格式 FPurchaseOrgId.FNumber", "parser":{"name": "ArrayToString", "params": ","}}, {"field": "FormId", "label": "业务对象表单Id", "type": "string", "describe": "必须填写金蝶的表单ID如:PUR_PurchaseOrder", "value": "BD_MATERIAL"} ], ... } ``` #### 请求示例 为了获取所需的物料信息,我们需要构建一个POST请求,包含必要的参数。以下是一个请求示例: ```json { "_api_": "/k3cloud/Kingdee.BOS.WebApi.ServicesStub.DynamicFormService.ExecuteBillQuery.common.kdsvc", "_args_":[ { "_formid_": "BD_MATERIAL", "_filter_":[ { "_key_":[ { "_key_":["FMATERIALID"], "_op_":["="], "_val_":["1001"] } ] } ], "_orderBy_":"", "_topRowCount_":"", "_startRow_":"", "_limit_":"", "_dataType_":"", "_isSortBySeq_":"", "_isShowEmptyCol_":"", "_isShowEmptyRow_":"", "_isShowTotalRowCount_":"", "_isShowTotalColCount_":"", "_isShowTotalPageCount_":"", "_isShowTotalRecordCount_":"", "_isShowTotalValueCount_":"", "_isShowTotalSummaryCount_":"", "_isShowTotalGroupSummaryCount_":"", } ] } ``` #### 数据清洗与转换 在获取到原始数据后,下一步是进行数据清洗和转换。我们可以利用轻易云平台提供的数据处理工具,对原始数据进行标准化处理。例如,将日期格式统一、去除冗余字段、以及根据业务需求进行字段映射等。 以下是一个简单的数据清洗示例: ```python def clean_data(raw_data): cleaned_data = [] for item in raw_data: cleaned_item = { 'MaterialID': item['FMATERIALID'], 'Code': item['FNumber'], 'Name': item['FName'], 'Specification': item['FSpecification'], 'OldCode': item['FOldNumber'], 'Barcode': item['FBARCODE'], 'Description': item['FDescription'], 'MaterialGroup': item['FMaterialGroup_FNumber'], 'ErpClassID': item['FErpClsID'], 'DocumentStatus': item['FDocumentStatus'], 'ForbidStatus': item['FForbidStatus'] } cleaned_data.append(cleaned_item) return cleaned_data ``` #### 数据写入目标系统 完成数据清洗和转换后,最终将处理后的数据写入目标系统。这一步通常涉及到调用目标系统的API接口,并确保数据格式和结构符合目标系统的要求。 例如,将清洗后的物料信息写入旺店通系统: ```python def write_to_target_system(cleaned_data): for data in cleaned_data: response = requests.post('https://target-system-api.com/endpoint', json=data) if response.status_code != 200: print(f"Failed to write data: {data}") ``` 通过上述步骤,我们实现了从金蝶云星空获取物料信息,并经过清洗和转换后写入目标系统的完整流程。这一过程不仅提高了数据处理效率,也确保了数据的一致性和准确性。 ![打通用友BIP数据接口](https://pic.qeasy.cloud/S25.png~tplv-syqr462i7n-qeasy.image) ### 物料查询数据ETL转换与写入轻易云集成平台 在数据集成生命周期的第二步中,关键任务是将已经集成的源平台数据进行ETL转换,使其符合目标平台API接口所能接收的格式,并最终写入目标平台。本文将详细探讨如何通过轻易云数据集成平台实现这一过程,特别是针对物料查询数据的ETL转换和写入。 #### 数据请求与清洗 首先,我们从源系统获取原始物料查询数据。这一步通常涉及调用源系统的API接口,并根据业务需求进行初步的数据清洗。例如,过滤掉不必要的字段、去除重复记录等。假设我们已经完成了这一步,并获得了如下格式的原始数据: ```json [ {"material_id": "M001", "name": "Material A", "quantity": 100, "price": 10.5}, {"material_id": "M002", "name": "Material B", "quantity": 200, "price": 20.0} ] ``` #### 数据转换 接下来,我们需要将上述原始数据转换为目标平台API接口能够接收的格式。根据元数据配置,目标平台的API接口要求使用`POST`方法,并且需要进行ID检查(`idCheck: true`)。因此,我们需要确保每条记录都包含一个唯一标识符。 在这个过程中,可以使用轻易云集成平台提供的数据转换工具,将原始数据映射到目标格式。例如,假设目标API接口期望的数据格式如下: ```json { "operation": "insert", "data": [ {"id": "M001", "item_name": "Material A", "stock_qty": 100, "unit_price": 10.5}, {"id": "M002", "item_name": "Material B", "stock_qty": 200, "unit_price": 20.0} ] } ``` 我们可以编写如下转换逻辑: 1. 将`material_id`映射为`id` 2. 将`name`映射为`item_name` 3. 将`quantity`映射为`stock_qty` 4. 将`price`映射为`unit_price` 转换后的数据如下: ```json { "operation": "insert", "data": [ {"id": "M001", "item_name": "Material A", "stock_qty": 100, "unit_price": 10.5}, {"id": "M002", "item_name": "Material B", "stock_qty": 200, "unit_price": 20.0} ] } ``` #### 数据写入 最后,我们使用轻易云集成平台提供的API接口,将转换后的数据写入目标系统。根据元数据配置,调用“写入空操作”API接口,具体配置如下: - API: `写入空操作` - Method: `POST` - ID Check: `true` 可以通过以下代码示例实现这一过程: ```python import requests import json url = 'https://api.qingyiyun.com/write_empty_operation' headers = {'Content-Type': 'application/json'} data = { 'operation': 'insert', 'data': [ {'id': 'M001', 'item_name': 'Material A', 'stock_qty': 100, 'unit_price': 10.5}, {'id': 'M002', 'item_name': 'Material B', 'stock_qty': 200, 'unit_price': 20.0} ] } response = requests.post(url, headers=headers, data=json.dumps(data)) if response.status_code == 200: print("Data written successfully") else: print("Failed to write data:", response.text) ``` 在这个过程中,需要特别注意以下几点: 1. **ID检查**:确保每条记录都包含唯一标识符,以满足目标API接口的要求。 2. **错误处理**:在实际应用中,应增加错误处理机制,以应对可能出现的网络故障或其他异常情况。 3. **性能优化**:对于大批量数据,可以考虑分批次提交,以避免单次请求的数据量过大导致超时或失败。 通过以上步骤,我们成功地将源平台的物料查询数据进行了ETL转换,并写入了轻易云集成平台,实现了不同系统间的数据无缝对接。 ![打通钉钉数据接口](https://pic.qeasy.cloud/T19.png~tplv-syqr462i7n-qeasy.image)