利用轻易云进行ETL转换并写入金蝶云星空的实战案例

  • 轻易云集成顾问-曾平安
### 业务系统销售开单对接到金蝶销售订单-ok 在企业日常运作中,高效的数据集成对于提升生产力和管理效率至关重要。本案例探讨了如何利用轻易云数据集成平台,实现云水聚数据的高效采集与处理,并将其无缝对接到金蝶云星空。具体方案涉及多个模块的精细协作,确保从获取接口数据,到批量写入,再到错误处理和重试机制的各个环节万无一失。 首先,通过调用云水聚提供的API接口/Kingdee/GetSaleOrderList,我们能够定时可靠地抓取最新的销售订单数据。在这个过程中,为确保不漏单,我们设计了一套处理云水聚接口分页与限流问题的方法。这不仅保证了大范围的数据抓取,同时避免了因接口负载过高而导致的数据丢失或延迟。 为了实现大量数据快速写入金蝶云星空系统,我们利用其batchSave API,以批量方式将多条记录一次性传输并存储。这种方法不仅提高了效率,还降低了网络通信成本。然而,这同时也带来一个挑战,即如何处理两者之间的数据格式差异。对此,我们开发了一套定制化的数据映射规则,自动转换不同字段类型,使之符合目标端要求。 此外,在整个对接过程中,实时监控与日志记录功能不可或缺。我们部署了一系列监控节点,对每一步操作进行追踪并记录日志。当出现异常情况时,例如连接超时或响应失败,通过预设置的错误重试机制,可以自动重新发起请求,从容应对各种突发情况,大幅减少人为干预需求,提高系统稳定性。 ![打通用友BIP数据接口](https://pic.qeasy.cloud/D33.png~tplv-syqr462i7n-qeasy.image) ### 调用源系统云水聚接口/Kingdee/GetSaleOrderList获取并加工数据 在数据集成的生命周期中,调用源系统接口是至关重要的一步。本文将深入探讨如何使用轻易云数据集成平台调用云水聚接口 `/Kingdee/GetSaleOrderList` 获取销售订单列表,并对数据进行初步加工。 #### 接口配置与调用 首先,我们需要了解元数据配置中的各个字段及其作用: ```json { "api": "/Kingdee/GetSaleOrderList", "effect": "QUERY", "method": "GET", "number": "orderNo", "id": "orderNo", "name": "orderNo", "idCheck": true, "request": [ { "field": "ApproveDate", "label": "审核时间", "type": "datetime", "value": "{{LAST_SYNC_TIME|datetime}}" } ], "autoFillResponse": true } ``` 1. **API路径**:`/Kingdee/GetSaleOrderList` 是我们需要调用的接口路径。 2. **请求类型**:`GET` 表示我们将通过HTTP GET方法请求数据。 3. **主键字段**:`orderNo` 用于标识每条记录的唯一性。 4. **请求参数**:包含一个字段 `ApproveDate`,表示审核时间,其值为上次同步时间 `{{LAST_SYNC_TIME|datetime}}`。 #### 数据请求与清洗 在调用接口之前,我们需要确保请求参数的正确性。这里的 `ApproveDate` 参数用于过滤自上次同步以来的新数据。通过动态设置 `LAST_SYNC_TIME`,可以确保每次只获取增量数据,提升效率。 ```python import requests from datetime import datetime # 假设 LAST_SYNC_TIME 已经从系统中获取 last_sync_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S') # 构建请求参数 params = { 'ApproveDate': last_sync_time } # 发起 GET 请求 response = requests.get('https://api.yunshuiju.com/Kingdee/GetSaleOrderList', params=params) if response.status_code == 200: data = response.json() # 对返回的数据进行初步清洗和验证 cleaned_data = [] for order in data: if 'orderNo' in order and order['orderNo']: cleaned_data.append(order) else: print(f"Error: {response.status_code}") ``` #### 数据转换与写入 在获取并清洗了销售订单列表后,我们需要将这些数据转换为目标系统所需的格式,并写入目标数据库或系统。在此过程中,可以利用轻易云平台提供的自动填充响应功能 `autoFillResponse: true`,简化部分工作。 ```python # 假设目标系统需要的数据格式如下: transformed_data = [] for order in cleaned_data: transformed_order = { '订单编号': order['orderNo'], '客户名称': order.get('customerName', ''), '订单日期': order.get('orderDate', ''), # 添加其他必要的字段转换... } transformed_data.append(transformed_order) # 将转换后的数据写入目标系统(例如数据库) import pymysql connection = pymysql.connect( host='localhost', user='user', password='password', database='sales_db' ) try: with connection.cursor() as cursor: for order in transformed_data: sql = """ INSERT INTO sales_orders (order_no, customer_name, order_date) VALUES (%s, %s, %s) """ cursor.execute(sql, (order['订单编号'], order['客户名称'], order['订单日期'])) connection.commit() finally: connection.close() ``` 通过上述步骤,我们成功地完成了从源系统云水聚获取销售订单列表、对数据进行清洗和转换,并最终写入目标系统的全过程。这一过程不仅提高了数据处理效率,还确保了数据的一致性和准确性。 ![如何开发金蝶云星空API接口](https://pic.qeasy.cloud/S11.png~tplv-syqr462i7n-qeasy.image) ### 使用轻易云数据集成平台进行ETL转换并写入金蝶云星空API接口的技术案例 在数据集成生命周期的第二阶段,我们需要将已经集成的源平台数据进行ETL转换,使其符合目标平台金蝶云星空API接口的格式要求,并最终写入目标平台。本文将详细探讨这一过程中的技术细节,特别是如何配置和使用元数据来实现这一目标。 #### 数据请求与清洗 在数据请求与清洗阶段,我们已经从源系统获取了原始数据,并进行了必要的数据清洗和预处理。这些数据现在准备好进行进一步的ETL转换,以适应金蝶云星空API接口的需求。 #### 元数据配置解析 我们使用以下元数据配置来指导ETL转换过程: ```json { "api": "batchSave", "effect": "EXECUTE", "method": "POST", "number": "FBillNo", "id": "FBillNo", "name": "FBillNo", "idCheck": true, "operation": { "method": "batchArraySave", "rows": 1, "rowsKey": "array" }, "request": [ { "field": "FBillTypeID", "label": "单据类型", ... }, ... ], ... } ``` #### ETL转换过程 1. **字段映射与转换** 每个字段都有特定的配置,包括字段名、标签、类型、描述和值。例如,`FBillTypeID`字段被映射为单据类型,其值通过`ConvertObjectParser`解析器转换为`FNumber`格式。 ```json { "field": "FBillTypeID", ... "parser": { "name": "ConvertObjectParser", ... }, ... } ``` 2. **动态值替换** 某些字段需要从源数据中动态替换,例如订单编号`FBillNo`和客户名称`FCustId`。这些值通常通过占位符表示,如 `{orderNo}` 和 `{agentName}`,并在运行时替换为实际值。 ```json { "field": "FBillNo", ... "value": "{orderNo}" }, { ... "value":"_mongoQuery a0cfff28-620e-3150-bbbd-3ac0b72a3da5 findField=content.FNumber where={\"content.FName\":{\"$eq\":\"{agentName}\"}}" } ``` 3. **数组处理** 对于复杂的数据结构,如订单明细(`FSaleOrderEntry`),我们需要处理嵌套数组。每个子项也有相应的字段映射和转换规则。例如,物料编码、销售数量和含税单价等字段都需要进行相应的解析和赋值。 ```json { ... "children":[ { ... {"field":"FMaterialId",...,"value":"{{productItemEntity.productId}}"}, {"field":"FQty",...,"value":"{{productItemEntity.applyNumer}}"}, {"field":"FTaxPrice",...,"value":"{{productItemEntity.productPrice}}"}, ... } ] } ``` 4. **其他请求参数** 除了主要的数据字段外,还需要配置一些其他请求参数,如业务对象表单ID、执行操作、是否自动提交并审核等。这些参数确保了在调用API时能够正确执行所需操作。 ```json { ... {"field":"FormId",...,"value":"SAL_SaleOrder"}, {"field":"Operation",...,"value":"BatchSave"}, {"field":"IsAutoSubmitAndAudit",...,"value":"true"}, {"field":"IsVerifyBaseDataField",...,"value":"true"} } ``` #### 数据写入目标平台 完成ETL转换后,我们使用配置好的元数据通过HTTP POST方法将数据发送到金蝶云星空API接口。以下是一个简化的示例代码片段: ```python import requests url = 'https://api.kingdee.com/batchSave' headers = {'Content-Type': 'application/json'} data = { # 根据元数据配置生成的数据结构 } response = requests.post(url, headers=headers, json=data) if response.status_code == 200: print('Data successfully written to Kingdee Cloud.') else: print('Failed to write data:', response.content) ``` 通过上述步骤,我们成功地将源平台的数据经过ETL转换后写入到金蝶云星空,实现了不同系统间的数据无缝对接。这一过程不仅提高了业务效率,也确保了数据的一致性和准确性。 ![金蝶与SCM系统接口开发配置](https://pic.qeasy.cloud/T28.png~tplv-syqr462i7n-qeasy.image)