轻易云数据集成平台实施销售退货数据ETL与写入金蝶云星空

  • 轻易云集成顾问-钟家寿
### 销售退货105v2_销售退货_联查a:吉客云数据集成到金蝶云星空的技术案例分享 在实际项目中,如何高效、安全地将吉客云中的销售退货数据集成到金蝶云星空系统,是企业提升业务透明度和效率的关键。本文将深入探讨一个具体方案:“销售退货105v2_销售退货_联查a”,展示从数据获取、处理到最终写入全过程。 首先,我们需要通过调用吉客云提供的API接口`erp.storage.goodsdocin.v2`来抓取相关数据。这是整个流程第一步,也是至关重要的一环。为了确保不漏单,并且能够处理大量的数据,我们设计了一个定时触发机制,通过批量请求和分页处理来有效管理大规模的数据流动。同时,为了避免API限流问题,对每个请求进行了频率控制。 在拿到吉客云的数据后,需要对其进行预处理,以满足金蝶云星空的格式要求。例如,针对字段映射关系的不一致问题,我们实现了一套定制化的数据转换规则,使得每个字段都能准确无误地匹配。此外,通过实时监控与日志记录功能,可以跟踪整个数据处理过程,从而及时发现并解决潜在的问题。 接下来,在完成必要的预处理之后,将这些数据通过金蝶云星空提供的批量写入接口`batchSave`进行导入。在这一环节中,为保证高效性,我们使用了多线程技术,大幅提高了数据写入速度。同时,考虑到可能会出现网络故障或其他异常情况,还设计了一套完善的错误重试机制,以确保所有的数据都能顺利完成上传。 此外,对于这些步骤中的任何异常,全程都有详细日志记录,这不仅有助于排错,也为日后的优化迭代提供了可靠依据。 本文开头介绍了“销售退货105v2_销售退货_联查a”项目中涉及的一些核心技术点和策略,接下来我们会更详细地讲解各个阶段具体实施细节及代码示例。 ![如何开发企业微信API接口](https://pic.qeasy.cloud/D13.png~tplv-syqr462i7n-qeasy.image) ### 调用吉客云接口erp.storage.goodsdocin.v2获取并加工数据 在轻易云数据集成平台的生命周期中,调用源系统接口是数据处理的第一步。本文将详细探讨如何通过调用吉客云接口`erp.storage.goodsdocin.v2`来获取并加工数据。 #### 接口配置与请求参数 根据元数据配置,我们需要通过POST方法调用吉客云的`erp.storage.goodsdocin.v2`接口。以下是请求参数的详细配置: - **pageSize**: 每页显示的数据量,由变量`{PAGINATION_PAGE_SIZE}`动态赋值。 - **pageIndex**: 当前页码,由变量`{PAGINATION_START_PAGE}`动态赋值。 - **startDate**: 创建时间的起始时间,通过函数`from_unixtime(({LAST_SYNC_TIME}-18000),'%Y-%m-%d %H:%i:%s')`计算得出。 - **endDate**: 创建时间的结束时间,通过函数`from_unixtime(({CURRENT_TIME}-7200),'%Y-%m-%d %H:%i:%s')`计算得出。 - **goodsDocNo**: 入库单号,可选字段。 - **sourceBillNo**: 原始单号,可选字段。 - **inouttype**: 入库类型,固定值为105(销售退货)。 - **warehouseCode**: 仓库编号,可选字段。 - **vendCode**: 供应商编号(往来单位),可选字段。 - **billNo**: 上游单据号(关联单号),可选字段。 - **userName**: 创建人名称,可选字段。 - **selelctFields**: 需要返回的字段,包含多个子类属性,用逗号拼接。 #### 请求示例 以下是一个完整的请求示例: ```json { "pageSize": "{PAGINATION_PAGE_SIZE}", "pageIndex": "{PAGINATION_START_PAGE}", "startDate": "_function from_unixtime(({LAST_SYNC_TIME}-18000),'%Y-%m-%d %H:%i:%s')", "endDate": "_function from_unixtime(({CURRENT_TIME}-7200),'%Y-%m-%d %H:%i:%s')", "goodsDocNo": "", "sourceBillNo": "", "inouttype": 105, "warehouseCode": "", "vendCode": "", "billNo": "", "userName": "", "selelctFields": "recId,goodsdocNo,billNo,inOutDate,gmtCreate,inouttype,inouttypeName,vendCustomerCode,vendCustomerName,currencyCode,currencyRate,userName,warehouseCode,warehouseName,comment,memo,logisticName,logisticNo,companyId,companyName,logisticType,logisticCode,inOutReason,sourceBillNo,channelId,channelCode,channelName,redStatus,field1,field2,field3,field4,field5,financeBillStatus,applyCompanyId,applyCompanyName,applyCompanyCode,applyDepartId,applyDepartName,departCode,applyUserId,applyUserName,outBillNo,gmtModified,companyCode,vendCustomerId,..." } ``` #### 数据清洗与转换 在获取到原始数据后,需要进行数据清洗和转换,以确保数据质量和一致性。以下是一些常见的数据清洗步骤: 1. **去重处理**:确保每条记录唯一,可以通过主键或唯一标识符如`recId`进行去重。 2. **格式转换**:将日期、时间等字段转换为统一格式,便于后续处理。例如,将所有日期格式化为`YYYY-MM-DD HH:mm:ss`。 3. **空值处理**:对于必填字段,如果存在空值,需要进行填充或删除操作。 #### 数据写入与存储 经过清洗和转换的数据需要写入目标系统或数据库。在此过程中,可以使用轻易云平台提供的数据写入功能,将处理后的数据无缝对接到目标系统。 #### 自动补救机制 为了确保数据集成过程的可靠性,元数据配置中还定义了自动补救机制。当出现异常情况时,系统会根据设定的crontab规则自动重新发起请求,并调整请求参数中的时间范围,以确保数据完整性。 例如: ```json { "crontab": "10 */2 * * *", "takeOverRequest": [ { "field": "startDate", "value": "_function from_unixtime(({CURRENT_TIME}-86400-86400),'%Y-%m-%d %H:%i:%s')", "type": "string" }, { "field": "endDate", "value": "_function from_unixtime(({CURRENT_TIME}-86400-7200),'%Y-%m-%d %H:%i:%s')", "type": "string" } ] } ``` 通过以上配置,可以有效应对网络波动、系统故障等异常情况,保证数据集成过程的连续性和稳定性。 #### 条件过滤 在元数据配置中,还可以设置条件过滤器。例如,只保留数量大于0的记录: ```json { "condition":[ [ {"field":"goodsDocDetailList.quantity","logic":"gt","value":"0"} ] ] } ``` 通过这种方式,可以进一步提高数据质量,减少无效数据对系统性能的影响。 综上所述,通过合理配置和调用吉客云接口`erp.storage.goodsdocin.v2`,并结合轻易云平台的数据清洗、转换和自动补救机制,可以高效地实现不同系统间的数据集成,为业务决策提供可靠的数据支持。 ![打通钉钉数据接口](https://pic.qeasy.cloud/S16.png~tplv-syqr462i7n-qeasy.image) ### 使用轻易云数据集成平台进行销售退货数据的ETL转换与写入金蝶云星空API接口 在数据集成生命周期的第二步,我们需要将已经集成的源平台数据进行ETL转换,转为目标平台金蝶云星空API接口所能够接收的格式,并最终写入目标平台。以下将详细介绍如何利用轻易云数据集成平台的元数据配置完成这一过程。 #### 元数据配置解析 元数据配置是整个ETL过程中的核心,它定义了从源系统到目标系统的数据映射和转换规则。以下是对元数据配置中各个字段和操作的详细解析: 1. **基本请求信息** - `api`: "batchSave" - 目标API接口。 - `method`: "POST" - HTTP请求方法。 - `effect`: "EXECUTE" - 操作类型。 - `number`, `id`, `name`: "FBillNo" - 唯一标识符字段。 2. **操作配置** - `operation`: 定义了批量保存操作,`rowsKey`为"array",每次处理10行数据,使用的方法为`batchArraySave`。 3. **请求字段映射** 请求字段定义了从源系统到目标系统的数据映射规则,包括字段名称、类型、描述及其值来源。例如: ```json { "field": "FBillTypeID", "label": "单据类型", "type": "string", "describe": "单据类型", "parser": {"name": "ConvertObjectParser", "params": "FNumber"}, "value": "XSTHD01_SYS" } ``` 这个配置表示将源系统中的单据类型字段映射到目标系统中的`FBillTypeID`字段,并使用`ConvertObjectParser`解析器进行转换。 4. **特殊字段处理** 对于一些复杂的数据处理需求,可以使用函数或查询来获取值。例如: ```json { "field": "FDate", "label": "日期", "type": "datetime", "describe": "日期", "value": "_function DATE_FORMAT('{inOutDate}', '%Y-%m-%d')" } ``` 这个配置表示将日期字段格式化为`YYYY-MM-DD`格式。 5. **嵌套结构处理** 对于嵌套结构的数据,如明细信息和财务信息,需要定义子字段。例如: ```json { "field": "FEntity", "label": "明细信息", ... "children": [ { ... { "field": "FMaterialId", ... ... } } ... ] } ``` #### 数据转换与写入流程 1. **数据请求与清洗** 首先,通过轻易云平台从源系统请求原始数据,并进行初步清洗。此步骤确保数据完整性和一致性,为后续的ETL转换打下基础。 2. **ETL转换** 根据元数据配置,将清洗后的源数据进行ETL转换。具体步骤包括: - 字段映射:根据配置将源系统字段映射到目标系统对应字段。 - 数据解析:使用指定解析器(如`ConvertObjectParser`)对某些字段进行必要的转换。 - 特殊处理:对于需要特殊处理的字段(如日期格式化、查询获取值等),按照配置进行相应操作。 3. **写入目标平台** 转换后的数据通过HTTP POST请求写入金蝶云星空API接口。此步骤确保所有转换后的数据准确无误地存储在目标系统中。 #### 示例代码片段 以下是一个示例代码片段,展示如何利用上述元数据配置进行ETL转换和写入操作: ```python import requests import json # 定义元数据配置 metadata = { # ... (上文提供的元数据配置) } # 获取源系统的数据(假设已获取并存储在source_data变量中) source_data = get_source_data() # 数据转换 transformed_data = [] for record in source_data: transformed_record = {} for field in metadata['request']: field_name = field['field'] if 'value' in field: value = field['value'] # 处理特殊函数和查询 if value.startswith('_function'): value = eval_function(value, record) elif value.startswith('_mongoQuery'): value = execute_mongo_query(value, record) else: value = record.get(value.strip('{}'), None) transformed_record[field_name] = value transformed_data.append(transformed_record) # 写入目标平台 url = f"https://api.kingdee.com/{metadata['api']}" headers = {'Content-Type': 'application/json'} response = requests.post(url, headers=headers, data=json.dumps(transformed_data)) if response.status_code == 200: print("Data successfully written to Kingdee Cloud") else: print(f"Failed to write data: {response.text}") ``` 通过上述步骤,我们可以实现从源系统到金蝶云星空API接口的数据ETL转换与写入。这不仅提高了业务流程的自动化程度,还确保了数据的一致性和准确性。 ![如何对接钉钉API接口](https://pic.qeasy.cloud/T22.png~tplv-syqr462i7n-qeasy.image)