ETL转换与金蝶云星空API数据写入的实现

  • 轻易云集成顾问-贺强
### 金蝶云星空与金蝶云星空数据集成:客户OK方案探索 在现代企业的数据管理需求中,系统对接和数据集成显得尤为重要。特别是针对金蝶云星空这样的ERP系统,通过高效的数据集成功能,可以大幅提升业务流程的自动化程度和操作效率。在本文中,我们聚焦于一个具体的案例:如何通过轻易云平台将数据从一个金蝶云星空实例无缝集成到另一个金蝶云星空实例,实现“客户OK”这一方案。 本次技术案例主要探讨了以下几个关键点: - **高吞吐量的数据写入**:利用batchSave API接口,将大量数据快速写入目标金蝶云星空系统,确保及时性。 - **定时抓取与分页限流处理**:通过executeBillQuery API接口,对源端进行定时可靠的数据抓取,并有效应对API分页和限流问题。 - **自定义转换逻辑及格式差异处理**:实现特定业务需求下的自定义数据转换,并解决两端之间可能存在的数据格式差异问题。 - **集中监控与告警系统**: 集中的监控和告警体系,实时跟踪每个数据集成任务的状态、性能,以及异常检测与处理机制。 例如,在实际操作中,为保证高吞吐量下不漏单,我们使用了轻易云提供的可视化设计工具,对批量数据写入过程进行了全程监控,还借助其高级配置功能,实现了多种复杂场景下的数据映射和字段转义。同时,为简化开发过程,我们充分运用了轻易云API资产管理功能,有效地掌握各接口调用情况,使资源得到优化配置。 下一节我们将深入探讨如何具体实施这些技术要点,从而实现跨平台数据的一体化无缝对接。 ![打通钉钉数据接口](https://pic.qeasy.cloud/D13.png~tplv-syqr462i7n-qeasy.image) ### 调用金蝶云星空接口executeBillQuery获取并加工数据 在数据集成生命周期的第一步中,调用源系统接口是关键环节之一。本文将详细探讨如何通过轻易云数据集成平台调用金蝶云星空的`executeBillQuery`接口来获取并加工数据。 #### 配置元数据 首先,我们需要配置元数据,以便正确调用金蝶云星空的API接口。以下是元数据配置的详细内容: ```json { "api": "executeBillQuery", "method": "POST", "number": "FNumber", "id": "FCUSTID", "pagination": { "pageSize": 100 }, "idCheck": true, "request": [ {"field":"FCUSTID","label":"FCUSTID","type":"string","value":"FCUSTID"}, {"field":"FNumber","label":"编码","type":"string","value":"FNumber"}, {"field":"FName","label":"名称","type":"string","value":"FName"}, {"field":"FCreateOrgId_FNumber","label":"创建组织","type":"string","value":"FCreateOrgId.FNumber"}, {"field":"FUseOrgId_FNumber","label":"使用组织","type":"string","value":"FUseOrgId.FNumber"}, {"field":"FDescription","label":"描述","type":"string","value":"FDescription"}, {"field":"FIsTrade","label":"是否交易客户","type":"string","value":"FIsTrade"}, {"field":"FCustTypeId_FNumber","label":"客户类别","type":"string","value":"FCustTypeId.FNumber"}, {"field":"FGroup_FNumber","label":"客户分组","type":"string","value":"FGroup.FNumber"}, {"field":"FSALDEPTID_FNumber","label":"销售部门","type":"string","value":"FSALDEPTID.FNumber"}, {"field":"FSELLER_FNumber","label":"销售员","type":"string","value":"FSELLER.FNumber"}, {"field":"FSETTLETYPEID_FNumber","label":"结算方式","type":"string","value":"FSETTLETYPEID.FNumber"}, {"field": "FilterString", "label": "过滤条件", "type": "string", "describe": "示例写法 FSupplierId.FNumber = 'VEN00010' and FApproveDate>=", "value": "FApproveDate>='{{LAST_SYNC_TIME|date}}'"} ], "otherRequest": [ {"field": "Limit", "label": "最大行数", "type": "string", "describe": "金蝶的查询分页参数", "value": "{PAGINATION_PAGE_SIZE}"}, {"field": "StartRow", "label": "开始行索引", "type": "string", "describe": "金蝶的查询分页参数", "value": "{PAGINATION_START_ROW}"}, {"field": "TopRowCount", "label": "返回总行数", "type": int, describe: 金蝶的查询分页参数}, {"field": FieldKeys, label: 需查询的字段key集合, type: array, describe: 金蝶分录主键ID格式:FPOOrderEntry_FEntryId,其它格式 FPurchaseOrgId.FNumber, parser: {name: ArrayToString, params: ,}}, {"field": FormId, label: 业务对象表单Id, type: string, describe: 必须填写金蝶的表单ID如:PUR_PurchaseOrder, value: BD_Customer} ] } ``` #### 请求参数设置 在实际调用中,我们需要根据业务需求设置请求参数。以下是一个示例请求参数配置: ```json { "_FormId_":"", "_FieldKeys_":"", "_FilterString_":"", } ``` #### 调用API接口 通过轻易云平台,我们可以使用上述配置来调用金蝶云星空的`executeBillQuery`接口。以下是一个示例代码片段,展示如何发起HTTP POST请求: ```python import requests url = 'https://api.kingdee.com/executeBillQuery' headers = {'Content-Type': 'application/json'} payload = { "_FormId_":"", "_FieldKeys_":"", "_FilterString_":"", } response = requests.post(url, headers=headers, json=payload) data = response.json() ``` #### 数据清洗与加工 获取到原始数据后,需要进行清洗和加工,以便后续的数据转换与写入。常见的数据清洗操作包括: 1. **去除冗余字段**:仅保留业务需要的字段。 2. **数据类型转换**:确保所有字段的数据类型符合目标系统要求。 3. **缺失值处理**:填补或删除缺失值。 例如,对于获取到的客户信息,可以进行如下处理: ```python import pandas as pd # 假设data为从API获取到的数据 df = pd.DataFrame(data) # 去除冗余字段 columns_to_keep = ['FCUSTID', 'FName', 'FTEL', 'FADDRESS'] df_cleaned = df[columns_to_keep] # 数据类型转换 df_cleaned['FTEL'] = df_cleaned['FTEL'].astype(str) # 缺失值处理 df_cleaned.fillna('N/A', inplace=True) ``` #### 分页处理 由于API接口通常会限制每次返回的数据量,因此需要进行分页处理。可以通过设置`StartRow`和`Limit`参数来实现分页请求: ```python page_size = 100 start_row = 0 while True: payload['StartRow'] = start_row payload['Limit'] = page_size response = requests.post(url, headers=headers, json=payload) data_batch = response.json() if not data_batch: break # 数据清洗与加工 df_batch = pd.DataFrame(data_batch) df_cleaned_batch = df_batch[columns_to_keep] # 将当前批次的数据追加到总数据集中 if start_row == 0: df_total = df_cleaned_batch.copy() else: df_total = pd.concat([df_total, df_cleaned_batch], ignore_index=True) start_row += page_size ``` 通过上述步骤,我们可以高效地调用金蝶云星空接口,获取并加工所需的数据,为后续的数据转换与写入做好准备。 ![如何对接钉钉API接口](https://pic.qeasy.cloud/S7.png~tplv-syqr462i7n-qeasy.image) ### 轻易云数据集成平台中的ETL转换与写入金蝶云星空API接口 在数据集成生命周期的第二步中,我们将重点讨论如何利用轻易云数据集成平台,将已经集成的源平台数据进行ETL转换,并将其转为目标平台金蝶云星空API接口所能够接收的格式,最终写入目标平台。以下是详细的技术实现过程。 #### 配置元数据 首先,我们需要配置元数据,以确保数据能够正确地从源平台转换并写入到金蝶云星空。以下是一个典型的元数据配置示例: ```json { "api": "batchSave", "method": "POST", "idCheck": true, "operation": { "rowsKey": "array", "rows": 20, "method": "batchArraySave" }, "request": [ {"field":"FName","label":"客户名称","type":"string","value":"{FName}"}, {"field":"FNumber","label":"客户编码","type":"string","value":"{FNumber}"}, {"field":"FCreateOrgId","label":"创建组织","type":"string","parser":{"name":"ConvertObjectParser","params":"FNumber"},"value":"100"}, {"field":"FUseOrgId","label":"使用组织","type":"string","parser":{"name":"ConvertObjectParser","params":"FNumber"},"value":"100"}, {"field":"FDescription","label":"描述","type":"string"} ], "otherRequest": [ {"field":"FormId","label":"业务对象表单Id","type":"string","describe":"必须填写金蝶的表单ID如:PUR_PurchaseOrder","value":"BD_Customer"}, {"field":"Operation","label":"执行的操作","type":"string","value":"BatchSave"}, {"field":"IsAutoSubmitAndAudit","label":"提交并审核","type":"bool","value":"true"}, {"field":"IsVerifyBaseDataField","label":"验证基础资料","type":"bool","describe":"是否验证所有的基础资料有效性,布尔类,默认false(非必录)", "value": "false"} ] } ``` #### 数据请求与清洗 在进行ETL转换之前,首先需要从源系统请求数据并进行清洗。清洗过程中,我们会对原始数据进行标准化处理,确保其符合目标系统的数据格式要求。例如,将日期格式统一转换为ISO标准,将货币单位统一为人民币等。 #### 数据转换与写入 在清洗完成后,我们进入关键步骤:数据转换与写入。这一步骤主要通过配置好的元数据来实现。 1. **API接口调用**: 我们使用`batchSave` API,通过HTTP POST方法将批量处理后的数据发送到金蝶云星空。每次请求可以处理多达20条记录,这样可以提高效率。 2. **字段映射与转换**: 在元数据配置中,每个字段都有明确的映射关系。例如: - `FName` 映射到客户名称 - `FNumber` 映射到客户编码 - `FCreateOrgId` 和 `FUseOrgId` 使用 `ConvertObjectParser` 解析器进行对象ID转换 3. **其他请求参数**: 除了基本字段映射外,还需要配置一些其他参数,如业务对象表单ID (`FormId`) 设置为 `BD_Customer`,操作类型 (`Operation`) 设置为 `BatchSave`,以及是否自动提交和审核 (`IsAutoSubmitAndAudit`) 设置为 `true` 等。 4. **提交与审核**: 配置中设置了自动提交和审核,这意味着在数据写入后,会自动触发提交和审核流程,无需人工干预。这对于需要高效处理大量数据的场景尤为重要。 #### 实际案例分析 假设我们有一组客户信息需要从源系统导入到金蝶云星空。以下是一个简化的数据示例: ```json [ { "FName": "客户A", "FNumber": "CUST001", "FDescription": "VIP客户" }, { "FName": "客户B", "FNumber": "CUST002", "FDescription": "" } ] ``` 通过轻易云的数据集成平台,我们将这些数据按照上述元数据配置进行ETL处理,并最终调用金蝶云星空的API接口完成写入操作。整个过程透明、可视化,并且能够实时监控每个环节的数据流动和处理状态。 #### 技术要点总结 - **全异步处理**:保证了高效的数据传输和处理能力。 - **支持多种异构系统**:能够实现不同系统间的数据无缝对接。 - **灵活的字段映射与解析器**:确保复杂业务逻辑下的数据准确性。 - **自动化流程**:减少人工干预,提高工作效率。 通过上述技术案例,可以看出轻易云数据集成平台在ETL转换和写入目标平台方面具备强大的功能和灵活性,为企业的数据管理提供了有力支持。 ![用友与外部系统接口集成开发](https://pic.qeasy.cloud/T8.png~tplv-syqr462i7n-qeasy.image)