从源数据到简道云的ETL转换与写入实战

  • 轻易云集成顾问-冯潇
### 金蝶云星空到简道云的数据集成:销售发货通知单同步技术案例分享 在本案例中,我们将探讨如何高效地实现从金蝶云星空到简道云的销售发货通知单数据同步,确保过程中的数据不漏单、快速写入、并且具备异常处理与错误重试机制。 #### 确保集成金蝶云星空数据不漏单 为了保证每一笔销售发货通知单都能正确地由金蝶云星空传输至简道云,我们首先需要可靠的定时任务来抓取金蝶的API接口数据。使用`executeBillQuery`接口,通过分页和限流机制获取所有未同步的数据,并记录已处理的位置,以避免重复或遗漏。 ```json { "formId": "SAL_DELIVERYNOTICE", "filterString": "", "fieldKeys": "" } ``` 我们可以配置相应参数,使查询结果包含必要字段,过滤条件则可以根据业务逻辑动态调整,实现灵活性与准确性的兼顾。 #### 大量数据快速写入到简道云 对于批量写入简道云平台,可以利用其提供的批量创建API:`/api/v2/app/{app_id}/entry/{entry_id}/data_create`。通过构建一个高效的数据转换器,将从金蝶获取的大量JSON格式的数据迅速转化为符合简道结构要求的数据包,以及使用适当的并行调用策略,实现高速稳定的数据传输。 ```json { "triggerEntryIds":[] //其他相关字段填充 } ``` 在这里,每个请求会携带一个专门用以触发后续动作的方法,这样能够有效保障大量数据的一致性和完整性。 #### 简道云对接异常处理与错误重试机制 为了提升系统健壮性,对接过程中可能出现网络抖动、服务不可用等情况时,需要设计完善的异常捕获和自动重试机制。一方面,当遇到HTTP错误码或者接口响应超时时,应及时记录日志,并触发告警;另一方面,根据规则进行指数退避式(exponential backoff)重试,以降低瞬间负载压力,同时提高成功率。 下面是典型的一段伪代码示例: ```python import time def data_sync(): attempts = 0 max_attempts = 5 while attempts < max_attempts: try: response = requests.post(api_url, json=data) if response.status_code == 200: return True # 成功完成操作 else: log(response.status_code, response.text) except Exception as e: log(e) attempts += 1 sleep_time = min(2 ** attempts, MAX_WAIT_TIME ![用友与SCM系统接口开发配置](https://pic.qeasy.cloud/D12.png~tplv-syqr462i7n-qeasy.image) ### 调用金蝶云星空接口executeBillQuery获取并加工数据 在轻易云数据集成平台中,调用源系统接口是数据集成生命周期的第一步。本文将详细介绍如何通过调用金蝶云星空的`executeBillQuery`接口来获取销售发货通知单的数据,并进行初步加工。 #### 接口配置与请求参数 首先,我们需要配置接口的元数据,以便正确调用金蝶云星空的API。以下是元数据配置的关键部分: ```json { "api": "executeBillQuery", "effect": "QUERY", "method": "POST", "number": "FBillNo", "id": "FEntity_FEntryID", "name": "FBillNo", "idCheck": true, ... } ``` 该配置指定了我们要调用的API名称为`executeBillQuery`,请求方法为`POST`,并且设置了单据编号(`FBillNo`)和实体主键(`FEntity_FEntryID`)等关键字段。 #### 请求参数详解 在实际调用过程中,我们需要构造一个包含所有必要字段的请求体。以下是一些重要字段及其描述: - `FID`: 实体主键 - `FBillNo`: 单据编号 - `FDocumentStatus`: 单据状态 - `FSaleOrgId.FNumber`: 销售组织编号 - `FDate`: 日期 - `FCustomerID.Fnumber`: 客户编号 这些字段在请求体中的表示如下: ```json { "FormId": "SAL_DELIVERYNOTICE", "FieldKeys": ["FID", "FBillNo", ...], "FilterString": "FApproveDate>='{{LAST_SYNC_TIME|datetime}}' and FSaleOrgId.FNumber = '103'", ... } ``` 其中,`FormId`指定了业务对象表单ID为销售发货通知单(`SAL_DELIVERYNOTICE`),而`FieldKeys`则列出了我们需要查询的字段集合。`FilterString`用于过滤查询结果,例如按审批日期和销售组织编号进行过滤。 #### 数据请求与清洗 通过上述配置和请求参数,我们可以向金蝶云星空发送API请求并获取原始数据。接下来,我们需要对这些数据进行清洗和初步加工,以便后续处理。 1. **数据格式转换**:将API返回的数据转换为标准化格式,例如JSON或CSV。 2. **字段映射**:根据业务需求,将原始字段映射到目标系统所需的字段。例如,将金蝶云返回的客户编号(`FCustomerID.Fnumber`)映射到目标系统中的客户ID。 3. **数据验证**:检查关键字段是否存在,并验证其有效性。例如,确保每条记录都有有效的单据编号和实体主键。 #### 示例代码 以下是一个示例代码片段,用于发送API请求并处理响应数据: ```python import requests import json # API URL 和头信息 url = 'https://api.kingdee.com/executeBillQuery' headers = {'Content-Type': 'application/json'} # 请求体 payload = { 'FormId': 'SAL_DELIVERYNOTICE', 'FieldKeys': ['FID', 'FBillNo', 'FDocumentStatus', ...], 'FilterString': "FApproveDate>='2023-01-01' and FSaleOrgId.FNumber = '103'", ... } # 发起POST请求 response = requests.post(url, headers=headers, data=json.dumps(payload)) # 检查响应状态码 if response.status_code == 200: data = response.json() # 数据清洗和初步加工 cleaned_data = [] for record in data: cleaned_record = { '实体主键': record['FID'], '单据编号': record['FBillNo'], ... } cleaned_data.append(cleaned_record) else: print(f"Error: {response.status_code}, {response.text}") ``` 通过上述步骤,我们成功地从金蝶云星空获取了销售发货通知单的数据,并进行了初步清洗和加工。这些处理后的数据将作为后续集成过程中的输入,为实现不同系统间的数据无缝对接打下基础。 ![如何开发金蝶云星空API接口](https://pic.qeasy.cloud/S21.png~tplv-syqr462i7n-qeasy.image) ### 数据集成生命周期第二步:ETL转换与数据写入简道云API接口 在轻易云数据集成平台中,数据集成生命周期的第二步是将已经集成的源平台数据进行ETL(提取、转换、加载)转换,并转为目标平台简道云API接口所能够接收的格式,最终写入目标平台。本文将详细探讨这一过程中的技术细节和实现方法。 #### API接口元数据配置解析 根据提供的元数据配置,我们需要将源平台的数据通过ETL过程转换为简道云API能够接收的格式。以下是元数据配置的具体内容: ```json { "api": "/api/v2/app/{app_id}/entry/{entry_id}/data_create", "effect": "EXECUTE", "method": "POST", "idCheck": true, "request": [ { "field": "_widget_1688112112673", "label": "发货通知单号", "type": "string", "describe": "111", "value": "{FBillNo}", "parser": { "name": "ConvertObjectParser", "params": "value" } }, { "field": "_widget_1688112203982", "label": "销售单号", "type": "string", "value": "{FOrderNo}", "parser": { "name": "ConvertObjectParser", "params": "value" } }, { "field": "_widget_1688112203983", "label": "创建人", "type": "string", "value": "{FCreatorId}", "parser": { "name": "ConvertObjectParser", "params": "value" } } ], ... } ``` #### 数据提取与清洗 在ETL过程的第一步,我们需要从源平台提取所需的数据字段。例如,从ERP系统中提取销售发货通知单相关的数据。这些字段包括发货通知单号(FBillNo)、销售单号(FOrderNo)和创建人(FCreatorId)。 ```python source_data = { 'FBillNo': 'SH20231001', 'FOrderNo': 'SO20231001', 'FCreatorId': 'U123456' } ``` #### 数据转换 接下来,我们需要将这些提取到的数据按照目标平台简道云API要求的格式进行转换。根据元数据配置中的`parser`字段,我们可以使用`ConvertObjectParser`进行简单的值转换。 ```python converted_data = { '_widget_1688112112673': source_data['FBillNo'], '_widget_1688112203982': source_data['FOrderNo'], '_widget_1688112203983': source_data['FCreatorId'] } ``` #### 构建请求体 在完成数据转换后,我们需要构建一个符合简道云API要求的请求体。根据元数据配置中的`otherRequest`部分,我们还需要添加一些额外的请求参数。 ```python request_body = { '_widget_1688112112673': converted_data['_widget_1688112112673'], '_widget_1688112203982': converted_data['_widget_1688112203982'], '_widget_1688112203983': converted_data['_widget_1688112203983'], 'is_start_workflow': 'true', 'is_start_trigger': 'false', 'transaction_id': '1', 'appId': '63899c8e6705fb000870437d', 'entryId': '649e8bf09d56390009c3ee0f' } ``` #### 数据写入 最后一步是通过HTTP POST请求将构建好的请求体发送到简道云API接口,实现数据写入。 ```python import requests url = "/api/v2/app/63899c8e6705fb000870437d/entry/649e8bf09d56390009c3ee0f/data_create" headers = {'Content-Type': 'application/json'} response = requests.post(url, json=request_body, headers=headers) if response.status_code == 200: print("Data successfully written to JianDaoYun.") else: print(f"Failed to write data: {response.text}") ``` 以上代码展示了如何通过轻易云数据集成平台,将源平台的数据经过ETL转换后,成功写入到目标平台简道云中。通过这种方式,可以实现不同系统间的数据无缝对接,提高业务处理效率。 ![钉钉与CRM系统接口开发配置](https://pic.qeasy.cloud/T25.png~tplv-syqr462i7n-qeasy.image)