从金蝶到轻易云:数据转换与写入的技术实现

  • 轻易云集成顾问-卢剑航
### 金蝶云星空数据集成到轻易云集成平台案例分享 在本次技术案例中,我们将探讨如何实现金蝶云星空系统与轻易云数据集成平台的数据对接,具体操作方案命名为“查询金蝶业务员--ok”。通过这一整套流程,可以有效解决金蝶云星空接口executeBillQuery获取的海量业务员信息被漏单、分页及限流等问题,并确保这些数据能够在定时抓取的情况下快速而稳定地写入到轻易云集成平台。 为了确保每一次的数据请求都能完整无误地进行,我们使用了executeBillQuery接口,该API可以灵活调用并处理大量数据。然而,由于金蝶云星空系统存在一定的分页和限流机制,为避免超出该限制,我们需要制定详细的策略来分段获取并整理所有响应数据。与此同时,针对不同格式的数据,在迁移至轻易云集成平台前,还需执行一系列预处理和映射,以确保兼容性。 基于以上需求,本方案特别注重以下几个关键点: 1. **定时可靠的数据抓取**:采用精确时间调度器,允许我们按设定频率自动触发executeBillQuery接口。 2. **快速批量写入优化**:采用高效并行处理技术,将大批量分段获取到的数据迅速写入到轻易云。 3. **错误重试机制**:当出现网络不畅或其他异常情况时,实现智能化错误捕捉与重试,以提高整个过程的稳健性。 4. **实时监控与日志记录**:全程监控每一个步骤,并生成详尽日志供事后审查。 这不仅提升了整体操作透明度,也显著缩短了从源头获取到目标存储之间所需时间。在以下章节中,我们将逐步揭示各个环节中的具体实施细节,以及核心代码实现方式,希望借此机会为大家提供一个清晰、高效且可复用的方法模板。 ![用友与SCM系统接口开发配置](https://pic.qeasy.cloud/D3.png~tplv-syqr462i7n-qeasy.image) ### 调用金蝶云星空接口executeBillQuery获取并加工数据 在数据集成的生命周期中,调用源系统接口获取数据是至关重要的一步。本文将详细探讨如何通过轻易云数据集成平台调用金蝶云星空的`executeBillQuery`接口,获取业务员相关数据并进行初步加工。 #### 接口配置与请求参数 首先,我们需要配置元数据,以便正确调用金蝶云星空的API接口。以下是元数据配置的详细内容: ```json { "api": "executeBillQuery", "effect": "QUERY", "method": "POST", "number": "FNumber", "id": "{FOperatorId}{FBizOrgId}{FName}{Fdept}", "name": "FNumber", "request": [ {"field":"FOperatorId","label":"FOperatorId","type":"string","value":"FOperatorId"}, {"field":"FBillNo","label":"单据编号","type":"string","value":"FBillNo"}, {"field":"FOperatorType","label":"业务员类型","type":"string","value":"FOperatorType"}, {"field":"FCreatorId","label":"创建人","type":"string","value":"FCreatorId"}, {"field":"FStaffId","label":"职员","type":"string","value":"FStaffId.FStaffNumber"}, {"field":"Fdept","label":"部门","type":"string","value":"Fdept"}, {"field":"FBizOrgId","label":"业务组织","type":"string","value":"FBizOrgId.FNumber"}, {"field":"FName","label":"业务员名称","type":"string","value":"FName"}, {"field":"FNumber","label":"业务员编码","type":"string","value":"FNumber"}, {"field":"FEmpNumber","label":"员工编码","type":"string","value":"FEmpNumber"}, {"field": "FBizOrgId_FName", "label": "业务组织", "type": "string", "value": "FBizOrgId.FName"}, {"field": "FForbiddenStatus", "label": "禁用状态", "type": "string", "value": "FForbiddenStatus"} ], "otherRequest": [ {"field": "Limit", "label": "最大行数", "type": "string", "describe": "金蝶的查询分页参数", "value": "{PAGINATION_PAGE_SIZE}"}, {"field": "StartRow", "label": "开始行索引", "type": "string", "describe": "金蝶的查询分页参数", "value": "{PAGINATION_START_ROW}"}, {"field": "TopRowCount", "label": "返回总行数", "type": int, describe: 金蝶的查询分页参数}, {"field": FilterString, label: 过滤条件, type: string, describe: 示例写法 FSupplierId.FNumber = 'VEN00010' and FApproveDate>=, value: FForbiddenStatus='0' and FBizOrgId.FNumber in ('101','105') and FCreateDate>={{LAST_SYNC_TIME|datetime}}}, {"field" : FieldKeys, label : 需查询的字段key集合, type : array, describe : 金蝶分录主键ID格式: FPOOrderEntry_FEntryId,其它格式 FPurchaseOrgId.FNumber , parser : {name : ArrayToString , params : , }}, { field: FormId , label: 业务对象表单 Id , type: string , describe: 必须填写金蝶的表单 ID 如:PUR_PurchaseOrder , value: BD_OPERATOR } ], autoFillResponse: true } ``` #### 请求示例 根据上述元数据配置,我们可以构建一个请求示例: ```json { "_api_": "/k3cloud/Kingdee.BOS.WebApi.ServicesStub.DynamicFormService.ExecuteBillQuery.common.kdsvc", "_method_": POST, "_params_":{ "_FormID_ ": BD_OPERATOR, "_FieldKeys_ ": [" FOperatorId "," FBillNo "," FOperatorType "," FCreatorId "," FStaffId.FStaffNumber "," Fdept "," FBizOrgId.FNumber "," FName "," FNumber "," FEmpNumber "," FBizOrgId.FName "," FForbiddenStatus"], "_FilterString_ ": “ FForbiddenStatus='0' and FBizOrgId.FNumber in ('101','105') and FCreateDate>={{LAST_SYNC_TIME|datetime}}”, "_Limit_ ": {PAGINATION_PAGE_SIZE}, "_StartRow_ ": {PAGINATION_START_ROW} } } ``` #### 数据清洗与转换 在获取到原始数据后,需要对其进行清洗和转换,以便后续处理和存储。以下是一个简单的数据清洗和转换示例: 1. **去除无效字段**:过滤掉不需要的字段,例如一些内部标识或临时变量。 2. **字段重命名**:将字段名转换为更具可读性的名称,例如将`FBillNo`重命名为`Bill Number`。 3. **数据格式化**:将日期、数字等字段格式化为标准格式。 ```python def clean_and_transform(data): cleaned_data = [] for record in data: cleaned_record = { 'Operator ID': record['FOperatorId'], 'Bill Number': record['FBillNo'], 'Operator Type': record['FOperatorType'], 'Creator ID': record['FCreatorId'], 'Staff Number': record['FStaffId.FStaffNumber'], 'Department': record['Fdept'], 'Business Organization ID': record['FBizOrgId.FNumber'], 'Operator Name': record['FName'], 'Operator Code': record['FNumber'], 'Employee Number': record['FEmpNumber'], 'Business Organization Name': record['FBizOrgId.FName'], 'Forbidden Status': record['FForbiddenStatus'] } cleaned_data.append(cleaned_record) return cleaned_data ``` #### 实时监控与异常处理 在整个过程中,实时监控和异常处理也是不可忽视的重要环节。通过轻易云平台提供的监控工具,可以实时查看数据流动情况,并及时发现和处理异常。例如,当接口返回错误或数据不符合预期时,可以记录日志并发送告警通知。 ```python try: response = execute_api_call(api_endpoint, params) if response.status_code != 200: raise Exception(f"API call failed with status code {response.status_code}") data = response.json() except Exception as e: log_error(e) send_alert(f"Data integration failed: {str(e)}") ``` 通过以上步骤,我们可以高效地调用金蝶云星空接口获取业务员相关数据,并进行初步加工,为后续的数据处理和分析打下坚实基础。 ![如何开发钉钉API接口](https://pic.qeasy.cloud/S13.png~tplv-syqr462i7n-qeasy.image) ### 数据转换与写入:轻易云数据集成平台API接口技术案例 在数据集成生命周期的第二步,我们需要将已经集成的源平台数据进行ETL转换,转为目标平台轻易云集成平台API接口所能够接收的格式,并最终写入目标平台。本文将详细探讨这一过程中的关键技术点和具体实现方法。 #### API接口元数据配置解析 在本案例中,我们使用了以下元数据配置: ```json { "api": "写入空操作", "effect": "EXECUTE", "method": "POST", "idCheck": true } ``` 该配置项包含了几个关键属性: - `api`: 指定了目标API接口的名称,这里是“写入空操作”。 - `effect`: 表示该操作的效果,这里是执行操作(EXECUTE)。 - `method`: 指定了HTTP请求方法,这里是POST。 - `idCheck`: 表示是否进行ID检查,这里设置为true。 这些属性共同定义了如何与目标平台的API接口进行交互。 #### 数据转换过程 在进行数据转换之前,首先需要明确源数据的结构和目标API接口所需的数据格式。假设我们从金蝶业务员查询到的数据如下: ```json { "salesmanId": "12345", "name": "张三", "department": "销售部" } ``` 而目标API接口期望接收到的数据格式可能是: ```json { "id": "12345", "fullName": "张三", "dept": "销售部" } ``` 为了实现这一转换,我们可以编写一个简单的数据映射函数: ```python def transform_data(source_data): return { "id": source_data["salesmanId"], "fullName": source_data["name"], "dept": source_data["department"] } ``` #### 数据写入过程 完成数据转换后,接下来就是通过轻易云集成平台API接口将数据写入目标平台。我们需要构建一个HTTP POST请求,将转换后的数据发送到指定的API端点。 以下是一个使用Python和requests库实现这一过程的示例代码: ```python import requests def write_to_target_platform(transformed_data): url = "<轻易云集成平台API端点>" headers = { 'Content-Type': 'application/json' } response = requests.post(url, json=transformed_data, headers=headers) if response.status_code == 200: print("Data written successfully.") else: print(f"Failed to write data. Status code: {response.status_code}") # 示例调用 source_data = { "salesmanId": "12345", "name": "张三", "department": "销售部" } transformed_data = transform_data(source_data) write_to_target_platform(transformed_data) ``` #### ID检查机制 在元数据配置中,`idCheck`属性被设置为true,这意味着在写入数据之前,需要对ID进行检查,以确保不会重复或冲突。实现这一机制的一种方法是在发送POST请求之前,先发送一个GET请求以检查ID是否存在: ```python def check_id_exists(salesman_id): check_url = f"<轻易云集成平台API端点>/{salesman_id}" response = requests.get(check_url) return response.status_code == 200 def write_to_target_platform_with_id_check(transformed_data): if check_id_exists(transformed_data["id"]): print("ID already exists. Skipping write operation.") return url = "<轻易云集成平台API端点>" headers = { 'Content-Type': 'application/json' } response = requests.post(url, json=transformed_data, headers=headers) if response.status_code == 200: print("Data written successfully.") else: print(f"Failed to write data. Status code: {response.status_code}") # 示例调用 source_data = { "salesmanId": "12345", "name": "张三", "department": "销售部" } transformed_data = transform_data(source_data) write_to_target_platform_with_id_check(transformed_data) ``` 通过上述步骤,我们实现了从源平台数据查询、ETL转换到目标平台数据写入的完整流程。这一过程不仅体现了轻易云数据集成平台在处理异构系统间无缝对接方面的强大能力,也展示了如何利用其提供的元数据配置高效地完成复杂的数据集成任务。 ![金蝶与外部系统打通接口](https://pic.qeasy.cloud/T29.png~tplv-syqr462i7n-qeasy.image)