当前位置: 首页 > 科技观察

10分钟搞定Java并发队列好吗?好

时间:2023-03-18 00:42:23 科技观察

前言如果按照用途和特点粗略划分,JUC包中包含的工具大致可以分为6类:Executors和线程池Concurrentqueue同步工具主要讲解executor和线程池,同步工具,以及锁。在分析源码的时候,或多或少都会提到“队列”。队列在JUC中也以各种方式存在,所以本文以“高瞻远瞩”的视角,帮助大家快速认识和辨别这些看似“凌乱”的队列。Java中的并发队列按照其实现方式可以分为两种:阻塞队列和非阻塞队列。如果你看过并发系列锁的实现,你就可以知道它们实现的区别:前者是基于锁实现的,后者是基于CAS非阻塞算法实现的。常见的队列如下:瞬间糊涂?看到这惨无人道的画面就想直接离开?不要担心是否客观,过一会儿就会清楚。现在你可能会有一个疑问:为什么会有这么多队列?锁有各种情况的锁,队列自然有各种情况的队列吧?这也意味着单一责任原则?那么我们需要了解这些队列是如何设计的呢?它们在哪里使用?先看下图。如果你在IDE中打开上面的非阻塞队列和阻塞队列,查看它们的实现方法,你会发现阻塞队列比非阻塞队列多支持两个操作:阻塞插入当队列满时,队列会阻塞插入元素直到队列未满的线程阻塞移除当队列为空时,获取元素的线程将被阻塞,直到队列变为非空。入队/退出操作的综合描述。看似乱七八糟的方法,可以用一张表来概括。当队列满时,如果此时向队列中插入元素,会抛出IllegalStateException(这个很好理解)当队列为空时,如果此时从队列中获取元素,会抛出NoSuchElementException(这个也很容易理解)并返回一个特殊的值当向队列中插入一个元素时,返回该元素是否插入成功,如果成功,则返回true当从队列中移除一个元素时,如果不成功,返回null和保持阻塞当队列满时,如果生产者线程向队列中放入元素,队列会一直阻塞生产者线程,直到队列可用或响应中断退出当队列为空时,如果消费者线程从队列中取元素,队列会阻塞消费者线程,直到队列不为空关于阻塞,我们已经充分实现了并发编程中的等待通知机制已经解释过了,你还记得下面这张图吗?原理其实是一样的。超时退出和锁一样。因为有阻塞,为了灵活使用,必须支持超时退出。当阻塞时间到达超时时间后,会直接返回。至于我不知道为什么插入和删除了这么多词表示。为了方便记忆,只需要记住blockedmethod形式:put这个词和take的字母t开头连在一起,一个是put,另一个是taken。Java并发队列你应该有一个初步的了解,看似凌乱的方法看似有规律接下来,就是疯狂系列的知识点时间了。有了前面几章的知识,分分钟就能搞懂所有的队列。ArrayBlockingQueue之前也说过,JDK中的命名还是很有讲究的。看名字,底层是数组实现。是否有界取决于构建时是否需要指定容量值。填鸭式的描述也很容易忘记。你在哪里看到这些的?在所有队列的Java文档的第一段中,一句话是总结了队列的主要特点,所以强烈建议大家在看源码的时候,可以简单的看一下文档的开头部分,以及你会想到一半以上的数字。在讲JavaAQS队列同步器和ReentrantLock的应用的时候,我们介绍了公平锁和非公平锁的概念,ArrayBlockingQueue也有相同的概念,看它的构造方法,有ReentrantLock辅助实现publicArrayBlockingQueue(intcapacity,booleanfair){if(capacity<=0)thrownewIllegalArgumentException();this.items=newObject[capacity];lock=newReentrantLock(fair);notEmpty=lock.newCondition();notFull=lock.newCondition();}默认还是不能保证线程会公平的访问队列(公平是指被阻塞的线程是否可以跟进blocked顺序访问队列,先阻塞行访问,再阻塞后面的访问)到这里,我也想临时问一个在面试中多次说过的子问题:非公平锁方法为什么默认使用?与公平锁方式相比有何不同?好处,又会带来什么问题?知道了以上内容,再结合上表中的方法,ArrayBlockingQueue就可以轻松过关了。数组的对立面自然是链表。LinkedBlockingQueueLinkedBlockingQueue也是一个有界阻塞队列。从下面的构造函数你还可以看到队列的默认和最大长度是Integer.MAX_VALUE,这就是为什么文档说optional-boundedpublicLinkedBlockingQueue(){this(Integer.MAX_VALUE);}publicLinkedBlockingQueue(intcapacity){if(capacity<=0)thrownewIllegalArgumentException();this.capacity=capacity;last=head=newNode(null);}和Java集合一样,链表形式的队列访问效率更好比数组队列形式的高但是在一些并发程序中,数组形式的队列由于其可预测性,在某些场景下可以取得更高的效率。LinkedBlockingQueue是不是很熟悉?为什么要使用线程池?创建单个线程池publicstaticExecutorServicenewSingleThreadExecutor(){returnnewFinalizableDelegatedExecutorService(newThreadPoolExecutor(1,1,0L,TimeUnit.MILLISECONDS,newLinkedBlockingQueue()));}创建固定数量的线程池publicstaticExecutorServicenewFixedThreadsPool(ExreadThreadtnThreads)遇到过很多次,nThreads,0L,TimeUnit.MILLISECONDS,newLinkedBlockingQueue());}面试分题又来了。使用Executors创建线程池非常简单。为什么厂商严格要求禁用这种创建方式?PriorityBlockingQueuePriorityBlockingQueue是一个支持Priority的无界阻塞队列,默认按自然顺序升序排列。当然,也有非默认的情况需要自定义优先级排序。Comparator自然是用来定义排序规则的。可以定义优先级,自然也有相应的限制和使用注意事项,根据上图,队列中不允许有空值,不允许不能排序的元素。对于排序值相同的元素,不保证顺序,但可以继续自定义其他可以区分优先级的值。如果你有严格的优先级区分,建议有一个更完整的比较规则,就像Java文档classFIFOEntry>implementsComparable>{staticfinalAtomicLongseq=newAtomicLong(0);finallongseqNum;finalEentry;publicFIFOEntry(Eentry){seqNum=seq.getAndIncrement();this.entry=entry;}publicEgetEntry(){returnentry;}publicintcompareTo(FIFOEntryother){intres=entry.compareTo(other.entry);if(res==0&&other.entry!=this.entry)res=(seqNum)other;longdiff=time-x.time;if(diff<0)return-1;elseif(diff>0)return1;elseif(sequenceNumber0)?1:0;}上面的代码是哪里来的?如果打开ScheduledThreadPoolExecutor中的ScheduledFutureTask,就会看到(ScheduledThreadPoolExecutor内部是应用DelayQueue)所以一般来说,以下两种情况非常适合设计DelayQueue缓存系统:使用DelayQueue来保存有效性缓存元素的周期,并使用线程循环遍历DelayQueue。如果可以从DelayQueue中获取到元素,则说明缓存有效期已经到了定时任务调度:使用DelayQueue保存当天要执行的任务和时间。如果可以从DelayQueue中获取到元素,任务就可以开始执行比如TimerQueue就是这样实现的。SynchronousQueue是一个不存储元素的阻塞队列。如果它不存储元素,它也被称为队列。对,SynchronousQueue直译过来就是同步队列。如果长时间在队列中,应该算是“异步”了。所以使用它,每一次put()操作都必须等待一次take()操作,反之亦然,否则无法继续添加元素。实践中如何使用呢?如果你需要同步两个线程之间的共享变量,如果你不使用SynchronousQueue你可能会选择使用CountDownLatch来完成,像这样:1);Runnableproducer=()->{IntegerproducedElement=ThreadLocalRandom.current().next();sharedState.set(producedElement);countDownLatch.countDown();};Runnableconsumer=()->{try{countDownLatch.await();IntegerconsumedElement=sharedState.get();}catch(InterruptedExceptionex){ex.printStackTrace();}};这种小事用计数器来实现,显然是不妥的。如果用一个SynchronousQueue来改造一下,瞬间感觉就不一样了。ExecutorServiceexecutor=Executors.newFixedThreadPool(2);SynchronousQueuequeue=newSynchronousQueue<>();Runnableproducer=()->{IntegerproducedElement=ThreadLocalRandom.current().nextInt();try{queue.put(producedElement);}catch(InterruptedExceptionex){ex.printStackTrace();}};Runnableconsumer=()->{try{IntegerconsumedElement=queue.take();}catch(InterruptedExceptionex){ex.printStackTrace();}};其实Executors.newCachedThreadPool()方法使用的是SynchronousQueuepublicstaticExecutorServicenewCachedThreadPool(){returnnewThreadPoolExecutor(0,Integer.MAX_VALUE,60L,TimeUnit.SECONDS,newSynchronousQueue());}见previousLinkedBlockingQueue用在newSingleThreadExecutor和newFixedThreadPool上,而newCachedThreadPool用的是SynchronousQueue。为什么?因为单线程池和固定线程池的线程数有限,所以提交的任务需要在LinkedBlockingQueue队列中等待空闲线程;在缓存线程池中,线程数几乎没有限制(上限为Integer.MAX_VALUE),所以提交的任务只需要同步交给SynchronousQueue队列中的空闲线程即可,所以有时说SynchronousQueue的吞吐量高于LinkedBlockingQueue和LinkedBlockingQueueArrayBlockingQueueLinkedTransferQueue简单来说,TransferQueue提供了一个场所,生产者线程使用transfer方法传入一些对象并阻塞,直到这些对象全部被消费者线程取出。是不是觉得刚才介绍的SynchronousQueue很像一个容量为0的?传输队列。但是LinkedTransferQueue比其他阻塞队列多了三个methodtransfer(Ee)。如果当前有消费者在等待消费元素,transfer方法可以直接将生产者传入的元素立即传输(传输)给消费者;如果没有消费者正在等待消费元素,那么transfer方法会将元素放到队列的尾(tail)节点上,阻塞直到元素被消费者消费完才返回tryTransfer(Ee)tryTransfer,这显然是一种尝试,如果不是消费者正在等待消费元素,它会立即返回false,程序不会阻塞tryTransfer(Ee,longtimeout,TimeUnitunit)有超时限制,尝试传输生产者传给消费者的元素,如果超时,仍然没有消费者消费元素,返回false你看,所有的阻塞方法都是套路:阻塞方法有try,非阻塞方法有try,timeoutnon-blockingmethod看到这里,你可能会觉得LinkedTransferQueue没有什么特点,其实它和其他阻塞队列的区别还是挺大的:BlockingQueue是如果队列满了,线程就会阻塞;但是TransferQueue如果没有consumer元素(transfer方法)就会被阻塞,这也对应DougLea说的那句话:LinkedTransferQueue其实是ConcurrentLinkedQueue、SynchronousQueue(在“公平”模式下)和unboundedLinkedBlockingQueues的超集。通过允许您混合和匹配这些功能以及利用更高性能的实现技术,它变得更好。简单翻译一下:LinkedTransferQueue就是ConcurrentLinkedQueue,SynchronousQueue(公平模式下),无界LinkedBlockingQueues等的超集;允许你混合使用阻塞队列的多种特性所以,在合适的场景下,请尽量使用LinkedTransferQueue上面是一个单向队列FIFO,然后我们看双向队列LinkedBlockingDequeLinkedBlockingDeque是一个双向阻塞队列由链表结构组成。任何带有后缀Deque的东西都表示双向队列。后缀读作deck——/dek/。刚开始接触的时候,我以为是这个冰淇淋的读音。所谓的双向排队是值得的。在双向队列的两端插入和移除元素。因此:因为双向队列多了一个操作队列的入口,当多个线程同时入队时,也会减少一半的竞争。队列有头有尾,所以比其他阻塞队列多。还有几个比较特殊的方法addFirstaddLastxxxxFirstxxxxLast...那么,双向阻塞队列真的很高效,那么双向阻塞队列应用在什么地方呢?不知道大家有没有听说过“工作窃取”模式,这种看似不厚道的方法,其实是一种高效使用线程的好方法。下一篇文章,我们就来看看ForkJoinPool是如何应用“work-stealing”模式的。从总结到Java队列(其实主要介绍的是阻塞队列),我们将快速区分整理看似乱七八糟的方法,方便快速了解其用途,也说明了这些队列的实际用途。相信大家站在更高的角度去阅读源码会更轻松。最后希望大家仔细阅读两个队列的源码实现。遇到排队问题,脑海中的画面分分钟搞定。“日工医兵”,可以通过以下二维码关注。转载本文请联系日工一兵公众号。