轻易云与MySQLAPI接口的数据ETL解决方案

  • 轻易云集成顾问-吕修远
### 测试-查询销售渠道信息(已删除数据)-dange 集成案例解析 在此次技术案例中,我们将详细解析如何通过轻易云数据集成平台,将吉客云的数据高效、可靠地集成到MySQL数据库中。本次方案旨在实现对吉客云销售渠道信息,包括已删除数据的定时抓取,并批量写入到MySQL中,方案名称为“测试-查询销售渠道信息(已删除数据)-dange”。 #### 注意事项与挑战 对于此次集成任务,我们主要面临以下几个技术挑战: 1. **大量数据快速写入**:确保从吉客云获取的大量销售渠道信息能够高吞吐量地写入到MySQL,实现数据处理高效性。 2. **实时监控和异常处理**:全面跟踪每一个数据集成任务的状态与性能,同时提供有效的错误重试机制,以应对网络波动或接口调用失败等情况。 3. **分页与限流问题**:由于API接口返回的数据量较大,需要合理设计分页逻辑,并解决限流带来的潜在影响。 #### 方案概要 本次实施集中使用了如下特性: - 使用`erp.sales.get` API 从吉客云定时获取销售信息,并包含已删除的数据记录,保障不漏单。 - 利用轻易云提供的可视化流程设计工具,实现自定义的数据转换逻辑,使得不同格式之间的数据顺利映射至目标表结构。 - 借助中心化监控告警系统,对整个集成过程进行实时监控,及时发现并处理异常情况,通过日志记录功能追踪每一步操作。 接下来我们将深入剖析此集成案例,从具体API调用、分页策略、到实际写入细节,以及如何使用轻易云平台配置过程中的关键步骤。 ![打通金蝶云星空数据接口](https://pic.qeasy.cloud/D15.png~tplv-syqr462i7n-qeasy.image) ### 调用吉客云接口erp.sales.get获取并加工数据 在数据集成的生命周期中,第一步是调用源系统接口以获取原始数据。在本案例中,我们将重点讨论如何通过轻易云数据集成平台调用吉客云的`erp.sales.get`接口,并对获取的数据进行初步加工。 #### 元数据配置解析 首先,我们需要理解元数据配置中的各个字段及其作用。以下是对元数据配置的详细解析: ```json { "api": "erp.sales.get", "effect": "QUERY", "method": "POST", "number": "channelCode", "id": "channelId", "request": [ {"field": "pageIndex", "label": "页码(默认0)", "type": "int"}, {"field": "pageSize", "label": "每页页数(默认50)", "type": "int", "value": "50"}, {"field": "code", "label": "编号", "type": "string"}, {"field": "name", "label": "名称", "type": "string"}, {"field": "gmtModifiedStart", "label": "起始修改时间", "type": "string", "value":"{{LAST_SYNC_TIME|datetime}}"}, {"field": "gmtModifiedEnd", "label":"结束修改时间","type":"string","value":"{{CURRENT_TIME|datetime}}"}, {"field":"isBlockup","label":"是否停用,1-是,0-否","type":"int","value":"1"}, {"field":"isDelete","label":"是否删除,1-是,0-否","type":"int"} ], “autoFillResponse”: true } ``` #### 请求参数详解 1. **api**: `erp.sales.get`,表示我们要调用的吉客云API接口。 2. **effect**: `QUERY`,表示此操作为查询操作。 3. **method**: `POST`,表示使用HTTP POST方法进行请求。 4. **number**: `channelCode`,用于标识销售渠道的编号字段。 5. **id**: `channelId`,用于标识销售渠道的唯一ID字段。 请求参数列表: - **pageIndex**:页码,默认为0。 - **pageSize**:每页记录数,默认为50。 - **code**:销售渠道编号,可选参数。 - **name**:销售渠道名称,可选参数。 - **gmtModifiedStart**:起始修改时间,使用模板变量`{{LAST_SYNC_TIME|datetime}}`自动填充上次同步时间。 - **gmtModifiedEnd**:结束修改时间,使用模板变量`{{CURRENT_TIME|datetime}}`自动填充当前时间。 - **isBlockup**:是否停用,固定值为1(是)。 - **isDelete**:是否删除,可选参数。 #### 实际调用与数据处理 在实际操作中,我们需要按照上述配置构建请求,并发送给吉客云API。以下是一个示例请求体: ```json { “pageIndex”: 0, “pageSize”: 50, “gmtModifiedStart”: “2023-09-01T00:00:00Z”, “gmtModifiedEnd”: “2023-09-30T23:59:59Z”, “isBlockup”: 1 } ``` 通过轻易云平台,我们可以自动生成并发送上述请求。响应结果会包含符合条件的销售渠道信息。由于设置了`autoFillResponse=true`,平台会自动处理响应并填充到相应的数据结构中。 #### 数据清洗与转换 获取到原始数据后,需要对其进行清洗和转换,以便后续处理。常见的数据清洗步骤包括: 1. **去除无效记录**:过滤掉已删除或不符合业务规则的记录。例如,根据字段`isDelete=1`来过滤已删除的数据。 2. **字段映射与转换**:将源系统中的字段映射到目标系统所需的字段。例如,将吉客云中的`channelCode`映射为目标系统中的`salesChannelCode`。 示例代码如下: ```python def clean_and_transform(data): cleaned_data = [] for record in data: if record['isDelete'] == 0: transformed_record = { 'salesChannelCode': record['channelCode'], 'salesChannelId': record['channelId'], 'name': record['name'], 'modifiedTime': record['gmtModified'] } cleaned_data.append(transformed_record) return cleaned_data ``` 通过上述步骤,我们完成了从调用源系统API获取数据,到初步清洗和转换的全过程。这为后续的数据写入和进一步处理奠定了基础。 ![轻易云数据集成平台金蝶集成接口配置](https://pic.qeasy.cloud/S10.png~tplv-syqr462i7n-qeasy.image) ### 使用轻易云数据集成平台进行ETL转换并写入MySQL API接口 在数据集成的生命周期中,ETL(Extract, Transform, Load)转换是关键的一步。本文将深入探讨如何利用轻易云数据集成平台,将已经集成的源平台数据进行ETL转换,转为目标平台 MySQL API 接口所能够接收的格式,并最终写入目标平台。 #### 数据请求与清洗 在数据请求与清洗阶段,我们从源系统获取原始数据,并对其进行初步处理,以确保数据的准确性和一致性。这一步通常包括去重、填补缺失值、标准化字段等操作。假设我们已经完成了这一阶段,接下来将重点放在数据转换与写入阶段。 #### 数据转换与写入 为了将清洗后的数据转为 MySQL API 接口所能接收的格式,我们需要配置相应的元数据。以下是一个详细的元数据配置示例: ```json { "api": "execute", "effect": "EXECUTE", "method": "POST", "idCheck": true, "request": [ { "field": "main_params", "label": "主参数", "type": "object", "describe": "111", "value": "1", "children": [ {"field": "id", "label": "销售渠道id", "type": "int", "value": "{channelId}"}, {"field": "channel_code", "label": "渠道编码", "type": "string", "value": "{channelCode}"}, {"field": "channel_name", "label": "销售渠道名称", "type": "string", "value": "{channelName}"}, {"field": "channel_type", "label": "渠道类型", "type": "string", "describe": "渠道类型(0:分销办公室; 1:直营网店;2:直营门店;3:销售办公室;4:货主虚拟店;5:分销虚拟店;6:加盟门店;7:内部交易渠道)", "value": "{channelType}" }, {"field":"plat_code","label":"店铺平台编码","type":"string","value":"{onlinePlatTypeCode}"}, {"field":"plat_name","label":"店铺平台名称","type":"string","value":"{onlinePlatTypeName}"}, {"field":"depart_name","label":"负责部门名称","type":"string","value":"{channelDepartName}"}, {"field":"link_man","label":"联系人","type":"string","value":"{linkMan}"}, {"field":"link_tel","label":"联系电话","type":"string","value":"{linkTel}"}, {"field":"office_address","label":"办公地址","type":"string","value":"{officeAddress}"}, {"field":"company_name","label":"公司名称","type":"string","value":"{companyName}"}, {"field":"country_name","label":"国家","type":"string","value":"{countryName}"}, {"field":"province_name","label":"省","type":"string","value":"{provinceName}"}, {"field": "city_name", "label": "市", "type": "string", "value": "{cityName}" }, {"field": "town_name", "label": "镇,区", "type": "string", "value": "{townName}" }, {"field": "street_name", "label": "街道", "type": "string", "value": "{streetName}" }, {"field": "memo", "label": "备注", "type": "string", "value": "{memo}" }, {"field": "warehouse_code", "label": "默认仓库编码", type: string, value: {warehouseCode} }, { field: warehouse_name, label: 默认仓库名称, type: string, value: {warehouseName} }, { field: charge_type, label: 结算方式, type: string, describe: 结算方式(1:担保交易;2:银行收款;3:现金收款;4:货到付款;5:欠款计应收;6:客户预存款;7:多种结算;8:退换货冲抵;9:电子钱包) value: {chargeType} }, { field: depart_code, label: 部门编码, type: string, value: {departCode} }, { field: company_code, label: 公司编码, type: string, value: {departCode} }, { field: is_blockup, label: 是否停用, type: int, describe: 1-是,0-否,value:1,default:"1" }, { field:is_delete,label:"是否删除",type:"int",describe:"1-是,0-否" }, { field:create_by,label:"创建人",type:"string",value:"系统自动",default:"系统自动" }, { field:update_by,label:"修改人",type:"string",value:"系统自动",default:"系统自动" } ] } ], otherRequest:[ { field: main_sql,label: 主语句,type:string,describe:111,value:`REPLACE INTO lehua.sc_sale_channel( id, channel_code, channel_name, channel_type, plat_code, plat_name, depart_name, link_man, link_tel, office_address, company_name, country_name, province_Name city_name town_name street_name memo warehouse_code warehouse_name charge_type depart_code company_code is_blockup is_delete create_by update_by) VALUES( <{id}>, <{channel_code}>, <{channel_name}>, <{channel_type}>, <{plat_code}>, <{plat_name}>, <{depart_name}>, <{link_man}>, <{link_tel}>, <{office_address}>, <{company_name}>, <{country_name}> <{province_Name}> <{city_Name}> <{town_Name}> <{street_Name}> <{memo}> <{warehouse_Code}> <{warehouse_Name}> <charge_Type> depart_Code> company_Code> is_blockup> is_delete> create_by> update_by>); ` } ] } ``` #### 配置解析 - **API配置**: - `api`: 指定执行操作。 - `effect`: 设置为`EXECUTE`表示执行操作。 - `method`: HTTP方法,设置为`POST`。 - **主参数配置**: - `main_params`: 包含所有需要传递的数据字段,每个字段都有详细的描述和类型定义。例如,`id`表示销售渠道ID,类型为整数。 - **SQL语句**: - `main_sql`: 包含实际执行的SQL语句,通过占位符`<{...}>`来插入实际的数据值。 #### 数据写入 通过上述配置,我们可以将清洗后的数据转换为MySQL数据库所需的格式,并使用HTTP POST方法将其写入目标数据库。每个字段都经过了严格定义和映射,确保了数据的一致性和完整性。 这种方式不仅提高了数据处理效率,还保证了每个环节的透明度和可追溯性。通过实时监控和日志记录,可以随时检查和调整数据流动状态,确保业务流程顺畅无误。 ![钉钉与CRM系统接口开发配置](https://pic.qeasy.cloud/T8.png~tplv-syqr462i7n-qeasy.image)