数据集成中的ETL转换及金蝶云星空写入技巧

  • 轻易云集成顾问-贺强
### 钉钉数据集成到金蝶云星空的实战案例 在企业信息化中,如何高效、稳定地实现多个系统的数据对接,是一个常见且必须解决的问题。本文将分享一个实际案例,即通过轻易云数据集成平台,将钉钉中的银行转账单据同步到金蝶云星空中的具体技术方案——transfer-新转账单(银行转账)V4.0。 #### 实现目标及挑战 为了确保银行转账单 V4.0 能精确且实时地从钉钉接口抓取,并写入至金蝶云星空,我们面临多重技术挑战: 1. **接口调用**:使用钉钉API v1.0/yida/processes/instances获取所需数据。 2. **分页与限流处理**:由于API返回的数据可能会进行分页,同时需要面对限流问题,这要求我们的处理逻辑能够灵活应对。 3. **格式转换和映射**:要处理不同系统间存在的数据格式差异,需要自定义数据转换逻辑以适配业务需求。 4. **批量写入能力**:大量数据需要快速写入金蝶云星空,采用batchSave API完成批量操作,提高效率和可靠性。 5. **异常检测与重试机制**:针对对接过程中可能出现的异常情况,我们设计了完善的监控告警体系,并加入错误重试机制,确保任务顺利完成。 #### 集成流程概述 整体流程基于全程可视化管理工具实现,从步骤设计上大致分为以下几个关键环节: 1. **定时任务调度**:设置定时器来周期性调用钉钉API,通过v1.0/yida/processes/instances获取最新的银行转账单据。 2. **分页抓取 & 数据清洗**: - 解析API响应结果,进行必要的数据清洗及预处理。 - 解决分页问题,通过设置合适参数循环拉取完整数据集合。 3. **格式转换 & 数据映射**: - 自定义脚本进行字段对应关系配置,使其符合金蝶云星空batchSave API要求的数据结构。 4. **批量传输 & 写入监控** - 使用轻易云的平台特性,实现高吞吐量下的大批量数据写入功能; - 利用集中式监控和告警系统实时跟踪每个步骤过程中的状态与性能表现。 5. **错误处理 & 重试机制** - 针对可能遇到的网络波动或接口响应异常等情况,配置详细的异常日志记录策略以及自动重试规则,保障最终一致性。 此技术方案不仅充分利用了 ![用友与MES系统接口开发配置](https://pic.qeasy.cloud/D28.png~tplv-syqr462i7n-qeasy.image) ### 调用钉钉接口获取并加工数据的技术案例 在数据集成过程中,调用源系统接口是至关重要的一步。本文将详细探讨如何通过轻易云数据集成平台调用钉钉接口`v1.0/yida/processes/instances`来获取并加工数据。 #### API接口配置 首先,我们需要配置API接口的元数据,以便正确调用钉钉的接口。以下是元数据配置的详细信息: ```json { "api": "v1.0/yida/processes/instances", "method": "POST", "number": "title", "id": "processInstanceId", "idCheck": true, "request": [ {"field": "pageNumber", "label": "分页页码", "type": "string", "describe": "分页页码", "value": "{PAGINATION_START_PAGE}"}, {"field": "pageSize", "label": "分页大小", "type": "string", "describe": "分页大小", "value": "{PAGINATION_PAGE_SIZE}"}, {"field": "appType", "label": "应用ID", "type": "string", "describe": "应用ID", "value":"APP_WTSCMZ1WOOHGIM5N28BQ"}, {"field": "systemToken", "label": "应用秘钥", "type":"string","describe":"应用秘钥","value":"IS866HB1DXJ8ODN3EXSVD750RBTK2X72R8MELL4"}, {"field":"userId","label":"用户的userid","type":"string","describe":"用户的userid","value":"16000443318138909"}, {"field":"language","label":"语言","type":"string","describe":"语言,取值:zh_CN:中文(默认值)en_US:英文"}, {"field":"formUuid","label":"表单ID","type":"string","describe":"表单ID","value":"FORM-KW766OD1931ALBVJBHUHLD6KFJ3G3OELLVLGL0"}, {"field":"searchFieldJson","label":"条件","type":"object","children":[{"field":"selectField_lglvrpmg","label":"类型","type":"string","value":"组织内部转款"}]}, {"field":"originatorId","label":"根据流程发起人工号查询","type":"string","describe":"根据流程发起人工号查询"}, {"field":"createFromTimeGMT","label":"创建时间起始值","type":"string","describe":"创建时间起始值","value":"_function DATE_FORMAT(DATE_ADD(NOW(),INTERVAL - 25 DAY),'%Y-%m-%d 00:00:00')"}, {"field":"createToTimeGMT","label":"创建时间终止值","type":"string","describe":"创建时间终止值","value":"{{CURRENT_TIME|datetime}}"}, {"field":"modifiedFromTimeGMT","label":"修改时间起始值","type":"string","describe":"修改时间起始值"}, {"field":"modifiedToTimeGMT","label":"修改时间终止值","type":"string","describe":"修改时间终止值"}, {"field":"taskId","label":"任务ID","type":"string","describe":"任务ID"}, {"field":"instanceStatus","label":"实例状态","type":"string","describe":"实例状态","value":"COMPLETED"}, {"field":"approvedResult","label":"流程审批结果","type":"string",“describe”:“流程审批结果”,“value”:“agree”} ], “condition”:[[{"field”:“dateField_lglvrpp4”,“logic”:“notnull”}]] } ``` #### 数据请求与清洗 在这个阶段,我们需要发送一个POST请求到钉钉API以获取所需的数据。请求体中包含了分页信息、应用ID、用户ID、表单ID等必要参数。这些参数确保我们能够准确地从钉钉系统中提取到所需的数据。 例如,分页页码和分页大小参数可以动态设置,以便处理大批量的数据时进行分批次请求: ```json { “pageNumber”: “{PAGINATION_START_PAGE}”, “pageSize”: “{PAGINATION_PAGE_SIZE}” } ``` 此外,通过设置`createFromTimeGMT`和`createToTimeGMT`参数,可以限定数据的时间范围,从而提高查询效率: ```json { “createFromTimeGMT”: “_function DATE_FORMAT(DATE_ADD(NOW(),INTERVAL - 25 DAY),'%Y-%m-%d 00:00:00')”, “createToTimeGMT”: “{{CURRENT_TIME|datetime}}” } ``` 这些参数确保我们只获取最近25天内的数据,避免了无效数据的干扰。 #### 数据转换与写入 在成功获取到数据后,需要对其进行清洗和转换,以便后续写入目标系统。在这个过程中,可以利用轻易云平台提供的可视化工具,对数据进行格式转换、字段映射等操作。例如,将钉钉返回的数据字段映射到目标系统所需的字段格式: ```json { “sourceField”: “processInstanceId”, “targetField”: “transactionId” }, { “sourceField”: “title”, “targetField”: “transactionTitle” } ``` 通过这种方式,可以确保数据在不同系统之间无缝对接,并且格式一致。 #### 实例状态与审批结果 为了确保只处理已完成且审批通过的实例,我们可以在请求中添加相应的过滤条件: ```json { “instanceStatus”: “COMPLETED”, “approvedResult”: “agree” } ``` 这两个条件可以有效过滤掉未完成或未通过审批的实例,确保数据质量。 综上所述,通过合理配置API接口元数据,并结合轻易云平台提供的数据处理工具,可以高效地实现从钉钉系统获取并加工数据,为后续的数据集成打下坚实基础。 ![如何开发金蝶云星空API接口](https://pic.qeasy.cloud/S21.png~tplv-syqr462i7n-qeasy.image) ### 使用轻易云数据集成平台进行ETL转换并写入金蝶云星空API接口 在数据集成生命周期的第二步中,我们将已经集成的源平台数据进行ETL(Extract, Transform, Load)转换,转为目标平台金蝶云星空API接口所能够接收的格式,并最终写入目标平台。本文将详细探讨如何利用轻易云数据集成平台配置元数据,实现这一过程。 #### 元数据配置解析 我们使用的元数据配置如下: ```json { "api": "batchSave", "method": "POST", "idCheck": true, "operation": { "rowsKey": "array", "rows": 1, "method": "batchArraySave" }, "request": [ {"field":"FBillNo","label":"单据编号","type":"string","value":"{serialNumberField_lgov9d3b}(YHZZ)"}, {"field":"FCOMMENT","label":"备注","type":"string","value":"{textareaField_lglvrpoa}"}, {"field":"FDATE","label":"日期","type":"string","value":"_function FROM_UNIXTIME( ( {dateField_lglvrpp4} \/ 1000 ) ,'%Y-%m-%d' )"}, {"field":"FBillTypeID","label":"单据类型","type":"string","parser":{"name":"ConvertObjectParser","params":"FNumber"},"value":"YHZZ01_SYS"}, {"field":"FPAYORGID","label":"支付组织","type":"string","parser":{"name":"ConvertObjectParser","params":"FNumber"},"value":"{textField_lglvrpmp}"}, {"field":"FMAINBOOKID","label":"帐薄","type":"string","parser":{"name":"ConvertObjectParser","params":"FNumber"},"value":"PRE001"}, {"field":"FEXCHANGETYPE","label":"汇率类型","type":"string","parser":{"name":"ConvertObjectParser","params":"FNumber"},"value":"HLTX01_SYS"}, { "field": "FEntity", "label": "分录", "type": "array", "children": [ {"field": "FTOBANKACNTID", "label": "转入账户", "type": "string", "parser":{"name": "ConvertObjectParser", "params": "FNumber"}, "value": "{selectField_lglvrpof}", "parent": "FEntity"}, {"field": "FFROMBANKACNTID", "label": "转出账号", "type": "string", "parser":{"name":: ConvertObjectParser", params: FNumber}, value: {selectField_lglvrpom}, parent: FEntity}, {"field": FINCURRENCYID, label: 币别, type: string, parser:{name: ConvertObjectParser, params: FNumber}, value: PRE001, parent: FEntity}, {"field": FAmount, label: 金额, type: string, value: {numberField_lglvrpn8}, parent: FEntity}, {"field": FTOTALAMOUNT, label: 转出金额, type: string, value:{numberField_lglvrpn8}, parent:FEntity}, {"field" : FOUTCURRENCYID , label : 转出币别 , type : string , parser : { name : ConvertObjectParser , params : FNumber }, value : PRE001 , parent : FEntity}, {"field" : FEXPLANATION , label : 摘要 , type : string , value : { textareaField_lglvrpoa } , parent : FEntity }, {"field" : FINEXCHANGERATE , label : 转入汇率 , type : string , value : 1 , parent : FEntity }, {"field" : FOUTEXCHANGERATE , label : 转出汇率 , type : string , parent:FEntity }, {"field" : FPOSTDATE , label : 业务日期 , type:string,value:{gmtModified},parent:FEntity}, {"field" :"FRuZhangType", label:"入账类型", type:"string", value:"_function FROM_UNIXTIME( ( {dateField_lglvrpp4} \/ 1000 ) ,' %Y-%m-%d' ) ",parent:"FEntity"}, {parent:"FEntity",label:"转出部门", field:"FDeptID", type:"string", value:"{textField_l3wlou1g}", parser:{name:"ConvertObjectParser", params:"FNumber"}} ], parser:{name:"ConvertObjectParser", params:"FNumber"} } ], otherRequest:[ { field :"FormId" , label :"业务对象表单Id" , type :"string" ,"describe" :"必须填写金蝶的表单ID如:PUR_PurchaseOrder" ,"value" :"CN_BANKTRANSBILL"}, { field :"Operation" ,"label ":"执行的操作 ","type ":"string ","value ":"BatchSave"}, { field :"IsAutoSubmitAndAudit ","label ":"提交并审核 ","type ":"bool ","value ":true}, { field :"IsVerifyBaseDataField ","label ":"验证基础资料 ","type ":"bool ","describe ":"是否验证所有的基础资料有效性,布尔类,默认false(非必录) ","value ":false} ] } ``` #### 数据请求与清洗 在进行ETL转换之前,我们首先需要从源系统获取原始数据。这一步骤通常包括通过API请求或数据库查询来获取数据,并对其进行初步清洗,以确保数据格式的一致性和完整性。 #### 数据转换与写入 1. **字段映射与转换**: - **单据编号(FBillNo)**:通过 `{serialNumberField_lgov9d3b}(YHZZ)` 获取源系统中的单据编号,并添加后缀“(YHZZ)”。 - **备注(FCOMMENT)**:直接映射 `textareaField_lglvrpoa` 字段。 - **日期(FDATE)**:使用 `_function FROM_UNIXTIME( ( {dateField_lglvrpp4} / 1000 ),'%Y-%m-%d' )` 将 Unix 时间戳转换为标准日期格式。 - **单据类型(FBillTypeID)**:固定值 `YHZZ01_SYS`,并使用 `ConvertObjectParser` 将其转换为目标系统可识别的格式。 - **支付组织(FPAYORGID)**:映射 `textField_lglvrpmp` 字段,并通过 `ConvertObjectParser` 转换。 - **帐薄(FMAINBOOKID)**、**汇率类型(FEXCHANGETYPE)**:分别固定值为 `PRE001` 和 `HLTX01_SYS`,并通过 `ConvertObjectParser` 转换。 2. **分录字段处理**: 分录字段包含多个子字段,每个子字段都需要进行相应的映射和转换。例如: - **转入账户(FTOBANKACNTID)** 和 **转出账号(FFROMBANKACNTID)**:分别映射 `selectField_lglvrpof` 和 `selectField_lglvrpom` 字段,并通过 `ConvertObjectParser` 转换。 - **金额(FAmount)、转出金额(FTOTALAMOUNT)、摘要(FEXPLANATION)、业务日期(FPOSTDATE)、入账类型(FRuZhangType)、转出部门(FDeptID)** 等字段也需要类似处理。 3. **其他请求参数设置**: - 设置表单 ID 为 `CN_BANKTRANSBILL`。 - 执行操作设置为 `BatchSave`。 - 自动提交并审核设置为 `true`。 - 验证基础资料有效性设置为 `false`。 #### API 请求构建与发送 根据上述元数据配置,构建最终的 API 请求体。以下是一个示例请求体: ```json { "FormId": "CN_BANKTRANSBILL", "Operation": { ... }, ... } ``` 通过 HTTP POST 方法将请求发送至金蝶云星空 API 接口,实现数据写入。 #### 实时监控与错误处理 在整个 ETL 转换和数据写入过程中,通过轻易云的数据集成平台提供的实时监控功能,可以随时查看数据流动和处理状态。如果出现错误,可以根据错误日志迅速定位问题并进行修复。 以上就是利用轻易云数据集成平台,将源平台的数据经过 ETL 转换后写入金蝶云星空 API 接口的详细技术过程。 ![如何对接用友BIP接口](https://pic.qeasy.cloud/T18.png~tplv-syqr462i7n-qeasy.image)