当前位置: 首页 > 科技观察

访谈侃集-SynchronousQueueFairMode

时间:2023-03-21 10:25:42 科技观察

面试官:哟,你们来的早啊!九头蛇:是的,我不能让你等太久场)。面试官:前两轮表现还不错,今天继续说说队列中的SynchronousQueue。Hydra:嗯,SynchronousQueue和之前介绍的队列相比,有点特殊。必须等到队列中的元素被消耗完,才能继续往里面添加新的元素,所以也叫无缓冲等待队列。.我先写个例子,创建两个线程,生产者线程putThread向SynchronousQueue中放入元素,消费者线程takeThread从中取元素:SynchronousQueuequeue=newSynchronousQueue<>(true);ThreadputThread=newThread(()->{for(inti=0;i<=2;i++){try{System.out.println("putthreadput:"+i);queue.put(i);System.out.println("putthreadput:"+i+"awake");}catch(InterruptedExceptione){e.printStackTrace();}}});ThreadtakeThread=newThread(()->{intj=0;while(j<2){try{j=队列.take();System.out.println("takefromputThread:"+j);}catch(InterruptedExceptione){e.printStackTrace();}}});putThread.start();Thread.sleep(1000);takeThread.start();执行上面的代码,查看结果:putthreadput:0takefromputThread:0putthreadput:0awakeputthreadput:1takefromputThread:1putthreadput:1awakeputthreadput:2takefromputThread:2putthreadput:2awake可以看到生产者线程在执行完put方法后一直处于阻塞状态,直到消费者线程执行完take方法消费队列中的元素,生产者线程被唤醒继续向下执行。简单来说,操作流程是这样的:面试官:就这?谁不能使用该应用程序?不谈底层原理,还想蒙混过关?Hydra:别着急,先从它的构造函数说起,根据参数的不同,SynchronousQueue分为公平模式和非公平模式。默认为非公平模式。该模式下,底层使用TransferQueue队列,内部节点由QNode组成,定义如下:){this.item=item;this.isData=isData;}item用于存储数据,isData用于区分节点是从什么类型的线程生成的。true表示是producer,false表示是consumer,后面用于节点匹配(complementary)的必备。匹配是SynchronousQueue中一个非常重要的概念。比如一个线程先执行put生成一个节点入队列,另一个线程执行take生成一个节点。这两种不同类型的节点可以成功匹配。面试官:但是我看了很多资料说SynchronousQueue是一个不存储元素的阻塞队列。你怎么理解这个?Hydra:从上面节点封装的属性可以看出,SynchronousQueue的队列封装了更多的节点,不是关于数据,而是要执行的操作。我个人猜测这个说法的出发点是队列中存储的节点更偏向于操作的属性。面试官:好的,我们继续说队列的结构。Hydra:TransferQueue中定义的主要属性如下:transientvolatileQNodehead;transientvolatileQNodetail;transientvolatileQNodecleanMe;TransferQueue(){QNodeh=newQNode(null,false);//initializetodummynode.head=h;tail=h;}比较重要的头节点head、tail节点、cleanMe节点标记下一个要删除的节点。在构造函数的初始化中创建一个节点。评论中称其为dummynode,即假节点。它的作用类似于AQS中的头节点,实际运行的节点是它的下一个节点。说起SynchronousQueue,真是一个神奇的队列。不管是调用put和offer,还是take和poll,都是交给核心的transfer方法处理,只是参数不一样而已。今天我们抛开源码,画图分析。首先我们看一下方法的定义:Etransfer(Ee,booleantitimed,longnanos);面试官:哦,就一种方法?我想看看它是如何区分实现Queue和dequeue操作的...Hydra:方法的参数中,timed和nanos用于标识调用transfer的方法是否可以超时,e是否为空可以指示该方法是由生产者还是消费者调用的。如果e不为null,则为生产者调用,如果e为null,则为消费者调用。该方法的整体逻辑可以分为以下几步:1.如果队列为空,或者队列中尾节点的类型与自身类型相同,准备封装一个新的QNode添加到队列。在向队尾添加新节点的过程中,并没有使用synchronized或者ReentrantLock,而是使用CAS来保证线程间的同步。在向队尾添加新的QNode之前,它会先判断之前获取的尾节点是否发生了变化。如果有变化,则放弃修改,自旋,在下一个循环中重新判断。检查队列尾部节点没有变化后,新建一个节点QNode,加入队列尾部。2、当队列尾部加入新节点时,会调用awaitFulfill方法,线程会根据传入的参数自旋或直接挂起。该方法的定义如下。当参数中的timed为true时,表示这是一个等待超时的方法。ObjectawaitFulfill(QNodes、Ee、booleantitimed、longnanos);在awaitFulfill方法中,会进行判断。如果新节点是头节点的下一个节点,考虑到它可能很快完成匹配出队,先不要挂起,执行一定的旋转次数,超过旋转次数上限后暂停。如果不是头节点的下一个节点,为避免自旋造成资源浪费,直接调用park或parkNanos挂起线程。3.当挂起的线程被打断或者达到超时时间,需要从队列中移除该节点,此时会执行clean()方法。如果要删除的节点不是链表中的尾节点,比较简单,直接用CAS替换上一个节点的next指针即可。如果要删除的节点是链表中的尾节点,那就有点复杂了,因为在多线程环境下,可能还有其他线程向尾节点添加新节点。如果直接删除tail节点,会造成如下节点丢失。此时会使用TransferQueue中定义的cleanMe来标记节点。cleanMe的作用是当要移除的节点是队列的尾节点时,标记尾节点的前驱节点。具体在执行过程中,有两种情况:cleanMe节点为null,说明队列之前没有标记要删除的节点。这时会使用cleanMe来识别节点的前驱节点。标记完成后会退出clean方法,下次执行clean方法时会删除cleanMe的下一个节点。如果cleanMe节点不为null,说明之前已经标记了需要删除的节点。此时删除cleanMe的下一个节点,清除当前cleanMe标记,将当前节点未修改前的前驱节点标记为cleanMe。注意,待删除节点的前驱节点不会改变,即使前驱节点已经从队列中逻辑删除。执行完clean方法后,transfer方法会直接返回null,表示入队操作失败。面试官:说了这么多,加入队列的节点都是同一种类型吧?Hydra:是的,TransferQueue队列中只会有一种类型的节点。如果有另一种类型的节点过来,那么就会执行Dequeued操作。面试官:嗯,那你可以谈谈离队的方式。Hydra:相比进队,离队的逻辑相对简单。因为现在使用的是公平模式,当队列不为空且队列头节点的下一个节点与当前节点匹配成功后,进行出队操作,唤醒头节点的下一个节点进行转账数据。根据队列中节点类型的不同,可以分为两种情况进行分析:1、如果头节点的下一个节点是put类型,则当前新节点是take类型。take线程取出put节点item的值并将其item变为null,然后前进head节点,唤醒挂起的put线程,take线程返回item的值,完成数据传输过程.头节点的下一个节点被唤醒后,会推进头节点。虽然说队列的头节点是dummynode,不存储数据。理论上应该直接将第二个节点从队列中移除,但是源码中还是会把头节点出队,把原来的第二个节点变成新的头节点。2.同理,如果头节点的下一个节点是take类型,则当前新节点是put类型。put线程将take节点的item设置为自己的数据值,然后前进head节点,唤醒挂起的take线程,最终返回从put线程获取的item的值。另外,take线程唤醒后,会将自己QNode的item指针指向自己,并将保存在waiter中的线程置为null,以便后面gc回收。面试官:也就是说,在代码中,生产者不一定要先生产产品,或者消费者可以先到达,然后阻塞等待?Hydra:可以,两个线程都可以先入队。面试官:好的,公平模式我明白了,我去喝杯水给你十分钟,回来再说说公平模式的实现。本文转载自微信公众号“码农仓上”,可通过以下二维码关注。转载本文请联系码农公众号。