使用轻易云数据集成平台实现高效数据写入与监控管理

  • 轻易云集成顾问-吕修远
### 金蝶云星空数据集成到MySQL:ZZ组装生产订单-表头-制造一处-回写id示例 在企业信息系统的建设中,实现不同系统的数据对接与集成是一个至关重要的环节。本案例分享了如何通过轻易云数据集成平台,将金蝶云星空的数据高效、安全地同步到MySQL数据库,具体展示“ZZ组装生产订单-表头-制造一处-回写id示例”的实施过程。 #### 1. 接口调用与数据获取 首先,需要从金蝶云星空系统中获取所需的业务数据。我们利用金蝶提供的`executeBillQuery`API接口,对相应业务单据进行查询。实际操作过程中,通过特定参数配置,可以精准定位需要处理的订单信息。 ```json { "formId": "PRD_MO", "fieldKeys": ["FOrderID", "FMaterialID", "FQty"], "filterString": "FBillNo = 'MO0001'" } ``` 该接口返回格式为JSON对象,我们需要解析这些内容并准备将其映射到MySQL数据库中的对应表结构上。在这里,特别关注分页和限流问题,以确保能够完整而高效地抓取大量生产订单记录,不会因单次请求量过大导致性能瓶颈。 #### 2. 数据转换与映射 通过自定义转换逻辑,我们将获得的数据整理后适配至目标库MySQL 的字段要求。这一步尤为关键,因为不同系统之间的数据格式差异直接影响最终写入效果。例如: ```sql INSERT INTO zz_production_order (order_id, material_id, quantity) VALUES (?, ?, ?); ``` 在转化过程中,应当处理好可能出现的不一致性,如时间戳、数值精度等。同时,也要考虑异常情况下的容错机制,以保证整体任务稳定运行。 #### 3. 大批量数据写入与监控管理 针对大批量持续更新场景,轻易云平台支持高吞吐量的数据写入能力,使得我们可以快速地将大量抓取到的数据同步进入 MySQL。每次批量插入都伴随着实时监控和告警机制,确保任何潜在故障被及时发现并解决。通过集中式日志管理和状态追踪功能,可以全面掌握整个过程中的关键节点及其性能表现,实现对整个生命周期内各阶段操作行为清晰透明地监控。 综上,通过应用以上技术方法,本方案顺利实现了从金蝶云星空系统向 MySQL 数据库无缝、准确、高效的数据同步,为组合型业务流程提供坚实基础。这不仅优化了资源利用,还提高了操作效率,大幅降低手动干预需求。 ![钉钉与WMS系统接口开发配置](https://pic.qeasy.cloud/D33.png~tplv-syqr462i7n-qeasy.image) ### 调用金蝶云星空接口executeBillQuery获取并加工数据 在数据集成的生命周期中,调用源系统接口是至关重要的一步。本文将深入探讨如何通过轻易云数据集成平台调用金蝶云星空的`executeBillQuery`接口来获取并加工数据。 #### 接口配置与请求参数 首先,我们需要了解`executeBillQuery`接口的基本配置和请求参数。根据提供的元数据配置,接口采用POST方法进行调用,主要参数如下: - **api**: `executeBillQuery` - **effect**: `QUERY` - **method**: `POST` - **id**: `FID` - **name**: `FBillNo` - **idCheck**: `true` 请求参数包括实体主键、单据编号、审核日期等字段。以下是具体的请求字段及其描述: ```json [ {"field":"FID","label":"实体主键","type":"string","describe":"实体主键","value":"FID"}, {"field":"FBillNo","label":"单据编号","type":"string","describe":"单据编号","value":"FBillNo"}, {"field":"FAPPROVEDATE","label":"审核日期","type":"string","describe":"审核日期","value":"FAPPROVEDATE"}, {"field":"FDocumentStatus","label":"单据状态","type":"string","value":"FDocumentStatus"}, {"field":"FBillType","label":"单据类型","type":"string","value":"FBillType.FNumber"}, {"field":"FPrdOrgId","label":"生产组织","type":"string","value":"FPrdOrgId.FNumber"}, {"field":"FDate","label":"单据日期","type":"string","value":"FDate"}, {"field":"FOwnerTypeId","label":"货主类型","type":"string","value":"FOwnerTypeId"}, {"field":"FIsRework","label":"是否返工","type":"string","value":"FIsRework"}, {"field":"FApproverId","label":"审核人","type":"string","value":"FApproverId.fname"} ] ``` 此外,还有一些其他请求参数用于分页和过滤条件: ```json [ {"field": "Limit", "label": "最大行数", "type": "string", "describe": "金蝶的查询分页参数", "value": "2000"}, {"field": "StartRow", "label": "开始行索引", "type": "string", "describe": "金蝶的查询分页参数", "value": "{PAGINATION_START_ROW}"}, {"field": "TopRowCount", "label": "返回总行数", "type": "int", "describe": "金蝶的查询分页参数"}, {"field": "FilterString", "label": "过滤条件", "type": "string", "describe": "示例写法 FSupplierId.FNumber = 'VEN00010' and FApproveDate>=", "value": "FPrdOrgId.fnumber in ('T01.01','T04') and left(FBillNo,2)='MO' and F_FSYNCMOM=0 and FBillType.fnumber='SCDD01_SYS' and FApproveDate>'2023-09-10' and FDocumentStatus ='C'" }, {"field": "FieldKeys", "label": "需查询的字段key集合", "type": "array", "describe": "金蝶分录主键ID格式:FPOOrderEntry_FEntryId,其它格式 FPurchaseOrgId.FNumber", "parser": {"name": "ArrayToString", "params": "," } }, {"field": "FormId", "label": "业务对象表单Id", "type": "string", "describe": "必须填写金蝶的表单ID如:PUR_PurchaseOrder", "value": "PRD_MO" } ] ``` #### 数据请求与清洗 在实际操作中,我们需要构建一个完整的API请求,并确保所有必要字段都已正确填充。例如,以下是一个示例请求体: ```json { "FormId": "PRD_MO", "FieldKeys": ["FID", "FBillNo", ...].join(","), ... } ``` 我们可以使用轻易云平台提供的可视化界面来配置这些参数,并发起API调用。平台会自动处理响应数据,并根据配置进行初步清洗。 #### 数据转换与写入 一旦获取到原始数据,我们需要对其进行进一步处理。这可能包括数据格式转换、字段映射等操作。例如,将日期格式从字符串转换为标准日期对象,或者将某些编码字段转换为更具可读性的文本。 在轻易云平台上,可以使用内置的数据转换工具来完成这些任务。通过拖拽式操作,可以轻松定义各种转换规则,并实时预览结果。 #### 实际案例应用 假设我们需要从金蝶云星空中获取所有状态为“已审核”的生产订单,并将其导入到另一个系统中。我们可以设置如下过滤条件: ```json { ... "FilterString": { ... value: [ "(left(FBillNo,2)='MO')", "(F_FSYNCMOM=0)", "(FBillType.fnumber='SCDD01_SYS')", "(FApproveDate>'2023-09-10')", "(FDocumentStatus ='C')" ].join(" and ") }, ... } ``` 通过上述配置,我们能够精准地筛选出所需的数据,并进行后续处理。 #### 小结 本文详细介绍了如何通过轻易云数据集成平台调用金蝶云星空接口`executeBillQuery`来获取并加工数据。通过合理配置API请求参数和过滤条件,可以高效地实现数据集成,为后续的数据处理和分析奠定基础。 ![用友与SCM系统接口开发配置](https://pic.qeasy.cloud/S11.png~tplv-syqr462i7n-qeasy.image) ### 使用轻易云数据集成平台实现ETL转换并写入MySQLAPI接口 在数据集成生命周期的第二步中,我们需要将已经集成的源平台数据进行ETL转换,并转为目标平台 MySQLAPI 接口所能够接收的格式,最终写入目标平台。以下是具体的技术实现过程。 #### 元数据配置解析 首先,我们需要理解元数据配置中的各个字段及其含义。以下是提供的元数据配置: ```json { "api": "execute", "effect": "EXECUTE", "method": "POST", "idCheck": true, "request": [ { "field": "main_params", "label": "main_params", "type": "object", "describe": "111", "value": "1", "children": [ {"field": "FID", "label": "实体主键", "type": "string", "value": "{FID}"}, {"field": "FBillNo", "label": "单据编号", "type": "string", "value": "{FBillNo}"}, {"field": "FDocumentStatus", "label": "单据状态", "type": "string", "value": "{FDocumentStatus}"}, {"field": "FBillType", "label": "单据类型", "type": "string", "value": "{FBillType}"}, {"field":"FPrdOrgId","label":"生产组织","type":"string","value":"{FPrdOrgId}"}, {"field":"FDate","label":"单据日期","type":"string","value":"{FDate}"}, {"field":"FOwnerTypeId","label":"货主类型","type":"string","value":"{FOwnerTypeId}"}, {"field":"FIsRework","label":"是否返工","type":"string","value":"_function CASE '{FIsRework}' WHEN true THEN '1' ELSE '0' END"}, {"field":"PMC","label":"PMC","type":"string","value":"{FApproverId}"} ] } ], ... } ``` #### 数据请求与清洗 在这个阶段,我们已经获取了源平台的数据,并且需要对这些数据进行清洗和转换。根据元数据配置中的 `request` 部分,我们可以看到需要处理的数据字段及其对应的映射关系。 例如: - `FID` 映射到 `{FID}` - `FBillNo` 映射到 `{FBillNo}` - `FIsRework` 的值通过一个条件表达式 `_function CASE '{FIsRework}' WHEN true THEN '1' ELSE '0' END` 来确定。 #### 数据转换与写入 接下来,我们需要将清洗后的数据按照目标平台 MySQLAPI 接口所需的格式进行转换,并写入数据库。这一步主要涉及 SQL 语句的构建和 API 请求的发送。 根据元数据配置中的 `otherRequest` 部分,我们可以看到 SQL 插入语句的模板: ```sql INSERT INTO mbs_assemble (FID, FBillNo, FDocumentStatus, FBillType, FPrdOrgId, FDate, FOwnerTypeId, FIsRework, PMC) VALUES (:FID, :FBillNo, :FDocumentStatus, :FBillType, :FPrdOrgId, :FDate, :FOwnerTypeId, :FIsRework, :PMC) ``` 我们需要将清洗后的数据填充到这个模板中,并通过 POST 方法发送到 MySQLAPI 接口。 #### 实际操作步骤 1. **提取源数据**:从源平台提取原始数据。 2. **清洗与转换**:根据元数据配置,对原始数据进行清洗和转换。例如,将 `true/false` 的 `FIsRework` 转换为 `'1'/'0'`。 3. **构建SQL语句**:使用清洗后的数据填充 SQL 插入语句模板。 4. **发送API请求**:通过 POST 方法将构建好的 SQL 插入语句发送到 MySQLAPI 接口。 以下是一个示例代码片段,用于实现上述步骤: ```python import requests import json # 假设我们已经获取了源平台的数据 source_data = { 'FID': '12345', 'FBillNo': 'BILL20231001', 'FDocumentStatus': 'Approved', 'FBillType': 'Standard', 'FPrdOrgId': 'ORG001', 'FDate': '2023-10-01', 'FOwnerTypeId': 'Owner001', 'FIsRework': True, 'FApproverId': 'PMC001' } # 清洗与转换 transformed_data = { ':FID': source_data['FID'], ':FBillNo': source_data['FBillNo'], ':FDocumentStatus': source_data['FDocumentStatus'], ':FBillType': source_data['FBillType'], ':FPrdOrgId': source_data['FPrdOrgId'], ':FDate': source_data['FDate'], ':FOwnerTypeId': source_data['FOwnerTypeId'], ':FIsRework': '1' if source_data['FIsRework'] else '0', ':PMC': source_data['FApproverId'] } # 构建SQL语句 sql_query = """ INSERT INTO mbs_assemble (FID, FBillNo, FDocumentStatus, FBillType, FPrdOrgId, FDate, FOwnerTypeId, FIsRework, PMC) VALUES (:FID, :FBillNo, :FDocumentStatus, :FBillType, :FPrdOrgId, :FDate, :FOwnerTypeId,: FIsRework,: PMC) """ # 替换占位符 for key in transformed_data: sql_query = sql_query.replace(key, f"'{transformed_data[key]}'") # 发送API请求 api_url = "<MySQLAPI接口URL>" headers = {'Content-Type': 'application/json'} payload = json.dumps({"main_sql": sql_query}) response = requests.post(api_url, headers=headers, data=payload) if response.status_code == 200: print("Data inserted successfully") else: print(f"Failed to insert data: {response.text}") ``` 通过以上步骤和代码示例,我们成功地将源平台的数据进行了ETL转换,并通过MySQLAPI接口写入了目标数据库。 ![数据集成平台API接口配置](https://pic.qeasy.cloud/T22.png~tplv-syqr462i7n-qeasy.image)