ETL转换:如何使用轻易云将金蝶数据写入目标平台

  • 轻易云集成顾问-冯潇

【查询】金蝶客户:从金蝶云星空到轻易云数据集成的技术实现

在企业信息系统整合过程中,确保不同平台间的数据无缝对接是一个普遍且关键的问题。本文将聚焦于一个实际应用案例:[【查询】金蝶客户],详细说明如何将金蝶云星空数据集成至轻易云数据集成平台。

如何处理 executeBillQuery 接口的调用与分页限流问题

首先,我们通过调用executeBillQuery接口来获取金蝶云星空中的客户数据。在这一过程中需要特别注意的是分页和限流机制,这是为了避免因大批量请求而导致接口性能下降或超时错误。具体做法是:

  1. 分页策略:每次请求限制在一定数量(如100条),逐页获取直至所有数据抓取完毕。
  2. 限流控制:设置并发请求数限制,根据API文档建议进行合理配置,以确保不会触发频率限制。

代码示例如下:

def fetch_kd_cust_data(page_num, page_size):
    url = "https://api.kingdee.com/executeBillQuery"
    params = {
        "pageNum": page_num,
        "pageSize": page_size
    }
    response = requests.get(url, params=params)
    if response.status_code == 200:
        return response.json()
    else:
        handle_error(response)

# Example usage
all_data = []
page_num = 1
while True:
    data_batch = fetch_kd_cust_data(page_num, 100)
    if not data_batch: 
        break
    all_data.extend(data_batch)
    page_num += 1 

数据格式差异与映射规则

由于金蝶云星空和轻易云两者之间的数据格式可能存在差异,在有效完成这一步骤之前,需要进行必要的数据转换和映射。例如,将金蝶的JSON结构中字段名customerName映射为轻易表格存储中的cust_name

常见处理方式包括但不限于字典映射、序列化/反序列化工具等。主要目标是在不丢失业务语义的情况下,实现平稳过渡。例如:

def map_kd_to_qy(data):
   mapped_data = {
      "cust_name": data["customerName"],
      # Add additional mappings as required...
   }
   return mapped_data

# Apply mapping to all fetched data before writing to the target platform.
transformed_data = [map_kd_to_qy(item) for item in all_data]

数据写入及异常处理机制

在成功获得并转换所需数据后,通过轻易云集成平台提供的写入API进行批量写入操作。在此 用友与外部系统接口集成开发

调用金蝶云星空接口executeBillQuery获取并加工数据

在数据集成的生命周期中,调用源系统接口是至关重要的一步。本文将详细探讨如何通过轻易云数据集成平台调用金蝶云星空的executeBillQuery接口来获取客户数据,并进行必要的数据加工。

接口配置与调用

首先,我们需要了解如何配置和调用executeBillQuery接口。根据提供的元数据配置,我们可以看到该接口采用POST方法,主要用于查询操作(effect: QUERY)。以下是具体的请求字段及其描述:

{
  "api": "executeBillQuery",
  "effect": "QUERY",
  "method": "POST",
  "number": "FNumber",
  "id": "FCUSTID",
  "name": "FNumber",
  "idCheck": true,
  "request": [
    {"field":"FCUSTID","label":"FCUSTID","type":"string","describe":"FCUSTID","value":"FCUSTID"},
    {"field":"FNumber","label":"编码1","type":"string","describe":"编码","value":"FNumber"},
    {"field":"FName","label":"名称","type":"string","describe":"名称","value":"FName"},
    {"field":"FCreateOrgId_FNumber","label":"创建组织","type":"string","describe":"创建组织","value":"FCreateOrgId.FNumber"},
    {"field":"FUseOrgId_FNumber","label":"使用组织","type":"string","describe":"使用组织","value":"FUseOrgId.FNumber"},
    {"field":"FDescription","label":"描述","type":"string","describe":"描述","value":"FDescription"},
    {"field":"FCustTypeId_FNumber","label":"客户类别","type":"string","describe":"客户类别","value":"FCustTypeId.FNumber"},
    {"field":"FGroup_FNumber","label":"客户分组","type":"string","describe":"客户分组","value":"FGroup.FNumber"},
    {"field":"FSALDEPTID_FNumber","label":"销售部门","type":""},
    ...
  ],
  ...
}

请求参数解析

在请求参数中,关键字段包括:

  • FormId: 必须填写金蝶的表单ID,例如:BD_Customer
  • FieldKeys: 查询字段的key集合,格式为数组。
  • FilterString: 用于过滤查询结果的条件字符串。
  • Limit, StartRow, TopRowCount: 分页参数,用于控制查询结果的分页。

以下是一个示例请求体:

{
  "FormId": "BD_Customer",
  "FieldKeys": ["FCUSTID", "FNumber", "FName", ...],
  "FilterString": "FSupplierId.FNumber = 'VEN00010' and FApproveDate>='2023-01-01'",
  "Limit": 100,
  "StartRow": 0
}

数据清洗与加工

在获取到原始数据后,需要对其进行清洗和加工,以满足业务需求。以下是一些常见的数据清洗与加工操作:

  1. 字段映射:将原始数据中的字段映射到目标系统所需的字段。例如,将FCUSTID映射为目标系统中的客户ID。
  2. 数据格式转换:将日期、金额等字段转换为目标系统所需的格式。例如,将日期格式从YYYY-MM-DD转换为MM/DD/YYYY
  3. 数据过滤:根据业务规则过滤掉不需要的数据。例如,只保留状态为“有效”的客户记录。

以下是一个简单的数据清洗示例:

def clean_data(raw_data):
    cleaned_data = []
    for record in raw_data:
        cleaned_record = {
            'customer_id': record['FCUSTID'],
            'customer_number': record['FNumber'],
            'customer_name': record['FName'],
            'organization': record['FCreateOrgId_FNumber'],
            ...
        }
        # 数据过滤
        if record['status'] == '有效':
            cleaned_data.append(cleaned_record)
    return cleaned_data

实时监控与错误处理

在数据集成过程中,实时监控和错误处理同样重要。轻易云平台提供了实时监控功能,可以帮助我们及时发现和解决问题。例如,当接口调用失败时,可以通过日志和告警机制快速定位问题并采取相应措施。

def monitor_and_handle_errors(response):
    if response.status_code != 200:
        log_error(response)
        send_alert("API调用失败", response.text)

通过上述步骤,我们可以高效地调用金蝶云星空接口获取客户数据,并进行必要的数据清洗与加工,为后续的数据转换与写入做好准备。这不仅提升了业务透明度和效率,也确保了数据的一致性和准确性。 打通企业微信数据接口

使用轻易云数据集成平台进行ETL转换并写入目标平台

在数据集成的生命周期中,ETL(提取、转换、加载)是至关重要的一环。本文将详细探讨如何利用轻易云数据集成平台,将已经集成的源平台数据进行ETL转换,转为目标平台API接口所能够接收的格式,并最终写入目标平台。

数据提取与初步清洗

在进行ETL操作之前,首先需要从源系统中提取数据。在本案例中,我们假设已经从金蝶客户系统中成功提取了客户数据。这些数据通常以JSON或XML格式存储,并且可能包含冗余信息或不一致的数据格式。

数据转换

一旦完成初步清洗,下一步是将数据转换为目标平台所能接受的格式。轻易云数据集成平台提供了强大的数据转换功能,可以通过自定义脚本或内置函数来实现这一过程。

假设我们从金蝶系统中获取到以下客户信息:

{
  "customer_id": "12345",
  "name": "张三",
  "contact_info": {
    "phone": "13800138000",
    "email": "zhangsan@example.com"
  },
  "address": {
    "province": "北京",
    "city": "北京市",
    "district": "朝阳区"
  }
}

我们需要将这些信息转换为目标平台API接口所能接受的格式。根据元数据配置,我们知道目标API接口的相关信息如下:

{
  "api":"写入空操作",
  "effect":"EXECUTE",
  "method":"POST",
  "idCheck":true
}

为了满足目标API接口的需求,我们需要对原始数据进行以下处理:

  1. 将嵌套的contact_infoaddress字段展开。
  2. 确保所有字段名称符合目标平台要求。
  3. 根据idCheck参数,确保在写入之前检查ID是否存在。

处理后的数据可能如下:

{
  "customerId": "12345",
  "customerName": "张三",
  "phoneNumber": "13800138000",
  "emailAddress": "zhangsan@example.com",
  "provinceName": "北京",
  "cityName": "北京市",
  "districtName": "朝阳区"
}

数据加载

在完成数据转换之后,下一步是通过API接口将数据写入目标平台。根据元数据配置,我们使用HTTP POST方法来执行这一操作。

首先,需要构建HTTP请求:

POST /api/execute HTTP/1.1
Host: target-platform.example.com
Content-Type: application/json

{
  "customerId": "12345",
  "customerName": "张三",
  ...
}

在发送请求之前,需要检查ID是否存在。如果ID已经存在,则可能需要更新现有记录,而不是插入新记录。这可以通过向目标平台发送一个查询请求来实现:

GET /api/check-id?customerId=12345 HTTP/1.1
Host: target-platform.example.com

如果返回结果表明ID不存在,则可以安全地插入新记录;否则,需要更新现有记录。

实践案例

下面是一个实际的代码示例,展示如何使用Python脚本结合轻易云的数据集成功能来完成上述步骤:

import requests
import json

# 源系统中的客户信息
source_data = {
    'customer_id': '12345',
    'name': '张三',
    'contact_info': {
        'phone': '13800138000',
        'email': 'zhangsan@example.com'
    },
    'address': {
        'province': '北京',
        'city': '北京市',
        'district': '朝阳区'
    }
}

# 转换后的目标数据格式
target_data = {
    'customerId': source_data['customer_id'],
    'customerName': source_data['name'],
    'phoneNumber': source_data['contact_info']['phone'],
    'emailAddress': source_data['contact_info']['email'],
    'provinceName': source_data['address']['province'],
    'cityName': source_data['address']['city'],
    'districtName': source_data['address']['district']
}

# 检查ID是否存在
check_id_url = f"http://target-platform.example.com/api/check-id?customerId={target_data['customerId']}"
response = requests.get(check_id_url)
if response.status_code == 200 and response.json().get('exists'):
    print("ID already exists, updating record...")
else:
    print("ID does not exist, inserting new record...")

# 写入目标平台
write_url = f"http://target-platform.example.com/api/execute"
headers = {'Content-Type': 'application/json'}
response = requests.post(write_url, headers=headers, data=json.dumps(target_data))

if response.status_code == 200:
    print("Data written successfully")
else:
    print(f"Failed to write data: {response.status_code}")

通过上述步骤,我们成功地将金蝶客户系统中的数据提取、转换并写入到目标平台。这种基于轻易云的数据集成解决方案,不仅简化了跨系统的数据流动,还确保了每个环节的高效和透明。 钉钉与MES系统接口开发配置