当前位置: 首页 > 后端技术 > PHP

Swoole源码分析——Server模块的Stream模式

时间:2023-03-29 14:51:31 PHP

swReactorThread_dispatch发送数据Reactor线程会通过swReactorThread_dispatch发送数据。使用stream发送数据时,会调用swStream_new新建一个stream,使用swStream_send发送数据。intswReactorThread_dispatch(swConnection*conn,char*data,uint32_tlength){...if(serv->dispatch_mode==SW_DISPATCH_STREAM){swStream*stream=swStream_new(serv->stream_socket,0,SW_SOCK_UNIX_STREAM);如果(流==NULL){返回SW_ERR;}stream->response=swReactorThread_onStreamResponse;stream->session_id=conn->session_id;swListenPort*port=swServer_get_port(serv,conn->fd);swStream_set_max_length(stream,port->protocol.package_max_length);task.data.info.fd=conn->session_id;task.data.info.type=SW_EVENT_PACKAGE_END;任务.data.info.len=0;如果(swStream_send(stream,(char*)&task.data.info,sizeof(task.data.info))<0){returnSW_ERR;}if(swStream_send(stream,data,length)<0){stream->cancel=1;返回SW_ERR;}返回SW_OK;}...}swStream_new创建一个新流。可以看到流自动采用包长检测的方式。该函数的主要作用是设置各种回调函数。值得注意的是,swClient_create的第三个参数代表是否异步。为1,也就是说connect和send都是异步的。typedefstruct_swStream{swString*缓冲区;uint32_tsession_id;uint8_t取消;void(*response)(struct_swStream*stream,char*data,uint32_tlength);swClient客户端;}swStream;swStream*swStream_new(char*dst_host,intdst_port,inttype){swStream*stream=(swStream*)sw_malloc(sizeof(swStream));bzero(流,sizeof(swStream));swClient*cli=&stream->client;如果(swClient_create(cli,type,1)<0){swStream_free(stream);返回空值;}cli->onConnect=swStream_onConnect;cli->onReceive=swStream_onReceive;cli->onError=swStream_onError;cli->onClose=swStream_onClose;cli->对象=流;cli->open_length_check=1;swStream_set_protocol(&cli->协议);if(cli->connect(cli,dst_host,dst_port,-1,0)<0){swSysError("无法连接到[%s:%d]。",dst_host,dst_port);swStream_free(流);r返回空值;}else{返回流;}}voidswStream_set_protocol(swProtocol*protocol){protocol->get_package_length=swProtocol_get_package_length;协议->package_length_size=4;协议->package_length_type='N';->package_length_offset=0;}swStream_onConnect连接回调函数swStream_onConnect不仅是连接成功的回调函数,也是每个onWrite写事件的回调函数,所以每次发送数据都需要调用cli->send函数storeinstream->buffer值得注意的是,每次发送数据时,都必须将数据长度存放在buffer的头部,否则包长检测会失败。staticvoidswStream_onConnect(swClient*cli){swStream*stream=(swStream*)cli->object;如果(流->取消){cli->关闭(cli);}*((uint32_t*)stream->buffer->str)=ntohl(stream->buffer->length-4);如果(cli->send(cli,stream->buffer->str,stream->buffer->length,0)<0){cli->close(cli);}else{swString_free(stream->buffer);流->缓冲区=NULL;}}swStream_send发送数据swStream_send函数并不直接发送数据,而是将数据存放在stream->buffer中,等待write事件准备好后,调用swStream_onConnect发送数据。值得注意的是,每创建一个新的buffer,都要预留4个字节来存放buffer的数据长度intswStream_send(swStream*stream,char*data,size_tlength){if(stream->buffer==NULL){stream->buffer=swString_new(swoole_size_align(length+4,SwooleG.pagesize));if(stream->buffer==NULL){returnSW_ERR;}stream->buffer->length=4;}if(swString_append_ptr(stream->buffer,data,length)<0){returnSW_ERR;}returnSW_OK;}swStream_onReceive函数swStream_onReceive函数是streamread事件就绪的回调函数,worker进程发送给client的数据都会发送给这个函数。如果length为4,说明worker只发送了一个length的空数据包,也就是说worker进程已经消费完毕,此时我们可以关闭stream。staticvoidswStream_onReceive(swClient*cli,char*data,uint32_tlength){swStream*stream=(swStream*)cli->object;if(length==4){cli->socket->close_wait=1;}else{stream->response(stream,data+4,length-4);}}staticvoidswReactorThread_onStreamResponse(swStream*stream,char*data,uint32_tlength){swSendData响应;swConnection*conn=swServer_connection_verify(SwooleG.serv,stream->session_id);if(!conn){swoole_error_log(SW_LOG_NOTICE,SW_ERROR_SESSION_NOT_EXIST,"connection[fd=%d]doesnotexist.",stream->session_id);返回;}response.info.fd=conn->session_id;响应.info.type=SW_EVENT_TCP;response.info.len=0;response.length=长度;response.data=数据;swReactorThread_send(&response);}swWorker_onStreamAccept接受连接请求和reactor接受主进程的连接大致相同,略有不同的是conn->socket_type设置为SW_SOCK_UNIX_STREAMstaticintswWorker_onStreamAccept(swReactor*reactor,swEvent*event){intfd=0;swSocketAddress客户端地址;socklen_tclient_addrlen=sizeof(client_addr);#ifdefHAVE_ACCEPT4fd=accept4(event->fd,(structsockaddr*)&client_addr,&client_addrlen,SOCK_NONBLOCK|SOCK_CLOEXEC);#elsefd=accept(event->fd,(structsockaddr*)&client_addr,&client_addrlen);#endifif(fd<0){switch(errno){caseEINTR:caseEAGAIN:returnSW_OK;默认值:swoole_error_log(SW_LOG_ERROR,SW_ERROR_SYSTEM_CALL_FAIL,"accept()failed.Error:%s[%d]",strerror(errno),errno);返回SW_OK;}}#ifndefHAVE_ACCEPT4else{swoole_fcntl_set_option(fd,1,1);}#endifswConnection*conn=swReactor_get(reactor,fd);bzero(conn,sizeof(swConnection));conn->fd=fd;conn->active=1;连接->套接字类型=SW_SOCK_UNIX_STREAM;memcpy(&conn->info.addr,&client_addr,sizeof(client_addr));if(reactor->add(reactor,fd,SW_FD_STREAM|SW_EVENT_READ)<0){returnSW_ERR;}returnReadSW_OK;}swamWorker_onread获取数据swWorker_onStreamRead读取数据的核心是调用swProtocol_recv_check_length函数收集数据,放入serv->buffer_pool单链表中。我们已经了解了reactor线程事件循环中的swProtocol_recv_check_length函数。我们这里不再赘述。我们知道这个函数获取到数据后,会调用onPackage函数,也就是swWorker_onStreamPackage函数协议->package_length_size=4;协议->package_length_type='N';->package_length_offset=0;}staticintswWorker_onStreamRead(swReactor*reactor,swEvent*event){swConnection*conn=event->socket;swServer*serv=SwooleG.serv;swProtocol*协议=&serv->stream_protocol;swString*缓冲区;如果(!event->socket->recv_buffer){buffer=swLinkedList_shift(serv->buffer_pool);如果(缓冲区==NULL){缓冲区=swString_new(8192);如果(!缓冲区){返回SW_ERR;}}事件->套接字->recv_buffer=缓冲区;}else{buffer=event->socket->recv_buffer;}if(swProtocol_recv_check_length(protocol,conn,buffer)<0){swWorker_onStreamClose(reactor,event);returnSW_OK;}swWorker_onStreamPackage函数swWorker_onStreamPackage函数用于将数据包发送给swWorker_onTask函数进行消费。消费后会发送一个只包含长度为0的数据包,通知reactorworker消费结束。staticintswWorker_onStreamPackage(swConnection*conn,char*data,uint32_tlength){swServer*serv=SwooleG.serv;swEventData*task=(swEventData*)(data+4);serv->last_stream_fd=conn->fd;swString*package=swWorker_get_buffer(serv,task->info.from_id);uint32_tdata_length=length-sizeof(task->info)-4;//合并数据到包缓冲区swString_append_ptr(package,data+sizeof(task->info)+4,data_length);swWorker_onTask(&serv->工厂,任务);int_end=htonl(0);SwooleG.main_reactor->write(SwooleG.main_reactor,conn->fd,(void*)&_end,sizeof(_end));返回SW_OK;}