当前位置: 首页 > 后端技术 > PHP

workerman源码-workerman事件监控

时间:2023-03-29 21:05:21 PHP

我们启动workerman之后,按照我们之前的理解。如果是linux下,worker子进程启动端口复用,监听和处理事件(win忽略)。那么workerman是如何完成对事件的监听和处理的呢?。让我们来看看。Workerlisten我们查看源码,在forkOneWorkerForLinux中有这样一行代码。听。我们看一下源码publicfunctionlisten(){//这里的socketName就是我们监听的地址信息。如果(!$this->_socketName){返回;}//设置自动加载信息。自动加载器::setRootPath($this->_autoloadRootPath);//需要监听的socket。if(!$this->_mainSocket){//解析套接字地址。$local_socket=$this->parseSocketAddress();//设置基本协议。如果是udp。它是STREAM_SERVER_BIND。如果不是udp就是4|8.请注意,这是按位或算法。是100(4)|1000(8)=1100.$flags=$this->transport==='udp'?STREAM_SERVER_BIND:STREAM_SERVER_BIND|STREAM_SERVER_LISTEN;$errno=0;$errmsg='';//设置端口重用。如果($this->reusePort){\stream_context_set_option($this->\context,'socket','so_reuseport',1);}//开始创建套接字。$this->_mainSocket=\stream_socket_server($local_socket,$errno,$errmsg,$flags,$this->_context);如果(!$this->_mainSocket){抛出新异常($errmsg);}//如果传输是ssl,则需要特殊处理。如果($this->transport==='ssl'){\stream_socket_enable_crypto($this->_mainSocket,false);}elseif($this->transport==='unix'){//如果是unix。$socket_file=\substr($local_socket,7);如果($this->user){chown($socket_file,$this->user);}if($this->group){chgrp($socket_file,$this->group);}}//尝试启用tcpkeepalive并禁用Nagle算法。如果(\function_exists('socket_import_stream')&&static::$_builtinTransports[$this->transport]==='tcp'){\set_error_handler(function(){});$套接字=\socket_import_stream($this->_mainSocket);\socket_set_option($socket,SOL_SOCKET,SO_KEEPALIVE,1);\socket_set_option($socket,SOL_TCP,TCP_NODELAY,1);\restore_error_handler();}//为资源流模式设置为非阻塞。\stream_set_blocking($this->_mainSocket,0);}//设置监听。$this->resumeAccept();}这里workerman先创建一个_mainSocket的socket监听信息,并设置为非阻塞模式。并设置监听器。publicfunctionresumeAccept(){//设置事件侦听器。if(static::$globalEvent&&true===$this->_pauseAccept&&$this->_mainSocket){//如果不是udp协议。只需使用acceptConnection即可收听。如果($this->transport!=='udp'){static::$globalEvent->add($this->_mainSocket,EventInterface::EV_READ,array($this,'acceptConnection'));}else{//udp协议,使用acceptUdpConnection监听static::$globalEvent->add($this->_mainSocket,EventInterface::EV_READ,array($this,'acceptUdpConnection'));}//将pauseAccept设置为false。$this->_pauseAccept=false;}}让我们看一下acceptConnection.publicfunctionacceptConnection($socket){//设置错误信息。\set_error_handler(函数(){});//接收信息。$new_socket=stream_socket_accept($socket,0,$remote_address);//恢复错误信息。\restore_error_handler();//如果没有接收成功。如果(!$new_socket){返回;}//底层使用Tcp进行监控。$connection=newTcpConnection($new_socket,$remote_address);//设置要监控的连接。$this->connections[$connection->id]=$connection;//将worker复制到其中.$connection->worker=$this;$connection->protocol=$this->protocol;$connection->transport=$this->transport;//设置事件监听器。$connection->onMessage=$this->onMessage;//连接关闭时的处理。$connection->onClose=$this->onClose;//设置错误信息。$connection->onError=$this->onError;$connection->onBufferDrain=$this->onBufferDrain;$connection->onBufferFull=$this->onBufferFull;//如果有onConnect连接处理,直接调用即可。如果($this->onConnect){try{\call_user_func($this->onConnect,$connection);}catch(\Exception$e){static::log($e);退出(250);}catch(\Error$e){static::log($e);退出(250);}}}到这里,我们就完成了对事件的监听。而在每一个链接下,worker的信息都赋给了TcpConnection的worker。所以,每一个Connection都会有worker的信息。事件监听完成。如何处理事件。eventloop我们在worker的run方法中有这句话。静态::$globalEvent->loop();这里使用无限循环等待处理事件。我们早些时候看到了。$globalEvent由workerman自己监控。它在run方法中初始化。//runif(!static::$globalEvent){$event_loop_class=static::getEventLoopName();static::$globalEvent=new$event_loop_class;$这个->resumeAccept();}//Worker::getEventLoopName()protectedstaticfunctiongetEventLoopName(){//如果存在,则直接返回。如果(static::$eventLoopClass){returnstatic::$eventLoopClass;}//如果Swoole\Event存在则删除。if(!\class_exists('\Swoole\Event',false)){unset(static::$_availableEventLoops['swoole']);}$loop_name='';//protectedstatic$\_availableEventLoops=array(//'libevent'=>'\\Workerman\\Events\\Libevent',//'event'=>'\\Workerman\\Events\\Event'//'swoole'=>'\\Workerman\\Events\\Swoole'//);foreach(static::$\_availableEventLoopsas$name=>$class){if(\extension_loaded($name)){$loop_name=$name;休息;}}if($loop\_name){//如果名称存在,则使用React下面的事件处理程序。如果(\interface_exists('\React\EventLoop\LoopInterface')){switch($loop_name){case'libevent':static::$eventLoopClass='\Workerman\Events\React\ExtLibEventLoop';休息;case'event':static::$eventLoopClass='\Workerman\Events\React\ExtEventLoop';休息;默认:static::$eventLoopClass='\Workerman\Events\React\StreamSelectLoop';休息;}}else{//只是默认值。我们这里使用Libevent。static::$eventLoopClass=static::$\_availableEventLoops[$loop_name];}}else{static::$eventLoopClass=\interface_exists('\React\EventLoop\LoopInterface')?'\Workerman\Events\React\StreamSelectLoop':'\Workerman\Events\Select';}returnstatic::$eventLoopClass;}以上是初始化$globalEvent。我现在的环境是libevent。那么让我们来看看吧。WorkermanEventsLibevent的循环方法。publicfunctionloop(){//调用了一个event_base_loop。\event_base_loop($this->_eventBase);}只有一种方法。event_base_loop等待事件被触发,然后触发它们的事件。这是$this->_eventBase。设置事件在我们的添加方法中。在listen的时候,我们调用了下面的方法。static::$globalEvent->add($this->_mainSocket,EventInterface::EV_READ,array($this,'acceptConnection'));那么我们来看一下add.publicfunctionadd($fd,$flag,$func,$args=array()){//判断信息。switch($flag){caseself::EV_SIGNAL:$fd_key=(int)$fd;$real_flag=EV_SIGNAL|EV_PERSIST;$this->_eventSignal[$fd_key]=event_new();如果(!\event_set($this->_eventSignal[$fd_key],$fd,$real_flag,$func,null)){returnfalse;}if(!\event_base_set($this->_eventSignal[$fd_key],$this->_eventBase)){返回false;}if(!\event_add($this->_eventSignal[$fd_key])){返回假;}变真;案例self::EV_TIMER:案例self::EV_TIMER_ONCE:$event=\event_new();$timer_id=(int)$event;如果(!\event_set($event,0,EV_TIMEOUT,array($this,'timerCallback'),$timer_id)){returnfalse;}if(!\event_base_set($event,$this->_eventBase)){返回false;}$time_interval=$fd*1000000;如果(!\event_add($event,$time_interval)){返回false;}$this->_eventTimer[$timer_id]=array($func,(array)$args,$event,$flag,$time_interval);返回$timer_id;默认值:$fd_key=(int)$fd;$real_flag=$flag===self::EV_READ?EV_READ|EV_PERSIST:EV_WRITE|EV_PERSIST;$event=\event_new();如果(!\event_set($event,$fd,$real_flag,$func,null)){返回错误的;}if(!\event_base_set($event,$this->_eventBase)){返回false;}如果(!\event_add($event)){返回false;}$this->_allEvents[$fd_key][$flag]=$event;返回真;}}从上面的方法中,我们可以简化几条语句//Createanewevent。$event=\event_new();//设置事件监听器。if(!\event_set($event,$fd,$real_flag,$func,null)){returnfalse;}//重置event的绑定事件。if(!\event_base_set($event,$this->_eventBase)){returnfalse;}//添加事件。if(!\event_add($event)){returnfalse;}//保存到全局事件。$this->_allEvents[$fd_key][$flag]=$event;所以。在run方法中,我们先设置事件监听,然后调用事件处理。事件处理的完成由我们的worker自带的onMessage方法处理。最后是event_set和event_base_set的作用。工人监视器上的工人。