API集成与数据转换:实现金蝶云与MySQL的数据对接

  • 轻易云集成顾问-曹润
### 金蝶云星空数据集成到MySQL的技术案例分享:销售出库单 在企业日常运营中,销售出库单数据的高效管理是确保业务顺畅运行的重要环节。金蝶云星空作为主流的企业资源计划(ERP)系统,其与MySQL数据库之间的数据对接成为许多企业实现信息整合和优化流程的重要手段。本文将详细介绍通过API接口executeBillQuery从金蝶云星空获取销售出库单数据,并使用MySQL API execute方法将大量数据快速写入MySQL数据库。 #### 一、API接口调用及其配置 1. **如何调用金蝶云星空接口executeBillQuery** - *参数配置*: 在进行API调用前,需要合理设置请求参数,如账套ID、日期范围等,以确保获取所需的销售出库单数据。 - *分页处理*: 为了应对大批量查询带来的性能瓶颈,制定分页策略,通过循环递归方式逐页获取数据,并注意结合限流处理机制以避免服务器过载。 2. **定时可靠地抓取金蝶云星空接口数据** - 配置定时任务:利用平台自带的调度工具或外部脚本设定任务执行时间,在指定周期内自动发起请求,从而保证最新数据及时更新。 - 异常检测与重试机制:针对可能出现的网络异常或服务响应错误,实现异常捕获与重试机制,确保在非预期情况下依然能够顺利完成集成任务。 #### 二、问题解析及解决方案 1. **处理分页和限流问题** - 分页逻辑示例代码: ```python def fetch_sales_data(api_endpoint, params): page = 1 while True: params['page'] = page response = executeBillQuery(api_endpoint, params) if not response or 'errorMsg' in response: break # 出现错误或无更多结果时跳出循环 process_and_store(response['data']) page += 1 sleep(0.5) # 根据实际情况调整休眠时间,以防止触发限流 ``` 2. **元数据转换及存储到MySQL** - 自定义字段映射:根据需求,将从金蝶云星空获取的数据字段对照目标表结构进行转换,例如将“产品编号”映射为相应表中的“product_id”列。 - 大批量写入优化:采用事务控制和批量导入技术,使得大规模的数据写入操作更具效率和可靠性。如利用以下伪代码提升性能: ```python connection = mysql.connector.connect(host='hostname', database='db', user='user ![如何开发金蝶云星空API接口](https://pic.qeasy.cloud/D14.png~tplv-syqr462i7n-qeasy.image) ### 调用金蝶云星空接口executeBillQuery获取并加工数据 在轻易云数据集成平台中,调用源系统的API接口是数据集成生命周期的第一步。本文将深入探讨如何通过调用金蝶云星空的`executeBillQuery`接口来获取销售出库单数据,并对其进行初步加工。 #### 接口调用配置 根据提供的元数据配置,我们需要通过POST请求方式调用`executeBillQuery`接口。以下是具体的请求参数配置: - **api**: `executeBillQuery` - **method**: `POST` - **effect**: `QUERY` - **number**: `FBillNo` - **id**: `FEntity_FENTRYID` - **name**: `FBillNo` - **idCheck**: `true` #### 请求字段配置 请求字段主要包括销售出库单的相关信息,如单据编号、审批日期、单据金额、物料名称等。以下是具体的字段及其配置: ```json "request": [ {"field": "FBillNo", "label": "FBillNo", "type": "string", "value": "FBillNo"}, {"field": "FID", "label": "FID", "type": "string", "value": "FID"}, {"field": "FApproveDate", "label": "FApproveDate", "type": "string", "value": "FApproveDate"}, {"field": "FBillAmount", "label": "FBillAmount", "type": "string", "value": "FBillAmount"}, {"field": "FMaterialName", "label": "FMaterialName", "type": "string", "value": "FMaterialName"}, {"field": "FMaterialId.fnumber", "label": "FMaterialId.fnumber", "type": "string", "value":"FMaterialId.fnumber"} ] ``` #### 其他请求参数 为了实现分页查询和过滤条件,我们还需要配置其他请求参数: ```json "otherRequest":[ {"field":"Limit","label":"最大行数","type":"string","describe":"金蝶的查询分页参数","value":"1000"}, {"field":"StartRow","label":"开始行索引","type":"string","describe":"金蝶的查询分页参数"}, {"field":"TopRowCount","label":"返回总行数","type":"string","describe":"金蝶的查询分页参数","value":"1000"}, {"field":"FilterString","label":"过滤条件","type":"string","describe": "示例写法 FSupplierId.FNumber = 'VEN00010' and FApproveDate>=","value": "YEAR(FCreateDate)=2024"}, {"field":"FieldKeys","label":"需查询的字段key集合","type":"string", "describe":"金蝶分录主键ID格式:FPOOrderEntry_FEntryId,其它格式 FPurchaseOrgId.FNumber"}, {"field":"FormId","label":"业务对象表单Id","type":"string", "describe":"必须填写金蝶的表单ID如:PUR_PurchaseOrder","value": "SAL_OUTSTOCK"} ] ``` #### 数据请求与清洗 在完成上述配置后,通过轻易云平台发起API请求,获取到原始数据。接下来,我们需要对这些数据进行清洗和初步加工,以便后续的数据转换与写入阶段。 1. **数据校验**:首先检查返回的数据是否包含所有必需字段,并确保每个字段的数据类型符合预期。 2. **去重处理**:根据`FBillNo`和`FEntity_FENTRYID`进行去重,确保每条记录唯一。 3. **格式转换**:将日期字段(如`FApproveDate`)转换为标准日期格式,金额字段(如`FBillAmount`)转换为数值类型。 #### 示例代码 以下是一个简单的Python示例代码,用于发起API请求并处理返回的数据: ```python import requests import json # API URL url = 'https://api.kingdee.com/executeBillQuery' # 请求头 headers = { 'Content-Type': 'application/json' } # 请求体 payload = { 'FormId': 'SAL_OUTSTOCK', 'FieldKeys': 'FBillNo,FID,FApproveDate,FBillAmount,FMaterialName,FMaterialId.fnumber', 'FilterString': 'YEAR(FCreateDate)=2024', 'Limit': 1000, 'StartRow': 0, } # 发起POST请求 response = requests.post(url, headers=headers, data=json.dumps(payload)) # 检查响应状态码 if response.status_code == 200: data = response.json() # 数据清洗与处理 cleaned_data = [] for record in data: cleaned_record = { 'FBillNo': record['FBillNo'], 'FID': record['FID'], 'FApproveDate': record['FApproveDate'], 'FBillAmount': float(record['FBillAmount']), 'FMaterialName': record['FMaterialName'], 'FMaterialId_fnumber': record['FMaterialId.fnumber'] } cleaned_data.append(cleaned_record) # 输出清洗后的数据 print(json.dumps(cleaned_data, indent=4)) else: print(f"Error: {response.status_code}") ``` 通过上述步骤,我们成功调用了金蝶云星空的`executeBillQuery`接口,并对返回的数据进行了初步加工。这为后续的数据转换与写入奠定了基础。在整个过程中,轻易云平台提供了全透明可视化操作界面,使得每个环节都清晰易懂,大大提升了业务透明度和效率。 ![金蝶与CRM系统接口开发配置](https://pic.qeasy.cloud/S28.png~tplv-syqr462i7n-qeasy.image) ### 数据集成生命周期第二步:ETL转换与写入MySQL 在数据集成生命周期的第二步中,我们将已经集成的源平台数据进行ETL(提取、转换、加载)转换,最终写入目标平台MySQL。本文将深入探讨这一过程中涉及的API接口和数据集成特性,结合具体的元数据配置进行详细解析。 #### API接口配置与请求参数 在进行ETL转换时,首先需要配置API接口。根据提供的元数据配置,我们使用的是`execute` API,该API通过POST方法执行,主要用于执行SQL语句。以下是具体的请求参数: ```json { "api": "execute", "effect": "EXECUTE", "method": "POST", "idCheck": true, "request": [ {"field": "main_params", "label": "main_params", "type": "object", "describe": "111", "value": "1"}, {"field": "field_1", "label": "field_1", "type": "string", "describe": "111", "value": "1"}, {"field": "field_2", "label": "field_2", "type": "string", "describe": "111", "value": "1"}, {"field": "field_3", "label": "field_3", "type": "string", "describe":"111","value":"1"}, {"field":"field_4","label":"field_4","type":"string","describe":"111","value":"1"}, {"field":"field_5","label":"field_5","type":"string","describe":"111","value":"1"}, {"field":"extend_params_1","label":"extend_params_1","type":"object","describe":"111","value":"1"}, {"field":"lastInsertId","label":"lastInsertId","type":"string","describe":"111","value":":lastInsertId"}, {"field":"extend_params_2","label":"extend_params_2","type":"array","describe":"111","value":"1"} ], ... } ``` 这些请求参数定义了需要传递的数据字段及其类型,包括字符串类型(`string`)、对象类型(`object`)和数组类型(`array`)。特别注意的是`lastInsertId`字段,它用于存储插入操作后生成的自增ID,以便在后续操作中引用。 #### SQL语句配置 元数据配置中还包含了三条SQL语句,用于插入不同的数据表。这些SQL语句分别是: ```json { ... { "otherRequest":[ {"field":"main_sql","label":"main_sql","type":"string","describe":"111", "value": "INSERT INTO table_name ( field_1, field_2, field_3, field_4, field_5 ) VALUES ( :field_1, :field_2, :field_3, :field_4, :field_5 )" }, {"field":"extend_sql_1","label":"extend_sql_1","type":"string", "describe": "111", "value": "INSERT INTO table_name ( parent_id, field_1, field_2 ) VALUES ( :lastInsertId, :field_1, :field_2 )" }, {"field": "extend_sql_2", "label": "extend_sql_2", "type": "string", "describe": "111", "value": "INSERT INTO table_name ( parent_id, field_1 ) VALUES ( :lastInsertId, :field_1)" } ] } } ``` 这些SQL语句分别用于插入主表和扩展表的数据。主表插入操作完成后,会生成一个自增ID,通过`:lastInsertId`引用该ID,在后续扩展表插入操作中作为外键使用。 #### 数据转换与写入过程 在实际操作中,ETL过程可以分为以下几个步骤: **提取(Extract):** 从源平台提取所需数据,这些数据通常以JSON格式存储,并通过API接口获取。 **转换(Transform):** 根据目标平台的需求,对提取的数据进行格式转换。例如,将日期格式统一、字段名称映射等。在此过程中,可以利用轻易云数据集成平台提供的可视化工具,对数据进行预处理和清洗。 **加载(Load):** 将转换后的数据通过配置好的API接口和SQL语句写入目标平台MySQL。具体步骤如下: - 执行主表插入操作,获取自增ID。 - 使用自增ID执行扩展表插入操作。 以下是一个简化的示例代码片段,展示如何通过API接口执行上述操作: ```python import requests # 定义请求头和URL headers = {'Content-Type': 'application/json'} url = 'http://example.com/api/execute' # 定义请求体 payload = { 'main_params': {...}, 'fields': { 'main_sql': 'INSERT INTO table_name ( field_1, field_2, field_3, field_4, field_5 ) VALUES ( :field_1, :field_2, :field_{3}, :{4}, {5} )', 'extend_sqls': [ 'INSERT INTO table_name ( parent_id, field_{6}, {7} ) VALUES ( {8}, {9}, {10} )', 'INSERT INTO table_name ( parent_id,{11}) VALUES ({12},{13})' ] } } # 发起POST请求 response = requests.post(url, headers=headers, json=payload) # 检查响应状态 if response.status_code == 200: print('Data inserted successfully') else: print('Failed to insert data') ``` 以上代码通过Python语言示范了如何发起POST请求,将转换后的数据写入MySQL数据库。在实际应用中,可以根据具体需求调整请求体内容和处理逻辑。 #### 总结 本文深入探讨了轻易云数据集成平台生命周期第二步中的ETL转换与写入过程,重点解析了API接口配置、SQL语句配置及实际操作步骤。通过合理利用元数据配置,可以高效地实现不同系统间的数据无缝对接,提高业务透明度和效率。 ![钉钉与MES系统接口开发配置](https://pic.qeasy.cloud/T9.png~tplv-syqr462i7n-qeasy.image)