实现源平台数据向SQLServer的ETL转换与写入

  • 轻易云集成顾问-蔡威

汤臣倍健营销云数据集成到SQL Server的技术实现

在本案例中,我们探讨了如何通过轻易云数据集成平台,将汤臣倍健营销云的数据高效同步到SQL Server数据库。这一过程涉及API接口调用、数据质量监控、异常检测和处理等多个技术要点,确保从数据获取到存储的一系列操作无缝衔接。

数据获取与初始设置

首先,通过调用汤臣倍健营销云提供的API /api/openapi/v1/erp/order/honour/agreement/header 获取订单相关的数据。为应对大规模数据传输带来的挑战,我们采用定时任务机制,可靠地批量抓取这些API接口的数据。

数据转换与映射

为了将获取的数据顺利写入SQL Server,需要进行适当的格式转换和映射。在这一步骤中,自定义的数据转换逻辑尤为关键,以适配特定业务需求及不同系统间的结构差异。通过可视化设计工具,可以直观且快速地调整数据流,从而简化管理流程。

高吞吐量写入与性能优化

面对海量订单信息,为提升效率,我们充分利用了平台支撑的大吞吐量写入能力,通过 insert API 将转换后的数据注入SQL Server。这不仅提升了处理速度,也保证了业务实时性的要求。此外,还需注意分页和限流策略,以避免因单次请求过多导致系统性能瓶颈或超时错误。

实时监控与告警机制

在整个集成过程中,引入集中监控和告警系统至关重要。它能够实时跟踪每个任务的状态及其性能表现,一旦发现任何异常情况,如网络故障或接口服务未响应,可立即触发报警并执行预设应急方案,比如重试机制,从而确保任务稳定运行,不漏单不丢包。

本文将详细展开上述各环节中的具体实现方法,包括如何有效调用汤臣倍健营销云API、解决隔行问题以及正确配置 SQL Server 的定制化映射等内容。从实践出发,总结最优经验,希望能为其他企业同类项目提供参考借鉴。

用友BIP接口开发配置

调用源系统汤臣倍健营销云接口获取并加工数据

在轻易云数据集成平台的生命周期中,第一步是调用源系统的API接口以获取原始数据。本文将详细探讨如何通过调用汤臣倍健营销云的接口/api/openapi/v1/erp/order/honour/agreement/header来获取订单数据,并进行初步的数据加工。

接口配置与请求参数

我们使用POST方法来调用该API接口。以下是元数据配置中的主要请求参数:

{
  "api": "/api/openapi/v1/erp/order/honour/agreement/header",
  "method": "POST",
  "number": "no",
  "id": "id",
  "pagination": {
    "pageSize": 20
  },
  "beatFlat": ["details"],
  "idCheck": true,
  "request": [
    {"field":"orgId","label":"组织ID","type":"string","value":"bcf795d44109489f93c0560fa5d4bf0a"},
    {"field":"page","label":"页码","type":"string","value":"1"},
    {"field":"id","label":"订单id","type":"string"},
    {"field":"applyerId","label":"要货方id","type":"string"},
    {"field":"supplierId","label":"供货方id","type":"string"},
    {"field":"no","label":"订单号","type":"string"},
    {"field":"distributionType","label":"分销类型","type":"string"},
    {"field":"distributorId","label":"分销商id","type":"string"},
    {"field":"orderStatus","label":"订单状态","type":"string", "value": "WAIT_FINANCE_AUDIT,WAIT_DELIVERY,PART_DELIVERY,ALL_DELIVERY,WAIT_OUT_STORAGE,PART_OUT_STORAGE"},
    {"field":"createDt","label":"创建时间","type":"string"},
    {"field":"orderTypeCode","label":"订单类型,如普通订单、直运销售","type":"string"},
    {"field":"isDeliveryFreezed","label":"是否暂停发货","type":"string"},
    {"field":"relatedApplyerId","label":"关联交易经销商id","type":"string"},
    {"field":"saleDistribution","label":"销售渠道","type":"string"},
    {"field":"disApplyerId","label": "分销商id", "type": "string"},
    {"field": "startDt", "label": "订单时间(开始)", "type": "string"},
    {"field": "endDt", "label": "订单时间(结束)", "type": "string"},
    {"field": "appStartDt", "label": "审批时间(开始)", "type": "string"},
    {"field": "appEndDt", "label": "审批时间(结束)", "type": "string"},
    {"field": "lastStartDt", "label": "最后修改时间(开始)", type: string, value: "{{LAST_SYNC_TIME|datetime}}"},
    {"field": lastEndDt, label: 最后修改时间(结束), type: string, value: "{{CURRENT_TIME|datetime}}"}
  ]
}

请求参数详解

  • orgId: 固定值,用于标识组织ID。
  • page: 页码,初始值为1。
  • orderStatus: 包含多个状态值,用逗号分隔,表示需要查询的订单状态。
  • lastStartDtlastEndDt: 分别表示最后修改时间的开始和结束,用于增量同步数据。

这些参数确保了我们能够灵活地过滤和分页获取所需的订单数据。

数据请求与清洗

在发送请求后,我们会接收到一个包含多个订单记录的JSON响应。为了保证数据的一致性和准确性,需要对这些原始数据进行清洗和预处理。例如:

  1. 字段验证:确保所有必填字段都存在且格式正确。
  2. 数据转换:将日期字符串转换为标准日期格式,便于后续处理。
  3. 去重处理:根据id字段去重,避免重复记录。

以下是一个简单的数据清洗示例:

import requests
import json
from datetime import datetime

# 定义API URL和请求头
url = 'https://example.com/api/openapi/v1/erp/order/honour/agreement/header'
headers = {'Content-Type': 'application/json'}

# 构建请求体
payload = {
  'orgId': 'bcf795d44109489f93c0560fa5d4bf0a',
  'page': '1',
  'orderStatus': 'WAIT_FINANCE_AUDIT,WAIT_DELIVERY,PART_DELIVERY,ALL_DELIVERY,WAIT_OUT_STORAGE,PART_OUT_STORAGE',
  'lastStartDt': datetime.now().strftime('%Y-%m-%dT%H:%M:%S'),
  'lastEndDt': datetime.now().strftime('%Y-%m-%dT%H:%M:%S')
}

# 发起POST请求
response = requests.post(url, headers=headers, data=json.dumps(payload))

# 检查响应状态码
if response.status_code == 200:
  data = response.json()

  # 数据清洗示例
  cleaned_data = []

  for record in data['records']:
      if all(key in record for key in ('id', 'no', 'createDt')):
          record['createDt'] = datetime.strptime(record['createDt'], '%Y-%m-%dT%H:%M:%S')
          cleaned_data.append(record)

else:
  print(f"Error: {response.status_code}")

数据转换与写入

在完成数据清洗后,我们需要将其转换为目标系统所需的格式,并写入到目标数据库或其他存储系统中。这一步通常包括:

  • 字段映射:将源系统字段映射到目标系统字段。
  • 格式转换:例如,将日期格式从ISO标准转换为目标系统所需的格式。
  • 批量写入:为了提高效率,可以采用批量写入方式,将多个记录一次性写入目标系统。

通过上述步骤,我们可以高效地从汤臣倍健营销云接口获取并加工订单数据,为后续的数据集成和分析奠定基础。 如何对接钉钉API接口

数据集成与ETL转换:将源平台数据写入SQL Server API接口

在数据集成的生命周期中,ETL(提取、转换、加载)是关键步骤之一。本文将深入探讨如何通过轻易云数据集成平台,将已经集成的源平台数据进行ETL转换,并最终写入目标平台SQL Server。

配置元数据

在进行ETL操作之前,我们需要配置元数据,以便正确映射源数据到目标数据库表中。以下是我们使用的元数据配置:

{
    "api": "insert",
    "method": "POST",
    "idCheck": true,
    "request": [
        {
            "label": "主表参数",
            "field": "main_params",
            "type": "object",
            "children": [
                {"parent": "main_params", "label": "订单单号", "field": "djbh", "type": "string", "value": "{no}"},
                {"parent": "main_params", "label": "订单明细序号", "field": "dj_sn", "type":"string",  "value":"{bfn_line}"},
                {"parent":"main_params","label":"商品ID","field":"spid","type":"string","value":"_findCollection find spid from d76b64f9-f0e0-3436-a2d9-14c5579faa1b where spbh2={details_extNo}"},
                {"parent":"main_params","label":"订单数量","field":"shl","type":"string","value":"{details_quantity}"},
                {"parent":"main_params","label":"批号","field":"Pihao","type":"string","value":"{details__Flot}"},
                {"parent":"main_params","label":"有效期","field":"Sxrq","type":"string","value":"{{details__Fexp|date}}"},
                {"parent":"main_params","label":"生产日期","field":"Baozhiqi","type":"string","value":"{{details__Fmfg|date}}"},
                {"parent":"main_params","label":"含税价","field":"hshj","type":"string","value":"{details_price}"},
                {"parent":"main_params","label":"含税金额","field":"hsje","type":"string","value":"{details_money}"},
                {"parent":"main_params","label":"备注","field":"beizhu","type":"string","value":"{remark}"},
                {"parent":"main_params","label":"订单日期","field":"rq" ,"type" :"string" ,"value" :"{{lastUpdateDt|date}}"},
                {"parent" :"main_params" ,"label" :"订单时间" ,"field" :"ontime" ,"type" :"string" ,"value" :"{{lastUpdateDt|time}}"},
                {"parent" :"main_params" ,"label" :"单位名称" ,"field" :"wldwname" ,"type" :"string" ,"value" :"{details_unitCode}"},
                {"parent" :"main_params" ,"label" :"单位名称 ","field ":"wldwid ","type ":"string ","value ":"{clientAppNo }"},
                {"parent ":"main_params ","label ":"地址电话 ","field ":"dizhi ","type ":"string ","value ":"{shippingAddress }"},
                {"parent ":"main_params ","label ":"收货人 ","field ":"shr ","type ":"string ","value ":"{contacts }"},
                {"parent ":"main_params ","label ":"收货人电话 ","field ":"shrdh ","type ":"string ","value ":"{phone }"},
                {"parent ":"main_params ","label ":"业务员 ","field ":"ywy ","type ":"string ","value ": "{salesmanName }"},
                {"parent ":  “ main _params ” , “ label ” : “ 组织 ID ” , “ field ” : “ hzid ” , “ type ” : “ string ” , “ value ” : “ bcf795d44109489f93c0560fa5d4bf0a ”},
                 {" parent ":  “ main _params ” , “ label ” : “ 仓库名称 ” , “ field ” : “ ckname ” , “ type ” : “ string ” , “ value ” : “ {orgName }”}
            ]
        }
    ],
    "otherRequest":[
        {
            "label":  “ 主 SQL 语句” ,
             “ field” :  “ main_sql” ,
             “ type” :  “ string” ,
             “ value” :  “ INSERT INTO Inter_ddmx (djbh ,dj_sn,spid,shl,Pihao,Sxrq,Baozhiqi,hshj,hsje,beizhu,rq,ontime,wldwname,wldwid,dizhi,shr,shrdh,ywy,hzid,ckname) values ( :djbh ,:dj_sn,:spid,:shl,:Pihao,:Sxrq,:Baozhiqi,:hshj,:hsje,:beizhu,:rq,:ontime,:wldwname,:wldwid,:dizhi,:shr,:shrdh,:ywy,:hzid,:ckname)”
        }
    ],
    enforcedAssociation: true
}

数据请求与清洗

在ETL过程的第一步,我们从源系统中提取原始数据,并对其进行必要的清洗和处理。例如,确保所有字段都符合预期格式,并处理缺失或异常值。这里我们使用了_findCollection函数来查找特定字段,如商品ID(spid)。

数据转换与映射

接下来,我们将清洗后的数据映射到目标平台所需的格式。根据元数据配置,每个字段都被精确地映射到SQL Server数据库中的相应字段。例如:

  • 订单单号djbh)被映射为 {no}
  • 商品IDspid)通过 _findCollection find spid from d76b64f9-f0e0-3436-a2d9-14c5579faa1b where spbh2={details_extNo} 查找并获取。
  • 日期字段如 有效期Sxrq)和 生产日期Baozhiqi)则通过 {{details__Fexp|date}}{{details__Fmfg|date}} 格式化。

数据写入

最后一步是将转换后的数据写入SQL Server数据库。我们使用配置中的主SQL语句:

INSERT INTO Inter_ddmx (djbh ,dj_sn,spid,shl,Pihao,Sxrq,Baozhiqi,hshj,hsje,beizhu,rq,ontime,wldwname,wldwid,dizhi,shr,shrdh,ywy,hzid,ckname) 
VALUES (:djbh ,:dj_sn,:spid,:shl,:Pihao,:Sxrq,:Baozhiqi,:hshj,:hsje,:beizhu,:rq,:ontime,:wldwname,:wldwid,:dizhi,:shr ,:shrdh ,:ywy ,:hzid ,:ckname)

通过POST请求方法,将每个参数值替换为实际的数据,确保所有字段都正确匹配并插入到目标表中。

实际案例应用

假设我们有一个订单记录,其原始数据如下:

{
    no: 'ORD123456',
    bfn_line: '001',
    details_extNo: 'PROD789',
    details_quantity: '10',
    details__Flot: 'LOT123',
    details__Fexp: '2023-12-31',
    details__Fmfg: '2023-01-01',
    details_price: '100.00',
    details_money: '1000.00',
    remark: 'Urgent order',
    lastUpdateDt: '2023-10-01T10:00:00Z',
    details_unitCode: 'UNIT001',
    clientAppNo: 'CLIENT123',
    shippingAddress: '123 Main St.',
    contacts: 'John Doe',
    phone: '1234567890',
    salesmanName: 'Jane Smith',
    orgName:'Warehouse A'
}

经过上述ETL过程后,生成的SQL插入语句如下:

INSERT INTO Inter_ddmx (djbh ,dj_sn,spid,shl,Pihao,Sxrq,Baozhiqi,hshj,hsje,beizhu,rq,ontime,wldwname,wldwid,dizhi,shr ,shrdh ,ywy ,hzid ,ckname) 
VALUES ('ORD123456', '001', (SELECT spid FROM d76b64f9-f0e0-3436-a2d9-14c5579faa1b WHERE spbh2='PROD789'), '10', 'LOT123', '2023-12-31', 
'2023-01-01', '100.00', '1000.00', 'Urgent order', '2023-10-01', '10:00:00', 
'UNIT001', 'CLIENT123', 
'123 Main St.', 
'John Doe', 
'1234567890', 
'Jane Smith', 
'bcf795d44109489f93c0560fa5d4bf0a', 
'Warehouse A')

通过这种方式,我们成功地将源平台的数据转换并写入到目标SQL Server数据库中,实现了高效的数据集成与管理。 如何对接金蝶云星空API接口