1.线程池原理我们在使用线程的时候,创建一个线程,实现起来很简单,但是会出现一个问题:如果并发线程很多,而每个线程都是执行完一个短任务之后,所以频繁创建线程会大大降低系统的效率,因为频繁创建和销毁线程是需要时间的。那么有没有办法让线程可以复用,即执行完一个任务后,不被销毁,还能继续执行其他任务呢?线程池是多线程处理的一种形式。在处理过程中,将任务添加到队列中,然后在创建线程后自动启动这些任务。线程池线程都是后台线程。每个线程使用默认堆栈大小,以默认优先级运行,并且位于多线程单元中。如果一个线程在托管代码中空闲(比如等待一个事件),线程池将插入另一个工作线程来保持所有处理器忙碌。如果所有的线程池线程一直处于忙碌状态,但队列中有待处理的工作,线程池会在一段时间后创建另一个工作线程,但线程数永远不会超过最大值。超过最大值的线程可以排队,但在其他线程完成之前它们不会启动。每种编程语言都有线程池的概念,很多语言直接提供了线程池,作为程序员可以直接使用。下面介绍一下线程池的实现原理:线程池的主要组成部分由三部分组成,这三部分共同作用得到一个完整的线程池:1)。任务队列存储需要处理的任务,这些任务由工作线程处理。通过线程池提供的API函数,将一个等待处理的任务添加到任务队列中,或者将处理的任务从任务队列中删除,线程池的使用者也会从任务队列中删除,即,调用线程池函数向任务队列中添加任务的线程就是生产者线程2)。工作线程(任务队列任务的消费者),在N个线程池中维护一定数量的工作线程。它们的作用是不断地读取任务队列。从里面取出任务并处理工作的线程相当于任务队列的消费者角色,如果任务队列为空,工作线程会被阻塞(使用条件变量/信号量阻塞)如果有新的任务阻塞后,生产者将解除阻塞,工作线程将开始工作3)。管理器线程(不处理任务队列中的任务),它的任务之一就是周期性地检测任务队列中的任务数和处于忙碌状态的工作线程数。当任务过多时,可以适当新建一些工作线程。当任务太少时,可以适当销毁部分工作线程。2.任务队列//任务结构体typedefstructTask{void(*function)(void*arg);void*arg;}Task;3.线程池定义//线程池结构structThreadPool{//任务队列Task*taskQ;intqueueCapacity;//容量intqueueSize;//当前任务数intqueueFront;//队头->获取数据intqueueRear;//队尾->putdatapthread_tmanagerID;//管理线程IDpthread_t*threadIDs;//工作线程IDintminNum;//最小线程数intmaxNum;//最大线程数intbusyNum;//忙线程数intliveNum;//存活线程数intexitNum;//要销毁的线程数pthread_mutex_tmutexPool;//锁住整个线程池pthread_mutex_tmutexBusy;//锁定busyNum变量pthread_cond_tnotFull;//任务队列是否满pthread_cond_tnotEmpty;//任务队列是否空intshutdown;//是否销毁线程池,如果销毁则为1,如果不是,它将是0};4。头文件声明#ifndef_THREADPOOL_H#define_THREADPOOL_HtypedefstructThreadPoolThreadPool;//创建线程池并初始化ThreadPool*threadPoolCreate(intmin,intmax,intqueueSize);//销毁线程池intthreadPoolDestroy(ThreadPool*pool);//向线程池添加任务voidthreadPoolAdd(ThreadPool*pool,void(*func)(void*),void*arg);//获取线程池中工作线程数intthreadPoolBusyNum(ThreadPool*pool);//获取线程池中存活线程数intthreadPoolAliveNum(ThreadPool*pool);//////////////////////////////工作线程(消费者线程)任务函数void*worker(void*arg);//管理线程任务函数void*manager(void*arg);//单线程退出voidthreadExit(ThreadPool*pool);#endif//_THREADPOOL_H5.源文件定义ThreadPool*threadPoolCreate(intmin,intmax,intqueueSize){ThreadPool*pool=(ThreadPool*)malloc(sizeof(ThreadPool));do{if(pool==NULL){printf("mallocthreadpoolfail...\n");break;}pool->threadIDs=(pthread_t*)malloc(sizeof(pthread_t)*max);if(pool->threadIDs==NULL){printf("mallocthreadIDsfail...\n");break;}memset(pool->threadIDs,0,sizeof(pthread_t)*max);pool->minNum=min;pool->maxNum=max;pool->busyNum=0;pool->liveNum=min;//和最小个数等pool->exitNum=0;if(pthread_mutex_init(&pool->mutexPool,NULL)!=0||pthread_mutex_init(&pool->mutexBusy,NULL)!=0||pthread_cond_init(&pool->notEmpty,NULL)!=0||pthread_cond_init(&pool->notFull,NULL)!=0){printf("mutexorconditioninitfail...\n");break;}//任务队列pool->taskQ=(Task*)malloc(sizeof(Task)*queueSize);pool->queueCapacity=queueSize;pool->queueSize=0;pool->queueFront=0;pool->queueRear=0;pool->shutdown=0;//创建线程pthread_create(&pool->managerID,NULL,manager,pool);for(inti=0;ithreadIDs[i],NULL,worker,pool);}returnpool;}while(0);//释放资源if(pool&&pool->threadIDs)free(pool->threadIDs);if(pool&&pool->taskQ)free(pool->taskQ);if(pool)free(池);returnNULL;}intthreadPoolDestroy(ThreadPool*pool){if(pool==NULL){return-1;}//关闭线程池pool->shutdown=1;//阻塞回收管理器线程pthread_join(pool->managerID,NULL);//唤醒阻塞的消费者线程for(inti=0;iliveNum;++i){pthread_cond_signal(&pool->notEmpty);}//释放堆内存if(pool->taskQ){free(pool->taskQ);}if(pool->threadIDs){free(pool->threadIDs);}pthread_mutex_destroy(&pool->mutexPool);pthread_mutex_destroy(&pool->mutexBusy);pthread_cond_destroy(&pool->notEmpty);pthread_cond_destroy(&pool->notFull);free(pool);pool=NULL;return0;}voidthreadPoolAdd(ThreadPool*pool,void(*func)(void*),void*arg){pthread_mutex_lock(&pool->mutexPool);while(pool->queueSize==pool->queueCapacity&&!pool->shutdown){//块生产者线程pthread_cond_wait(&pool->notFull,&pool->mutexPool);}if(pool->shutdown){pthread_mutex_unlock(&pool->mutexPool);return;}//添加任务池->taskQ[pool->queueRear].function=func;pool->taskQ[pool->queueRear].arg=arg;pool->queueRear=(pool->queueRear+1)%pool->queueCapacity;pool->queueSize++;pthread_cond_signal(&pool->notEmpty);pthread_mutex_unlock(&pool->mutexPool);}intthreadPoolBusyNum(ThreadPool*pool){pthread_mutex_lock(&pool->mutexBusy);intbusyNum=pool->busyNum;pthread_mutex_unlock(&pool->mutexBusy);returnbusyNum;}intthreadPoolAliveNum(ThreadPool*pool){pthread_mutex_lock(&pool->mutexPool=intaliveNum)>liveNum;pthread_mutex_unlock(&pool->mutexPool);returnaliveNum;}void*worker(void*arg){ThreadPool*pool=(ThreadPool*)arg;while(1){pthread_mutex_lock(&pool->mutexPool);//当前任务是队列emptywhile(pool->queueSize==0&&!pool->shutdown){//阻塞工作线程pthread_cond_wait(&pool->notEmpty,&pool->mutexPool);//判断是否销毁线程if(pool->exitNum>0){pool->exitNum--;if(pool->liveNum>pool->minNum){pool->liveNum--;pthread_mutex_unlock(&pool->mutexPool);threadExit(pool);}}}//判断线程池是否关闭if(pool->shutdown){pthread_mutex_unlock(&pool->mutexPool);threadExit(pool);}//从任务队列中取出一个任务Tasktask;task.function=pool->taskQ[pool->queueFront].function;task.arg=pool->taskQ[pool->queueFront].arg;//移动头节点pool->queueFront=(pool->queueFront+1)%pool->queueCapacity;pool->queueSize--;//解锁pthread_cond_signal(&pool->notFull);pthread_mutex_unlock(&pool->mutexPool);printf("thread%ldstartworking...\n",pthread_self());pthread_mutex_lock(&pool->mutexBusy);pool->busyNum++;pthread_mutex_unlock(&pool->mutexBusy);task.function(task.arg);free(task.arg);task.arg=NULL;printf("thread%ldendworking...\n",pthread_self());pthread_mutex_lock(&pool->mutexBusy);pool->busyNum--;pthread_mutex_unlock(&pool->mutexBusy);}returnNULL;}void*manager(void*arg){ThreadPool*pool=(ThreadPool*)arg;while(!pool->shutdown){//每3s检查一次sleep(3);//获取线程池中的任务数和当前线程数pthread_mutex_lock(&pool->mutexPool);intqueueSize=pool->queueSize;intliveNum=pool->liveNum;pthread_mutex_unlock(&pool->mutexPool);//取出忙线程数pthread_mutex_lock(&pool->mutexBusy);intbusyNum=pool->busyNum;pthread_mutex_unlock(&pool->mutexBusy);//添加线程//任务数>存活线程数&&存活线程数<最大线程数if(queueSize>liveNum&&liveNummaxNum){pthread_mutex_lock(&pool->mutexPool);intcounter=0;for(inti=0;imaxNum&&counterliveNummaxNum;++i){if(pool->threadIDs[i]==0){pthread_create(&pool->threadIDs[i],NULL,worker,pool);counter++;pool->liveNum++;}}pthread_mutex_unlock(&pool->mutexPool);}//销毁线程//忙线程*2<存活数threads&&survivingthreads>最小线程数if(busyNum*2pool->minNum){pthread_mutex_lock(&pool->mutexPool);pool->exitNum=NUMBER;pthread_mutex_unlock(&pool->mutexPool);//让工作线程自杀(inti=0;i<NUMBER;++i){pthread_cond_signal(&pool->notEmpty);}}}returnNULL;}voidthreadExit(ThreadPool*pool){pthread_ttid=pthread_self();for(inti=0;imaxNum;++i){if(pool->threadIDs[i]==tid){pool->threadIDs[i]=0;printf("threadExit()called,%ldexiting...\n",tid);break;}}pthread_exit(NULL);}6。测试代码voidtaskFunc(void*arg){intnum=*(int*)arg;printf("thread%ldisworking,number=%d\n",pthread_self(),num);sleep(1);}intmain(){//创建线程池ThreadPool*pool=threadPoolCreate(3,10,100);for(inti=0;i<100;++i){int*num=(int*)malloc(sizeof(int));*num=i+100;threadPoolAdd(pool,taskFunc,num);}sleep(30);threadPoolDestroy(pool);return0;}