定时任务——xxl-job源码解析

   日期:2024-12-14    作者:caijiyuan 浏览:65    移动:http://w.yusign.com/mobile/quote/247.html

本文深入解析了xxl-job的源码,xxl-job是一个分布式任务调度平台,其核心设计思想是将调度行为抽象成“调度中心”,而任务逻辑则由“执行器”处理,实现调度与任务的解耦。文章详细介绍了调度器和执行器的初始化流程、任务执行机制,并探讨了xxl-job的关键组件和线程池的设计,以及任务触发和执行的具体实现。

将调度行为抽象形成“调度中心”公共平台而平台自身并不承担业务逻辑,“调度中心”负责发起调度请求。 将任务抽象成分散的JobHandler,交由“执行器”统一管理,“执行器”负责接收调度请求并执行对应的JobHandler中业务逻辑。 因此,“调度”和“任务”两部分可以相互解耦,提高系统整体稳定性和扩展性

负责管理调度信息,按照调度配置发出调度请求,自身不承担业务代码。调度系统与任务解耦,提高了系统可用性和稳定性,同时调度系统性能不再受限于任务模块

支持可视化、简单且动态的管理调度信息,包括任务新建,更新,删除,GLUE开发和任务报警等,所有上述操作都会实时生效,同时支持监控调度结果以及执行日志,支持执行器Failover。

负责接收调度请求并执行任务逻辑。任务模块专注于任务的执行等操作,开发和维护更加简单和高效; 接收“调度中心”的执行请求、终止请求和日志请求等。

 
 
 
 

start 方法中定义了一个快线程池和慢线程池,快线程池的最大线程数量最小为200,慢线程池的最大线程数量最小为100。 快慢线程池的设计思想是为了解决任务执行器并发度控制的问题,让任务执行器在任务量较大时能够快速响应,同时在任务量较小时能够节省资源。具体来说,快线程池和慢线程池的作用如下

  1. 快线程池:用于执行任务量较大的任务,线程数较多,执行速度较快,能够快速响应任务。
  2. 慢线程池:用于执行任务量较小的任务,线程数较少,执行速度较慢,能够节省资源。
 
 

创建一个线程池 registryOrRemoveThreadPool 用于注册或删除任务,创建一个后台守护线程 registryMonitorThread ,使用了一个死循环每隔30秒执行一次,删除超时的注册表信息,更新xxl_job_group执行器地址列表。

 
 

创建一个守护线程monitorThread ,如果失败任务设置了重试机制,则触发重试流程,设置了告警策略,则会根据告警策略触发告警操作。

 
 

创建一个回调线程池 callbackThreadPool 和一个守护线程monitorThread,守护线程负责将调度记录停留在 "运行中" 状态超过10min,且对应执行器心跳注册失败不在线,则将本地调度主动标记失败

 
 

每分钟刷新一次日志报告,包括当天的运行情况,包括运行次数、成功次数、失败次数等,并将这些信息保存到数据库中。 每天执行一次日志清理,删除指定天数前的日志数据。

 
 

在这个类中,定义了两个线程,第一个线程是定时任务扫描处理线程,第二个线程则是一个时间轮线程。

scheduleThread线程中做的事情是将JobInfo中即将要执行的任务取出来(5秒的预读时间,然后根据三种不同的情况分别进行处理

  1. 如果当前时间大于任务触发时间+5秒,说明这个任务漏触发,根据触发漏发策略决定是否执行任务。
  2. 如果当前时间大于任务的触发时间且小于触发时间+5秒,触发任务,计算下一次任务执行时间,如果下一次任务执行时间在五秒内,则放入时间轮。
  3. 其他情况,任务还没到时间触发,则放入时间轮中。

ringThread线程将时间轮中的任务按秒触发。

 

定时任务——xxl-job源码解析

在XxlJob源码的samples代码中,配置文件里可以看到这样一段配置

在这段配置里配置了Xxl所需要的配置信息,包括地址、IP、端口等信息,这里的XxlJobSpringExecutor是一个任务执行器。

xxl-job任务执行器涉及到的类主要三个XxlJobExecutor、XxlJobSimpleExecutor和XxlJobSpringExecutor。其中XxlJobSimpleExecutor和XxlJobSpringExecutor继承自XxlJobExecutor,两个类分别用于处理普通任务和Spring任务。

 

接着定义了一个start方法和destroy方法,来看看start方法

 

start()方法中定义了五个方法的初始化流程,我们依次来看一下

3.1.1. initLogPath()

这个方法是初始化日志相关的信息,初始化了日志文件的路径和glue文件的路径。

 

3.1.2. initAdminBizList()

 

这个方法主要做了下面几件事情

  1. 通过解析传入的adminAddresses参数,将调度中心的地址拆分为多个地址字符串。
  2. 遍历每个地址字符串,创建一个AdminBizClient对象,该对象负责与对应的调度中心地址建立连接和进行通信。AdminBizClient是XxlJob提供的RPC调用客户端,用于与调度中心进行交互。
  3. 将创建的AdminBizClient对象添加到adminBizList列表中,以便后续使用。

总结起来,这段代码的作用是根据传入的调度中心地址和访问令牌,创建与调度中心的连接,并将连接对象存储在adminBizList列表中,以便后续使用。通过这个列表,可以实现与调度中心的交互,例如获取任务、上报任务执行结果等操作。

3.1.3. JobLogFileCleanThread

JobLogFileCleanThread 方法初始化了一个清理日志的线程,这个方法实现了一个定时清理过期日志文件的功能,通过启动一个后台线程,在每天固定时间执行清理操作,删除过期的日志文件。

 

3.1.4. TriggerCallbackThread

这是一个触发回调的方法,在这个回调方法中创建了两个回调线程,第一个是回调线程

 

在上面线程中,在正常运行情况下,一直会通过循环取出回调任务,然后通过doCallback进行回调。当线程将要被关闭的时候,会有一段代码将回调队列中剩下的回调任务全部执行。

3.1.5. doCallback()

执行回调调用了doCallback方法

 

在callback方法中,会遍历所有的调度器集合,然后每个调度器执行callback方法,这里的callback方法会通过rpc去调用调度器的方法。然后根据回调拿到的结果,执行不同的写日志的操作。

 

具体的执行过程,我们在后续的文章里聊。 doCallback方法的最后会将错误写入到失败文件中,这个失败文件会用作失败重试。

 

第一个回调线程做的事情都讲清楚了,接下来介绍TriggerCallbackThread中的第二个线程,重试线程。

 

这个线程就做一件事情,每隔30秒执行一次retryFailCallbackFile()方法,在这个方法中,会扫描上面一步写入的失败文件,然后取出入参后再次调用doCallback方法,实现重试。

 

3.1.6. initEmbedServer

这个方法负责初始化 xxl-job 的 EmbedServer,并配置服务器的地址、端口、应用名称和访问令牌等参数,然后启动该服务器。

 

这个EmbedServer服务是基于Netty实现的NIO服务,主要为了实现包括任务下发、执行结果上报、心跳等功能。同时,通过嵌入式的 Netty HTTP 服务器,可以方便地处理任务调度中心的 HTTP 请求,并执行相应的任务操作。

XxlJobExecutor有两个子类,其中一个是XxlJobSimpleExecutor,用于处理简单的Java程序,不依赖Spring容器。

 

首先执行Job任务的注册,然后执行父类的start方法。

 

注册方法也比较简单,如果用@XxlJob注解了,就注册到JobHandler中。

XxlJobSpringExecutor是XxlJobExecutor的另一个子类,是 xxl-job 框架提供的基于 Spring 的执行器实现类,它是一个 Spring Bean,在 Spring 容器中进行管理。

 

XxlJobSpringExecutor的初始化也是先执行Job任务的注册,然后执行父类的start方法。中间有一个刷新GlueFactory实例的方法,Glue是xxljob支持的一种脚本执行模式。注册方法的注释已经放在代码内了。

 
 

不管是主动触发执行还是被动触发执行,都会进入到JobInfoController 中的triggerJob 方法

在这个接口中,首先会判断 executorParam 是否为 null,如果是 null 的话就设置executorParam参数为空,接着触发 JobTriggerPoolHelper 的 trigger 方法。

 
 

进入trigger方法后会进入到addTrigger方法内,这个方法做了两个事情,第一按执行耗时,将任务分到快线程池和或者慢线程池。第二件事情是触发 XxlJobTrigger.trigger 来执行任务。

 
 

在trigger方法中,会根据传入的参数做一系列的初始化,然后执行processTrigger方法进行任务的触发。

 

processTrigger方法主要作用是处理触发作业执行的逻辑。它初始化触发参数,确定执行器地址,触发执行器执行作业,并记录触发信息和日志。

 

在上面这个方法中,通过runExecutor(triggerParam, address)触发任务的执行。在这个runExecutor方法中,通过RPC的方式找到对应地址的执行器,调用执行器的run方法。并将调用的结果返回。

 
 

executorBiz.run(triggerParam),会根据传入的条件构建出IJobHandler和JobThread,最终将JobThread传入TriggerQueue等待触发。具体的细节流程我都在代码里加上中文注释了。

 
 

在上面这个方法中,最终会把triggerParam放入TriggerQueue中,那么真正的任务执行是在哪里呢?还是看上面这段代码,XxlJobExecutor.registJobThread这个方法中,注册了一个新的jobThread,并且通过start方法启动了这个线程。

 

调用了线程的start方法之后,JobThread的run方法就开始执行。

 

真正执行是在handler.execute(); execute方法通过反射,执行目标方法。

 

本文地址:http://w.yusign.com/quote/247.html    述古往 http://w.yusign.com/static/ , 查看更多

特别提示:本信息由相关用户自行提供,真实性未证实,仅供参考。请谨慎采用,风险自负。


举报收藏 0评论 0
0相关评论
相关行情
推荐行情
点击排行
{
网站首页  |  关于我们  |  联系方式  |  用户协议  |  隐私政策  |  版权声明  |  网站地图  |  排名推广  |  广告服务  |  积分换礼  |  网站留言  |  RSS订阅  |  违规举报  |  鄂ICP备2020018471号