之前我们也讲了很多RocketMQ的知识点。本文介绍的是RocketMQ消息的推拉机制。这应该也属于采访的热点。让我们学习吧。我们下面要讲的推拉模式是指broker。消费者和生产者之间,生产者和代理之间的模式是推送模式,即生产者每生产一条消息,都会主动推送给代理。其实这一点大家应该都很清楚。如果producer和broker交互broker会拉取它,那就奇怪了,每次消息都必须存储在producer本地,然后等待broker拉取,这个要看多个的可靠性制作人,显然这个设计很糟糕。下面我们来看看要讨论的是broker和consumer之间的交互是push还是pull。你也可以想想是push还是pull。谈谈推送模式及其优缺点。推送模式是指broker将消息推送给consumer,即consumer。就是被动接收这个消息,broker会主动推送消息给consumer。那么,这种模式的优缺点,大家可以好好想想。一个明显的优点是延迟小,实时性更好。broker收到消息后,会立即推送给consumer。实时性比较高。还有一个好处就是简化了消费端的逻辑。消费者端不需要自己处理拉取逻辑。它只需要监听消息的主题,然后专心处理这条消息的业务逻辑即可。以上两点都是优点,所以有优点,就会有相应的缺点。第二点简化了consumer端的逻辑,同时把broker端的逻辑复杂化了,这其实算不上优势,也算不上劣势。这是这种模式的一个特点。您需要根据场景选择适合自己的模式。最大的缺点就是推送率和消费率不匹配,很不好。你想,如果broker拿到消息,推送给consumer,不关心consumer的消费能力,直接扔给consumer,那consumer可能像生产线一样崩溃,最高速度每秒只能接收10立方米,结果你把每秒100立方米扔到生产线上,生产线可能会直接崩溃,因为它处理不了。当推送速度非常快时,甚至像DDos攻击一样,消费者就更难受了,不同消费者的消费率也不一样,经纪人很难平衡每个消费者的消费率。如果broker需要记住每一个Consum如果er的消费能力和速度,broker的复杂度可以直线上升。缺点之一是消费者推送出去后,不能保证消息发送成功。Push采用的是广播模式,即只有服务端和客户端在同一个频道时,push模式才能成功将消息推送给消费者分析pull模式以及pull模式的优缺点。同理,就是消费者主动从broker那里拉取消息。主动点,我不需要你喂我了,我只是偶尔去你那儿看看消息,你也不管我的消费速度,我知道怎么回事,想想这个的优点和缺点,我知道这个模型的优点和缺点肯定是明白的。最大的优势在于主动权掌握在消费者手中。每个消费者的消费能力可能不同。消费者可以根据自己的情况拉动消息。请求,如果消费者真的不堪重负,根据一定的策略暂停服务端的拉取是比较容易的,不需要进行消息处理逻辑。你来了,我就给你,你要多少我就给多少,broker是一台没有感情的存储机器,pull模式更适合发送批量消息。推送方式是一条消息来就推送,当然也可以缓存一些消息再推送,但是不确定消费者是否能处理这批推送的消息。在pull模式下,consumer主动通知broker,让broker更好的决定缓存多少条消息进行批量发送。说完优点,就要说缺点了,拉模式需要消费者对服务端有一定的了解。主要缺点是实时性差。对于服务端实时更新的信息,客户端获取实时信息的难度还是很大的。毕竟,消费者是在拉消息。consumer怎么知道消息已经到了,那么consumer可以做的就是不断的拉,但是不能频繁的拉,这样也是很消耗性能的,所以必须要降低请求的频率,而请求间隔就是消息延迟RocketMQ最终的选择,为什么是pull模式RocketMQ最终决定采用pull模式,Kafka也是如此,还是调用MQ接口保存到broker端独立选择MessageQueue和offset来拉取消息。用户在拉取消息的时候,由用户决定从哪个offset拉取哪个queue,拉取多少条消息,为什么pull方式稍微更合适现在的消息团队呢列都需要持久消息。调峰主要是通过持久化实现的,也就是需要有存储的功能。它的任务是接收消息,保存好消息,然后等待消费者拉它。那么消费者有各种各样的,消费者的能力也参差不齐,所以broker不能对消费者有太多的依赖。拉模型也有缺点。上面我们已经说过,最大的缺点就是实时性比较差,所以RocketMQ也想尽办法来缓解这些缺点。当broker发送消息时,broker会提醒消费者需要拉取消息。简而言之,经纪人和消费者相互合作。下面将详细解释RocketMQ是如何实现拉取模式的,拉取模式是指消费者主动去broker拉取消息。pull模式分为两种方式:普通轮询和长轮询。收到请求后,不管有没有数据更新,都会立即回复,也比较容易理解,实现起来也比较简单。缺点是broker比较被动,需要不断处理client连接,也就是server属于a2.longpolling是对普通polling的一种优化。当然,消费者向服务端发起请求,服务端收到后并不会立即响应,而是保持客户端连接等待。只有在数据发生变化后才会回复客户端,或者在指定的时间后没有发生变化。其实就是在一定程度上限制了正常的轮询。客户端可以随时请求服务器,但我可能不会立即回复你。RocketMQ是使用长轮询来实现拉取模式的。消费者发起拉取请求后,broker在处理拉取消息的请求时,如果没有找到消息,则不会回复消费者。相反,它将等待触发器通知消费者此事件。这里触发事件会有两个条件:1.DefaultMessageStore.ReputMessageService.run,一个定时任务,每1毫秒一次,不断检查是否有消息产生,如果检测到,会通知消费者,并将新的消息发送给Consumer2,PullRequestHoldService.run也是一个定时任务,每5秒一次,task会一个一个的检查requests,判断是否有对应的新消息,如果有直接返回给consumer,检查request是否超过了默认的Longpolling等待时间(默认15秒),如果超过了,则返回给消费者,pushConsumer怎么说?RocketMQ中的PushConsumer其实底层也是在pull模式下实现的,只是pull模式下的一层狼。因为RocketMQ后台有一个ReblanceService线程,它会偷偷去broker那里请求数据。参与者的数量是负载平衡的。每个队列产生的请求都会被放入阻塞队列,然后一个PullMessageService线程会不断从阻塞队列中获取请求,然后通过网络向broker请求。这是一个准实时拉取消息源码的PullMessageProcessor中的processRequest方法,用于处理拉取消息的请求。如果有消息返回,如果没有消息,就会进入上面提到的长轮询过程。这部分源码我就不删了。如果你有兴趣,你可以去研究
