当前位置: 首页 > 后端技术 > Java

Quartz-JDBC-BasedJobStore

时间:2023-04-02 01:30:12 Java

JDBC-BasedJobStore是指使用数据库持久化存储作业相关信息的JobStore,对应于基于内存的RAMJobStore。我们在研究Quartz的Job、JobDetail、Trigger、Job调度线程、Job执行线程的时候,大多数情况下都是基于RAMJobStore来分析的,所以对于RAMJobStore我们已经有所了解,但是对于JDBC-BasedJobStore了解的不多。我们从以下几个角度来分析JDBC-BasedJobStore:涉及的表和每个表的作用,JDBC-BasedJobStore基于作业调度线程和作业执行线程的主要工作流程,JDBC的事务管理和锁机制-BasedJobStoreJDBC-BasedJobStoreBasedJobStorerecoveryandMisfireprocessing以上4点在一篇文章中可能篇幅太长,我们可以分2篇分析。今天,我们先分析1/2的问题。JDBC-BasedJobStore涉及的表JDBC-BasedJobStore涉及的表(省略前缀qrtz_):JOBDETAILS:作业信息TRIGGERS:触发器信息SIMPLE_TRIGGERS:简单触发器信息CRON_TRIGGERS:Cron触发器信息FIRED_TRIGGERS:触发触发器SCHEDULER_STATE:调度服务状态LOCKS:ThelockJob和Trigger的注册过程我们在上一篇文章中已经说过,RAMJobStore和JDBC-BasedJobStore的区别主要在于存储方式,一种是存储在内存中,一种是存储在数据库中。Job注册后,存储在JOBDETAILS表中,内容和内存中存储的基本一致,主要包括sched_name、name、group、description、job_class_name、job_data等。Trigger注册后存储在TRIGGERS表中,内容与内存中存储基本一致,主要包括sched_name、name、group、job_name、job_group、trigger_state、trigger_type、start_time、end_time、calendar_name、misfire_instr、job_data等.trigger_state初始写入后的状态为WAITING(等待被触发)。trigger_type包括:SIMPLE:SimpleTriggerCRON:CronTriggerCAL_INT:CalendarIntervalTriggerDAILY_I:DailyTimeIntervalTriggerBLOB:AgeneralblobTrigger无论是jobdetails还是trigger表,都有一个sched_name字段,记录了当前任务调度器的名字,sched_name取自配置文件Value(org.quartz.scheduler.instanceName)。调度器的名字sched_name,其实就是运行任务调度服务的服务器的ID,也就是当前运行quartz的服务的ID。在集群环境下,不同的服务必须指定不同的sched_names。我们将在其他文章中讨论石英簇的细节。分析。Trigger注册还涉及到其他表:如果当前Trigger处于GroupPaused状态,则该Trigger也会写入paused_trigger_grps表。SimpleTrigger会写入Simple_triggers表,CronTrigger会写入Cron_triggers表,负责记录它们的特殊属性;Simple_triggers记录了repeat_count/repeat_interval/times_triggered等信息,Cron_triggers记录了cron表达式。作业的调度调度任务的执行逻辑在之前的文章中已经反复分析过:从作业执行线程池中获取availThreadCount,即当前可用线程数,调用JobStore的acquireNextTriggers方法,获取具体的短时间(idleWaitTime,默认30秒))可能需要触发的,不超过availThreadCount的trigger数调用JobStore的triggersFired方法对获取到的可能需要触发的trigger进行二次处理,得到最终的结果集triggers再次触发进行循环处理,最终处理的trigger结果集中每一个需要触发的trigger,使用JobRunShell将trigger包装为一个绑定的Job,发送到线程池中执行job。JobStoreSupport#acquireNextTriggersJobStoreSupport是JDBC-BasedJobStore的一个抽象类,有两个实现类JobStoreCMT和JobStoreTX,这两个实现类的主要作用是管理事务,大部分业务逻辑其实都是在JobStoreSupport中实现的,包括acquireNextTriggers和triggersFired方法.acquireNextTriggers方法首先从Triggers表中获取符合条件的trigger:30秒内的nextFireTime(参数idleWaitTime设置),状态为WAITING,一定数量(参数设置的一次处理的trigger数量,或者可用执行次数线程)触发器。对于获取到的每一个trigger:从JobDetails表中获取绑定的Job,判断Job的ConcurrentExectionDisallowed属性,进行并发控制(逻辑同RAMJobStore)。Triggers表中的当前触发器状态由WAITING变为ACQUIRED触发器写入fired_triggers表,状态为ACQUIREDJobStoreSupport#triggersFired再次从triggers表中获取当前触发器,并验证其状态是否为ACQUIRED,不如果状态不正确,则进行处理。从job_details表中获取当前trigger绑定的jobs。将fired_triggers表中的当前触发器状态更新为EXECUTING。调用trigger的triggered方法获取当前trigger的下一次触发时间。更新triggers表中的当前触发状态(WAITING)、下一次触发时间等数据。作业执行#JobRunShell通过以上步骤,作业调度线程已经获取到了需要执行的trigger,接下来需要执行最后一步:用JobRunShell包装trigger,送入线程池执行关联的job与触发器。我们需要对执行这个JobRunShell做一个简单的了解。JobRunShell实例负责为Job的运行提供“安全”环境,并负责执行执行Job的所有工作,捕获任何抛出的异常,用Job的完成代码更新Trigger等。JobRunShell实例是由JobRunShellFactory代表QuartzSchedulerThread创建,当调度程序确定已触发作业时,QuartzSchedulerThread然后在配置的ThreadPool的线程中运行shell。JavaDoc非常清楚地说JobRunShell实际上负责运行作业,而SimpleThreadPool只是提供运行作业的线程。一个任务交上来后(交上来的其实是持有job对象的JobRunShell对象)SimpleThreadPool负责分配一个执行线程,然后使用这个线程来运行任务。我们在之前分析SimpleThreadPool的文章中已经说过,线程池的执行线程WorkThread有一个Runable接口的对象。任务被触发后,Job会被封装到这个Runable对象中,然后交给WorkThread以新的线程去执行Runable对象。这个Runable对象就是JobRunShell,JobRunShell实现了Runable接口。所以我们从JobRunShell的run方法开始。JobRunShell#runJobRunShell在初始化的时候封装了一个JobExecutionContextImpl对象jec,里面包含了Job、Trigger、Scheduler等相关对象。从jec获取作业并调用作业的执行方法。这里实际上调用了我们应用层的Job接口对象的execute方法。实际上,我们的业务逻辑是被调用和执行的。.job调用执行完成后,回调QuartzScheduler的notifyJobStoreJobComplete方法,通知调度器当前trigger执行完毕。QuartzScheduler的notifyJobStoreJobComplete方法调用了JobStore的triggeredJobComplete方法。JobStoreSuppor#triggeredJobComplete根据触发器的执行更新触发器表中触发器的当前状态。如果trigger的nextFireTime不为空,则更新为WAITING,等待下一次trigger。如果为空,则更新为COMPLETED,任务执行完成。此外还有hang、error等其他状态,当前trigger从fired_triggers表中删除。TriggerStateTrigger的状态与JobStore无关。也就是说无论你使用基于内存的JobStore还是基于数据库的JobStore,对于Trigger状态的管理逻辑都是一样的。Trigger的状态包括:WAITING:初始化状态,等待调度ACQUIRED:被调度器获取,等待触发EXECUTING:执行COMPLETE:执行完成,不再调度BLOCKED:阻塞ERROR:错误PAUSED:暂停/暂停PAUSED_BLOCKED:Suspend-BlockDELETED:删除Trigger注册时,如果在pendinglist(qrtz_paused_trigger_grps)中,则状态为PAUSED,否则状态为WAITING。Trigger被调度器调度后(状态为WAITING,nextFiredTime在30秒以内),状态为ACQUIRED,当被作业执行线程调用时,状态变为EXECUTING。Job执行完成后,如果Trigger不再需要执行(执行次数或执行时间达到设置要求),则状态为COMPLETE。设置为ConcurrentExectionDisallowed的job被scheduler执行后,当前job绑定的其他Trigger的状态:WAITING->BLOCKEDACQUIRED->BLOCKEDPAUSED->PAUSED_BLOCKEDjob被scheduler调度执行后,当前Trigger的状态:如果当前Trigger被触发,如果nextFiredTime不为空,状态设置为WAITING,等待下一次触发;否则设置为COMPLETE,触发器完成任务JobRunShell完成作业执行,并通过调用notifyJobStoreJobComplete方法通知JobStore触发器完成作业调度。如果当前Trigger绑定的job设置为ConcurrentExectionDisallowed,则设置当前job绑定的其他Trigger的状态(做与job执行相反的操作,将这些trigger恢复到正常状态):BLOCKED->WAITINGPAUSED_BLOCKED->PAUSED我们可以调用调度器的pauseTrigger方法挂起/挂起当前任务,调用后触发器的状态:WAITING->PAUSEDACQUIRED->PAUSEDBLOCKED->PAUSED_BLOCKED挂起的任务被重新调度后,状态恢复。总结至此,本文开头设定的任务已经完成,接下来的两个问题将在下一篇文章中继续。多谢!PreviousQuartz相关线程NextQuartz-基于JDBC的JobStore事务管理和锁定机制