当前位置: 首页 > 后端技术 > Java

Quartz相关的线程

时间:2023-04-01 21:55:43 Java

上一篇我们提到了Quartz相关的线程,但是散落在好几篇文章中,不好找。Quartz涉及到的线程对于理解Quartz也很重要,所以今天就抽出来单独说一下。Quartz中的主线程:任务调度线程QuartzSchedulerThread任务执行线程Misfire处理线程ClusterManager线程任务调度线程QuartzSchedulerThreadQuartzSchedulerThread是一个任务调度线程,它的职责是满足触发条件(nextFireTimer到达)注册到JobStoreTrigger分配给可用的任务要执行的执行线程。QuartzSchedulerThread启动任务调度线程QuartzSchedulerThread在创建调度器Scheduler时启动。应用层通过以下调用创建Scheduler:Schedulersche=newStdSchedulerFactory().getScheduler();一般Scheduler是通过StdSchedulerFactory构造的,getScheduler首先尝试从SchedulerRepository中获取Schedule。如果第一次获取不到,则通过instantiate()方法获取。publicSc??hedulergetScheduler()抛出SchedulerException{if(cfg==null){initialize();}SchedulerRepositoryschedRep=SchedulerRepository.getInstance();调度程序sched=schedRep.lookup(getSchedulerName());if(sched!=null){if(sched.isShutdown()){schedRep.remove(getSchedulerName());}else{返回计划;}}计划=实例化();返回计划;Quartz所有相关组件的初始化。其中,会创建QuartzScheduler,然后将QuartzScheduler打包成stdScheduler,存放在SchedulerRepository中。instantiate()创建QuartzScheduler是调用的构造方法:publicQuartzScheduler(QuartzSchedulerResourcesresources,longidleWaitTime,@DeprecatedlongdbRetryInterval)throwsSchedulerException{this.resources=resources;if(resources.getJobStore()instanceofJobListener){addInternalJobListener((JobListener)resources.getJobStore());}this.schedThread=newQuartzSchedulerThread(this,resources);ThreadExecutorschedThreadExecutor=resources.getThreadExecutor();schedThreadExecutor.execute(this.schedThread);如果(idleWaitTime>0){this.schedThread.setIdleWaitTime(idleWaitTime);}jobMgr=newExecutingJobsManager();addInternalJobListener(jobMgr);errLogger=newErrorLogger();addInternalSchedulerListener(errLogger);signaler=newSchedulerSignalerImpl(this,this.schedThread);getLog().info("QuartzSchedulerv.&qu哦;+getVersion()+“已创建。”);可以看到构造方法中创建了QuartzSchedulerThread对象,然后获取ThreadExecutor(一般是defaultThreadExecutor),通过调用它的execute方法启动QuartzSchedulerThread线程:voidexecute(Threadthread){thread.start();}}QuartzSchedulerThread的运行其实就是它的run方法,我们已经大致分析过了。主要逻辑是:从作业执行线程池中获取availThreadCount,即当前可用线程数调用JobStore的acquireNextTriggers方法获取特定短时间内可能需要触发的触发器(idleWaitTime,默认30秒),其中的个数不超过availThreadCount调用JobStore的triggersFired方法获取获取到的trigger可能需要处理两次,再次获取到最终的pendingtrigger结果集。对最终pendingtrigger结果集中每一个需要触发的trigger进行循环处理。将触发器与JobRunShell一起打包并发送到线程。池执行与触发器关联的作业。现在我们已经基本弄清楚了作业调度线程QuartzSchedulerThread。Quartz的作业执行线程是在线程池中管理的。默认值为SimpleTreadPool。关于SimpleThreadPool我们有专门的文章文章中有介绍,这里不再赘述。和作业调度线程一样,作业执行线程是在作业调度器Scheduler创建后立即启动的。这个过程也是在StdSchedulerFactory的instantiate()方法中完成的:instantiate()创建SimpleThreadPool后,会调用SimpleThreadPool的initialize方法。根据配置文件中指定的任务执行线程数完成工作线程的初始化和启动。比如配置没看到,设置为10,就会初始化10个工作线程,一个一个启动。作业执行线程的初始化和启动的详细过程请参考Quartz-SimpleThreadPool。MisfireHandler线程如果你的项目使用的是RAMJobStore而不是基于JDBC的JobStore(指需要持久化到数据库的JobStore),那么就没有Misfire处理线程。因为RAMJobStore在处理正常触发器的过程中处理了Misfire,不再需要其他的处理机制。我们在上一篇文章中也分析了Quartz-Misfire(forRAMJobstore)。基于JDBC的JobStore在处理普通触发器时,只获取不错过触发时间的触发器。对于错过触发时间的触发器,即Misfire触发器,需要另一种机制来处理。Misfire处理线程是Quartz使用基于JDBC的JobStore时,用于处理错过触发时机的触发器的线程。MisfireHandler启动MisfireHandler定义在JobStoreSupport类中。JobStoreSupport是JobStore的基于JDBC的JobStore的虚类。Quartz主要提供了基于JDBC的JobStore的两种实现:JobStoreTX和JobStoreCMT。后面我们会分析基于JDBC的JobStore,今天主要分析MisfireHandler。我们在应用中创建任务调度器Scheduler后,需要调用它的start方法:Schedulersche=newStdSchedulerFactory().getScheduler();sche.scheduleJob(jobDetail,触发器);sche.start();这个start方法会调用JobStore的schedulerStarted()方法,如果我们在应用中使用基于JDBC的JobStore,会调用JobStoreSupport的schedulerStarted(),它会创建MisfireHandler,然后调用MisfireHandler的initialize():misfireHandler=newMisfireHandler();如果(initializersLoader!=null)misfireHandler.setContextClassLoader(initializersLoader);misfireHandler.initialize();Misfire的initialize会将创建的MisFireHandle线程交给ThreadExecutor启动。publicvoidinitialize(){ThreadExecutorexecutor=getThreadExecutor();executor.execute(MisfireHandler.this);}MisfireHandle的运行也是MisfireHandle的run方法。处理逻辑和RAMJobStore处理misfiredtrigger类似,只是MisfireHandle的所有处理逻辑都是通过数据库操作完成的。JDBC-BasedJobStore对应的表结构我们会找机会分析一下,这里就不赘述了。MisfireHandle的run方法的主要逻辑是:从数据库中获取处于WAITING(等待执行)状态且下一次执行时间小于msifiredtime(当前时间-MisfireThreshold)的触发器,即miss的触发器触发器时间是为了避免在出现大量misfired触发器的情况下,一次处理过多的数据影响其他正常触发器的执行。MisfireHandle线程每次对每一个错过执行时间的trigger只获取部分而非全部misfiredtrigger(指定了参数maxToRecoverAtATime,默认为20)。触发器(misfiredtrigger),调用触发器的updateAfterMisfire方法获取下次执行时间。updateAfterMisfire方法,我们在上一篇Misfire处理策略的文章中说过,就是根据触发器的处理策略获取下一次执行时间。执行updateAfterMisfire方法最后,如果获取到的触发器下次执行时间不为空,则更新到数据库,等待正常任务执行线程调度执行ClusterManager线程。和MisfiredHandle一样,ClusterManager线程也是JDBC-BasedJobStore特有的。顾名思义,ClusterManager线程与集群有关。JDBC-BasedJobStore可以支持Quartz的集群部署。在集群环境下,Quartz服务节点可能宕机或断开连接,会影响任务的执行。ClusterManager线程负责检查Quartz服务节点的在线状态是否为离线,负责该服务节点的触发器将交给其他服务节点处理。具体逻辑我们分析完QuartzCluster后再补充。上一个easypoi模板exportforeachsinglerowmulti-resultset+mergedcells问题下一个Quartz-JDBC-BasedJobStore