当前位置: 首页 > 后端技术 > PHP

nsq优秀的消息队列

时间:2023-03-29 22:26:21 PHP

简介NSQ是一个开源的分布式消息队列中间件,用Go语言编写,旨在每天大规模处理数十亿条消息。NSQ具有分布式和去中心化的拓扑结构。这种结构具有无单点故障、容错、高可用性、能够保证消息可靠传递等特点。是已经在大规模生产环境中应用的成熟产品。NSQ在国内公司用的很少,用的时候越看越惊奇。比如使用方便,部署速度快,比如之前比较麻烦的延迟定时消息。我发现nsq也支持。官方文档比较齐全。Consult对问题的回复也很耐心和及时,所以我觉得有必要发一篇介绍nsq的文章,造福大众。nsq具有三个必要的组件:nsqd、nsqlookupd和nsqadmin。其中必须部署nsqd和nsqlookup。让我们一一介绍。nsqd:负责接收消息、存储队列和向客户端发送消息。nsqd可以部署在多台机器上。当你使用客户端向一个topic发送消息时,你可以配置多个nsqd地址,消息会随机分发到每个nsqd中,nsqd首先将消息存储在内存通道中,当内存通道满时,写入消息到磁盘文件。他监听两个tcp端口,一个用来服务客户端,一个用来提供http接口。nsqd启动时,设置nsqlookupd地址在最前面:nsqd–lookupd-tcp-address=127.0.0.1:4160也可以用数据目录指定端口nsqd–lookupd-tcp-address=127.0.0.1:4160--broadcast-address=127.0.0.1-tcp-address=127.0.0.1:4154-http-address=”0.0.0.0:4155”–data-path=/data/nsqdata其他配置项可以在官网找到nsqlookupd:主要负责用于服务发现,负责nsqd心跳和状态监控,提供nsqd地址和状态给客户端和nsqadminnsqadmin:nsqadmin是一个web管理界面启动方法如下:nsqadmin–lookupd-http-address=127.0.0.1:4161频道详情页示例图如下,empty可以清除当前频道的信息,delete表示删除当前频道,pause表示暂停消息消费。图中还有几个重要的参数,当前积压的深度,in-flight代表已经投递但还没有消费的消息,deferred是未消费的定时(延迟)消息的数量,readycount比较重要,以及go的client是max-in-flight除以client连接数得到的。它表示一次向客户端推送多少条消息,或者客户端准备一次接收多少条消息。小心设置它的值,因为它可能会导致服务器压力。如果消费能力比较弱,Rdy建议设置低一些,比如3Topic和Channel。其实nsqd相当于Kafka中的partition。channel和consumersclients的多重连接相当于Kafka的消费组,但是nsq在概念上比Kafka更易用。理解抛开和Kafka的对比,nsq的topic可以设置多个channel,因为可能有多个业务方需要设置topic的消息,这样就不会互相影响了。当然,一条消息会发送到该主题下的所有频道,然后分配给不同客户端的连接,如下图所示。本文主要介绍nsq的使用,源码就不展开讨论了。如果感兴趣的同学比较多,我过几天再开一篇专门介绍nsq的源码和分析的文章。这是延迟消息:nsq支持延迟消息的传递。比如我觉得这条消息会在5分钟后送达给客户端消费。相比普通的消息传递,多了毫秒数。默认支持的最大毫秒数是3600000毫秒,也就是60分钟,但是这个值可以在nsqd启动的时候用-max-req-timeout参数修改。延迟消息可用于以下场景。例如,如果订单超过30分钟未付款,则修改其状态或发送短信提醒客户。例如滴滴打车订单完成后一定时间内未评价的滴滴打车订单不能设置为默认值,再比如用户积分过期,其他场景避免全表扫描异步处理。Kafka不支持延迟消息的传递。目前,我们知道支持rabbitmq和rocketmq,但是rabbitmq有坑,可能会超时交付,而rocketmq只有阿里云付费版支持的更好。nsq延迟消息的实现是用最小堆算法完成的。作者继承了堆的一系列接口,写了一个pqueque最小堆优先级队列。在internal/pequeque目录下可以看到相关的实现。如果chanMsg.deferred!=0会调用channel.PutMessageDeferred方法,最后调用继承goheap接口的pqueque.push方法。延迟消息的处理和普通消息一样,消息在nsqd/protocol_v2.go下的messagePump中发送给客户端,最后在queueScanWorker中单独处理,pop在peekAndShift方法中,比较当前timewithdeferred[0],如果大于,弹出并发送如下代码给客户端:func(n*NSQD)queueScanWorker(workChchan*Channel,responseChchanbool,closeChchanint){for{select{casec:=<-workCh:now:=time.Now().UnixNano()dirty:=falseifc.processInFlightQueue(now){dirty=true}ifc.processDeferredQueue(now){dirty=true}responseCh<-脏案例<-closeCh:return}}}func(c*Channel)processDeferredQueue(tint64)bool{c.exitMutex.RLock()deferc.exitMutex.RUnlock()ifc.退出(){返回假}dirty:=falsefor{c.deferredMutex.Lock()item,_:=c.deferredPQ.PeekAndShift(t)c.deferredMutex.Unlock()ifitem==nil{gotoexit}dirty=truemsg:=item.Value.(*Message)_,err:=c.popDeferredMessage(msg.ID)如果err!=nil{gotoexit}c.put(msg)}exit:returndirty}func(pq*PriorityQueue)PeekAndShift(maxint64)(*Item,int64){ifpq.Len()==0{returnnil,0}item:=(*pq)[0]ifitem.Priority>max{returnnil,item.Priority-max}heap.Remove(pq,0)returnitem,0}php和go客户端使用官网客户端链接:ClientLibrariesphp客户端官网曾经有一个5年前的老客户端,没人维护甚至无法维护运行一下,于是贡献了一个php72扩展版php-nsq,速度快了近三倍,正在逐步完善,支持各种配置和特性。nsqd_addr=array("127.0.0.1:4150","127.0.0.1:4154");$nsq=newNsq();$is_true=$nsq->connect_nsqd($nsqd_addr);for($i=0;$我<20;$i++){$nsq->publish("test","nihao");}php-nsqdelaypub:参数只是多了一个毫秒参数,soeasy!$deferred=newNsq();$isTrue=$deferred->connectNsqd($nsqdAddr);for($i=0;$i<20;$i++){$deferred->deferredPublish("test","messagedaly",3000);//第三个值默认范围milliseconddefault:[0"test","channel"=>"struggle","rdy"=>2,//可选,默认1"connect_num"=>1,//可选,默认1"retry_delay_time"=>5000,//可选,默认0,5000毫秒后重试消息);$nsq->subscribe($nsq_lookupd,$config,function($msg){echo$msg->payload;echo$msg->attempts;echo$msg->message_id;echo$msg->timestamp;});}去客户端pubpackage主要导入t("github.com/nsqio/go-nsq")varproducer*nsq.Producerfuncmain(){nsqd:="127.0.0.1:4150"生产者,err:=nsq.NewProducer(nsqd,nsq.NewConfig())producer.Publish("test",[]byte("nihao"))iferr!=nil{panic(err)}}goclientsubpackagemainimport("fmt""sync""github.com/nsqio/go-nsq")typeNSQHandlerstruct{}func(this*NSQHandler)HandleMessage(msg*nsq.Message)error{fmt.Println("receive",msg.NSQDAddress,"message:",string(msg.Body))返回nil}functestNSQ(){waiter:=sync.WaitGroup{}waiter.Add(1)gofunc(){deferwaiter.Done()config:=nsq.NewConfig()config.MaxInFlight=9//建立多个连接因为我:=0;我<10;i++{consumer,err:=nsq.NewConsumer("test","struggle",config)ifnil!=err{fmt.Println("err",err)返回}consumer.AddHandler(&NSQHandler{})err=consumer.ConnectToNSQD("127.0.0.1:4150")ifnil!=err{fmt.Println("err",err)return}}select{}}()waiter.Wait()}funcmain(){testNSQ();}同时这篇文章已经更新了自己的博客