前言上一章我们讲了客户端的连接。对于同步客户端,连接已经成功建立;但是对于异步客户端来说,此时可能还在进行中。对于DNS解析,onConnect回调函数还没有执行。本节继续讲解客户端发送数据的过程,同时可以看到TCP异步客户端执行onConnect回调函数的过程。sendentry这个入口函数的逻辑很简单,从PHP函数中获取data数据,然后调用connect函数。staticPHP_METHOD(swoole_client,send){char*data;zend_size_t数据长度;zend_longflags=0;#ifdefFAST_ZPPZEND_PARSE_PARAMETERS_START(1,2)Z_PARAM_STRING(data,data_len)Z_PARAM_OPTIONALZ_PARAM_LONG(flags)ZEND_PARSE_PARAMETERS_END();#elseif(zend_parse_parameters(ZEND_NUM_ARGS()TSRMLS_CC,"s|l",&data,&data&flags)==FAILURE){返回;}#endifswClient*cli=client_get_ptr(getThis()TSRMLS_CC);//清除errnoSwooleG.error=0;intret=cli->send(cli,data,data_len,flags);if(ret<0){swoole_php_sys_error(E_WARNING,"无法发送(%d)%zd字节。",cli->socket->fd,data_len);zend_update_property_long(swoole_client_class_entry_ptr,getThis(),SW_STRL("errCode")-1,SwooleG.errorTSRMLS_CC);RETVAL_FALSE;}else{RETURN_LONG(ret);}}swClient_tcp_send_sync数据通过swConnectio传递n_send函数阻塞调用send,当返回错误为EAGAIN时,调用swSocket_wait等待1sstaticintswClient_tcp_send_sync(swClient*cli,char*data,intlength,intflags){intwritten=0;诠释n;断言(长度>0);断言(数据!=NULL);while(writtensocket,data,length-written,flags);if(n<0){if(errno==EINTR){继续;}elseif(errno==EAGAIN){swSocket_wait(cli->socket->fd,1000,SW_EVENT_WRITE);继续;}else{SwooleG.error=errno;返回SW_ERR;}}写+=n;数据+=n;}returnwritten;}swClient_tcp_send_async异步TCP客户端由于异步客户端已经设置为非阻塞,并且添加了reactor监控,发送数据只需要reactor->write函数即可。当此时socket不可写时,会自动放入out_buff缓冲区。当out_buffer大于高水位线时,会自动调用onBufferFull回调函数。staticintswClient_tcp_send_async(swClient*cli,char*data,intlength,intflags){intn=length;if(cli->reactor->write(cli->reactor,cli->socket->fd,data,length)<0){if(SwooleG.error==SW_ERROR_OUTPUT_BUFFER_OVERFLOW){n=-1;cli->socket->high_watermark=1;}else{返回SW_ERR;}}if(cli->onBufferFull&&cli->socket->out_buffer&&cli->socket->high_watermark==0&&cli->socket->out_buffer->length>=cli->buffer_high_watermark){cli->socket->high_watermark=1;cli->onBufferFull(cli);}returnn;}swClient_udp_sendUDPclient对于UDPclient,如果Socketbuffer已满,则不会像TCP一样等待reactor可写,而是直接返回结果。对于异步客户端,它只是对sendto函数的非阻塞调用。staticintswClient_udp_send(swClient*cli,char*data,intlen,intflags){intn;n=sendto(cli->socket->fd,data,len,0,(structsockaddr*)&cli->server_addr.addr,cli->server_addr.len);如果(n<0||ntype==SW_SOCK_UDP){ret=swSocket_udp_sendto(cli->socket->fd,ip,port,data,len);}elseif(cli->type==SW_SOCK_UDP6){ret=swSocket_udp_sendto6(cli->socket->fd,ip,port,data,len);}else{swoole_php_fatal_error(E_WARNING,"仅支持SWOOLE_SOCK_UDP或SWOOLE_SOCK_UDP6。");返回假;}SW_CHECK_RETURN(ret);}intswSocket_udp_sendto(intserver_sock,char*dst_ip,intdst_port,char*data,uint32_tlen){structsockaddr_inaddr;如果(inet_aton(dst_ip,&addr.sin_addr)==0){swWarn("ip[%s]无效。",dst_ip);返回SW_ERR;}addr.sin_family=AF_INET;地址.sin_port=htons(dst_port);返回swSocket_sendto_blocking(server_sock,data,len,0,(structsockaddr*)&addr,sizeof(addr));}intswSocket_udp_sendto6(intserver_sock,char*dst_ip,intdst_port,char*data,uint32_tlen){structsockaddr_in6addr;bzero(&addr,sizeof(addr));if(inet_pton(AF_INET6,dst_ip,&addr.sin6_addr)<0){swWarn("ip[%s]无效。",dst_ip);返回SW_ERR;}addr.sin6_port=(uint16_t)htons(dst_port);地址.sin6_family=AF_INET6;返回swSocket_sendto_blocking(server_sock,数据,len,0,(structsockaddr*)&addr,sizeof(addr));}intswSocket_sendto_blocking(intfd,void*__buf,size_t__n,intflag,structsockaddr*__addr,socklen_t__addr_len){诠释n=0;while(1){n=sendto(fd,__buf,__n,flag,__addr,__addr_len);如果(n>=0){休息;}else{if(errno==EINTR){继续;}elseif(swConnection_error(errno)==SW_WAIT){swSocket_wait(fd,1000,SW_EVENT_WRITE);继续;}else{休息;}}}returnn;}swClient_onWrite写就绪状态当reactor监测到socket已经进入写就绪状态时,会调用swClient_onWrite函数。从上一章我们知道,当异步客户端建立连接时,swoole会调用connect函数,该函数应该返回0,或者返回错误码EINPROGRESS,此时会监听writereadiness。无论哪种情况,都会调用swClient_onWrite,表示连接建立成功,三次握手完成,但是cli->socket->active还是0,如果cli->socket->active为0,则意思是异步客户端虽然建立了连接,但是还没有调用onConnect回调函数,所以此时应该调用execute_onConnect函数。如果使用SSL隧道加密,也会进行SSL握手,设置_socket->ssl_state=SW_SSL_STATE_WAIT_STREAM。当active为1时,可以调用swReactor_onWrite发送数据。staticintswClient_onWrite(swReactor*reactor,swEvent*event){swClient*cli=event->socket->object;swConnection*_socket=cli->套接字;if(cli->socket->active){#ifdefSW_USE_OPENSSLif(cli->open_ssl&&_socket->ssl_state==SW_SSL_STATE_WAIT_STREAM){if(swClient_ssl_handshake(cli)<0){gotoconnect_fail;}elseif(_socket->ssl_state==SW_SSL_STATE_READY){gotoconnect_success;}else{if(_socket->ssl_want_read){cli->reactor->set(cli->reactor,event->fd,SW_FD_STREAM_CLIENT|SW_EVENT_READ);}返回SW_OK;}}#endifif(swReactor_onWrite(cli->reactor,event)<0){returnSW_ERR;}如果(cli->onBufferEmpty&&_socket->high_watermark&&_socket->out_buffer->length<=cli->buffer_low_watermark){_socket->high_watermark=0;cli->onBufferEmpty(cli);}返回SW_OK;}socklen_tlen=sizeof(SwooleG.error);if(getsockopt(event->fd,SOL_SOCKET,SO_ERROR,&SwooleG.error,&len)<0){swWarn("getsockopt(%d)failed.Error:%s[%d]",event->fd,strerror(错误号),错误号);返回SW_ERR;}//成功if(SwooleG.error==0){//监听读取事件cli->reactor->set(cli->reactor,event->fd,SW_FD_STREAM_CLIENT|SW_EVENT_READ);//已连接_socket->active=1;#ifdefSW_USE_OPENSSLif(cli->open_ssl){if(swClient_enable_ssl_encrypt(cli)<0){gotoconnect_fail;}如果(swClient_ssl_handshake(cli)<0){转到连接失败;}else{_socket->ssl_state=SW_SSL_STATE_WAIT_STREAM;}返回SW_OK;}connect_success:#endifif(cli->onConnect){execute_onConnect(cli);}}else{#ifdefSW_USE_OPENSSLconnect_fail:#endif_socket->active=0;cli->关闭(cli);如果(cli->onError){cli->onError(cli);}}returnSW_OK;}staticsw_inlinevoidexecute_onConnect(swClient*cli){if(cli->timer){swTimer_del(&SwooleG.timer,cli->timer);cli->timer=NULL;}cli->onConnect(cli);}client_onConnectstaticvoidclient_onConnect(swClient*cli){zval*zobject=(zval*)cli->object;#ifdefSW_USE_OPENSSLif(cli->ssl_wait_handshake){client_execute_callback(zobject,SW_CLIENT_CB_onSSLReady);}else#endifif(!cli->redirect){client_execute_callback(zobject,SW_CLIENT_CB_onConnect);}else{client_callback*cb=(client_callback*)swoole_get_property(zobject,0);if(!cb||!cb->onReceive){swoole_php_fatal_error(E_ERROR,"没有'onReceive'回调函数。");}}}