和小青GPT助理聊天吧

小青是一位基于大语言预训练模型的超级智能AI,熟悉轻易云产品并能为您提供多种数据集成方案和技术支持。您在使用轻易云产品时遇到问题或有关技术咨询,小青都将为您提供专业、即时、高效的帮助和建议。与小青智能对话,您可享受同人类互动一样的使用体验,全面了解最新产品更新、功能特性及相关技术资讯,进一步提升您在数据集成领域的实力。小青是您友好、智能、便捷的助手,与她互动让您的数据集成之路轻松愉快。
轻易云自训练大语言模型小青助理
扫码关注公众号即可开始聊天

轻易云数据集成平台的源平台调度者生命周期(专业流程版本)

轻易云数据集成平台的源平台调度者生命周期详解

轻易云数据集成平台为企业提供了一套完整的数据处理流程,包括数据抽取、清洗、转换及转发等关键步骤。本文旨在深入探讨平台中“数据抽取”阶段的核心组件——源平台调度者的生命周期管理及操作流程,确保开发工程师能够充分理解并有效地实施数据集成方案。

数据抽取是数据集成的首步,通常通过“源平台定时调度请求任务”来实现。该过程涉及以下关键步骤:

1 定时任务调度

  • 调度者分配与配置: 轻易云平台预设了20个调度者(dispatcher-0至dispatcher-19),通过Linux crontab计划任务实现每分钟一次的调度命令执行。在数据集成方案的配置页面中,用户可指定“调度号”以分配特定调度者,此举是为了在处理数百至数千个集成方案时优化性能,防止单个调度者的任务阻塞。
  • 调度命令执行: 每个调度者将执行cd ./dispatcher && php dispatcher-[0~19] schedule:run命令,根据分配的“调度号”识别并启动相应的集成方案。

2 集成方案遍历与调度命令生成

  • 异步方案识别: 在遍历集成方案时,首先排除非异步方案,这些通常由事件触发或外部系统激活。
  • 调度命令创建: 对于需要调度的方案,将创建一个或多个调度命令,例如dispatch:datahub [task_id] --source --asyn,以及对应目标平台的调度命令。这些命令将根据crontab延迟执行。
  • 补漏命令生成: 如果集成方案配置了补漏措施,则会创建一个特殊的补漏命令,例如dispatch:datahub [task_id] --source --asyn --omission,按照配置的补漏crontab执行。

3 调度命令执行与队列管理

  • 调度命令激活: 到达指定时间点后,创建的调度命令将被激活,转化为特定格式并放入AsynDispatcher队列池中排队,等待处理。
  • 队列池任务消费: AsynDispatcher队列池将对排队的任务进行处理,包括再次确认任务启动条件、检查调度条件是否满足,以及执行具体的调度工作。

4 适配器加载与任务调度

  • 适配器加载: 调度过程中首先加载集成方案配置的源平台适配器。
  • 任务调度执行: 适配器初始化后,执行$adapter->dispatch()方法,负责完成具体的调度任务。包括插入调度日志、生成任务请求参数、将新任务参数写入任务存储、将任务ID插入源任务队列池进行排队,最终插入调度结束日志。

5. 异步队列池任务消费

异步队列池(AsynDispatcher队列池)的任务消费过程是数据抽取阶段的核心。以下步骤详细描述了从任务验证到执行的流程:

5.1 任务验证与条件检查

  • 任务状态确认: 消费任务前,首先确认任务是否处于启动状态,并检查是否满足调度条件。这包括验证是否有前置任务正在执行,以确保调度的顺序性和依赖性。
  • 调度条件满足后的处理: 一旦确认任务满足所有调度条件,系统将调用Instance::handleSourceDispatch方法来执行具体的调度工作。

5.2 适配器操作与任务执行

  • 适配器加载与初始化: 系统将加载指定的源平台适配器,并初始化适配器参数。
  • 执行适配器调度方法: 初始化后,适配器的dispatch()方法被调用,执行包括插入调度开始日志、生成任务请求参数、将请求参数写入异步源任务存储、将任务ID插入源任务队列池、插入调度结束日志等步骤。

6. 数据抽取与处理

适配器完成任务调度后,接下来的步骤涉及数据的实际抽取和处理:

6.1 数据抽取任务的执行

  • 任务详情获取: 根据任务ID,从异步源任务存储中获取任务的详细信息,包括请求参数等。
  • 适配器连接与执行: 适配器首先尝试连接到源平台,确保连接成功后,执行具体的数据请求操作,通常通过调用适配器的invoke()方法完成。

6.2 数据处理与转换

  • 响应数据处理: 任务执行后,适配器处理源平台的响应数据,包括检查响应状态、处理成功或失败的响应、以及对成功响应的数据进行进一步处理。
  • 数据加工与转换: 在适配器处理响应数据后,可能会触发脚本加工厂的调度方法,对数据进行加工和转换,以满足集成方案的需求。

7. 任务状态管理与日志记录

任务在执行过程中,其状态管理和日志记录是至关重要的:

7.1 任务状态更新

  • 任务完成状态标记: 完成数据处理后,任务将被标记为“已完成”状态,确保系统正确跟踪任务的执行结果。
  • 错误处理与重排: 如果任务执行失败,系统将执行错误处理流程,包括异常记录和判断是否需要任务重新排队。

7.2 日志记录

  • 调度日志: 从调度开始到结束,系统将记录详细的调度日志,包括任务开始、结束时间,以及任何重要的状态变更信息。
  • 适配器日志: 适配器操作过程中产生的日志也被记录,包括连接状态、数据处理结果等,以便于问题诊断和性能优化。

8. 源平台事件关联与触发

最后,轻易云数据集成平台支持源平台事件关联,允许一个集成方案的执行触发其他方案的调度:

8.

1 事件关联配置

  • 源平台事件关联: 在某些场景中,一个集成方案的成功执行可能需要触发另一个方案的调度。这通过配置源平台事件关联来实现。

  • 后续方案的调度触发: 当一个方案成功完成后,系统将根据配置的事件关联检查是否需要触发其他集成方案的调度,确保数据集成流程的连续性和自动化。

9. 异步队列池中任务的具体执行过程

9.1 任务的获取与验证

  • 获取任务详情: 对于异步队列池中的每个任务,系统首先通过任务ID从getAsynSourceJobStorage获取任务的具体详情,包括执行所需的所有参数。
  • 任务状态检查: 系统接着检查任务的当前状态。如果任务已经标记为错误或已完成,系统将不会继续执行该任务,并将其从队列中移除。

9.2 适配器连接与任务执行

  • 适配器连接: 确认任务有效后,系统将通过调用适配器的connect()方法来检测与源平台的连接是否成功。
  • 执行任务: 连接成功后,适配器的invoke()方法被调用,传入任务的参数,向源平台发送请求并获取响应。这一步是数据抽取过程的核心,适配器内部的SDK负责具体的数据请求与接收。

9.3 响应处理与数据加工

  • 响应数据处理: 接收到源平台的响应后,适配器将根据响应状态进行处理。成功的响应将进入数据加工阶段,而失败的响应则会触发错误处理流程。
  • 脚本加工厂调度: 对于成功的响应,可能会触发脚本加工厂的调度方法ScriptFactory::dispatch(),以执行任务完成后的数据加工,如数据清洗、格式转换等。

9.4 错误处理与任务重排

  • 错误处理: 如果适配器在执行过程中遇到错误,将调用handleError方法进行错误处理。该过程包括异常记录和判断是否需要将任务重新排队。
  • 任务重排: 在某些情况下,任务执行失败可能是由于临时问题造成的,如网络不稳定。此时,系统可能会决定将任务重新放入队列中,以便稍后重试。

9.5 成功响应的后续处理

  • 处理响应数据: 对于成功的响应,适配器将处理响应中的数据,可能包括数据解析、格式化等操作,以确保数据能够被后续流程正确处理。
  • 数据存储与队列管理: 处理完响应数据后,系统将相关数据存入数据存储,并根据需要更新任务的状态,如标记为已完成。

9.6 任务完成后的事件触发

  • 脚本加工厂的再次调用: 在任务成功完成并处理响应数据之后,系统可能会再次调用脚本加工厂的事件,例如AfterSourceResponseSuccess,以执行特定的后处理脚本,进一步加工或验证数据。

热门文章

金蝶云星空与飞书集成打通发起审批实例

2024-02-28 10:52:14

企业案例:简道云对接金蝶云星空

2024-02-28 10:48:01

API资产的【代码】字段有什么作用?

2024-02-23 07:15:36

操作日志审计里面具体能够看到什么?

2024-02-22 02:27:15

请求调度者中的【历史版本】管理有什么作用

2024-02-21 11:16:14