使用轻易云实现吉客云到金蝶云的数据ETL转换指南

  • 轻易云集成顾问-李国敏
### 吉客云数据集成到金蝶云星空的JY-BDS采购入库方案 在企业信息系统的日常运作中,如何高效、可靠地实现不同系统之间的数据对接是一个关键性问题。本文将详细探讨吉客云与金蝶云星空之间的数据集成案例——JY-BDS采购入库方案。在该方案中,我们利用轻易云数据集成平台,实现了两大核心系统间的无缝对接。 首先,该方案需要处理从吉客云获取ERP存货单据(erp.storage.goodsdocin)并将其批量写入到金蝶云星空(batchSave)。为了确保数据能够高吞吐量地准确传递,我们使用了以下技术要点: 1. **定时抓取和实时监控**:通过轻易云的平台调度机制,定时可靠地抓取吉客云接口数据,同时进行实时监控和日志记录,确保每一条数据都不漏单。 2. **自定义转换逻辑**:由于吉客云与金蝶云星空API所需的数据格式存在差异,我们进行了定制化的数据映射,以适应特定业务需求。从而保证输出符合目标系统预期。 3. **分页及限流处理**:为了解决吉客云接口在大量数据读取时可能遇到的分页及限流问题,在抓取过程中引入分页控制策略,并结合限流阈值进行优化配置,使得整个过程既安全又高效。 4. **异常处理和重试机制**:针对可能出现的网络波动或其他异常情况,本次实施过程中设计了一套完善的错误捕获和重试机制,一旦检测到失败便会自动触发重试流程,最大程度上提高了任务成功率。 5. **集中式监控告警体系**:提供集中式的监控和告警功能,可以实时跟踪所执行任务状态,通过可视化界面展示执行结果,并及时通知相关负责人进行处置。这种透明管理极大提升了系统维护效率。 以上描述只是这次集成项目的一部分内容,在后续章节中,将深入细节探讨实现这些功能的方法论及工具应用。 ![系统集成平台API接口配置](https://pic.qeasy.cloud/D14.png~tplv-syqr462i7n-qeasy.image) ### 调用吉客云接口erp.storage.goodsdocin获取并加工数据 在数据集成的生命周期中,第一步是调用源系统的API接口以获取原始数据。本文将详细探讨如何通过轻易云数据集成平台调用吉客云的`erp.storage.goodsdocin`接口,并对获取的数据进行初步加工。 #### 接口调用配置 根据元数据配置,我们需要设置以下参数来调用`erp.storage.goodsdocin`接口: - **API**: `erp.storage.goodsdocin` - **请求方法**: `POST` - **分页设置**: 每页50条记录 - **入库类型**: 101(采购入库) 请求参数包括分页信息、时间范围、仓库和供应商信息等。以下是一个典型的请求配置示例: ```json { "api": "erp.storage.goodsdocin", "method": "POST", "pagination": { "pageSize": 50 }, "request": [ {"field": "pageIndex", "label": "分页页码", "type": "string"}, {"field": "pageSize", "label": "分页页数", "type": "string", "value": "50"}, {"field": "goodsDocNo", "label": "入库单号", "type": "string"}, {"field": "startDate", "label": "创建时间的起始时间", "type": "string", "value":"{{LAST_SYNC_TIME|datetime}}"}, {"field": "endDate", "label": "创建时间的结束时间", "type":"string","value":"{{CURRENT_TIME|datetime}}"}, {"field":"inouttype","label":"入库类型","type":"string","value":"101"}, {"field":"warehouseId","label":"仓库ID","type":"string"}, {"field":"warehouseCode","label":"仓库编号","type":"string"}, {"field":"vendId","label":"供应商ID(往来单位)","type":"string"}, {"field":"vendCode","label":"供应商编号(往来单位)","type":"string"}, {"field":"billNo","label":"上游单据号(关联单号)","type":"string"}, {"field":"userName","label":"创建人名称","type":"string"}, {"field":"gmtModifiedStart","label":"主表更新时间起始","type":"string"}, {"field":"gmtModifiedEnd","label":"主表更新时间截至","type":"string"} ] } ``` #### 数据请求与清洗 在实际操作中,首先需要通过上述配置向吉客云发送HTTP POST请求以获取采购入库数据。假设我们已经成功获取了响应数据,接下来需要对这些数据进行清洗和初步加工。 1. **字段映射与转换**: - 将原始字段映射到目标系统所需的字段。例如,将`goodsDocNo`映射为目标系统中的`documentNumber`。 - 对日期格式进行标准化处理,将不同格式的日期统一转换为ISO8601标准格式。 2. **去重与校验**: - 根据元数据配置中的`idCheck`参数,确保每条记录唯一性。可以使用`recId`作为唯一标识符进行去重。 - 校验关键字段是否存在空值或不合法值,例如确保每条记录都有有效的`warehouseId`和`supplierId`。 3. **异常处理**: - 对于缺失或错误的数据,记录日志并进行相应处理。例如,如果某条记录缺少必要的供应商信息,可以选择丢弃该记录并在日志中注明原因。 以下是一个简单的数据清洗示例代码(伪代码): ```python def clean_data(raw_data): cleaned_data = [] for record in raw_data: if not record.get('warehouseId') or not record.get('vendId'): log_error(f"Missing warehouseId or vendId in record {record['recId']}") continue cleaned_record = { 'documentNumber': record['goodsDocNo'], 'warehouseId': record['warehouseId'], 'supplierId': record['vendId'], 'createdDate': convert_to_iso8601(record['startDate']), 'modifiedDate': convert_to_iso8601(record['gmtModifiedStart']) } cleaned_data.append(cleaned_record) return cleaned_data ``` #### 数据转换与写入 完成数据清洗后,下一步是将清洗后的数据转换为目标系统所需的格式,并写入目标数据库或系统。在此过程中,需要注意以下几点: - 确保所有必需字段都已正确映射和填充。 - 根据目标系统的要求,对数据进行进一步转换,例如数值单位转换、字符串格式调整等。 - 使用批量写入方式提高效率,并确保在写入过程中有适当的错误处理机制。 通过上述步骤,我们可以高效地从吉客云获取采购入库数据,并对其进行清洗和初步加工,为后续的数据集成过程打下坚实基础。 ![打通企业微信数据接口](https://pic.qeasy.cloud/S4.png~tplv-syqr462i7n-qeasy.image) ### 使用轻易云数据集成平台进行ETL转换并写入金蝶云星空API接口 在数据集成的生命周期中,第二步是将已经集成的源平台数据进行ETL(提取、转换、加载)转换,并将其转为目标平台所能够接收的格式。本文将详细探讨如何使用轻易云数据集成平台,将采购入库数据转换为金蝶云星空API接口所需的格式,并最终写入目标平台。 #### 配置元数据 首先,我们需要根据元数据配置来定义API请求的结构。以下是一个典型的元数据配置示例: ```json { "api": "batchSave", "method": "POST", "idCheck": true, "operation": { "rowsKey": "array", "rows": 20, "method": "batchArraySave" }, "request": [ {"field":"FBillTypeID","label":"单据类型","type":"string","describe":"单据类型","parser":{"name":"ConvertObjectParser","params":"FNumber"},"value":"RKD01_SYS"}, {"field":"FBusinessType","label":"业务类型","type":"string","describe":"下拉列表","value":"CG"}, {"field":"FBillNo","label":"单据编号","type":"string","describe":"单据编号","value":"{goodsdocNo}"}, {"field":"FDate","label":"入库日期","type":"string","describe":"日期","value":"_function FROM_UNIXTIME( ( {inOutDate} \/ 1000 ) ,'%Y-%m-%d %T' )"}, {"field":"FStockOrgId","label":"收料组织","type":"string","describe":"组织","parser":{"name":"ConvertObjectParser","params":"FNumber"},"value":"_findCollection find textField_l2ib4q9s from 0ce3a160-9fb2-36e5-a2ac-57f5ad0f3c72 where textField_l2ib4q9p={warehouseCode}"}, {"field":"FPurchaseOrgId","label":"采购组织","type":"string","describe":"组织","parser":{"name":"ConvertObjectParser","params":"FNumber"},"value":"_findCollection find textField_l2ib4q9s from 0ce3a160-9fb2-36e5-a2ac-57f5ad0f3c72 where textField_l2ib4q9p={warehouseCode}"}, {"field":"FSupplierId","label":"供应商","type":"string","describe":"基础资料","parser":{"name":"ConvertObjectParser","params":"FNumber"},"value":"{vendCustomerCode}"}, { "field": "FInStockEntry", "label": "明细信息", "type": "array", "children": [ {"field": "FMaterialId", "label": "物料编码", "type": "string", "describe": "基础资料", "parser":{"name": "ConvertObjectParser", "params": "FNumber"}, "value": "{{goodsDocDetailList.goodsNo}}", "parent": "FInStockEntry"}, {"field": "FRealQty", "label": "实收数量", "type": "string", "describe": "数量", "value": "{{goodsDocDetailList.quantity}}", "parent": "FInStockEntry"}, {"field": "FPrice", "label": "单价", "type": "string", "describe": "单价", "value": "{{goodsDocDetailList.cuPrice}}", "parent": "FInStockEntry"}, {"field": "FStockId", "label": "仓库", "type": "string", "describe": "基础资料", "parser":{"name": ... ], ... }, ... ] ``` #### 数据解析与转换 1. **字段解析**:在元数据配置中,我们使用了`ConvertObjectParser`来解析某些字段,例如`FBillTypeID`和`FSupplierId`。这些字段需要从源系统的数据中提取并转换为目标系统所需的格式。 ```json { ..., {"field": "FBillTypeID", ...,"parser":{"name": ... } ``` 2. **日期格式转换**:对于日期字段,如`FDate`,我们使用了自定义函数来将UNIX时间戳转换为标准日期格式。 ```json { ..., {"field": ... } ``` 3. **集合查找**:对于组织相关的字段,如`FStockOrgId`和`FPurchaseOrgId`,我们通过集合查找功能来获取相应的数据。 ```json { ..., {"field": ... } ``` #### 构建请求体 根据以上解析和转换规则,我们可以构建出符合金蝶云星空API接口要求的请求体。以下是一个示例请求体: ```json { ... } ``` #### 调用API接口 最后一步是调用金蝶云星空的API接口,将构建好的请求体发送到目标平台。我们使用HTTP POST方法来实现这一点: ```python import requests url = 'https://api.kingdee.com/batchSave' headers = {'Content-Type': 'application/json'} data = { ... } response = requests.post(url, headers=headers, json=data) if response.status_code == 200: print('Data successfully written to Kingdee Cloud.') else: print('Failed to write data:', response.text) ``` 通过以上步骤,我们成功地完成了从源平台到金蝶云星空的ETL转换和数据写入过程。这一过程充分利用了轻易云数据集成平台提供的强大功能,实现了高效、准确的数据集成。 ![轻易云数据集成平台金蝶集成接口配置](https://pic.qeasy.cloud/T27.png~tplv-syqr462i7n-qeasy.image)