当前位置: 首页 > 后端技术 > Python

Python进阶:队列源码分析

时间:2023-03-26 12:19:15 Python

startingqueue模块提供了适合多线程编程的先进先出(FIFO)数据结构。因为它是线程安全的,多个线程可以很容易地使用同一个实例。源码分析从初始化函数开始:classQueue:def__init__(self,maxsize=0):#设置队列的最大容量self.maxsize=maxsizeself._init(maxsize)#线程锁,互斥量变量self.mutex=threading.Lock()#从锁中导出三个条件变量self.not_empty=threading.Condition(self.mutex)self.not_full=threading.Condition(self.mutex)self.all_tasks_done=threading.Condition(self.mutex))self.unfinished_tasks=0def_init(self,maxsize):#初始化底层数据结构self.queue=deque()这个初始化函数可以得到什么信息?首先,队列的容量是可以设置的,对于具体的底层存储元素,它使用了collections.deque()双端链表的数据结构,这样可以很方便的进行先进先出——出操作。_init函数在这里也被特地抽象出来,方便其子类覆盖,允许子类使用其他结构来存储元素(比如优先级队列使用列表)。然后是线程锁self.mutex,底层数据结构self.queue的操作必须先获取到它;然后就是三个条件变量,这三个Condition都是以self.mutex为参数的,也就是说,他们共享一把锁;由此我们可以知道withself.mutex和withself.not_empty是互斥的。基于这些锁的一些简单操作:isEmptywithself.mutex:returnnotself._qsize()deffull(self):#队列是否满withself.mutex:return00:ifnotblock:ifself._qsize()>=self.maxsize:raiseFull#如果block为False且队列已满,则抛出Full异常eliftimeoutisNone:whileself._qsize()>=self.maxsize:self.not_full.wait()#阻塞直到有剩余空间eliftimeout<0:#参数值不合格,抛出ValueErrorraiseValueError("'timeout'mustbeanon-negativenumber")else:endtime=time()+timeout#计算等待的结束时间whileself._qsize()>=self.maxsize:remaining=endtime-time()ifremaining<=0.0:raiseFull#已经没有等待periodSpace,throwsFull异常self.not_full.wait(remaining)self._put(item)#向底层数据结构添加一个元素self.unfinished_tasks+=1self.not_empty.notify()def_put(self,item):self.queue.append(item)虽然只有20行代码,但是这里的逻辑还是比较复杂的。它必须处理超时和队列中剩余空间不足的问题。具体情况如下:如果block为False,则忽略timeout参数。如果此时队列已满,将抛出AFull异常;如果此时队列未满,则立即将该元素保存到底层数据结构中;如果block为True且timeout为None,则put操作可能会阻塞,直到队列中有可用空间(默认);如果超时是一个非负数,它会阻塞相应的时间,直到队列中有剩余空间。在此期间,如果队列没有空间,则会抛出Full异常;处理好参数逻辑后,将元素保存到底层数据结构中,并递增unfinished_tasks,同时通知not_empty,唤醒等待其中数据的线程。出队操作:classQueue:...defget(self,block=True,timeout=None):withself.not_empty:ifnotblock:ifnotself._qsize():raiseEmptyeliftimeoutisNone:whilenotself._qsize():self.not_empty.wait()eliftimeout<0:raiseValueError("'timeout'mustbeanon-negativenumber")else:endtime=time()+timeoutwhilenotself._qsize():remaining=endtime-time()ifremaining<=0.0:raiseEmptyself.not_empty.wait(remaining)item=self._get()self.not_full.notify()returnitemdef_get(self):returnself.queue的.popleft()get()操作是put()的相反操作,代码块也很相似。get()从队列中移除第一个插入的元素并返回它。如果block为False,则忽略超时参数。如果此时队列中没有元素,则会抛出Empty异常;如果此时队列有元素,则立即将元素保存到底层数据结构中;如果block为True,如果timeout为None,则get操作可能会阻塞,直到队列中有元素为止(默认);如果超时是一个非负数,它会阻塞相应的时间,直到队列中有一个元素。在此期间,如果队列中没有元素,则会抛出Empty异常;最后,通过self.queue.popleft()移除最早放入队列的元素,并通知not_full唤醒等待其中数据的线程。这里值得注意的是self.unfinished_tasks在put()操作中是递增的,而在get()操作中并没有递减。为什么?这其实就是给用户留出时间去消费元素。get()只是获取元素,并不意味着该元素被消费者线程处理。用户需要调用task_done()来通知队列任务已经处理完毕:classQueue:...deftask_done(self):withself.all_tasks_done:unfinished=self.unfinished_tasks-1ifunfinished<=0:ifunfinished<0:#即当成功调用put()的次数小于调用task_done()的次数时,会抛出异常raiseValueError('task_done()calledtoomanytimes')self.all_tasks_done.notify_all()#当unfinished为0时,会通知all_tasks_doneself.unfinished_tasks=unfinisheddefjoin(self):withself.all_tasks_done:whileself.unfinished_tasks:#如果有未完成的任务,wait()方法会调用等待self.all_tasks_done.wait()。由于task_done()用户调用,当task_done()的个数大于put()的个数时,会抛出异常。task_done()操作的作用是唤醒阻塞的join()操作。join()方法会阻塞,直到队列中的所有元素都被取出并处理完毕(类似于线程的join方法)。也就是说join()方法必须和task_done()一起使用。LIFOLifoQueue使用LIFO顺序,类似于堆栈结构:queue=[]def_qsize(self):returnlen(self.queue)def_put(self,item):self.queue.append(item)def_get(self):returnself.queue.pop()这是LifoQueue所有的代码都没有了,这也是Queue设计得很好的原因之一。它将底层数据操作抽象为四个操作函数,并自行处理线程安全问题,使其子类只需要关注底层操作即可。LifoQueue的底层数据结构是存储在列表中的,列表中的最后一个元素可以通过self.queue.pop()移除,而不需要重新设置索引。PriorityQueuepriorityqueuefromheapqimportheappush,heappopclassPriorityQueue(Queue):'''Queue的变体,它按优先顺序检索打开的条目(最低的优先)。条目通常是以下形式的元组:(优先级数,数据)。'''def_init(self,maxsize):self.queue=[]def_qsize(self):returnlen(self.queue)def_put(self,item):heappush(self.queue,item)def_get(self):returnheappop(self.queue)优先级队列使用heapq模块的结构,也就是最小堆的结构。优先级队列更常用。队列中项目的处理顺序需要基于这些项目的特性。一个简单的例子:importqueueclassA:def__init__(self,priority,value):self.priority=priorityself.value=valuedef__lt__(self,other):returnself.priority