当前位置: 首页 > 后端技术 > PHP

Swoole源码分析——基础模块的Queue队列

时间:2023-03-29 23:31:34 PHP

前言swoole的底层队列有两个:进程间通信IPC的消息队列swMsgQueue,和环形队列swRingQueue。IPC消息队列用于task_worker进程接受和传递消息,ring队列用于task_worker在SW_MODE_THREAD线程模式下接受和传递消息。swMsgQueue消息队列数据结构swoole使用的消息队列不是POSIX下的mq_xx系统函数,而是SystemV下的msgxxx系列函数。原因猜测是systemv系统函数可以指定mtype,也就是消息的类型,从而指定task_worker的投递。swMsgQueue的数据结构比较简单。blocking指定消息队列是否阻塞,msg_id是创建的消息队列的id,flags也指定阻塞或非阻塞,perms指定消息队列的权限。typedefstruct_swMsgQueue{int阻塞;intmsg_id;内部标志;intperms;}swMsgQueue;swMsgQueue消息队列创建swMsgQueue消息队列是调用msgget函数,该函数的msg_key为服务器端配置的message_queue_key,任务队列在服务器端结束后,就不会了被毁。重启程序后,任务进程会继续处理队列中的任务。如果不设置这个值,程序会自动生成:ftok($php_script_file,1)voidswMsgQueue_set_blocking(swMsgQueue*q,uint8_tblocking){if(blocking==0){q->flags=q->flags|IPC_NOWAIT;}else{q->flags=q->flags&(~IPC_NOWAIT);}}intswMsgQueue_create(swMsgQueue*q,intblocking,key_tmsg_key,intperms){if(perms<=0||perms>=01000){perms=0666;}intmsg_id;msg_id=msgget(msg_key,IPC_CREAT|perms);如果(msg_id<0){swSysError("msgget()失败。");返回SW_ERR;}else{bzero(q,sizeof(swMsgQueue));q->msg_id=msg_id;q->烫发=烫发;q->blocking=阻塞;swMsgQueue_set_blocking(q,阻塞);}返回0;,flags指定发送是阻塞还是非阻塞,阻塞发送方式在task_worker进程中使用。intswMsgQueue_push(swMsgQueue*q,swQueue_data*in,intlength){intret;while(1){ret=msgsnd(q->msg_id,in,length,q->flags);if(ret<0){SwooleG.error=errno;如果(errno==EINTR){继续;}elseif(errno==EAGAIN){返回-1;}else{swSysError("msgsnd(%d,%d,%ld)failed.",q->msg_id,length,in->mtype);返回-1;}}else{返回ret;}}return0;}swMsgQueue消息队列的接收消息队列的接收是使用msgrcv函数,其中mtype是消息的类型,这个参数会取出指定类型的消息。如果task_ipc_mode设置为竞争模式,则该值统一为0,否则该值为发送消息的task_worker的id。task_worker进程的主循环将阻塞在这个函数中,直到消息到达。intswMsgQueue_pop(swMsgQueue*q,swQueue_data*data,intlength){intret=msgrcv(q->msg_id,data,length,data->mtype,q->flags);if(ret<0){SwooleG.error=errno;if(errno!=ENOMSG&&errno!=EINTR){swSysError("msgrcv(%d,%d,%ld)failed.",q->msg_id,length,data->mtype);}}returnret;}swRingQueue环形队列的数据结构环形队列在上一篇文章中没有出现过,因为该队列是在SW_MODE_THREAD模式的工作线程中使用的。由于不是进程间通信,而是线程间通信,效率会更高。swoole中有两种环形队列,一种是普通环形队列,一种是线程安全环形队列。本文只讲线程安全的环形队列。Swoole没有使用线程锁,让环形队列更高效,并且使用了无锁结构,只使用原子原子锁。值得注意的是,数据结构中的flags,其值只会是0-4中的一个,并且使用原子锁改变值来实现互斥。typedefstruct_swRingQueue{void**data;/*队列空间*/char*flags;//0:推送就绪1:立即推送//2:弹出就绪;3:现在弹出uint大小;/*队列总大小*/uintnum;/*当前入队队列数*/uinthead;/*头,队列的方向*/uinttail;/*尾部,队列的方向*/}swRingQueue;swRingQueue环形队列swRingQueue环形队列的创建环形队列的创建很简单,就是初始化队列数据结构中的各种属性。intswRingQueue_init(swRingQueue*queue,intbuffer_size){queue->size=buffer_size;queue->flags=(char*)sw_malloc(queue->size);if(queue->flags==NULL){return-1;}queue->data=(void**)sw_calloc(queue->size,sizeof(void*));如果(队列->数据==NULL){sw_free(队列->标志);返回-1;}队列->head=0;队列->尾=0;memset(队列->标志,0,队列->大小);memset(queue->data,0,queue->size*sizeof(void*));return0;}swRingQueue环形队列的消息入队。要发送消息,必须首先确定环队列的尾部。queue->flags是一个数组,存放的是所有队列元素的当前状态。如果当前队列尾元素的状态不为0,说明其他线程已经对该队列元素进行了操作。我们当前线程暂时不能对当前队列tail进行操作。我们必须等待其他线程将队列尾元素向后移动一位才能更新。当线程将当前队列尾部的状态从0变为1时,我们需要立即更新队列尾部的偏移量,以便其他线程可以继续排队数据。然后将数据放入queue->data,保存数据的地址即可。最后给cur_tail_flag_index原子加1,改变队列元素的状态为读取;将1添加到queue->num原子swRingQueue_push(swRingQueue*queue,void*ele){if(!(queue->numsize)){return-1;}intcur_tail_index=queue->tail;char*cur_tail_flag_index=queue->flags+cur_tail_index;//TODOScheldwhile(!sw_atomic_cmp_set(cur_tail_flag_index,0,1)){cur_tail_index=queue->tail;cur_tail_flag_index=queue->flags+cur_tail_index;}//两个入队线程之间的同步//TODO模运算可以优化intupdate_tail_index=(cur_tail_index+1)%queue->size;//如果已经被其他线程使用如果线程已经更新过,则不需要更新;//否则,更新为(cur_tail_index+1)%size;sw_atomic_cmp_set(&queue->tail,cur_tail_index,update_tail_index);//申请可用存储空间*(queue->data+cur_tail_index)=ele;sw_atomic_fetch_add(cur_tail_flag_index,1);sw_atomic_fetch_add(&queue->num,1);return0;}swRingQueue环形队列的消息出队与入队相反,出队需要判断当前队头的位置,如果队长的状态不为2,则表示其他线程已经执行dequeue操作,等待其他线程更新队首位置获取队首元素,立即更新新的队首位置,然后传首地址数据的数据到ele,然后将团队的leader元素的状态恢复,减少队列的num。intswRingQueue_pop(swRingQueue*queue,void**ele){if(!(queue->num>0))return-1;intcur_head_index=queue->head;char*cur_head_flag_index=queue->flags+cur_head_index;while(!sw_atomic_cmp_set(cur_head_flag_index,2,3)){cur_head_index=queue->head;cur_head_flag_index=queue->flags+cur_head_index;}//TODO取模操作可以优化intupdate_head_index=(cur_head_index+1)%queue->size;sw_atomic_cmp_set(&queue->head,cur_head_index,update_head_index);*ele=*(queue->data+cur_head_index);sw_atomic_fetch_sub(cur_head_flag_index,3);sw_atomic_fetch_sub(&queue->num,1);返回0;}