从数据抓取到写入:金蝶云星空ETL项目解析

  • 轻易云集成顾问-潘裕

金蝶员工查询系统对接案例分享

在现代企业的信息化建设过程中,实现各业务系统之间的数据集成至关重要。本文将重点介绍如何通过使用轻易云数据集成平台,将金蝶云星空的API executeBillQuery 获取到的员工信息数据,批量写入另一实例中。

此次项目名为“金蝶员工查询”,主要解决了以下几个技术难题与挑战:

  1. 高吞吐量的数据写入能力:确保大量员工信息数据能够快速、高效地被集成至目标金蝶云星空系统中。
  2. API资产管理:通过高度可视化的控制台和统一视图,对所有API请求和响应进行集中监控,使得接口调用情况一目了然,从而实现资源优化配置。
  3. 自定义数据转换逻辑:针对不同业务场景下,对获取的数据进行格式转换,以满足目标系统的具体需求,并处理源、目标端接口分页及限流问题。

项目实施细节概述

本次项目采用executeBillQuery获取源金蝶云星空的员工数据,通过定时触发机制定期抓取,确保不漏单。在整个过程中,我们使用批量写入 API batchSave 实现大量数据快速而可靠地导入目标系统。同时,为保障稳定性与实时性,还利用了如下技术手段:

  • 实时监控及告警:在执行每个任务环节时,开发团队借助轻易云平台提供的集中监控功能,不仅可以追踪任务状态,还能及时处理异常状况,提高整体效率和准确性。
  • 异常重试机制:为了应对网络波动或其他不可预测因素导致的数据传输失败,我们实现了一套完整的错误重试机制,当发现某些记录未成功存储时,可自动重新发起请求以保证最终操作的一致性。

接下来,我们将详细探讨具体方案设计,包括如何设置获取接口executeBillQuery以及调用batchSave 写入接口的方法,以及处理过程中的各种技术要点及其解决方案。 金蝶与WMS系统接口开发配置

调用源系统金蝶云星空接口executeBillQuery获取并加工数据

在数据集成生命周期的第一步,我们需要从源系统获取数据,并对其进行初步加工。本文将详细探讨如何通过轻易云数据集成平台调用金蝶云星空接口executeBillQuery来实现这一目标。

接口配置与请求参数

首先,我们需要了解接口的基本配置和请求参数。根据提供的元数据配置,executeBillQuery接口的主要特性如下:

  • API名称: executeBillQuery
  • 请求方法: POST
  • 主要字段:
    • FNumber: 员工编号
    • FID: 员工ID
    • FName: 员工姓名
    • FMobile: 员工手机
    • FEmail: 员工邮箱
    • FPostDept: 所属部门
    • FBaseProperty3: 基本属性

此外,还有一些分页和过滤参数,例如:

  • Limit: 查询分页参数,指定每页返回的数据条数。
  • StartRow: 查询分页参数,指定查询起始行。
  • TopRowCount: 查询分页参数,指定返回的前几行数据。
  • FilterString: 过滤条件字符串,例如:FSupplierId.FNumber = 'VEN00010' and FApproveDate>=
  • FieldKeys: 指定需要返回的字段。
  • FormId: 表单ID,例如:BD_Empinfo

请求示例

为了更好地理解如何调用该接口,我们可以构建一个具体的请求示例。假设我们需要查询所有状态为“已审核”的员工信息,每次查询返回10条记录,从第0行开始。请求体可以如下构建:

{
    "FormId": "BD_Empinfo",
    "FieldKeys": ["FNumber", "FID", "FName", "FMobile", "FEmail", "FPostDept", "FBaseProperty3"],
    "FilterString": "FDocumentStatus='C'",
    "Limit": 10,
    "StartRow": 0
}

数据清洗与加工

在获取到原始数据后,我们需要对其进行清洗和加工,以便后续的数据转换与写入。以下是一些常见的数据清洗操作:

  1. 字段重命名: 将原始字段名转换为目标系统所需的字段名。例如,将FNumber重命名为EmployeeNumber
  2. 数据类型转换: 确保每个字段的数据类型符合目标系统的要求。例如,将字符串类型的日期转换为日期对象。
  3. 缺失值处理: 对于缺失值,可以选择填充默认值、删除对应记录或进行其他处理。

以下是一个简单的数据清洗示例代码(假设使用Python):

import json

# 假设response_data是从金蝶接口获取到的原始数据
response_data = [
    {"FNumber": "E001", "FID": "1001", "FName": "张三", "FMobile": "13800000000", 
     "FEmail": "zhangsan@example.com", "FPostDept": "销售部", 
     "FBaseProperty3": ""},
    # 更多记录...
]

# 清洗后的数据列表
cleaned_data = []

for record in response_data:
    cleaned_record = {
        "EmployeeNumber": record["FNumber"],
        "EmployeeID": record["FID"],
        "EmployeeName": record["FName"],
        "MobilePhone": record["FMobile"],
        "EmailAddress": record["FEmail"],
        "Department": record["FPostDept"],
        # 对于空值,可以选择填充默认值或进行其他处理
        "BaseProperty3": record["FBaseProperty3"] if record["FBaseProperty3"] else None,
    }
    cleaned_data.append(cleaned_record)

# 将清洗后的数据转换为JSON格式,便于后续处理
cleaned_data_json = json.dumps(cleaned_data, ensure_ascii=False)
print(cleaned_data_json)

实时监控与调试

在实际操作中,实时监控和调试是确保数据集成过程顺利进行的重要环节。轻易云平台提供了全透明可视化的操作界面,可以实时监控数据流动和处理状态。在调用接口和处理数据时,可以通过日志记录、异常捕获等方式及时发现并解决问题。

例如,在Python代码中添加日志记录:

import logging

# 配置日志记录
logging.basicConfig(level=logging.INFO)

try:
    # 假设调用金蝶接口并获取响应数据
    response_data = call_executeBillQuery_api()

    logging.info("成功获取到响应数据")

    # 数据清洗过程...

except Exception as e:
    logging.error(f"发生错误: {e}")

通过上述步骤,我们可以高效地调用金蝶云星空接口获取并加工员工信息,为后续的数据转换与写入打下坚实基础。 如何对接用友BIP接口

使用轻易云数据集成平台将数据转换并写入金蝶云星空API接口

在数据集成生命周期的第二步,我们需要将已集成的源平台数据进行ETL转换,确保其符合目标平台——金蝶云星空API接口所能接收的格式,并最终成功写入目标平台。以下是具体的技术实现过程。

元数据配置解析

首先,我们需要理解元数据配置,以便正确地进行API调用。以下是元数据配置的详细信息:

{
  "api": "batchSave",
  "effect": "EXECUTE",
  "method": "POST",
  "idCheck": true,
  "otherRequest": [
    {
      "field": "FormId",
      "label": "FormId",
      "type": "string",
      "describe": "必须填写金蝶的表单ID如:PUR_PurchaseOrder",
      "value": "PLN_FORECAST"
    },
    {
      "field": "Operation",
      "label": "Operation",
      "type": "string",
      "value": "BatchSave"
    },
    {
      "field": "IsAutoSubmitAndAudit",
      "label": "IsAutoSubmitAndAudit",
      "type": "bool",
      "value": true
    },
    {
      "field": "IsVerifyBaseDataField",
      "label": "IsVerifyBaseDataField",
      "type": "bool",
      "describe":"是否验证所有的基础资料有效性,布尔类,默认false(非必录)",
      "value": false
    }
  ],
  "operation":{
    "method":"batchArraySave",
    "rows":1,
    "rowsKey":"array"
  }
}

API接口调用步骤

  1. 构建请求体: 根据元数据配置,我们需要构建一个符合金蝶云星空API要求的请求体。请求体主要包括FormIdOperationIsAutoSubmitAndAuditIsVerifyBaseDataField等字段。

    {
     "FormId":"PLN_FORECAST",
     "Operation":"BatchSave",
     “IsAutoSubmitAndAudit”:true,
     “IsVerifyBaseDataField”:false,
     “Data”:[...] // 数据数组
    }
  2. 数据转换: 将源平台的数据转换为目标平台所需的数据格式。假设源平台的数据如下:

    [
     {"EmployeeID":"E001","Name":"John Doe","Department":"Sales"},
     {"EmployeeID":"E002","Name":"Jane Smith","Department":"Marketing"}
    ]

    转换后应符合金蝶云星空API要求的数据结构:

    [
     {
       “FStaffNumber”:”E001”,
       “FStaffName”:”John Doe”,
       “FDeptName”:”Sales”
     },
     {
       “FStaffNumber”:”E002”,
       “FStaffName”:”Jane Smith”,
       “FDeptName”:”Marketing”
     }
    ]
  3. 发送HTTP请求: 使用POST方法将构建好的请求体发送到金蝶云星空API接口。

    import requests
    
    url = 'https://api.kingdee.com/batchSave'
    
    headers = {
       'Content-Type': 'application/json',
       'Authorization': 'Bearer your_access_token'
    }
    
    payload = {
       'FormId': 'PLN_FORECAST',
       'Operation': 'BatchSave',
       'IsAutoSubmitAndAudit': True,
       'IsVerifyBaseDataField': False,
       'Data': [
           {'FStaffNumber':'E001', 'FStaffName':'John Doe', 'FDeptName':'Sales'},
           {'FStaffNumber':'E002', 'FStaffName':'Jane Smith', 'FDeptName':'Marketing'}
       ]
    }
    
    response = requests.post(url, json=payload, headers=headers)
    
    if response.status_code == 200:
       print('Data successfully written to Kingdee Cloud')
    else:
       print('Failed to write data:', response.text)

注意事项

  • 字段验证:确保所有字段都符合金蝶云星空API接口的要求,特别是必填字段。
  • 错误处理:在实际应用中,需要对API返回结果进行详细的错误处理,以便及时发现和解决问题。
  • 性能优化:对于大批量数据,可以考虑分批次提交,以提高效率和成功率。

通过以上步骤,我们可以高效地将源平台的数据转换并写入到金蝶云星空,实现不同系统间的数据无缝对接。这不仅提升了业务流程的自动化程度,也极大提高了数据处理的准确性和效率。 打通金蝶云星空数据接口