使用轻易云进行ETL转换并写入金蝶云星空的实践

  • 轻易云集成顾问-吴伟
### 钉钉数据集成到金蝶云星空:案例分享与技术解析 在一个复杂的企业生态系统中,各种信息化工具之间的数据流通是一项至关重要但充满挑战的任务。在本次案例分享中,我们将聚焦于如何高效地实现钉钉中的采购付款单数据集成到金蝶云星空中,具体方案名称为"pay_新付款单(采购业务付款)V4.0"。 为了实现这一目标,我们采用了轻易云数据集成平台,通过其提供的高吞吐量数据写入能力以及集中监控和告警系统,使得大量来自钉钉的数据能够可靠且快速地被写入金蝶云星空。同时,该平台还支持自定义数据转换逻辑,以处理两套系统间不同的数据格式。 整个集成过程开始于从钉钉获取业务流程实例接口(`/v1.0/yida/processes/instances`),定时抓取新的付款单数据。为应对API分页限制及限流问题,我们使用了多线程并发请求策略,并配置重试机制以确保每一次请求都能成功返回所需的数据。此外,还通过实时监控功能记录所有操作日志,便于后续追踪和故障排查。 在获取到有效数据之后,接下来是针对这些原始数据进行清洗、转换及映射,以适应金蝶云星空的标准输入格式。这一步骤依赖于可视化的数据流设计工具,使开发人员能够直观地定义和调整各类转换规则,从而确保生成符合要求的最终输出。 最后,将处理好的数据信息批量写入到金蝶云星空对应模块,此处我们调用的是批量保存接口(`batchSave`)。在这个过程中,加强了对异常情况的处理,包括错误日志记录和自动重试机制,以保证即使面对突发性网络波动或其他不稳定因素,也不会导致关键业务信息丢失或延迟录入系统。 更多技术细节将在随后的章节详细展开,这里仅做初步概述,为读者提供整体框架理解。 ![金蝶与SCM系统接口开发配置](https://pic.qeasy.cloud/D19.png~tplv-syqr462i7n-qeasy.image) ### 调用钉钉接口获取并加工数据的技术案例 在数据集成过程中,调用源系统接口获取数据是至关重要的一步。本文将详细探讨如何通过轻易云数据集成平台调用钉钉接口`v1.0/yida/processes/instances`来获取并加工数据。 #### 接口配置与请求参数 首先,我们需要配置接口的元数据,以便正确地调用钉钉API。以下是该接口的元数据配置: ```json { "api": "v1.0/yida/processes/instances", "method": "POST", "number": "title", "id": "processInstanceId", "idCheck": true, "request": [ {"field": "pageNumber", "label": "分页页码", "type": "string", "describe": "分页页码", "value": "{PAGINATION_START_PAGE}"}, {"field": "pageSize", "label": "分页大小", "type": "string", "describe": "分页大小", "value": "{PAGINATION_PAGE_SIZE}"}, {"field": "appType", "label": "应用ID", "type": "string", "describe": "应用ID", "value":"APP_WTSCMZ1WOOHGIM5N28BQ"}, {"field": "systemToken", "label": "应用秘钥", "type": "string", "describe":"应用秘钥","value":"IS866HB1DXJ8ODN3EXSVD750RBTK2X72R8MELL4"}, {"field":"userId","label":"用户的userid","type":"string","describe":"用户的userid","value":"16000443318138909"}, {"field":"language","label":"语言","type":"string","describe":"语言,取值:zh_CN:中文(默认值)en_US:英文"}, {"field":"formUuid","label":"表单ID","type":"string","describe":"表单ID","value":"FORM-UX866Q61RUV939TLEWG9H4HX25523ZRQNXLGLW"}, {"field":"searchFieldJson","label":"条件","type":"object","children":[{"field":"selectField_lgm25d98","label":"类型","type":"string","value":"供应链款项"},{"parent":"searchFieldJson","label":"部门过滤","field":"selectField_lgm25d8q","type":"string"}]}, {"field":"originatorId","label":"根据流程发起人工号查询","type":"string","describe":"根据流程发起人工号查询"}, {"field":"createFromTimeGMT","label":"创建时间起始值","type":"string", "describe" : "创建时间起始值", "value" : "_function DATE_FORMAT(DATE_ADD(NOW(),INTERVAL - 25 DAY),'%Y-%m-%d 00:00:00')" }, {"field" : "createToTimeGMT", "label" : "创建时间终止值", "type" : "string", "describe" : "创建时间终止值", "value" : "{{CURRENT_TIME|datetime}}" }, {"field" : "modifiedFromTimeGMT", "label" : "修改时间起始值", "type" : "string", "describe" : "修改时间起始值" }, {"field" : "modifiedToTimeGMT", "label" : "修改时间终止值", "type" : "stringscribe修改时间终止值" }, {"field":["taskId"], label:"任务ID", type:"string", describe:"任务ID" }, { field:"instanceStatus", label:"实例状态", type:"string", describe:"实例状态", value: “COMPLETED” }, { field:"approvedResult", label:"流程审批结果, type:"string", describe:"流程审批结果", value: “agree” } ], “condition”:[[{"field”:“dateField_lgn3helb”, “logic”:“notnull”}]] } ``` #### 请求参数解析 1. **分页参数**: - `pageNumber` 和 `pageSize` 用于控制请求的数据量和分页。 2. **身份验证**: - `appType` 和 `systemToken` 用于验证应用身份,确保只有授权的应用才能访问数据。 3. **用户信息**: - `userId` 指定了请求数据的用户身份。 4. **语言设置**: - `language` 参数可以设置返回数据的语言,默认为中文。 5. **表单和搜索条件**: - `formUuid` 指定了要查询的表单ID。 - `searchFieldJson` 包含了具体的搜索条件,如类型和部门过滤。 6. **时间范围**: - `createFromTimeGMT` 和 `createToTimeGMT` 用于指定查询的数据创建时间范围。 7. **其他过滤条件**: - 包括 `originatorId`, `modifiedFromTimeGMT`, `modifiedToTimeGMT`, `taskId`, `instanceStatus`, 和 `approvedResult` 等,用于进一步细化查询条件。 #### 数据清洗与转换 在获取到原始数据后,需要进行清洗和转换,以便后续的数据处理和写入。以下是一些常见的数据清洗和转换操作: 1. **字段映射**: 将原始数据中的字段映射到目标系统所需的字段。例如,将钉钉返回的数据字段名转换为目标系统所需的字段名。 2. **格式转换**: 将日期、数字等字段转换为目标系统所需的格式。例如,将日期格式从UTC转换为本地时区。 3. **数据过滤**: 根据业务需求过滤掉不需要的数据。例如,只保留审批结果为“agree”的记录。 4. **合并与拆分**: 根据业务需求对数据进行合并或拆分。例如,将多个字段合并成一个复合字段,或者将一个复合字段拆分成多个独立字段。 #### 实例代码 以下是一个示例代码片段,用于调用钉钉接口并处理返回的数据: ```python import requests import json from datetime import datetime, timedelta # 设置请求URL和头部信息 url = 'https://oapi.dingtalk.com/v1.0/yida/processes/instances' headers = { 'Content-Type': 'application/json', } # 设置请求参数 params = { 'pageNumber': '1', 'pageSize': '20', 'appType': 'APP_WTSCMZ1WOOHGIM5N28BQ', 'systemToken': 'IS866HB1DXJ8ODN3EXSVD750RBTK2X72R8MELL4', 'userId': '16000443318138909', 'language': 'zh_CN', 'formUuid': 'FORM-UX866Q61RUV939TLEWG9H4HX25523ZRQNXLGLW', 'searchFieldJson': json.dumps({ 'selectField_lgm25d98': '供应链款项', # 添加其他搜索条件 }), 'createFromTimeGMT': (datetime.now() - timedelta(days=25)).strftime('%Y-%m-%d 00:00:00'), 'createToTimeGMT': datetime.now().strftime('%Y-%m-%d %H:%M:%S'), } # 发起POST请求 response = requests.post(url, headers=headers, data=json.dumps(params)) # 检查响应状态码 if response.status_code == 200: data = response.json() # 数据清洗与转换 cleaned_data = [] for item in data['data']: cleaned_item = { # 字段映射与格式转换示例 'title': item['title'], 'processInstanceId': item['processInstanceId'], # 添加其他字段处理逻辑 } cleaned_data.append(cleaned_item) else: print(f'Error: {response.status_code}, {response.text}') ``` 通过上述步骤,我们可以成功地调用钉钉接口获取所需的数据,并对其进行清洗和转换,为后续的数据处理奠定基础。 ![如何对接企业微信API接口](https://pic.qeasy.cloud/S8.png~tplv-syqr462i7n-qeasy.image) ### 使用轻易云数据集成平台进行ETL转换并写入金蝶云星空API接口的技术案例 在数据集成的生命周期中,ETL(提取、转换、加载)是关键步骤之一。本文将详细探讨如何使用轻易云数据集成平台,将源平台的数据转换为金蝶云星空API接口所能接收的格式,并最终写入目标平台。 #### 1. API接口配置 首先,我们需要配置金蝶云星空的API接口。在本案例中,我们使用的是`batchSave`接口,采用POST方法进行数据提交。以下是具体的元数据配置: ```json { "api": "batchSave", "method": "POST", "idCheck": true, "operation": { "method": "batchArraySave", "rows": 1, "rowsKey": "array" }, "request": [ {"field":"FBillNo","label":"单据编号","type":"string","value":"{serialNumberField_lgm25d8r}(FKD)"}, {"field":"FCURRENCYID","label":"币别","type":"string","parser":{"name":"ConvertObjectParser","params":"FNumber"},"value":"PRE001"}, {"field":"FDATE","label":"业务日期","type":"string","value":"_function FROM_UNIXTIME( ( {dateField_lgn3helb} \/ 1000 ) ,'%Y-%m-%d' )"}, {"field":"FBillTypeID","label":"单据类型","type":"string","parser":{"name":"ConvertObjectParser","params":"FNumber"},"value":"FKDLX01_SYS"}, {"field":"FBUSINESSTYPE","label":"业务类型","type":"string","value":"2"}, {"field":"FCONTACTUNITTYPE","label":"往来单位类型","type":"string","value":"BD_Supplier"}, {"field":"FCONTACTUNIT","label":"往来单位","type":"string","value":"{textField_lgm25d9i}","parser":{"name":"ConvertObjectParser","params":"FNumber"}}, {"field":"FRECTUNITTYPE","label":"收款单位类型","type":"string","value":"BD_Supplier"}, {"field":"FRECTUNIT","label":"收款单位","type":"string","parser":{"name":"ConvertObjectParser","params":"FNumber"},"value": "{textField_lgm25d9i}"}, {"field": "FDepartment", "label": "部门", "type": "string", "parser": {"name": "ConvertObjectParser", "params": "FName"}, "value": "{{tableField_lgm25d9j.textField_lgm25d9x}}"}, {"field": "FPAYORGID", "label": "付款组织", "type": "string", "parser":{"name": "ConvertObjectParser", "params": "FNumber"}, value: "{textField_lgn3hel2}"}, {"field": FSETTLECUR, label: “结算币别”, type: “string”, parser: { name: “ConvertObjectParser”, params: “FNumber” }, value: “PRE001”}, {“field”: “FEXCHANGERATE”, “label”: “汇率”, “type”: “string”, value: “1”}, {“label”: “结算组织”, field: FSETTLEORGID, type: string, value: {{tableField_lgm25d9j.textField_lgm25d9w}}, parser: { name: ConvertObjectParser, params: FNumber }}, {“label”: 往来类型, field: F_QKZI_Assistant, type: string, value:_function case '{selectField_lgni8iv2}' when '否' then ' ' when '是' then '{selectField_lgn3hekm_id}' end, parser:{ name: ConvertObjectParser, params:FNumber}}, {“field”: FREMARK, label:“备注”, type:“string”, value:{textareaField_lgn3hejg}}, { field:“FPAYBILLENTRY”, label:“付款单明细”, type:“array”, children:[ { field:FSETTLETYPEID,label:“结算方式”, type:string, parser:{ name: ConvertObjectParser, params:FNumber}, value:“JSFS04_SYS”, parent:“FPAYBILLENTRY”}, { parent:“FPURPOSEID”, label:“付款用途”, field:“FPURPOSEID”, type:string,value:{tableField_lgm25d9j.textField_lgm25d9u}, parser:{ name: ConvertObjectParser, params:FNumber}}, { field:“FPAYTOTALAMOUNTFOR”, label:“应付金额”, type:string,value:{tableField_lgm25d9j.numberField_lgm25d9r}, parent:“FPAYBILLENTRY”}, { field:“FPAYAMOUNTFOR_E”, label:“付款金额”, type:“string”, value:{{tableField_lgm25d9j.numberField_lgm25d9r}}, parent:“FPAYBILLENTRY”}, { field:“FEXPENSEDEPTID_E”, label:“费用承担部门”, type:“string”, parser:{ name: ConvertObjectParser, params: FNumber }, value:{{tableField_lgm25d9j.textField_lgm25d9x}}, parent:“FPAYBILLENTRY”}, { field:“FPOSTDATE”, label:“登账日期”, type:“string”, value:_function FROM_UNIXTIME( ( {dateField_lgn3helb} \/ 1000 ) ,'%Y-%m-%d' ), parent:“FPAYBILLENTRY”}, { field:“FPayType”, label:“支付类型”, type:“string”, value:A, parent:“FPAYBILLENTRY”}, { field:“FCOMMENT”, label:“备注”, type:“string”, value:{textareaField_lgn3hejg}, parent:“FPAYBILLENTRY”}, { parent:“FPAYBILLENTRY”, label:“我方银行账号”, field:“FACCOUNTID”, type:“string”, value:{selectField_lgn3hel5}, parser:{ name: ConvertObjectParser, params: FNumber }}, { parent:"FPAYBILLENTRY", label:"手续费", field:"FHANDLINGCHARGEFOR", type:"string", value:"{{tableField_lgm25d9j.numberField_lgm25d9q}}" } ], value:"tableField_lgm25d9j" } ], "otherRequest":[ { field:"FormId", label:"业务对象表单Id", type:"string", describe:"必须填写金蝶的表单ID如:PUR_PurchaseOrder", value:"AP_PAYBILL" }, { field:"Operation", label:"执行的操作", type:"string", value:"BatchSave" }, { field:"IsAutoSubmitAndAudit", label:"提交并审核", type:"bool", value:"true" }, { field:"IsVerifyBaseDataField", label:"验证基础资料", type:"bool", describe:"是否验证所有的基础资料有效性,布尔类,默认false(非必录)", value:"false" } ] } ``` #### 2. 数据请求与清洗 在ETL过程中,我们首先需要从源系统请求数据,并对其进行清洗,以确保数据的一致性和完整性。例如,对于日期字段`dateField_lgn3helb`,我们需要将其从UNIX时间戳转换为标准日期格式: ```sql _function FROM_UNIXTIME( ( {dateField_lgn3helb} / 1000 ), '%Y-%m-%d' ) ``` 类似地,对于其他字段,如币别`FCURRENCYID`、往来单位`FCONTACTUNIT`等,我们通过解析器`ConvertObjectParser`将其转换为目标系统可识别的格式。 #### 3. 数据转换与写入 在完成数据清洗后,我们将数据转换为金蝶云星空API所需的格式,并通过POST请求将其写入目标系统。以下是一个示例请求体: ```json { "FormId": "AP_PAYBILL", "Operation": "BatchSave", ... } ``` 每个字段都经过了适当的转换和映射,例如: - `FBillNo`: `{serialNumberField_lgm25d8r}(FKD)` - `FCURRENCYID`: `PRE001` - `FBillTypeID`: `FKDLX01_SYS` - `FBUSINESSTYPE`: `2` - ... #### 总结 通过轻易云数据集成平台,我们能够高效地实现不同系统间的数据无缝对接。本文详细介绍了如何配置和使用金蝶云星空API接口进行数据写入,包括数据请求、清洗、转换和最终写入目标系统的全过程。这种全生命周期管理的方法极大提升了业务透明度和效率。 ![企业微信与ERP系统接口开发配置](https://pic.qeasy.cloud/T17.png~tplv-syqr462i7n-qeasy.image)