当前位置: 首页 > 科技观察

基础:Java原子组件与同步组件

时间:2023-03-15 19:01:36 科技观察

本文转载自微信公众号《潜行》,作者cscw。转载本文请联系SneakUp公众号。前言在使用多线程并发编程时,经常会遇到修改共享变量的操作。此时我们可以选择ConcurrentHashMap、ConcurrentLinkedQueue来安全存储数据。但如果只涉及状态修改和线程执行顺序,那么使用以Atomic开头的原子组件或ReentrantLock、CyclicBarrier等同步组件会是更好的选择。下面将一一介绍它们的原子组件的原理和用法。实现原理CASAtomicBoolean、AtomicIntegerArray等原子组件的使用,以及同步组件的实现原理ReentrantLock、CyclicBarrier等同步组件的使用原子组件的实现原理CAScas的底层实现可以看一篇文章之前写过:解锁原理详解、synchronized、volatile+cas底层实现[1]应用场景可用于实现多线程下的变量和状态原子操作可用于实现同步锁(ReentrantLock)原子组件原子组件依赖于使用cas自旋操作volatile变量实现的volatile类型变量保证了变量修改时其他线程可以看到最新的值。cas保证值修改操作是原子的,不会被中断。基本类型原子类AtomicBoolean//布尔类型AtomicInteger//正整数类型AtomicLong//长整数类型使用示例publicstaticvoidmain(String[]args)throwsException{AtomicBooleanatomicBoolean=newAtomicBoolean(false);//异步线程修改atomicBooleanCompletableFuturefuture=CompletableFuture.runAsync(()->{try{Thread.sleep(1000);//确保异步线程在主线程之后修改atomicBoolean为false;}catch(Exceptione){thrownewRuntimeException(e);}});atomicBoolean.set(true);future.join();System.out.println("booleanvalueis:"+atomicBoolean.get());}-------------输出结果----------------booleanvalueis:falsereferenceclassatomclassAtomicReference//引用类atomwithtimestampversionClassAtomicStampedReference//相当于AtomicStampedReference,AtomicMarkableReference关心变量是否还是原来的变量,中间有没有修改过也没关系)实现原子操作,MethodHandles会帮助计算值在类中的偏移位置,最后调用VarHandle中的Unsafe.publicfinalnativebooleancompareAndSetReference(Objecto,longoffset,Objectexpected,Objectx)方法原子修改对象的属性privatestaticfinalVarHandleVALUE;static{try{MethodHandles.Lookupl=MethodHandles.lookup();VALUE=l.findVarHandle(AtomicReference.class(class,"value);value}",cReflectiveOperationExceptione){thrownewExceptionInInitializerError(e);}}privatevolatileVvalue;...ABA问题线程X准备把变量的值从A改成B,但是在这期间,线程Y把变量的值从A改成了C,然后给A;最后一个线程X检测到变量值为A,将其替换为B但实际上,A已经不是原来的A了,解决办法就是设置变量为唯一类型。值可以附加版本号,或者时间戳。如果加上版本号,线程Y的修改就变成了A1->B2->A3。此时如果线程X再次更新,则可以判断A1不等于A3。AtomicStampedReference的实现和AtomicReference类似,但是它原子修饰的变量是volatilePairpair;,Pair是它的内部类。AtomicStampedReference可用于解决ABA问题(Treference,intstamp){returnnewPair(reference,stamp);}}privatevolatilePair对;如果我们不关心变量在中间过程中是否被修改,只关心当前变量是否还是原来的变量,那么我们可以使用AtomicMarkableReferenceAtomicStampedReference的例子publicclassMain{publicstaticvoidmain(String[]args)throwsException{Testold=newTest("hello"),newTest=newTest("world");AtomicStampedReferencereference=newAtomicStampedReference<>(old,1);reference.compareAndSet(old,newTest,1,2);System.out.println("对象:"+reference.getReference().name+";版本号:"+reference.getStamp());}}classTest{Test(Stringname){this.name=name;}publicStringname;}----------------输出结果----------------对象:世界;版本号:2ArrayatomicclassAtomicIntegerArray//整数数组AtomicLongArray//长整数数组AtomicReferenceArray//引用类型数组数组原子类会在里面初始化一个final数组,它把整个数组看成一个对象,然后根据下标索引计算元素偏移量,然后调用UNSAFE.compareAndSetReference来执行原子操作该数组未被volatile修改。为了保证元素类型在不同线程中的可见性,使用UNSAFEpublicnativeObjectgetReferenceVolatile(Objecto,longoffset)方法获取元素获取实时元素值。Example//元素默认初始化为0AtomicIntegerArrayarray=newAtomicIntegerArray(2);//下标为0的元素,期望值为0,更新值为1array.compareAndSet(0,0,1);System.out.println(array.get(0));----------------输出结果----------------1属性原子类AtomicIntegerFieldUpdaterAtomicLongFieldUpdaterAtomicReferenceFieldUpdater如果操作对象是某类属性,可以使用AtomicIntegerFieldUpdater原子更新,但是类的属性需要定义为volatile修饰的变量,保证属性在每个线程中的可见性,否则报错将使用示例进行报告fieldUpdater.compareAndSet(测试,“helloworld”,“siting”);System.out.println(fieldUpdater.get(test));System.out.println(test.name);}}classTest{Test(Stringname){this.name=name;}publicvolatileStringname;}-------------输出结果----------------sittingsittingaccumulatorStriped64LongAccumulatorLongAdder//accumulatorFunction:运算规则,identity:初始值publicLongAccumulator(LongBinaryOperatoraccumulatorFunction,longidentity)LongAccumulator和LongAdder都继承自Striped64,Striped64的主要思想有点类似于ConcurrentHashMap。分段计算。当单个变量计算的并发性能较慢时,我们可以将数学运算分散到多个变量上,当需要计算总值时,逐个累加。LongAdder相当于一个LongAccumulatorLongAccumulator的特殊情况实现的例子publicstaticvoidmain(String[]args)throwsException{LongAccumulatoraccumulator=newLongAccumulator(Long::sum,0);for(inti=0;i<100000;i++){CompletableFuture.runAsync(()->accumulator.accumulate(1));}Thread.sleep(1000);//等待所有CompletableFuture线程完成,然后获取System.out.println(accumulator.get());}---------------Outputresults------------------100000同步组件实现原理java中的大部分同步组件都会在内部维护一个状态值,和原子组件一样修改state值一般是通过cas实现状态修改的维护工作由DougLeaAbstractQueuedSynchronizer(AQS)抽象出来实现AQS的原理。可以看之前写的一篇文章:解锁原理详解、synchronized、volatile+cas底层实现[2]同步组件ReentrantLock、ReentrantReadWriteLockReentrantLock、ReentrantReadWriteLock是基于AQS(AbstractQueuedSynchronizer)实现的。因为它们有公平锁和非公平锁的区别,所以它们不直接继承AQS,而是使用内部类来继承。公平锁和非公平锁分别实现AQS,ReentrantLock、ReentrantReadWriteLock使用内部类实现同步ReentrantLock使用示例ReentrantLocklock=newReentrantLock();if(lock.tryLock()){//业务逻辑lock.unlock();}示例publicstaticvoidmain(String[]args)throwsException{ReentrantReadWriteLocklock=newReentrantReadWriteLock();if(lock.readLock().tryLock()){//读锁//业务逻辑lock.readLock().unlock();}if(lock.writeLock().tryLock()){//写锁//业务逻辑锁。writeLock().unlock();}}Semaphore的实现原理与Semaphore和ReentrantLock的使用场景是一样的,也有公平竞争和不公平竞争锁的策略。它们还通过从内部类继承AQS来实现同步。通俗解释:假设有一口井,至多有三个位置可以打水。人每打一次水,就需要占一个位置。当三个位置都满了,第四个人需要去打水的时候,他要等前三个人中的一个离开水位,才能继续获取打水的位置。示例publicstaticvoidmain(String[]args)throwsException{Semaphoresemaphore=newSemaphore(2);for(inti=0;i<3;i++)CompletableFuture.runAsync(()->{try{System.out.println(Thread.currentThread().toString()+"开始");if(信号量.tryAcquire(1)){Thread.sleep(1000);semaphore.release(1);System.out.println(Thread.currentThread().toString()+"非阻塞结束");}else{System.out.println(Thread.currentThread().toString()+"阻塞结束");}}catch(Exceptione){thrownewRuntimeException(e);}});//确保CompletableFuture线程执行完毕,主线程结束Thread。sleep(2000);}----------------输出结果----------------Thread[ForkJoinPool.commonPool-worker-19,5,main]startThread[ForkJoinPool.commonPool-worker-5,5,main]startThread[ForkJoinPool.commonPool-worker-23,5,main]startThread[ForkJoinPool.commonPool-worker-23,5,main]被阻塞并且Thread[ForkJoinPool.commonPool-worker-5,5,main]结束时没有阻塞Thread[ForkJoinPool.commonPool-worker-19,5,main]结束时没有阻塞。可以看到三个线程,因为如果设置信号量为2,第三个线程将无法成功获取信息,打印结束。五个小任务,当主任务可以继续时,此时可以使用CountDownLatch。主任务被阻塞并等待。每完成一个小任务就统计一次,直到五个小任务全部执行完才会触发主线。示例publicstaticvoidmain(String[]args)throwsException{CountDownLatchcount=newCountDownLatch(2);for(inti=0;i<2;i++)CompletableFuture.runAsync(()->{try{Thread.sleep(1000);System.out.println("CompletableFutureover");count.countDown();}catch(Exceptione){thrownewRuntimeException(e);}});//等待CompletableFuture线程完成count.await();System.out.println("mainover");}---------------输出结果----------------CompletableFutureoverCompletableFutureovermainoverCyclicBarrier实现原理及使用场景CyclicBarrier是基于关于ReentrantLock锁和Conditiontrip属性实现同步,通俗解释:CyclicBarrier需要阻塞所有线程进入await状态,然后唤醒所有线程执行。想象一个栏杆可以挡住五只羊。只有当五只羊一起站在栏杆上时,栏杆才会被拉起。这时,所有的羊都能飞出羊圈。示例publicstaticvoidmain(String[]args)throwsException{CyclicBarrierbarrier=newCyclicBarrier(2);CompletableFuture.runAsync(()->{try{System.out.println("CompletableFuturerunstart-"+Clock.systemUTC().millis());barrier.await();//需要等待主线程也执行到await状态再继续执行System.out.println("CompletableFuturerunover-"+Clock.systemUTC().millis());}catch(Exceptione){thrownewRuntimeException(e);}});Thread.sleep(1000);//CompletableFuture线程等待barrier.await();System.out.println("mainrunover!");}---------------输出结果---------------CompletableFuturerunstart-1609822588881mainrunover!CompletableFuturerunover-1609822589880StampedLockStampedLock不依赖AQS,而是维护多个状态值在内部,用cas实现的StampedLock有三种模式:writemode,readmode,optimisticreadmodeStampedLock的读写锁可以相互转换urnastampvaluepubliclongreadLock()//尝试加读锁,不成功返回0publiclongtryReadLock()//解锁publicvoidunlockRead(longstamp)//获取写锁,自旋获取,返回一个stamp值publiclongwriteLock()//尝试加写锁,失败返回0publiclongtryWriteLock()//解锁publicvoidunlockWrite(longstamp)//尝试乐观读取一个时间戳,并使用validate方法验证时间戳的有效性publiclongtryOptimisticRead()//验证时间戳是否有效publicbooleanvalidate(longstamp)useExamplepublicstaticvoidmain(String[]args)throwsException{StampedLockstampedLock=newStampedLock();longstamp=stampedLock.tryOptimisticRead();//判断版本号是否有效if(!stampedLock.validate(stamp)){//获取一个读锁会空闲stamp=stampedLock.readLock();longwriteStamp=stampedLock.tryConvertToWriteLock(stamp);if(writeStamp!=0){//成功转换为写锁//fixme业务操作stampedLock.unlockWrite(writeStamp);}else{stampedLock.unlockRead(stamp);//尝试获取write和readstamp=stampedLock.tryWriteLock();if(stamp!=0){//fixme业务操作stampedLock.unlockWrite(writeStamp);}}}}参考到文章ConcurrentStriped64(lAccumulationDevice)[3]参考资料[1]解锁原理详解,synchronized,volatile+cas底层实现:https://juejin.cn/post/6854573210768900110[2]解锁原理详解,synchronized,volatile+cas底层实现:https://juejin.cn/post/6854573210768900110[3]并发Striped64(l累加器):https://www.cnblogs.com/gosaint/p/9129867.html