ETL数据转换:从轻易云到金蝶云星空的最佳实践

  • 轻易云集成顾问-胡秀丛
### 金蝶云星空数据集成技术案例分享:查询DI与产品对应表修改物料DI 在实施金蝶云星空系统间的数据集成过程中,维持高效的数据流动和良好的数据质量是至关重要的。本文将详细介绍如何利用先进的数据集成技术,实现从一个金蝶云星空系统到另一个同类系统的无缝数据对接。本次案例采用的是具体方案“查询DI与产品对应表修改物料DI”,旨在优化和保障两套系统间的信息同步。 **背景** 本案例涉及使用金蝶云星空提供的API接口进行数据读取和写入操作,主要包括以下两个关键部分: - 数据获取接口 `executeBillQuery` - 数据写入接口 `batchSave` 通过这两个接口,我们能够完成大批量、高频次的数据交互,有效解决了跨系统信息孤岛的问题。 **技术要点** 1. **高吞吐量的数据处理能力** 系统需要处理大量业务单据,因此必须支持高吞吐量的数据写入。针对这一需求,我们设计了一种并行批处理机制,使得每个批次的数据都能快速、准确地被传输到目标系统中,确保了业务数据的实时性。 2. **集中监控与告警** 在执行过程中,通过平台提供的一体化监控工具,可以实时跟踪任务状态和性能指标。当出现异常情况时,如网络故障或API限流问题,能立即触发告警通知相关人员进行排查,从而减少潜在风险。 3. **自定义转换逻辑** 由于源系统与目标系统之间可能存在数据格式上的差异,需要事先定义好转换规则。例如,对于日期格式、数值单位等字段进行了统一规范。这些自定义逻辑可以灵活配置,同时也提升了后续维护工作的便利性。 4. **分页抓取与错误重试机制** 针对`executeBillQuery`接口返回的大规模结果集,我们采用分页抓取策略,每页请求限定适当数量,以规避服务器响应超时及性能瓶颈。同时,实现了健壮的错误重试机制,当遇到临时通信失败或服务拒绝等现象时,会自动重新发送请求,不影响整体流程稳定性。 5. **可视化设计工具助力管理** 利用平台内置的可视化设计工具,将复杂度较高的数据流过程直观清晰地展现在面板上,使得调试、部署更加便捷。这对于项目组成员理解整个工作链条起到了极大的帮助作用,也避免了因沟通不畅造成的不必要延迟。 综上所述,本次案例充分利用核心技术手段,在满足业务需求的前提下实现了两套 ![用友与MES系统接口开发配置](https://pic.qeasy.cloud/D4.png~tplv-syqr462i7n-qeasy.image) ### 调用金蝶云星空接口executeBillQuery获取并加工数据 在数据集成过程中,调用源系统接口是关键的一步。本文将深入探讨如何使用轻易云数据集成平台的元数据配置,通过调用金蝶云星空的`executeBillQuery`接口来获取并加工数据。 #### 接口配置与调用 首先,我们需要了解元数据配置中的各个字段及其作用。以下是元数据配置的详细说明: ```json { "api": "executeBillQuery", "effect": "QUERY", "method": "POST", "number": "F_BGP_ProductId", "id": "F_BGP_ProductId", "name": "FNumber", "idCheck": true, "request": [ {"field": "F_BGP_MaterialId", "label": "物料内码", "type": "string", "describe": "编码", "value": "F_BGP_MaterialId"}, {"field": "F_BGP_MaterialId_FNumber", "label": "物料编码", "type": "string", "value": "F_BGP_MaterialId.FNumber"}, {"field": "F_BGP_ProductId", "label": "产品标识", "type": "string", "describe": "名称", "value": "F_BGP_ProductId.FNumber"} ], ... } ``` #### 请求参数解析 1. **基本请求参数**: - `F_BGP_MaterialId`: 表示物料内码,是一个字符串类型的字段,用于唯一标识物料。 - `F_BGP_MaterialId_FNumber`: 表示物料编码,关联到物料内码的具体编码。 - `F_BGP_ProductId`: 表示产品标识,是一个字符串类型的字段,用于唯一标识产品。 2. **其他请求参数**: - `Limit`: 最大行数,用于分页查询。 - `StartRow`: 开始行索引,用于分页查询。 - `TopRowCount`: 返回总行数,用于分页查询。 - `FilterString`: 过滤条件,例如 `FSupplierId.FNumber = 'VEN00010' and FApproveDate>=`,可以动态生成过滤条件。 - `FieldKeys`: 查询字段集合,以逗号分隔,例如 `FPOOrderEntry_FEntryId,FPurchaseOrgId.FNumber`。 - `FormId`: 表单ID,例如 `BGP_ProductSet`。 #### 实际调用示例 在实际操作中,我们通过HTTP POST方法向金蝶云星空发送请求。以下是一个示例请求体: ```json { "FormId": "BGP_ProductSet", "FieldKeys": ["FPOOrderEntry_FEntryId","FPurchaseOrgId.FNumber"], ... } ``` 我们需要确保所有必要的字段都已正确填充,并且根据业务需求设置适当的过滤条件和分页参数。 #### 数据处理与清洗 在获取到原始数据后,需要对数据进行清洗和转换。这一步骤包括但不限于以下操作: 1. **数据格式转换**:将金蝶返回的数据格式转换为目标系统所需的格式。例如,将日期格式从`yyyy-MM-dd`转换为`MM/dd/yyyy`。 2. **字段映射**:根据业务需求,将源系统字段映射到目标系统字段。例如,将`F_BGP_MaterialId`映射到目标系统中的`MaterialCode`。 3. **去重与校验**:确保数据唯一性和完整性,去除重复记录并进行必要的数据校验。 #### 示例代码 以下是一个简单的Python示例代码,展示如何调用接口并处理返回的数据: ```python import requests import json url = 'https://api.kingdee.com/executeBillQuery' headers = {'Content-Type': 'application/json'} payload = { 'FormId': 'BGP_ProductSet', 'FieldKeys': 'FPOOrderEntry_FEntryId,FPurchaseOrgId.FNumber', 'FilterString': f"FModifyDate>='{last_sync_time}'", 'Limit': '2000' } response = requests.post(url, headers=headers, data=json.dumps(payload)) data = response.json() # 数据处理与清洗 processed_data = [] for record in data['Result']: processed_record = { 'MaterialCode': record['F_BGP_MaterialId'], 'ProductCode': record['F_BGP_ProductId'] # 更多字段映射... } processed_data.append(processed_record) # 输出处理后的数据 print(json.dumps(processed_data, indent=4)) ``` 通过上述步骤,我们成功地从金蝶云星空获取了所需的数据,并进行了初步加工,为后续的数据写入和进一步处理打下了基础。 ![如何对接钉钉API接口](https://pic.qeasy.cloud/S8.png~tplv-syqr462i7n-qeasy.image) ### 使用轻易云数据集成平台进行ETL转换并写入金蝶云星空API接口 在数据集成的生命周期中,第二步是将已经集成的源平台数据进行ETL(Extract, Transform, Load)转换,并将其转为目标平台能够接收的格式。本文将详细探讨如何使用轻易云数据集成平台,将源数据转换为金蝶云星空API接口所需的格式,并最终写入目标平台。 #### 1. 数据请求与清洗 在数据请求与清洗阶段,我们从源系统中获取原始数据,并对其进行必要的清洗和预处理。这一步骤确保了后续的数据转换和写入操作能够顺利进行。假设我们已经完成了这部分工作,接下来重点介绍如何配置和使用轻易云平台进行ETL转换。 #### 2. 数据转换与写入 在这一阶段,我们需要将清洗后的数据转换为金蝶云星空API接口所能接受的格式。以下是具体的元数据配置和操作步骤: ##### 元数据配置解析 ```json { "api": "batchSave", "effect": "EXECUTE", "method": "POST", "idCheck": true, "request": [ { "field": "FMATERIALID", "label": "FMATERIALID", "type": "string", "value": "{F_BGP_MaterialId}" }, { "field": "F_UVQS_DI", "label": "产品标识", "type": "string", "describe": "名称", "value": "{F_BGP_ProductId}" } ], "otherRequest": [ { "field": "FormId", "label": "业务对象表单Id", "type": "string", "describe": "必须填写金蝶的表单ID如:PUR_PurchaseOrder", "value": "BD_MATERIAL" }, { "field": "Operation", "label": "执行的操作", "type": "string", "value": "Save" }, { "field": "IsAutoSubmitAndAudit", "label": "提交并审核", "type": ":bool", ":value":"false" }, { ":field":"IsVerifyBaseDataField","label":"验证基础资料","type":"bool","describe":"是否验证所有的基础资料有效性,布尔类,默认false(非必录)","value":"false" } ], ":operation":{"rowsKey":"array","rows":"50","method":"batchArraySave"} } ``` ##### 配置说明 1. **API Endpoint**: `batchSave` - 表示批量保存操作。 2. **HTTP Method**: `POST` - 使用POST方法提交数据。 3. **ID Check**: `true` - 检查ID是否存在。 4. **Request Fields**: - `FMATERIALID`: 对应物料ID。 - `F_UVQS_DI`: 产品标识,描述为名称。 5. **Other Request Fields**: - `FormId`: 必须填写金蝶的表单ID,如`BD_MATERIAL`。 - `Operation`: 执行保存操作。 - `IsAutoSubmitAndAudit`: 是否自动提交并审核,设置为`false`。 - `IsVerifyBaseDataField`: 是否验证基础资料有效性,默认为`false`。 ##### 操作步骤 1. **配置请求字段**:根据元数据配置中的要求,将源平台的数据字段映射到目标平台所需字段。例如,将源系统中的物料ID映射到`FMATERIALID`,产品标识映射到`F_UVQS_DI`。 2. **设置其他请求参数**:填写必要的业务对象表单ID(如`BD_MATERIAL`),以及其他操作参数(如保存操作、是否自动提交审核等)。 3. **批量保存操作**:使用配置好的元数据,通过轻易云平台提供的批量保存方法(如`batchArraySave`),将处理后的数据批量写入到金蝶云星空。 ##### 示例代码片段 以下是一个示例代码片段,用于展示如何通过轻易云平台实现上述配置和操作: ```python import requests import json # 定义请求URL和头信息 url = 'https://api.kingdee.com/batchSave' headers = {'Content-Type': 'application/json'} # 构建请求体 payload = { 'FormId': 'BD_MATERIAL', 'Operation': 'Save', 'IsAutoSubmitAndAudit': False, 'IsVerifyBaseDataField': False, 'Model': [ { 'FMATERIALID': '{F_BGP_MaterialId}', 'F_UVQS_DI': '{F_BGP_ProductId}' } # 可以添加更多记录 ] } # 发起POST请求 response = requests.post(url, headers=headers, data=json.dumps(payload)) # 检查响应状态 if response.status_code == 200: print('Data successfully saved to Kingdee Cloud.') else: print('Failed to save data:', response.text) ``` 通过以上步骤,我们成功地将源平台的数据经过ETL转换后,写入到了金蝶云星空API接口中。这个过程不仅提高了数据处理效率,还确保了不同系统之间的数据一致性和准确性。 ![打通用友BIP数据接口](https://pic.qeasy.cloud/T22.png~tplv-syqr462i7n-qeasy.image)