使用轻易云实现吉客云到金蝶云的数据ETL转换指南

  • 轻易云集成顾问-李国敏

吉客云数据集成到金蝶云星空的JY-BDS采购入库方案

在企业信息系统的日常运作中,如何高效、可靠地实现不同系统之间的数据对接是一个关键性问题。本文将详细探讨吉客云与金蝶云星空之间的数据集成案例——JY-BDS采购入库方案。在该方案中,我们利用轻易云数据集成平台,实现了两大核心系统间的无缝对接。

首先,该方案需要处理从吉客云获取ERP存货单据(erp.storage.goodsdocin)并将其批量写入到金蝶云星空(batchSave)。为了确保数据能够高吞吐量地准确传递,我们使用了以下技术要点:

  1. 定时抓取和实时监控:通过轻易云的平台调度机制,定时可靠地抓取吉客云接口数据,同时进行实时监控和日志记录,确保每一条数据都不漏单。

  2. 自定义转换逻辑:由于吉客云与金蝶云星空API所需的数据格式存在差异,我们进行了定制化的数据映射,以适应特定业务需求。从而保证输出符合目标系统预期。

  3. 分页及限流处理:为了解决吉客云接口在大量数据读取时可能遇到的分页及限流问题,在抓取过程中引入分页控制策略,并结合限流阈值进行优化配置,使得整个过程既安全又高效。

  4. 异常处理和重试机制:针对可能出现的网络波动或其他异常情况,本次实施过程中设计了一套完善的错误捕获和重试机制,一旦检测到失败便会自动触发重试流程,最大程度上提高了任务成功率。

  5. 集中式监控告警体系:提供集中式的监控和告警功能,可以实时跟踪所执行任务状态,通过可视化界面展示执行结果,并及时通知相关负责人进行处置。这种透明管理极大提升了系统维护效率。

以上描述只是这次集成项目的一部分内容,在后续章节中,将深入细节探讨实现这些功能的方法论及工具应用。 系统集成平台API接口配置

调用吉客云接口erp.storage.goodsdocin获取并加工数据

在数据集成的生命周期中,第一步是调用源系统的API接口以获取原始数据。本文将详细探讨如何通过轻易云数据集成平台调用吉客云的erp.storage.goodsdocin接口,并对获取的数据进行初步加工。

接口调用配置

根据元数据配置,我们需要设置以下参数来调用erp.storage.goodsdocin接口:

  • API: erp.storage.goodsdocin
  • 请求方法: POST
  • 分页设置: 每页50条记录
  • 入库类型: 101(采购入库)

请求参数包括分页信息、时间范围、仓库和供应商信息等。以下是一个典型的请求配置示例:

{
  "api": "erp.storage.goodsdocin",
  "method": "POST",
  "pagination": {
    "pageSize": 50
  },
  "request": [
    {"field": "pageIndex", "label": "分页页码", "type": "string"},
    {"field": "pageSize", "label": "分页页数", "type": "string", "value": "50"},
    {"field": "goodsDocNo", "label": "入库单号", "type": "string"},
    {"field": "startDate", "label": "创建时间的起始时间", "type": "string", "value":"{{LAST_SYNC_TIME|datetime}}"},
    {"field": "endDate", "label": "创建时间的结束时间", "type":"string","value":"{{CURRENT_TIME|datetime}}"},
    {"field":"inouttype","label":"入库类型","type":"string","value":"101"},
    {"field":"warehouseId","label":"仓库ID","type":"string"},
    {"field":"warehouseCode","label":"仓库编号","type":"string"},
    {"field":"vendId","label":"供应商ID(往来单位)","type":"string"},
    {"field":"vendCode","label":"供应商编号(往来单位)","type":"string"},
    {"field":"billNo","label":"上游单据号(关联单号)","type":"string"},
    {"field":"userName","label":"创建人名称","type":"string"},
    {"field":"gmtModifiedStart","label":"主表更新时间起始","type":"string"},
    {"field":"gmtModifiedEnd","label":"主表更新时间截至","type":"string"}
  ]
}

数据请求与清洗

在实际操作中,首先需要通过上述配置向吉客云发送HTTP POST请求以获取采购入库数据。假设我们已经成功获取了响应数据,接下来需要对这些数据进行清洗和初步加工。

  1. 字段映射与转换

    • 将原始字段映射到目标系统所需的字段。例如,将goodsDocNo映射为目标系统中的documentNumber
    • 对日期格式进行标准化处理,将不同格式的日期统一转换为ISO8601标准格式。
  2. 去重与校验

    • 根据元数据配置中的idCheck参数,确保每条记录唯一性。可以使用recId作为唯一标识符进行去重。
    • 校验关键字段是否存在空值或不合法值,例如确保每条记录都有有效的warehouseIdsupplierId
  3. 异常处理

    • 对于缺失或错误的数据,记录日志并进行相应处理。例如,如果某条记录缺少必要的供应商信息,可以选择丢弃该记录并在日志中注明原因。

以下是一个简单的数据清洗示例代码(伪代码):

def clean_data(raw_data):
    cleaned_data = []

    for record in raw_data:
        if not record.get('warehouseId') or not record.get('vendId'):
            log_error(f"Missing warehouseId or vendId in record {record['recId']}")
            continue

        cleaned_record = {
            'documentNumber': record['goodsDocNo'],
            'warehouseId': record['warehouseId'],
            'supplierId': record['vendId'],
            'createdDate': convert_to_iso8601(record['startDate']),
            'modifiedDate': convert_to_iso8601(record['gmtModifiedStart'])
        }

        cleaned_data.append(cleaned_record)

    return cleaned_data

数据转换与写入

完成数据清洗后,下一步是将清洗后的数据转换为目标系统所需的格式,并写入目标数据库或系统。在此过程中,需要注意以下几点:

  • 确保所有必需字段都已正确映射和填充。
  • 根据目标系统的要求,对数据进行进一步转换,例如数值单位转换、字符串格式调整等。
  • 使用批量写入方式提高效率,并确保在写入过程中有适当的错误处理机制。

通过上述步骤,我们可以高效地从吉客云获取采购入库数据,并对其进行清洗和初步加工,为后续的数据集成过程打下坚实基础。 打通企业微信数据接口

使用轻易云数据集成平台进行ETL转换并写入金蝶云星空API接口

在数据集成的生命周期中,第二步是将已经集成的源平台数据进行ETL(提取、转换、加载)转换,并将其转为目标平台所能够接收的格式。本文将详细探讨如何使用轻易云数据集成平台,将采购入库数据转换为金蝶云星空API接口所需的格式,并最终写入目标平台。

配置元数据

首先,我们需要根据元数据配置来定义API请求的结构。以下是一个典型的元数据配置示例:

{
  "api": "batchSave",
  "method": "POST",
  "idCheck": true,
  "operation": {
    "rowsKey": "array",
    "rows": 20,
    "method": "batchArraySave"
  },
  "request": [
    {"field":"FBillTypeID","label":"单据类型","type":"string","describe":"单据类型","parser":{"name":"ConvertObjectParser","params":"FNumber"},"value":"RKD01_SYS"},
    {"field":"FBusinessType","label":"业务类型","type":"string","describe":"下拉列表","value":"CG"},
    {"field":"FBillNo","label":"单据编号","type":"string","describe":"单据编号","value":"{goodsdocNo}"},
    {"field":"FDate","label":"入库日期","type":"string","describe":"日期","value":"_function FROM_UNIXTIME(  ( {inOutDate} \/ 1000 )  ,'%Y-%m-%d %T' )"},
    {"field":"FStockOrgId","label":"收料组织","type":"string","describe":"组织","parser":{"name":"ConvertObjectParser","params":"FNumber"},"value":"_findCollection find textField_l2ib4q9s from 0ce3a160-9fb2-36e5-a2ac-57f5ad0f3c72 where textField_l2ib4q9p={warehouseCode}"},
    {"field":"FPurchaseOrgId","label":"采购组织","type":"string","describe":"组织","parser":{"name":"ConvertObjectParser","params":"FNumber"},"value":"_findCollection find textField_l2ib4q9s from 0ce3a160-9fb2-36e5-a2ac-57f5ad0f3c72 where textField_l2ib4q9p={warehouseCode}"},
    {"field":"FSupplierId","label":"供应商","type":"string","describe":"基础资料","parser":{"name":"ConvertObjectParser","params":"FNumber"},"value":"{vendCustomerCode}"},
    {
      "field": "FInStockEntry",
      "label": "明细信息",
      "type": "array",
      "children": [
        {"field": "FMaterialId", "label": "物料编码", "type": "string", "describe": "基础资料", "parser":{"name": "ConvertObjectParser", "params": "FNumber"}, "value": "{{goodsDocDetailList.goodsNo}}", "parent": "FInStockEntry"},
        {"field": "FRealQty", "label": "实收数量", "type": "string", "describe": "数量",  "value": "{{goodsDocDetailList.quantity}}", "parent":   "FInStockEntry"},
        {"field":   "FPrice",   "label":    "单价",   "type": "string",   "describe": "单价",   "value": "{{goodsDocDetailList.cuPrice}}",  "parent":   "FInStockEntry"},
        {"field":   "FStockId", "label":    "仓库",   "type": "string",   "describe": "基础资料", "parser":{"name":
        ...
      ],
      ...
    },
    ...
]

数据解析与转换

  1. 字段解析:在元数据配置中,我们使用了ConvertObjectParser来解析某些字段,例如FBillTypeIDFSupplierId。这些字段需要从源系统的数据中提取并转换为目标系统所需的格式。

    {
     ...,
     {"field":  "FBillTypeID",  ...,"parser":{"name":
     ...
    }
  2. 日期格式转换:对于日期字段,如FDate,我们使用了自定义函数来将UNIX时间戳转换为标准日期格式。

    {
     ...,
     {"field":
     ...
    }
  3. 集合查找:对于组织相关的字段,如FStockOrgIdFPurchaseOrgId,我们通过集合查找功能来获取相应的数据。

    {
     ...,
     {"field":
     ...
    }

构建请求体

根据以上解析和转换规则,我们可以构建出符合金蝶云星空API接口要求的请求体。以下是一个示例请求体:

{
  ...
}

调用API接口

最后一步是调用金蝶云星空的API接口,将构建好的请求体发送到目标平台。我们使用HTTP POST方法来实现这一点:

import requests

url = 'https://api.kingdee.com/batchSave'
headers = {'Content-Type': 'application/json'}
data = {
  ...
}

response = requests.post(url, headers=headers, json=data)

if response.status_code == 200:
    print('Data successfully written to Kingdee Cloud.')
else:
    print('Failed to write data:', response.text)

通过以上步骤,我们成功地完成了从源平台到金蝶云星空的ETL转换和数据写入过程。这一过程充分利用了轻易云数据集成平台提供的强大功能,实现了高效、准确的数据集成。 轻易云数据集成平台金蝶集成接口配置