多线程中必须要测试的“生产者-消费者”模型,看这篇文章就够了,这里很常见。这也是中美各大公司在面试中都爱考的问题。新生问的比较少,但是很受有工作经验的工程师欢迎。这个问题有很多版本和解决方案。在本文中,我将着重与大家理清思路,由浅入深地思考问题,保证大家看完后有所收获。问题背景简单来说,这个模型由两类线程组成:生产者线程:“生产”产品并将其放入队列;消费者线程:“消费”产品。队列:数据缓存。有了这个队列,生产者只需要关注生产,不需要关心消费者的消费行为,更不需要等待消费者线程执行完毕;消费者只需要消费,不管生产者如何生产,更不用说等待生产者生产了。所以这个模型实现了生产者和消费者之间的解耦和异步。什么是异步?比如你给女朋友打电话,你要等她接了电话才能说话。这是同步的。但是如果你给她发微信,你不需要等她回复,她也不需要马上回复,而是等她有空再回复。这是异步的。但是,生产者和消费者之间不可能没有联系。如果队列中的产品已满,则生产者无法继续生产;如果队列中的产品从头开始,生产者必须通知消费者它可以来消费;如果队列中没有商品,Consumers不能继续消费;如果队列中的产品满到满,消费者必须通知生产者你可以来生产了。因此,他们之间需要合作。最经典的就是使用Object类自带的wait()和notify()或者notifyAll()的消息通知机制。上面描述中的等待,其实就是通过wait()实现的;并且通知是notify()或notifyAll()。那么基于这个消息通知机制,我们也可以平衡生产者和消费者的速度差异。如果生产者的生产速度很慢,而消费者的消费速度很快,就像我们每个月付两次钱,但每天都在花钱,就是1:15。那么我们需要调整生产者(Salary)为15个线程,消费者维护1个线程。是不是很爽~总结一下这个模型的三个优点:解耦、异步、均衡速差。wait()/notify()接下来我们需要重点关注这个通知机制。wait()和notify()是Java中Object类自带的方法,可用于线程间通信。我在上一节提到的11个API中也提到过,这里就展开一下。wait()方法用于让当前线程等待,直到另一个线程调用notify()将其唤醒,或者我们可以设置一个时间让它自动唤醒。在调用该方法之前,线程必须获得该对象的对象监视器锁,即只能在加锁的方法中使用。调用该方法后,当前线程会释放锁。(提示:这一点很重要,也是下面代码中使用while而不使用if的原因。)notify()方法只能通知一个线程。如果有多个线程在等待,唤醒任何一个。notifyAll()方法可以唤醒所有等待的线程,然后加入同步队列。这里我们使用了2个队列:同步队列:对应我们上节讲到的线程状态下的Runnable,也就是线程就绪,等待抢资源。Waitingqueue:对应我们上一节讲到的线程状态中的Waiting,即等待状态。这里需要注意的是,从等待状态的线程不能直接进入Q2,必须先重新加入同步队列,再次等待锁,拿到锁后才能进入Q2;一旦退出Q2,锁就会丢失。Q2其实只有一个线程,因为这里必须加锁才能操作。为此,我首先构建了一个简单的Product类来表示生产和消费的产品,更多的字段你可以自己添加。publicclassProduct{privateStringname;publicProduct(Stringname){this.name=name;}publicStringgetName(){returnname;}publicvoidsetName(Stringname){this.name=name;}}在main函数中,我设置了两种线程,这里我选择用普通的ArrayDeque来实现Queue,更简单的方法是直接用Java中的BlockingQueue来实现。BlockingQueue是一个阻塞队列。它有一系列的方法让线程自动阻塞。常用的BlockingQueues有很多,后面会单独写一篇文章。这里,为了更好的理解并发协作的过程,我们先自己来处理一下。publicclassTest{publicstaticvoidmain(String[]args){Queuequeue=newArrayDeque<>();for(inti=0;i<100;i++){newThread(newProducer(queue,100)).start();newThread(newConsumer(queue,100)).start();}}}然后就是Producer和Consumer。publicclassProducerimplementsRunnable{privateQueuequeue;privateintmaxCapacity;publicProducer(Queuequeue,intmaxCapacity){this.queue=queue;this.maxCapacity=maxCapacity;}@Overridepublicvoidrun(){synchronized(queue){while(queue.size()==maxCapacity){//一定要用while而不是if,如下解释无法产生");wait();System.out.println("生产者"+Thread.currentThread().getName()+"退出等待");}catch(InterruptedExceptione){e.printStackTrace();}}if(queue.size()==0){//队列中的商品从头开始,需要通知等待的消费者queue.notifyAll();}Randomrandom=newRandom();Integeri=random.nextInt();queue.offer(newProduct("Product"+i.toString()));System.out.println("Producer"+Thread.currentThread().getName()+"生产的产品:"+i.toString());}}}其实它的主要逻辑很简单。为了方便演示,我这里加了很多打印语句,有点复杂。先看主要逻辑:publicvoidrun(){synchronized(queue){while(queue.size()==maxCapacity){//一定要用while而不是if,下面解释try{wait();}catch(InterruptedExceptione){e.printStackTrace();}}if(queue.size()==0){queue.notifyAll();}queue.offer(newProduct("Product"+i.toString()));}}这里有3条内容,我们可以和这个流程对比一下:生产者线程拿到锁后,其实进入了Q2阶段。首先检查队列是否满,如果满了,去Q3等待;如果不是,首先检查队列是否为空,如果为空,则需要通知消费者;最终生产出产品。这里有个问题,为什么只能用while而不用if?其实在这短短的一段中,生产者线程经历了几个过程:如果队列满了,就不能生产,也不能占位什么都不做,所以要让锁出来,进入Q3——等待排队等待;在等待队列中被唤醒后,不能直接抢锁,必须先加入Q1——同步队列等待资源;抢到资源后,关门上锁,才能来到Q2,继续执行wait()后的工作。但是此时队列可能又满了,所以退出wait()后,我们需要再次检查queue.size()==maxCapacity的条件,所以使用while。那么为什么它可能又满了呢?因为线程并不是一直持有锁,被唤醒后,在获取锁之间的时间段内,有可能是其他生产者线程先拿到了锁并进行了生产,所以队列经历了一个过程从不满到充实。总结:在使用线程的等待通知机制时,一般需要在while循环中调用wait()方法。消费者线程是完全对称的,我们看代码。publicclassConsumerimplementsRunnable{privateQueuequeue;privateintmaxCapacity;publicConsumer(Queuequeue,intmaxCapacity){this.queue=queue;this.maxCapacity=maxCapacity;}@Overridepublicvoidrun(){synchronized(queue){while(queue.isEmpty()){try{System.out.println("Consumer"+Thread.currentThread().getName()+"Waiting...队列缺货,无法消费");wait();System.out.println("Consumer"+Thread.currentThread().getName()+"退出等待");}catch(InterruptedExceptione){e.printStackTrace();}}if(queue.size()==maxCapacity){queue.notifyAll();}Productproduct=queue.poll();System.out.println("Consumer"+Thread.currentThread().getName()+"Consumed:"+product.getName());}}}结果如下:总结生产者消费者问题是面试中经常遇到的话题。本文首先讲了该模型的三大优势:解耦、异步、平衡速度差异,然后讲解了等待/通知的消息机制以及在应用中,最后进行了代码实现。本文所有代码已放在我的Github上:https://github.com/xiaoqi6666/NYCSDE。这个Github汇总了我所有的文章和资料,以后会持续更新维护。也希望大家给小七一个star。您的支持和认可是我创作最大的动力。下篇文章见!本文转载自微信公众号“码农田小七”,可通过以下二维码关注。转载本文请联系码农田小七公众号。