当前位置: 首页 > Web前端 > HTML

线程安全的生产者消费者四种实现方法

时间:2023-04-02 22:10:11 HTML

线程安全的生产者消费者的四种实现方式,是多线程并发协作问题的经典案例。该场景包含三个对象,生产者(Producer)、消费者(Consumer)和一个固定大小的缓冲区(Buffer)。生产者的主要作用是不断产生数据放入缓冲区,消费者不断从缓冲区中消费数据。这个问题的关键是如何线程安全的操作共享数据块,从而保证生产者线程和消费者线程能够正确的更新数据块。主要的考虑是1.生产者不会在缓冲区满的情况下添加数据。2.消费者应该停止消费缓冲区中的数据。3.同一时间只允许一个生产者或消费者访问共享缓冲区(这是互斥操作访问共享块的要求)。解决问题的方案上面的问题通常有信号量、wait¬ify、管道或者阻塞队列等几种思路。本文以Java语言为例一一讲解。信号量(Semaphore),又称信号量,用于控制同时访问资源的数量,比如控制访问数据库的最大连接数。线程通过acquire()获取连接权限。数据操作完成后,通过release()释放license。对于生产者-消费者问题,为了满足线程安全运行的要求,我们只允许一个线程同时访问共享数据区,所以需要一个大小为1的信号量互斥量来控制相互排除操作。请注意,我们还定义了notFull和notEmpty信号量。notFull用于标识当前可用块的空间大小。当notFullsize大于0时,表示“未满”,producer可以继续生产。当等于0时,表示空间已满,不能继续生产;同样,对于notEmpty信号量,当它大于0时,表示“不为空”,消费者可以继续消费,当它等于0时,表示没有商品,不能继续消费。notFull的初始大小为5(5个可用空间可用于生产),notEmpty的初始大小为0(没有产品可用于消费)。/*数据仓库类,所有生产者和消费者共享这个类对象**/staticclassDataWareHouse{//共享数据区privatefinalQueuedata=newLinkedList();//未满锁privatefinalSemaphorenotFull;//非空锁privatefinalSemaphorenotEmpty;//互斥锁privatefinal信号量互斥锁;publicDataWareHouse(intcapacity){this.notFull=newSemaphore(capacity);this.notEmpty=new信号量(0);mutex=new信号量(1);}publicvoidoffer(Stringx)throwsInterruptedException{notFull.acquire();//生产者获取信号,notFull信号量减一mutex.acquire();//当前进程获得signal,mutex信号量减1。其他线程被阻塞操作共享块数据data.add(x);mutex.release();//互斥信号量+1,其他线程可以继续signal操作共享块数据notEmpty.release();//成功产生数据,notEmpty信号量加1}publicStringpoll()throwsInterruptedException{notEmpty.acquire();//notEmpty信号减去一个mutex.acquire();字符串结果=data.poll();mutex.release();notFull.rele酶();//成功消耗数据,notFull信号量加1返回结果;}}/Producer线程程序/staticclassProducerimplementsRunnable{privatefinalDataWareHousedataWareHouse;publicProducer(finalDataWareHousedataWareHouse){this.dataWareHouse=dataWareHouse;}@Overridepublicvoidrun(){while(true){try{Thread.sleep(100);//生产的速度慢于消费的速度Strings=UUID.randomUUID().toString();System.out.println("输入数据"+s);dataWareHouse.offer(s);}catch(InterruptedExceptione){e.printStackTrace();}}}}/Consumer线程程序/staticclassConsumerimplementsRunnable{privatefinalDataWareHousedataWareHouse;publicConsumer(finalDataWareHousedataWareHouse){this.dataWareHouse=dataWareHouse;}@Overridepublicvoidrun(){while(true){while(true){try{System.out.println("获取数据"+dataWareHouse.poll());}catch(InterruptedExceptione){e.printStackTrace();}}}}}//测试代码publicstaticvoidmain(String[]args){finalDataWareHousedataWareHouse=newDataWareHouse(5);//连续生产的三个生产者for(inti=0;i<3;i++){Threadt=newThread(newProducer(dataWareHouse));t.开始();}//三个消费者继续消费for(inti=0;i<3;i++){Threadt=newThread(newConsumer(dataWareHouse));t.开始();}}Wait和Notify机制JavaObjectobject类包含三个final方法以允许线程之间进行通信以通知资源的状态。它们是wait()、notify()和notifyAll()。wait():顾名思义,告诉当前线程释放锁,进入休眠状态(等待状态),等待资源。wait方法本身是一个native方法,它在Java中的使用语法如下:synchronized(lockObject){while(!condition){lockObject.wait();}//在这里采取行动;}notify():使用唤醒处于等待状态的线程,同时释放锁,被唤醒的线程可以重新获得锁来访问资源。它的基本语法如下synchronized(lockObject){//establish_the_condition;lockObject.notify();//如果需要可以添加任何代码}notifyAll():与notify()不同,它用于唤醒所有处于等待状态的线程.语法如下:synchronized(lockObject){establish_the_condition;lockObject.notifyAll();}说完这三个方法,我们来看看如何使用wait¬ify(All)来解决我们的问题。新的DataWareHouse类如下://生产者类和消费者共享对象staticclassDataWareHouse{//共享数据区privatefinalQueuedata=newLinkedList();私有int容量;私有整数大小=0;publicDataWareHouse(intcapacity){this.capacity=capacity;}publicsynchronizedvoidoffer(Stringx)throwsInterruptedException{while(size==capacity){//当缓冲区满时,生产者进入等待状态this.wait();//使用这个对象来锁定}data.add(x);尺码++;通知所有();//当缓冲区有数据时,唤醒所有等待的消费者线程}publicsynchronizedStringpoll()throwsInterruptedException{while(size==0){//当缓冲区为空时,消费者进入等待状态this.wait();}字符串结果=data.poll();尺寸-;通知所有();//当数据被消费,空间被释放时,通知所有等待的生产者。返回结果;}}注意:在方法上使用synchronized等同于在方法体中使用synchronized(this),两者都是使用this对象作为锁。生产者和消费者类,以及测试代码和信号量部分都是一样的,不再赘述。Pipeline管道是实现进程或线程之间通信的常用方式(线程通常通过共享内存进行通信,而进程则使用scoket、管道和消息队列等技术)。它连接输入流和输出流,基于生产者-消费者模式构建的技术。具体实现可以通过创建管道输入流对象和管道输出流对象,然后将输入流和输出流链接起来。生产者向管道写入数据,消费者读取管道数据流中的数据。通过这种方式,实现了线程之间的相互通信。具体实现代码如下私有最终PipedOutputStreampos;publicDataWareHouse()抛出IOException{pis=newPipedInputStream();(位置);//连接管道}//向管道写入数据publicvoidoffer(intval)throwsIOException{pos.write(val);pos.flush();}//从管道中获取数据。publicintpoll()throwsIOException{//当管道中没有数据时,方法阻塞returnpis.read();}//关闭管道@Overridepublicvoidclose()throwsIOException{if(pis!=null){pis.close();}if(pos!=null){pos.close();}}}//消费者类staticclassConsumerimplementsRunnable{privatefinalDataWareHousedataWareHouse;消费者(DataWareHousedataWareHouse){this.dataWareHouse=dataWareHouse;}@Overridepublicvoidrun(){try{//消费者不断从管道中读取数据while(true){intnum=dataWareHouse.poll();System.out.println("获取数据+"+num);}}catch(IOExceptione){thrownewRuntimeException(e);}}}staticclassProducerimplementsRunnable{privatefinalDataWareHousedataWareHouse;私人最终随机随机=新随机();生产者(DataWareHousedataWareHouse){@this.dataWareHouse=;data}HareOverridepublicvoidrun(){try{//生产者不断向管道写入数据while(true){intnum=random.nextInt(256);dataWareHouse.offer(num);System.out.println("输入数据+"+num);线。睡眠(1000);}}catch(Exceptione){thrownewRuntimeException(e);}}publicstaticvoidmain(String[]args)抛出IOException{DataWareHousedataWareHouse=newDataWareHouse();新线程(新生产者(dataWareHouse))。开始();新线程(新消费者(dataWareHouse))。开始();}}阻塞队列阻塞队列(BlockingQueue),具有1.当队列满时阻塞入队操作2.当队列为空时阻塞出队操作3.线程安全特性,所以阻塞队列通常被认为是实现生产消费模式最方便的工具,其中DataWareHouse类实现代码如下:staticclassDataWareHouse{//共享数据区privatefinalBlockingQueueblockingQueue;publicDataWareHouse(intcapacity){this.blockingQueue=newArrayBlockingQueue<>(capacity);}publicvoidoffer(Stringx){blockingQueue.offer(x);}publicStringpoll(){returnblockingQueue.poll();}}生产者和消费者类,以及测试代码和信号量部分都是一样的,这里不再赘述。生产者-消费者问题的总结是面试中经常遇到的问题。本文总结了几种常见的实现方式。面试过程中通常不需要向面试官描述太多的实现细节。说明每种实现方法的特点就足够了。希望能给你带来帮助。