轻易云平台上的订单数据ETL转换与SQLServer写入

  • 轻易云集成顾问-彭亮
### 新版订单同步-四川绪泰+:实现汤臣倍健营销云数据到SQL Server的无缝集成 在本技术案例中,我们深入探讨如何通过新版订单同步-四川绪泰+方案,实现汤臣倍健营销云的数据高效集成到SQL Server系统。这一方案利用了平台的多项特性,确保数据处理过程中不漏单,实现业务与技术需求的完美契合。 首先,通过调用API接口`/api/openapi/v1/erp/order/honour/agreement/header`从汤臣倍健营销云获取最新订单数据,这是整个流程的起点。为保障高吞吐量的数据写入能力,大量订单数据被快速有效地抓取并准备好进行后续处理。同时,为应对接口分页和限流问题,特别设计了异步抓取策略,最大程度上提高了接口调用效率。 接着,在将数据写入SQL Server之前,我们需要针对不同的数据格式差异定制化转换逻辑。借助平台提供的可视化数据流设计工具,我们配置了一系列变换规则,将原始API响应中的JSON格式转换为符合SQL Server表结构要求的数据记录,并通过`insert`操作插入数据库中。 此外,为保证全流程透明度和可控性,新版订单同步方案引入集中监控和告警系统,对每个步骤进行实时跟踪。一旦出现异常情况,例如超时或错误响应,这些都可以被及时捕获并触发相应的重试机制,从而大幅降低因偶发故障导致的数据丢失风险。 最后,还实施了一系列质量监控措施。在完成初步写入后,会有二次校验来确保所有已拉取且成功插入数据库的记录准确无误。如果检测出任何不一致之处,该条目会立即重新抓取、解析并再次尝试写入,以此维持整体系统的一致性和可靠性。 以上介绍仅覆盖核心环节,下文将详细讨论具体操作方法及注意事项,为您全面展示这一解决方案如何在实际环境中运行,高效地达成既定目标。 ![如何开发钉钉API接口](https://pic.qeasy.cloud/D15.png~tplv-syqr462i7n-qeasy.image) ### 调用源系统汤臣倍健营销云接口获取并加工数据 在轻易云数据集成平台的生命周期中,调用源系统接口是数据处理的第一步。本文将深入探讨如何通过调用汤臣倍健营销云的API接口`/api/openapi/v1/erp/order/honour/agreement/header`获取订单数据,并进行初步加工。 #### 接口调用配置 首先,我们需要配置API接口的元数据。根据提供的metadata,以下是关键配置项: - **API路径**:`/api/openapi/v1/erp/order/honour/agreement/header` - **请求方法**:POST - **分页设置**:每页20条记录 - **请求参数**: - `orgId`(组织ID):固定值 `d4f5ddb3c6584af7980904b63bee33ce` - `page`(页码):默认值 `1` - `id`(订单ID) - `applyerId`(要货方ID) - `supplierId`(供货方ID) - `no`(订单号) - `distributionType`(分销类型) - `distributorId`(分销商ID) - `orderStatus`(订单状态):固定值 `WAIT_FINANCE_AUDIT,WAIT_DELIVERY,PART_DELIVERY,ALL_DELIVERY,WAIT_OUT_STORAGE,PART_OUT_STORAGE` - `createDt`(创建时间) - `orderTypeCode`(订单类型,如普通订单、直运销售) - `isDeliveryFreezed`(是否暂停发货) - `relatedApplyerId`(关联交易经销商ID) - `saleDistribution`(销售渠道) - `disApplyerId`(分销商ID) - `startDt`(订单时间开始) - `endDt`(订单时间结束) - `appStartDt`(审批时间开始) - `appEndDt`(审批时间结束) - `lastStartDt`(最后修改时间开始):动态值,使用上次同步时间 - `lastEndDt`(最后修改时间结束):动态值,使用当前时间 - 单据类型:固定值为1,即订单 #### 请求示例 为了实现上述配置,我们需要构建一个POST请求。以下是一个示例请求体: ```json { "orgId": "d4f5ddb3c6584af7980904b63bee33ce", "page": "1", "orderStatus": "WAIT_FINANCE_AUDIT,WAIT_DELIVERY,PART_DELIVERY,ALL_DELIVERY,WAIT_OUT_STORAGE,PART_OUT_STORAGE", "lastStartDt": "{{LAST_SYNC_TIME|datetime}}", "lastEndDt": "{{CURRENT_TIME|datetime}}", "nature": "1" } ``` 该请求体包含了必要的参数,并使用动态变量来填充最后修改时间范围。 #### 数据清洗与加工 在获取到原始数据后,我们需要对其进行清洗和初步加工。以下是一些常见的数据清洗步骤: 1. **字段映射与转换**: 将API返回的数据字段映射到目标系统所需的字段。例如,将返回的订单状态字段转换为目标系统中的对应状态码。 2. **数据过滤**: 根据业务需求过滤掉不需要的数据。例如,只保留状态为“已审核”的订单。 3. **格式化处理**: 对日期、金额等字段进行格式化处理,以符合目标系统的要求。例如,将日期格式从YYYY-MM-DD转换为YYYYMMDD。 #### 示例代码 以下是一个简单的数据清洗示例代码片段: ```python import requests import json from datetime import datetime # 配置API请求参数 url = 'https://api.example.com/api/openapi/v1/erp/order/honour/agreement/header' headers = {'Content-Type': 'application/json'} payload = { "orgId": "d4f5ddb3c6584af7980904b63bee33ce", "page": "1", "orderStatus": "WAIT_FINANCE_AUDIT,WAIT_DELIVERY,PART_DELIVERY,ALL_DELIVERY,WAIT_OUT_STORAGE,PART_OUT_STORAGE", "lastStartDt": datetime.now().strftime('%Y-%m-%dT%H:%M:%S'), "lastEndDt": datetime.now().strftime('%Y-%m-%dT%H:%M:%S'), "nature": "1" } # 发起POST请求 response = requests.post(url, headers=headers, data=json.dumps(payload)) data = response.json() # 数据清洗与转换 cleaned_data = [] for order in data['orders']: cleaned_order = { 'order_id': order['id'], 'order_status': map_status(order['status']), 'created_date': format_date(order['createDt']), # 添加更多字段映射和转换逻辑... } cleaned_data.append(cleaned_order) def map_status(status): status_mapping = { 'WAIT_FINANCE_AUDIT': '待财务审核', 'WAIT_DELIVERY': '待发货', # 添加更多状态映射... } return status_mapping.get(status, '未知状态') def format_date(date_str): return datetime.strptime(date_str, '%Y-%m-%dT%H:%M:%S').strftime('%Y%m%d') # 输出清洗后的数据 print(json.dumps(cleaned_data, indent=2)) ``` 以上代码展示了如何调用API获取数据,并对返回的数据进行初步清洗和格式化处理。通过这种方式,我们可以确保数据在进入下一阶段处理之前已经过基本的验证和转换,为后续的数据集成奠定基础。 ![打通企业微信数据接口](https://pic.qeasy.cloud/S26.png~tplv-syqr462i7n-qeasy.image) ### 使用轻易云数据集成平台实现订单数据ETL转换与写入SQL Server 在轻易云数据集成平台中,数据处理的第二步是将已经集成的源平台数据进行ETL(Extract, Transform, Load)转换,并将其转为目标平台SQL Server API接口所能够接收的格式,最终写入目标平台。本文将详细探讨这一过程中的技术细节和实现方法。 #### 数据请求与清洗 首先,我们需要从源系统中提取订单相关的数据。这些数据通常包括订单单号、商品ID、订单数量、批号、有效期等。提取的数据可能存在不一致或冗余,需要进行清洗和标准化处理。以下是一个典型的数据请求配置示例: ```json { "api": "insert", "method": "POST", "idCheck": true, "request": [ { "label": "主表参数", "field": "main_params", "type": "object", "children": [ {"parent": "main_params", "label": "订单单号", "field": "djbh", "type": "string", "value": "{no}"}, {"parent": "main_params", "label": "订单明细序号", "field": "dj_sn", "type": "string", "value": "{bfn_line}"}, {"parent": "main_params", "label": "商品ID", "field": "spid", "type": "string", "value":"_findCollection find spid from d76b64f9-f0e0-3436-a2d9-14c5579faa1b where spbh2={details_extNo}" }, {"parent": "main_params", ... } ] } ], ... } ``` #### 数据转换 在数据清洗之后,下一步是将这些数据转换为目标平台SQL Server所能接受的格式。这一步骤涉及到字段映射、数据类型转换以及特定格式的处理。例如,将日期字段格式化为SQL Server可识别的日期格式。 以下是部分字段的转换示例: - `订单单号` (djbh): 从源系统中提取并直接映射到目标字段。 - `商品ID` (spid): 使用 `_findCollection` 函数从指定集合中查找对应的商品ID。 - `有效期` (Sxrq): 将日期字段格式化为SQL Server可接受的日期字符串。 ```json { ... { ... {"parent":"main_params","label":"有效期","field":"Sxrq","type":"string","value":"{{details__Fexp|date}}"}, {"parent":"main_params","label":"生产日期","field":"Baozhiqi","type":"string","value":"{{details__Fmfg|date}}"}, ... } } ``` #### 数据写入 完成数据转换后,最后一步是将这些数据通过API接口写入到目标平台SQL Server。轻易云提供了全异步、支持多种异构系统集成的平台,使得这一过程变得高效且可靠。 以下是一个典型的SQL插入语句配置示例: ```json { ... { ... { label: '主SQL语句', field: 'main_sql', type: 'string', value: 'INSERT INTO Inter_ddmx (djbh ,dj_sn,spid,shl,Pihao,Sxrq,Baozhiqi,hshj,hsje,beizhu,rq,ontime,wldwname,wldwid,dizhi,shr,shrdh,ywy,hzid,ckname) values (:djbh ,:dj_sn,:spid,:shl,:Pihao,:Sxrq,:Baozhiqi,:hshj,:hsje,:beizhu,:rq,:ontime,:wldwname,:wldwid,:dizhi,:shr,:shrdh,:ywy,:hzid,:ckname)' } ... } } ``` 该配置中定义了插入操作所需的所有字段及其对应的参数。这些参数通过前面的字段映射和转换步骤已经准备就绪。 #### 技术要点总结 1. **元数据配置**:通过元数据配置文件定义API接口、请求方法及字段映射关系。 2. **字段映射**:利用轻易云提供的工具进行字段映射和数据类型转换,确保源系统与目标系统之间的数据一致性。 3. **异步处理**:利用全异步处理机制,提高数据写入效率和系统响应速度。 4. **实时监控**:通过实时监控功能,可以随时查看数据流动和处理状态,确保每个环节都清晰可见。 通过上述步骤,我们可以高效地将源平台的数据经过ETL转换后写入到目标平台SQL Server,实现不同系统间的数据无缝对接。 ![如何开发钉钉API接口](https://pic.qeasy.cloud/T20.png~tplv-syqr462i7n-qeasy.image)