使用ETL技术将平台数据写入MySQL

  • 轻易云集成顾问-曹润
### 班牛数据集成到MySQL:BDS对账班牛售后理赔单 在企业级应用系统中,实现高效的数据集成是极为重要的任务,尤其是在处理跨平台的数据交互时。本案例聚焦于将班牛系统中的售后理赔单数据高效、稳定地集成到MySQL数据库中,方案命名为“BDS对账班牛售后理赔单”。 首先,我们通过调用班牛提供的`task.list`接口定期抓取待处理的售后理赔单数据。为了确保大规模数据能够被快速、高效地集成,我们利用了轻易云平台支持的大吞吐量写入功能,将这些数据批量导入至MySQL数据库。此外,通过自定义的数据转换逻辑,使得源端和目标端的数据结构差异问题得到解决。 在具体实现过程中,需要特别注意以下技术要点: 1. **班牛API分页与限流管理**: 为了获取全部待处理的理赔单,需要妥善处理来自班牛API的分页响应,并考虑限流策略,以防止因请求过多导致服务不可用。 2. **MySQL API高效写入**: 使用`executeReturn` API进行批量操作,保障大量数据能够迅速写入,同时需设置可靠性机制以防丢失任何一个关键记录。 3. **实时监控及异常处理**: 采用集中监控和告警系统,全程跟踪每个步骤的数据状态,如果出现异常情况,则触发错误重试机制,对失败任务重新执行,以保证所有数据信息准确无误地完成迁移。 4. **数据质量控制与日志记录**: 集成过程中,对输入输出的数据进行一致性校验,确保录入到MySQL中的每条记录都是完整、准确且符合预期。此外,通过详细的日志记录功能,可以随时回溯任意一笔交易,有助于问题诊断和性能优化。 接下来讲述的是该方案实施以来遇见的问题以及相应的解决办法,从而进一步增强整体系统集成效率和可靠性。 ![数据集成平台可视化配置API接口](https://pic.qeasy.cloud/D24.png~tplv-syqr462i7n-qeasy.image) ### 调用源系统班牛接口task.list获取并加工数据 在数据集成生命周期的第一步中,调用源系统接口获取数据是至关重要的环节。本文将深入探讨如何通过轻易云数据集成平台调用班牛接口`task.list`,并对获取的数据进行初步加工。 #### 接口调用配置 在轻易云数据集成平台中,我们使用元数据配置来定义接口调用的参数和行为。以下是针对班牛接口`task.list`的具体配置: ```json { "api": "task.list", "effect": "QUERY", "method": "GET", "number": "-1", "id": "-1", "idCheck": true, "request": [ {"field": "project_id", "label": "群组ID", "type": "string", "value": "25821"}, {"field": "page_size", "label": "page_size", "type": "string", "value": "50"}, {"field": "page_num", "label": "page_num", "type": "string", "value": "1"}, {"field": "star_created", "label": "起始时间", "type": "string"}, {"field": "end_created", "label": "结束时间", "type":"string"}, {"field":"star_modified","label":"修改时间起始时间","type":"string","value":"_function DATE_FORMAT(DATE_ADD(NOW(),INTERVAL - 24 HOUR),'%Y-%m-%d %H:%i:%s')"}, {"field":"end_modified","label":"修改时间结束时间","type":"string","value":"{{CURRENT_TIME|datetime}}"} ], "autoFillResponse": true, ... } ``` #### 参数解析与设置 1. **API与方法**:我们使用的是`task.list` API,采用HTTP GET方法进行请求。 2. **请求参数**: - `project_id`: 固定值为"25821",用于指定群组ID。 - `page_size`: 每页返回记录数,设置为"50"。 - `page_num`: 当前页码,初始值为"1"。 - `star_created`和`end_created`: 用于指定查询的起止创建时间,可以根据需求动态赋值。 - `star_modified`: 修改时间起始时间,使用函数计算当前时间减去24小时。 - `end_modified`: 修改时间结束时间,使用当前系统时间。 这些参数确保了我们能够准确地从班牛系统中获取所需的数据。 #### 数据请求与清洗 在实际操作中,我们需要处理分页数据。由于每次请求只能返回固定数量的数据(如上例中的50条),因此需要循环调用API以获取所有符合条件的数据。 ```python import requests import json def fetch_data(): url = 'https://api.banniu.com/task.list' headers = {'Content-Type': 'application/json'} params = { 'project_id': '25821', 'page_size': '50', 'page_num': '1', 'star_modified': (datetime.now() - timedelta(hours=24)).strftime('%Y-%m-%d %H:%M:%S'), 'end_modified': datetime.now().strftime('%Y-%m-%d %H:%M:%S') } all_data = [] while True: response = requests.get(url, headers=headers, params=params) data = response.json() if not data['tasks']: break all_data.extend(data['tasks']) params['page_num'] = str(int(params['page_num']) + 1) return all_data ``` 上述代码展示了如何通过循环分页获取所有符合条件的数据,并将其存储在一个列表中。 #### 数据转换与写入 在获取到原始数据后,我们通常需要对其进行清洗和转换,以便后续处理。例如,将日期格式统一、去除无效字段等。以下是一个简单的示例: ```python def clean_data(raw_data): cleaned_data = [] for item in raw_data: cleaned_item = { 'task_id': item['id'], 'project_id': item['project_id'], 'created_at': item['created_at'], 'modified_at': item['modified_at'] # 添加更多需要保留或转换的字段 } cleaned_data.append(cleaned_item) return cleaned_data ``` 通过上述步骤,我们可以将原始数据转换为更适合业务需求的格式,并准备好进行下一步的数据处理或写入目标系统。 #### 小结 本文详细介绍了如何通过轻易云数据集成平台调用班牛接口`task.list`并对获取的数据进行初步加工。通过合理配置元数据、循环分页获取数据以及清洗和转换数据,我们能够高效地完成数据集成生命周期中的第一步,为后续的数据处理打下坚实基础。 ![如何对接用友BIP接口](https://pic.qeasy.cloud/S21.png~tplv-syqr462i7n-qeasy.image) ### 数据集成:将源平台数据ETL转换并写入MySQL API接口 在轻易云数据集成平台的生命周期中,数据转换与写入是关键步骤之一。本文将详细探讨如何将已经集成的源平台数据进行ETL(提取、转换、加载)处理,并最终通过MySQL API接口写入目标平台。 #### 数据提取与清洗 首先,我们需要从源系统中提取所需的数据。这个过程包括从不同的数据源获取原始数据,并对其进行初步清洗和规范化处理,以确保数据的一致性和准确性。例如,使用MongoDB查询语句从数据库中提取特定字段的值: ```json { "field": "org_name", "label": "组织名称", "type": "string", "value": "_mongoQuery e8890e68-7f56-33d9-ae79-492a7c9cbead findField=content.options_title where={\"content.options_id\":{\"$eq\":\"{{26387}}\"}}" } ``` 上述配置表示通过MongoDB查询获取“组织名称”字段的值,其中`content.options_id`等于`{{26387}}`。 #### 数据转换 在完成数据提取后,下一步是将这些数据转换为目标平台所能接受的格式。这里我们主要使用JSON格式来定义API请求参数,并进行必要的字段映射和类型转换。例如: ```json { "api": "executeReturn", "method": "POST", "idCheck": true, "request": [ { "field": "main_params", "label": "主参数", "type": "object", "children": [ {"field": "bill_no", "label": "单据编号", "type": "string", "value": "{{-1}}"}, {"field": "trade_no", "label": "系统订单编号", "type": "string", "value": "{{80981}}"}, {"field": "online_trade_no", "label": "网店订单号", "type": "string", "value": "{{26390}}"}, {"field": "source_bill_no", ... } ] } ] } ``` 在这个配置中,我们定义了一个名为`main_params`的对象,其中包含了多个子字段,如`bill_no`、`trade_no`等。这些字段将被映射到目标平台的相应字段中。 #### 数据加载 最后一步是将转换后的数据通过API接口写入目标平台。在这里,我们使用MySQL API接口,通过HTTP POST请求将数据发送到目标数据库。例如: ```json { ... { field: 'main_sql', label: '主语句', type: 'string', value: `INSERT INTO \`lhhy_srm\`.\`supplier_return_change\` (\`bill_no\`, \`trade_no\`, \`online_trade_no\`, ...) VALUES (<{bill_no: }>, <{trade_no: }>, <{online_trade_no: }>, ...)` }, ... } ``` 上述SQL语句用于插入主表记录,其中各个字段值通过占位符进行动态替换。类似地,我们还可以定义扩展表的插入语句: ```json { field: 'extend_sql_1', label: '1:1扩展语句', type: 'string', value: `INSERT INTO \`lhhy_srm\`.\`supplier_return_change_detail\` (\`order_id\`, \`supplier_code\`, \`supplier_name\`, ...) VALUES (<{lastInsertId: }>, <{supplier_code: }>, <{supplier_name: }>, ...)` } ``` 这个配置表示在主表插入成功后,将相关联的数据插入到扩展表中。 #### 实际案例应用 在实际应用中,我们可能会遇到需要处理复杂的数据结构和多层嵌套关系。例如,需要同时处理多个扩展参数和附件信息: ```json { field: 'extend_params_2', label: '1:1扩展参数', type: 'object', children: [ {"field":"lastInsertId","label":"order_id","type":"string","value":"lastInsertId"}, {"field":"file_name","label":"附件名称","type":"string","value":"_function CEIL(RAND()*10000000000)"}, {"field":"url","label":"附件链接","type":"string","value":"{{26401}}"} ] } ``` 这种情况下,我们需要确保每个子字段都能正确映射并写入到相应的数据库表中。 综上所述,通过合理配置元数据并利用轻易云数据集成平台强大的ETL功能,可以高效地实现不同系统间的数据无缝对接,确保数据在整个生命周期中的一致性和准确性。 ![企业微信与ERP系统接口开发配置](https://pic.qeasy.cloud/T29.png~tplv-syqr462i7n-qeasy.image)