转载本文请联系AndroidPub公众号。生产者消费者问题是线程模型中的一个经典问题:生产者和消费者在同一时间段内共享同一个缓冲区(Buffer),生产者向Buffer中添加产品,消费者从Buffer中取出产品.当Buffer为空时,消费者阻塞,当Buffer满时,生产者阻塞。Kotlin中实现多线程生产/消费模型的方式有很多(大部分也适用于Java)SynchronizedReentrantLockBlockingQueueSemaphorePipedXXXStreamRxJavaCoroutineFlow1。SynchronizedSynchronized是最基本的线程同步工具,可以通过wait/notify问题实现生产和消费。valbuffer=LinkedList()valMAX=5//缓冲区最大大小vallock=Object()funproduce(data:Data){sleep(2000)//mockproducesynchronized(lock){while(buffer.size>=MAX){//当缓冲区满了,停止生产//注意这里使用while不能使用if,因为可能被另一个生产线程唤醒,而不是消费者线程,所以再次检查缓冲区状态//如果生产消费了两个锁,没必要担心这个问题lock.wait()}buffer.push(data)//notify方法只唤醒其中一个线程,选择哪个线程取决于操作系统对多线程管理的实现.//notifyAll会唤醒所有等待的线程,哪个线程最先处理取决于操作系统的实现,但有机会处理。//这里使用notify可能会唤醒另一个生产线程造成死锁,所以必须使用notifyAlllock.notifyAll()}}funconsume(){synchronized(lock){while(buffer.isEmpty())lock.wait()//暂停消费buffer.removeFirst()lock.notifyAll()}sleep(2000)//mockconsume}@Testfuntest(){//同时启动多个生产和消费线程repeat(10){Thread{produce(Data())}.start()}repeat(10){Thread{consume()}.start()}}2、ReentrantLockLock相对于Synchronized的优势在于,当有多条生产线/消费者线程时,我们可以精确地条件指定哪一个醒来。下面的例子说明Lock和await/single替换了之前的Synchronized写法。valbuffer=LinkedList()valMAX=5//缓冲区最大大小vallock=ReentrantLock()valcondition=lock.newCondition()funproduce(data:Data){sleep(2000)//mockproducelock.lock()while(buffer.大小>=5)condition.await()buffer.push(data)condition.signalAll()lock.unlock()}funconsume(){lock.lock()while(buffer.isEmpty())condition.await()buffer.removeFirst()condition.singleAll()lock.unlock()sleep(2000)//mockconsume}3.BlockingQueue(阻塞队列)BlockingQueue在达到临界条件时会自动阻塞当前线程等待锁的释放。适合这种生产/消费场景。valbuffer=LinkedBlockingQueue(5)funproduce(data:Data){sleep(2000)//mockproducebuffer.put(data)//buffer满时自动阻塞}funconsume(){buffer.take()//buffer为empty自动阻塞sleep(2000)//mockconsume}注意BlockingQueue有三套读写方法,只有一套有阻塞作用,不要用错。方法说明add(o)/remove(o)add方法添加元素时,如果超过队列长度,会直接抛出异常。offer(o)/poll(o)offer添加元素时,如果发现队列满了,无法添加,则直接返回falseput(o)/take(o)put。当向队列尾部添加元素时,发现队列已满,会阻塞等待空间添加元素。4.信号量(semaphore)信号量是JUC提供的一种共享锁机制,可以进行拥塞控制。此功能可用于控制缓冲区的大小。//canProduce:可以生产的数量(即缓冲区可用的数量),生产者调用acquire,减少许可数valcanProduce=Semaphore(5)//canConsumer:可以消费的数量,producer调用release,增加许可数valcanConsume=Semaphore(5)//控制缓冲区访问互斥valmutex=Semaphore(0)valbuffer=LinkedList()funproduce(data:Data){if(canProduce.tryAcquire()){sleep(2000)//mockproducemutex.acquire()buffer.push(data)mutex.release()canConsume.release()//通知消费者添加了新产品}}funconsume(){if(canConsume.tryAcquire()){sleep(2000)//mockconsumemutex.acquire()buffer.removeFirst()mutex.release()canProduce.release()//通知生产端可以追加生产}}5.PipedXXXStream(管道)Java中的PipedInputStream/PipedOutputStream实现了类似管道的功能,用于不同线程之间的相互通信,th输入流中有一个缓冲区数组,当缓冲区数组为空时,输入流PipedInputStream所在的线程就会被阻塞。valpis:PipedInputStream=PipedInputStream()valpos:PipedOutputStreambylazy{PipedOutputStream().apply{pis.connect(this)//建立输入流和输出流之间的连接}}funproduce(data:ContactsContract.Data){while(true){sleep(2000)pos.use{//Kotlin使用use方便的释放资源it.write(data.getBytes())it.flush()}}}funconsume(){while(true){sleep(2000)pis.use{valbyteArray=ByteArray(1024)it.read(byteArray)}}}@TestfunTest(){repeat(10){Thread{produce(Data())}.start()}repeat(10){Thread{consume()}.开始()}}6。从概念上讲,RxJavaRxJava可以使用Observable/Subject作为生产者,使用Subscriber作为消费者,但是Subject和Observable都缺乏Buffer溢出时的阻塞机制,很难独立实现生产者/消费者或模型。Flowable的背压机制可以用来控制缓冲区的数量,建立上下游之间的通信。配合Atomic,可以换方向实现单生产者/单消费者场景(不适用于多生产者/多消费者场景)。classProducer:Flowable(){overridefunsubscribeActual(subscriber:org.reactivestreams.Subscriber
