当前位置: 首页 > 后端技术 > Java

面试官:说说线程池中的锁

时间:2023-04-01 18:30:46 Java

最近有个读者跟我说,面试的时候讨论线程池的时候,我们聊的还不错,基本都答对了,但是有一个问题直接把他搞糊涂了。面试官问他:说说线程池中的锁。结果他对线程池的认识其实是在各种博客或者面经上找的。他自己没有看过源码,所以压根没有关注线程池中锁的存在。他还跟我吐槽:他说这话的时候,我也觉得大家讲线程池的时候,都不太讲里面用到的锁。的确,存在感很低。我怎么安排?mainLock其实在线程池中还有很多地方用到了锁。比如前面说过,线程池中有一个叫workers的变量,它存放的东西可以理解为线程池中的线程。而这个对象的数据结构是HashSet。HashSet不是线程安全的集合类,你知道吗?所以,你可以看看上面的评论是什么:它只能在持有mainLock时访问。就算我不介绍,你看名字也能感觉到:如果没有猜到,那么mainLock应该是一把锁。是真的吗?如果是,那是什么锁?在源码中,变量mainLock就在workers上面:原来它的真身是一个ReentrantLock。使用ReentrantLock保护HashSet没有错。那么ReentrantLock和worker是如何协同工作的呢?我们以最关键的addWorker方法为例:如果使用了锁,那么肯定有东西需要独占。再看看,你锁定独占一个共享资源,你想干什么?大多数情况下,肯定是要改,往里面塞东西吧?那么你按照这个思路分析一下,addWorker独占锁里面包裹的代码到底是什么?其实不用分析,里面只有两条共享数据。两者都需要写,而且两者共享数据,一个是workers对象,一个是largestPoolSize变量。前面我们说过,worker的数据结构是线程不安全的HashSet。largestPoolSize是多少,为什么它被锁定?该字段用于记录线程池中曾经出现过的最大线程数。包括读这个值的时候加了mianLock锁:其实我个人觉得在这个地方把lar??gestPoolSize变量修改成volatile可以省去mainLock的加锁操作。它也是线程安全的。不知道你有没有同感?如果你也这么想,对不起,你错了。线程池中的许多其他字段使用volatile:为什么不使用largestPoolSize?再看前面getLargestPoolSize方法获取值的地方。如果改成volatile不加锁,mainLock.lock()就少了一次操作。如果去掉这个操作,可能会少一个阻塞等待操作。假设addWorkers方法还没有来得及修改largestPoolSize的值,一个线程调用getLargestPoolSize方法。由于没有阻塞,直接获取的值只是那一刻的largestPoolSize,不一定是addWorker方法执行后的阻塞,程序可以感知到largestPoolSize可能在变化,所以获取的值一定是addWorker方法的执行后的largestPoolSize。所以我理解了加锁就是最大程度保证这个参数的准确性。除了上面说到的地方,还有很多地方用到了mainLock:我就不一一介绍了,大家自己去看看吧,这个东西介绍起来不是很有意思,大家可以看下一眼代码。说点有意思的。你有没有想过,为什么DougLea先生用一个线程不安全的HashSet来用ReentrantLock来实现线程安全?为什么不直接创建一个线程安全的Set集合,比如Collections.synchronizedSet这个东西?答案其实之前已经出现过,只是我没有具体说出来,大家也没有注意到。mainLock的注释上写着:关键点我给你说说。首先看这句话:虽然我们可以使用某种并发集,但事实证明通常更喜欢使用锁。这句话是倒装句,应该没有生词,大家都懂的。其中之一是它原来是。让我介绍一下。这是美剧对话中经常出现的一句话。翻译过来就是“事实证明”四个字。所以,上面的整句话是这样的:虽然我们可以使用某种并发安全的集合,但事实证明,锁通常更好用。接下来老哥就解释一下为什么用锁比较好。我翻译这句话的意思是,我不是在胡说八道,都是有根据的,因为这就是老哥亲自解释为什么不使用线程安全的Set集合的原因。第一个原因如下:其中一个原因是这个序列化了interruptIdleWorkers,避免了不必要的中断风暴,尤其是在shutdown的时候。否则退出线程将同时中断那些尚未中断的线程。英文是的,我翻译成中文,加上我自己的理解是这样的。首先,第一句有“serializesinterruptIdleWorkers”。这两个词的组合还是有些让人摸不着头脑。这里的serializes不是指我们Java中的序列化操作,需要翻译成“序列化”。interruptIdleWorkers,这个东西根本就不是一个字,它是线程池中的一个方法:这个方法中首先要做的是拿mainLock锁,然后尝试做中断线程的操作。由于mainLock.lock的存在,调用该方法的多个线程被serializes序列化。序列化有什么好处?这就是我后面说的:避免不必要的中断风暴(interruptstorms),尤其是在调用shutdown方法的时候,防止退出的线程再次打断那些没有被中断的线程。为什么这里要特别提到shutdown方法呢?因为shutdown方法调用了interruptIdleWorkers:那么上面是什么意思呢?这里需要用到反证法。假设我们正在使用没有mainLock的并发安全的Set集合。此时调用shutdown方法的线程有5个。由于没有使用mainLock,所以没有阻塞,每个线程都会运行interruptIdleWorkers。所以会出现第一个线程发起中断,导致worker,也就是线程正在被中断。第二个线程又来发起中断了,于是又针对正在被打断的中断再次发起中断。好吧,这有点像绕口令。所以我再重复一遍:对于正在被打断的中断,发起一个中断。所以这里使用锁来避免中断风暴的风险。在并发时,只有一个线程可以发起中断操作,所以锁是必要的。在加锁的前提下,无论如何Set集合都会被加锁,所以根本不需要并发安全的Set。于是我明白了,这里使用了mainLock来实现序列化,同时保证Set集合不会被并发访问。只要确保此Set操作被锁包裹,就不需要并发安全的Set集合。也就是注释里写着:AccessedonlyundermainLock。请记住,它可能会被测试。然后,老哥说的第二个原因:还简化了一些largestPoolSize等相关的统计记账,这句话是关于锁住和维护largestPoolSize这个参数的,就不细说了。哦,对了,还有一个etc,就是“如此这般”的意思。这个etc指的是completedTaskCount参数,道理是一样的:除了前面提到的mainLock之外,线程池中其实还有一个经常被大家忽略的锁。那就是Worker对象。可以看到Worker继承自AQS对象,它的很多方法也都和锁有关。同时它也实现了Runnable方法,所以到底是一个封装好的线程,用来运行提交到线程池的任务,没有任务的时候去队列取或者poll等待,badluck被回收了。我们看一下被锁住的地方,在非常关键的runWorker方法中:java.util.concurrent.ThreadPoolExecutor#runWorker那么问题来了:这里是线程池中的线程,它正在执行提交的逻辑任务地点,为什么要锁?为什么这里是另外一个锁,而不是使用现有的ReentrantLock,即mainLock?答案还是写在纸条上:我知道你会瞬间对这么长的一段英文失去兴趣。不过别慌,我带你慢慢咀嚼。第一句话开门见山:ClassWorker主要维护线程运行任务的中断控制状态。worker类存在的主要意义是维护线程的中断状态。维护的线程不是普通的线程,而是运行任务的线程,也就是正在运行的线程。这个“保持线程的中断状态”怎么理解呢?如果您查看Worker类的lock和tryLock方法,就会发现只有一个地方可以调用它们。前面我们说过,lock方法是在runWorker方法中调用的。这里调用了tryLock方法:这个方法也是我们的老朋友了,刚才说了,是用来中断线程的。什么类型的线程被中断了?是等待任务的线程,也就是这里等待的线程:java.util.concurrent.ThreadPoolExecutor#getTask也就是说:正在执行任务的线程不应该被中断。线程池如何知道哪些任务正在执行,哪些任务不应该被中断?我们看一下判断条件:关键条件其实是w.tryLock()方法。那么看看tryLock方法中的核心逻辑:核心逻辑是一个CAS操作,将某个状态从0更新为1,如果成功,则说明tryLock成功。“0”和“1”分别是什么?注意,答案还在注释中:因此,tryLock中的核心逻辑compareAndSetState(0,1)是加锁操作。如果tryLock失败,为什么?肯定是此时state已经是1了。那么state什么时候变成1呢?一种情况是在执行lock方法时,它还会调用tryAcquire方法。锁是什么时候上的?在runWorker方法中,当获取到任务并准备执行时。也就是说status为1的worker一定是执行任务的线程,不能被中断。此外,状态的初始值设置为-1。我们可以写一个简单的代码来验证以上三种状态:首先我们定义一个线程池,然后调用prestartAllCoreThreads方法对所有线程进行预热,使其处于等待接收任务的状态。你这时候说,三个worker是什么状态?那必须是0,解锁状态。当然,你也可能会看到这样的情况:-1是从哪里来的?别着急,等会儿再告诉你,我们看看1在哪里?根据前面的分析,我们只需要提交一个任务给线程池即可:这时候,如果我们调用shutdown,会发生什么情况呢?当然,空闲线程被中断了。执行任务的线程呢?因为是while循环,任务执行完成后,会再次调用getTask方法:getTask方法会先判断线程池状态。这个时候可以感知到线程池关闭,返回null,worker会默默退出。好了,前面说了这么多,你只需要记住一个大前提:自定义worker类的大前提是保持中断状态,因为正在执行任务的线程不应该被中断。然后往下看注释:我们实现了一个简单的不可重入互斥锁,而不是使用ReentrantLock,因为我们不希望worker任务在调用setCorePoolSize这样的池控制方法时能够重新获取锁。这就解释了为什么老头不使用ReentrantLock,而是选择自己创建一个worker类。因为他要的是一个不能重入的互斥量,而ReentrantLock是可重入的。从前面的分析也可以看出,这个方法是一个不可重入的方法:传入的参数根本没有被使用,代码中也没有累加逻辑。如果你还没有意识到这是怎么回事,我给你看一下ReentrantLock中的可重入逻辑:你有没有看到有一个累加过程。当锁被释放时,有一个相应的递减过程。当减为0时,表示当前线程已经成功释放锁:而上面的加减逻辑在worker类中都是没有的。那么问题又来了:如果是可重入的怎么办?目的还是和以前一样:不想打断正在执行任务的线程。同时在评论中提到了一个方法:setCorePoolSize。巧合的是,之前写线程池动态调整的时候重点关注了这个方法:可惜当时主要讲的是delta>0里面的逻辑。现在让我们看看我陷害的地方。workerCountOf(ctl.get())>corePoolSize为真什么意思?显示当前的worker数量超过了我要重置的corePoolSize,需要减少一点。如何减少呢?调用interruptIdleWorkers方法。前面我们刚刚分析了这个方法,我把它拿出来一起看看:里面有一个tryLock,如果是可重入的,会发生什么?是否可以打断正在执行的worker?这合适吗?好吧,注释上的最后一句话:此外,为了在线程实际开始运行任务之前抑制中断,我们将锁定状态初始化为负值,并在启动时将其清除(在runWorker中)。这句话是针对线程在真正开始运行任务之前抑制中断。所以将worker的状态初始化为一个负数(-1)。大家要注意这一点:启动时清除(在runWorker中)。启动时清除,这是一个负值状态。老哥很贴心,把所有的方法都给你指出来了:在runWorker中。所以你看一下runWorker就知道为什么先unLock操作,后面是allowinterrupts的注释:因为在这个地方,worker的状态可能还是-1,所以先unLock,再把status重置为0.同时也解释了前面没有解释的-1是从哪里来的:你想明白吗,-1是从哪里来的?workers.add方法一定是在启动过程中执行的,但是还没有来得及执行runWorker方法的worker对象状态为-1。最后说一句,我看到了这个,喜欢并安排一个。写文章很累,需要一点积极的反馈。为所有读者朋友敲出一个: