汤臣倍健营销云数据集成到SQL Server的技术实现
在本案例中,我们探讨了如何通过轻易云数据集成平台,将汤臣倍健营销云的数据高效同步到SQL Server数据库。这一过程涉及API接口调用、数据质量监控、异常检测和处理等多个技术要点,确保从数据获取到存储的一系列操作无缝衔接。
数据获取与初始设置
首先,通过调用汤臣倍健营销云提供的API /api/openapi/v1/erp/order/honour/agreement/header
获取订单相关的数据。为应对大规模数据传输带来的挑战,我们采用定时任务机制,可靠地批量抓取这些API接口的数据。
数据转换与映射
为了将获取的数据顺利写入SQL Server,需要进行适当的格式转换和映射。在这一步骤中,自定义的数据转换逻辑尤为关键,以适配特定业务需求及不同系统间的结构差异。通过可视化设计工具,可以直观且快速地调整数据流,从而简化管理流程。
高吞吐量写入与性能优化
面对海量订单信息,为提升效率,我们充分利用了平台支撑的大吞吐量写入能力,通过 insert
API 将转换后的数据注入SQL Server。这不仅提升了处理速度,也保证了业务实时性的要求。此外,还需注意分页和限流策略,以避免因单次请求过多导致系统性能瓶颈或超时错误。
实时监控与告警机制
在整个集成过程中,引入集中监控和告警系统至关重要。它能够实时跟踪每个任务的状态及其性能表现,一旦发现任何异常情况,如网络故障或接口服务未响应,可立即触发报警并执行预设应急方案,比如重试机制,从而确保任务稳定运行,不漏单不丢包。
本文将详细展开上述各环节中的具体实现方法,包括如何有效调用汤臣倍健营销云API、解决隔行问题以及正确配置 SQL Server 的定制化映射等内容。从实践出发,总结最优经验,希望能为其他企业同类项目提供参考借鉴。
调用源系统汤臣倍健营销云接口获取并加工数据
在轻易云数据集成平台的生命周期中,第一步是调用源系统的API接口以获取原始数据。本文将详细探讨如何通过调用汤臣倍健营销云的接口/api/openapi/v1/erp/order/honour/agreement/header
来获取订单数据,并进行初步的数据加工。
接口配置与请求参数
我们使用POST方法来调用该API接口。以下是元数据配置中的主要请求参数:
{
"api": "/api/openapi/v1/erp/order/honour/agreement/header",
"method": "POST",
"number": "no",
"id": "id",
"pagination": {
"pageSize": 20
},
"beatFlat": ["details"],
"idCheck": true,
"request": [
{"field":"orgId","label":"组织ID","type":"string","value":"bcf795d44109489f93c0560fa5d4bf0a"},
{"field":"page","label":"页码","type":"string","value":"1"},
{"field":"id","label":"订单id","type":"string"},
{"field":"applyerId","label":"要货方id","type":"string"},
{"field":"supplierId","label":"供货方id","type":"string"},
{"field":"no","label":"订单号","type":"string"},
{"field":"distributionType","label":"分销类型","type":"string"},
{"field":"distributorId","label":"分销商id","type":"string"},
{"field":"orderStatus","label":"订单状态","type":"string", "value": "WAIT_FINANCE_AUDIT,WAIT_DELIVERY,PART_DELIVERY,ALL_DELIVERY,WAIT_OUT_STORAGE,PART_OUT_STORAGE"},
{"field":"createDt","label":"创建时间","type":"string"},
{"field":"orderTypeCode","label":"订单类型,如普通订单、直运销售","type":"string"},
{"field":"isDeliveryFreezed","label":"是否暂停发货","type":"string"},
{"field":"relatedApplyerId","label":"关联交易经销商id","type":"string"},
{"field":"saleDistribution","label":"销售渠道","type":"string"},
{"field":"disApplyerId","label": "分销商id", "type": "string"},
{"field": "startDt", "label": "订单时间(开始)", "type": "string"},
{"field": "endDt", "label": "订单时间(结束)", "type": "string"},
{"field": "appStartDt", "label": "审批时间(开始)", "type": "string"},
{"field": "appEndDt", "label": "审批时间(结束)", "type": "string"},
{"field": "lastStartDt", "label": "最后修改时间(开始)", type: string, value: "{{LAST_SYNC_TIME|datetime}}"},
{"field": lastEndDt, label: 最后修改时间(结束), type: string, value: "{{CURRENT_TIME|datetime}}"}
]
}
请求参数详解
orgId
: 固定值,用于标识组织ID。page
: 页码,初始值为1。orderStatus
: 包含多个状态值,用逗号分隔,表示需要查询的订单状态。lastStartDt
和lastEndDt
: 分别表示最后修改时间的开始和结束,用于增量同步数据。
这些参数确保了我们能够灵活地过滤和分页获取所需的订单数据。
数据请求与清洗
在发送请求后,我们会接收到一个包含多个订单记录的JSON响应。为了保证数据的一致性和准确性,需要对这些原始数据进行清洗和预处理。例如:
- 字段验证:确保所有必填字段都存在且格式正确。
- 数据转换:将日期字符串转换为标准日期格式,便于后续处理。
- 去重处理:根据
id
字段去重,避免重复记录。
以下是一个简单的数据清洗示例:
import requests
import json
from datetime import datetime
# 定义API URL和请求头
url = 'https://example.com/api/openapi/v1/erp/order/honour/agreement/header'
headers = {'Content-Type': 'application/json'}
# 构建请求体
payload = {
'orgId': 'bcf795d44109489f93c0560fa5d4bf0a',
'page': '1',
'orderStatus': 'WAIT_FINANCE_AUDIT,WAIT_DELIVERY,PART_DELIVERY,ALL_DELIVERY,WAIT_OUT_STORAGE,PART_OUT_STORAGE',
'lastStartDt': datetime.now().strftime('%Y-%m-%dT%H:%M:%S'),
'lastEndDt': datetime.now().strftime('%Y-%m-%dT%H:%M:%S')
}
# 发起POST请求
response = requests.post(url, headers=headers, data=json.dumps(payload))
# 检查响应状态码
if response.status_code == 200:
data = response.json()
# 数据清洗示例
cleaned_data = []
for record in data['records']:
if all(key in record for key in ('id', 'no', 'createDt')):
record['createDt'] = datetime.strptime(record['createDt'], '%Y-%m-%dT%H:%M:%S')
cleaned_data.append(record)
else:
print(f"Error: {response.status_code}")
数据转换与写入
在完成数据清洗后,我们需要将其转换为目标系统所需的格式,并写入到目标数据库或其他存储系统中。这一步通常包括:
- 字段映射:将源系统字段映射到目标系统字段。
- 格式转换:例如,将日期格式从ISO标准转换为目标系统所需的格式。
- 批量写入:为了提高效率,可以采用批量写入方式,将多个记录一次性写入目标系统。
通过上述步骤,我们可以高效地从汤臣倍健营销云接口获取并加工订单数据,为后续的数据集成和分析奠定基础。
数据集成与ETL转换:将源平台数据写入SQL Server API接口
在数据集成的生命周期中,ETL(提取、转换、加载)是关键步骤之一。本文将深入探讨如何通过轻易云数据集成平台,将已经集成的源平台数据进行ETL转换,并最终写入目标平台SQL Server。
配置元数据
在进行ETL操作之前,我们需要配置元数据,以便正确映射源数据到目标数据库表中。以下是我们使用的元数据配置:
{
"api": "insert",
"method": "POST",
"idCheck": true,
"request": [
{
"label": "主表参数",
"field": "main_params",
"type": "object",
"children": [
{"parent": "main_params", "label": "订单单号", "field": "djbh", "type": "string", "value": "{no}"},
{"parent": "main_params", "label": "订单明细序号", "field": "dj_sn", "type":"string", "value":"{bfn_line}"},
{"parent":"main_params","label":"商品ID","field":"spid","type":"string","value":"_findCollection find spid from d76b64f9-f0e0-3436-a2d9-14c5579faa1b where spbh2={details_extNo}"},
{"parent":"main_params","label":"订单数量","field":"shl","type":"string","value":"{details_quantity}"},
{"parent":"main_params","label":"批号","field":"Pihao","type":"string","value":"{details__Flot}"},
{"parent":"main_params","label":"有效期","field":"Sxrq","type":"string","value":"{{details__Fexp|date}}"},
{"parent":"main_params","label":"生产日期","field":"Baozhiqi","type":"string","value":"{{details__Fmfg|date}}"},
{"parent":"main_params","label":"含税价","field":"hshj","type":"string","value":"{details_price}"},
{"parent":"main_params","label":"含税金额","field":"hsje","type":"string","value":"{details_money}"},
{"parent":"main_params","label":"备注","field":"beizhu","type":"string","value":"{remark}"},
{"parent":"main_params","label":"订单日期","field":"rq" ,"type" :"string" ,"value" :"{{lastUpdateDt|date}}"},
{"parent" :"main_params" ,"label" :"订单时间" ,"field" :"ontime" ,"type" :"string" ,"value" :"{{lastUpdateDt|time}}"},
{"parent" :"main_params" ,"label" :"单位名称" ,"field" :"wldwname" ,"type" :"string" ,"value" :"{details_unitCode}"},
{"parent" :"main_params" ,"label" :"单位名称 ","field ":"wldwid ","type ":"string ","value ":"{clientAppNo }"},
{"parent ":"main_params ","label ":"地址电话 ","field ":"dizhi ","type ":"string ","value ":"{shippingAddress }"},
{"parent ":"main_params ","label ":"收货人 ","field ":"shr ","type ":"string ","value ":"{contacts }"},
{"parent ":"main_params ","label ":"收货人电话 ","field ":"shrdh ","type ":"string ","value ":"{phone }"},
{"parent ":"main_params ","label ":"业务员 ","field ":"ywy ","type ":"string ","value ": "{salesmanName }"},
{"parent ": “ main _params ” , “ label ” : “ 组织 ID ” , “ field ” : “ hzid ” , “ type ” : “ string ” , “ value ” : “ bcf795d44109489f93c0560fa5d4bf0a ”},
{" parent ": “ main _params ” , “ label ” : “ 仓库名称 ” , “ field ” : “ ckname ” , “ type ” : “ string ” , “ value ” : “ {orgName }”}
]
}
],
"otherRequest":[
{
"label": “ 主 SQL 语句” ,
“ field” : “ main_sql” ,
“ type” : “ string” ,
“ value” : “ INSERT INTO Inter_ddmx (djbh ,dj_sn,spid,shl,Pihao,Sxrq,Baozhiqi,hshj,hsje,beizhu,rq,ontime,wldwname,wldwid,dizhi,shr,shrdh,ywy,hzid,ckname) values ( :djbh ,:dj_sn,:spid,:shl,:Pihao,:Sxrq,:Baozhiqi,:hshj,:hsje,:beizhu,:rq,:ontime,:wldwname,:wldwid,:dizhi,:shr,:shrdh,:ywy,:hzid,:ckname)”
}
],
enforcedAssociation: true
}
数据请求与清洗
在ETL过程的第一步,我们从源系统中提取原始数据,并对其进行必要的清洗和处理。例如,确保所有字段都符合预期格式,并处理缺失或异常值。这里我们使用了_findCollection
函数来查找特定字段,如商品ID(spid
)。
数据转换与映射
接下来,我们将清洗后的数据映射到目标平台所需的格式。根据元数据配置,每个字段都被精确地映射到SQL Server数据库中的相应字段。例如:
订单单号
(djbh
)被映射为{no}
。商品ID
(spid
)通过_findCollection find spid from d76b64f9-f0e0-3436-a2d9-14c5579faa1b where spbh2={details_extNo}
查找并获取。- 日期字段如
有效期
(Sxrq
)和生产日期
(Baozhiqi
)则通过{{details__Fexp|date}}
和{{details__Fmfg|date}}
格式化。
数据写入
最后一步是将转换后的数据写入SQL Server数据库。我们使用配置中的主SQL语句:
INSERT INTO Inter_ddmx (djbh ,dj_sn,spid,shl,Pihao,Sxrq,Baozhiqi,hshj,hsje,beizhu,rq,ontime,wldwname,wldwid,dizhi,shr,shrdh,ywy,hzid,ckname)
VALUES (:djbh ,:dj_sn,:spid,:shl,:Pihao,:Sxrq,:Baozhiqi,:hshj,:hsje,:beizhu,:rq,:ontime,:wldwname,:wldwid,:dizhi,:shr ,:shrdh ,:ywy ,:hzid ,:ckname)
通过POST请求方法,将每个参数值替换为实际的数据,确保所有字段都正确匹配并插入到目标表中。
实际案例应用
假设我们有一个订单记录,其原始数据如下:
{
no: 'ORD123456',
bfn_line: '001',
details_extNo: 'PROD789',
details_quantity: '10',
details__Flot: 'LOT123',
details__Fexp: '2023-12-31',
details__Fmfg: '2023-01-01',
details_price: '100.00',
details_money: '1000.00',
remark: 'Urgent order',
lastUpdateDt: '2023-10-01T10:00:00Z',
details_unitCode: 'UNIT001',
clientAppNo: 'CLIENT123',
shippingAddress: '123 Main St.',
contacts: 'John Doe',
phone: '1234567890',
salesmanName: 'Jane Smith',
orgName:'Warehouse A'
}
经过上述ETL过程后,生成的SQL插入语句如下:
INSERT INTO Inter_ddmx (djbh ,dj_sn,spid,shl,Pihao,Sxrq,Baozhiqi,hshj,hsje,beizhu,rq,ontime,wldwname,wldwid,dizhi,shr ,shrdh ,ywy ,hzid ,ckname)
VALUES ('ORD123456', '001', (SELECT spid FROM d76b64f9-f0e0-3436-a2d9-14c5579faa1b WHERE spbh2='PROD789'), '10', 'LOT123', '2023-12-31',
'2023-01-01', '100.00', '1000.00', 'Urgent order', '2023-10-01', '10:00:00',
'UNIT001', 'CLIENT123',
'123 Main St.',
'John Doe',
'1234567890',
'Jane Smith',
'bcf795d44109489f93c0560fa5d4bf0a',
'Warehouse A')
通过这种方式,我们成功地将源平台的数据转换并写入到目标SQL Server数据库中,实现了高效的数据集成与管理。