利用轻易云实现ETL转换:将数据写入金蝶云星空API

  • 轻易云集成顾问-曹润
### 旺店通·旗舰奇门数据集成到金蝶云星空的技术实践 在企业日常运营过程中,系统之间的数据对接与集成是确保信息流畅和业务高效运转的重要环节。本案例将详细探讨如何利用轻易云数据集成平台,实现旺店通·旗舰奇门的数据无缝对接至金蝶云星空。具体方案名称为:柏为销售出库单07.25。 #### 系统接口及要求概述 为了保证数据从旺店通·旗舰奇门顺利写入到金蝶云星空,我们主要使用了两大API接口:一是用于获取销售出库单详情的`wdt.wms.stockout.sales.querywithdetail`;二是用于批量保存数据到金蝶云星空的`batchSave`。此次集成任务必须解决以下几个关键技术问题: 1. **高吞吐量的数据处理能力**: 我们需要确保大量销售出库单能够快速且准确地被提取并传输至目标系统,这要求我们在接口调用时能支持高频次、大容量的数据写入操作。 2. **分页与限流机制**: 由于API接口有分页和请求限流限制,需要合理设计请求策略,确保所有待处理数据都能完整、及时地被抓取,以及避免因超发送导致请求失败。 3. **自定义数据转换逻辑**: 在将旺店通·旗舰奇门中的原始数据信息写入到金蝶云星空前,需要根据实际业务需求进行必要的数据格式转换,以满足目标系统的标准。 4. **实时监控与异常处理**: 为保障整个集成流程的稳定性,必须实现对每个步骤中发生的问题实时监测,并配备完善的错误重试机制。一旦出现异常状况,可以迅速定位并采取补救措施。 #### 数据获取与转换过程 首先,通过调用`wdt.wms.stockout.sales.querywithdetail` API,从旺店通·旗舰奇门获取所需的销售出库单明细信息。在这个过程中,为应对可能会遇到的大量请求,我们采用了定期调度任务来分段抓取新生成或变更的数据,同时应用合理化分页策略以规避查询速率限制。 在获得原始数据后,下一步便是进行必要的数据清洗和格式转化。这一步骤尤为重要,因为不同系统间的信息结构往往存在差异。如果直接导入未经处理的数据,很可能引发存储错误或影响后续分析。因此,在这里我们设计了一套针对特定字段进行映射以及值域校验的方法,以此来统一标准,使之符合金蝶云星空 `batchSave` API 的输入要求。 ![金蝶与WMS系统接口开发配置](https://pic.qeasy.cloud/D26.png~tplv-syqr462i7n-qeasy.image) ### 调用旺店通·旗舰奇门接口获取并加工数据的技术案例 在数据集成过程中,调用源系统的API接口是关键的一步。本文将详细探讨如何通过轻易云数据集成平台调用旺店通·旗舰奇门接口`wdt.wms.stockout.sales.querywithdetail`,获取销售出库单数据并进行初步加工。 #### 接口调用配置 首先,我们需要配置API接口的请求参数。根据提供的元数据配置,接口调用采用POST方法,主要参数如下: - `api`: `wdt.wms.stockout.sales.querywithdetail` - `method`: `POST` - `number`: `order_no` - `id`: `stockout_id` - `idCheck`: true 请求参数分为分页参数和业务参数两部分: 1. **分页参数**: - `page_size`: 每页返回的数据条数,设置为50。 - `page_no`: 页号,从1开始。 2. **业务参数**: - `start_time`: 开始时间,使用动态值`{{MINUTE_AGO_30|datetime}}`表示当前时间前30分钟。 - `end_time`: 结束时间,使用动态值`{{CURRENT_TIME|datetime}}`表示当前时间。 - `status_type`: 出库单状态,设置为3表示按照指定的status状态字段查询。 - `status`: 出库单状态详细,设置为110。 - 其他可选参数如仓库编码、出库单编号、店铺编号、销售订单号等,根据实际需求填写。 示例请求体如下: ```json { "pager": { "page_size": 50, "page_no": 1 }, "params": { "start_time": "{{MINUTE_AGO_30|datetime}}", "end_time": "{{CURRENT_TIME|datetime}}", "status_type": "3", "status": "110", "warehouse_no": "", "stockout_no": "", "shop_nos": "", "src_order_no": "", "need_sn": "", "position": "" } } ``` #### 数据清洗与转换 在获取到原始数据后,需要对数据进行清洗和转换,以便后续处理和存储。以下是常见的数据清洗与转换步骤: 1. **字段筛选**: 根据业务需求筛选出需要的字段,例如订单编号、出库单编号、仓库编码、商品信息等。 2. **数据格式转换**: 将日期时间字段统一转换为标准格式,将数值字段转换为整数或浮点数。 3. **异常数据处理**: 对于缺失值或异常值进行处理,例如填充默认值或删除异常记录。 4. **字段重命名**: 根据目标系统要求,对字段名称进行重命名,以确保一致性。 示例代码(伪代码)如下: ```python def clean_and_transform(data): cleaned_data = [] for record in data: cleaned_record = { 'order_no': record.get('order_no'), 'stockout_id': record.get('stockout_id'), 'warehouse_no': record.get('warehouse_no'), 'item_list': transform_items(record.get('item_list')), 'created_at': format_datetime(record.get('created_at')), # 其他需要处理的字段 } cleaned_data.append(cleaned_record) return cleaned_data def transform_items(items): transformed_items = [] for item in items: transformed_item = { 'item_code': item.get('item_code'), 'quantity': int(item.get('quantity', 0)), # 其他需要处理的字段 } transformed_items.append(transformed_item) return transformed_items def format_datetime(dt_str): # 将日期时间字符串转换为标准格式 return datetime.strptime(dt_str, '%Y-%m-%d %H:%M:%S').isoformat() ``` #### 数据写入 经过清洗和转换后的数据,可以通过轻易云平台写入到目标系统中。写入过程包括以下步骤: 1. **建立连接**: 与目标系统建立连接,确保能够正常写入数据。 2. **批量写入**: 将清洗后的数据按批次写入,提高效率。 3. **错误处理**: 对于写入失败的数据进行记录和重试,以确保数据完整性。 示例代码(伪代码)如下: ```python def write_to_target_system(cleaned_data): connection = establish_connection(target_system_config) for batch in batch_data(cleaned_data, batch_size=100): try: connection.write(batch) except Exception as e: log_error(e, batch) retry_write(batch) def establish_connection(config): # 建立与目标系统的连接 pass def batch_data(data, batch_size): # 将数据按批次分割 for i in range(0, len(data), batch_size): yield data[i:i + batch_size] def log_error(error, data): # 记录错误日志 pass def retry_write(data): # 重试写入失败的数据 pass ``` 通过以上步骤,我们实现了从旺店通·旗舰奇门接口获取销售出库单数据,并进行了初步的数据清洗和转换,为后续的数据处理和分析打下了坚实基础。在实际应用中,可以根据具体需求进一步优化和扩展这些操作。 ![金蝶与外部系统打通接口](https://pic.qeasy.cloud/S14.png~tplv-syqr462i7n-qeasy.image) ### 使用轻易云数据集成平台进行ETL转换并写入金蝶云星空API接口 在数据集成的生命周期中,ETL(提取、转换、加载)是关键步骤之一。本文将详细探讨如何通过轻易云数据集成平台将源平台的数据转换为金蝶云星空API接口所能接收的格式,并最终写入目标平台。 #### 配置元数据 首先,我们需要了解如何配置元数据,以便正确地将源数据映射到目标平台的API接口。以下是一个典型的元数据配置示例: ```json { "api": "batchSave", "method": "POST", "idCheck": true, "operation": { "rowsKey": "array", "rows": 20, "method": "batchArraySave" }, "request": [ { "field": "FBillTypeID", "label": "单据类型 ", "type": "string", "describe": "单据类型", "value": "XSCKD07_SYS", "parser": { "name": "ConvertObjectParser", "params": "FNumber" } }, { ... } ], ... } ``` #### 数据字段解析与转换 在上述配置中,每个字段都有特定的属性和解析方式。例如,`FBillTypeID`字段使用了`ConvertObjectParser`解析器,将值转换为目标系统所需的格式。 ```json { "field": "FBillTypeID", "label": "单据类型 ", ... "parser": { "name": "ConvertObjectParser", "params": "FNumber" } } ``` 这种解析器可以确保字段值符合金蝶云星空API接口的要求。 #### 子实体与数组处理 对于复杂的数据结构,如子实体和数组,需要特别处理。例如,`FEntity`字段包含多个子字段,每个子字段都需要单独映射和解析: ```json { ... { "field": "FEntity", ... children: [ { ... { field: 'FMaterialID', label: '物料编码 ', type: 'string', describe: '基础资料', parser: { name: 'ConvertObjectParser', params: 'FNumber' }, value: '{{details_list.goods_no}}', parent: 'FEntity' }, ... } ] } } ``` 这里,`FMaterialID`字段使用了模板变量`{{details_list.goods_no}}`,从源数据中提取具体值,并通过解析器进行转换。 #### 特殊逻辑处理 有些字段需要特殊逻辑处理,例如根据条件判断是否为赠品: ```json { parent: 'FEntity', label: '是否赠品', field: 'FIsFree', type: 'string', value: "_function case '{{details_list.gift_type}}' when '0' then false else true end" } ``` 这种情况下,通过自定义函数实现复杂的逻辑判断和转换。 #### 最终请求构建 所有字段配置完成后,构建最终请求并发送到金蝶云星空API接口。以下是一个完整的请求示例: ```json { FormId: 'SAL_OUTSTOCK', Operation: 'Save', IsAutoSubmitAndAudit: true, IsVerifyBaseDataField: true, SubSystemId: '21', InterationFlags: 'STK_InvCheckResult', BatchCount: '5', Model: { FBillTypeID: { FNumber: 'XSCKD07_SYS' }, FBillNo: '{order_no}', FDate: '{consign_time}', FSaleOrgId: { FNumber: '100' }, FCustomerID: { FNumber: '{shop_no}' }, Fnote: '{remark}', FStockOrgId:{ FNumber:'100'}, SubHeadEntity:{ FSettleOrgID:{FNumber:'100'} }, FEntity:[ { FMaterialID:{FNumber:'{{details_list.goods_no}}'}, FRealQty:'{{details_list.goods_count}}', ... } ], ... } } ``` 通过轻易云数据集成平台,我们可以高效地完成从源数据到目标API接口的数据转换和写入过程。这不仅简化了复杂的数据处理流程,还确保了数据的一致性和准确性。 ![用友与SCM系统接口开发配置](https://pic.qeasy.cloud/T13.png~tplv-syqr462i7n-qeasy.image)