从数据获取到ETL写入:轻松实现数据集成全生命周期管理

  • 轻易云集成顾问-卢剑航
### 汤臣倍健营销云数据集成到SQL Server案例:采购入库同步-(Life-Space)江油泰熙 在实施"采购入库同步-(Life-Space)江油泰熙"这个项目过程中,我们成功地将汤臣倍健营销云的数据无缝对接到了SQL Server数据库。具体流程是通过调用汤臣倍健营销云的API接口`/erp/api/order/query/purInWarehsOrder`获取原始数据,然后利用轻易云平台对这些数据进行处理和转换,最终批量写入到目标SQL Server中。这一系列操作确保了数据的完整性、准确性,并大幅提升了业务流程效率。 为了应对极大的数据流量和复杂的数据处理需求,我们采取了一些关键技术措施: #### 确保数据不漏单 为了保证从汤臣倍健营销云获取的数据完全覆盖所有可用信息,我们实现了多层次的数据检查机制。首先,通过定时任务可靠地抓取API提供的新订单记录;其次,在每次抓取后,会进行校验来确认没有遗漏任何订单。此外,还采用日志监控手段,对每个环节进行实时记录,可根据日志追溯并验证是否存在漏单情况。 #### 批量集成与快速写入 面对大量采集来的业务数据,高效且稳定地批量写入到SQL Server成为关键之一。为此,我们设计并优化了一种专用的批处理方案,通过分块实现大量数据的快速插入。同时,还使用了高性能的数据库驱动,使得整个插入过程迅速且低延迟。此外,为确保每次批量操作的一致性和完整性,事务机制被严格应用,以防止部分失败或异常导致的数据不一致问题。 #### 处理分页与限流问题 由于API接口返回的大规模数据信息往往被分页限制,为全面抓取全部所需内容,我们设计了智能化爬虫策略,根据实际需要逐页调度请求,从而稳定、有效地收集所有相关信息。而对于突发的大负载流量,系统及时响应进行了自动限流保护,在适当范围内平滑分布网络压力,以避免服务器过载或宕机影响整体运营。 ##### 数据格式差异解决方案 两种系统间常见的问题之一,是源自不同平台间的元数据信息格式差异。在本项目中,主要通过构建灵活强大的映射规则引擎,将购买订单等重要字段从JSON格式转化为符合标准关系型数据库模版,同时依据实际业务需求定制特有映射逻辑,一方面减少人工介入力度,提高自动化水平;另一方面,也缩短开发周期,加快上线步伐。 这几项核心实施策略,大 ![企业微信与OA系统接口开发配置](https://pic.qeasy.cloud/D18.png~tplv-syqr462i7n-qeasy.image) ### 调用汤臣倍健营销云接口获取并加工数据的技术案例 在数据集成生命周期的第一步,我们需要从源系统获取数据并进行初步加工。本文将详细探讨如何调用汤臣倍健营销云接口`/erp/api/order/query/purInWarehsOrder`,并结合元数据配置进行数据请求与清洗。 #### 接口调用配置 首先,我们需要配置API调用的相关参数。根据提供的元数据配置,以下是关键参数的设置: - **API路径**:`/erp/api/order/query/purInWarehsOrder` - **请求方法**:`POST` - **分页设置**:每页30条记录 - **条件过滤**:过滤掉物料编号包含"F.A"的记录 - **ID检查**:启用ID检查 #### 请求参数详解 在请求参数中,有几个关键字段需要特别注意: 1. **tenantId(经销商id)**: - 类型:字符串 - 描述:必填项,用于标识经销商。例如:`34cc4109705e4c058b7b3b0352e57d31` 2. **yxyNumber(营销云销售订单号)**: - 类型:字符串 - 描述:传此参数时,其他时间状态等条件无效。例如:`YD1215710122031701` 3. **number(系统订单号)**: - 类型:字符串 - 描述:传此参数时,其他时间状态等条件无效。例如:`XOUT0000000293` 4. **status(订单状态)**: - 类型:字符串 - 描述:0表示未审核,1表示已审核(已出库)。默认值为1。 5. **beginTime(开始时间)和endTime(结束时间)**: - 类型:字符串 - 描述:用于基于创建时间或更新时间查询。如果不传单号,这两个字段必填。格式为`YYYY-MM-DD`或`YYYY-MM-DD HH:MM:SS`。 - 动态值示例: ```json "beginTime": "{{LAST_SYNC_TIME|datetime}}", "endTime": "{{CURRENT_TIME|datetime}}" ``` 6. **pageNo(页码)和pageSize(每页条数)**: - 类型:字符串 - 描述:默认值分别为1和30。 7. **timeType(时间段标志)**: - 类型:字符串 - 描述:0表示基于创建时间查询,1表示基于更新时间查询。默认值为1。 #### 请求示例 以下是一个完整的请求示例: ```json { "tenantId": "34cc4109705e4c058b7b3b0352e57d31", "status": "1", "beginTime": "{{LAST_SYNC_TIME|datetime}}", "endTime": "{{CURRENT_TIME|datetime}}", "pageNo": "1", "pageSize": "30", "timeType": "1" } ``` #### 数据清洗与转换 在获取到数据后,需要对数据进行初步清洗和转换,以确保其符合目标系统的要求。根据元数据配置,我们可以进行以下处理: - **过滤条件应用**: 根据配置中的条件过滤项,排除物料编号包含"F.A"的记录。这可以通过简单的逻辑判断实现。 - **字段映射与转换**: 将源系统中的字段映射到目标系统所需的字段,并进行必要的数据类型转换。例如,将日期格式从源系统格式转换为目标系统所需格式。 #### 实现代码示例 以下是一个简单的Python代码示例,用于调用API并处理返回的数据: ```python import requests import json # 配置API请求参数 url = 'https://api.example.com/erp/api/order/query/purInWarehsOrder' headers = {'Content-Type': 'application/json'} payload = { "tenantId": "34cc4109705e4c058b7b3b0352e57d31", "status": "1", "beginTime": "{{LAST_SYNC_TIME|datetime}}", "endTime": "{{CURRENT_TIME|datetime}}", "pageNo": "1", "pageSize": "30", "timeType": "1" } # 发起POST请求 response = requests.post(url, headers=headers, data=json.dumps(payload)) # 检查响应状态码 if response.status_code == 200: data = response.json() # 数据清洗与转换 cleaned_data = [] for item in data['items']: if 'F.A' not in item['itemList']['materialNumber']: cleaned_item = { 'id': item['id'], 'number': item['number'], # 添加更多字段映射与转换逻辑... } cleaned_data.append(cleaned_item) # 输出清洗后的数据 print(json.dumps(cleaned_data, indent=4)) else: print(f"Error: {response.status_code}") ``` 通过上述步骤,我们成功地从汤臣倍健营销云接口获取了采购入库订单数据,并进行了初步的数据清洗和转换,为后续的数据处理奠定了基础。这一过程展示了如何利用轻易云数据集成平台实现高效的数据集成和处理。 ![打通企业微信数据接口](https://pic.qeasy.cloud/S24.png~tplv-syqr462i7n-qeasy.image) ### 数据集成生命周期中的ETL转换与写入 在数据集成的生命周期中,ETL(Extract, Transform, Load)转换和写入是至关重要的一步。本文将重点探讨如何利用轻易云数据集成平台,将源平台的数据转换为目标平台SQL Server API接口所能接收的格式,并最终写入目标平台。 #### 元数据配置解析 在进行ETL转换之前,我们需要详细了解元数据配置。这些配置定义了如何从源数据中提取信息,并将其转换为目标平台所需的格式。 ```json { "api": "insert", "method": "POST", "idCheck": true, "request": [ { "label": "主表参数", "field": "main_params", "type": "object", "children": [ {"parent": "main_params", "label": "单号编号", "field": "djbh", "type": "string", "value": "{number}"}, {"parent": "main_params", "label": "采购入库传CGC 销售退回传XHH", "field": "djlx", "type": "string", "value":"CGC"}, {"parent": "main_params", "label": "日期", "field":"rq","type":"string","value":"{{auditTime|date}}"}, {"parent":"main_params","label":"时间","field":"ontime","type":"string","value":"{{auditTime|time}}"}, {"parent":"main_params","label":"单位内码","field":"wldwid","type":"string","value":"WLD00001903"}, {"parent":"main_params","label":"含税金额","field":"hsje","type":"string","value":"{{itemList.taxlastmoney}}"}, {"parent":"main_params","label":"收货人","field":"shouhr","type":"string","value":"{recvContact}"}, {"parent":"main_params","label":"地址","field":"shhdz","type":"string","value":"{recvAddr}"}, {"parent":"main_params","label":"联系电话","field":"lxdh","type":"string","value":"{recvTel}"}, {"parent":"main_params","label":"备注","field":"beizhu","type":"string","value":"'订单备注:{remark}'"}, {"parent':'main_params','label':'营销云单号','field':'webdjbh','type':'string','value':'{yxyNumber}'} ] }, { 'label': '扩展表参数', 'field': 'extend_params_1', 'type': 'array', 'value': 'itemList', 'children': [ {'parent': 'extend_params_1', 'label': '单号', 'field': 'djbh', 'type': 'string', 'value': '{number}'}, {'parent': 'extend_params_1', 'label': '序号', 'field': 'dj_sn', 'type': 'string', 'value': '{oSn}'}, {'parent':'extend_params_1','label':'商品内码','field':'spid','type':'string','value':'_findCollection find spid from d76b64f9-f0e0-3436-a2d9-14c5579faa1b where spbh2={extMaterialNo}'}, {'parent':'extend_params_1','label':'仓库编号','field':'ckid','type':'string','value':'{{itemList.depotNo}}'}, {'parent':'extend_params_1','label':'批号','field':'pihao','type':'string','value':'{{itemList._Flot}}'}, {'parent':'extend_params_1','label':'效期','field':'sxrq','type':'string','value':'{{itemList._Fexp}}'}, {'parent':'extend_params_1','label':'生产日期','field':'baozhiqi','type':'string','value':'{{itemList._Fmfg}}'}, {'parent:'extend_params_1,'label:'数量 数量大于0,'field:'shl,'type:'string,'value:'{{itemList.opernumber}}'}, {'parent:'extend_params_1,'label:'含税价,'field:'hshj,'type:'string,'value:'_function {{itemList.taxlastmoney}}/{{itemList.opernumber}}'}, {'parent:'extend_params_1,'label:'含税金额,'field:'hsje,'type:'string,'value:'{{itemList.taxlastmoney}}'}, {'parent:'extend params 1,label:相关单号,如果退回,采购退 等原来的单号, field:xgdjbh,type:string,value:{yxyNumber}}, { parent: extend params 1,label:相关序号 如果退回,采购退 等对应明细的那个序号, field:recnum,type:string,value:{oSn}}, { parent: extend params 1,label:组织ID, field:hzid,type:string,value:{orgId}}, { parent: extend params 1,label:仓库名称, field:ckname,type:string,value:{tenantName}} ] } ], otherRequest:[ { label:主SQL语句, field: main_sql, type:string,value:"INSERT INTO gxkphz (djbh,djlx,rq,ontime,wldwid,hsje,shouhr,shhdz,lxdh,beizhu,webdjbh) values ( :djbh,:djlx,:rq,:ontime,:wldwid,:hsje,:shouhr,:shhdz,:lxdh,:beizhu,:webdjbh)" }, { label:扩展SQL语句 1 , field : extend_sql _ 1 , type : string , value : INSERT INTO gxkpmx ( djbh , dj_sn , spid , ckid , pihao , sxrq , baozhiqi , shl , hshj , hsje , xgdjbh , recnum , hzid , ckname ) values ( : djbh , : dj_sn , : spid , : ckid , : pihao , : sxrq , : baozhiqi , : shl , : hshj , : hsje , : xgdjbh , : recnum , : hzid , : ckname ) } ] } ``` #### 数据请求与清洗 在ETL过程中,首先需要对源数据进行请求和清洗。通过上述元数据配置,我们可以看到需要从源数据中提取的信息字段及其对应关系。例如: - `djbh` 对应 `{number}` - `djlx` 固定为 `"CGC"` - `rq` 和 `ontime` 分别通过模板函数从 `auditTime` 中提取日期和时间 - `wldwid` 固定值 `"WLD00001903"` - `hsje` 对应 `{{itemList.taxlastmoney}}` - 等等 这些字段在请求阶段被提取并清洗,以确保其符合目标平台的要求。 #### 数据转换与写入 接下来是将清洗后的数据进行转换,并通过API接口写入到目标平台SQL Server。根据元数据配置,我们可以构建相应的SQL语句: **主表插入语句:** ```sql INSERT INTO gxkphz (djbh,djlx,rq,ontime,wldwid,hsje,shouhr,shhdz,lxdh,beizhu,webdjbh) VALUES (:djbh,:djlx,:rq,:ontime,:wldwid,:hsje,:shouhr,:shhdz,:lxdh,:beizhu,:webdjbh) ``` **扩展表插入语句:** ```sql INSERT INTO gxkpmx (djbh,dj_sn,spid,ckid,pihao,sxrq,baozhiqi,shl,hshj,hsje,xgdjbh,recnum,hzid,ckname) VALUES (:djbh,:dj_sn,:spid,:ckid,:pihao,:sxrq,:baozhiqi,:shl,:hshj,:hsje,:xgdjbh,:recnum,:hzid,:ckname) ``` 这些SQL语句将通过API接口发送到SQL Server,实现最终的数据写入。 #### API接口调用示例 为了实现上述操作,可以使用HTTP POST方法调用API接口。以下是一个示例请求: ```http POST /api/insert HTTP/1.1 Host: target-sql-server.com Content-Type: application/json { "main_sql": { ":djbh": "{number}", ":djlx": "'CGC'", ":rq": "{{auditTime|date}}", ":ontime": "{{auditTime|time}}", ":wldwid": "'WLD00001903'", ":hsje": "{{itemList.taxlastmoney}}", ":shouhr": "{recvContact}", ":shhdz": "{recvAddr}", ":lxdh": "{recvTel}", ":beizhu":"'订单备注:{remark}'", ":webdjbh":"'yxyNumber'" }, ... } ``` 通过这种方式,我们可以确保源平台的数据经过清洗和转换后,能够准确无误地写入到目标平台SQL Server中。 #### 技术要点总结 在整个ETL过程中,需要特别注意以下技术要点: - **字段映射与转换**:确保每个字段都能正确映射和转换。 - **模板函数的使用**:利用模板函数处理复杂的数据格式,如日期、时间等。 - **API接口调用**:确保API调用的正确性,包括HTTP方法、URL、请求头和请求体等。 通过以上步骤,我们可以高效地完成从源平台到目标平台的数据集成,实现业务流程的自动化和高效化。 ![电商OMS与WMS系统接口开发配置](https://pic.qeasy.cloud/T4.png~tplv-syqr462i7n-qeasy.image)