利用轻易云平台进行ETL操作将销售数据同步至金蝶云星空

  • 轻易云集成顾问-叶威宏
### 旺店通·企业奇门数据集成到金蝶云星空案例分享:销售出库单同步 在进行旺店通·企业奇门与金蝶云星空的系统对接过程中,关键任务之一是确保销售出库单能够准确、快速且不漏单地从一个系统传输到另一个系统。为实现这一目标,我们采用了API接口`wdt.stockout.order.query.trade`用于从旺店通·企业奇门抓取出库订单数据,并通过`batchSave`接口将这些数据批量写入到金蝶云星空。 首先,面对来自旺店通·企业奇门的大量销售出库单数据,我们需要设计一种机制来定时可靠地抓取这些信息。使用轻易云的数据集成平台,可以设置定时任务,通过调用接口来获取最新的交易信息以保证数据更新实时性。同时,为应对分页和限流问题,实现高效的数据爬取至关重要。我们通过记录上次成功抓取的位置并继续处理未完成的数据片段,有效减少了请求次数,提高了性能。 在将获取到的销售订单批量写入至金蝶云星空之前,需要解决两个主要挑战:一是两套系统之间的数据格式差异,二是如何保障大规模数据写入过程中的稳定性和一致性。对于前者,通过定制化的数据映射方案,将源系统(旺店通)的字段与目标系统(金蝶)的字段相对应,以便于顺利写入。而稳定性的保障,则依赖于预设的异常处理与错误重试机制,当出现网络波动或其他异常情况时,可自动触发多次重试操作,直至成功为止。 此外,在整个流程中实时监控与日志记录也不可或缺。这不仅有助于快速发现和定位潜在问题,还能提供详实的操作记录供日后审计参考。在轻易云平台上架设了一套完整的日志管理模块,每一步操作都有据可查,大幅提升了透明度及可控性。 综上所述,通过结构化的方法以及灵活运用技术手段,使得销售出库单从旺店通·企业奇门平滑同步至金蝶云星空成为可能。本案例不仅提高了业务流程效率,也显著降低了人为干预所带来的误差风险。 ![金蝶与WMS系统接口开发配置](https://pic.qeasy.cloud/D19.png~tplv-syqr462i7n-qeasy.image) ### 使用旺店通·企业奇门接口wdt.stockout.order.query.trade获取并加工数据的技术案例 在数据集成生命周期的第一步中,调用源系统接口获取数据是至关重要的一环。本文将详细探讨如何通过轻易云数据集成平台调用旺店通·企业奇门接口`wdt.stockout.order.query.trade`来获取销售出库单数据,并对其进行初步加工。 #### 接口调用配置 首先,我们需要配置接口调用的元数据。根据提供的元数据配置,可以看到该接口采用POST方法,主要参数如下: - **api**: `wdt.stockout.order.query.trade` - **method**: `POST` - **number**: `order_no` - **id**: `order_no` - **pagination**: 支持分页,每页大小为100条记录 - **idCheck**: 启用ID检查 请求参数包括时间范围、状态、订单编号、店铺编号等,这些参数用于过滤和获取特定条件下的数据。 #### 请求参数详解 1. **时间范围参数** - `start_time`: 增量获取数据的开始时间,格式为`yyyy-MM-dd HH:mm:ss`。通常使用上次同步时间`{{LAST_SYNC_TIME|datetime}}`。 - `end_time`: 增量获取数据的结束时间,格式为`yyyy-MM-dd HH:mm:ss`。通常使用当前时间`{{CURRENT_TIME|datetime}}`。 2. **状态参数** - `status`: 用于过滤订单状态,例如5表示已取消,55表示已审核等。 3. **其他关键参数** - `src_order_no`: 系统订单编号 - `src_tid`: 原始单号 - `stockout_no`: 出库单号 - `shop_no`: 店铺编号,用于区分不同店铺的数据 - `warehouse_no`: 仓库编号,用于区分不同仓库的数据 4. **分页参数** - `page_size`: 每页返回的数据条数,默认为40,可设置为1到100之间的任意值。 - `page_no`: 页号,从0开始。 #### 数据请求与清洗 在实际操作中,通过轻易云平台配置上述请求参数后,可以发起对旺店通·企业奇门接口的调用。以下是一个示例请求体: ```json { "start_time": "2023-10-01 00:00:00", "end_time": "2023-10-01 23:59:59", "status": "95", "shop_no": "SHOP123", "warehouse_no": "WAREHOUSE456", "page_size": 100, "page_no": 0 } ``` 该请求体将获取2023年10月1日当天已发货(状态码95)的销售出库单数据,并限制每页返回100条记录,从第0页开始。 #### 数据转换与写入 在成功获取到原始数据后,需要对其进行清洗和转换,以便后续处理和存储。例如,可以将日期格式统一转换,将状态码映射为更具可读性的文本描述等。以下是一个简单的数据清洗示例: ```python def clean_data(raw_data): cleaned_data = [] for record in raw_data: cleaned_record = { "order_no": record["order_no"], "status": map_status(record["status"]), "shop_name": get_shop_name(record["shop_no"]), "warehouse_name": get_warehouse_name(record["warehouse_no"]), # 更多字段处理... } cleaned_data.append(cleaned_record) return cleaned_data def map_status(status_code): status_mapping = { "5": "已取消", "55": "已审核", "95": "已发货", # 更多状态映射... } return status_mapping.get(status_code, "未知状态") def get_shop_name(shop_no): # 假设有一个函数可以根据店铺编号获取店铺名称 return fetch_shop_name_from_db(shop_no) def get_warehouse_name(warehouse_no): # 假设有一个函数可以根据仓库编号获取仓库名称 return fetch_warehouse_name_from_db(warehouse_no) ``` 通过这种方式,可以将原始数据转换为更具业务意义的数据结构,方便后续写入目标系统或数据库。 #### 实时监控与错误处理 在整个过程中,实时监控和错误处理也是不可忽视的重要环节。轻易云平台提供了实时监控功能,可以随时查看数据流动和处理状态。一旦出现错误,可以通过日志和告警机制及时发现并处理。例如,如果某个请求失败,可以自动重试或记录错误信息以便后续分析。 ```python try: response = call_api(request_body) if response.status_code == 200: raw_data = response.json() cleaned_data = clean_data(raw_data) write_to_target_system(cleaned_data) else: log_error(response.status_code, response.text) except Exception as e: log_exception(e) ``` 通过以上步骤,我们可以高效地完成从源系统获取、清洗到转换和写入目标系统的全过程,为业务提供可靠的数据支持。 ![打通企业微信数据接口](https://pic.qeasy.cloud/S1.png~tplv-syqr462i7n-qeasy.image) ### 使用轻易云数据集成平台将销售出库单数据同步至金蝶云星空 在数据集成过程中,ETL(提取、转换、加载)是关键步骤之一。本文将详细探讨如何利用轻易云数据集成平台,将源平台的销售出库单数据转换为金蝶云星空API接口所能接收的格式,并最终写入目标平台。 #### 配置元数据 在进行ETL操作之前,需要配置元数据,以确保数据能够正确映射到目标平台的字段。以下是一个典型的元数据配置示例: ```json { "api": "batchSave", "method": "POST", "idCheck": true, "operation": { "rowsKey": "array", "rows": 1, "method": "batchArraySave" }, "request": [ { "field": "FBillTypeID", "label": "单据类型", "type": "string", "describe": "单据类型", "parser": { "name": "ConvertObjectParser", "params": "FNumber" }, "value": "XSCKD01_SYS" }, { // 其他字段配置... } ], // 其他配置... } ``` #### 数据请求与清洗 首先,我们需要从源系统中提取销售出库单的数据。这一步通常涉及调用源系统的API接口,获取原始数据。提取的数据可能包含冗余信息或格式不统一,因此需要进行清洗和标准化处理。 ```python def fetch_and_clean_data(api_endpoint, params): response = requests.get(api_endpoint, params=params) raw_data = response.json() cleaned_data = [] for record in raw_data: cleaned_record = { 'order_no': record['orderNo'], 'consign_time': record['consignTime'], 'shop_no': record['shopNo'], 'details_list': record['detailsList'], # 清洗和转换其他必要字段 } cleaned_data.append(cleaned_record) return cleaned_data ``` #### 数据转换与写入 在完成数据清洗后,下一步是将其转换为金蝶云星空API接口所能接收的格式。根据上述元数据配置,我们需要将清洗后的字段映射到目标平台的相应字段。 ```python def transform_and_write_data(cleaned_data): transformed_data = [] for record in cleaned_data: transformed_record = { 'FBillTypeID': {'FNumber': 'XSCKD01_SYS'}, 'FBillNo': f"{record['order_no']}-TC", 'FDate': record['consign_time'], 'FStockOrgId': {'FNumber': record['shop_no']}, 'FSaleOrgId': {'FNumber': record['shop_no']}, 'FCustomerID': {'FNumber': record['shop_no']}, 'FLinkPhone': record['details_list']['receiver_mobile'], 'FLinkMan': record['details_list']['receiver_name'], 'FReceiveAddress': record['receiver_address'], 'FNote': record['cs_remark'], # 转换子对象和数组 } sub_head_entity = { 'SubHeadEntity': { 'FSettleOrgID': {'FNumber': record['shop_no']}, 'FSETTLECURRID': {'FNumber': 'PRE001'} } } entity_details = [] for detail in record['details_list']: entity_detail = { 'FCustMatID': {'FNumber': detail.get('custMatId')}, 'FMaterialID': {'FNumber': detail.get('spec_no')}, 'FRealQty': detail.get('goods_count'), 'FTaxPrice': round(detail.get('paid') / detail.get('goods_count'), 6), 'FStockID': {'FNumber': detail.get('warehouse_no')}, 'FEntrynote': detail.get('remark'), 'FSoorDerno': detail.get('src_tid') } entity_details.append(entity_detail) transformed_record.update(sub_head_entity) transformed_record.update({'FEntity': entity_details}) transformed_data.append(transformed_record) # 写入目标平台 write_to_kingdee(transformed_data) def write_to_kingdee(data): api_url = "<金蝶云星空API地址>" headers = {"Content-Type": "application/json"} payload = { "FormId": "SAL_OUTSTOCK", "Operation": "Save", // 其他必要参数... "Data": data } response = requests.post(api_url, headers=headers, json=payload) if response.status_code == 200: print("Data successfully written to Kingdee Cloud.") else: print(f"Failed to write data: {response.text}") ``` #### 总结 通过上述步骤,我们成功地将源系统中的销售出库单数据经过清洗、转换,最终写入到金蝶云星空中。这个过程展示了轻易云数据集成平台在处理异构系统间的数据对接时的强大能力,特别是在ETL阶段,通过精细化的元数据配置,实现了高效且准确的数据同步。 ![打通用友BIP数据接口](https://pic.qeasy.cloud/T22.png~tplv-syqr462i7n-qeasy.image)