从数据抓取到MySQL写入:完整的数据集成解决方案

  • 轻易云集成顾问-谢楷斌
### 金蝶云星空数据集成到MySQL的技术案例分享:MOM销售订单状态刷新 在企业信息系统整合过程中,如何高效、安全地实现跨平台的数据同步已成为一个重要课题。本文将通过具体的实施案例——"MOM销售订单状态刷新",详细介绍金蝶云星空系统与MySQL数据库之间的数据集成解决方案。 #### 数据抓取与API调用 首先,我们需要从金蝶云星空中获取最新的销售订单状态。这一步骤采用其提供的`executeBillQuery`接口进行数据抓取,该接口具有分页功能,可以有效处理大批量数据。为了确保数据完整性及不漏单,我们设定了一个可靠的定时任务,每隔固定时间段执行一次查询操作,并对每次查询结果进行校验和记录。 ```python # 调用executeBillQuery API示例 import requests def get_sales_order_status(api_url, params): response = requests.post(api_url, json=params) response.raise_for_status() return response.json() api_url = "https://your-kingdee-api-endpoint.com/executeBillQuery" params = { # 查询参数设置 } sales_order_data = get_sales_order_status(api_url, params) ``` #### 数据转换与格式处理 由于不同系统中的数据结构可能存在差异,需要对获取到的数据进行转换处理。例如,在金蝶云星空中,日期字段可能以字符串形式表示,而在MySQL数据库中则要求严格遵循日期格式规范。我们通过自定义的数据转换逻辑,实现了相应字段类型的一致性映射。 ```python from datetime import datetime def transform_data(raw_data): transformed_data = [] for entry in raw_data: trans_entry = { 'order_id': entry['OrderID'], 'status': entry['Status'], 'update_time': datetime.strptime(entry['UpdateTime'], '%Y-%m-%dT%H:%M:%S') } transformed_data.append(trans_entry) return transformed_data transformed_sales_order_data = transform_data(sales_order_data) ``` #### 高吞吐量写入与异常重试机制 针对大量订单状态更新需求,MySQL支持高吞吐量的数据写入能力,使得所有新获取并转换后的订单状态能够迅速存储至数据库。此外,为保证整个过程中的稳定性和可靠性,配置了一套完善的异常处理和重试机制,如果某一批次出现错误,会自动触发重试操作,同时保留详细日志便于后续分析和排查问题。 ```python import mysql.connector from mysql.connector import errorcode def insert_into_mysql(data_batch): try: cnx = mysql.connector.connect(user='username', password='password', host='127.0.0.1', ![企业微信与OA系统接口开发配置](https://pic.qeasy.cloud/D16.png~tplv-syqr462i7n-qeasy.image) ### 调用金蝶云星空接口executeBillQuery获取并加工数据 在数据集成过程中,调用源系统接口是关键的一步。本文将详细介绍如何通过轻易云数据集成平台调用金蝶云星空的`executeBillQuery`接口,以获取并加工销售订单状态数据。 #### 接口配置与请求参数 首先,我们需要配置接口的元数据,以便正确调用金蝶云星空的API。以下是元数据配置的详细信息: ```json { "api": "executeBillQuery", "effect": "QUERY", "method": "POST", "id": "FSaleOrderEntry_FEntryID", "name": "FBillNo", "idCheck": true, "request": [ {"field":"FSaleOrderEntry_FEntryID","label":"FSaleOrderEntry_FEntryID","type":"string","describe":"FSaleOrderEntry_FEntryID","value":"FSaleOrderEntry_FEntryID"}, {"field":"FID","label":"FID","type":"string","describe":"FID","value":"FID"}, {"field":"FBillNo","label":"单据编号","type":"string","describe":"单据编号","value":"FBillNo"}, {"field":"FDocumentStatus","label":"单据状态","type":"string","describe":"单据状态","value":"FDocumentStatus"}, {"field":"FSaleOrgId_FNumber","label":"销售组织","type":"string","describe":"销售组织","value":"FSaleOrgId.FNumber"}, {"field":"FDate","label":"日期","type":"string","describe":"日期","value":"FDate"}, {"field":"FCustId_FNumber","label":"客户","type":"string","describe":"客户","value":"FCustId.FNumber"}, {"field":"FSaleDeptId_Fnumber","label":"销售部门","type":"string","describe":"销售部门","value":"FSaleDeptId.Fnumber"}, {"field":"FReceiveAddress","label":"收货地址","type":"string","describe":"收货地址","value":"FReceiveAddress"}, {"field":...} ], "otherRequest": [ {"field": "Limit", "label": "最大行数", "type": "string", "describe": "金蝶的查询分页参数", "value": "2000"}, {"field": "StartRow", "label": "开始行索引", "type": "string", "describe": "金蝶的查询分页参数", "value": "{PAGINATION_START_ROW}"}, {"field":...} ], "autoFillResponse": true } ``` #### 请求示例 在实际操作中,我们需要构建一个HTTP POST请求,发送到金蝶云星空的API端点。以下是一个示例请求体: ```json { "FormId": "SAL_SaleOrder", "FieldKeys": [ ... // 列出所有需要查询的字段key ], "FilterString": "(FSupplyOrgId.fnumber in ('T02','T02.01') and FDocumentStatus='C' and FApproveDate>='2023-10-01')", ... } ``` #### 数据处理与清洗 获取到原始数据后,需要对其进行清洗和转换,以便后续的数据写入和使用。以下是一些常见的数据清洗步骤: 1. **字段映射**:将API返回的数据字段映射到目标系统所需的字段。例如,将`FBillNo`映射为目标系统中的订单编号。 2. **数据类型转换**:确保所有字段的数据类型符合目标系统要求。例如,将字符串类型的日期转换为日期类型。 3. **过滤无效数据**:去除不符合业务规则或无效的数据记录。例如,过滤掉状态为“已关闭”的订单。 #### 示例代码 以下是一个示例代码片段,用于调用接口并处理返回的数据: ```python import requests import json # 配置请求头和URL url = 'https://api.kingdee.com/executeBillQuery' headers = {'Content-Type': 'application/json'} # 构建请求体 payload = { 'FormId': 'SAL_SaleOrder', 'FieldKeys': ['FBillNo', 'FDocumentStatus', ...], 'FilterString': "(FSupplyOrgId.fnumber in ('T02','T02.01') and FDocumentStatus='C' and FApproveDate>='2023-10-01')", } # 发起POST请求 response = requests.post(url, headers=headers, data=json.dumps(payload)) # 检查响应状态码 if response.status_code == 200: data = response.json() # 数据清洗与转换 cleaned_data = [] for record in data: cleaned_record = { '订单编号': record['FBillNo'], '单据状态': record['FDocumentStatus'], ... } cleaned_data.append(cleaned_record) # 后续处理逻辑,如写入数据库或其他系统 else: print(f"请求失败,状态码: {response.status_code}") ``` 通过上述步骤,我们可以高效地从金蝶云星空获取所需的销售订单状态数据,并进行必要的数据清洗和转换,为后续的数据处理环节打下坚实基础。 ![用友与MES系统接口开发配置](https://pic.qeasy.cloud/S20.png~tplv-syqr462i7n-qeasy.image) ### 数据集成过程中ETL转换与MySQLAPI接口写入的技术实现 在数据集成生命周期的第二步,我们需要将已经集成的源平台数据进行ETL(提取、转换、加载)转换,最终写入目标平台MySQL。本文将详细探讨这一过程中的技术细节,特别是如何通过API接口实现这一目标。 #### 数据请求与清洗 首先,我们需要从源系统提取数据并进行初步清洗。这一步骤确保了数据的准确性和一致性,为后续的转换和写入打下基础。以下是元数据配置中关于数据请求的部分: ```json { "api": "execute", "effect": "EXECUTE", "method": "POST", "idCheck": true, "request": [ { "field": "main_params", "label": "main_params", "type": "object", "describe": "111", "children": [ {"field": "SO_NUMBER", "label": "SO_NUMBER", "type": "string", "value":"{FBillNo}"}, {"field": "SO_LINE_SEQ", "label": "SO_LINE_SEQ", "type": "string", "value":"{FSaleOrderEntry_fseq}"}, {"field": "FMrpCloseStatus", "label": "FMrpCloseStatus", "type":"string","value":"{FMrpCloseStatus}"} ] } ] } ``` 在这个配置中,我们定义了一个名为`main_params`的对象,包含三个字段:`SO_NUMBER`、`SO_LINE_SEQ`和`FMrpCloseStatus`。这些字段将从源系统的数据中提取相应的值。 #### 数据转换 接下来是数据转换阶段。我们需要将提取的数据转换为目标平台MySQL能够接收的格式。这一步骤通常涉及到数据类型的转换、字段映射等操作。在我们的案例中,主要通过SQL语句来实现这一目标。 以下是元数据配置中关于SQL语句部分: ```json { "otherRequest": [ { "field": "main_sql", "label": "main_sql", "type":"string", "describe":"111", "value":"UPDATE ty_mes.mt_so_line a \nINNER JOIN ( \n SELECT SO_LINE_ID \n FROM ty_mes.mt_so_line \n WHERE so_id = ( \n SELECT so_id \n FROM ty_mes.mt_so_head \n WHERE so_number = :SO_NUMBER\n ) \n AND so_line_num =:SO_LINE_SEQ \n) AS subquery ON a.SO_LINE_ID = subquery.SO_LINE_ID \nSET a.KINGDEE_STATUS = :FMrpCloseStatus" } ] } ``` 这段SQL语句实现了以下功能: 1. 从表`ty_mes.mt_so_head`中根据销售订单号(`:SO_NUMBER`)获取对应的销售订单ID。 2. 根据销售订单ID和行号(`:SO_LINE_SEQ`)从表`ty_mes.mt_so_line`中获取对应的行ID。 3. 更新表`ty_mes.mt_so_line`中的状态字段(`KINGDEE_STATUS`),将其设置为新的状态值(`:FMrpCloseStatus`)。 #### 数据写入 最后一步是将转换后的数据写入目标平台MySQL。在我们的案例中,通过API接口以POST方法执行上述SQL语句,实现数据的最终写入。 以下是API调用示例: ```json { api: 'execute', method: 'POST', data: { main_params: { SO_NUMBER: '12345', SO_LINE_SEQ: '1', FMrpCloseStatus: 'Closed' }, main_sql: `UPDATE ty_mes.mt_so_line a INNER JOIN ( SELECT SO_LINE_ID FROM ty_mes.mt_so_line WHERE so_id = ( SELECT so_id FROM ty_mes.mt_so_head WHERE so_number = :SO_NUMBER ) AND so_line_num = :SO_LINE_SEQ ) AS subquery ON a.SO_LINE_ID = subquery.SO_LINE_ID SET a.KINGDEE_STATUS = :FMrpCloseStatus` } } ``` 通过上述步骤,我们实现了从源系统到目标平台MySQL的数据集成过程。每个环节都经过精心设计和配置,确保了数据处理的准确性和高效性。 ![钉钉与WMS系统接口开发配置](https://pic.qeasy.cloud/T12.png~tplv-syqr462i7n-qeasy.image)