轻易云平台实现吉客云与金蝶云的数据ETL全过程

  • 轻易云集成顾问-李国敏
### 吉客云数据集成到金蝶云星空案例分享:其他入库(负数)对接-其他出库单 在众多企业进行系统对接时,如何高效、可靠地完成业务数据的无缝集成成为了一个关键问题。本次技术文章将聚焦于吉客云与金蝶云星空的实际对接案例,着重分析“其他入库(负数)对接-其他出库单”这一具体应用场景。 首先,我们需要处理的数据是通过吉客云API `erp.storage.goodsdocin.v2` 来获取。这一接口用于拉取吉客云中的“其他入库(负数)”记录。为了确保大量数据能够快速且稳定写入至金蝶云星空中,我们采用了其批量保存接口 `batchSave`。该批量操作不仅提升了整体数据处理效率,还减少了网络请求次数,从而提高系统性能。 #### 1. 如何调用吉客云接口 在整个方案中,对API调用频率控制及分页问题显得尤为重要。利用自定义的数据抓取脚本,我们可以定时、可靠地从吉客云取得必要的数据。在处理大规模数据情况下,通过适当设计分页策略,可以有效应对限流和延迟等潜在风险。 #### 2. 金蝶云星空批量写入机制 将从吉客云获取到的大量“其他入库(负数)”记录映射并转换成符合金蝶标准的数据格式后,通过调用其 `batchSave` 接口实现批量存储。这一步骤需要特别注意的是字段匹配以及数据验证,以确保每条记录准确无误地被应用到目标系统中。 为了满足企业个性化需求,对部分特殊字段进行了定制化映射。同时,通过实时监控和日志记录功能,可以即时掌握数据同步状态并捕捉异常情况,实现更高的运维管理效率。一旦发生错误,自动重试机制会确保任务最终成功执行,大幅降低人为干预成本。 以上便是此次系统集成技术方案的一部分内容。从API调用细节,到分页与限流策略,再到跨平台的数据格式转换,每个环节都经过精心设计和优化,为完善业务流程搭建坚实基础。在随后的章节中,将详细阐述此方案各项技术实现过程,以及相关代码示例,希望能为同类项目提供有价值的参考。 ![如何对接企业微信API接口](https://pic.qeasy.cloud/D32.png~tplv-syqr462i7n-qeasy.image) ### 调用吉客云接口erp.storage.goodsdocin.v2获取并加工数据 在轻易云数据集成平台的生命周期中,调用源系统接口是数据请求与清洗阶段的关键步骤。本文将详细介绍如何通过调用吉客云接口`erp.storage.goodsdocin.v2`获取并加工数据,以实现其他入库(负数)对接其他出库单的集成方案。 #### 接口配置与调用 首先,我们需要配置元数据以正确调用吉客云接口。以下是该接口的元数据配置: ```json { "api": "erp.storage.goodsdocin.v2", "effect": "QUERY", "method": "POST", "number": "goodsdocNo", "id": "goodsdocNo", "idCheck": true, "request": [ {"field": "pageIndex", "label": "分页页码", "type": "string"}, {"field": "pageSize", "label": "分页页数", "type": "string", "value": "50"}, {"field": "goodsDocNo", "label": "入库单号", "type": "string"}, {"field": "startDate", "label": "创建时间的起始时间", "type": "string", "value":"{{LAST_SYNC_TIME|datetime}}" }, {"field": "endDate", "label": "创建时间的结束时间", "type":"string", "value":"{{CURRENT_TIME|datetime}}" }, {"field":"inouttype","label":"入库类型","type":"string","describe":"入库类型(100-期初库存 101-采购入库 102-调拨入库 103-盘盈入库 104-其他入库 105-销售退货 106-完工入库 107-组装拆卸入库 108-翻新入库 109-报废入库 110-残次品入库111-成本调整 112-即采即入113-退料入库114-调拨退回115-维修返厂返还)","value":"104"}, {"field":"warehouseId","label":"仓库ID","type":"string"}, {"field":"warehouseCode","label":"仓库编号","type":"string"}, {"field":"vendId","label":"供应商ID(往来单位)","type":"string"}, {"field":"vendCode","label":"供应商编号(往来单位)","type":"string"}, {"field":"billNo","label":"上游单据号(关联单号)","type":"string"}, {"field":"userName","label":"创建人名称","type":"string"}, {"field":"gmtModifiedStart","label":"主表更新时间起始","type":"string"}, {"field":"gmtModifiedEnd","label":"主表更新时间截至","type":"string"}, {"field":"selelctFields", "label":"返回参数", "type":"string", "value": "goodsdocNo,inOutDate,userName,gmtCreate,inouttype,vendCustomerCode,warehouseCode,warehouseName,inOutReason,redStatus,financeBillStatus,goodsDocDetailList.goodsNo,goodsDocDetailList.quantity,goodsDocDetailList" } ], "autoFillResponse":true, "beatFlat":["goodsDocDetailList"], "condition":[ [{"field": "userName", "logic": "notlike", "value": "外部" }, { "field": "goodsDocDetailList.quantity", "logic": "lt", "value": 0 }] ], "omissionRemedy":{ "crontab": "1 2 * * *", "takeOverRequest":[ { "field": "startDate", "value": "_function FROM_UNIXTIME( unix_timestamp() -259200 , '%Y-%m-%d %H:%i:%s' )", "label": "接管字段", "formModel":{ enable: false }, tableModel:{ enable: false }, physicalModel:{ enable: false }, type: string }] } } ``` #### 请求参数解析 1. **分页参数**:`pageIndex`和`pageSize`用于控制分页,默认每页返回50条记录。 2. **时间参数**:`startDate`和`endDate`分别表示查询的起始和结束时间,使用动态变量`{{LAST_SYNC_TIME|datetime}}`和`{{CURRENT_TIME|datetime}}`填充。 3. **固定值参数**:如`inouttype`设置为104,表示其他入库。 4. **可选参数**:如仓库ID、供应商ID等,可以根据实际需求进行填写。 #### 条件过滤与自动填充 为了确保数据准确性,我们添加了条件过滤: ```json [ { field: userName, logic: notlike, value: 外部 }, { field: goodsDocDetailList.quantity, logic: lt, value: 0 } ] ``` 这意味着我们只获取非外部创建且数量小于0的记录。 同时,启用了自动填充响应功能(autoFillResponse),确保返回的数据结构一致性。 #### 数据处理与清洗 在获取到原始数据后,需要对其进行清洗和转换。以下是一个示例代码片段,用于处理返回的数据: ```python import requests import json # 配置请求头和URL url = 'https://api.jikexyun.com/erp/storage/goodsdocin/v2' headers = {'Content-Type': 'application/json'} # 构建请求体 payload = { 'pageIndex': '1', 'pageSize': '50', 'startDate': '{{LAST_SYNC_TIME|datetime}}', 'endDate': '{{CURRENT_TIME|datetime}}', 'inouttype': '104' } # 发起请求并获取响应 response = requests.post(url, headers=headers, data=json.dumps(payload)) data = response.json() # 数据清洗与转换逻辑 cleaned_data = [] for record in data['records']: if record['userName'] != '外部' and record['goodsDocDetailList']['quantity'] < 0: cleaned_record = { 'goodsdocNo': record['goodsdocNo'], 'inOutDate': record['inOutDate'], # 更多字段处理... } cleaned_data.append(cleaned_record) # 输出或进一步处理cleaned_data print(cleaned_data) ``` #### 异常处理与补偿机制 为了应对可能出现的数据遗漏情况,我们配置了定时任务(omissionRemedy): ```json { crontab:1 2 * * *, takeOverRequest:[ { field:startDate, value:_function FROM_UNIXTIME( unix_timestamp() -259200 , '%Y-%m-%d %H:%i:%s' ), label:接管字段, formModel:{enable:false}, tableModel:{enable:false}, physicalModel:{enable:false}, type:string} ] } ``` 该任务每天凌晨2点运行一次,确保在过去三天内的数据不会被遗漏。 通过以上步骤,我们成功实现了从吉客云接口获取并加工数据,为后续的数据转换与写入奠定了基础。 ![钉钉与WMS系统接口开发配置](https://pic.qeasy.cloud/S13.png~tplv-syqr462i7n-qeasy.image) ### 使用轻易云数据集成平台将源数据转换并写入金蝶云星空API接口 在数据集成的生命周期中,ETL(提取、转换、加载)过程是关键步骤之一。本文将详细探讨如何使用轻易云数据集成平台将已经集成的源平台数据进行ETL转换,并最终写入金蝶云星空API接口。 #### 元数据配置解析 在本次集成方案中,我们需要将其他入库(负数)对接到其他出库单。元数据配置如下: ```json { "api": "batchSave", "effect": "EXECUTE", "method": "POST", "idCheck": true, "operation": { "method": "merge", "field": "goodsDocDetailList_ownerName,goodsdocNo", "bodyName": "Fentity", "header": ["goodsdocNo", "inOutDate", "warehouseCode", "goodsDocDetailList_ownerName"], "body": ["goodsDocDetailList_goodsNo", "goodsDocDetailList_quantity", "warehouseCode"] }, ... } ``` #### 数据请求与清洗 首先,我们需要从源平台提取相关数据,并进行必要的清洗和预处理。在此过程中,确保每个字段的数据类型和格式符合目标平台的要求。 #### 数据转换与写入 接下来,我们进入关键的转换与写入阶段。以下是具体步骤: 1. **定义请求参数**: - `FJKYNo`:吉客云单号,对应字段 `{goodsdocNo}`。 - `FBillNo`:单据编号。 - `FBillTypeID`:单据类型,通过 `ConvertObjectParser` 转换,映射到目标字段 `654334351e19540ce43c42b7`。 - `FStockOrgId`:库存组织,通过 `ConvertObjectParser` 转换,映射到目标字段 `6441f0214af70a2f240adb22`。 - `FStockDirect`:库存方向,固定值为 `"GENERAL"`。 - `FDate`:日期,对应字段 `{inOutDate}`。 - `FDEPTID`:部门,通过 `ConvertObjectParser` 转换,固定值为 `"BM000029"`。 - `FOwnerTypeIdHead`:货主类型,固定值为 `"BD_OwnerOrg"`。 - `FOwnerIdHead`:货主,通过 `ConvertObjectParser` 转换,映射到目标字段 `6441f0214af70a2f240adb22`。 2. **定义明细信息**: - 明细信息以数组形式存在,每个子项包含以下字段: - `FMATERIALID`: 物料编码,通过 `ConvertObjectParser` 转换,对应字段 `{Fentity.goodsDocDetailList_goodsNo}`。 - `FStockStatusId`: 库存状态,固定值为 `"KCZT01_SYS"`。 - `FSTOCKID`: 收货仓库,通过 `ConvertObjectParser` 转换,对应字段 `{Fentity.warehouseCode}`。 - `FQty`: 实收数量,通过 `_function {{Fentity.goodsDocDetailList_quantity}} *(-1)` 函数计算负数。 3. **其他请求参数**: - `"FormId"`: 表单ID,固定值为 `"STK_MisDelivery"`。 - `"IsVerifyBaseDataField"`: 验证基础资料有效性,布尔值为 `"true"`。 - `"Operation"`: 执行操作,固定值为 `"Save"`。 - `"IsAutoSubmitAndAudit"`: 提交并审核,布尔值为 `"true"`。 - `"InterationFlags"`: 允许负库存,固定值为 `"STK_InvCheckResult"`。 #### 实际操作示例 下面是一个实际操作示例,将上述配置应用于一个具体的数据集: ```json { "api": "batchSave", ... "request": [ {"field":"FJKYNo","value":"DOC12345"}, {"field":"FBillNo","value":"BILL12345"}, {"field":"FBillTypeID","value":"WAREHOUSE001"}, {"field":"FStockOrgId","value":"ORG001"}, {"field":"FStockDirect","value":"GENERAL"}, {"field":"FDate","value":"2023-10-01"}, {"field":"FDEPTID","value":"BM000029"}, {"field":"FOwnerTypeIdHead","value":"BD_OwnerOrg"}, {"field":"FOwnerIdHead","value":"OWNER001"}, ... ], ... } ``` 通过上述配置和操作,我们可以成功地将源平台的数据转换并写入金蝶云星空API接口,实现不同系统间的数据无缝对接。这不仅提高了业务效率,还确保了数据的一致性和准确性。 ![数据集成平台API接口配置](https://pic.qeasy.cloud/T3.png~tplv-syqr462i7n-qeasy.image)