从金蝶云星空到轻易云的ETL转换与数据写入

  • 轻易云集成顾问-曹润
### 金蝶客户查询:高效对接金蝶云星空与轻易云集成平台 金蝶云星空作为一款广为企业使用的ERP系统,数据的实时同步和高效处理尤为关键。在此次项目中,我们通过轻易云数据集成平台,实现了金蝶客户查询功能的数据对接。以下我们将详细阐述如何使用executeBillQuery API从金蝶云星空获取相关客户信息,并通过写入空操作API快速、可靠地将这些数据整合到轻易云集成平台内。 #### 技术要点概览 1. **定时可靠的数据抓取** 为确保不漏单且及时更新,我们在轻易云集成平台上配置了定时任务,定期调用金蝶API executeBillQuery。这不仅能够保证新产生的数据能及时提取,还规避了大量人工干预的可能性,提高工作效率。 2. **分页与限流处理** 考虑到executeBillQuery接口返回的数据量较大,为避免请求失败及资源占用过多问题,我们采用分页技术进行数据抓取。此外,通过设置合理的限流策略,有效减少因并发请求带来的负担,使得系统运行更稳健。 3. **自定义数据转换逻辑** 在实际操作中,不同系统间的数据结构往往存在差异。我们利用轻易云提供的可视化数据流设计工具,自定义了一套适用于本次项目需求的数据映射规则,确保从金蝶导入的数据能够被正确解析和存储,同时增强了灵活性。 4. **集中监控与告警机制** 结合轻易云的平台特性,每个步骤都设有实时监控和日志记录功能,帮助我们全面追踪整个数据集成过程。一旦出现异常情况,如接口调用失败或网络波动,可通过告警系统第一时间通知相关人员进行处理,大大缩短故障响应时间。 5. **高吞吐量支持下的大批量数据写入** 由于公司业务需要频繁、大规模地访问和存储用户信息,对我们的集成能力提出了严峻挑战。幸运的是,凭借轻易云强大的并行处理能力,即便面对庞大的客户信息量,也能使其快速、高效地写入到目标数据库中,从而实现无缝衔接。 以上是此次“金蝶客户查询”案例中的主要技术实施要点,在后续内容里,将进一步详细解析各模块配置步骤及具体实现方案,包括如何捕捉执行过程中的细节问题以及有效解决方法。 ![打通企业微信数据接口](https://pic.qeasy.cloud/D9.png~tplv-syqr462i7n-qeasy.image) ### 调用金蝶云星空接口executeBillQuery获取并加工数据 在数据集成生命周期的第一步中,调用源系统的API接口是至关重要的一环。本文将深入探讨如何通过轻易云数据集成平台,调用金蝶云星空的`executeBillQuery`接口来获取客户数据,并进行初步加工。 #### 接口配置与请求参数 首先,我们需要配置调用金蝶云星空接口的元数据。以下是关键的配置项: - **API**: `executeBillQuery` - **Method**: `POST` - **FormId**: `BD_Customer`(业务对象表单ID) - **FieldKeys**: 需查询的字段key集合 - **Pagination**: 分页参数,包括每页大小和起始行索引 元数据配置如下: ```json { "api": "executeBillQuery", "method": "POST", "number": "FName", "id": "FCUSTID", "pagination": { "pageSize": 100 }, "idCheck": true, "request": [ {"field":"FCUSTID","label":"FCUSTID","type":"string","value":"FCUSTID"}, {"field":"FNumber","label":"编码","type":"string","value":"FNumber"}, {"field":"FName","label":"名称","type":"string","value":"FName"}, {"field":"FCreateOrgId_FNumber","label":"创建组织","type":"string","value":"FCreateOrgId.FNumber"}, {"field":"FUseOrgId_FNumber","label":"使用组织","type":"string","value":"FUseOrgId.FNumber"}, {"field":"FDescription","label":"描述","type":"string","value":"FDescription"}, {"field":"FIsTrade","label":"是否交易客户","type":"string","value":"FIsTrade"}, {"field":"FCustTypeId_FNumber","label":"客户类别","type":"string","value":"FCustTypeId.FNumber"}, {"field":"FGroup_FNumber","label":"客户分组","type":"string","value":"FGroup.FNumber"}, {"field":"FSALDEPTID_FNumber","label":"销售部门","type":"string","value":"FSALDEPTID.FNumber"}, {"field":"FSELLER_FNumber","label":"销售员","type":"string","value":"FSELLER.FNumber"}, {"field":...} ], "otherRequest": [ {"field": "Limit", "label": "最大行数", "type": "string", "describe": "金蝶的查询分页参数", "value": "{PAGINATION_PAGE_SIZE}"}, {"field": "StartRow", "label": "开始行索引", "type": "string", "describe": "金蝶的查询分页参数", "value": "{PAGINATION_START_ROW}"}, {"field":...} ] } ``` #### 构建请求体 根据上述元数据配置,我们需要构建一个符合金蝶云星空API要求的请求体。以下是一个示例请求体: ```json { "FormId": "BD_Customer", "FieldKeys": ["FCUSTID", "FNumber", ...], "FilterString": "", "Limit": 100, "StartRow": 0, ... } ``` #### 调用API并处理响应 通过轻易云平台,我们可以使用异步方式调用该API,并处理返回的数据。以下是一个示例代码片段,用于发送请求和处理响应: ```python import requests url = 'https://api.kingdee.com/executeBillQuery' headers = {'Content-Type': 'application/json'} payload = { 'FormId': 'BD_Customer', 'FieldKeys': 'FCUSTID,FNumber,...', 'FilterString': '', 'Limit': 100, 'StartRow': 0 } response = requests.post(url, headers=headers, json=payload) data = response.json() # 数据处理逻辑 for record in data['Result']['ResponseStatus']['SuccessEntitys']: process_record(record) ``` #### 数据清洗与转换 在获取到原始数据后,需要对其进行清洗和转换,以便后续写入目标系统。这一步通常包括以下操作: 1. **字段映射**:将源系统字段映射到目标系统字段。 2. **数据格式转换**:如日期格式、数值类型等。 3. **过滤无效数据**:去除不符合业务规则的数据。 例如,对于获取到的客户数据,可以进行如下处理: ```python def process_record(record): cleaned_data = { 'CustomerID': record['FCUSTID'], 'CustomerName': record['FName'], ... # 更多字段映射和转换 } # 写入目标系统或进一步处理 ``` 通过上述步骤,我们可以高效地从金蝶云星空获取客户数据,并进行初步加工,为后续的数据写入和业务应用打下坚实基础。 ![如何对接企业微信API接口](https://pic.qeasy.cloud/S24.png~tplv-syqr462i7n-qeasy.image) ### 金蝶客户查询数据的ETL转换与写入轻易云集成平台 在数据集成过程中,ETL(提取、转换、加载)是关键步骤之一。本文将详细探讨如何将金蝶客户查询的数据通过ETL过程转换为轻易云集成平台API接口所能够接收的格式,并最终写入目标平台。 #### 数据提取与清洗 首先,从金蝶系统中提取客户数据。假设我们已经成功获取了原始数据,接下来需要对这些数据进行清洗和预处理。这包括去除重复记录、处理缺失值以及标准化字段格式等操作。 ```python import pandas as pd # 假设从金蝶系统中提取的数据存储在一个DataFrame中 raw_data = pd.read_csv('kingdee_customers.csv') # 去除重复记录 cleaned_data = raw_data.drop_duplicates() # 处理缺失值,例如用空字符串填充缺失的客户名称 cleaned_data['customer_name'].fillna('', inplace=True) # 标准化字段格式,例如将所有的电话号码格式化为统一形式 cleaned_data['phone_number'] = cleaned_data['phone_number'].apply(lambda x: format_phone_number(x)) ``` #### 数据转换 在数据清洗完成后,需要将其转换为轻易云集成平台API接口能够接收的格式。根据提供的元数据配置,API接口使用POST方法,并且需要进行ID检查。 ```python import json def transform_data(data): transformed_records = [] for index, row in data.iterrows(): record = { "customer_id": row['customer_id'], "customer_name": row['customer_name'], "phone_number": row['phone_number'], "email": row['email'] } transformed_records.append(record) return transformed_records transformed_data = transform_data(cleaned_data) ``` #### 数据写入目标平台 最后,将转换后的数据通过API接口写入轻易云集成平台。这里需要特别注意的是,根据元数据配置中的`idCheck`属性,需要在写入前进行ID检查,以确保不会重复插入相同的数据。 ```python import requests def write_to_target_platform(api_url, data): headers = {'Content-Type': 'application/json'} for record in data: # 进行ID检查,如果存在则跳过写入 if check_id_exists(api_url, record['customer_id']): continue response = requests.post(api_url, headers=headers, data=json.dumps(record)) if response.status_code != 200: print(f"Failed to write record {record['customer_id']}: {response.text}") def check_id_exists(api_url, customer_id): # 假设有一个API可以用来检查ID是否存在 check_url = f"{api_url}/check/{customer_id}" response = requests.get(check_url) return response.status_code == 200 and response.json().get('exists', False) api_url = "https://api.qingyiyun.com/write" write_to_target_platform(api_url, transformed_data) ``` 通过上述步骤,我们实现了从金蝶系统提取客户数据,经过清洗和转换后,通过轻易云集成平台的API接口成功写入目标平台。这一过程不仅保证了数据的一致性和完整性,还提高了系统间的数据交互效率。 ![如何对接钉钉API接口](https://pic.qeasy.cloud/T3.png~tplv-syqr462i7n-qeasy.image)