当前位置: 首页 > 科技观察

侵入式服务和非侵入式程序结构

时间:2023-03-14 15:14:15 科技观察

通常一个后端服务器的结构是无法固定的,因为我们不知道服务的技术框架上会建立什么样的业务,而结构随着业务的不同而变化不同的。如果是这样,我们是不是可以讨论和抽象出一个可以适用于大部分业务场景的服务端程序结构呢?其实也不一定。大多数服务都会通过网络与其他服务或客户端通信,因此该服务必须包含网络通信模块。这样,网络通信模块将成为不同业务服务的公共部分。基于此,我们可以继续讨论。假设网络通信框架结构一定,根据通信数据是否流入和流出网络框架,我们将服务器结构分为侵入式和非侵入式两种。Non-intrusive结构Non-intrusive稍微简单一点,所以让我们先讨论一下。所谓非侵入性是指一个服务中的所有通信或业务数据都在网络通信框架内流动,即没有外部数据源注入或流出网络通信框架。例如,对于IM业务服务器,通常无论是单聊消息还是群聊消息,其核心业务本身的数据流都在网络通信框架内流动。单聊中,用户A向用户B发送消息,消息流实际上是从用户A的连接对象传递到用户B的连接对象,再通过B的连接对象的发送方法发送出去群聊也是如此,从一个用户的连接,到同时多个其他用户的连接对象。在任何一种情况下,这些连接对象都是网络通信框架的内部结构。非侵入式服务结构侵入式结构如果外部消息流入或流出网络通信模块,则相当于外部消息“侵入”了网络通信结构。我们称这种服务器结构为侵入式服务结构。侵入式服务器的结构除了网络通信组件之外,其他组件的结构设计可以是多种多样的。我们来看两种通用结构:结构一:业务线程(或数据源线程)处理数据,发送给网络通信组件。结构二:网络解包后,需要将任务交给专门的业务线程处理。处理后,需要通过网络通信组件再次发送。结构1其实就是结构2的后半部分,所以我们先看结构2。我们在一线程一循环的思想下介绍了各个网络线程的基本结构:,那么由于拆包得到的任务处理逻辑是比较耗时的,所以我们需要将这些任务交给专门的业务线程来处理。业务线程可以是一组工作的消费者线程,我们可以把这些任务放在某个队列中。这样,网络组件的线程(网络线程)就是生产者,业务工作线程就是消费者。我们可以使用互斥体、临界区(Windows)或条件变量等技术来协调生产者和消费者。这里据说会涉及到公共排队系统。这是一种常见的实现方式,其中数据从网络组件流向其他组件(业务组件)。接下来,如果业务组件处理后需要再次通过网络将处理后的数据进行通信,那么我们如何将处理后的数据从业务组件传输到网络组件呢?这里一般有两种方法。该方法通过一定的标志,如业务对应的socketfd、sessionID等,直接找到这些数据对应的网络组件中的session,直接发送数据。例如,有些数据需要处理后发送给所有用户。演示代码如下:}}上面代码中,dataToPush是需要发送给所有用户的数据,所以变量网络组件中记录的所有session对象都一一发送(m_mapSessions中记录了Session对象)。又如,如果将处理后的数据发送给用户,则演示代码如下::lock_guardscoped_lock(m_mutexForSession);//TODO:每次都遍历,太慢了,优化为(auto&session:m_mapSessions){if(session.second->isAccountIDMatched(accountID)){session.second->pushOtherTypeData(type,content,offset);found=true;}}if(!found){LOGW("useraccountId=%sisnotfoundinsessions,type:%s,data:%s",accountID.c_str(),type.c_str(),content.c_str());returnfalse;}returntrue;}以上代码逻辑根据数据中的accountID定位到具体的session,然后将数据发送给session对应的用户。这种方式是业务组件在处理完数据后将数据传递给网络线程的常用方式之一,但是这种方式有以下两个缺点。缺点1这里从调用关系来看,其实是业务线程调用网络线程相关的接口函数发送数据,也就是说本质上是业务线程直接发起的网络发送数据操作业务组件。如果按功能来划分,发送数据应该是数据网络线程的功能,而业务线程不应该发送数据,所以这一般是不合理的。由于session对象属于网络线程(网络线程管理这些session的生命周期),而这里业务线程直接操作session对象,所以在上面的demo代码中,使用了mutex(成员变量m_mutexForSession)相应的发送函数m_mapSessions保护会话记录集。这种方法的示意图如下:这种方法虽然不合理,但却是很多服务程序的做法。当业务组件调用这些发送方法时,这些会话通过互斥量被锁定。但是这里有一个效率问题。我们以上面向所有用户发送数据为例:1voidWebSocketSessionManager::pushInstrumentAndIndexIncrementData(conststd::string&dataToPush)2{3std::lock_guardscoped_lock(m_mutexForSession);4for(auto&session:m_mapSessions)5{6session.second->pushIndexIncrementData(dataToPush);7}8}这段代码实际上调用了每个session对象的pushInstrumentAndIndexIncrementData方法(代码第6行)。如果session对象的pushInstrumentAndIndexIncrementData方法耗时较长(消耗时间较长是相对的,在实际开发中要避免该函数耗时过长),因为记录session对象m_mapSessions正在被业务模块(业务线程)使用这个时候,所以如果网络线程要修改m_mapSessions对象,就必须等待业务线程调用完WebSocketSessionManager::pushInstrumentAndIndexIncrementData函数,这可能会影响网络线程的执行效率。所以有的开发者会这样设计:m_mapSessions将session对象指针复制到mapLocalSessionsmapLocalSessions=m_mapSessions;}//这里使用mapLocalSessions,这样网络线程才能继续操作复制原来m_mapSessions中记录的session指针。这样就大大降低了m_mutexForSession锁的粒度,业务线程尽快释放m_mapSessions,网络线程可以快速使用m_mapSessions。然而,这个看似不错的设计却存在严重的问题。m_mapSessions和mapLocalSessions中记录的很多会话指针都指向一个对象。如果此时有连接断开,网络线程会销毁m_mapSessions中记录的session对象,这样业务线程才有可能继续操作(发送数据)session对象的指针。这个时候指针已经是野指针了,会导致我们的程序崩溃。有读者会说,mapLocalSessions中记录的session对象不应该使用裸指针,而是智能指针,但智能指针不持有session的生命周期,如下形式:std::map>mapLocalSessions;虽然这样可以解决绝对使用session对象时查找session对象是否有效的问题,但是如果在使用session的过程中,比如进入session.second->pushInstrumentAndIndexIncrementData函数后,session被网络回收了thread,此时访问session的任何一个成员变量都会导致程序访问到非法指针,导致程序崩溃。所以,在使用这种方法的时候,一定不要使用这种降低锁粒度的技术,这是不正确的。为了保证性能,session.second->pushInstrumentAndIndexIncrementData函数的实现必须尽可能快的执行。缺点2缺点2是方法1在以下场景下的致命问题:假设你的服务有如下两种信息流,信息流1:需要发送业务组件产生的数据,交互后产生网络线程本身与客户端或下游服务的数据也必须发送。如果两种数据发送到同一个连接,则两种数据的顺序必须是特定的。这不好。因为在这种设计中,你的业务线程会间接使用某个session发送数据,而你的网络线程会直接使用某个session发送数据。这相当于多个线程同时调用send函数在同一个socket上发送。这种情况下,可能每个数据包都没有错,只是多个数据包之间的顺序错了。我在做我们交易系统的行情推送服务的时候就遇到过这样的问题。业务模块会从一个kafka主题中获取增量数据,然后侵入网络组件发送给用户,但是用户本身会向网络模块发送订阅命令,网络模块会通过内部的http服务查询到获取全量数据,推送给用户。用户接收到增量数据时,会在全量数据的基础上对全量数据进行增删改查。也就是说,如果用户没有收到全量数据,收到的增量数据会被丢弃。但是,丢弃的增量数据可能是有效的。因为被丢弃了,所以用户接收到全量数据,然后用新的增量数据对全量数据进行改造。这个时候转换后的增量数据已经不正确了。这是方法一不适用的场景,即侵入网络组件的其他组件产生的数据有多个来源,且多个来源有顺序要求。反之,如果入侵网络组件产生的数据源只有一个数据源,或者有多个数据源但数据源之间的数据没有顺序依赖,这种设计也是可以的。不适用场景示意图那么如果有多个数据源但是数据源之间的数据是顺序依赖的,有没有办法继续使用方法一呢?是的,可以在多个数据源处理完数据后,将数据交给一个专门对数据源进行排序的组件,然后排序组件统一调用网络组件的数据发送模块。需要注意的是,网络组件内部产生的需要发送的数据也交给了排序组件。原理图如下:可以给出这个场景的例子。我们在做交易系统的行情推送服务的时候,因为推送的数据有多个来源,有的来自kafka模块,有的来自管理后台接口,有的来自网络通信模块内部生成,最后这三类数据需要按照一定的顺序发送给用户。我只是用上面原理图所示的结构来设计。排序组件使用队列,不同数据源的数据按照一定的顺序进入队列,排序组件从队列中一个一个取出排序后的数据调用网络通信组件的数据发送模块发送.方法二方法一是在业务组件中直接调用网络组件的方法,有点过分。第二种方式是将业务组件需要发送的数据交给网络组件自己发送。一种常见的实现方式是将相应的数据添加到数据所属连接的网络线程中,然后看看这个结构体:}可以使用另外一个Queue,业务组件会把队列交给这个队列,然后通知对应网络组件中的线程需要去取任务执行。这个逻辑前面介绍过,就是利用唤醒机制来执行handle_other_things函数。这里是一个实现,业务组件调用voidEventLoop::runInLoop(constFunctor&cb){if(isInLoopThread()){cb();}else{queueInLoop(cb);}}其中cb是要执行的任务,因为业务thread和网络线程不是同一个线程,所以会执行queueInLoop函数。queueInLoop的实现如下,让任务放到pendingFunctors_容器中,然后调用唤醒函数wakeup()。voidEventLoop::queueInLoop(constFunctor&cb){{std::unique_locklock(mutex_);pendingFunctors_.push_back(cb);}if(!isInLoopThread()||doingOtherTasks_){wakeup();}}唤醒最后一个线程执行handle_other_things()函数,该函数从pendingFunctors_中获取任务执行。voidEventLoop::handle_other_things(){std::vectorfunctors;doingOtherTasks_=true;{std::unique_locklock(mutex_);functors.swap(pendingFunctors_);}for(size_ti=0;i