本文深入解析了xxl-job的源码,xxl-job是一个分布式任务调度平台,其核心设计思想是将调度行为抽象成“调度中心”,而任务逻辑则由“执行器”处理,实现调度与任务的解耦。文章详细介绍了调度器和执行器的初始化流程、任务执行机制,并探讨了xxl-job的关键组件和线程池的设计,以及任务触发和执行的具体实现。
将调度行为抽象形成“调度中心”公共平台,而平台自身并不承担业务逻辑,“调度中心”负责发起调度请求。 将任务抽象成分散的JobHandler,交由“执行器”统一管理,“执行器”负责接收调度请求并执行对应的JobHandler中业务逻辑。 因此,“调度”和“任务”两部分可以相互解耦,提高系统整体稳定性和扩展性;
负责管理调度信息,按照调度配置发出调度请求,自身不承担业务代码。调度系统与任务解耦,提高了系统可用性和稳定性,同时调度系统性能不再受限于任务模块;
支持可视化、简单且动态的管理调度信息,包括任务新建,更新,删除,GLUE开发和任务报警等,所有上述操作都会实时生效,同时支持监控调度结果以及执行日志,支持执行器Failover。
负责接收调度请求并执行任务逻辑。任务模块专注于任务的执行等操作,开发和维护更加简单和高效; 接收“调度中心”的执行请求、终止请求和日志请求等。
start 方法中定义了一个快线程池和慢线程池,快线程池的最大线程数量最小为200,慢线程池的最大线程数量最小为100。 快慢线程池的设计思想是为了解决任务执行器并发度控制的问题,让任务执行器在任务量较大时能够快速响应,同时在任务量较小时能够节省资源。具体来说,快线程池和慢线程池的作用如下:
- 快线程池:用于执行任务量较大的任务,线程数较多,执行速度较快,能够快速响应任务。
- 慢线程池:用于执行任务量较小的任务,线程数较少,执行速度较慢,能够节省资源。
创建一个线程池 registryOrRemoveThreadPool 用于注册或删除任务,创建一个后台守护线程 registryMonitorThread ,使用了一个死循环每隔30秒执行一次,删除超时的注册表信息,更新xxl_job_group执行器地址列表。
创建一个守护线程monitorThread ,如果失败任务设置了重试机制,则触发重试流程,设置了告警策略,则会根据告警策略触发告警操作。
创建一个回调线程池 callbackThreadPool 和一个守护线程monitorThread,守护线程负责将调度记录停留在 "运行中" 状态超过10min,且对应执行器心跳注册失败不在线,则将本地调度主动标记失败;
每分钟刷新一次日志报告,包括当天的运行情况,包括运行次数、成功次数、失败次数等,并将这些信息保存到数据库中。 每天执行一次日志清理,删除指定天数前的日志数据。
在这个类中,定义了两个线程,第一个线程是定时任务扫描处理线程,第二个线程则是一个时间轮线程。
scheduleThread线程中做的事情是将JobInfo中即将要执行的任务取出来(5秒的预读时间),然后根据三种不同的情况分别进行处理:
- 如果当前时间大于任务触发时间+5秒,说明这个任务漏触发,根据触发漏发策略决定是否执行任务。
- 如果当前时间大于任务的触发时间且小于触发时间+5秒,触发任务,计算下一次任务执行时间,如果下一次任务执行时间在五秒内,则放入时间轮。
- 其他情况,任务还没到时间触发,则放入时间轮中。
ringThread线程将时间轮中的任务按秒触发。
在XxlJob源码的samples代码中,配置文件里可以看到这样一段配置
在这段配置里配置了Xxl所需要的配置信息,包括地址、IP、端口等信息,这里的XxlJobSpringExecutor是一个任务执行器。
xxl-job任务执行器涉及到的类主要三个:XxlJobExecutor、XxlJobSimpleExecutor和XxlJobSpringExecutor。其中XxlJobSimpleExecutor和XxlJobSpringExecutor继承自XxlJobExecutor,两个类分别用于处理普通任务和Spring任务。
接着定义了一个start方法和destroy方法,来看看start方法
start()方法中定义了五个方法的初始化流程,我们依次来看一下
3.1.1. initLogPath()
这个方法是初始化日志相关的信息,初始化了日志文件的路径和glue文件的路径。
3.1.2. initAdminBizList()
这个方法主要做了下面几件事情
- 通过解析传入的adminAddresses参数,将调度中心的地址拆分为多个地址字符串。
- 遍历每个地址字符串,创建一个AdminBizClient对象,该对象负责与对应的调度中心地址建立连接和进行通信。AdminBizClient是XxlJob提供的RPC调用客户端,用于与调度中心进行交互。
- 将创建的AdminBizClient对象添加到adminBizList列表中,以便后续使用。
总结起来,这段代码的作用是根据传入的调度中心地址和访问令牌,创建与调度中心的连接,并将连接对象存储在adminBizList列表中,以便后续使用。通过这个列表,可以实现与调度中心的交互,例如获取任务、上报任务执行结果等操作。
3.1.3. JobLogFileCleanThread
JobLogFileCleanThread 方法初始化了一个清理日志的线程,这个方法实现了一个定时清理过期日志文件的功能,通过启动一个后台线程,在每天固定时间执行清理操作,删除过期的日志文件。
3.1.4. TriggerCallbackThread
这是一个触发回调的方法,在这个回调方法中创建了两个回调线程,第一个是回调线程
在上面线程中,在正常运行情况下,一直会通过循环取出回调任务,然后通过doCallback进行回调。当线程将要被关闭的时候,会有一段代码将回调队列中剩下的回调任务全部执行。
3.1.5. doCallback()
执行回调调用了doCallback方法
在callback方法中,会遍历所有的调度器集合,然后每个调度器执行callback方法,这里的callback方法会通过rpc去调用调度器的方法。然后根据回调拿到的结果,执行不同的写日志的操作。
具体的执行过程,我们在后续的文章里聊。 doCallback方法的最后会将错误写入到失败文件中,这个失败文件会用作失败重试。
第一个回调线程做的事情都讲清楚了,接下来介绍TriggerCallbackThread中的第二个线程,重试线程。
这个线程就做一件事情,每隔30秒执行一次retryFailCallbackFile()方法,在这个方法中,会扫描上面一步写入的失败文件,然后取出入参后再次调用doCallback方法,实现重试。
3.1.6. initEmbedServer()
这个方法负责初始化 xxl-job 的 EmbedServer,并配置服务器的地址、端口、应用名称和访问令牌等参数,然后启动该服务器。
这个EmbedServer服务是基于Netty实现的NIO服务,主要为了实现包括任务下发、执行结果上报、心跳等功能。同时,通过嵌入式的 Netty HTTP 服务器,可以方便地处理任务调度中心的 HTTP 请求,并执行相应的任务操作。
XxlJobExecutor有两个子类,其中一个是XxlJobSimpleExecutor,用于处理简单的Java程序,不依赖Spring容器。
首先执行Job任务的注册,然后执行父类的start方法。
注册方法也比较简单,如果用@XxlJob注解了,就注册到JobHandler中。
XxlJobSpringExecutor是XxlJobExecutor的另一个子类,是 xxl-job 框架提供的基于 Spring 的执行器实现类,它是一个 Spring Bean,在 Spring 容器中进行管理。
XxlJobSpringExecutor的初始化也是先执行Job任务的注册,然后执行父类的start方法。中间有一个刷新GlueFactory实例的方法,Glue是xxljob支持的一种脚本执行模式。注册方法的注释已经放在代码内了。
不管是主动触发执行还是被动触发执行,都会进入到JobInfoController 中的triggerJob 方法
在这个接口中,首先会判断 executorParam 是否为 null,如果是 null 的话就设置executorParam参数为空,接着触发 JobTriggerPoolHelper 的 trigger 方法。
进入trigger方法后会进入到addTrigger方法内,这个方法做了两个事情,第一按执行耗时,将任务分到快线程池和或者慢线程池。第二件事情是触发 XxlJobTrigger.trigger 来执行任务。
在trigger方法中,会根据传入的参数做一系列的初始化,然后执行processTrigger方法进行任务的触发。
processTrigger方法主要作用是处理触发作业执行的逻辑。它初始化触发参数,确定执行器地址,触发执行器执行作业,并记录触发信息和日志。
在上面这个方法中,通过runExecutor(triggerParam, address)触发任务的执行。在这个runExecutor方法中,通过RPC的方式找到对应地址的执行器,调用执行器的run方法。并将调用的结果返回。
executorBiz.run(triggerParam),会根据传入的条件构建出IJobHandler和JobThread,最终将JobThread传入TriggerQueue等待触发。具体的细节流程我都在代码里加上中文注释了。
在上面这个方法中,最终会把triggerParam放入TriggerQueue中,那么真正的任务执行是在哪里呢?还是看上面这段代码,XxlJobExecutor.registJobThread这个方法中,注册了一个新的jobThread,并且通过start方法启动了这个线程。
调用了线程的start方法之后,JobThread的run方法就开始执行。
真正执行是在handler.execute(); execute方法通过反射,执行目标方法。