ETL转换指导:如何将金蝶数据整合至钉钉系统

  • 轻易云集成顾问-钟家寿
### 金蝶云星空数据集成钉钉:传给金蝶后,回传钉钉提示(已审核)④ 当使用轻易云数据集成平台对接企业的不同系统时,我们需要确保每个环节都精准、高效地执行。本文分享一个经典的系统对接案例——将金蝶云星空中的业务数据集成到钉钉,实现信息的双向流动和实时通知。 #### 背景描述 在本案例中,主要目标是通过轻易云的数据处理能力,将完成状态的业务单据从金蝶云星空拉取到,并将其自动传输到钉钉上进行审计确认。实现这个目标,只需调用两组API接口: - **获取金蝶云星空数据**:`executeBillQuery` - **写入到钉钉**:`topapi/process/instance/comment/add` 具体方案命名为“传给金蝶后,回传钉钉提示(已审核)④”,我们利用了几个关键技术特性来保证流程顺畅,包括高吞吐量的数据写入、定制化的数据转换逻辑以及集中监控和告警功能。 #### 技术要点 1. **高效的数据获取与写入** - 我们首先配置了轻易云平台,通过`executeBillQuery`接口批量抓取金蝶云星空内特定条件下已完成审批的单据。这些单据信息会被快速、高效地导出,有赖于平台所支持的大规模并发请求能力。 2. **自定义数据转换逻辑** - 数据从源系统(金蝶云星空)提取后,需要根据业务需求做适当格式转化,以匹配目标系统(如扣除冗余字段或添加必要标识)。这一过程通过可视化编辑器设定,使开发人员能够快速制定合乎需求的转换规则,提高效率和准确度。 3. **分页与限流控制策略** - 在处理大体量数据时,为防止一次性请求导致性能瓶颈或接口受限,我们设置了分页机制及速率限制策略,有效避免API超载问题,同时确保所有需要处理的信息均能逐步、安全地得到操作。 4. **可靠异常检测与重试机制** - 集成过程难免遇见失败情境,如网络故障或者临时服务不可用等情况。采用集中监控和告警手段,可以实时跟踪任务状态,一旦发现错误即刻触发重试机制,从而保障最终一致性。 5. **反馈结果至终端用户** - 数据成功整合至企业管理后台后,还需即时通知相关人员,即把成功状态反馈回来。例如,通过调用` ![金蝶与WMS系统接口开发配置](https://pic.qeasy.cloud/D8.png~tplv-syqr462i7n-qeasy.image) ### 调用金蝶云星空接口executeBillQuery获取并加工数据 在数据集成的生命周期中,第一步是从源系统获取数据。本文将深入探讨如何通过轻易云数据集成平台调用金蝶云星空接口`executeBillQuery`来获取并加工数据。 #### 接口配置与请求参数 首先,我们需要了解接口的基本配置和请求参数。根据提供的元数据配置,接口为`executeBillQuery`,请求方法为`POST`。以下是主要的请求参数: - `FormId`: 业务对象表单Id,必须填写金蝶的表单ID,例如:`AP_PAYBILL` - `FieldKeys`: 需查询的字段key集合 - `FilterString`: 过滤条件,例如:`FApproveDate>='{{LAST_SYNC_TIME|dateTime}}' and FDOCUMENTSTATUS='C'` - `Limit`: 最大行数,用于分页 - `StartRow`: 开始行索引,用于分页 #### 请求字段解析 元数据配置中详细列出了需要查询的字段,包括但不限于: - `FPAYBILLENTRY_FEntryID`: 金蝶分录主键ID - `FID`: 实体主键 - `FBillNo`: 单据编号 - `FDOCUMENTSTATUS`: 单据状态 - `FCreatorId`: 创建人 - `FAPPROVERID`: 审核人 - `FCreateDate`: 创建日期 - `FSETTLEORGID.FNumber`: 结算组织编号 这些字段将用于构建查询请求,并且可以根据实际需求进行调整。 #### 构建查询请求 根据元数据配置,我们可以构建一个完整的查询请求。以下是一个示例请求体: ```json { "FormId": "AP_PAYBILL", "FieldKeys": "FPAYBILLENTRY_FEntryID,FID,FBillNo,FDOCUMENTSTATUS,FCreatorId,FAPPROVERID,FCreateDate,FSETTLEORGID.FNumber", "FilterString": "FApproveDate>='2023-01-01' and FDOCUMENTSTATUS='C'", "Limit": 500, "StartRow": 0 } ``` 在这个示例中,我们设置了分页参数`Limit`为500,表示每次查询最多返回500条记录;`StartRow`为0,表示从第0行开始查询。 #### 数据处理与清洗 一旦成功获取到数据,下一步就是对数据进行处理和清洗。这一步骤包括但不限于: 1. **数据格式转换**:将金蝶返回的数据格式转换为目标系统所需的格式。 2. **字段映射**:根据业务需求,对字段进行重新命名或映射。 3. **数据过滤**:进一步过滤不符合业务逻辑的数据。 例如,如果我们需要将金蝶返回的数据转换为JSON格式,并且只保留已审核的数据,可以使用以下代码片段: ```python import json def process_data(response): data = json.loads(response) processed_data = [] for record in data: if record['FDOCUMENTSTATUS'] == 'C': processed_record = { 'entry_id': record['FPAYBILLENTRY_FEntryID'], 'bill_no': record['FBillNo'], 'status': record['FDOCUMENTSTATUS'], 'creator': record['FCreatorId'], 'approver': record['FAPPROVERID'], 'create_date': record['FCreateDate'], 'settle_org': record['FSETTLEORGID.FNumber'] } processed_data.append(processed_record) return json.dumps(processed_data, ensure_ascii=False) ``` #### 异常处理与重试机制 在实际操作中,可能会遇到各种异常情况,如网络超时、接口返回错误等。因此,需要设计合理的异常处理与重试机制。例如,可以在捕获异常后进行多次重试,并记录错误日志以便后续排查。 ```python import requests import time def fetch_data(url, payload, retries=3): for attempt in range(retries): try: response = requests.post(url, json=payload) response.raise_for_status() return response.json() except requests.exceptions.RequestException as e: print(f"Attempt {attempt + 1} failed: {e}") time.sleep(2 ** attempt) # 指数退避策略 raise Exception("Failed to fetch data after multiple attempts") url = "https://api.kingdee.com/executeBillQuery" payload = { "FormId": "AP_PAYBILL", "FieldKeys": "FPAYBILLENTRY_FEntryID,FID,FBillNo,FDOCUMENTSTATUS,FCreatorId,FAPPROVERID,FCreateDate,FSETTLEORGID.FNumber", "FilterString": "FApproveDate>='2023-01-01' and FDOCUMENTSTATUS='C'", "Limit": 500, "StartRow": 0 } data = fetch_data(url, payload) processed_data = process_data(data) print(processed_data) ``` 通过上述步骤,我们可以高效地调用金蝶云星空接口获取并加工数据,为后续的数据集成和分析打下坚实基础。 ![打通钉钉数据接口](https://pic.qeasy.cloud/S15.png~tplv-syqr462i7n-qeasy.image) ### 轻易云数据集成平台生命周期第二步:ETL转换与写入钉钉API接口 在数据集成过程中,ETL(Extract, Transform, Load)是至关重要的一环。本文将重点探讨如何使用轻易云数据集成平台,将已集成的源平台数据进行ETL转换,并转为目标平台钉钉API接口所能接收的格式,最终写入目标平台。 #### 数据请求与清洗 在数据集成的初始阶段,我们已经从源系统(金蝶)获取了原始数据,并进行了必要的清洗和预处理。这一步骤确保了数据的准确性和一致性,为后续的转换和写入奠定了基础。 #### 数据转换与写入 接下来,我们进入生命周期的第二步,即将清洗后的数据进行转换,并通过钉钉API接口写入目标平台。以下是具体操作步骤及技术细节: ##### 元数据配置解析 根据提供的元数据配置,我们需要向钉钉API接口发送一个POST请求,具体配置如下: ```json { "api": "topapi/process/instance/comment/add", "method": "POST", "idCheck": true, "request": [ { "field": "request", "label": "请求对象", "type": "object", "children": [ { "field": "process_instance_id", "label": "审批实例ID", "type": "string", "describe": "可通过调用获取审批实例ID列表接口获取。", "value": "_findCollection find id from a9138482-05d7-36b8-a20e-2f3bdb3f90fb where business_id={FBillNo}", "parent": "request" }, { "field": "text", "label": "评论的内容", "type": "string", "value": "已审核", "parent": "request" }, { "field": "comment_userid", "label": "评论人的userid", "type": "string", "value": "112018120420563028", "parent": "request" } ] } ] } ``` ##### 配置解析与应用 1. **API Endpoint**:`topapi/process/instance/comment/add` - 这是我们需要调用的钉钉API接口,用于添加审批实例评论。 2. **HTTP Method**:`POST` - 我们将使用POST方法发送请求。 3. **ID Check**:`true` - 表示需要对ID进行校验,确保唯一性和有效性。 4. **Request Payload**: - `process_instance_id`: 从金蝶系统中获取审批实例ID,通过查询语句 `_findCollection find id from a9138482-05d7-36b8-a20e-2f3bdb3f90fb where business_id={FBillNo}` 获取。 - `text`: 评论内容,固定为“已审核”。 - `comment_userid`: 评论人的用户ID,固定为“112018120420563028”。 ##### 实现步骤 1. **提取审批实例ID** - 首先,从金蝶系统中提取业务单据号(FBillNo),并通过查询语句获取对应的审批实例ID。 2. **构建请求对象** - 根据元数据配置构建请求对象,包括`process_instance_id`、`text`和`comment_userid`字段。 3. **发送HTTP POST请求** - 使用构建好的请求对象,通过HTTP POST方法调用钉钉API接口。 以下是示例代码片段,展示如何实现上述步骤: ```python import requests import json # 提取审批实例ID def get_process_instance_id(fbill_no): query = f"_findCollection find id from a9138482-05d7-36b8-a20e-2f3bdb3f90fb where business_id={fbill_no}" # 假设有一个函数execute_query来执行查询并返回结果 process_instance_id = execute_query(query) return process_instance_id # 构建请求对象 def build_request_object(process_instance_id): request_object = { 'request': { 'process_instance_id': process_instance_id, 'text': '已审核', 'comment_userid': '112018120420563028' } } return request_object # 发送HTTP POST请求 def send_post_request(api_endpoint, request_object): url = f"https://oapi.dingtalk.com/{api_endpoint}" headers = {'Content-Type': 'application/json'} response = requests.post(url, headers=headers, data=json.dumps(request_object)) return response.json() # 主流程 def main(fbill_no): process_instance_id = get_process_instance_id(fbill_no) request_object = build_request_object(process_instance_id) response = send_post_request("topapi/process/instance/comment/add", request_object) print(response) # 示例调用 main("example_fbill_no") ``` 通过以上步骤,我们成功地将从金蝶系统获取的数据进行了ETL转换,并通过钉钉API接口写入了目标平台。这不仅实现了不同系统间的数据无缝对接,也提升了业务流程的自动化程度。 ![数据集成平台API接口配置](https://pic.qeasy.cloud/T17.png~tplv-syqr462i7n-qeasy.image)