当前位置: 首页 > 后端技术 > Java

【Java并发编程】Lock&Condition协调同步生产和消费

时间:2023-04-01 14:06:51 Java

1.协调生产/消费的要求我们手写一个ArrayBlockingQueue。JDK其实已经有了这个类,是基于Lock锁和Condition锁信号实现的。当然,JDK的实现代码非常复杂,包括更严格的逻辑检查,从性能优化的角度也做了更多的工作。在本文中,我们只是简单的实现了它的核心逻辑:在初始化构造ArrayBlockingQueue时,指定最大容量限制,并提供put方法。当达到Queue队列的最大容量限制时,产生数据的线程就会被阻塞。put方法产生数据后,队列一定不能为空,通知消费者线程消费。提供take方法,当Queue队列容量为0时,阻塞消费数据的线程。take方法执行完后,肯定队列没有满,通知生产者线程生产。一条数据只能取一次。取出后,数据从队列中删除。相信上面的逻辑实现之后,你一定已经掌握了java并发编程的Lock锁和Condition锁信号吧!其实这个逻辑基本上就是kafka生产者客户端缓冲队列和批量数据传输的实现逻辑。不同的是take方法是一次性取出buffer中的所有数据,而本文的take方法是一次取出一条数据。2.构造方法构造队列的方法很简单。使用List作为数据存储队列,并指定其容量。至此我们还没有实现容量判断和阻塞线程的功能。//类成员变量-存储数据的队列privateListqueue;//类成员变量-存储队列的容量上限privateintqueueSize;publicMyBlockingQueue(intqueueSize){this.queueSize=queueSize;queue=newArrayList<>(queueSize);//存储消息的集合}3.Lock&Condition逻辑设计首先,我们需要一个锁来保证数据put和take操作的同步,即:一条数据只能被取一次,取完后,数据从队列中删除;而createCondition逻辑需要Lock锁。学过java基础并发编程的同学可以把Lock锁理解为Synchronized同步代码块,功能相同。写了专栏《java并发编程》介绍两者的区别,欢迎关注。privateLocklock=newReentrantLock();//LockCondition逻辑可以理解为传统JDK多线程编程中的wait和notify,但是Condition的语义更容易理解。如下代码所示:privateConditionnotFull=lock.newCondition();//队列未满notFull.signal();//通知生产者队列未满,可以继续生产数据(一般在消费者拿到数据后,调用)notFull.await();//队列已满,阻塞生产线程(await反转condition逻辑)privateConditionnotEmpty=lock.newCondition();//队列不为空notEmpty.signal();//通知消费者线程队列不为空,可以继续消费数据(一般在生产者生产数据后,调用)notEmpty.await();//队列已经为空,消费线程阻塞(await反转condition逻辑)大家在使用Lock&Condition进行线程同步协调的时候,首先要像我一样设计condition的逻辑语义,设计好表达的时候xxxx作为条件。当情况满足条件时,发送信号signal()通知其他线程;当情况恰好与条件相反时,使用await阻塞当前线程。4.其实放入数据最重要的是完成Lock&Condition逻辑设计,剩下的就是填空了。模板如下,通过while循环判断队列当前容量是否达到上限。如果达到上限,则说明队列已满。队列已满(notFull使用await否定),await阻塞生产线程继续将数据放入队列。在这里,曾经有朋友问过我一个奇葩问题:多个线程持有同一个锁,怎么知道阻塞线程是生产线程,不是消费者线程?答:一个线程是生产线程还是消费者线程取决于它的动作(调用什么方法),没有标签来定义。调用put方法放入数据的线程就是产生数据的线程。while/await组合是标准的写法,请不要随意改成if,否则会遇到很多奇怪的bug。//队列已满,等待阻塞生产线程while(queue.size()>=queueSize){System.out.println(Thread.currentThread().getName()+"waiting,becausethequeueisfull");不完整。await();}向队列中添加一条数据。这时候我们可以判断队列是notEmpty,所以使用notEmpty.signal()向producer发送一个signal。问题又来了:多个线程持有同一个锁,怎么知道通知的是消费者线程,而不是生产者线程呢?答案是我真的不知道,所以上面的while(queue.size()>=queueSize)用的是while而不是if。即使producer线程被唤醒,while判断也会阻塞它的await。//向队列添加一条消息,通知消费者有新消息queue.add(message);System.out.println(Thread.currentThread().getName()+"production"+message);notEmpty。signal();//通知消费者线程5.take消费数据take从队列中取出数据。取出数据后,队列一定是notFull,所以发送notFull.signal信号。当队列为空时(notEmpty使用await取反),await同时阻塞消费者线程。publicObjecttake()throwsInterruptedException{ObjectretVal=null;lock.lock();//先锁定操作队列try{//队列为空,通知生产线程,消费者线程阻塞while(queue.size()==0){System.out.println("队列为空,停止消费!");notEmpty.await();}//从队列中删除一条消息,通知生产者队列中有位置retVal=queue.get(0);队列.删除(0);notFull.signal();//同时通知生产者队列}finally{lock.unlock();}returnretVal;}相信有了上面put方法的基础,理解take方法的代码就很容易了,这里就不做过多解释了。6.生产消费测试publicstaticvoidmain(String[]args){//为了方便查看测试结果,我们的队列容量设置小一些MyBlockingQueuequeue=newMyBlockingQueue(2);//生产者线程newThread(()->{for(inti=0;i<5;i++){try{queue.put("msg"+i);//放入5条数据}catch(InterruptedExceptione){e.printStackTrace();}}}).start();//消费线程newThread(()->{while(true){//一直消费try{System.out.println(Thread.currentThread().getName()+"消费数据"+queue.take());}catch(InterruptedExceptione){e.printStackTrace();}}}).start();}输出结果如下,满足我们的需求。当队列满时,生产者线程Thread-0等待;生产和消费相互协调,相互通知,最终数据消费完成,队列为空,消费线程阻塞。Thread-0产生msg0Thread-0产生msg1Thread-0等待,因为队列已满Thread-1消耗数据msg0Thread-0产生msg2Thread-0等待,因为队列已满Thread-1消耗数据msg1Thread-0产生msg3Thread-0等待,因为Queue已满Thread-1消费数据msg2Thread-0产生msg4Thread-1消费数据msg3Thread-1消费数据msg4队列为空,停止消费!欢迎关注我的博客,本文转载更多优质知识合集,注明出处(一定要有链接,不能只是文字):字母哥博客-zimug.com如果觉得对你有帮助,请点赞分享!您的支持是我创作不竭的动力!.另外,作者近期输出了以下优质内容,期待大家的关注。《kafka修炼之道》《手摸手教你学Spring Boot2.0》《Spring Security-JWT-OAuth2一本通》《实战前后端分离RBAC权限管理系统》《实战SpringCloud微服务从青铜到王者》