当前位置: 首页 > Linux

MySql实现事务性消息队列和php多进程消费设计

时间:2023-04-06 23:53:46 Linux

由于公司业务需要,最近在设计一个通用的队列功能模块。需求主要有两点:使用MySql实现事务性消息队列(当然主流队列服务可以使用redis或者rabbitmq等,这里讨论Mysql实现)PHP多进程消费队列消息使用MySql实现事务性消息队列。消息队列的作用是:异步、解耦、消峰。目前我用的最多的是异步。在很多业务场景中,我们可以将实时性要求较低的请求转化为异步处理,降低系统负载压力,提高系统稳定性。在异步处理离线数据的过程中,消息队列必须满足以下要求:即使在系统故障的情况下,消息也不能丢失。一条消息一旦插入,就必须至少处理一次(最好只处理一次,但很难实现,所以只需要at-least-once语义);先进先出顺序。(mysqlid自增可以满足这个特性,当然可以设计特殊参数进行特殊处理)支持多个生产者(mysql支持并发操作,支持这个特性)支持多个消费者。每条消息只能由其中一个消费者处理(业务处理需要考虑幂等性)。以上就是对队列实现的描述。用MySql实现事务性消息队列,参考文章https://spockwangs.github.io/...本次设计的表结构如下:CREATETABLE`comom_queue`(`id`int(11)NOTNULLAUTO_INCREMENTCOMMENT'auto-incrementid',`type`tinyint(4)NOTNULLDEFAULT'0'COMMENT'queuetype,codebusinessnotes',`conn_id`int(11)NOTNULLDEFAULT'0'COMMENT'ConsumerID',`param_content`textCOMMENT'队列入参',`callback`varchar(255)NOTNULLDEFAULT''COMMENT'队列消费回调函数',`status`tinyint(2)NOTNULLDEFAULT'0'COMMENT'0new1消费2成功3失败4重试',`create_time`int(11)NOTNULLDEFAULT'0'COMMENT'创建时间',`update_time`int(11)NOTNULLDEFAULT'0'COMMENT'状态改变时间',`preexec_time`int(11)NOTNULLDEFAULT'0'COMMENT'预消费时间',`p_key`varchar(100)NOTNULLDEFAULT''COMMENT'业务唯一标识键,供查询',`mark`varchar(255)NOTNULLDEFAULT''COMMENT'备注',PRIMARYKEY(`id`),KEY`indx_s`(`p_key`,`type`)USINGBTREE,KEY`indx_exec`(`conn_id`,`status`)USINGBTREE,KEY`indx_ty`(`type`)使用BTREE)ENGINE=InnoDBDEFAULTCHARSET=utf8;解释以下字段设计:回调队列中不同的业务消息有不同的业务处理,使用回调值回调相应的业务方法类型队列业务类型,区分不同的业务,使用不同的消费者分别消费。除了FIFO的特性,消费可以单独开启算子消费有特殊要求的业务消息(消息优先级高)preexec_time预消费时间,部分业务消息有消费时间要求,phpmulti可以设置排队时间-过程消耗设计。php多进程的实现依赖于pcntl。posix扩展,读者可以自行查看是否安装了该扩展。队列服务的设计与实现包括以下功能点:主进程和子进程的运行时间可以配置。主进程(主进程)创建并监视子进程的行为以创建定时器信号。主进程(master进程)定时监听队列信息,可用于子进程(worker进程)消费消息,如消息堆积通知,不同业务消息可配置不同数量的子进程。每个业务的子进程数可以配置正常上拉数和最大进程数。根据队列积压,子进程可以动态启动进程号(暂未实现,后续补充)话不多说,直接看代码,提取出来的队列服务类代码如下:'process_num']protected$child=[];//子进程pid数组受保护$result=[];//计算结果受保护$overTime=0;//主进程超时保护$startTime;//主进程运行时间受保护$childOverTime=3600;//子进程超时保护$alarm_time=2;publicfunction__construct($process=[],$overTime=0,$childOverTime=3600){if(!function_exists('pcntl_fork')){die("pcntl_fork不存在");}$this->process=$process;$this->overTime=$overTime;$this->childOverTime=$childOverTime;$this->startTime=ti我();}/***设置子进程*/publicfunctionsetProcess($process){$this->process=$process;}/***设置检测时间间隔单位s*/publicfunctionsetAlarmTime($time){$this->alarm_time=$time;}/***forkchildprocess*/protectedfunctionforkProcess(){//创建每种类型的消费者进程$process=$this->process;foreach($processas$key=>$num){for($i=0;$i<$num;$i++){$this->forkOneProcess($key);}}}返回$this;}/***创建子进程操作*@param$key*@return$this*/privatefunctionforkOneProcess($key){$pid=pcntl_fork();如果($pid==0){$id=getmypid();$this->processDo($id,$key);退出(0);}elseif($pid>0){//记录子进程信息$childProcess=array('pid'=>$pid,'type'=>$key,'create_time'=>time());$this->child[$pid]=$childProcess;}返回$this;}/***子进程做什么,consumer*/abstractprotectedfunctionprocessDo($id,$key);/***检查队列数*/抽象保护函数checkQueueNum();/***等待子进程结束*/protectedfunctionwaitProcess(){while(count($this->child)){foreach($this->childas$pid=>$item){$res=pcntl_waitpid($pid,$status,WNOHANG);pcntl_signal_dispatch();如果(-1==$res||$res>0){unset($this->child[$pid]);echo"pid$pidexit",PHP_EOL;//判断主进程是否超时,拉起新的子进程$leftTime=time()-$this->startTime;如果($this->overTime>$leftTime){$this->forkOneProcess($item['type']);echo"创建一个新进程",PHP_EOL;}}//判断子进程是否存在并超时,超过20分钟则强制退出elseif(posix_kill($pid,0)&&(time()-$item['create_time']-20*60)>$this->childOverTime){posix_kill($pid,SIGUSR1);echo"pid$pidexit2",PHP_EOL;}}}返回$this;}/***队列检测*/protectedfunctiontimeHandler(){$this->checkQueueNum();pcntl_alarm($this->alarm_time);}/***start*/publicfunctionrunProcess(){//注册信号pcntl_signal(SIGALRM,array($this,'timeHandler'));pcntl_alarm($this->alarm_time);$leftTime=time()-$this->startTime;while(($this->overTime==0||$this->overTime>$leftTime)){echo"newprocessprocesslist",PHP_EOL;$this->forkProcess()->waiteProcess();$leftTime=时间()-$t他的->开始时间;}}}最后一个功能点:每个业务可以配置正常启动的子进程数和最大进程数。根据队列积压,动??态启动的子进程数还没有实现。目前的队列服务设计如上,请大家指教!