本来以为TransferQueue和TransferStack的实现应该是一样的。一个是队列,另一个是堆栈。拿的时候是从头上取的,放的时候就不一样了。Queue放在队列的最后,Stack被压入栈顶。所以我认为两者之间的区别只是放置方式不同。但其实他们算法的差距不仅仅在put上,差距还是比较大的。了解了TransferStack的源码后,想要直接了解TransferQueue并不容易。TransferQueue#QNode类似于TransferStack,通过QNode(称为TransferStack的SNode)存储“数据”或获取对数据的“请求”。QNode的主要属性:staticfinalclassQNode{volatileQNodenext;//队列中的下一个节点volatileObjectitem;//CAS进出nullvolatileThreadwaiter;//控制停放/取消停放finalbooleanisData;next:下一个节点。waiter:节点绑定的线程,与SNode的waiter同义。item:与SNode的item含义一致。isData:与SNode的模式类似,true表示是数据,false表示该节点是请求。head:头节点,即第一个节点。tail:尾节点。更气人的是,他没有对手!他也没有FULFILLING模式,所以暗示他的算法逻辑和TransferStack的差距可能比较大。QNode提供了几个原子操作:casNext:cas方法替换当前节点的下一个节点casItem:cas方法替换当前节点的itemtryCancel;cas方法将当前节点的item设置为自己,表示该节点为calceladvanceHeaded:以cas的方式修改头节点,其实就是从头出队advanceTail:从头入队tailTransferQueue#InitializeTransferQueue(){QNodeh=newQNode(null,false);//初始化为虚拟node.head=h;尾巴=h;}构造函数创建一个“虚拟节点”并将其设置为头部和尾部。需要注意的是,在take操作过程中必须跳过这个“虚拟节点”。TransferQueue#transferTransferQueue的亮点,入队和出队都是方法。代码的解析在注释中:Etransfer(Ee,booleantimed,longnanos){QNodes=null;//根据需要构造/重用booleanisData=(e!=null);对于(;;){QNodet=tail;QNodeh=head;//如果当前队列还没有初始化,等待初始化完成if(t==null||h==null)//sawuninitializedvaluecontinue;//spin//队列为空,或者当前操作与tail的模式相同//这种情况下,队列中的节点无法匹配到当前请求节点,所以当前请求节点入队if(h==t||t.isData==isData){//空的或相同模式的QNodetn=t.next;//并发时,t不再是尾节点(其他线程有节点入队),重启主循环if(t!=tail)//不一致读继续;//一个新节点加入了队列的尾部,重置尾部节点,并重新启动主循环if(tn!=null){//滞后尾部advanceTail(t,tn);继续;}//等超时,返回nullif(timed&&nanos<=0)//等不及了returnnull;//创建一个新节点if(s==null)s=newQNode(e,isData);//向队尾添加一个新节点,如果添加失败,则重新开始if(!t.casNext(null,s))//failedtolinkincontinue;//设置新节点为尾节点---完成入队advanceTail(t,s);//摇尾等待//通过自旋或阻塞当前进程,等待匹配Objectx=awaitFulfill(s,e,timed,nanos);//等待匹配过程,节点被取消,清理后返回nullif(x==s){//等待被取消clean(t,s);返回空值;}//匹配完成,要么在自旋过程中直接匹配完成,要么阻塞后匹配成功的节点唤醒//如果s还在队列中if(!s.isOffList()){//notalreadyunlinked//如果s的前一个节点为首节点,则替换为s为首节点,将原来的首节点t从队列中移除//接下来的操作逻辑基于:s已经匹配成功,原头节点出队,如果s是数据节点,则解析除了s对数据的引用,s节点变成了一个虚拟节点,变成了headadvanceHead(t,s);//unlinkifhead//匹配成功后,如果x!=null,说明当前节点s为“request”,匹配到“data”if(x!=null)//andforgetfields//sitem指向自己,因为x即将返回,释放s对x的引用s.item=s;//s等待线程为空s.waiter=null;}//匹配成功,返回return(x!=null)?(EXE文件;//否则队列不为空且mode不同,可以匹配}else{//complementary-mode//准备匹配,注意获取第一个节点的下一个节点,跳过第一个节点QNodem=h.下一个;//nodetofulfill//处理过程中队列发生了变化,重新开始if(t!=tail||m==null||h!=head)continue;//读取不一致Objectx=m.item;//如果m已经被其他节点匹配到,或者m被取消,或者匹配失败,将m替换为头节点,继续主循环//如果m没有匹配到,则没有如果取消,则由casItem匹配。匹配完成后,将m节点的isData改为reverse//即:request节点改为data,data节点改为request。//这个操作是唤醒对应的阻塞进程后,让被阻塞的进程满足匹配条件,类似Stack的匹配赋值if(isData==(x!=null)||//malreadyfulfilledx==m||//m取消!m.casItem(x,e)){//丢失CASadvanceHead(h,m);//出列并重试continue;}//如果匹配完成,则第一个节点出队。需要注意的是,m节点并没有出队,而是改为对于第一个节点,//阻塞进程被唤醒后,m节点会被当作类似dummy节点一样处理。进行下一次匹配时将跳过第一个节点。advanceHead(h,m);//成功完成//唤醒m节点的阻塞ProcessLockSupport.unpark(m.waiter);//返回返回(x!=空)?(EXE文件;}}}TransferQueue#awaitFulfillawaitFulfill就是通过自旋或者阻塞当前线程来等待节点匹配。参数变为4:QNodes:等待匹配的节点。Ee:匹配判断标准和TransferStack完全不一样。TransferStack是通过match属性判断的。TransferQueue用了这么一个东西,不好理解。最初发送的e是s的项目。后面再解释具体的匹配规则。booeantimed:true表示在限定时间内等待。longnanos:时间限制等待时间。ObjectawaitFulfill(QNodes,Ee,booleantimed,longnanos){//自旋时长,超时时长等finallongdeadline=timed?System.nanoTime()+nanos:0L;线程w=Thread.currentThread();intspins=((head.next==s)?(timed?maxTimedSpins:maxUntimedSpins):0);对于(;;){如果(w.isInterrupted())s.tryCancel(e);对象x=s。物品;//一开始e==x,如果用x!=e判断是否匹配,一定是匹配不完//在自旋或者线程阻塞等待的过程中,变化的是x,即队列中的s项会在其他线程完成匹配后被替换掉。//具体可以参考transfer方法中匹配部分的casItem。这就是完成的。交换完成后,进程被唤醒,x!=e成立if(x!=e)returnx;//超时判断,如果超时,取消当前节点if(timed){nanos=deadline-System.nanoTime();如果(纳米<=0L){s.tryCancel(e);继续;}}//首先自旋if(spins>0)--spins;//线程分配,准备阻塞elseif(s.waiter==null)s.waiter=w;//阻塞线程elseif(!timed)LockSupport.park(this);elseif(nanos>spinForTimeoutThreshold)LockSupport.parkNanos(this,nanos);}}代码注释中说明,在调用开始时e==x时,如果用x!=e判断是否匹配,一定是没有匹配完成。在自旋或者线程阻塞等待的过程中,变化的是x,即队列中s的项会在其他线程完成匹配后被替换掉。详见transfer方法匹配部分的casItem。这就是完成的。交换完成后,进程被唤醒,x!结束。TransferQueue#clean和TransferStack的clean方法作用相同,都是在匹配完成前清理掉已经取消的节点,如果当前节点不是尾节点则清空队列。总结SynchronousQueue的源码已经分析完毕。虽然代码量不大,但理解起来并不容易。需要反复读取,对比写队列和从队列取数据两种操作,想象多线程并发条件下的操作场景,画出队列在各个场景和时刻的状态每个给定的时间点都在一张白纸上,这有助于更好地理解代码逻辑。非常感谢!PreviousBlockQueue-基于TransferStackNext的SynchronousQueue[BlockingQueue-LinkedBlockingQueue]{https://segmentfault.com/a/1190000043516074}
