基于轻易云的数据清洗与ETL转换案例解析

  • 轻易云集成顾问-吴伟
### 案例分享:JY-BDS直接调拨单提交-审核 在本次技术案例中,我们将探讨如何通过轻易云数据集成平台,实现金蝶云星空系统间的高效数据交换。具体而言,本案例涉及到`JY-BDS直接调拨单提交及审核功能`,主要通过调用金蝶云星空的API接口来完成数据获取与写入操作。 在项目实施过程中,我们利用了轻易云提供的多个特性,例如支持自定义的数据转换逻辑和其高吞吐量的数据写入能力。首先,通过使用金蝶云星空的获取数据API `ExecuteBillQuery`,我们能够定时可靠地抓取所需数据信息,并确保这些信息无遗漏地传输到目标系统。在这个过程中,为了适应业务特殊需求和数据结构差异,自定义的数据转换逻辑发挥了关键作用,使得不同系统间的数据格式得以兼容。 接下来,通过调用金蝶云星空的写入API `Audit`,大量被处理过的数据需要快速且稳定地导入目标系统。借助平台提供的批量集成功能,高效实现了大规模数据的一次性传输,大幅度提升了整体工作效率。同时,为确保每个环节顺利进行,还部署了一套集中化监控与告警机制,以实时跟踪任务状态并及时处理可能出现的问题。 一个突出的挑战是如何有效处理接口限流以及分页问题。在这种情况下,需要充分考虑接口限制条件,并根据实际情况设计合理的分页策略,以保证每次调用都能最大程度发挥性能而不引起服务端压力过大或请求失败。 总之,这样一套高度集成化和可视化配置的平台,不仅显著简化了复杂业务场景下的大规模数据对接过程,还为企业带来了极大的灵活性与效率提升。随后的内容中,将详细介绍该项目具体实施步骤及注意事项。 ![打通用友BIP数据接口](https://pic.qeasy.cloud/D7.png~tplv-syqr462i7n-qeasy.image) ### 调用金蝶云星空接口ExecuteBillQuery获取并加工数据 在轻易云数据集成平台中,调用源系统金蝶云星空接口ExecuteBillQuery是数据集成生命周期的第一步。此步骤的主要任务是通过API接口从金蝶云星空获取原始数据,并进行初步加工,为后续的数据清洗和转换奠定基础。 #### API接口配置 首先,我们需要配置ExecuteBillQuery接口的元数据。以下是该接口的具体配置参数: ```json { "api": "ExecuteBillQuery", "method": "POST", "number": "FBillNo", "id": "FId", "pagination": { "pageSize": 10 }, "idCheck": true, "request": [ { "field": "Limit", "label": "最大行数", "type": "string", "describe": "金蝶的查询分页参数", "value": "50" }, { "field": "StartRow", "label": "开始行索引", "type": "string", "describe": "金蝶的查询分页参数", "value": "{PAGINATION_START_ROW}" }, { "field": "TopRowCount", "label": "返回总行数", "type": "int", "describe": "金蝶的查询分页参数" }, { "field": "FilterString", "label": "过滤条件", "type": "string", "describe": "示例写法 FSupplierId.FNumber = 'VEN00010' and FApproveDate>=", "value":"FDocumentStatus <> 'C'" }, { "field":"FieldKeys","label":"需查询的字段key集合","type":"array","describe":"金蝶分录主键ID格式:FPOOrderEntry_FEntryId,其它格式 FPurchaseOrgId.FNumber","parser":{"name":"ArrayToString","params":","}}, { "field":"FormId","label":"业务对象表单Id","type":"string","describe":"必须填写金蝶的表单ID如:PUR_PurchaseOrder","value":"STK_TransferDirect"} ] } ``` #### 请求参数解析 1. **Limit**: 设置每次请求返回的最大行数,这里设置为50。 2. **StartRow**: 开始行索引,用于分页处理,值为`{PAGINATION_START_ROW}`,由系统动态生成。 3. **TopRowCount**: 返回总行数,用于获取符合条件的数据总量。 4. **FilterString**: 过滤条件,这里设置为`FDocumentStatus <> 'C'`,表示只查询未完成状态的数据。 5. **FieldKeys**: 查询字段集合,以逗号分隔多个字段,如`FPOOrderEntry_FEntryId,FPurchaseOrgId.FNumber`。 6. **FormId**: 表单ID,这里指定为`STK_TransferDirect`,表示直接调拨单。 #### 数据请求与处理 在实际操作中,通过POST请求向金蝶云星空API发送上述配置的请求参数。以下是一个示例请求体: ```json { "Limit": 50, "StartRow": 0, “TopRowCount”: true, “FilterString”: “FDocumentStatus <> 'C'”, “FieldKeys”: “FBillNo,FDate,FMaterialId.FNumber,FQty”, “FormId”: “STK_TransferDirect” } ``` 通过该请求,我们可以获取到符合条件的数据集。接下来,我们需要对返回的数据进行初步加工,例如将JSON数组转换为表格形式,以便后续处理。 #### 数据加工示例 假设我们从API返回了以下数据: ```json [ { "FBillNo":"DB20230001", "FDate":"2023-01-01", "FMaterialId.FNumber":"MAT001", "FQty":"100" }, { "FBillNo":"DB20230002", "FDate":"2023-01-02", "FMaterialId.FNumber":"MAT002", "FQty":"200" } ] ``` 我们可以将其转换为如下表格形式: | FBillNo | FDate | FMaterialId.FNumber | FQty | |-------------|-------------|---------------------|------| | DB20230001 | 2023-01-01 | MAT001 | 100 | | DB20230002 | 2023-01-02 | MAT002 | 200 | 这种表格形式便于后续的数据清洗和转换处理。 #### 小结 通过调用金蝶云星空接口ExecuteBillQuery并对数据进行初步加工,我们能够高效地获取并准备好源系统数据,为整个数据集成过程打下坚实基础。这一步骤不仅确保了数据获取的准确性和及时性,还提升了整体业务流程的透明度和效率。 ![金蝶与CRM系统接口开发配置](https://pic.qeasy.cloud/S30.png~tplv-syqr462i7n-qeasy.image) ### 使用轻易云数据集成平台进行ETL转换并写入金蝶云星空API接口的技术案例 在数据集成生命周期的第二步,我们需要将已经集成的源平台数据进行ETL(提取、转换、加载)转换,转为目标平台金蝶云星空API接口所能够接收的格式,并最终写入目标平台。以下是一个具体的技术案例,详细介绍如何使用轻易云数据集成平台完成这一过程。 #### 元数据配置解析 首先,我们来看一下元数据配置: ```json { "api": "Audit", "method": "POST", "idCheck": true, "request": [ {"field":"FormId","label":"业务对象表单Id","type":"string","describe":"必须填写金蝶的表单ID如:PUR_PurchaseOrder","value":"STK_TransferDirect"}, {"field":"Numbers","label":"编码集合","type":"string","value":"{BillNo}"}, {"field":"Ids","label":"id集合","type":"string"}, {"field":"InterationFlags","label":"交互标志集合","type":"string","describe":"字符串类型,分号分隔,格式:\"flag1;flag2;...\"(非必录) 例如(允许负库存标识:STK_InvCheckResult)","value":"STK_InvCheckResult"}, {"field":"IgnoreInterationFlag","label":"是否允许忽略交互","type":"string","describe":"布尔类型,默认true(非必录)","value":"true"}, {"field":"NetworkCtrl","label":"是否启用网控","type":"string","describe":"布尔类型,默认false(非必录)","value":"false"}, {"field":"IsVerifyProcInst","label":"检验单据关联运行","type":"string","describe":"是否检验单据关联运行中的工作流实例,布尔类型,默认false(非必录)","value":"false"} ] } ``` #### 数据请求与清洗 在ETL过程的第一阶段,我们已经完成了数据请求与清洗。此时,我们有了一个干净且结构化的数据集,可以用于下一步的数据转换和写入。 #### 数据转换与写入 在这个阶段,我们需要将清洗后的数据转换为金蝶云星空API接口所能接受的格式,并通过API调用将其写入目标平台。 1. **配置API请求参数** 根据元数据配置,我们需要构建一个包含所有必要字段的JSON对象。以下是一个示例: ```json { "FormId": "STK_TransferDirect", "Numbers": "{BillNo}", "Ids": "", "InterationFlags": "STK_InvCheckResult", "IgnoreInterationFlag": "true", "NetworkCtrl": "false", "IsVerifyProcInst": "false" } ``` 2. **动态填充字段值** 在实际操作中,需要动态填充`Numbers`字段中的`{BillNo}`。假设我们从源系统获取到的单据编号为`BILL12345`,则填充后的JSON对象应如下: ```json { "FormId": "STK_TransferDirect", "Numbers": "BILL12345", "Ids": "", "InterationFlags": "STK_InvCheckResult", "IgnoreInterationFlag": "true", "NetworkCtrl": "false", "IsVerifyProcInst": "false" } ``` 3. **调用金蝶云星空API** 使用HTTP POST方法,将上述JSON对象作为请求体发送到金蝶云星空的`Audit` API接口。以下是一个示例代码片段: ```python import requests import json url = 'https://api.kingdee.com/Audit' headers = {'Content-Type': 'application/json'} payload = { 'FormId': 'STK_TransferDirect', 'Numbers': 'BILL12345', 'Ids': '', 'InterationFlags': 'STK_InvCheckResult', 'IgnoreInterationFlag': 'true', 'NetworkCtrl': 'false', 'IsVerifyProcInst': 'false' } response = requests.post(url, headers=headers, data=json.dumps(payload)) if response.status_code == 200: print('Data successfully written to Kingdee Cloud.') else: print(f'Failed to write data: {response.text}') ``` 4. **处理响应** 根据响应结果判断数据是否成功写入。如果响应状态码为200,则表示操作成功;否则,需要根据返回的信息进行错误处理。 #### 技术要点总结 - **元数据配置**:通过详细解析元数据配置文件,明确每个字段的含义和用途。 - **动态填充**:根据实际业务需求动态填充请求参数中的变量。 - **API调用**:使用HTTP POST方法调用目标平台API,并处理响应结果。 - **错误处理**:根据响应状态码和返回信息进行错误处理,以确保数据正确写入。 通过上述步骤,我们可以高效地将源平台的数据经过ETL转换后写入到金蝶云星空,实现不同系统间的数据无缝对接。这不仅提升了业务流程的自动化程度,也确保了数据的一致性和准确性。 ![用友与WMS系统接口开发配置](https://pic.qeasy.cloud/T17.png~tplv-syqr462i7n-qeasy.image)