商城订单数据的ETL过程及其向销帮帮合同接口的转换

  • 轻易云集成顾问-何语琴
### MySQL 数据集成到销帮帮的技术案例解析 在当今高度数字化和竞争激烈的商业环境中,实现不同系统之间的数据集成变得至关重要。本文将聚焦一个具体的系统对接集成案例:通过轻易云数据集成平台,将MySQL中的商城订单数据高效、安全地集成到销帮帮合同接口。本次实际运行方案名称为“3查询商城订单到销帮帮合同接口 (线下:4836006)”。 #### 一、任务概述与需求分析 此次技术案例主要涉及以下几个步骤: 1. 定时从MySQL数据库获取最新的商城订单数据。 2. 处理获取的数据,确保其符合销帮帮API所需的格式。 3. 利用/`pro/v2/api/contract/add` API批量写入数据到销帮帮。 实现此过程需要解决如下关键问题: - **如何调用MySQL接口select**:定时可靠地抓取MySQL中最新订单信息,保证没有漏单情况发生。 - **大量数据快速写入到销帮帮**:支持高吞吐量的数据写入能力,以满足业务实时性的要求。 - **处理分页和限流问题**:确保在大批量数据传输过程中,不因流量过大导致系统性能下降或崩溃。 #### 二、技术实现细节 ##### 1. 数据获取与监控 利用轻易云提供的定时调度功能,我们设置了一个定期触发器,通过执行SELECT SQL语句,从MySQL中提取新增或更新的商城订单。这一步我们配备了作业监控机制,对每次执行进行日志记录,并设置告警机制,一旦出现异常,能够及时发现并处理。 ```sql SELECT * FROM orders WHERE status = 'new' AND created_at >= DATE_SUB(NOW(), INTERVAL 1 HOUR); ``` ##### 2. 数据转换逻辑 为了适应特定业务需求和API规范,我们自定义了一套数据转换规则。例如,将日期格式从`YYYY-MM-DD HH:MM:SS`转化为ISO8601标准,以及字段映射等操作。这不仅保障了兼容性,也提升了数据质量。 ##### 3. 高效写入与错误重试 使用轻易云的平台特性,大规?模?上可以迅速地将整理后的订单数据推送至销助力平台。针对潜在的问题,例如网络不稳定因素引发的数据传输失败,我们设计了错误重试机制,以确保最终一致性。同时,每一次POST请求后都会即时返回结果并进行记录,若有错误会触发二次尝试以及告警通知。 总结一下,此次方案充分发挥了轻易云强大的可视化工具、集中监控及 ![金蝶云星空API接口配置](https://pic.qeasy.cloud/D35.png~tplv-syqr462i7n-qeasy.image) ### 调用源系统MySQL接口select获取并加工数据 在数据集成过程中,调用源系统接口获取数据是至关重要的一步。本文将详细探讨如何通过轻易云数据集成平台调用MySQL接口`select`来查询商城订单,并将其加工处理。 #### 元数据配置解析 元数据配置是实现数据请求与清洗的核心。以下是关键的元数据配置项: ```json { "api": "select", "effect": "QUERY", "method": "POST", "number": "order_code", "id": "order_code", "idCheck": true, "request": [ { "field": "main_params", "label": "main_params", "type": "object", "describe": "111", "value": "1", "children": [ {"field": "order_status", "label": "订单状态", "type": "string", "value":"2"}, {"field": "update_time", "label": "创建时间", "type": "datetime", "value":"_function SUBTIME('{{LAST_SYNC_TIME|datetime}}', '05:01:01')"}, {"field": "limit", "label":"limit", "type":"string", "value":"{PAGINATION_PAGE_SIZE}"}, {"field":"offset", "label":"offset", "type":"string", "value":"{PAGINATION_START_ROW}"}, {"field":"customer_source", "label":"crm表单ID", "type":"string", "value":"2246776"} ] }, { ... } ], ... } ``` #### 数据请求与清洗 在这个阶段,我们需要从MySQL数据库中查询订单信息。以下是具体步骤: 1. **定义API和方法**: - `api`: `select` - `effect`: `QUERY` - `method`: `POST` 2. **设置主键和检查项**: - `number`: `order_code` - `id`: `order_code` - `idCheck`: `true` 3. **构建请求参数**: - 主参数`main_params`包含多个子参数,如订单状态、创建时间、分页限制等。 ```json { ... {"field":"order_status","label":"订单状态","type":"string","value":"2"}, {"field":"update_time","label":"创建时间","type":"datetime","value":"_function SUBTIME('{{LAST_SYNC_TIME|datetime}}', '05:01:01')"}, ... } ``` 4. **SQL查询语句**: - 主查询语句`main_sql`用于获取订单的主要信息。 ```sql SELECT middle_order.order_code, middle_order.user_id, middle_order.whether_to_sign_new, middle_order.quotation_desc3, middle_order.order_amount, middle_order.product_amount, middle_order.other_amount, middle_order.discount, middle_order.account_receivable, middle_order.discount_amount, middle_order.sales_amount, middle_order.client_type, middle_order.order_status, middle_order.audit_time, middle_order.create_time, middle_order.update_time, middle_order.customer_code, middle_order.customer_name, middle_order.customer_id, middle_order.customer_source, middle_order.crm_user_id, middle_order.user_name, middle_order.file_address,middle_order.business_user_id,middle_order.business_user_name FROM middle_order WHERE middle_order.`order_status` = :order_status AND middle_order.crm= 1 AND middle_order.customer_id!='' AND middle_order.client_type= :customer_source AND middle_order.`update_time`>= :update_time order by middle_order.`create_time`asc limit :limit offset :offset ``` 5. **扩展查询语句**: - 扩展语句`extend_sql_1`用于获取订单详情。 ```sql SELECT middle_order_details.order_code,middle_order_details.create_time,middle_order_details.update_time,middle_order_details.product_no,middle_order_details.product_name,middle_order_details.product_number,middle_order_details.product_price,middle_order_details.sales_price,middle_order_details.total_product_price,middle_order_details.total_sales_price FROM middle_order_details where middle_order_details .`order_code`= :order_code ``` #### 数据转换与写入 在获取到原始数据后,需要对其进行转换和写入目标系统。在轻易云平台上,这一步通常包括以下操作: 1. **数据映射**:将源系统的数据字段映射到目标系统的字段。 2. **数据清洗**:根据业务规则对数据进行清洗和转换,例如日期格式转换、数值计算等。 3. **批量写入**:将处理后的数据批量写入目标系统,确保高效和一致性。 通过上述步骤,我们可以高效地从MySQL数据库中获取并处理商城订单数据,为后续的数据集成工作打下坚实基础。 ![打通企业微信数据接口](https://pic.qeasy.cloud/S26.png~tplv-syqr462i7n-qeasy.image) ### 将商城订单数据转换并写入销帮帮合同接口的ETL过程 在数据集成生命周期中,ETL(提取、转换、加载)过程是将源平台的数据转换为目标平台所需格式的关键步骤。本文将详细介绍如何使用轻易云数据集成平台,将商城订单数据转换为销帮帮API接口可接收的格式,并最终写入目标平台。 #### 1. 数据提取与清洗 首先,从源平台提取商城订单数据。这一步通常涉及从数据库或API接口获取原始数据,并进行初步清洗和过滤,确保数据质量和完整性。 #### 2. 数据转换 在数据转换阶段,我们需要将提取到的原始数据映射到销帮帮API接口所需的字段格式。以下是具体的元数据配置及其解析: ```json { "api": "/pro/v2/api/contract/add", "effect": "EXECUTE", "method": "POST", "number": "id", "id": "id", "name": "id", "idCheck": true, "request": [ {"field":"corpid","label":"corpid","type":"string","value":"ding65b814e691560eba35c2f4657eb6378f"}, {"field":"userId","label":"userId","type":"string","value":"{crm_user_id}"}, {"field":"formId","label":"表单ID","type":"string","value":"4836006"}, {"field":"dataList","label":"dataList","type":"object","children":[ {"field":"serialNo","label":"合同编号","type":"string","value":"{order_code}"}, {"field":"text_15","label":"签约状态","type":"string","value":"{whether_to_sign_new}"}, {"field":"text_2","label":"客户名称(CRM客户ID)","type":"string","value":"{customer_id}"}, {"field":"text_8","label":"签订人(crm用户ID)","type":"string","value":"{business_user_id}"}, {"field":"date_1","label":"签订日期","type":"string","value": "_function UNIX_TIMESTAMP( {{create_time|UNIX_TIMESTAMP}} )" }, {"field":"text_6","label":"合同状态","type":"string","value": "签约" }, {"field": "array_4", "label": "关联产品", "type": "array", "children": [ {"field": "text_1", "label": "CRM产品ID", "type": "string", "value": "_mongoQuery f98d7b84-e8e1-3960-9941-487c49c19dd7 findField=content.dataId where={\"content.data.serialNo\":{\"$eq\":\"{{extend_sql_1.product_no}}\"}}" }, {"field": "num_1", "label": 单价, type: string, value: {{extend_sql_1.sales_price}} }, {"field": num_4, label: 折扣(%), type: string, value: 1}, {"field": num_6, label: 售价(元), type: string, value: {{extend_sql_1.sales_price}}}, {"field": num_3, label: 数量, type: string, value: {{extend_sql_1.product_number}}}, {"field": text_9, label: 状态, type: string, value: 1} ] }, { field: array_5, label: 付款时效, type: string, value: {quotation_desc3}, parser: { name: StringToArray, params: , }, mapping: { target: 64350d87ebac3c6e9c314f80, direction: positive } }, { field: text_23, label: 拆扣, type: string, value: _function CASE WHEN ('{discount}' BETWEEN '0' AND '0.5') THEN '5折以下(不含5折)' WHEN ('{discount}' BETWEEN '0.5' AND '0.7') THEN '5折及5折以上' WHEN ('{discount}' BETWEEN '0.7' AND '1') THEN '7折及7折以上' ELSE '7折及7折以上' END }, { field:"num_27", label:"其他费用", type:"string" }, { field:"num_28", label:"优惠金额", type:"string", value:"_function {product_amount}-{sales_amount}" }, { field:"num_1", label:"合同金额", type:"string", value:"{sales_amount}" }, { field:"array_1", label:"应收款", type:"string", value:"{sales_amount}" }, { field:"ownerId", label:"所有用户ID", type:"string", value:"{business_user_id}", parser:{ name:"StringToArray", params: "," }}, { field:"text_24", label:"合同链接", type:"string", value:"{file_address}" }, { field:"coUserId", label:"协同人", type:"string", value:{crm_user_id}, parser:{ name:StringToArray, params:, }}, { field:file_1,label:file_1,type:array,children:[{"field": filename,label:filename,type:string,value:{order_code}.pdf},{"field": attachIndex,label:attachIndex,type:string,value:{file_address}},{"field": ext,label:ext,type:string,value:pdf},{"field:size,label:size,type:string,value:1000}]} ]} ] } ``` ##### 字段解析与映射 - **corpid** 和 **userId** 是固定值和动态值的结合,分别代表企业ID和CRM用户ID。 - **formId** 是表单ID,用于标识特定表单。 - **dataList** 是一个复杂对象,包含多个子字段: - **serialNo** 对应订单编号 `{order_code}`。 - **text_15** 表示签约状态 `{whether_to_sign_new}`。 - **text_2** 和 **text_8** 分别对应客户名称和签订人,即CRM客户ID和用户ID。 - **date_1** 使用 `_function` 函数将创建时间 `create_time` 转换为UNIX时间戳。 - **array_4** 包含关联产品信息,通过 `_mongoQuery` 查询产品详情,并映射价格、数量等字段。 #### 3. 数据写入 最后,将转换后的数据通过POST请求写入销帮帮API接口 `/pro/v2/api/contract/add`。确保请求方法为POST,且携带正确的JSON负载。 ```json { api:/pro/v2/api/contract/add, method:'POST', headers:{ Content-Type:'application/json' }, body:{ corpid:'ding65b814e691560eba35c2f4657eb6378f', userId:'crm_user_id', formId:'4836006', dataList:[ { serialNo:'order_code', text15:'whether_to_sign_new', text2:'customer_id', text8:'business_user_id', date1:_function UNIX_TIMESTAMP( create_time|UNIX_TIMESTAMP ), text6:'签约', array4:[ { text1:_mongoQuery f98d7b84-e8e1-3960-9941-487c49c19dd7 findField=content.dataId where={"content.data.serialNo":{"$eq":{"extend_sql_1.product_no"}}}, num1:sales_price, num4:'1', num6:sales_price, num3:'product_number', text9:'1' } ], array5:{quotation_desc3}, text23:_function CASE WHEN (discount BETWEEN '0' AND '0.5') THEN '5折以下(不含5折)' WHEN (discount BETWEEN '0.5' AND '0.7') THEN '5折及5折以上' WHEN (discount BETWEEN '0.7' AND '1') THEN '7折及7折以上' ELSE '7折及7折以上' END, num27:'', num28:_function product_amount-sales_amount, num1:sales_amount, array1:sales_amount, ownerId:StringToArray(business_user_id), text24:file_address, coUserId:StringToArray(crm_user_id), file1:[ { filename:'order_code.pdf', attachIndex:file_address, ext:'pdf', size:'1000' } ] } ] } } ``` 通过上述步骤,我们成功地完成了从商城订单到销帮帮合同接口的数据集成过程,实现了不同系统间的数据无缝对接。 ![金蝶与SCM系统接口开发配置](https://pic.qeasy.cloud/T5.png~tplv-syqr462i7n-qeasy.image)