使用ETL技术实现数据从旺店通到金蝶云的转换

  • 轻易云集成顾问-孙传友

旺店通销售出库单同步至金蝶销售出库单【分销】案例解析

在企业信息化系统集成过程中,如何高效、准确地进行数据对接是一个关键问题。本篇文章将聚焦于旺店通·企业奇门的数据集成到金蝶云星空的实际案例,通过具体的技术细节分享,实现“旺店通销售出库单同步至金蝶销售出库单【分销】”这一操作。

首先,我们需要从旺店通获取相关的数据。这里使用了 wdt.stockout.order.query.trade API 来抓取最新的销售出库单数据。为了确保不漏单,并实现定时可靠的数据抓取,我们设置了合理的触发时间和轮询机制。同时,处理接口调用中的分页和限流问题也是必要的一环,以保证每次请求能够顺利返回完整数据。

接着是数据格式转换的问题。在将旺店通·企业奇门的数据写入到金蝶云星空之前,需进行字段映射和格式转换,这里我们实现了一套定制化的数据映射方案,使得两边系统间无缝对接。此外,为保证大量数据能快速而稳定地写入金蝶云星空,我们采用了批量提交的方法来提高效率。而对应的API则为 batchSave,该API支持多条记录同时插入,大大提升了运行性能。

为了应对可能出现的数据异常情况,本方案中还引入了错误重试机制。一旦某个批次提交失败,可以自动重新尝试提交,从而避免因为网络波动或临时故障导致的大规模数据丢失。同时,在整个处理过程中,我们通过实时监控与日志记录机制,对每一步操作进行了详细追踪,一旦发现异常,可迅速定位并解决问题。

最后,实施中涉及到了若干精细化配置,不仅确保了高效传输,还兼顾到了业务逻辑复杂性及扩展性需求,例如动态调整限流参数、优化分页策略等。当所有这些技术要点有机结合后,即可实现高度可靠且灵活扩展的系统对接集成。

此案展示的不仅是一种工具应用,更是多种技术手段综合运用后的最佳实践,希望能为类似复杂业务场景下的信息系统整合提供有益借鉴。 用友与外部系统接口集成开发

调用源系统旺店通·企业奇门接口wdt.stockout.order.query.trade获取并加工数据

在数据集成的生命周期中,调用源系统接口是至关重要的一步。本文将详细探讨如何通过轻易云数据集成平台调用旺店通·企业奇门接口wdt.stockout.order.query.trade来获取销售出库单数据,并进行初步的数据清洗和加工。

接口调用配置

首先,我们需要配置API接口的基本信息。根据提供的元数据配置,接口的基本信息如下:

  • API名称: wdt.stockout.order.query.trade
  • 请求方法: POST
  • 分页大小: 100条/页
  • 唯一标识字段: order_no

请求参数设置

为了实现增量数据获取,我们需要设置时间范围参数start_timeend_time。这些参数将帮助我们获取特定时间段内的数据:

{
  "start_time": "{{HOURE_AGO_3|datetime}}",
  "end_time": "{{CURRENT_TIME|datetime}}"
}

此外,我们还可以根据业务需求设置其他查询条件,例如订单状态、系统订单编号、原始单号等:

{
  "status": "55",  // 已审核
  "src_order_no": "",
  "src_tid": "",
  "stockout_no": "",
  "shop_no": "",
  "warehouse_no": ""
}

条件过滤

为了确保数据的准确性和有效性,我们需要对数据进行条件过滤。根据元数据配置,以下是我们需要应用的过滤条件:

  1. 仓库名称包含“七遇”或“百媚”或“江苏淮安”
  2. 店铺名称不包含“OEM”
  3. 分销昵称不等于某个特定值

这些条件可以通过以下JSON格式来表示:

[
  {
    "field": "warehouse_name",
    "logic": "like",
    "value": "七遇"
  },
  {
    "field": "shop_name",
    "logic": "notlike",
    "value": "OEM"
  },
  {
    "field": "fenxiao_nick",
    "logic": "neqv2"
  }
],
[
  {
    "field": "warehouse_name",
    "logic": "like",
    "value": "百媚"
  },
  {
    "field": "shop_name",
    "logic": "notlike",
    "value": "OEM"
  },
  {
    "field": "fenxiao_nick",
    "logic": "neqv2"
  }
],
[
  {
    "field": "warehouse_name",
    "logic": "like",
    {
      value: “江苏淮安”
      }
   },
   {
     field: “shop_name”,
     logic: “notlike”,
     value: “OEM”
   },
   {
     field: “fenxiao_nick”,
     logic: “neqv2”
   }
]

数据请求与分页处理

由于每次请求返回的数据条数有限(最多100条),我们需要实现分页处理,以确保能够获取所有符合条件的数据。分页处理可以通过设置page_sizepage_no参数来实现:

{
  “page_size”: “100”,
  “page_no”: “0”
}

在实际操作中,需要循环增加page_no直到没有更多数据返回为止。

数据清洗与转换

在获取到原始数据后,需要对其进行清洗和转换,以便后续写入目标系统。在这个过程中,可以执行以下操作:

  1. 字段映射:将源系统中的字段映射到目标系统中的对应字段。
  2. 数据格式转换:例如,将日期格式从“yyyy-MM-dd HH:mm:ss”转换为目标系统所需的格式。
  3. 异常数据处理:过滤掉不符合业务规则的数据,或者标记为异常以便后续处理。

实际案例应用

假设我们已经成功调用了接口并获取了销售出库单数据,接下来我们需要对这些数据进行清洗和转换。例如,将原始单号(src_tid)映射到目标系统中的订单编号(order_id),并将日期格式进行转换:

{
   order_id: src_tid,
   order_date: convertDateFormat(order_date, ‘yyyy-MM-dd HH:mm:ss’, ‘MM/dd/yyyy’)
}

通过以上步骤,我们可以确保从旺店通·企业奇门接口获取的数据经过清洗和转换后,能够无缝对接到目标系统中,从而实现高效的数据集成。

总结来说,通过合理配置API接口、设置请求参数、应用条件过滤以及执行分页处理,我们能够高效地从源系统获取并加工所需的数据,为后续的数据写入和业务决策提供坚实的基础。 钉钉与WMS系统接口开发配置

使用轻易云数据集成平台实现旺店通销售出库单同步至金蝶云星空

在数据集成生命周期的第二步,我们需要将已经集成的源平台数据进行ETL转换,转为目标平台金蝶云星空API接口所能够接收的格式,并最终写入目标平台。以下是详细的技术实现过程。

1. API接口配置

首先,我们需要了解金蝶云星空API接口的配置。根据元数据配置文件,目标API为batchSave,请求方法为POST。在请求中,我们需要传递多个字段,这些字段包括单据类型、单据编号、日期、销售组织、客户、发货组织等。

2. 数据转换与映射

我们需要将源平台的数据转换为目标平台所需的格式。以下是主要字段的映射和转换规则:

  • FBillTypeID(单据类型): 固定值XSCKD01_SYS
  • FBillNo(单据编号): 从源数据中提取订单编号{order_no}
  • FDate(日期): 使用模板引擎将源数据中的发货时间{consign_time}转换为日期格式。
  • FSaleOrgId(销售组织): 通过MongoDB查询,根据分销商昵称{fenxiao_nick}获取对应的销售组织编码。
  • FCustomerID(客户): 同样通过MongoDB查询,根据分销商昵称获取客户名称。
  • FStockOrgId(发货组织): 从源数据中提取仓库编号{warehouse_no}并进行映射。

3. 明细信息处理

对于明细信息,我们需要处理多个子字段:

  • FMaterialID(物料编码): 根据物料编码规范,将源数据中的规格编号{{details_list.spec_no}}进行转换。
  • FRealQty(实发数量): 提取源数据中的商品数量{{details_list.goods_count}}
  • FTaxPrice(含税单价): 提取源数据中的销售价格{{details_list.sell_price}}
  • FOwnerTypeId(货主类型): 固定值BD_OwnerOrg
  • FOwnerId(货主): 固定值100,通过解析器进行转换。
  • FStockID(仓库): 提取并转换仓库编号。
  • FIsFree(是否赠品): 判断商品是否为赠品,进行相应标记。
  • FEntrynote(备注): 提取并转换备注信息。
  • F_POIH_Text(原始单号): 提取原始订单号。
  • FEntryTaxRate(税率): 提取并设置税率。

4. 财务信息处理

财务信息包含结算组织和结算币别:

  • FSettleOrgID(结算组织): 同样通过MongoDB查询,根据分销商昵称获取结算组织编码。
  • FSETTLECURRID(结算币别): 固定值PRE001,表示人民币。

5. 其他请求参数

除了上述主要字段外,还需要传递一些其他参数以确保请求成功:

  • FormId(业务对象表单Id): 固定值SAL_OUTSTOCK,表示销售出库单表单ID。
  • Operation(执行的操作): 固定值BatchSave,表示批量保存操作。
  • IsAutoSubmitAndAudit(提交并审核): 设置为true,表示自动提交并审核单据。
  • IsVerifyBaseDataField(验证基础资料): 设置为true,表示验证基础资料是否正确。
  • SubSystemId(系统模块): 固定值21,表示具体系统模块ID。
  • InterationFlags(允许负库存): 固定值STK_InvCheckResult, 表示允许负库存。

6. 实现代码示例

以下是一个伪代码示例,用于展示如何实现上述ETL过程:

import requests
import json
from datetime import datetime

def transform_data(source_data):
    transformed_data = {
        "FBillTypeID": {"FNumber": "XSCKD01_SYS"},
        "FBillNo": source_data["order_no"],
        "FDate": datetime.strptime(source_data["consign_time"], "%Y-%m-%d %H:%M:%S").strftime("%Y-%m-%d"),
        "FSaleOrgId": get_sale_org_id(source_data["fenxiao_nick"]),
        "FCustomerID": get_customer_id(source_data["fenxiao_nick"]),
        "FStockOrgId": {"FNumber": source_data["warehouse_no"]},
        "FNote": source_data["cs_remark"],
        "FEntity": [
            {
                "FMaterialID": {"FNumber": detail["spec_no"]},
                "FRealQty": detail["goods_count"],
                "FTaxPrice": detail["sell_price"],
                "FOwnerTypeId": "BD_OwnerOrg",
                "FOwnerId": {"FNumber": "100"},
                "FStockID": {"FNumber": source_data["warehouse_no"]},
                "FIsFree": detail.get("is_free", ""),
                "FEntrynote": detail.get("remark", ""),
                "F_POIH_Text": source_data["src_tids"],
                "FEntryTaxRate": source_data["tax_rate"]
            }
            for detail in source_data["details_list"]
        ],
        "SubHeadEntity": {
            "FSettleOrgID": get_settle_org_id(source_data["fenxiao_nick"]),
            "FSETTLECURRID": {"FNumber":"PRE001"}
        },
        # Other request parameters
    }
    return transformed_data

def get_sale_org_id(fenxiao_nick):
    # Implement MongoDB query to get sale org id
    pass

def get_customer_id(fenxiao_nick):
    # Implement MongoDB query to get customer id
    pass

def get_settle_org_id(fenxiao_nick):
    # Implement MongoDB query to get settle org id
    pass

def send_request(transformed_data):
    url = 'https://api.kingdee.com/batchSave'
    headers = {'Content-Type': 'application/json'}
    response = requests.post(url, headers=headers, data=json.dumps(transformed_data))
    return response.json()

source_data = {
    # Source data from 旺店通
}

transformed_data = transform_data(source_data)
response = send_request(transformed_data)
print(response)

以上代码展示了如何从源数据中提取必要的信息,并通过ETL过程将其转换为金蝶云星空API所需的格式。最终,通过HTTP POST请求将转换后的数据发送到金蝶云星空,实现数据的无缝对接和写入。 金蝶与MES系统接口开发配置