ETL转换与数据写入在数据集成中的应用

  • 轻易云集成顾问-李国敏

金蝶云星空与轻易云数据集成:查询销售订单数据(电销退货)

在现代企业的数字化转型过程中,系统之间的数据无缝对接和高效集成变得愈发关键。本文将详细介绍如何通过轻易云数据集成平台,实现金蝶云星空销售订单数据(电销退货)的实时抓取和批量写入。具体案例方案命名为“查询金蝶销售订单(电销退货)”。

任务描述

此次集成旨在解决以下几个核心问题:确保每一笔交易记录都能准确、及时地上传至目标数据库,同时处理分页和限流等技术难点。这些操作可显著提升业务透明度,并提高运营效率。

数据获取与接口调用

首先,我们需要从金蝶云星空中获取相关的销售订单数据。在这一步骤中,使用API接口executeBillQuery来实现定时可靠的数据抓取。我们执行了以下步骤:

  1. 建立连接:配置与金蝶云星空API的网络连接。
  2. 调用接口:利用executeBillQuery方法进行调度,包括传递必要的参数如时间范围、状态标识等,确保不漏单。
def fetchKDSalesOrders():
    url = "https://api.kingdee.com/executeBillQuery"
    payload = {"date_range": "2023-09-01_to_2023-10-01", "order_type": "tele_sales_return"}
    response = requests.post(url, json=payload)
    return response.json()

数据格式转换及异常处理

由于金蝶云星空与轻易云平台的数据格式存在差异,我们需进行了适当的数据映射和格式转换,如字段重命名、类型校正等。同时,为了保证数据传输过程中的稳定性,我们设计了一套完备的异常处理机制,以应对可能出现的错误或网络波动:

def transformData(kd_data):
    transformed_data = []
    for record in kd_data:
        try:
            transformed_record = {
                "OrderID": record["billNo"],
                "CustomerName": record["custNam"],
                # 更多字段映射...
            }
            transformed_data.append(transformed_record)
        except KeyError as e:
            log.error(f"字段缺失: {e}")
            continue  # 跳过该条有误记录继续下一条
    return transformed_data

批量写入到轻易云平台

最后,将处理好的数据批量写入到轻易云平台,通过其提供的一组灵活API。这不仅极大地提升了写入速度,还保证了每日大量交易信息能够快速而精确地同步:

def write
![数据集成平台API接口配置](https://pic.qeasy.cloud/D35.png~tplv-syqr462i7n-qeasy.image)
### 调用金蝶云星空接口executeBillQuery获取并加工数据

在轻易云数据集成平台的生命周期中,调用源系统接口是至关重要的一步。本文将深入探讨如何通过调用金蝶云星空的`executeBillQuery`接口来获取销售订单(电销退货)数据,并进行初步加工。

#### 接口配置与请求参数

首先,我们需要配置元数据以便正确调用金蝶云星空的API接口。以下是元数据配置的详细内容:

```json
{
  "api": "executeBillQuery",
  "effect": "QUERY",
  "method": "POST",
  "number": "FBillNo",
  "id": "FSaleOrderEntry_FEntryID",
  "name": "FBillNo",
  "idCheck": true,
  "request": [
    {"field":"FID","label":"FID","type":"string","describe":"FID","value":"FID"},
    {"field":"FSaleOrderEntry_FEntryID","label":"FSaleOrderEntry_FEntryID","type":"string","describe":"FSaleOrderEntry_FEntryID","value":"FSaleOrderEntry_FEntryID"},
    {"field":"FBillNo","label":"单据编号","type":"string","describe":"单据编号","value":"FBillNo"},
    {"field":"FDocumentStatus","label":"单据状态","type":"string","describe":"单据状态","value":"FDocumentStatus"},
    {"field":"FSaleOrgId_FNumber","label":"销售组织","type":"string","describe":"销售组织","value":"FSaleOrgId.FNumber"},
    {"field":"FDate","label":"日期","type":"string","describe":"日期","value":"FDate"},
    {"field":"FCustId_FNumber","label":"客户","type":"string","describe":"客户","value":"FCustId.FNumber"},
    {"field":"FSaleDeptId_Fnumber","label":"销售部门","type":"string","describe":"销售部门","value":"FSaleDeptId.Fnumber"},
    {"field":"FReceiveAddress","label":"收货地址","type":"string","describe":"收货地址","value":"FReceiveAddress"},
    {"field":...},
    ...
  ],
  "otherRequest": [
    {"field":...},
    ...
  ],
  "autoFillResponse": true
}

请求参数详解

  1. 基本字段:包括FIDFBillNoFDocumentStatus等,这些字段用于标识和描述销售订单的基本信息。
  2. 业务字段:如FSaleOrgId_FNumber(销售组织)、FCustId_FNumber(客户)、FSaleDeptId_Fnumber(销售部门)等,这些字段用于业务逻辑处理。
  3. 分页参数:如LimitStartRowTopRowCount等,用于控制查询结果的分页。
  4. 过滤条件:通过设置合适的过滤条件,如 FilterString: FApproveDate>='{{LAST_SYNC_TIME|dateTime}}' and FBillTypeID.Fnumber = 'WDTTHDD', 可以精确筛选所需的数据。

调用API获取数据

在配置好元数据后,我们可以通过以下步骤来调用API并获取数据:

  1. 构建请求体: 根据元数据配置,构建请求体。请求体应包含所有必要的字段及其对应值。

    {
     "FormId": "SAL_SaleOrder",
     "FieldKeys": ["FID", "FBillNo", ...],
     ...
    }
  2. 发送请求: 使用HTTP POST方法发送请求到金蝶云星空的API端点。

    POST /k3cloud/api/executeBillQuery HTTP/1.1
    Host: api.kingdee.com
    Content-Type: application/json
    Authorization: Bearer <token>
    
    {
     ...
    }
  3. 处理响应: 接收到响应后,对返回的数据进行解析和初步加工。根据业务需求,可以对特定字段进行转换或清洗。

数据清洗与转换

在获取到原始数据后,下一步是对数据进行清洗和转换。这一步骤确保了数据的一致性和准确性,为后续的数据写入做好准备。

  1. 字段映射与转换: 根据业务需求,将原始字段映射到目标系统所需的字段。例如,将金蝶中的 FBillNo 映射为目标系统中的 order_number.

  2. 数据格式化: 对日期、数值等字段进行格式化处理。例如,将日期格式从 YYYY-MM-DD HH:mm:ss 转换为 YYYYMMDD.

  3. 异常处理: 对于缺失或异常的数据进行处理,例如填充默认值或记录日志以便后续排查。

示例代码

以下是一个示例代码片段,用于展示如何调用API并处理响应:

import requests
import json

# 构建请求体
payload = {
    "FormId": "SAL_SaleOrder",
    ...
}

# 设置请求头
headers = {
    'Content-Type': 'application/json',
    'Authorization': 'Bearer <token>'
}

# 发送请求
response = requests.post('https://api.kingdee.com/k3cloud/api/executeBillQuery', headers=headers, data=json.dumps(payload))

# 检查响应状态码
if response.status_code == 200:
    data = response.json()

    # 数据清洗与转换
    cleaned_data = []

    for entry in data['Result']['ResponseStatus']['SuccessEntitys']:
        cleaned_entry = {
            'order_number': entry['FBillNo'],
            'customer_id': entry['FCustId.FNumber'],
            ...
        }
        cleaned_data.append(cleaned_entry)

else:
    print(f"Error: {response.status_code}, {response.text}")

通过上述步骤,我们成功地调用了金蝶云星空的 executeBillQuery 接口,获取并初步加工了销售订单(电销退货)数据。这为后续的数据写入和进一步处理奠定了坚实基础。 钉钉与CRM系统接口开发配置

数据集成生命周期中的ETL转换与写入

在数据集成生命周期中,ETL(Extract, Transform, Load)转换与写入是至关重要的一步。本文将深入探讨如何将已经集成的源平台数据进行ETL转换,转为目标平台轻易云集成平台API接口所能够接收的格式,并最终写入目标平台。

数据请求与清洗

首先,我们从源平台(金蝶销售订单)获取数据。这一步涉及到通过API接口查询销售订单数据,并对原始数据进行清洗和预处理。假设我们已经完成了这一步,接下来我们重点关注如何将这些清洗后的数据进行ETL转换,并写入目标平台。

数据转换

在数据转换阶段,我们需要将源平台的数据格式转换为目标平台所能接受的格式。这里,我们假设金蝶销售订单的数据结构如下:

{
  "orderId": "12345",
  "customerName": "张三",
  "orderDate": "2023-10-01",
  "items": [
    {
      "itemCode": "A001",
      "quantity": 2,
      "price": 100.0
    },
    {
      "itemCode": "B002",
      "quantity": 1,
      "price": 200.0
    }
  ]
}

而目标平台轻易云集成平台API接口所能接受的数据结构如下:

{
  "api": "写入空操作",
  "effect": "EXECUTE",
  "method": "POST",
  "idCheck": true,
  "data": {
    "order_id": "",
    "customer_name": "",
    "order_date": "",
    "items_detail": []
  }
}

为了实现这一转换,我们需要编写一个转换函数,将源数据映射到目标数据结构。例如:

def transform_data(source_data):
    transformed_data = {
        "api": "写入空操作",
        "effect": "EXECUTE",
        "method": "POST",
        "idCheck": True,
        "data": {
            "order_id": source_data["orderId"],
            "customer_name": source_data["customerName"],
            "order_date": source_data["orderDate"],
            "items_detail": []
        }
    }

    for item in source_data["items"]:
        transformed_item = {
            "item_code": item["itemCode"],
            "quantity_purchased": item["quantity"],
            "unit_price": item["price"]
        }
        transformed_data["data"]["items_detail"].append(transformed_item)

    return transformed_data

数据写入

一旦数据被成功转换为目标格式,下一步就是通过API接口将其写入目标平台。根据元数据配置,我们使用POST方法进行数据提交,并确保启用ID检查功能。以下是一个示例代码段,展示如何通过HTTP请求将转换后的数据发送到目标平台:

import requests
import json

def write_to_target_platform(transformed_data):
    url = 'https://api.targetplatform.com/data'

    headers = {
        'Content-Type': 'application/json'
    }

    response = requests.post(url, headers=headers, data=json.dumps(transformed_data))

    if response.status_code == 200:
        print("Data successfully written to target platform.")
    else:
        print(f"Failed to write data. Status code: {response.status_code}, Response: {response.text}")

# 示例使用
source_data = {
  # 假设这是从金蝶获取并清洗后的数据
}

transformed_data = transform_data(source_data)
write_to_target_platform(transformed_data)

实时监控与调试

在实际操作中,实时监控和调试是确保数据正确性的关键步骤。通过轻易云提供的全透明可视化界面,我们可以实时监控每个环节的数据流动和处理状态。一旦发现问题,可以迅速定位并解决,从而极大提升业务的透明度和效率。

总结来说,通过上述步骤,我们实现了从源平台金蝶销售订单的数据请求、清洗、ETL转换到最终写入目标平台的全过程。这一过程不仅确保了数据的准确性和一致性,还大大提高了系统集成的效率和可靠性。 如何开发钉钉API接口