ETL转换与数据写入实战:从旺店通到目标平台

  • 轻易云集成顾问-黄宏棵
### 查询金蝶销售出库(来源旺店通)数据集成案例分享 在企业信息系统的复杂架构中,数据之间的无缝集成是提升业务效率和精度的关键。本文将分享如何通过轻易云数据集成平台实现金蝶云星空(Kingdee Cloud Cosmic)与旺店通(Wangdian Tong)之间的数据对接,强调技术细节和实际解决方案。 首先,我们需要处理的是从金蝶云星空获取销售出库信息,并确保该过程中没有遗漏任何订单。这一任务的核心在于正确调用executeBillQuery接口,从而可靠地抓取并分页处理大量数据信息。在这一环节,我们设置了定时机制,定期从金蝶云星空接口查询最新的销售记录。由于API具有分页和限流特性,我们编写了自动重试逻辑,以应对可能出现的数据传输错误或网络波动问题。 为了成功实现大批量数据快速写入到轻易云集成平台,需要特别注意两个方面:一是要应对来自不同系统间的数据格式差异;二是确保高效稳定的数据流转。对此,我们采用了一系列自定义映射规则来转换和规范化源数据,使其符合目标平台接受标准。同时,为了实时监控这一过程,我们部署了日志记录功能,对每次操作进行详细记录,包括成功与失败的信息以及对应异常原因分析。 还有一个重要环节就是异常处理与错误重试机制。当执行API接口调用或实际写入步骤遇到问题时,例如响应超时、返回码不正确等情况,系统会自动触发预设的错误处理流程并且尝试重新运行失败步骤,有效减少人工干预所需时间,提高整体运行效率与稳定性。 以上这些措施确保我们可以安全、快速地将来自金蝶云星空的大量销售信息顺利整合到轻易云集成平台中,为后续分析及决策提供准确及时的数据支持。在接下来的部分,我将进一步详解具体实施细节及代码示例,帮助读者更好理解整个流程中的技术操作点。 ![金蝶云星空API接口配置](https://pic.qeasy.cloud/D15.png~tplv-syqr462i7n-qeasy.image) ### 调用金蝶云星空接口executeBillQuery获取并加工数据 在数据集成的生命周期中,调用源系统接口获取数据是至关重要的一步。本文将深入探讨如何通过轻易云数据集成平台调用金蝶云星空的`executeBillQuery`接口,获取销售出库单数据并进行初步加工。 #### 接口配置与请求参数 首先,我们需要配置接口的元数据,以便正确调用金蝶云星空的API。以下是关键的元数据配置: ```json { "api": "executeBillQuery", "effect": "QUERY", "method": "POST", "number": "FBillNo", "id": "FEntity_FENTRYID", "name": "FID", "idCheck": true, "request": [ {"field":"FID","label":"FID","type":"string","describe":"FID","value":"FID"}, {"field":"FBillTypeID_FNumber","label":"单据类型","type":"string","describe":"标准销售出库单:XSCKD01_SYS\n寄售销售出库单:XSCKD02_SYS\n零售销售出库单:XSCKD03_SYS\n标准销售出库单:XSCKD04_SYS\nVMI销售出库单:XSCKD05_SYS\n标准销售出库单:XSCKD06_SYS\nB2C销售出库单:XSCKD07_SYS\n标准销售出库单:XSCKD08_SYS","value":"FBillTypeID.FNumber"}, {"field":"FBillNo","label":"单据编号","type":"string","describe":"单据编号","value":"FBillNo"}, {"field":"FEntity_FENTRYID","label":"FEntity_FENTRYID","type":"string","describe":"单据编号","value":"FEntity_FENTRYID"}, {"field":"FDate","label":"日期","type":"string","describe":"日期","value":"FDate"}, {"field":"FSaleOrgId_FNumber","label":"销售组织","type":"string","describe":"组织","value":"FSaleOrgId.FNumber"}, {"field":"FCustomerID_FNumber","label":"客户","type":"string","describe":"基础资料","value":"FCustomerID.FNumber"} ], "otherRequest": [ {"field": "Limit", "label": "最大行数", "type": "string", "describe": "金蝶的查询分页参数", "value": "{PAGINATION_PAGE_SIZE}"}, {"field": "StartRow", "label": "开始行索引", "type": "int", "describe": "金蝶的查询分页参数", "value": "{PAGINATION_START_ROW}"}, {"field": "FilterString", "label": "过滤条件", "type": "string", "describe": "示例写法 FSupplierId.FNumber = 'VEN00010' and FApproveDate>=", "value": "F_PRSH_FROM = '旺店通' and FRealQty <> FARJoinQty and FDocumentStatus='C'"}, {"field": "FieldKeys", "label": "需查询的字段key集合", "type": "array", "describe": "金蝶分录主键ID格式:FPOOrderEntry_FEntryId,其它格式 FPurchaseOrgId.FNumber", "parser": { "name": "ArrayToString", "params": "," } }, {"field": FormId, label: 业务对象表单Id, type: string, describe: 必须填写金蝶的表单ID如:PUR_PurchaseOrder, value: SAL_OUTSTOCK } ], buildModel: true, autoFillResponse: true } ``` #### 请求示例 通过上述元数据配置,我们可以构建一个请求示例。假设我们需要查询所有状态为“已审核”的销售出库单,并且这些单据来源于“旺店通”且实发数量不等于累计应收数量。请求体如下: ```json { FormId: 'SAL_OUTSTOCK', FieldKeys: ['FBillNo', 'FDate', 'FSaleOrgId.FNumber', 'FCustomerID.FNumber'], FilterString: 'F_PRSH_FROM = \'旺店通\' and FRealQty <> FARJoinQty and FDocumentStatus=\'C\'', Limit: 100, StartRow: 0 } ``` #### 数据处理与清洗 在获取到原始数据后,需要对其进行初步清洗和加工。例如,将日期格式化、将组织编码转换为名称等。以下是一个简单的数据处理示例: ```python import json from datetime import datetime def process_data(raw_data): processed_data = [] for record in raw_data: processed_record = {} processed_record['BillNo'] = record['FBillNo'] processed_record['Date'] = datetime.strptime(record['FDate'], '%Y-%m-%d').strftime('%d/%m/%Y') processed_record['SaleOrg'] = get_org_name(record['FSaleOrgId.FNumber']) processed_record['Customer'] = get_customer_name(record['FCustomerID.FNumber']) processed_data.append(processed_record) return processed_data def get_org_name(org_number): # 模拟从数据库或其他服务获取组织名称 org_map = {'ORG001': '北京总部', 'ORG002': '上海分部'} return org_map.get(org_number, '未知组织') def get_customer_name(customer_number): # 模拟从数据库或其他服务获取客户名称 customer_map = {'CUST001': '客户A', 'CUST002': '客户B'} return customer_map.get(customer_number, '未知客户') # 示例原始数据 raw_data = [ {'FBillNo': 'SO20230101', 'FDate': '2023-01-01', 'FSaleOrgId.FNumber': 'ORG001', 'FCustomerID.FNumber': 'CUST001'}, ] processed_data = process_data(raw_data) print(json.dumps(processed_data, ensure_ascii=False, indent=2)) ``` 上述代码展示了如何对从金蝶云星空获取的数据进行初步清洗和加工,确保数据在后续流程中的一致性和可用性。 #### 小结 通过轻易云数据集成平台调用金蝶云星空接口`executeBillQuery`,我们能够高效地获取所需的销售出库单数据,并对其进行必要的清洗和加工。这一步骤为后续的数据转换与写入奠定了坚实基础。在实际应用中,可以根据具体业务需求进一步扩展和优化数据处理逻辑。 ![如何对接企业微信API接口](https://pic.qeasy.cloud/S20.png~tplv-syqr462i7n-qeasy.image) ### 轻易云数据集成平台的ETL转换与写入目标平台技术案例 在数据集成的生命周期中,ETL(Extract, Transform, Load)转换是关键的一步。本文将详细探讨如何使用轻易云数据集成平台将已经集成的源平台数据进行ETL转换,并通过API接口写入目标平台。 #### 数据请求与清洗 在开始ETL转换之前,首先需要从源平台(如旺店通)获取原始数据。假设我们需要查询金蝶销售出库的数据,这一步可以通过API调用来实现。获取的数据可能包含多个字段和复杂的结构,因此需要进行清洗和预处理,以便后续的转换操作。 ```json { "api": "查询金蝶销售出库", "effect": "FETCH", "method": "GET", "params": { "startDate": "2023-01-01", "endDate": "2023-01-31" } } ``` #### 数据转换 一旦获得了原始数据,下一步是对其进行转换,使其符合目标平台API接口所能接收的格式。在这个过程中,我们需要根据目标平台的元数据配置来调整字段名称、数据类型和结构。 假设我们从源平台获取到的数据如下: ```json { "salesOrderId": "12345", "customerName": "张三", "productCode": "P001", "quantity": 10, "price": 100.0, "orderDate": "2023-01-15" } ``` 根据轻易云集成平台API接口的元数据配置,我们需要将这些字段映射到目标格式: ```json { "api": "写入空操作", "effect": "EXECUTE", "method": "POST", "idCheck": true } ``` 我们可以编写一个简单的转换脚本,将源数据映射到目标格式: ```python def transform_data(source_data): transformed_data = { "operationType": source_data["salesOrderId"], "clientName": source_data["customerName"], "itemCode": source_data["productCode"], "amount": source_data["quantity"], "unitPrice": source_data["price"], "transactionDate": source_data["orderDate"] } return transformed_data ``` 经过上述转换后,得到的数据格式如下: ```json { "operationType": "12345", "clientName": "张三", "itemCode": "P001", "amount": 10, "unitPrice": 100.0, "transactionDate": "2023-01-15" } ``` #### 数据写入 最后一步是将转换后的数据通过API接口写入目标平台。根据元数据配置中的信息,我们可以使用HTTP POST方法发送请求,并确保启用了ID检查功能。 以下是一个示例请求: ```python import requests url = 'https://api.qingyiyun.com/execute' headers = {'Content-Type': 'application/json'} data = transform_data(source_data) response = requests.post(url, headers=headers, json=data) if response.status_code == 200: print("Data successfully written to target platform.") else: print(f"Failed to write data: {response.text}") ``` 在这个过程中,需要特别注意以下几点: 1. **API URL**:确保URL正确无误。 2. **Headers**:设置合适的请求头,如`Content-Type`为`application/json`。 3. **Data**:传递经过转换后的JSON数据。 4. **Error Handling**:处理可能出现的错误,如网络问题或API响应异常。 通过以上步骤,我们成功地完成了从源平台到目标平台的数据ETL转换和写入过程。这不仅提高了数据处理的效率,也确保了数据的一致性和准确性。 ![企业微信与OA系统接口开发配置](https://pic.qeasy.cloud/T30.png~tplv-syqr462i7n-qeasy.image)