首发地址day05实现多线程需要注意什么?项目仓库地址https://github.com/lzs123/CProxy,欢迎fork和star!上一篇教程day01-从一个基础的socket服务开始day02真正的高并发依赖于IO多路复用day03C++项目开发配置最佳实践(vscode远程开发配置、格式化、代码检查、cmake管理配置)day04高性能服务设计思路Howto初始化工作线程?在我们的设计中,工作线程本身就是一个事件循环,启动后会被阻塞,等待事件的发生。为了达到这个效果,需要在线程启动的时候做一些初始化工作。我们定义了EventLoopThread类,定义如下~EventLoopThread();无效启动循环();无效线程函数();voidAddChannel(SP_Channel);voidAddConn(SP_Conn);SP_EventLoopGetLoop(){returnloop_;}私有:SP_EventLoop循环_;布尔开始了_;;std::condition_variablecond_;};SP开头代表对应类的shared_ptr智能指针类型;例如,SP_EventLoop=std::shared_ptr\在EventLoopThread的构造函数中,创建了一个std::thread对象,EventLoopThread::ThreadFunc将函数作为一个线程来执行。voidEventLoopThread::ThreadFunc()try{if(loop_){throw"loop_isnotnull";}loop_=SP_EventLoop(newEventLoop());{std::unique_locklock(mutex_);开始_=真;cond_.notify_all();}loop_->Loop();}catch(std::exception&e){SPDLOG_CRITICAL("EventLoopThread::ThreadFuncexception:{}",e.what());abort();}让我们先看第5行:loop_=SP_EventLoop(newEventLoop());初始化一个EventLoop,赋值给EventLoopThread的成员变量loop_,我们先看一下EventLoop的定义classEventLoop{public:EventLoop():poller_(SP_Epoll(newEpoll())){};无效循环();voidAddToPoller(SP_Channel通道);voidUpdateToPoller(SP_Channel通道);voidRemoveFromPoller(SP_Channel通道);私有:SP_EventDispatcherpoller_;};在EventLoop的构造函数中,初始化了一个epoll对象,赋值给变量poller_。poller_本身是一个EventDispatcher对象,epoll继承了EventDispatcher,表示基于epoll的事件分发;这样做的好处是,如果要增加事件分发机制,比如项目支持mac环境,我们需要使用kqueue代替epoll来实现事件分发。这时候我们只需要修改EventLoop的构造函数,将新的事件分发对象Kqueue赋值给poller_即可。我们先看看Epoll在初始化的时候做了什么。epoll::Epoll():epoll_fd_(epoll_create1(EPOLL_CLOEXEC)),epoll_events_(EVENTSNUM){assert(epoll_fd_>0);}调用epoll_create1并创建一个epoll实例,也就是说,当一个Epoll对象被初始化时,它是内核中已经准备了一个eventpoll对象,我们可以添加sockets并监听相关事件。回头看EventLoopThread::ThreadFunc,初始化一个EventLoop对象后,EventLoopThread::ThreadFunc会再次调用loop_->Loop()。这就是我们前面提到的事件循环。voidEventLoop::Loop(){std::vectorready_channels;对于(;;){ready_channels.clear();ready_channels=poller_->WaitForReadyChannels();对于(SP_Channelchan:ready_channels){chan->HandleEvents();}}}EventLoop::Loop本身就是一个死循环,逻辑大概比较简单。就是不断从poller_中获取触发事件的通道列表,然后遍历列表,调用对应的HandleEvent事件处理函数。当没有事件发生时,循环会阻塞在WaitForReadyChannels处,底层实际上阻塞在epoll_wait处。(线程在阻塞过程中被挂起,不占用cpu)。下面简单回顾一下线程的初始化。创建一个EventLoop对象,底层通过调用epoll_create1创建一个epoll实例,通过这个epoll实例可以添加事件监听。调用EventLoop::Loop后,当没有事件发生时,线程会被阻塞;当事件发生时,会调用注册对应的handleEvents方法进行处理。如何控制线程的启动顺序?上面我们讲了线程初始化,但是初始化之后EventLoopThread还需要调用StartLoop开始工作。这其实就是让主线程等待线程池中的工作线程完成初始化。为什么要控制?首先说说为什么主线程要等待工作线程完成初始化。在我们的线程模型设计中,主线程负责监听和接收新的连接请求,然后在线程池中选择一个工作线程,将新的连接套接字交给工作线程处理。假设工作线程不需要StartLoop,在工作线程初始化后直接加入线程池。voidEventLoopThreadPool::start(){for(inti=0;iStartLoop();threads_.emplace_back(t);}}当有新的连接时,主线程从线程池中获取一个工作线程。但是此时,我们不能保证选择的工作线程已经初始化了loop_。因为EventLoopThread::ThreadFunc的执行是异步的,执行顺序可能如下。当主线程选择向工作线程添加新连接时,此时工作线程的loop_还没有初始化,可能会导致程序直接coredump。因此,我们必须想办法在主线程开始接收新的连接请求之前初始化工作线程的EventLoop。如何控制?这其实是一个多线程的通知问题。我们主要使用mutex和condition这两个武器来完成通过条件变量的通知。在C++中,我们通常使用condition_variable配合mutex来处理线程间的同步。主要函数是condition_variable::notify_xx和condition_variable::wait。顾名思义,wait是等待的函数。假设我们在线程A中等待,流程如下:lock获取锁,调用wait,会自动unlock并释放锁,然后阻塞线程;被其他线程唤醒后,会自动lock获取锁,继续执行下一行代码唤醒线程。唤醒线程的函数是notify_all和notify_one。两者的区别是notify_one()一次只唤醒一个线程,然后notify_all()函数会唤醒所有等待的线程(当最终能够抢到锁时只有一个线程)。调用过程如下:锁获取到锁,调用notify_all/notify_one唤醒等待线程释放锁。我们为EventLoopThread引入了StartLoop方法,大概效果如下。hclassEventLoopThread{public:...voidStartLoop();无效线程函数();...私人:...boolstarted_;std::mutex互斥量_;std::condition_variablecond_;};//lib/event_loop_thread.cppvoidEventLoopThread::ThreadFunc()try{if(loop_){throw"loop_isnotnull";}}loop_=SP_EventLoop(newEventLoop());{std::unique_locklock(mutex_);开始_=真;cond_.notify_all();}loop_->Loop();}catch(std::exception&e){SPDLOG_CRITICAL("EventLoopThread::ThreadFuncexception:{}",e.what());中止();}voidEventLoopThread::StartLoop(){std::unique_locklock(mutex_);while(!started_)cond_.wait(lock);}首先我们要明确一点,worker线程初始化loop_之后,就意味着线程准备好接收和处理socket了。所以我们完成loop_的初始化后,将started_设置为true,然后发送notify通知唤醒等待线程。在StartLoop函数中,我们首先检查started_是否为false。如果为真,说明工作线程已经初始化了loop_。这种情况下,StartLoop不再需要等待,直接返回即可;如果started_为false,则陷入等待,直到工作线程在完成loop_initialization后被唤醒。如何将套接字添加到工作线程?最后,让我们仔细看看新的连接套接字是如何添加到工作线程中的。当没有请求时,主线程会阻塞accept调用,当有新的连接请求时,accept会返回新的连接套接字accept_fd。主线程会先将accept_fd封装成一个Conn对象。上一节《day04 高性能服务设计思路》提到项目中存在各种连接,而这些连接有一个共同的基类Conn,Conn主要将socket封装成一个Channel,并设置这个Channel的各种事件回调处理逻辑。不同类型的Conn有自己的回调处理逻辑。接下来主线程通过EventLoopThreadPool::PickRandThread获取一个工作线程。SP_EventLoopThreadEventLoopThreadPool::PickRandThread(){SP_EventLoopThreadt;{std::unique_locklock(thread_mutex_);t=threads_[next_work_thread_Idx_];next_work_thread_Idx_=(next_work_thread_Idx_+1)%;}tnum_here我们直接使用轮训策略在线程池中选择线程。获取工作线程后,我们直接调用EventLoopThread::AddConn将连接交给工作线程。//lib/event_loop_thread.cppvoidEventLoopThread::AddConn(SP_Connconn){loop_->AddToPoller(conn->GetChannel());}//lib/event_loop.cppvoidEventLoop::AddToPoller(SP_Channelchannel){poller_->PollAdd(channel);}//lib/epoll.cppvoidEpoll::PollAdd(SP_Channelchannel){intfd=channel->getFd();epoll_event事件;事件.data.fd=fd;event.events=channel->GetEvents();if(epoll_ctl(epoll_fd_,EPOLL_CTL_ADD,fd,&event)<0){SPDLOG_CRITICAL("epoll_ctlfd:{}err:{}",fd,strerror(errno));}else{fd2chan_[fd]=channel;}}可以发现,底层是调用epoll_ctl将socketfd添加到对应工作线程的epoll实例中。这里值得注意的是【将socket添加到worker线程的epoll实例】这个动作是在主线程上完成的。由于epoll是线程安全的,直接在主线程上操作worker线程的epoll实例是没有问题的。继续思考有没有办法把【将socket添加到工作线程的epoll实例】的动作放在工作线程上完成呢?事实上,这种做法更为普遍。例如,有时为了避免加锁,提高运行效率,某些操作需要由主线程触发,由工作线程执行。这里的难点在于工作线程本身就是一个死循环。当没有事件发生时,会一直阻塞在epoll_wait上。在这种情况下,主线程如何通知工作线程执行操作呢?这是一种思考方式。我们可以在EventLoop初始化的时候调用eventfd()创建一个socketevent_fd,EventLoop添加一个read事件来监听event_fd。在EventLoop::Loop函数中,每轮读写处理完后,都会执行另一个函数doPendingFns()。伪代码如下voidEventLoop::Loop(){std::vectorready_channels;对于(;;){ready_channels.clear();ready_channels=poller_->WaitForReadyChannels();对于(SP_Channelchan:ready_channels){chan->HandleEvents();}doPendingFns();}}voiddoPendingFns(){std::vectorfns;{MutexLockGuard锁(mutex_);fns.swap(pendingFns_);}for(autofn:fns){fn();}}当主线程需要工作线程执行某个函数时,只需要到工作线程中将相应的函数添加到pendingFns列表中,然后向event_fd写入一些数据,让工作线程退出block,以及工作线程最终会遍历并执行doPendingFns中pendingFns列表中的所有函数。如果本文对您有用,请点个赞,走起!或者关注我,我会带来更多优质内容