ETL转换实现:对账系统货品价格执行日期写入MySQL

  • 轻易云集成顾问-曹润
### 钉钉数据集成到MySQL案例分享:对账系统--查询货品价格执行日期(报价流程) 在本次技术案例中,我们将详解如何通过轻易云数据集成平台,实现钉钉与MySQL的高效对接,具体聚焦于"对账系统--查询货品价格执行日期(报价流程)"的实现。此方案不仅确保了大量数据能够快速写入MySQL,还注重实时性和可靠性的双重保障。 首先,我们需要从钉钉获取有关商品报价的数据。为此我们调用了API接口`v1.0/yida/processes/instances`,该接口能够抓取到每个商品的报价记录,并提供详细的实例信息。然而,在实际实施过程中,我们面临以下几个技术挑战: 1. **定时可靠抓取**:为了保证数据的一致性和完整性,需要设计一个机制来定时、批量地从钉钉接口获取最新的数据。 2. **分页与限流处理**:由于API接口具有分页限制和请求频率限制,必须在脚本设计中兼顾这些约束条件,以确保不会因请求过多而被限流。 3. **自定义数据转换逻辑**:API返回的数据格式可能不完全符合我们的数据库表结构,因此须进行必要的数据转换和映射。 在解决这些问题之后,下一个任务是将获取到的数据写入MySQL。使用轻易云提供的集中监控和告警系统,使得我们能实时跟踪数据集成任务状态和性能,迅速发现并应对任何潜在的问题。此外,通过支持高吞吐量的数据写入能力,大量的数据可以迅速且准确地被导入至目标数据库。这部分工作主要涉及以下几点: 1. **异常处理与错误重试机制**:对于可能发生的写入失败场景,如因网络波动或其他原因导致操作未生效,需要建立错误捕捞机制并自动触发重试。 2. **可视化工具设计优化流程**:利用平台内置的可视化设计工具,可以更直观地配置整个数据流,从源头采集、转换,到最终目的地存储,每一步都清晰明了,大大提升管理效率。 以上步骤仅覆盖了整体方案中的一部分关键点。在后续内容中,将进一步分享具体代码实现细节及相关配置方法,以便更好地指导类似需求场景下的数据集成工作。 ![如何对接用友BIP接口](https://pic.qeasy.cloud/D15.png~tplv-syqr462i7n-qeasy.image) ### 调用钉钉接口获取并加工数据的技术案例 在数据集成生命周期的第一步,调用源系统接口获取数据是至关重要的一环。本文将详细探讨如何通过轻易云数据集成平台调用钉钉接口`v1.0/yida/processes/instances`,并对返回的数据进行加工处理。 #### API 接口配置 首先,我们需要配置API接口的元数据。以下是该接口的详细配置: ```json { "api": "v1.0/yida/processes/instances", "method": "POST", "number": "title", "id": "processInstanceId", "pagination": { "pageSize": 50 }, "idCheck": true, "request": [ {"field":"pageSize","label":"分页大小","type":"string","describe":"分页大小","value":"10"}, {"field":"pageNumber","label":"分页页码","type":"string","describe":"分页页码"}, {"field":"appType","label":"应用ID","type":"string","describe":"应用ID","value":"APP_UYN987QNZ82Q4QK409VT"}, {"field":"systemToken","label":"应用秘钥","type":"string","describe":"应用秘钥","value":"DR766X813F8925E1F57YN8U6ZQFR26RQKCJFL04"}, {"field":"userId","label":"用户的userid","type":"string","describe":"用户的userid","value":"16000443318138909"}, {"field":"language","label":"语言","type":"string","describe":"语言,取值:zh_CN:中文(默认值)en_US:英文"}, {"field":"formUuid","label":"表单ID","type":"string","describe":"表单ID","value":"FORM-RS966T81BIGD6RSS9B74Z86BMX8G28U4GFGLL5"}, {"field":"searchFieldJson","label":"条件","type": "object", "children":[{"parent": "searchFieldJson", "label": "品牌", "field": "selectField_lmhje0qv", "type": "string"}]}, {"field": "createFromTimeGMT", "label": "创建时间起始值", "type": "string", "describe": "创建时间起始值", "value": "2023-01-17 00:00:00"}, {"field": "createToTimeGMT", "label": "创建时间终止值", "type": "string", "describe": "创建时间终止值", "value": "{{CURRENT_TIME|datetime}}"}, {"field": "modifiedFromTimeGMT", "label": "修改时间起始值", "type": “string”, “describe”: “修改时间起始值”}, {"field”: “modifiedToTimeGMT”, “label”: “修改时间终止值”, “type”: “string”, “describe”: “修改时间终止值”}, {"field”: “taskId”, “label”: “任务ID”, “type”: “string”, “describe”: “任务ID”}, {"field”: “instanceStatus”, “label”: “实例状态”, “type”: “string”, “describe”: “实例状态”, “value”:“COMPLETED”}, {"field”:“approvedResult”,“label”:流程审批结果”,“类型”:字符串”,“描述”:流程审批结果”,“价值”:同意} ], autoFillResponse: true, effect: QUERY, beatFlat: ["tableField_llhrnnb5"] } ``` #### 请求参数详解 - **pageSize** 和 **pageNumber**:用于控制分页请求,每次请求返回的数据量和当前请求的页码。 - **appType** 和 **systemToken**:分别是应用ID和应用秘钥,用于身份验证。 - **userId**:用户的唯一标识符。 - **language**:指定返回数据的语言,可以是中文(zh_CN)或英文(en_US)。 - **formUuid**:表单ID,用于指定查询哪个表单的数据。 - **searchFieldJson**:查询条件,这里可以根据具体需求设置不同的查询条件,例如品牌等。 - **createFromTimeGMT** 和 **createToTimeGMT**:用于过滤创建时间范围内的数据。 - **modifiedFromTimeGMT** 和 **modifiedToTimeGMT**:用于过滤修改时间范围内的数据。 - **taskId**、**instanceStatus** 和 **approvedResult**:分别表示任务ID、实例状态和审批结果。 #### 数据请求与清洗 在配置好API接口后,我们可以通过轻易云平台发起请求,获取钉钉中的报价流程实例数据。以下是一个示例请求: ```json { pageSize: 10, pageNumber: 1, appType: 'APP_UYN987QNZ82Q4QK409VT', systemToken: 'DR766X813F8925E1F57YN8U6ZQFR26RQKCJFL04', userId: '16000443318138909', language: 'zh_CN', formUuid: 'FORM-RS966T81BIGD6RSS9B74Z86BMX8G28U4GFGLL5', searchFieldJson: { selectField_lmhje0qv: '某品牌' }, createFromTimeGMT: '2023-01-17 00:00:00', createToTimeGMT: '2023-12-31 23:59:59', instanceStatus: 'COMPLETED', approvedResult: 'agree' } ``` 通过上述请求,我们可以获取到符合条件的报价流程实例数据。接下来,我们需要对这些数据进行清洗和加工,以便进一步使用。 #### 数据转换与写入 在获取到原始数据后,我们需要对其进行必要的转换。例如,将日期格式标准化、过滤掉无用字段、合并相关信息等。以下是一个简单的数据清洗示例: ```python import json from datetime import datetime # 假设response_data是从API获取到的原始数据 response_data = ''' [ { processInstanceId: '12345', title: '报价流程A', createTimeGMT:'2023-01-18T08:30:00Z', ... }, ... ] ''' data = json.loads(response_data) # 清洗和转换数据 cleaned_data = [] for item in data: cleaned_item = { 'id': item['processInstanceId'], 'title': item['title'], 'create_time': datetime.strptime(item['createTimeGMT'], '%Y-%m-%dT%H:%M:%SZ').strftime('%Y-%m-%d %H:%M:%S') # 添加其他需要处理的字段 } cleaned_data.append(cleaned_item) # 将清洗后的数据写入目标系统或数据库 ``` 通过以上步骤,我们完成了从调用钉钉接口获取报价流程实例数据,到对数据进行清洗和转换,并最终准备好将其写入目标系统或数据库。 这种全生命周期管理的数据处理方式,不仅提高了业务透明度,还极大提升了效率。 ![如何开发企业微信API接口](https://pic.qeasy.cloud/S1.png~tplv-syqr462i7n-qeasy.image) ### 使用轻易云数据集成平台实现ETL转换:对账系统查询货品价格执行日期 在数据集成的生命周期中,将源平台的数据进行ETL转换并写入目标平台是关键步骤之一。本文将重点探讨如何利用轻易云数据集成平台将对账系统中的货品价格执行日期信息,转换为MySQL API接口所能接收的格式,并最终写入目标平台。 #### 元数据配置 在进行ETL转换之前,首先需要了解元数据配置。这次我们使用的元数据配置如下: ```json { "api": "execute", "method": "POST", "idCheck": true } ``` 此配置表明我们将通过POST方法调用MySQL API的`execute`接口,并且启用了ID检查功能。 #### 数据请求与清洗 在ETL流程中,第一步是从对账系统中请求原始数据并进行清洗。假设我们已经完成了这一步,获取到了如下结构的原始数据: ```json [ { "product_id": "12345", "price": 100.0, "effective_date": "2023-10-01" }, { "product_id": "67890", "price": 200.0, "effective_date": "2023-10-05" } ] ``` #### 数据转换 接下来,我们需要将这些原始数据转换为目标平台MySQL API能够接收的格式。根据元数据配置,我们需要构建一个适合POST请求体的JSON对象。 首先,我们定义一个函数来处理单条记录的转换: ```python def transform_record(record): return { "query": f"INSERT INTO product_prices (product_id, price, effective_date) VALUES ('{record['product_id']}', {record['price']}, '{record['effective_date']}') ON DUPLICATE KEY UPDATE price={record['price']}, effective_date='{record['effective_date']}'" } ``` 这个函数生成了一个SQL插入语句,并处理了主键冲突(假设`product_id`是主键),以确保如果记录已经存在,则更新其价格和执行日期。 然后,我们将所有记录进行批量转换: ```python def transform_data(data): return [transform_record(record) for record in data] ``` 调用此函数后,我们得到如下结构的数据: ```json [ { "query": "INSERT INTO product_prices (product_id, price, effective_date) VALUES ('12345', 100.0, '2023-10-01') ON DUPLICATE KEY UPDATE price=100.0, effective_date='2023-10-01'" }, { "query": "INSERT INTO product_prices (product_id, price, effective_date) VALUES ('67890', 200.0, '2023-10-05') ON DUPLICATE KEY UPDATE price=200.0, effective_date='2023-10-05'" } ] ``` #### 数据写入 最后一步是将转换后的数据通过API接口写入目标平台。我们使用Python的requests库来发送POST请求: ```python import requests url = 'http://target-platform.com/api/execute' headers = {'Content-Type': 'application/json'} def write_data(transformed_data): for record in transformed_data: response = requests.post(url, json=record, headers=headers) if response.status_code != 200: print(f"Failed to write record: {record}") else: print(f"Successfully wrote record: {record}") # 执行写入操作 transformed_data = transform_data(original_data) write_data(transformed_data) ``` 在这个过程中,每一条记录都会被逐条发送到MySQL API接口。如果某条记录写入失败,系统会输出错误信息,以便后续排查。 通过以上步骤,我们成功地实现了从对账系统到MySQL目标平台的数据ETL转换和写入。这种方法不仅保证了数据的一致性和完整性,还提升了整个集成过程的效率和透明度。 ![如何对接用友BIP接口](https://pic.qeasy.cloud/T20.png~tplv-syqr462i7n-qeasy.image)