ETL转换实例:使用轻易云平台集成金蝶云星空

  • 轻易云集成顾问-卢剑航

旺店通其他入库同步--114:从旺店通·企业奇门到金蝶云星空的数据集成实践

在处理大规模、复杂数据流的背景下,高效的数据集成是许多企业亟待解决的问题。本文将深入探讨如何通过轻易云数据集成平台,实现旺店通·企业奇门数据与金蝶云星空系统的无缝对接。本案例名为“旺店通其他入库同步--114”,旨在分享一个成功的系统对接实例,展示关键技术要点和实际操作细节。

确保数据不漏单 首先,我们利用wdt.stockin.order.query接口从旺店通·企业奇门抓取入库订单。在定时任务调度方面,通过设定合理的时间间隔频繁调用该接口,可以确保每个新生成或更新的订单及时进入我们的处理流程,从而避免漏单现象。同时,我们设计了容错机制,在网络波动或API限流等异常情况下进行重试,确保所有应该捕获的数据能够完整采集。

批量快速写入 针对金蝶云星空,我们采用其提供的batchSave API进行数据批量写入操作。这样不仅提高了单次请求的信息传输效率,还降低了整体系统交互次数,为大量数据高效上传提供了保障。此外,为应对不同时间段可能出现的大量并发写入需求,我们进行了性能调优,通过异步处理机制和队列管理,有效平衡负载,提高写入速度和稳定性。

分页与限流问题处理 由于wdt.stockin.order.query接口存在分页和限流限制,我们必须仔细规划调用逻辑。在每次获取一页记录后立即存储,并返回获取下一页,以此迭代直到所有页面读取完毕。为了防止因超出API限额导致的数据丢失或延迟,可加入自动降速功能,使得总请求次数维持在合理范围内,同时也减少对后续处理环节造成阻塞。

本篇文章将详细解析这一系列步骤及其实现方法,以便读者能全面了解高效、安全地完成两个主要业务系统之间的数据转移过程,以及应对此过程中潜在挑战的方法和策略。 数据集成平台API接口配置

调用旺店通·企业奇门接口wdt.stockin.order.query获取并加工数据

在数据集成生命周期的第一步中,调用源系统接口获取数据是至关重要的一环。本文将详细探讨如何通过轻易云数据集成平台调用旺店通·企业奇门接口wdt.stockin.order.query,并对获取的数据进行初步加工。

接口调用配置

首先,我们需要配置接口调用的元数据。以下是针对wdt.stockin.order.query接口的详细配置:

{
  "api": "wdt.stockin.order.query",
  "method": "POST",
  "number": "order_no",
  "id": "stockin_id",
  "pagination": {
    "pageSize": 50
  },
  "idCheck": true,
  "request": [
    {"field":"start_time","label":"开始时间","type":"string","value":"{{LAST_SYNC_TIME|datetime}}"},
    {"field":"end_time","label":"结束时间","type":"string","value":"{{CURRENT_TIME|datetime}}"},
    {"field":"order_type","label":"源单据类别","type":"string","value":"6"},
    {"field":"status","label":"入库单状态","type":"string"},
    {"field":"warehouse_no","label":"仓库编号","type":"string"},
    {"field":"src_order_no","label":"上层单据编号","type":"string"},
    {"field":"stockin_no","label":"入库单号","type":"string"}
  ],
  "otherRequest": [
    {"field":"page_size","label":"分页大小","type":"string","value":"{PAGINATION_PAGE_SIZE}"},
    {"field":"page_no","label":"页号","type":"string","value":"{PAGINATION_START_PAGE}"}
  ],
  "condition_bk": [
    [{"field": "operator_name", "logic": "neq", "value": "外部接口"}, {"field": "remark", "logic": "like", "value": "114"}]
  ],
  "condition": [
    [{"field": "operator_name", "logic": "neq", "value": "外部接口"}, {"field": "remark", "logic": "like", "value": "114"}]
  ]
}

请求参数解析

  • start_timeend_time:这两个字段用于指定查询的时间范围,分别对应上次同步时间和当前时间。
  • order_type:固定值为6,表示源单据类别。
  • statuswarehouse_nosrc_order_nostockin_no:这些字段用于进一步过滤查询结果,根据实际需求可以动态设置。
  • page_sizepage_no:用于分页查询,每次请求返回50条记录。

数据请求与清洗

在调用接口后,系统会返回包含多个入库订单的数据。为了确保数据质量和一致性,我们需要对返回的数据进行清洗和初步处理。

  1. 去重检查:根据stockin_id进行去重,确保每个入库订单唯一。
  2. 条件过滤:应用配置中的条件过滤器,例如排除操作员名称为“外部接口”的记录,并且备注包含“114”的记录。

数据转换与写入

经过清洗后的数据,需要进行适当的转换,以便写入目标系统。常见的转换操作包括:

  • 字段映射:将源系统的字段名映射到目标系统的字段名。
  • 数据格式转换:例如,将日期字符串转换为目标系统所需的日期格式。

以下是一个简单的数据转换示例:

{
  "_id": "{{stockin_id}}",
  "_orderNo": "{{order_no}}",
  "_warehouseNo": "{{warehouse_no}}",
  "_statusCode": "{{status}}"
}

实时监控与日志记录

在整个数据请求与清洗过程中,实时监控和日志记录是不可或缺的。通过轻易云平台提供的可视化界面,可以实时查看每个环节的数据流动和处理状态,及时发现并解决潜在问题。

总结

通过上述步骤,我们成功实现了从旺店通·企业奇门接口获取并加工入库订单数据。这不仅提高了数据处理效率,还确保了数据的一致性和准确性。在实际应用中,根据业务需求灵活调整配置,可以进一步优化数据集成流程。 金蝶与WMS系统接口开发配置

使用轻易云数据集成平台进行ETL转换并写入金蝶云星空API接口的技术案例

在数据集成生命周期的第二步,我们将已经集成的源平台数据进行ETL(提取、转换、加载)转换,最终转为目标平台——金蝶云星空API接口所能够接收的格式并写入目标平台。以下是具体的技术实现细节。

配置元数据

首先,我们需要配置元数据以适应金蝶云星空API接口的要求。根据提供的元数据配置,我们可以看到以下几个关键字段和配置项:

{
  "api": "batchSave",
  "method": "POST",
  "idCheck": true,
  "operation": {
    "rowsKey": "array",
    "rows": 1,
    "method": "batchArraySave"
  },
  "request": [
    {"field":"FBillNo","label":"单据编号","type":"string","value":"{stockin_no}-TC"},
    {"field":"FBillTypeID","label":"单据类型","type":"string","parser":{"name":"ConvertObjectParser","params":"FNumber"},"value":"QTRKD01_SYS"},
    {"field":"FStockOrgId","label":"库存组织","type":"string","parser":{"name":"ConvertObjectParser","params":"FNumber"},"value":"114"},
    {"field":"FDate","label":"日期","type":"string","value":"{stockin_time}"},
    {"field":"FSUPPLIERID","label":"供应商","type":"string","parser":{"name":"ConvertObjectParser","params":"FNumber"}},
    {"field":"FDEPTID","label":"部门","type":"string","parser":{"name":"ConvertObjectParser","params":"FNumber"},"value":"BM000002"},
    {"field":"FNOTE","label":"备注","type":"string"},
    {
      "field": "FEntity",
      "label": "明细信息",
      "type": "array",
      "children": [
        {"field": "FMATERIALID", "label": "物料编码", "type": "string", "parser":{"name": "ConvertObjectParser", "params": "FNumber"}, "value": "{{details_list.spec_no}}", "parent": "FEntity"},
        {"field": "FCMKBarCode", "label": "零售条形码", "type": "string",    "parent":   "FEntity"},
        {"field":   "FSTOCKID", "label":    "收货仓库", "type": "string",   "parser":{"name":   "ConvertObjectParser",  "params":   "FNumber"}, "value": "{warehouse_no}",  "parent":   "FEntity"},
        {"field":   "FQty", "label":    "实收数量", "type": "string",   "value”: “{{details_list.goods_count}}”, “parent”: “FEntity”},
        {"field”: “FEntryNote”, “label”: “备注”, “type”: “string”, “parent”: “FEntity”},
        {"field”: “FPrice”, “label”: “成本价”, “type”: “string”, “value”: "{cost_price}",“parent”:“ FEntity”}
      ],
      “value”:“ details_list”
    }
  ],
  “otherRequest”:[
    {“ field”:“ FormId”,“ label”:“业务对象表单Id”,“ type”:“ string”,“ value”:“ STK_MISCELLANEOUS”},
    {“ field”:“ IsVerifyBaseDataField”,“ label”:验证基础资料”,” type”:bool”,” value”:true”},
    {“ field”:Operation”,” label”:执行的操作”,” type”:string”,” value”:Save”},
    {“ field”:IsAutoSubmitAndAudit”,” label”:提交并审核”,” type”:bool”,” value”:true”}
  ]
}

数据请求与清洗

在数据请求阶段,我们从源系统(如旺店通)获取原始数据,并对其进行必要的数据清洗和预处理。这一步确保了数据的一致性和完整性,为后续的转换和加载打下基础。

数据转换与写入

  1. 提取(Extract):从源系统中提取原始数据,例如库存入库单的信息。
  2. 转换(Transform):将提取的数据转换为金蝶云星空API接口所需的格式。在这个过程中,我们需要根据元数据配置中的字段映射关系进行相应的数据转换。例如,将stockin_no字段值加上后缀-TC作为FBillNo,将stockin_time直接映射为FDate等。
  3. 加载(Load):将转换后的数据通过API接口写入到金蝶云星空系统中。

实现细节

  • API调用:使用HTTP POST方法调用金蝶云星空的batchSave API接口。
  • 字段映射与解析:利用元数据中的字段配置和解析器,如ConvertObjectParser,将源系统中的字段值转换为目标系统所需的格式。例如,将供应商ID、部门ID等字段通过解析器转换为对应的编码格式。
  • 批量处理:根据元数据配置中的operation.rowsKeyoperation.method,我们可以实现批量处理,将多个记录一次性写入目标系统,提高效率。

以下是一个简化版的代码示例,展示了如何利用上述元数据配置进行ETL转换并调用金蝶云星空API:

import requests
import json

# 假设从源系统获取的数据
source_data = {
    'stockin_no': 'SI12345',
    'stockin_time': '2023-10-01',
    'details_list': [
        {'spec_no': 'MAT001', 'goods_count': '100', 'cost_price': '10.00'},
        {'spec_no': 'MAT002', 'goods_count': '200', 'cost_price': '20.00'}
    ]
}

# 根据元数据配置进行字段映射和解析
transformed_data = {
    'FBillNo': f"{source_data['stockin_no']}-TC",
    'FBillTypeID': {'FNumber': 'QTRKD01_SYS'},
    'FStockOrgId': {'FNumber': '114'},
    'FDate': source_data['stockin_time'],
    # ...其他字段映射...
}

# 明细信息处理
transformed_data['FEntity'] = []
for detail in source_data['details_list']:
    entity = {
        'FMATERIALID': {'FNumber': detail['spec_no']},
        # ...其他明细字段映射...
        'FQty': detail['goods_count'],
        'FPrice': detail['cost_price']
    }
    transformed_data['FEntity'].append(entity)

# 构造请求体
request_body = {
    'FormId': 'STK_MISCELLANEOUS',
    'IsVerifyBaseDataField': True,
    'Operation': 'Save',
    'IsAutoSubmitAndAudit': True,
    # 包含主表和明细表的数据
}

# 调用金蝶云星空API
response = requests.post(
   url='https://api.kingdee.com/batchSave',
   headers={'Content-Type':'application/json'},
   data=json.dumps(request_body)
)

if response.status_code == 200:
   print('Data successfully loaded into Kingdee Cloud.')
else:
   print(f'Failed to load data: {response.text}')

通过上述步骤,我们成功地将源平台的数据经过ETL转换后写入到目标平台——金蝶云星空,实现了不同系统间的数据无缝对接。 用友与CRM系统接口开发配置