当前位置: 首页 > 科技观察

LongAdder实现原理详解

时间:2023-03-12 03:22:41 科技观察

前言AtomicInteger和AtomicLong使用非阻塞CAS算法原子更新一个变量,比synchronized阻塞算法有更好的性能,但是在高并发下,大量线程更新一个变量在同时,由于同一时间只能有一个线程成功,大部分线程在尝试更新失败后会通过自旋重试,严重占用CPU的时间片。AtomicLong的实现示意图:LongAdder是JDK8中新增的一个原子操作类。它提供了一种新的思维方式。既然AtomicLong的性能瓶颈是大量线程同时更新一个变量造成的,那这个变量能不能拆分出来呢?,变成多个变量,然后让线程去竞争这些变量,最后合并?LongAdder的设计精髓就在这里。通过将变量拆分成多个元素,降低变量的并发度,最后合并元素,变相的减少了CAS失败的次数。LongAdder的实现示意图:公共方法publicclassLongAdderextendsStriped64implementsSerializable{//构造方法publicLongAdder(){}//加1操作publicvoidincrement();//减1操作publicvoiddecrement();//获取原子的值variablepubliclonglongValue();}下面举一个简单的例子模拟50个线程同时更新.LongAdder;publicclassMain{publicstaticvoidmain(String[]args){LongAdderadder=newLongAdder();ExecutorServicethreadPool=Executors.newFixedThreadPool(20);for(inti=0;i<50;i++){Runnabler=()->{adder.add(1);};threadPool.execute(r);}threadPool.shutdown();//如果关闭线程池后所有任务都执行完毕,isTerminated()返回truewhile(!threadPool.isTerminated()){System.out.println(adder.longValue());break;}}}输出结果为50,如果你不是fa熟悉线程池的可以参考我另一篇讲线程池原理分析类图的文章。LongAdder内部维护了一个Cell类型的数组,其中Cell是Striped64中的静态内部类。Cell类abstractclassStriped64extendsNumber{@sun.misc.ContendedstaticfinalclassCell{volatilelongvalue;Cell(longx){value=x;}finalbooleancas(longcmp,longval){returnUNSAFE.compareAndSwapLong(this,valueOffset,cmp,val);}}}Cell用于封装对于拆分后的元素,内部使用一个value字段来存储当前元素的值,需要合并时,将所有Cell数组中的值累加起来。Cell内部使用CAS操作来更新value值。不熟悉CAS操作的同学可以参考我的另一篇文章探讨CAS的实现原理。你可以注意到Cell类被@sun.misc.Contended注解修改了。这个注解是为了解决虚假分享的问题。什么是虚假分享?一个缓存行可以存储多个变量(填充当前缓存行的字节数);而CPU对缓存的修改是以缓存行的最小单位为单位的。在线程的情况下,如果需要修改“共享同一缓存行的变量”,就会在不经意间影响到彼此的性能,这就是虚假共享(FalseSharing)。不了解虚假共享的同学可以参考这位大佬的文章虚假共享(FalseSharing)的底层原理及其解决方法。LongAdder使用的是Cell数组,数组的元素是连续的,所以多个Cell对象共享一个cacheline的情况很常见,所以这里@sun.misc.Contended注解对单个Cell元素进行字节填充,保证一个Cell对象占用一个cacheline,即填充到64字节。关于如何确定一个对象的大小,可以参考我的另一篇文章对象的内存布局,如何确定对象的大小,这样就可以计算出需要填充多少字节。longValue()longValue()返回累加值publiclonglongValue(){returnsum();}publiclongsum(){Cell[]as=cells;Cella;longsum=base;//当Cell数组不为null时,累加后返回,否则直接返回基准数baseif(as!=null){for(inti=0;i0){/***insidesmallbranch1:这是处理add方法内部if分支的条件3:如果hash后的位置为null,*说明没有线程在这个位置设置值,*没有竞争,可以使用直接,然后用x的值作为初始值Value创建一个新的Cell对象,*用cellsBusy锁住cells数组,*然后把这个Cell对象放在cells[m%cells.length]的位置*/if(a=as[(n-1)&h])==null){//cellsBusy==0表示当前没有线程修改cells数组if(cellsBusy==0){//新建一个Cell对象以要累加的x值作为初值,cellr=newCell(x);//如果cellsBusy=0不加锁,通过cas将cellsBusy置1并加锁if(cellsBusy==0&&casCellsBusy()){//标记是否Cell创建成功,放入hashed位置thecellsarraybooleancreated=false;try{Cell[]rs;intm,j;//再次检查cells数组不为null,且长度不为空,散列位置的cell为nullif((rs=cells)!=null&&(m=rs.length)>0&&rs[j=(m-1)&h]==null){//设置新cell到这个位置rs[j]=r;created=true;}}finally{//解除锁cellsBusy=0;}//生成成功,跳出循环if(created)break;//如果created为false,表示cells上面指定的cells数组的位置[m%cells.length]//已经有其他线程设置cell完成,//继续执行循环continue;}}//如果执行到当前行,说明cellsBusy=1,有线程正在改变cells数组,说明发生冲突,设置collide为falsecollide=false;/***内部小分支2:如果在add方法中条件4:通过cas到v+x设置Cell对象中cells[m%cells.length]位置的*value值失败,*说明已经发生竞争,设置wasUncontended为true,跳出内部if判断,*finally重新计算一个新的probe,然后重新执行循环;*/}elseif(!wasUncontended)//设置uncontended标志为true,继续执行,然后计算一个新的probe值,然后重新执行循环。wasUncontended=true;/***内部小分支3:新竞争线程参与竞争的情况:处理进入当前方法时threadLocalRandomProbe=0的情况,*即当前线程参与的cascell第一次竞争失败,这里会尝试将x值加到cells[m%cells.length]*的值上,成功则直接退出*/elseif(a.cas(v=a.value,((fn==null)?v+x:fn.applyAsLong(v,x))))break;/***内部小分支4:分支3处理新线程争用执行失败。此时如果cells数组长度已经达到最大值*(大于等于杯数),*或者当前cells已经展开,则设置collide为false,然后重新计算prob的值*/elseif(n>=NCPU||cells!=as)collide=false;/***内部小分支五:如果发生冲突collide=false,设置为true;最后重新计算hash值后,*进入下一个for循环*/elseif(!collide)//设置冲突标志,表示发生冲突,需要重新生成hash,重试。//如果下一次重试仍然导致change分支,collide=true,不满足!collide条件,则进入下一个分支collide=true;/***内部小分支六:展开cells数组,并参与新的cell竞争使用的线程失败2次且满足存储容量条件,将执行分支*/elseif(cellsBusy==0&&casCellsBusy()){try{//检查cells是否已经扩容if(cells==as){//ExpandtableunlessstaleCell[]rs=newCell[n<<1];for(inti=0;i