ETL转换技术案例:从数据清洗到金蝶云星空接口写入

  • 轻易云集成顾问-杨嫦

在旺店通·企业奇门到金蝶云星空的数据集成:从API对接到大数据同步

在我们这次的技术案例分享中,我们将重点介绍如何通过轻易云数据集成平台高效、安全地实现 旺店通·企业奇门 数据到 金蝶云星空 的无缝对接。具体执行方案为 "旺店通其他入库委外同步--114"。

集成需求与挑战

对于电商企业来说,订单和库存数据的准确性和实时性至关重要。在此背景下,通过调用 wdt.stockin.order.query API,从旺店通·企业奇门拉取最新的入库单据并写入金蝶云星空成为关键任务。然而,这个过程面临多种挑战:

  1. 避免漏单现象:确保所有数据都能成功获取和写入,而不会遗漏任何一条。
  2. 批量处理能力:支持大量订单数据快速上传,并保证系统稳定运行。
  3. 分页与限流问题:分批次提取和传输大体量的数据,以避免接口调用次数过多导致速率限制或超时。
  4. 格式差异处理:从源头上解决两者之间的数据格式不兼容问题,确保顺利转换。

方案设计思路

依托轻易云数据集成平台,我们设计了如下的实施路径:

  • 首先,利用定时调度功能,每隔固定时间段自动调用 wdt.stockin.order.query API 抓取新生成的入库单据。该操作会结合特有的日志记录功能,实现全生命周期内的数据变动实时监控。
  • 接着,通过自定义脚本,将抓取来的JSON格式原始数据进行清洗转化,使其符合目标数据库——金蝶云星空所需的XML或其它适用格式,以适配 batchSave 写入接口规范。
  • 此后,在每次调度周期结束后做整体校验,如遇异常状况则触发重试机制,对失败部分重新提交以保障业务持续不中断。

在实际应用过程中,要特别注意两个方面的问题。一是精确控制分页及限流策略,合理设置页码、页面大小及请求频率;二是在映射字段时,需要考虑双方系统表结构可能存在的不一致,因此须逐项校验明细字段确保匹配。同时,在结果回执环节也必须反馈详细状态信息便于后续审核追踪。

开篇至此,下文将详细阐述上述流程具体配置方法以及常见故障应对举措,包括但不限于接口连接参数设定、高效代码示例等。这不仅展示了一个扩展性的解决方案模板,也提供了一系列 金蝶与SCM系统接口开发配置

调用旺店通·企业奇门接口wdt.stockin.order.query获取并加工数据

在数据集成生命周期的第一步中,调用源系统接口获取数据是至关重要的环节。本文将详细探讨如何使用轻易云数据集成平台调用旺店通·企业奇门接口wdt.stockin.order.query,并对获取的数据进行初步加工。

接口调用配置

首先,我们需要配置调用接口的元数据。根据提供的元数据配置,可以看到该接口使用POST方法进行请求,并且支持分页查询。以下是具体的元数据配置细节:

{
  "api": "wdt.stockin.order.query",
  "method": "POST",
  "number": "order_no",
  "id": "stockin_id",
  "pagination": {
    "pageSize": 50
  },
  "idCheck": true,
  "condition": [
    [
      {"field": "operator_name", "logic": "neq", "value": "外部接口"},
      {"field": "remark", "logic": "like", "value": "114"}
    ]
  ],
  "request": [
    {"field": "start_time", "label": "开始时间", "type": "string", "value": "{{LAST_SYNC_TIME|datetime}}"},
    {"field": "end_time", "label": "结束时间", "type": "string", "value": "{{CURRENT_TIME|datetime}}"},
    {"field": "order_type", "label": "源单据类别", "type": "string", "value":"12"},
    {"field":"status","label":"入库单状态","type":"string"},
    {"field":"warehouse_no","label":"仓库编号","type":"string"},
    {"field":"src_order_no","label":"上层单据编号","type":"string"},
    {"field":"stockin_no","label":"入库单号","type":"string"}
  ],
  "otherRequest":[
    {"field":"page_size","label":"分页大小","type":"string","value":"{PAGINATION_PAGE_SIZE}"},
    {"field":"page_no","label":"页号","type":"string","value":"{PAGINATION_START_PAGE}"}
  ]
}

请求参数解析

  1. 时间参数start_timeend_time分别代表查询的起始和结束时间,通过模板变量{{LAST_SYNC_TIME|datetime}}{{CURRENT_TIME|datetime}}动态生成。
  2. 过滤条件:通过设置order_type为12来指定源单据类别,同时可以根据需求添加其他过滤条件如状态、仓库编号等。
  3. 分页参数:通过page_sizepage_no实现分页查询,确保每次请求返回的数据量可控。

数据请求与清洗

在实际操作中,调用接口后会返回一批原始数据,这些数据需要经过清洗和转换才能进入下一个处理阶段。以下是一个简单的数据清洗流程:

  1. 过滤无效数据:根据元数据中的条件配置,过滤掉不符合条件的数据。例如,排除操作员为“外部接口”的记录,以及备注中不包含“114”的记录。
  2. 字段映射与转换:将返回的数据字段映射到目标系统所需的字段格式。例如,将返回的订单号映射到目标系统的订单号字段,并进行必要的数据类型转换。

实现代码示例

以下是一个伪代码示例,展示如何使用轻易云平台调用接口并处理返回的数据:

import requests
import datetime

# 设置请求URL和头信息
url = 'https://api.wangdian.cn/openapi2/wdt.stockin.order.query'
headers = {'Content-Type': 'application/json'}

# 设置请求参数
params = {
    'start_time': (datetime.datetime.now() - datetime.timedelta(days=1)).strftime('%Y-%m-%d %H:%M:%S'),
    'end_time': datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
    'order_type': '12',
    'page_size': 50,
    'page_no': 1
}

# 发起POST请求
response = requests.post(url, json=params, headers=headers)
data = response.json()

# 数据清洗与转换
cleaned_data = []
for record in data['orders']:
    if record['operator_name'] != '外部接口' and '114' in record['remark']:
        cleaned_record = {
            'order_no': record['order_no'],
            'stockin_id': record['stockin_id'],
            # 添加其他需要的字段映射与转换
        }
        cleaned_data.append(cleaned_record)

# 输出清洗后的数据
print(cleaned_data)

通过上述步骤,我们成功地从旺店通·企业奇门接口获取了所需的数据,并进行了初步的清洗和转换,为后续的数据处理奠定了基础。在实际应用中,可以根据具体业务需求进一步优化和扩展此流程。 钉钉与WMS系统接口开发配置

基于轻易云数据集成平台的ETL转换与金蝶云星空API接口集成技术案例

在数据集成过程中,ETL(Extract, Transform, Load)是关键的一步。本文将详细探讨如何使用轻易云数据集成平台,将源平台的数据经过ETL转换后,写入目标平台金蝶云星空API接口。

数据请求与清洗

在数据集成的第一阶段,我们已经从源平台获取了原始数据,并进行了初步清洗。接下来,我们重点讨论如何将这些清洗后的数据进行转换,以符合金蝶云星空API接口的要求,并最终写入目标系统。

元数据配置解析

元数据配置是实现ETL转换的核心。以下是我们在轻易云数据集成平台上配置的元数据:

{
  "api": "batchSave",
  "method": "POST",
  "idCheck": true,
  "operation": {
    "rowsKey": "array",
    "rows": 1,
    "method": "batchArraySave"
  },
  "request": [
    {"field":"FBillNo","label":"单据编号","type":"string","value":"{stockin_no}-TC"},
    {"field":"FBillTypeID","label":"单据类型","type":"string","parser":{"name":"ConvertObjectParser","params":"FNumber"},"value":"QTRKD01_SYS"},
    {"field":"FStockOrgId","label":"库存组织","type":"string","parser":{"name":"ConvertObjectParser","params":"FNumber"},"value":"114"},
    {"field":"FDate","label":"日期","type":"string","value":"{stockin_time}"},
    {"field":"FSUPPLIERID","label":"供应商","type":"string","parser":{"name":"ConvertObjectParser","params":"FNumber"}},
    {"field":"FDEPTID","label":"部门","type":"string","parser":{"name":"ConvertObjectParser","params":"FNumber"},"value":"BM000002"},
    {"field":"FNOTE","label":"备注","type":"string","value":"{remark}"},
    {
      "field": "FEntity",
      "label": "明细信息",
      "type": "array",
      "children": [
        {"field": "FMATERIALID", "label": "物料编码", "type": "string", "parser":{"name": "ConvertObjectParser", "params": "FNumber"}, "value": "{{details_list.spec_no}}", "parent": "FEntity"},
        {"field": "FCMKBarCode", "label": "零售条形码", "type": "string",    "parent":   "FEntity"},
        {"field":   "FSTOCKID", "label":    "收货仓库", "type": "string",   "parser":{"name":   "ConvertObjectParser",  "params":   "FNumber"}, "value": "{warehouse_no}",  "parent":   "FEntity"},
        {"field":   "FQty", "label":    "实收数量", "type": "string",   "value": "{{details_list.goods_count}}",    "parent":   "FEntity"},
        {"field":   "FEntryNote",   "label":    "备注",   "type": "string",   "parent":   "FEntity"},
        {"field":   "FPrice",   "label":    "成本价",  "type": "string",                                                   value: "{cost_price}",                       parent: FEntity}
      ],
      value: details_list
    }
  ],
  otherRequest: [
    { field: FormId, label: 业务对象表单Id, type: string, value: STK_MISCELLANEOUS },
    { field: IsVerifyBaseDataField, label: 验证基础资料, type: bool, value: true },
    { field: Operation, label: 执行的操作, type: string, value: Save },
    { field: IsAutoSubmitAndAudit, label: 提交并审核, type: bool, value: true }
  ]
}

数据转换与写入

  1. API接口配置:我们使用的是batchSave API,采用POST方法。为了确保数据唯一性,我们启用了idCheck

  2. 字段映射

    • 单据编号(FBillNo):通过模板字符串将stockin_no字段加上后缀-TC
    • 单据类型(FBillTypeID):固定值为QTRKD01_SYS,并通过ConvertObjectParser解析。
    • 库存组织(FStockOrgId):固定值为114,同样通过解析器处理。
    • 日期(FDate):直接映射源字段stockin_time
    • 供应商(FSUPPLIERID)部门(FDEPTID)备注(FNOTE)等字段均进行了相应的解析和映射。
  3. 明细信息(FEntity)处理

    • 明细信息是一个数组,每个元素包含多个字段,如物料编码、零售条形码、收货仓库、实收数量、备注和成本价等。
    • 每个字段都通过相应的解析器和模板字符串进行转换,例如物料编码通过spec_no字段映射,并由解析器处理为目标格式。
  4. 其他请求参数

    • 设置业务对象表单Id为STK_MISCELLANEOUS
    • 启用基础资料验证和自动提交审核功能,以确保数据的完整性和一致性。

实践案例

假设我们从源系统获得了以下原始数据:

{
  stockin_no: 'IN20231012',
  stockin_time: '2023-10-12',
  remark: '本次入库委外',
  details_list:[
    {
      spec_no:'MAT001',
      goods_count:'100',
      warehouse_no:'WH001',
      cost_price:'10.00'
    }
  ]
}

经过上述元数据配置和ETL转换后,生成的请求体如下:

{
  api:"batchSave",
  method:"POST",
  idCheck:true,
  operation:{
    rowsKey:"array",
    rows:1,
    method:"batchArraySave"
  },
  request:[
    { FBillNo:"IN20231012-TC"},
    { FBillTypeID:{ FNumber:"QTRKD01_SYS"}},
    { FStockOrgId:{ FNumber:"114"}},
    { FDate:"2023-10-12"},
    { FSUPPLIERID:{ FNumber:"SUP001"}},
    { FDEPTID:{ FNumber:"BM000002"}},
    { FNOTE:"本次入库委外"},
     {
       field:FEntity,
       label:"明细信息",
       type:"array",
       children:[
         { FMATERIALID:{ FNumber:"MAT001"}},
         { FCMKBarCode:null},
         { FSTOCKID:{ FNumber:"WH001"}},
         { FQty:"100"},
         { FEntryNote:null},
         { FPrice:"10.00"}
       ],
       value:[]
     }
   ],
   otherRequest:[
     { FormId:"STK_MISCELLANEOUS"},
     { IsVerifyBaseDataField:true},
     { Operation:"Save"},
     { IsAutoSubmitAndAudit:true}
   ]
}

以上示例展示了如何利用轻易云数据集成平台进行ETL转换,并通过金蝶云星空API接口实现数据写入。这种方法不仅提高了数据处理效率,还保证了不同系统间的数据一致性和准确性。 打通企业微信数据接口