当前位置: 首页 > 科技观察

孔乙己:Kotlin生产者-消费者问题的八种解法

时间:2023-03-16 22:19:31 科技观察

转载本文请联系AndroidPub公众号。生产者消费者问题是线程模型中的一个经典问题:生产者和消费者在同一时间段内共享同一个缓冲区(Buffer),生产者向Buffer中添加产品,消费者从Buffer中取出产品.当Buffer为空时,消费者阻塞,当Buffer满时,生产者阻塞。Kotlin中实现多线程生产/消费模型的方式有很多(大部分也适用于Java)SynchronizedReentrantLockBlockingQueueSemaphorePipedXXXStreamRxJavaCoroutineFlow1。SynchronizedSynchronized是最基本的线程同步工具,可以通过wait/notify问题实现生产和消费。valbuffer=LinkedList()valMAX=5//缓冲区最大大小vallock=Object()funproduce(data:Data){sleep(2000)//mockproducesynchronized(lock){while(buffer.size>=MAX){//当缓冲区满了,停止生产//注意这里使用while不能使用if,因为可能被另一个生产线程唤醒,而不是消费者线程,所以再次检查缓冲区状态//如果生产消费了两个锁,没必要担心这个问题lock.wait()}buffer.push(data)//notify方法只唤醒其中一个线程,选择哪个线程取决于操作系统对多线程管理的实现.//notifyAll会唤醒所有等待的线程,哪个线程最先处理取决于操作系统的实现,但有机会处理。//这里使用notify可能会唤醒另一个生产线程造成死锁,所以必须使用notifyAlllock.notifyAll()}}funconsume(){synchronized(lock){while(buffer.isEmpty())lock.wait()//暂停消费buffer.removeFirst()lock.notifyAll()}sleep(2000)//mockconsume}@Testfuntest(){//同时启动多个生产和消费线程repeat(10){Thread{produce(Data())}.start()}repeat(10){Thread{consume()}.start()}}2、ReentrantLockLock相对于Synchronized的优势在于,当有多条生产线/消费者线程时,我们可以精确地条件指定哪一个醒来。下面的例子说明Lock和await/single替换了之前的Synchronized写法。valbuffer=LinkedList()valMAX=5//缓冲区最大大小vallock=ReentrantLock()valcondition=lock.newCondition()funproduce(data:Data){sleep(2000)//mockproducelock.lock()while(buffer.大小>=5)condition.await()buffer.push(data)condition.signalAll()lock.unlock()}funconsume(){lock.lock()while(buffer.isEmpty())condition.await()buffer.removeFirst()condition.singleAll()lock.unlock()sleep(2000)//mockconsume}3.BlockingQueue(阻塞队列)BlockingQueue在达到临界条件时会自动阻塞当前线程等待锁的释放。适合这种生产/消费场景。valbuffer=LinkedBlockingQueue(5)funproduce(data:Data){sleep(2000)//mockproducebuffer.put(data)//buffer满时自动阻塞}funconsume(){buffer.take()//buffer为empty自动阻塞sleep(2000)//mockconsume}注意BlockingQueue有三套读写方法,只有一套有阻塞作用,不要用错。方法说明add(o)/remove(o)add方法添加元素时,如果超过队列长度,会直接抛出异常。offer(o)/poll(o)offer添加元素时,如果发现队列满了,无法添加,则直接返回falseput(o)/take(o)put。当向队列尾部添加元素时,发现队列已满,会阻塞等待空间添加元素。4.信号量(semaphore)信号量是JUC提供的一种共享锁机制,可以进行拥塞控制。此功能可用于控制缓冲区的大小。//canProduce:可以生产的数量(即缓冲区可用的数量),生产者调用acquire,减少许可数valcanProduce=Semaphore(5)//canConsumer:可以消费的数量,producer调用release,增加许可数valcanConsume=Semaphore(5)//控制缓冲区访问互斥valmutex=Semaphore(0)valbuffer=LinkedList()funproduce(data:Data){if(canProduce.tryAcquire()){sleep(2000)//mockproducemutex.acquire()buffer.push(data)mutex.release()canConsume.release()//通知消费者添加了新产品}}funconsume(){if(canConsume.tryAcquire()){sleep(2000)//mockconsumemutex.acquire()buffer.removeFirst()mutex.release()canProduce.release()//通知生产端可以追加生产}}5.PipedXXXStream(管道)Java中的PipedInputStream/PipedOutputStream实现了类似管道的功能,用于不同线程之间的相互通信,th输入流中有一个缓冲区数组,当缓冲区数组为空时,输入流PipedInputStream所在的线程就会被阻塞。valpis:PipedInputStream=PipedInputStream()valpos:PipedOutputStreambylazy{PipedOutputStream().apply{pis.connect(this)//建立输入流和输出流之间的连接}}funproduce(data:ContactsContract.Data){while(true){sleep(2000)pos.use{//Kotlin使用use方便的释放资源it.write(data.getBytes())it.flush()}}}funconsume(){while(true){sleep(2000)pis.use{valbyteArray=ByteArray(1024)it.read(byteArray)}}}@TestfunTest(){repeat(10){Thread{produce(Data())}.start()}repeat(10){Thread{consume()}.开始()}}6。从概念上讲,RxJavaRxJava可以使用Observable/Subject作为生产者,使用Subscriber作为消费者,但是Subject和Observable都缺乏Buffer溢出时的阻塞机制,很难独立实现生产者/消费者或模型。Flowable的背压机制可以用来控制缓冲区的数量,建立上下游之间的通信。配合Atomic,可以换方向实现单生产者/单消费者场景(不适用于多生产者/多消费者场景)。classProducer:Flowable(){overridefunsubscribeActual(subscriber:org.reactivestreams.Subscriber){subscriber.onSubscribe(object:Subscription{overridefuncancel(){//...}privatevaloutStandingRequests=AtomicLong(0)overridefunrequest(n:Long){//接收下游通知并开始生产outStandingRequests.addAndGet(n)while(outStandingRequests.get()>0){sleep(2000)subscriber.onNext(Data())outStandingRequests.decrementAndGet()}}})}}classConsumer:DefaultSubscriber(){overridefunonStart(){request(1)}overridefunonNext(i:Data?){sleep(2000)//mockconsumerequest(1)//通知上游可以增加产量}overridefunonError(throwable:Throwable){//...}overridefunonComplete(){//...}}@Testfuntest_rxjava(){try{valtestProducer=Producer)valtestConsumer=Consumer()testProducer.subscribeOn(Schedulers.computation()).observeOn(Schedulers.single()).blockingSubscribe(testConsumer)}catch(t:Throwable){t.printStackTrace()}}7.CoroutineChannel协程中的Channel有一个拥塞控制机制,可以实现生产者和消费者之间的通信。Channel可以理解为协程版的阻塞队列,capacity指定队列容量。valchannel=Channel(capacity=5)suspendfunproduce(data:ContactsContract.Contacts.Data)=run{delay(2000)//mockproducechannel.send(data)}suspendfunconsume()=run{delay(2000)//mockconsumechannel.receive()}@Testfuntest_channel(){repeat(10){GlobalScope.launch{produce(Data())}}repeat(10){GlobalScope.launch{consume()}}}另外Coroutine提供了produce方法,边声明Channel边生产数据,写起来比较简单,适用于单消费者单生产者场景:funCoroutineScope.produce():ReceiveChannel=produce{repeat(10){delay(2000)//mockproducesend(Data())}}@Testfuntest_produce(){GlobalScope.launch{produce.consumeEach{delay(2000)//mockconsume}}}8.CoroutineFlowFlow和RxJava一样,因为缺少Buffer溢出时的阻塞机制,不适合生产消费问题是它的背压机制比较简单,不能像RxJava一样接收下游通知。但是Flow后来发布了SharedFlow,作为缓冲的热流,它提供了Buffer溢出策略,可以作为生产者和消费者之间的同步。valflow:MutableSharedFlow=MutableSharedFlow(extraBufferCapacity=5//缓冲区大小,onBufferOverflow=BufferOverflow.SUSPEND//缓冲区溢出策略:暂停)@Testfuntest(){GlobalScope.launch{repeat(10){delay(2000)//mockproducesharedFlow.emit(Data())}}GlobalScope.launch{sharedFlow.collect{delay(2000)//mockconsume}}}注意SharedFlow只能用于单生产者/单消费者场景。总结生产者/消费者问题,其本质核心就是多线程读写共享资源(Buffer)时的同步问题。理论上,只要有一个带有同步机制的多线程框架,比如线程锁、信号量、阻塞队列、协程Channel等,都可以实现生产消费模型。另外,RxJava和Flow虽然也是多线程框架,但是缺少Buffer溢出时的阻塞机制,所以不适合生产/消费场景,更适合纯响应式场景使用。