作者:Lavender\来源:https://segmentfault.com/a/11...1.龙头公司前期转用quartz进行任务调度,日调度量超过200万次.随着调度量的增加,突然开始出现作业重复调度的情况,而且没有规律可循。网上也没有比较明确的解决办法,于是开始调试Quartz源码,终于找到了问题所在。如果没有耐心看源码分析,可以直接拉到文末,有直接简单的解决方法。注:本文使用的quartz版本为2.3.0,使用JDBC方式存储Job。2.准备工作首先,由于本文是代码级别的分析文章,所以需要提前了解Quartz的用途和用法。网上还是有很多不错的文章的,大家可以提前自己了解一下。其次,除了用法之外,我们还需要了解Quartz框架的一些基本概念:1)Quartz调用triggerjobfire。TRIGGER_STATE是当前触发器的状态,PREV_FIRE_TIME是上次触发时间,NEXT_FIRE_TIME是下一次触发时间,misfire是指job在某个时刻要触发的情况,但是因为某种原因没有触发。2)Quartz在运行时,会有两种线程(两种以上),一种是调度线程(单线程),用于调度作业,另一种是工作池,用于执行具体业务工作。3)在Quartz自带的表中,本文主要涉及以下三个表:triggers表。触发器表记录了触发器的PREV_FIRE_TIME(上次触发时间)、NEXT_FIRE_TIME(下一次触发时间)和TRIGGER_STATE(当前状态)。尽管并不详尽,但这些是本文中使用的唯一内容。锁表。Quartz支持分布式,即会出现多个线程同时抢占同一个资源的情况,Quartz就是依靠这张表来处理这种情况。具体怎么做,见3.1。fired_triggers表记录有关正在触发的触发器的信息。4)TRIGGER_STATE,即触发器的状态,主要包括以下几类:触发器初始状态为WAITING,处于WAITING状态的触发器等待触发。调度线程会不断扫描triggers表,根据NEXT_FIRE_TIME提前拉取即将触发的trigger。如果触发器被调度线程拉取,它的状态将变为ACQUIRED。因为触发器被提前拉动,还没有到触发器真正的触发时间,所以调度线程会一直等到真正的触发时间,然后将触发器状态从ACQUIRED变为EXECUTING。如果触发器不再执行,则将状态更改为COMPLETE,否则为WAITING,并开始新的循环。如果这个循环的任何部分抛出异常,触发器的状态将变为ERROR。如果触发器被手动暂停,状态将变为PAUSED。3、开始排查3.1分布式状态下的数据访问上面说了触发器的状态是保存在数据库中的,而Quartz是支持分布式的,所以如果启动了多个quartz服务,就会有多个调度线程去抢夺并触发相同的触发器。mysql默认执行select语句,并没有加锁。如果多个调度线程同时抢到同一个trigger,会不会导致trigger被重复调度?我们来看看Quartz是如何解决这个问题的。首先我们看一下JobStoreSupport类的executeInNonManagedTXLock()方法:官方对该方法的介绍:已经存在(托管)。**@paramlockName要获取的锁的名称,例如*“TRIGGER_ACCESS”。如果为null,则不获取锁,但*lockCallback仍在事务中执行。*/也就是说,传入的回调方法执行时携带指定的锁,启动事务。注释还提到lockName是指定锁的名称。如果lockName为空,则回调方法的执行不在锁保护下,但仍在业务中。这意味着我们在使用这个方法的时候,不仅可以保证事务,还可以选择保证回调方法的线程安全。接下来我们看一下executeInNonManagedTXLock(...)中的obtainLock(conn,lockName)方法,也就是抢锁的过程。该方法定义在信号量接口中。Semaphore接口通过锁定线程或资源来保护资源不被其他线程修改。由于我们的调度信息是保存在数据库中的,现在查看DBSemaphore.java中的obtainLock方法的具体实现:我们通过调试查看expandedSQL和expandedInsertSQL这两个变量:从图3-3可以看出,obtainLock方法使用locks表中的行锁(由lockName决定),保证回调方法的事务和线程安全。拿到锁后,obtainLock方法将lockName写入threadlocal。当然,在releaseLock的时候,lockName会从threadlocal中删除。总而言之,executeInNonManagedTXLock()方法保证了在分布式情况下,同一时刻只有一个线程可以执行该方法。3.2quartzQuartzSchedulerThread的调度过程是调度线程的具体实现。图3-4是线程run()方法的主要内容。图中只提到了正常情况,即流程没有异常时的处理。过程。从图中可以看出,调度过程主要分为以下三个步骤:1)拉取待触发的触发器:调度线程会在一定的时间窗口内拉取即将触发的触发器信息,并在一次一定数量的距离。那么,如何确定时间窗口和数量信息呢,我们来看看下面几个参数:idleWaitTime:默认30s,可以通过配置属性org.quartz.scheduler.idleWaitTime来设置。availThreadCount:获取可用(空闲)工作线程的数量,它将始终大于1,因为此方法将始终阻塞,直到有工作线程空闲为止。maxBatchSize:一次最多拉取的trigger数,默认为1,可以通过org.quartz.scheduler.batchTriggerAcquisitionMaxCount改写batchTimeWindow:时间窗口调整参数,默认为0,可以通过org.quartz.scheduler改写.batchTriggerAcquisitionFireAheadTimeWindowmisfireThreshold:超过这个时间还没有被触发的trigger被认为misfired,默认60s,可以通过org.quartz.jobStore.misfireThreshold设置。调度线程将一次拉取NEXT_FIRE_TIME小于(now+idleWaitTime+batchTimeWindow)且大于(now-misfireThreshold)的min(availThreadCount,maxBatchSize)个触发器。默认情况下,它会在接下来的30秒内扣动扳机,而在过去的60秒内不会开火。1个触发器。然后将这些触发器的状态从WAITING更改为ACQUIRED,并将它们插入到fired_triggers表中。2)Trigger触发器:首先我们会检查每个触发器的状态是否为ACQUIRED,如果是,则将状态改为EXECUTING,然后更新触发器的NEXT_FIRE_TIME,如果这个触发器的NEXT_FIRE_TIME为空,即为它以后不会再触发了,把它的状态改成COMPLETE就可以了。如果触发器不允许并发执行(即Job的实现类标有@DisallowConcurrentExecution),则将状态变为BLOCKED,否则将状态变为WAITING。3)将触发器打包丢给工作线程池:遍历触发器,如果??其中一个触发器在第二步出错,即返回值有异常或null,一些触发器表和内容fired_triggers表中的fired_triggers会被修正并跳过这个trigger,继续查看下一个。否则,根据触发信息实例化JobRunShell(实现Thread接口),根据JOB_CLASS_NAME实例化Job,然后我们把JobRunShell实例丢到工作线程中。在JobRunShell的run()方法中,Quartz会在执行job.execute()前后通知绑定的监听器。如果在job.execute()执行过程中抛出异常,则执行结果jobExEx会保存异常信息,否则如果没有抛出异常,则jobExEx为null。然后根据jobExEx的不同,得到不同的执行指令instCode。JobRunShell将触发器信息、作业信息和执行指令传递给triggeredJobComplete()方法,完成最后的数据表更新操作。比如job执行过程中抛出异常,将trigger状态改为ERROR,如果是BLOCKED则改为WAITING等,最后从fired_triggers表中删除已经执行的trigger。请注意,这些是在工作线程池上异步完成的。3.3故障排查在上一篇文章中,我们可以看到Quartz的调度过程中有3种(可选的)加锁行为。为什么叫可选呢?因为这三个步骤都在executeInNonManagedTXLock方法的保护下,但是executeInNonManagedTXLock方法可以通过设置传入参数lockName为空来取消锁。翻阅代码,我们看到第一步是拉动要触发的触发器:如果(isAcquireTriggersWithinLock()||maxCount>1){lockName=LOCK_TRIGGER_ACCESS;锁定}else{锁名=null;}returnexecuteInNonManagedTXLock(lockName,newTransactionCallback>(){publicList
>(){//省略},newTransactionValidator
>(){//省略});}通过调试,它发现isAcquireTriggersWithinLock()的值为false,所以传入的lockName为null。我在代码中添加了一个日志,以便更清楚地看到过程。从图3-5可以清楚的看到,当扣动要触发的扳机时,默认是解锁。如果这个默认配置有问题,重复调度的问题不会经常出现吗?其实不是,原因是Quartz默认采用了乐观锁,也就是说允许多个线程同时拉同一个触发器。让我们看看当触发触发是调度过程的第二步时,Quartz做了什么。注意此时是锁定的:protectedTriggerFiredBundletriggerFired(Connectionconn,OperaableTriggertrigger)throwsJobPersistenceException日历cal=null;//确保触发器没有被删除、暂停或完成...try{//如果触发器被删除,状态将为STATE_DELETEDStringstate=getDelegate().selectTriggerState(conn,trigger.getKey());如果(!state.equals(STATE_ACQUIRED)){返回null;}}catch(SQLExceptione){thrownewJobPersistenceException("无法选择触发状态:"+e.getMessage(),e);}如果调度线程发现当前trigger状态为NotACQUIRED,也就是说如果这个trigger被其他线程触发,则返回null。在3.2中,我们提到在调度过程的第三步,如果发现某个触发器第二步的返回值为null,则跳过第三步,取消开火。一般情况下,乐观锁可以保证不会出现重复调度,但是难免会出现ABA问题。我们看一下重复调度时的日志:第一步,Quartz在拉取符合条件的触发器,它们的状态从WAITING变为ACQUIRED之间有超过9ms的停顿,另一台服务器利用这9ms的间隙完成WAITING-->ACQUIRED-->EXECUTING-->WAITING(即一个完整的状态变化周期的整个过程),如图3-6所示。3.4解决方案如何解决这个问题?在配置文件中添加org.quartz.jobStore.acquireTriggersWithinLock=true,这样在调度过程的第一步,也就是拉取要触发的trigger的时候,是锁定的,即不会有多个多个线程同时拉动同一个触发器,避免了重复调度的危险。3.5经验调查过程并非一帆风顺。经历了一些坑之后,也有一些非技术的体会:1)学习是一种能力,需要不断打磨和修正。个人为了学习Quartz,一开始就把一个2.4MB的源码过一遍,很无头,效率很低,所以立马换个方向,先了解一下这个框架的运行方式,它是干什么的,有哪些模块,怎么做,然后找到主线,把相关源码翻过来。之后在一次次的使用中,遇到问题再去翻翻之前没有看过的源码,就会越来越顺手。以前听过其他同事的学习方法,感觉不是完全适合自己。可能每个人的状态和经历不同,学习的方法也略有不同。在平时的学习中,要感受自己的学习效率,参考建议,尝试,感受效果,提高,你就会越来越清楚自己适合做什么。在这里非常感谢我的师傅,用短短的几句话帮我把调度过程打通了,让我阅读源码的难度没有那么大了。2)质疑“经验”和“应该”,惯性思维会蒙蔽双眼。在大型代码中很容易被习惯混淆。一开始我们看到加锁的方法,觉得这个加锁的技术很棒。这种方法是为了解决并发问题,“应该”加锁,加锁就不会出现并发问题。怎么可能跟数据库交互几次都加锁,突然某段时间不加锁呢?直到看到要触发的触发方法,我才觉得不对劲,敲了下日志,才发现居然解锁了。3)日志很重要。虽然我们可以调试,但是没有日志,我们无法发现并证明程序存在ABA问题。4)最重要的是,不要害怕问题。即使使用像Quartz这样的大型框架,解决问题也不一定需要对2.4MB的源代码有透彻的了解。只要有时间,问题都能解决,但是好的技巧可以缩短这个时间,需要在实战中磨练自己的技巧。近期热点文章推荐:1.1000+Java面试题及答案(2022最新版)2.厉害了!Java协程来了。..3.SpringBoot2.x教程,太全面了!4.不要用爆破爆满画面,试试装饰者模式,这才是优雅的方式!!5.《Java开发手册(嵩山版)》最新发布,赶快下载吧!感觉不错,别忘了点赞+转发!
