当前位置: 首页 > 后端技术 > PHP

php+sockets实现了一个靠谱的延迟队列

时间:2023-03-29 23:30:45 PHP

delayqueue,顾名思义,就是一个带有延迟功能的消息队列。那么,在什么场景下需要这样的队列呢?一、背景首先我们来看一下业务场景:1、会员到期前3天发送召回通知。2、订单支付成功后,5分钟后查看下游链路是否正常。成功3、如何定时查看处于退款状态的订单是否退款成功?4、如果通知失败,会在1、3、5、7分钟后重复通知,直到对方回复?通常,解决上述问题最简单直接的方法就是定时扫表。表扫描的问题是:1.表扫描长时间连接数据库。在数据量大的情况下,连接容易出现异常中断,需要更多的异常处理,对程序的健壮性要求高。2、在数据量大的情况下下载延迟高,无法在规定时间内处理完成,影响业务。虽然可以启动多个进程进行处理,但这会带来额外的维护成本,无法从根本上解决。3、每个业务都要维护自己的扫表逻辑。当业务越来越多的时候,我发现查表部分的逻辑会重复开发,但是和延迟队列很像,可以很好的解决上面的需求。2.调研了市面上的一些开源方案,如下:1.喜欢技术:只有原理,没有开源代码2.GitHub个人:https://github.com/ouqiang/de...1.基于redis实现上,redis只能配置一个,如果redis挂了,整个服务不可用,可用性差不多2。消费端实现pull模式,接入成本高。每个项目都必须执行一次访问代码。3、star用的人不多,生产环境有风险。另外,它也不适合go语言。理解,有问题很难维护3.SchedulerX-阿里巴巴开源:功能强大,但运维复杂,依赖组件多,不够轻便4.RabbitMQ-delaytask:没有延时功能,需要靠一个特性自己实现,而且公司没有部署这个队列,延迟队列单独部署一个队列有点贵,而且还需要专门的运维维护,目前团队做的不支持以上原因打算自己写一个,平时用php比较多,使用项目基本的rediszset结构作为存储,用php语言实现。实现原理参考有赞团队:https://tech.youzan.com/queui...3.目标是轻量级:php的扩展少可以直接运行,不需要引入网络框架,比如swoole,workman等稳定性:采用master-work架构,master不做业务处理,只管理子进程,子进程异常退出时自动拉高可用性:1.支持多实例部署,每个instance是无状态的,一个instance挂掉不影响服务2.支持多个redis配置,一个redis挂掉只影响部分消息3.业务端接入方便,只需要在后台填写相关消息类型和返回接口扩展性:当消费过程出现瓶颈时,可以配置增加消费过程的数量。当写入出现瓶颈时,可以增加实例数。写入性能可以线性提高实时性能:允许一定的时间误差。支持消息删除:企业用户可以随时删除指定的消息。消息传输可靠性:消息进入延迟队列后,保证至少被消费一次。写入性能:qps>1000+四、架构设计及说明整体架构采用master-work架构模式,主要包括6个模块:1.dq-mster:主进程,负责管理创建、销毁、恢复和子进程的信号通知2.dq-server:负责消息写入、读取、删除功能和维护redis连接池3.dq-timer-N:负责从rediszset结构中扫描过期消息,并负责写入到就绪队列,数量是可配置的,一般2个就够了,因为消息在zset结构中是按时间排序的4.dq-consume-N:负责从就绪队列中读取消息并通知相应的回调接口,number是可配置的5.dq-redis-checker:负责检查redis的服务状态。如果redis宕机,发送告警邮件6.dq-http-server:提供用于注册主题的web后台接口。5、部署环境依赖:PHP5.4+安装sockets、redis、pcntl、pdo_mysql扩展step1:安装数据库存放一些主题和告警信息创建数据库dq;#存储报警信息CREATETABLE`dq_alert`(`id`int(11)NOTNULLAUTO_INCREMENT,`host`varchar(255)NOTNULLDEFAULT'',`port`int(11)NOTNULLDEFAULT'0',`user`varchar(255)NOTNULLDEFAULT'',`pwd`varchar(255)NOTNULLDEFAULT'',`ext`varchar(2048)NOTNULLDEFAULT'',PRIMARYKEY(`id`))ENGINE=InnoDBAUTO_INCREMENT=2DEFAULTCHARSET=utf8;#存储redis信息CREATETABLE`dq_redis`(`id`int(11)NOTNULLAUTO_INCREMENT,`t_name`varchar(200)NOTNULLDEFAULT'',`t_content`varchar(2048)NOTNULLDEFAULT'',PRIMARYKEY(`id`))ENGINE=InnoDBAUTO_INCREMENT=14DEFAULTCHARSET=utf8;#存储注册信息CREATETABLE`dq_topic`(`id`int(11)NOTNULLAUTO_INCREMENT,`t_name`varchar(1024)NOTNULLDEFAULT'',`delay`int(11)NOTNULLDEFAULT'0',`callback`varchar(1024)NOTNULLDEFAULT'',`timeout`int(11)NOTNULLDEFAULT'3000',`email`varchar(1024)NOTNULLDEFAULT'',`topic`varchar(255)NOTNULLDEFAULT'',`createor`varchar(1024)NOTNULLDEFAULT'',`status`tinyint(4)NOTNULLDEFAULT'1',`method`varchar(32)NOTNULLDEFAULT'GET',PRIMARYKEY(`id`))ENGINE=InnoDBAUTO_INCREMENT=6DEFAULTCHARSET=utf8;step2:在DqConfg中配置数据库信息。file:DqConf::$dbstep3:启动http服务并修改php路径$logPath在DqConf.php文件中命令:phpDqHttpServer.php--port8088访问:http://127.0.0.1:8088,配置界面出现redis信息格式:host:post:auth比如127.0.0.1:6379:12345stop4:启动服务进程:phpDqInit.php--port6789看到如下信息说明启动成功stop5:配置报告信息(如redis宕机)stop6:注册topicstep7:写入数据,在项目根目录下新建test.php文件,写入addServer($server);$topic='order_openvip_checker';//主题在后台注册$id=uniqid();$data=array('id'=>$id,'body'=>array('a'=>1,'b'=>2,'c'=>3,'ext'=>str_repeat('a',64),),//可选,设置后以这个通知时间为准,注册默认延迟时间Specify'fix_time'=>date('Y-m-d23:50:50'),);//添加$boolRet=$dqClient->add($topic,$data);echo'添加耗时:'.(msectime()-$time)。"ms\n";//查询$time=msectime();$result=$dqClient->get($topic,$id);echo'gettime-consuming:'.(msectime()-$time)."ms\n";//删除$time=msectime();$boolRet=$dqClient->del($topic,$id);echo'del需要时间:'.(msectime()-$time)。“毫秒\n”;执行phptest.phpstep8:查看日志默认日志目录在项目目录的logs目录下,修改DqConf.php中的$logPath1。请求日志:request_ymd.txt2。通知日志:notify_ymd.txt3。错误日志:err_ymd.txtstep9:如果配置文件有变化1.系统会自动检测配置文件是否新增,如果有变化会自动退出(没有找到更好的热更新方案),需要重启,你能在crontab中创建任务,1分执行一次,程序有check_self的判断2.优雅退出命令:master检测并监听USR2信号,收到信号后会通知所有子进程,子进程完成当前任务后自动退出ps-有效|grepdq-master|grep-vgrep|头-n1|awk'{print$2}'|xargskill-USR2六、性能测试需要安装pthreads扩展:测试原理:使用多线程模拟并发,1s内可以成功返回请求数phpDqBenchconcurrencyrequestsconcurrency:Concurrencyrequests:产生的请求数各并发测试环境:内存8G,8核cpu,2台redis,1台dq-server部署在一台机器上,数据包64字节qps:2400VII.值得一提的性能优化点:1.Redismulticommand:将对redis的多个操作打包成一个,减少网络开销2.计数操作异步处理,在异步逻辑中使用函数的静态变量保存。当redis写入成功后,静态变量被释放,redis异常时count可以保持一致,除非进程退出。3、内存泄漏检测很有必要:所有的内存分配都在底层调用了brk或mmap,只要程序中只有大量的brk或mmap系统调用,内存泄漏的可能性就很大,检测命令:strace-c-ppid|grep'mmap|系统函数调用次数是别人的好几倍,程序大概率可能会出问题。最大通知次数为10次(可以修改Dqconf.php文件中的$notify_exp_nums)。通知间隔为2n+1。比如第一次是1分钟,通知失败,第二次是3分钟后,直到收到回复。超过最大通知数量后系统自动丢弃,同时发送邮件通知ps:网络抖动在所难免,如果通知接口涉及到核心服务,一定要幂等!!9、线上情况线上部署两个实例,每个机房一个,4个redis用于存储。服务已稳定运行数月,各项指标均符合预期。补偿项目地址:https://github.com/chenlinzho...