本文已收录到Github仓库,里面包括计算机基础、Java基础、多线程、JVM、数据库、Redis、Spring、Mybatis、SpringMVC、SpringBoot、分布式、微服务、设计模式、架构、校招社招等核心知识点分享,欢迎star~Github地址:https://github.com/Tyson0314/Java-learning为什么要用消息队列?总结起来,主要有3个原因:解耦、异步、调峰。1.解耦。例如,用户下单后,订单系统需要通知库存系统。如果无法访问库存系统,订单将无法减少库存,导致订单操作失败。订单系统与库存系统相结合。如果此时使用消息队列,可以将成功返回给用户。首先,坚持信息。库存系统恢复后,可以正常消费,减去库存。2.异步。向消息队列写入消息,非核心业务逻辑异步运行,不影响主流程业务。3、削峰。消费者根据数据库能处理的并发量慢慢从消息队列中拉取消息。在生产中,这种短暂的峰值积压是允许的。例如,尖峰活动通常会导致流量激增,应用程序会因流量过大而挂起。此时添加消息队列。服务器端收到用户的请求后,首先写入消息队列。如果消息队列长度超过最大数量,则直接丢弃用户请求或跳转到错误页面。使用消息队列有什么缺点?系统可用性降低。引入消息队列后,如果消息队列挂了,可能会影响业务系统的可用性。增加了系统的复杂性。加入消息队列后,需要考虑很多问题,比如:一致性问题,如何保证消息不被重复消费,如何保证消息的可靠传输等。常见消息队列对比及对比方向总结吞吐量ActiveMQ和RabbitMQ的吞吐量在10000(ActiveMQ的性能最差)比RocketMQ和Kafka的吞吐量在100000甚至百万级低一个数量级。可用性可以实现高可用性。ActiveMQ和RabbitMQ都是基于主从架构来实现高可用的。RocketMQ基于分布式架构。Kafka也是分布式的,一份数据多副本,少数机器宕机,没有数据丢失,没有不可用的时效性RabbitMQ是基于erlang开发的,因此并发性强,性能优异,延迟低,达到微秒级别。其他三个都是ms级别的。功能支持除了Kafka,其他三个功能都比较齐全。Kafka功能比较简单,主要支持简单的MQ功能。被大规模用于大数据领域的实时计算和日志采集。这是事实上的标准。丢失activeMQ和RabbitMQ的可能性很低。RocketMQ和Kafka理论上不会丢失。总结:ActiveMQ社区比较成熟,但是相对于现在的性能,ActiveMQ的性能比较差,版本迭代很慢,不推荐使用。虽然RabbitMQ在吞吐量上略逊于Kafka和RocketMQ,但由于是基于erlang开发的,并发性强,性能优异,延迟低,达到微秒级别。但也因为RabbitMQ是基于erlang开发的,国内很少有公司有实力做erlang源码层面的研究和定制。如果业务场景对并发量要求不高(十万级、百万级),那么这四种消息队列中,RabbitMQ一定是你的首选。大数据领域的实时计算、日志采集等场景,用Kafka是行业标准,绝对没问题,社区很活跃,永远不会黄,更何况几乎是一个全世界该领域的事实规范。RocketMQ是阿里出品的Java开源项目,我们可以直接阅读源码,然后自定义自己公司的MQ,RocketMQ有阿里巴巴实际业务场景的实战检验。RocketMQ社区比较活跃,不过还好。文档比较简单,接口不符合标准的JMS规范。有些系统需要修改很多代码才能迁移。还有就是阿里引进的技术。你必须做好,以防这个技术被抛弃,社区变黄的风险。如果你们公司有技术实力,我觉得用RocketMQ是非常不错的。Kafka的特点其实很明显,就是只提供了相对较少的核心功能,但是提供了超高的吞吐量,ms级的延迟,高可用和可靠,分布可以任意扩展。同时,Kafka最好支持少量的topic,以保证其超高的吞吐量。kafka唯一的缺点就是有可能重复消费消息,这对数据的准确性会有非常轻微的影响。在大数据和日志采集领域,这种轻微的影响可以忽略不计。这个特性天然适用于大数据和日志的实时计算。收集。如何保证消息队列的高可用?RabbitMQ:镜像集群模式RabbitMQ基于主从高可用。Rabbitmq有三种模式:单机模式、普通集群模式、镜像集群模式。单机模式一般很少在生产环境中使用。普通集群模式只是提高了系统的吞吐量,让集群中的多个节点服务于某个Queue的读写操作。那么真正实现RabbitMQ高可用的是镜像集群模式。镜像集群模式和普通集群模式的区别在于,创建的Queue,无论是元数据还是Queue中的消息,都会存在于多个实例上,然后每次向Queue写入消息时,都会自动链接到多个实例。消息同步队列。这种设计的好处是任何一台机器宕机都不会影响其他机器的使用。缺点是:1.性能开销过高:消息同步到所有机器,网络带宽压力大,消耗大;2、可扩展性差:如果一个Queue负载很重,即使增加一台机器,新增加的机器也会包含这个Queue的所有数据,而你的Queue是没有办法线性扩展的。Kafka:分区和副本机制Kafka的基本架构是由多个broker组成,每个broker是一个节点。创建一个topic可以分为多个partition,每个partition可以存在于不同的broker上,每个partition可以存储一些数据,这是一个天然的分布式消息队列。也就是说,一个topic的数据是分散在多台机器上的,每台机器都存储一部分数据。Kafka0.8之前是没有HA机制的。如果任何代理出现故障,则无法写入或读取其分区。没有高可用性可言。Kafka0.8之后,提供了HA机制,也就是replica副本机制。每个分区的数据都会同步到其他机器上,形成自己的多个replica副本。然后所有的replica都会选出一个leader,生产和消费都会和这个leader打交道,然后其他的replica都是follower。写的时候leader会负责把数据同步给所有的follower,读的时候直接读取leader上的数据即可。Kafka会将一个partition的所有replicas平均分布在不同的机器上,从而提高容错能力。MQ常用协议AMQP协议AMQP即AdvancedMessageQueuingProtocol,一种提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的开放标准,专为面向消息的中间件而设计。基于该协议的客户端和消息中间件可以传递消息,不受不同客户端/中间件产品、不同开发语言等条件的限制。优点:可靠、通用的MQTT协议MQTT(MessageQueuingTelemetryTransport)是IBM开发的一种即时通讯协议,有可能成为物联网的重要组成部分。该协议支持所有平台,可以将几乎所有连接的事物与外界连接起来。它被用作传感器和执行器的通信协议(例如通过Twitter连接房屋)。优点:格式简单、带宽小、移动通信、PUSH、嵌入式系统STOMP协议STOMP(StreamingTextOrientatedMessageProtocol)是一种面向流式文本的消息协议,是一种MOM(MessageOrientedMiddleware,面向消息的中间件)设计的一种简单的文本协议。STOMP提供了一种可互操作的连接格式,允许客户端与任何STOMP消息代理(Broker)进行交互。优点:命令模式(非主题/队列模式)XMPP协议XMPP(ExtensibleMessagingandPresenceProtocol)是一种基于可扩展标记语言(XML)的协议,主要用于即时消息(IM)和在线站点探测。适用于服务器之间的准即时操作。它的核心是基于XML流,这种协议最终可能允许互联网用户向互联网上的任何其他人发送即时消息,即使他们的操作系统和浏览器不同。优点:一般开放性、兼容性强、可扩展性强、安全性高,但XML编码格式占用带宽大其他基于TCP/IP的自定义协议:一些特殊框架(如:redis、kafka、zeroMq等)没有根据自己的需求定制严格遵循MQ规范,而是封装了一套基于TCP\IP的协议,通过网络socket接口传输,实现MQ的功能。MQ通信方式点对点通信:点对点模式是最传统最常见的通信方式,它支持一对一、一对多、多对多、多对一等多种配置方式,支持树形、网状等拓扑结构。组播:MQ适用于不同类型的应用。最重要和正在开发的应用程序之一是“多播”应用程序,它可以将消息发送到多个目标站点(目标列表)。单个MQ命令可用于将单个消息发送到多个目标站点,并确保向每个站点可靠地传递信息。MQ不仅提供了组播的功能,还具有智能消息分发的功能。当向同一系统上的多个用户发送消息时,MQ会将消息的副本版本和系统上的接收者列表发送到TargetMQ系统。目标MQ系统在本地复制这些消息并将它们发送到列表中的队列,从而最大限度地减少网络流量。发布/订阅(Publish/Subscribe)模式:Publish/Subscribe功能使得消息的分发能够突破目的队列的地理点的限制,使得消息可以根据特定的主题甚至内容进行分发,而用户或应用程序可以根据主题或内容接收所有消息。需要的消息。发布/订阅功能使得发送方和接收方之间的耦合关系更加松散,发送方不需要关心接收方的目的地址,接收方也不需要关心消息的发送地址,只发送消息根据消息的主题发送和接收。在MQ家族产品中,MQEventBroker是专门用于使用发布/订阅技术进行数据通信的产品。它支持基于队列和直接基于TCP/IP的发布和订阅。Cluster:为了简化点对点通信方式下的系统配置,MQ提供了Cluster方案。集群类似于域。集群中的队列管理器相互通信时,不需要在它们之间建立消息通道。相反,他们使用Cluster通道与其他成员进行通信,这大大简化了系统配置。此外,集群中的队列管理器可以自动进行负载均衡。当一个队列管理器发生故障时,其他队列管理器可以接替它的工作,从而大大提高了系统的高可靠性。如何保证消息的顺序?RabbitMQ拆分多个Queue,每个Queue有一个Consumer;或者只是一个Queue但是对应一个Consumer,然后Consumer内部使用一个内存队列进行排队,然后分发给不同的底层Worker进行处理。Kafka有一个主题、一个分区和一个消费者。它使用单线程进行内部消费。单线程吞吐量太低,所以一般不用。写入N个memoryQueue,相同key的数据全部进入同一个memoryQueue;那么对于N个线程,每个线程可以消费一个内存Queue,这样顺序就可以保证了。如何避免消息重复消费?在消息生产过程中,MQ内部会为生产者发送的每条消息生成一个唯一的id,作为去重和幂等(消息投递失败重传)的基础,避免重复的消息进入队列。消费消息时,要求消息体中有一个全局唯一的id作为去重和幂等的基础,避免重复消费同一条消息。大量消息长期积压在MQ中,如何解决?一般这个时候只能应急临时扩容。具体操作步骤和思路如下:先修复问题消费者,确保其消费速度恢复,然后停止现有消费者;新建一个topic,分区是原来的10倍。临时建立原来10倍的队列数;然后编写一个临时消费者程序用于分发数据。这个程序被部署来消费积压的数据。消费后不做耗时处理,直接对临时创建的10个队列进行均匀轮询写入。队列数量加倍;然后临时使用10倍数量的机器部署消费者,每批消费者从一个临时队列中消费数据。这种做法相当于暂时将队列资源和消费者资源扩大10倍,以10倍正常速度消费数据;快速消费完积压的数据后,必须恢复原来部署的架构,重新使用原来的消费机消费消息。MQ中的消息过期了怎么办?如果使用RabbitMQ,RabbtiMQ可以设置过期时间(TTL)。如果消息在Queue中积压超过一定时间,就会被RabbitMQ清空,数据也就没了。这时候的问题不是大量的数据会积压在MQ中,而是大量的数据会直接丢失。这样的话,并不是增加了消费者消费消息的积压,因为实际上并没有积压,而是丢失了很多消息。我们可以采取的一种解决方案是批量重定向。即当大量积压时,直接将数据写入数据库,然后在高峰期过后,将数据一点一点地检出,然后重新填入MQ,以弥补丢失的数据。消息中间件是如何实现高可用的?以卡夫卡为例。Kafka的基本集群架构由多个broker组成,每个broker是一个节点。当你创建一个主题时,它可以分为多个分区,每个分区包含一部分数据,这些数据存在于不同的代理上。也就是说,一个topic的数据是分散在多台机器上的,每台机器都存储一部分数据。每个分区存储一部分数据。如果对应的broker挂了,这部分数据会不会丢失?这不保证高可用性吗?Kafka0.8之后,提供了多副本机制来保证高可用,即每个分区的数据都会同步到其他机器上,形成多副本。然后所有的replica都会选出一个leader,让leader处理生产和消费者,其他的replica都是follower。写入数据时,leader负责将数据同步给所有follower。读取消息时直接读取leader上的数据即可。如何保证高可用?假设某个broker挂了,这个broker上的分区在其他机器上有副本。如果领导者的经纪人被链接怎么办?其他追随者将重新选举领导者。如何保证数据一致性,如何实现交易消息?一条普通的MQ消息,从生成到消费,大致流程是这样的:生产者生成消息,用MQ服务器发送,MQ接收消息,持久化消息到存储系统。MQ服务器返回ACK给生产者。MQ服务器将消息推送给消费者。消费者消费消息并响应ACK。MQ服务器收到ACK,认为消息消费成功,即删除存储中的消息。让我们以下订单为例。订单系统创建订单后,向下游系统发送消息。如果订单创建成功,消息发送不成功,下游系统将无法感知此事,造成数据不一致。如何保证数据的一致性?可以使用交易消息。我们来看看事务消息是如何实现的。生产者生成消息并向MQ服务器发送半事务性消息。MQ收到消息后,将消息持久化到存储系统。此消息的状态是待发送。MQ服务器向生产者返回ACK确认。此时MQ不会触发消息推送事件。生产者执行本地事务。如果本地事务执行成功,将commit执行结果发送给MQ服务器;如果执行失败,则发送回滚。如果是正常提交??,则MQ服务器更新消息状态为可发送;如果是回滚,则删除消息。如果消息的状态更新为可发送,MQ服务器会将消息推送给消费者。消费者消费后返回ACK。如果MQ服务器长时间没有收到producer的commit或者rollback,就会回溯producer,然后根据查询结果执行finalstate。如何设计消息队列?首先是消息队列的整体流程。生产者向broker发送消息,broker存储,broker发送给消费者消费,消费者回复确认消费。生产者向broker发送消息,broker向consumer发送消息进行消费。然后需要两个RPC。如何设计RPC?可以参考开源框架Dubbo,可以说说broker是如何考虑持久化的,比如服务发现,序列化协议等等,是放到文件系统还是数据库,会不会有消息堆积,如何处理消息堆积。如何维系消费者关系?点对点还是广播?广播关系如何维持?如何保证zk或configserver消息的可靠性?如果消息重复,如何幂等处理?如何设计消息队列的高可用?可以参考Kafka的高可用保障机制。多副本->leader&follower->broker挂掉,重新选举leader对外服务。消息事务特性,与本地业务相同的事务,本地消息存储在数据库中;消息投递到服务器,本地被删除;计划任务扫描本地消息数据库并补偿发送。MQ具有扩展性和可伸缩性。如果出现消息积压或者资源不足,如何支持快速扩容,提高吞吐量?可以参考Kafka的设计理念,broker->topic->partition,每个partition放一台机器,只存储一部分数据。如果现在资源不够用,干脆给topic加个partition,然后进行数据迁移,加机器,这样就可以存储更多的数据,提供更高的吞吐量。参考多线程异步和MQ区别的CPU消耗。多线程异步可能存在CPU竞争,但是MQ不会消耗机器的CPU。MQ方式实现异步完全解耦,适用于大型互联网项目。削峰或消息累积功能。当业务系统处于高并发时,MQ可以在Broker实例中积累消息,多线程会创建大量线程,甚至触发拒绝策略。MQ的使用引入了中间件,增加了项目的复杂度和运维难度。一般来说,小型项目可以使用多线程实现异步,大型项目推荐使用MQ实现异步。最后给大家分享一个Github仓库,里面有大斌编译的300多本经典计算机书籍PDF,包括C语言、C++、Java、Python、前端、数据库、操作系统、计算机网络、数据结构还有算法,机器学习,编程生活等等,可以star一下,下次找书的时候可以直接在上面搜索,仓库持续更新中~Github地址:https://github.com/Tyson0314/java-books
