ETL流程详解:将CRM数据无缝转移至MySQL

  • 轻易云集成顾问-姚缘
### 销帮帮数据集成到MySQL的系统对接案例分享 在此次技术案例中,我们将展示如何通过轻易云数据集成平台,实现销帮帮CRM客户(外贸)2308022的数据同步至商城中间表,并且确保整个过程中保持高效、可靠和可管理性。该方案不仅能够快速处理大量数据,同时还能实时监控和优化数据质量。 #### API接口的调用与配置 首先,通过调用销帮帮API来获取客户信息:`/pro/v2/api/customer/list`。为了确保不漏单并高效地抓取这些接口数据,我们设置了定时任务,按照一定时间间隔去请求API,这样可以保证客户信息同步的及时性。此外,为了避免分页和限流问题,每次请求都携带上次查询位置标识,并适当增加延迟或批量请求。 #### 数据转换与映射 针对不同的数据结构需求,可以使用轻易云自定义的数据转换逻辑,将获取到的销帮帮原始数据进行格式调整,以适应MySQL数据库中的字段要求。例如,销帮帮返回的JSON对象可能需要被拆解或者合并某些字段,这就需要事先定制好一套映射规则,从而保证写入MySQL时的数据一致性。 #### 高吞吐量写入与性能优化 为了应对大量客户数据快速写入MySQL的问题,我们利用轻易云提供的大吞吐量数据写入能力:通过`execute` API进行批量插入操作。具体实现上,可以采用分片机制,对大体积的数据进行分段处理,然后分别执行多线程并行写入。这种方式能显著提高整体效率,并降低单点瓶颈风险。 #### 监控与告警机制 整个过程中的每个步骤,都有相应的集中化监控和告警系统在运行,一旦出现异常情况,例如网络故障、API返回错误等,通过实时通知相关维护人员介入解决。同时,日志记录功能会详细追踪每一次接口调用与数据库操作,使得后续排查和性能分析更加方便。 以上内容是本次案例初步概要。在实际实施中,还有许多细节需要考虑,包括分页策略、错误重试机制以及如何更好地处理两端之间的数据差异等问题。而这些将在后续章节中逐步展开详细讨论。 ![钉钉与CRM系统接口开发配置](https://pic.qeasy.cloud/D11.png~tplv-syqr462i7n-qeasy.image) ### 调用销帮帮接口/pro/v2/api/customer/list获取并加工数据 在轻易云数据集成平台的生命周期中,调用源系统接口是至关重要的一步。本文将详细探讨如何通过调用销帮帮接口`/pro/v2/api/customer/list`来获取客户数据,并进行初步加工处理。 #### 接口调用配置 首先,我们需要根据提供的元数据配置来设置API请求参数。以下是配置的详细说明: ```json { "api": "/pro/v2/api/customer/list", "effect": "QUERY", "method": "POST", "number": "dataId", "id": "dataId", "name": "dataId", "request": [ {"field":"isPublic","label":"是否公海客户","type":"string","describe":"是否公海客户"}, {"field":"formId","label":"表单id","type":"int","describe":"表单id","value":"2308022"}, {"field":"pageSize","label":"每页数量","type":"int","describe":"每页数量","value":"100"}, {"field":"userId","label":"操作人ID","type":"string","describe":"操作人ID","value":"244012643437539806"}, {"field":"del","label":"客户列表","type":"string","describe":"0:客户列表 1:回收站数据,默认为0"}, {"field":"corpid","label":"公司ID","type":"string","describe":"公司ID","value":"ding65b814e691560eba35c2f4657eb6378f"}, {"field":"page","label":"页码","type":"int","describe":"页码","value":"1"}, { "field": "conditions", "label": "条件集合", "type": "object", "children": [ { "label": "条件1", "type": "object", "children": [ {"field": "attr", "label": "attr", "type": "string", "value": "text_18"}, {"field": "symbol", "label": "symbol", "type": "string", "value": "equal"}, {"field": "value", "label": "值", "type": ["array"],"value":["6"]} ] }, { "field": 1, "label": 'test2', 'type': 'object', 'children': [ {'field': 'attr', 'label': 'attr', 'type': 'string', 'value': 'addTime'}, {'field': 'symbol', 'label': 'symbol', 'type': 'string', 'value': 'greaterequal'}, {'field': 'value', 'label': 'value', 'type': ['array'], 'value':['{LAST_SYNC_TIME}']} ] } ] } ], ... } ``` #### 请求参数解析 - `isPublic`: 是否为公海客户。 - `formId`: 表单ID,固定值为`2308022`。 - `pageSize`: 每页返回的数据量,设定为`100`。 - `userId`: 操作人ID,固定值为`244012643437539806`。 - `del`: 客户列表类型,默认值为`0`表示正常客户列表。 - `corpid`: 公司ID,固定值为`ding65b814e691560eba35c2f4657eb6378f`。 - `page`: 页码,从第1页开始。 - `conditions`: 查询条件集合,包括两个子条件: - 条件1:属性为`text_18`, 符号为`equal`, 值为数组包含元素`6`。 - 条件2:属性为`addTime`, 符号为`greaterequal`, 值为数组包含元素 `{LAST_SYNC_TIME}`。 #### 数据请求与清洗 在发送请求后,我们将获得一个包含客户信息的JSON响应。此时需要对数据进行清洗和初步加工,以确保其符合目标系统的要求。常见的数据清洗步骤包括: 1. **字段映射**:将源系统字段映射到目标系统字段。例如,将销帮帮的客户ID映射到商城中间表的相应字段。 2. **数据过滤**:根据业务需求过滤掉不必要的数据。例如,只保留状态正常的客户记录。 3. **格式转换**:将日期、时间等字段转换成目标系统所需的格式。 #### 示例代码 以下是一个示例代码片段,用于调用API并处理响应数据: ```python import requests import json # 设置请求URL和头部信息 url = "/pro/v2/api/customer/list" headers = { 'Content-Type': 'application/json' } # 构建请求体 payload = { ... } # 发起POST请求 response = requests.post(url, headers=headers, data=json.dumps(payload)) # 检查响应状态码 if response.status_code == 200: data = response.json() # 数据清洗与加工 processed_data = [] for item in data['results']: processed_item = { # 映射字段 'customer_id': item['dataId'], ... } processed_data.append(processed_item) # 后续处理逻辑,例如写入数据库或传输到下游系统 else: print(f"Error: {response.status_code}") ``` 通过上述步骤,我们可以高效地从销帮帮获取并加工客户数据,为后续的数据转换与写入做好准备。这一过程不仅提高了数据处理效率,还确保了数据的一致性和准确性。 ![如何对接金蝶云星空API接口](https://pic.qeasy.cloud/S10.png~tplv-syqr462i7n-qeasy.image) ### 使用轻易云数据集成平台进行ETL转换并写入MySQL API接口 在轻易云数据集成平台的生命周期中,第二步是将已经集成的源平台数据进行ETL(Extract, Transform, Load)转换,并转为目标平台 MySQL API 接口所能够接收的格式,最终写入目标平台。以下是详细的技术实现过程。 #### 元数据配置解析 元数据配置是ETL过程中的核心部分,它定义了如何将源数据映射到目标数据库表中。以下是一个具体的元数据配置示例: ```json { "api": "execute", "effect": "EXECUTE", "method": "SQL", "number": "id", "id": "id", "name": "id", "idCheck": true, "request": [ { "field": "main_params", "label": "主参数", "type": "object", "describe": "对应主语句内的动态参数", "children": [ {"field":"customer_code","label":"客户编码","type":"string","describe":"店铺名称","value":"{serialNo}"}, {"field":"customer_name","label":"姓名","type":"string","describe":"部门","value":"{text_1}"}, {"field":"customer_contact","label":"客户联系人","type":"string","value":"{text_31}"}, {"field":"customer_tel","label":"电话","type":"string","value":"{{subForm_1.text_2}}"}, {"field":"principal","label":"负责人","type":"string","value":"{username}"}, {"field":"client_type","label":"客户表单ID","type":"string","value":"{formId}"}, {"field":"customer_id","label":"客户ID","type":"string","value":"{dataId}"}, {"field":"customer_source","label":"客户来源","type":"string","value":"外贸"}, {"field":"remark","label":"remark","type":"string","value":"{text_26}"}, {"field": "create_time", "label": "创建时间", "type": "datetime", "value": "_function FROM_UNIXTIME( {addTime} , '%Y-%m-%d %H:%i:%s' )"}, {"field": "update_time", "label": "更新时间", "type": "datetime", "value": "_function FROM_UNIXTIME( {updateTime} , '%Y-%m-%d %H:%i:%s' )"} ] } ], ... } ``` #### 数据请求与清洗 在这个阶段,我们从CRM系统中提取数据并进行初步清洗。根据元数据配置中的`request`部分,我们可以看到需要提取的数据字段和它们在目标数据库中的映射关系。例如,`customer_code`字段对应CRM系统中的`serialNo`字段,`customer_name`字段对应`text_1`字段。 #### 数据转换与写入 接下来,我们需要将清洗后的数据进行转换,并按照目标数据库的要求格式化。元数据配置中的`main_sql`定义了SQL插入语句: ```sql INSERT INTO `middle_client_file` (`customer_code`, `customer_name`, `customer_contact`, `customer_tel`, `principal`, `client_type`, `customer_id`, `customer_source`, `remark`, `create_time`, `update_time`) VALUES (:customer_code, :customer_name, :customer_contact, :customer_tel, :principal, :client_type, :customer_id, :customer_source, :remark, :create_time, :update_time) ON DUPLICATE KEY UPDATE `customer_name` = VALUES(`customer_name`), `customer_contact` = VALUES(`customer_contact`), `customer_tel` = VALUES(`customer_tel`), `principal` = VALUES(`principal`), `client_type` = VALUES(`client_type`), `customer_id` = VALUES(`customer_id`), `customer_source` = VALUES(`customer_source`), `remark` = VALUES(`remark`) ``` 这段SQL语句实现了两件事: 1. 插入新记录。 2. 如果记录已经存在(基于主键冲突),则更新已有记录。 其中,各个占位符(如`:customer_code`, `:create_time`)会被相应的动态参数值替换。这些动态参数值在元数据配置中定义,例如: ```json {"field": "create_time", "label": "创建时间", "type": "datetime", "value": "_function FROM_UNIXTIME( {addTime} , '%Y-%m-%d %H:%i:%s' )"} ``` 这里使用了一个函数 `_function FROM_UNIXTIME( {addTime} , '%Y-%m-%d %H:%i:%s' )`) 将Unix时间戳转换为MySQL datetime格式。 #### 实际操作步骤 1. **提取数据**:通过API或数据库查询从CRM系统中提取所需的数据。 2. **清洗和转换**:根据元数据配置,对提取的数据进行清洗和格式转换。 3. **生成SQL语句**:使用模板生成实际执行的SQL插入/更新语句。 4. **执行SQL**:通过MySQL API接口执行生成的SQL语句,将处理后的数据写入目标数据库。 #### 示例代码片段 以下是一个简化的Python代码示例,展示如何使用上述配置和步骤完成ETL过程: ```python import pymysql import time # 数据库连接 connection = pymysql.connect( host='your_host', user='your_user', password='your_password', database='your_database' ) def from_unixtime(timestamp): return time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(timestamp)) # 模拟从CRM系统提取的数据 crm_data = { 'serialNo': 'C12345', 'text_1': '张三', 'text_31': '联系人A', 'subForm_1.text_2': '1234567890', 'username': '李四', 'formId': 'F001', 'dataId': 'D001', 'text_26': '备注信息', 'addTime': 1633024800, 'updateTime': 1633101200 } # 清洗和转换数据 params = { 'customer_code': crm_data['serialNo'], 'customer_name': crm_data['text_1'], 'customer_contact': crm_data['text_31'], 'customer_tel': crm_data['subForm_1.text_2'], 'principal': crm_data['username'], 'client_type': crm_data['formId'], 'customer_id': crm_data['dataId'], 'remark': crm_data['text_26'], 'create_time': from_unixtime(crm_data['addTime']), 'update_time': from_unixtime(crm_data['updateTime']) } # SQL语句模板 sql_template = """ INSERT INTO middle_client_file ( customer_code, customer_name, customer_contact, customer_tel, principal, client_type, customer_id, customer_source, remark, create_time, update_time) VALUES (%( customer_code)s,%( customer_name)s,%( customer_contact)s,%( customer_tel)s, %( principal)s,%( client_type)s,%( customer_id)s,'外贸',%( remark)s,%( create_time)s,%( update_time)s) ON DUPLICATE KEY UPDATE customer_name=VALUES(customer_name), customer_contact=VALUES(customer_contact), customer_tel=VALUES(customer_tel), principal=VALUES(principal), client_type=VALUES(client_type), customer_id=VALUES(customer_id), remark=VALUES(remark), create_time=VALUES(create_time), update_time=VALUES(update_time) """ try: with connection.cursor() as cursor: cursor.execute(sql_template, params) connection.commit() finally: connection.close() ``` 通过上述步骤和代码示例,可以实现将CRM系统中的客户信息同步到商城中间表,并确保在重复插入时更新已有记录。这种方法不仅提高了数据处理效率,还保证了数据的一致性和完整性。 ![金蝶与SCM系统接口开发配置](https://pic.qeasy.cloud/T9.png~tplv-syqr462i7n-qeasy.image)