当前位置: 首页 > 后端技术 > Python

使用ThreadPoolExecutor后内存溢出(一)

时间:2023-03-26 13:04:51 Python

排查问题在从Kafka获取消费数据的过程中,使用ThreadPoolExecutor(线程池),在数据量较大的情况下,会造成内存泄漏,导致机器挂掉;伪代码为:defdeal_func(msg):#处理相关逻辑passpool=ThreadPoolExecutor(10)#LinkkafkawhileTrue:msg=client.poll(0.1)#Callpool.submit(deal_func,msg)调用程序后,内存直线上升;查询文章后找到原因,在循环中调用使用线程池时,进程会不断往线程池中扔任务,而不会判断线程池中是否有空闲进程;验证程序:importtimefromconcurrent.futuresimportThreadPoolExecutordeffunc(num):print(f'the{num}run')time.sleep(2)returnnum*numdefmain():pool=ThreadPoolExecutor(2)result=[]foriinrange(100):pool.submit(func,i)print(pool._work_queue.qsize())pool.shutdown()if__name__=='__main__':main()执行结果如下:1...95969798the2runthe3runthe4runthe5runthe6runthe7run可以看出当前进程还没有判断线程池中是否有空闲进程;进程在哪里发送任务到线程池?查看源码,可以发现:classThreadPoolExecutor(_base.Executor):#...省略代码self._max_workers=max_workersself._work_queue=queue.SimpleQueue()self._threads=set()self._broken=Falseself._shutdown=False代码中新建SimpleQueue作为线程池通信的队列对象,但是没有指定队列大小。在不断向线程池添加消息的过程中,线程池的消费速度跟不上生产速度。队列中会出现消息积压,导致进程内存不断增加;解决方法由于线程池使用的是无界队列,所以可以重写类,使用有界队列,如:):super(BoundThreadPoolExecutor,self).__init__(*args,**kwargs)self._work_queue=queue.Queue(2)在初始化函数__init__中写入线程池的队列对象,赋值为Queue(2),这样就可以限制队列的大小,在执行过程中,如果队列满了,程序会一直等待线程池,直到线程池中有空闲线程为止;到这里为止,已经解决了为什么进程会内存溢出,但是ThreadPoolExcutor是怎么实现的呢?可以进一步分析源码;见下一章;