最近在看Apache DolphineScheduler,发现DolphinScheduler在处理任务时,通过先将任务快速的存储在数据库中,然后基于对应的Task,将Task放入队列中,然后将Task进行快速消费的思路。
这种模型在很多框架中,都有体现。
我们知道在Master模块时处理任务的核心模块,而API模块主要是界面中操作的UI逻辑,而Alert模块是告警模块。
因此如果想要了解里面的逻辑,可以通过查看API中工作流的执行可以找到一些线索之外,可以在Master中可以了解到核心的逻辑。
如果想体验相关功能,可以参考官网的搭建过程和相关视频和公众号,这里不展开赘述。
Fetcher突破口
因此我们可以从Fetcher中找到突破口:
告警处理核心
以AlertEventFetcher
为例来了解Apache DolphineScheduler处理任务/告警/事件等的原理。
首先我们需要问自己一个问题,告警和Task任务之间又是如何串联起来的?即告警Alert和业务处理是如何串联起来的?
两者都是随项目模块启动,触发源自于相关工作流和Task处理的事件产生的告警信息,实现对应的event事件,从而进行告警,而告警是通过启动告警模块,进行队列的Put和Take处理,从而实现对应各个渠道的对接告警。
告警信息的放入:
eventPendingQueue.put(alert)
什么时候会Put?
存在告警数据的时候会put。
List<T> pendingEvents = fetchPendingEvent(eventOffset)
从alertMapper.listingAlertByStatus(minAlertId, AlertStatus.WAIT_EXECUTION.getCode(), QUERY\_ALERT\_THRESHOLD)
中获取告警数据。
放入到队列中,核心在saveEvent(AbstractListenerEvent event)
而调用saveEvent(AbstractListenerEvent event)
的地方:
状态:alert_status
为等待执行的状态。
从这些监听事件中,我们可以看到这里的监听事件主要和工作流处理和Task处理监听有关,即和Task和Workflow有关,也即我们最核心的业务处理。
可以根据这些事件找到对应的事件找到对应的业务逻辑处理。
eventPendingQueue.take()
的地方在哪里?
org.apache.dolphinscheduler.alert.service.AbstractEventLoop#run
。
而调用org.apache.dolphinscheduler.alert.service.AlertEventLoop#handleEvent
的地方:
alertChannel.process(alertInfo)
这里会根据对应的渠道类型,进行告警。
doSendEvent
的地方中有两个地方:
org.apache.dolphinscheduler.alert.service.AbstractEventSender#sendEvent
org.apache.dolphinscheduler.alert.service.AlertSender#syncHandler
工作流处理核心
同理我们可以找到处理工作流的核心逻辑,串联的要点在于:
commandFetcher.fetchCommands()
因此我们可以思考如何获取对应的command的逻辑,以及获取之后的处理。
获取command之后,放入到workflowEventQueue
队列中。
处理的核心逻辑在:
org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable#startWorkflow
过程:
initTaskQueue
processStart
submitPostNode
根据这样的思路可以找到上面相关Fetcher的处理的核心逻辑。更多细节和详情,可以去官网了解。
参考:
海豚调度官网:https://dolphinscheduler.apache.org/
Github地址:https://github.com/apache/dolphinscheduler