当前位置: 首页 > 科技观察

例子:一个服务器程序的架构介绍

时间:2023-03-17 10:08:07 科技观察

这篇文章将介绍我做过的一个项目的服务器架构和服务器编程的一些重要细节。程序运行环境操作系统:Centos7.0编译器:gcc/g++4.8.3、cmake2.8.11mysql数据库:5.5.47项目代码管理工具:VisualStudio2013一、程序结构程序共有17个线程,分别是分为9个数据库工作线程D和1个日志线程L,6个普通工作线程W,1个主线程M。(下面会用这些字母来指代这些线程)(一)、数据库工作线程9的用途数据库工作线程在线程启动之初就建立了与mysql的连接,也就是说,每个线程一路保持与mysql的连接,一共9个数据库连接。每个数据库工作线程同时有两个任务队列。第一个队列A存放的是需要进行数据库增删改查操作的任务sqlTask??,第二个队列B存放的是sqlTask??执行的结果。sqlTask??执行完成后,立即放入结果队列,所以结果队列中的任务也是需要一个一个执行的任务。大概的伪代码如下:voiddb_thread_func(){while(!m_bExit){if(NULL!=(pTask=m_sqlTask??.Pop())){//从m_sqlTask??中取出的任务先执行后,pTask会携带结果数据pTask->Execute();//得到结果后,立即将任务放入结果任务队列m_resultTask.Push(pTask);continue;}sleep(1000);}//endwhile-loop}Nowthe问题来了:任务队列A中的任务从哪里来?目前只有消费者,没有生产者。那么制作人是谁?任务队列B中的任务会去哪里呢?目前只有生产者,没有消费者。这两个问题先搁置一会,待会再回答。(2)工作线程和主线程在介绍主线程和工作线程是干什么的时候,引入了几个在服务器编程中经常被抽象出来的概念(这里以tcp连接为例):TcpServer是一个Tcp服务,server需要绑定设置ip地址和端口号,监听客户端在该端口号上的连接(通常会使用一个成员变量TcpListener来管理监听细节)。那么一个TcpServer要做的就是这些工作。此外,每当有新连接到达时,TcpServer都需要接收新连接。当存在多个新连接时,TcpServer需要有序地管理这些连接:连接的建立、断开等,即生成和管理。一个TcpConnection对象。一个连接对应一个TcpConnection对象,TcpConnection对象管理着连接的一些信息:比如连接状态,本端和对端的ip地址和端口号等数据通道对象Channel,它记录了socket的句柄,因此是一个连接上真正执行数据收发的执行者,Channel对象一般作为TcpConnection的成员变量。TcpSession对象将Channel接收到的数据解包,或者将准备好的数据打包,发送给Channel发送。总结一下:一个TcpServer依赖于TcpListener来监听和处理新的连接,依赖于TcpConnection对象来管理连接上的数据。TcpConnection实际上是依赖Channel来收发数据,依赖TcpSession来打包和解包数据。也就是说,一个TcpServer中有一个TcpListener,对应多个TcpConnection。有几个TcpSession有几个TcpConnections,同时有几个Channel。上面提到的TcpServer、TcpListener、TcpConnection、Channel和TcpSession是服务器框架的网络层。一个好的网络框架应该和业务代码解耦。即上层代码只需要获取数据和执行业务逻辑,无需关注数据的发送和接收、网络数据包的封装和解包、网络状态的变化(如网络断开和重新连接)。以发送数据为例:当业务逻辑将数据交给TcpSession时,TcpSession将数据打包(进程加载后可以进行一些加密或压缩操作),交给TcpConnection::SendData(),而TcpConnection::SendData()实际上调用了Channel::SendData(),因为Channel中包含了socket句柄,所以Channel::SendData()实际上调用了send()/sendto()/write()方法来发送数据。对于数据接收,有点不同:通过select()/poll()/epoll()等IO多路复用技术,在确定哪些TcpConnection有数据到达后,激活该TcpConnection的Channel对象调用recv()/recvfrom()/read()接收数据。接收到数据后,将数据交给TcpSession处理,最后交给业务层。需要注意的是,数据采集、拆包,甚至下发到业务层,都要分开。我的意思是:***不要拆包交给业务层,把数据采集的逻辑放在一起。因为数据采集是IO操作,拆包交给业务层是逻辑计算操作。IO操作通常比逻辑计算慢。具体怎么安排,要看服务器业务。也就是说,你要考虑你的服务器程序的性能瓶颈是在网络IO上还是在逻辑计算上。甚至网络IO也可以分为上行操作和下行操作。上行操作是客户端向服务器发送数据,下行是指服务器向客户端发送数据。有时数据上升较少而下降较多。(比如游戏服务器,当npc移动位置时,上行是客户端通知服务器自己的位置,而下行确实是服务器告诉每个在场的客户端)。工作线程流程:while(!m_bQuit){epoll_or_select_func();handle_io_events();handle_other_things();}其中epoll_or_select_func()是通过上面提到的select()/poll()/epoll()的IO复用技术,确定哪些TcpConnections有数据来。我的服务器代码一般只监听socket可读事件,不监听socket可写事件。至于如何发送数据,文章后面会介绍。所以对于可读事件,以epoll为例,这里需要设置的flags有:EPOLLIN普通可读事件(连接正常时产生该事件,recv()/read()函数返回的次数bytesreceived;当连接关闭时,这两个函数返回0,也就是说我们可以通过设置这个标志来监听新传入的数据和peershutdown事件)EPOLLRDHUPpeershutdown事件(linuxman手册说这个事件可以监控peershutdown,但是我实际debug的时候,即使peer关闭了,也没有触发这个事件,还是EPOLLIN,只是这个时候调用了recv()/read()函数,返回值会为0,那么在实际项目中是否可以设置这个flag?为了监控peer的关闭,还有待验证)EPOLLPRI带外数据muduo设置epoll_wait的超时事件为1毫秒,另外我的项目将epoll_wait的超时时间设置为10毫秒。这两个数值供大家参考。在本项目中,工作线程和主线程都是上面代码中的逻辑。主线程监听socket上的可读事件,即监听是否有新的连接。epollfd存在于主线程和每个工作线程上。如果有新的连接到来,在主线程的handle_io_events()中接受它。哪个线程的epollfd是新连接挂钩的套接字句柄?这里采用的方法是round-robin算法,即有一个对象CWorkerThreadManager记录每个工作线程的工作状态。伪代码大致如下:voidattach_new_fd(intnewsocketfd){workerthread=get_next_worker_thread(next);workerthread.attach_to_epollfd(newsocketfd);++next;if(next>max_worker_thread_num)next=0;}即从线程的epollfd开始***workerthread开始挂载新的socket,然后积累索引,这样下次就是第二个workerthread。如果超过工作线程数,则从第一个工作线程开始。这解决了“负载平衡”新连接套接字的问题。在实际代码中,还有一个细节需要注意:epoll_wait函数中的structepoll_events一开始应该设置多少才算合理?现有的担忧是浪费太多,少了还不行。我以前有个项目直接用4096:constantEPOLL_MAX_EVENTS=4096;constintdwSelectTimeout=10000;structepoll_eventevents[EPOLL_MAX_EVENTS];intnfds=epoll_wait(m_fdEpoll,事件,EPOLL_MAX_EVENTS,dwSelectTimeout/1000);一个好主意,就是一开始动态扩展的数量:n,当发现有events的fd数量已经达到n时,将structepoll_events的数量调整为2n,如果不够下次使用,就会变成4n,这样以此类推,作者巧妙的利用stl::vector在内存中的连续性实现了这个思路://初始化代码std::vectorevents_(16);//代码中的线程循环while(m_bExit){intnumEvents=::epoll_wait(epollfd_,&*events_.begin(),static_cast(events_.size()),1);if(numEvents>0){if(static_cast(numEvents)==events_.size()){events_.resize(events_.size()*2);}}}读到这里,你可能会认为工作线程所做的工作只是调用handle_io_events()来接收网络数据,其实不然,工作线程还可以对程序的业务逻辑做一些工作。那是在handle_other_things()里面。那么如何将这些任务添加到handle_other_things()中呢?写一个队列,先把任务放入队列,然后让handle_other_things()从队列中取出?我在这个项目中也借鉴了muduo库的做法。即在handle_other_things()中调用了一系列的函数指针。伪代码如下:voiddo_other_things(){somefunc();}//m_functors是一个stl::vector,其中每个元素都是一个函数指针voidsomefunc(){for(size_ti=0;ilock(mutex_);functors.swap(m_functors);}for(size_ti=0;ilock(mutex_);m_functors_.push_back(cb);//当B不忙时,直接加入篮子,不通知Bif(!bBusy){wakeup_to_do_task();}}voiddo_task(){bBusy=true;std::vectorfunctors;{std::unique_locklock(mutex_);functors.swap(pendingFunctors_);}for(size_ti=0;i=m_dwThreadsCount){m_index=0;}returnm_aoMysqlThreads[m_index++].AddTask(poTask);}类似问题2中的consumer也可能是do_other_things()函数执行体中的调用。下面说说问题3,业务层的数据经过TcpSession生成并打包后,如果需要发送,则将生成的任务丢给工作线程的do_other_things(),然后在相关的Channel中发送,因为套接字上的可用性不受监控。写事件,所以在调用send()或者write()的时候数据可能会被阻塞,没关系,sleep()一会,继续发送,一直尝试,直到数据发送完。伪代码如下:boolChannel::Send(){intoffset=0;while(true){intn=::send(socketfd,buf+offset,length-offset);if(n==-1){if(errno==EWOULDBLOCK){::sleep(100);continue;}}//对方关闭了socket,本端建议关闭elseif(n==0){close(socketfd);returnfalse;}offset+=n;if(offset>=length)break;}returntrue;}***,还有一个模块日志线程没有介绍,目前高性能日志实现方案并不常见。