从API调取到数据清洗:金蝶云星空与轻易云的数据集成实践

  • 轻易云集成顾问-吕修远
### 委外退料-其他入库单数据集成案例分析 在企业运营过程中,高效的数据集成对于保持系统间的业务同步至关重要。在此次技术分享中,我们将聚焦于“委外退料-其他入库单”的实际运行方案,探讨如何将金蝶云星空中的数据无缝对接并写入到另一个金蝶云星空实例中。 此方案通过调用executeBillQuery API获取源系统中的委外退料单据,然后使用batchSave API将该数据批量写入目标系统。这个过程需要考虑多项技术细节和挑战,包括高吞吐量的数据写入能力、实时监控与日志记录以及接口限流处理等。 首先,为了确保大量数据能够快速而可靠地被集成,平台必须支持高吞吐量的数据传输。在具体操作中,通过配置合理的批次大小和并行度,可以极大提升处理效率,并减少等待时间。同时,为应对可能的接口限流问题,我们采用重试机制以提高任务成功率,这样可以避免由于瞬时负载过高导致的数据丢失或提交失败。 此外,在实施过程中,数据质量监控显得尤为重要。实时跟踪每个集成任务的状态,不仅可以及时发现异常,还能通过告警通知相关人员进行迅速处置。这一点在防止漏单方面尤其关键,因为任何一个环节出现问题,都可能导致后续流程受到影响。因此,实现全面且透明的数据质量监管,是确保业务连续性的重要保障。 自定义数据转换逻辑也是本项目的一大亮点。由于不同环境下的业务需求和数据结构存在差异,需要灵活调整映射关系。本案例使用了可视化的数据流设计工具,使得这些复杂步骤变得更加直观,同时也便于管理和维护。最终,通过准确执行定制化映射规则,实现了两套金蝶云星空系统之间数据格式的一致性,对接精度达到预期要求。 在整个过程中,统一的API资产管理功能帮助我们有效掌握了各类接口调用情况,从而进一步优化资源配置,提高整体效率。在即将展开的详细方案部分,将继续深入解析每个技术要点及其实现方式,以便更好地解答各种潜在疑问与挑战。 ![金蝶与SCM系统接口开发配置](https://pic.qeasy.cloud/D33.png~tplv-syqr462i7n-qeasy.image) ### 调用金蝶云星空接口executeBillQuery获取并加工数据 在数据集成过程中,调用源系统的API接口是关键的一步。本文将详细探讨如何使用轻易云数据集成平台调用金蝶云星空的`executeBillQuery`接口,以获取并加工委外退料-其他入库单的数据。 #### 接口配置与请求参数 首先,我们需要配置`executeBillQuery`接口的元数据。以下是该接口的主要配置项: - **API**: `executeBillQuery` - **Method**: `POST` - **FormId**: `SUB_RETURNMTRL` - **Pagination**: 支持分页,每页500条记录 - **FieldKeys**: 需查询的字段集合 - **FilterString**: 过滤条件,示例为`FApproveDate>='{{LAST_SYNC_TIME|date}}'` 以下是请求参数的具体配置: ```json { "api": "executeBillQuery", "method": "POST", "number": "FBillNo", "id": "FEntity_FEntryID", "pagination": { "pageSize": 500 }, "idCheck": true, "request": [ {"field":"FID","label":"实体主键","type":"string","describe":"实体主键","value":"FID"}, {"field":"FBillNo","label":"单据编号","type":"string","describe":"单据编号","value":"FBillNo"}, {"field":"FDocumentStatus","label":"单据状态","type":"string","describe":"单据状态","value":"FDocumentStatus"}, {"field":"FApproverId","label":"审核人","type":"string","describe":"审核人","value":"FApproverId"}, {"field":"FApproveDate","label":"审核日期","type":"string","describe":"审核日期","value":"FApproveDate"}, {"field":"FModifierId","label":"修改人","type":"string","describe":"修改人","value":"FModifierId"}, {"field":"FCreateDate","label":"创建日期","type":"string","describe":"创建日期","value":"FCreateDate"}, {"field":"FCreatorId","label":"创建人","type":"string","describe":"创建人","value":"FCreatorId"}, {"field":"FModifyDate","label":"修改日期","type":"string","describe":"修改日期","value":"FModifyDate"}, {"field":...} ], "otherRequest": [ {"field": "Limit", "label": "最大行数", "type": "string", "describe": "金蝶的查询分页参数", "value": "{PAGINATION_PAGE_SIZE}"}, {"field": "StartRow", "label": "开始行索引", "type": "string", "describe": "金蝶的查询分页参数", "value": "{PAGINATION_START_ROW}"}, {"field":...} ] } ``` #### 数据请求与清洗 在调用API时,我们需要确保请求参数正确无误,并且能够处理分页返回的数据。以下是一个示例代码片段,用于发送请求并处理返回的数据: ```python import requests def fetch_data(api_url, headers, payload): response = requests.post(api_url, headers=headers, json=payload) if response.status_code == 200: return response.json() else: raise Exception(f"API request failed with status code {response.status_code}") # 配置API URL和Headers api_url = 'https://api.kingdee.com/executeBillQuery' headers = {'Content-Type': 'application/json'} # 配置Payload payload = { 'FormId': 'SUB_RETURNMTRL', 'FieldKeys': 'FID,FBillNo,FDocumentStatus,FApproverId,FApproveDate,...', 'FilterString': f"FApproveDate>='{last_sync_time}'", 'Limit': 500, 'StartRow': start_row } # 获取数据 data = fetch_data(api_url, headers, payload) # 数据清洗与转换 cleaned_data = [] for record in data: cleaned_record = { '实体主键': record[0], '单据编号': record[1], '单据状态': record[2], ... } cleaned_data.append(cleaned_record) ``` #### 数据转换与写入 在获取并清洗数据后,我们需要将其转换为目标系统所需的格式,并写入目标数据库或系统。以下是一个示例代码片段,用于将清洗后的数据写入目标数据库: ```python import pymysql def write_to_db(data, db_config): connection = pymysql.connect(**db_config) cursor = connection.cursor() insert_query = """ INSERT INTO target_table (实体主键, 单据编号, 单据状态, 审核人, 审核日期, ...) VALUES (%s, %s, %s, %s, %s, ...) """ for record in data: cursor.execute(insert_query, ( record['实体主键'], record['单据编号'], record['单据状态'], record['审核人'], record['审核日期'], ... )) connection.commit() cursor.close() connection.close() # 数据库配置 db_config = { 'host': 'localhost', 'user': 'user', 'password': 'password', 'database': 'target_db' } # 写入数据库 write_to_db(cleaned_data, db_config) ``` 通过上述步骤,我们完成了从金蝶云星空获取委外退料-其他入库单数据,并将其清洗、转换后写入目标系统的全过程。这一过程不仅确保了数据的一致性和完整性,也提升了业务流程的自动化程度和效率。 ![数据集成平台可视化配置API接口](https://pic.qeasy.cloud/S4.png~tplv-syqr462i7n-qeasy.image) ### 使用轻易云数据集成平台将源数据转换并写入金蝶云星空API接口 在数据集成生命周期的第二步中,我们需要将已经集成的源平台数据进行ETL转换,转为目标平台金蝶云星空API接口所能够接收的格式,并最终写入目标平台。本文将详细探讨如何使用轻易云数据集成平台完成这一过程,重点介绍API接口相关的技术细节和元数据配置。 #### 元数据配置解析 首先,我们来看一下元数据配置,它定义了如何将源数据映射到金蝶云星空API接口所需的数据格式。 ```json { "api": "batchSave", "method": "POST", "idCheck": true, "operation": { "method": "merge", "field": "FBillNo,Fdate", "bodyName": "details", "bodySum": ["FQty"], "header": ["FBillNo", "Fdate", "FStockOrgId_FNumber"], "body": ["FMaterialId", "FQty", "FStockOrgId_FNumber"] }, ... } ``` 该配置文件定义了以下几个关键点: 1. **API和方法**:使用`batchSave` API,通过POST方法提交。 2. **ID检查**:启用`idCheck`,确保每个记录都有唯一标识。 3. **操作类型**:使用`merge`方法,根据`FBillNo`和`Fdate`字段合并记录。 4. **主体结构**:定义了头部(header)和明细(body)的字段映射关系。 #### 请求参数解析 接下来,我们深入解析请求参数的设置: ```json { "field": "FBillNo", "label": "单据编号", ... } ``` 每个字段都有详细的定义,包括字段名、标签、类型、描述和取值方式。例如,`FBillNo`字段表示单据编号,其值来自源数据中的相应字段。 ```json { "field": "FStockOrgId", ... "parser": { "name": "ConvertObjectParser", ... }, ... } ``` 对于复杂的数据类型,如库存组织ID(`FStockOrgId`),我们使用了一个转换解析器(ConvertObjectParser)来处理。这种解析器可以根据指定参数(如`FNumber`)将源数据转换为目标格式。 #### 明细信息配置 明细信息部分是整个请求的重要组成部分,它包含了具体的物料信息和数量等详细内容: ```json { "field": "FEntity", ... "children": [ { ... { "field": "FMATERIALID", ... "parser": { ... }, ... }, { ... } ] ] } ``` 每个子字段(如物料编码、实收数量等)都需要对应到源数据中的具体字段,并通过相应的解析器进行转换。 #### 操作参数设置 最后,我们来看一下其他操作参数的设置: ```json { ... { "field": "FormId", ... "value": "STK_MISCELLANEOUS" }, { ... { ... } ] } ``` 这些参数包括业务对象表单ID(FormId)、是否验证基础资料有效性(IsVerifyBaseDataField)、执行操作类型(Operation)以及是否自动提交并审核(IsAutoSubmitAndAudit)。这些设置确保了在调用API时能够正确执行预期操作。 #### 实际应用案例 假设我们有一条源数据记录如下: ```json { FBillNo: '20231001', Fdate: '2023-10-01', FStockOrgId_FNumber: 'ORG001', details: [ { FMaterialId: 'MAT001', FQty: '100' } ] } ``` 根据上述元数据配置,这条记录会被转换为以下格式,并通过调用金蝶云星空的batchSave API进行保存: ```json { FormId: 'STK_MISCELLANEOUS', IsVerifyBaseDataField: false, Operation: 'Save', IsAutoSubmitAndAudit: false, Model: { FBillNo: '20231001', FDate: '2023-10-01', FStockOrgId: { FNumber: 'ORG001' }, FEntity: [ { FMATERIALID: { FNumber: 'MAT001' }, FQty: '100' } ] } } ``` 通过这种方式,我们实现了从源平台到目标平台的数据无缝对接,确保了数据的一致性和完整性。 ![用友BIP接口开发配置](https://pic.qeasy.cloud/T7.png~tplv-syqr462i7n-qeasy.image)