从吉客云到BI拉伯塔MySQL的ETL过程剖析

  • 轻易云集成顾问-陈洁琳
### 吉客云数据集成到MySQL的技术案例分享 在本次技术案例中,我们将重点探讨如何使用轻易云数据集成平台,将吉客云的其他入库单查询结果高效、可靠地集成到MySQL数据库。集成方案名称为“吉客云-其他入库单查询-->BI拉伯塔-其他入库单表”。该方案旨在通过API接口`erp.storage.goodsdocin.v2`获取吉客云的数据,并利用MySQL API `batchexecute`进行批量写入,实现大量数据的快速存储和处理。 为了确保整个数据对接过程顺利进行,本方案特别关注以下几个关键点: 1. **定时抓取与分页处理**:通过设置定时任务,定期调用吉客云接口以抓取最新的其他入库单数据。同时,为了应对可能存在的大量记录,我们进行了分页和限流操作,以确保每次请求的数据规模可控,并避免超出系统负荷。 2. **自定义转换逻辑与格式差异解决**:针对吉客云返回的数据结构,与目标MySQL数据库字段不完全匹配的问题,通过自定义转换逻辑,对原始数据进行清洗和映射,保证写入过程中的一致性和完整性。 3. **高吞吐量写入能力**:轻易云平台提供了强大的批量处理功能,使得我们能有效快速地将大量记录一次性写入到MySQL。这对于提升系统性能和响应速度至关重要,同时也减少系统资源消耗。 4. **实时监控与异常处理机制**:采用集中化监控工具来实时跟踪每个集成任务的执行状态,一旦检测到错误或异常情况,触发告警并启动自动重试机制,最大限度地保障数据传输不中断、不漏单。 5. **日志记录与审计**:实现全流程日志记录,从API调用开始,到最终写入完成,每一步都有详细的日志信息供查阅。这不仅有助于问题排查,也为后续优化提供了宝贵的数据支持。 以上是此次技术案例开头部分主要内容。在接下来的讨论中,将逐步展开具体实施步骤及代码示例,包括如何正确配置API调用、实现分页抓取策略、自定义转化函数,以及优化批量插入性能的方法。 ![钉钉与WMS系统接口开发配置](https://pic.qeasy.cloud/D8.png~tplv-syqr462i7n-qeasy.image) ### 调用吉客云接口erp.storage.goodsdocin.v2获取并加工数据的技术案例 在数据集成过程中,调用源系统接口是至关重要的一步。本文将详细探讨如何通过轻易云数据集成平台调用吉客云接口`erp.storage.goodsdocin.v2`来获取并加工数据。 #### 接口配置与请求参数 首先,我们需要配置接口的元数据。根据提供的元数据配置,我们可以看到该接口采用POST方法,主要参数包括分页信息、入库单号、创建时间、更新时间、入库类型等。以下是具体的请求参数配置: ```json { "api": "erp.storage.goodsdocin.v2", "method": "POST", "number": "goodsdocNo", "id": "recId", "pagination": { "pageSize": 10 }, "request": [ {"field": "pageIndex", "label": "分页页码", "type": "int"}, {"field": "pageSize", "label": "分页页数", "type": "int", "value": "100"}, {"field": "goodsDocNo", "label": "入库单号", "type": "string"}, {"field": "startDate", "label": "创建时间的起始时间", "type": "string"}, {"field": "endDate", "label": "创建时间的结束时间", "type": "string"}, {"field": "gmtModifiedStart", "label": "主表更新时间起始", "type":"string", "value":"_function from_unixtime(({LAST_SYNC_TIME}-86400),'%Y-%m-%d %H:%i:%s')" }, {"field":"gmtModifiedEnd", "label":"主表更新时间截至", "type":"string", "value":"_function from_unixtime(({CURRENT_TIME}),'%Y-%m-%d %H:%i:%s')" }, {"field":"inouttype","label":"入库类型","type":"string","value":"104"}, {"field":"sourceBillNo","label":"原始单号","type":"string"}, {"field":"warehouseCode","label":"仓库编号","type":"string"}, {"field":"outBillNo","label":"外部单号","type":"string"}, {"field":"vendCode","label":"供应商编号(往来单位)","type":"string"}, {"field":"billNo","label":"上游单据号(关联单号)","type":"string"}, {"field":"userName","label":"创建人名称","type":"string"}, { "field" : "selelctFields", "label" : "需要返回的字段", "type" : "string", "value" : "recId,goodsdocNo,billNo,inOutDate,gmtCreate,inouttype,inouttypeName,vendCustomerCode,vendCustomerName,currencyCode,currencyRate,userName,warehouseCode,warehouseName,comment,memo,logisticName,logisticNo,companyId,companyName,logisticType,logisticCode,inOutReason,sourceBillNo,channelId,channelCode,channelName,redStatus,field1,field2,field3,field4,field5..." } ], ... } ``` #### 数据请求与清洗 在实际操作中,我们会先构建一个HTTP POST请求,包含上述所有必要参数。为了保证数据的一致性和准确性,我们需要特别注意以下几点: 1. **分页处理**:由于数据量可能较大,需要进行分页处理。每次请求时设置`pageIndex`和`pageSize`,确保能够逐页获取所有数据。 2. **时间过滤**:通过`gmtModifiedStart`和`gmtModifiedEnd`字段,可以实现对更新时间范围内的数据进行过滤。这两个字段使用了函数转换,将时间戳转换为标准时间格式。 3. **入库类型**:固定值为104,即“其他入库”,确保只获取特定类型的入库单。 示例代码如下: ```python import requests import json url = 'https://api.jikecloud.com/erp.storage.goodsdocin.v2' headers = {'Content-Type': 'application/json'} payload = { 'pageIndex': 1, 'pageSize': 100, 'gmtModifiedStart': '2023-01-01 00:00:00', 'gmtModifiedEnd': '2023-01-31 23:59:59', 'inouttype': '104', } response = requests.post(url, headers=headers, data=json.dumps(payload)) data = response.json() ``` #### 数据转换与写入 获取到原始数据后,需要对其进行清洗和转换,以便写入目标系统。在这个过程中,可以利用轻易云平台提供的数据处理工具,对字段进行映射、格式转换等操作。 例如,将返回的数据中的日期字段从字符串格式转换为日期对象,或者将某些字段值进行标准化处理: ```python from datetime import datetime def clean_data(raw_data): for record in raw_data: record['inOutDate'] = datetime.strptime(record['inOutDate'], '%Y-%m-%d %H:%M:%S') # 更多清洗操作... cleaned_data = clean_data(data['result']) ``` 最后,将清洗后的数据写入目标系统,例如BI拉伯塔中的其他入库单表: ```python def write_to_target_system(cleaned_data): # 假设目标系统有一个API可以接收批量插入的数据 target_url = 'https://bi.labata.com/api/v1/storage' response = requests.post(target_url, headers=headers, data=json.dumps(cleaned_data)) write_to_target_system(cleaned_data) ``` 通过以上步骤,我们完成了从吉客云接口获取并加工数据的全过程。在实际项目中,可以根据具体需求调整参数和处理逻辑,以达到最佳效果。 ![金蝶与CRM系统接口开发配置](https://pic.qeasy.cloud/S23.png~tplv-syqr462i7n-qeasy.image) ### 数据集成生命周期中的ETL转换与写入 在数据集成的过程中,将源平台的数据转换为目标平台所能接受的格式是至关重要的一步。本文将详细探讨如何利用轻易云数据集成平台将吉客云的“其他入库单查询”数据转换并写入到BI拉伯塔的MySQL数据库中。 #### 数据请求与清洗 在进行ETL转换之前,首先需要从源系统吉客云中获取“其他入库单查询”的数据。这一步骤涉及到数据请求和清洗,确保获取的数据符合目标平台的需求。 #### 数据转换与写入 接下来,我们重点关注如何将清洗后的数据进行转换,并通过MySQL API接口写入到目标平台。以下是元数据配置及其应用示例: ```json { "api": "batchexecute", "effect": "EXECUTE", "method": "SQL", "number": "id", "id": "id", "name": "id", "idCheck": true, "request": [ {"field":"recId","label":"入库单ID","type":"string","value":"{recId}"}, {"field":"goodsdocNo","label":"入库单号","type":"string","value":"{goodsdocNo}"}, {"field":"billNo","label":"上游单据号(关联单号)","type":"string","value":"{billNo}"}, {"field":"inOutDate","label":"入库时间","type":"string","value":"{{inOutDate|datetime}}"}, {"field":"gmtCreate","label":"创建时间","type":"string","value":"{{gmtCreate|datetime}}"}, {"field":"inouttype","label":"入库类型编码(100-期初库存 101-采购入库...)","type":"string","value":"{inouttype}"}, {"field":"inouttypeName","label":"入库类型名称(100-期初库存 101-采购入库...)","type":"string","value":"{inouttypeName}"}, {"field":"vendCustomerCode","label":"往来单位编号","type":"string","value":"{vendCustomerCode}"}, {"field":"vendCustomerName","label":"往来单位名称","type":"string","value":"{vendCustomerName}"}, {"field":"currencyCode","label":"币种编号","type":"string","value":"{currencyCode}"}, {"field":"currencyRate","label":"币种汇率","type":"string","value":"{currencyRate}"}, {"field":"userName","label":"业务员名字","type":"string","value":"{userName}"}, {"field":"","label":"","type":"","value":""} ], "otherRequest": [ { "field": "main_sql", "label": "主语句", "type": "string", "describe": "SQL首次执行的语句,将会返回:lastInsertId", "value": `REPLACE INTO erp_other_storage_goodsdocin (recId, goodsdocNo, billNo, inoutDate, gmtCreate, inouttype, inouttypeName, vendCustomerCode, vendCustomerName, currencyCode, currencyRate, userName) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)` }, { "field": "limit", "label": "limit", "type": "string", "value": "1000" } ] } ``` #### SQL语句解析 上述配置中的`main_sql`字段定义了一个SQL插入语句,用于将数据写入MySQL数据库。该语句使用了占位符`?`,这些占位符将在实际执行时被具体的数据值替换。 例如: ```sql REPLACE INTO erp_other_storage_goodsdocin (recId, goodsdocNo, billNo, inoutDate, gmtCreate, inouttype, inouttypeName, vendCustomerCode, vendCustomerName, currencyCode, currencyRate, userName) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ``` 在执行时,系统会自动将占位符替换为相应的数据值,如: ```sql REPLACE INTO erp_other_storage_goodsdocin (recId, goodsdocNo, billNo, inoutDate, gmtCreate, inouttype, inouttypeName, vendCustomerCode,vendCustomerName,currencyCode,currencyRate,userName) VALUES ('12345', 'IN20230901', 'UP20230901', '2023-09-01 10:00:00', '2023-09-01 09:00:00', '101', '采购入库', 'CUST001', '客户A', 'CNY', '6.5', '张三') ``` #### 数据类型转换 在元数据配置中,字段`inOutDate`和`gmtCreate`等日期字段需要进行格式转换。通过使用模板字符串如`{{inOutDate|datetime}}`,可以确保日期格式符合目标平台要求。 #### 批量执行 为了提高效率,可以使用批量执行API (`batchexecute`) 一次性处理多条记录。通过设置合适的批处理大小(如配置中的`limit:1000`),可以有效减少网络延迟和资源消耗。 #### 异常处理 在实际操作中,需要考虑异常处理机制。例如,当某条记录插入失败时,应记录错误信息并继续处理后续记录,以确保整体流程的稳定性和可靠性。 ### 总结 通过上述步骤,我们实现了从吉客云到BI拉伯塔MySQL数据库的数据ETL转换和写入。在此过程中,充分利用了轻易云数据集成平台提供的元数据配置功能,实现了高效、可靠的数据集成。 ![如何对接用友BIP接口](https://pic.qeasy.cloud/T13.png~tplv-syqr462i7n-qeasy.image)