当前位置: 首页 > 科技观察

听叔叔的建议,消息队列的水太深了,你抓不住!

时间:2023-03-13 23:22:26 科技观察

本文转载自微信公众号《JAVA日之录》,作者:单色。转载本文请联系JAVA日知录公众号。很多人在做架构设计的时候容易“过度设计”。简单问题复杂,一上来就引用一堆中间件。我认为主要原因有以下几点:为展示(学习)技术而设计架构。我们常说,技术是为业务服务的,不要为了技术而依赖技术。引入一堆复杂的架构来炫耀技术是不可取的。考虑到问题不全面,或者广度不够,不知道如何简化。例如,假设有一个高并发的用户平台,需要处理注册(写)和登录查询(读)功能,在数据库层做主从同步。有人介绍了一个Redis来解决主从同步延迟问题。他们想写主库的时候同时写Redis,读的时候直接读Redis,这样就可以避免主从同步延迟的问题。这是一个典型的考虑。不全面。这样虽然可以解决主从延迟问题,但是也会导致双写一致性事务的发生。不如直接把主从同步方式改成强同步复制,直接从数据库层面保证一致性。那么你可能会说,改成强同步复制会增加响应时间,影响系统吞吐量。那我们也可以为用户创建一个分库,多做主从同步?,跑题了,我们今天不是讲了消息队列吗?哦,让我们言归正传。今天我们来说说消息队列的问题。希望大家在看完这篇文章后,在介绍消息队列的时候能够有所思考。有必要介绍吗?引入消息队列后出现的问题能否解决?消息队列在微服务开发中的作用我们经常会引入消息中间件来实现业务解耦,进行异步操作。下面我们就来看看使用消息中间件的优缺点。首先要明确的是,使用消息组件有很多好处,最核心的三个是:解耦、异步、削峰。解耦:客户端只需要将请求发送到特定的通道,不需要感知接收请求的实例。异步:向消息队列写入消息,非必要的业务逻辑异步运行,加快响应速度。削峰:消息中间件缓存消息直到被消费,消息处理端可以根据自己能处理的并发量从消息队列中慢慢处理消息,不会瞬间压垮业务。当然,消息中间件不是灵丹妙药。引入消息机制后,会有如下一些缺点:潜在的性能瓶颈:消息代理可能存在性能瓶颈。好在目前主流的消息中间件都支持高度的横向扩展。潜在的单点故障:消息代理的高可用性至关重要,否则将影响系统的整体可靠性。幸运的是,大多数消息中间件都是高可用的。额外的运维复杂度:消息系统是一个必须独立安装、配置和运行的系统组件,增加了运维的复杂度。我们可以借助消息中间件本身提供的扩展和高可用能力来解决这些缺点,但是我们需要注意一些可能遇到的设计难点,才能真正用好消息中间件。消息队列的设计问题处理并发和顺序消息为了提高生产环境中应用的消息处理能力和吞吐量,消费者一般会部署多个实例节点。接下来的挑战是如何确保每条消息仅按发送顺序处理一次。例如:假设有3个相同的receiver实例从同一个点对点通道读取消息,sender依次发布OrderCreated、OrderUpdated、OrderCancelled3个事件消息。简单的消息传递实现可能同时将每条消息发送给不同的收件人。如果由于网络问题而出现延迟,则消息可能不会按照发送的顺序进行处理,这将导致奇怪的行为,即服务实例可能在另一个服务器处理OrderCreated消息之前处理OrderCancelled消息。Kafka使用的解决方案是使用分片(分区)通道。整体方案分为三部分:一个topicchannel由多个shard组成,每个shard相当于一个channel。发送者在消息头中指定一个shardkey如orderId,Kafka使用shardkey将消息分配给特定的shard。将接收器的多个实例组合在一起,并将它们视为相同的逻辑接收器(消费者组)。Kafka将每个分片分配给一个接收器,接收器在接收器启动和关闭时重新分配分片。如上所示,每个Order事件消息都将orderId作为其分片键。特定订单的每个事件都发布到同一个分片。此外,该分片中的消息始终由同一个接收方实例读取,因此可以保证按顺序处理它们。处理重复消息消息传递体系结构必须解决的另一个挑战是处理重复消息。理想情况下,消息代理应该只传递一次消息,但确保一次且仅传递一次消息的成本通常很高。相反,许多消息传递组件承诺至少有一个有保证的消息传递。一般情况下,消息组件只会传递一次消息。但是客户端、网络或消息传递组件中的故障可能会导致消息被多次传递。假设客户端的数据库在处理完消息后还没发送确认消息就崩溃了,消息组件会在数据库重启时再次向客户端发送未确认的消息。有两种不同的方法来处理重复消息:编写一个幂等消息处理程序跟踪消息并丢弃重复项编写一个幂等消息处理程序如果应用程序的消息处理逻辑是幂等的,那么重复消息是无害的。程序的幂等性意味着即使以相同的输入参数重复调用应用程序,也不会产生额外的效果。例如:取消取消的订单是一个幂等操作。创建现有订单操作也是如此。只要消息组件在传递消息时保持相同的消息顺序,就可以安全地多次执行幂等消息处理程序。但不幸的是,应用程序通常不是幂等的。或者您现在使用的消息传递组件在重新传递消息时不会保留顺序。重复或乱序的消息会导致错误。在这种情况下,您需要编写消息处理程序来跟踪消息并丢弃重复的消息。跟踪消息并丢弃重复项考虑一个授权消费者信用卡的消息处理程序。每个订单只能执行一次信用卡授权。每次调用应用程序的这一部分都会产生不同的效果。如果重复的消息导致消息处理程序多次执行逻辑,应用程序将无法正常运行。执行此类应用程序逻辑的消息处理程序必须通过检测和丢弃重复消息使其幂等。一个简单的解决方案是让消息接收者使用消息ID跟踪他处理过的消息并丢弃任何重复的消息。例如,将它消费的每条消息的消息ID存储在数据库表中。当接收方处理消息时,它会在数据表中记录消息的消息ID,作为创建和更改业务实体的事务的一部分。如上图所示,接收方向PROCESSED_MESSAGE表中插入包含消息id的行。如果消息是重复的,则INSERT将失败,接收方可以选择丢弃该消息。另一种解决方案是消息处理程序将消息id记录在应用程序表而不是专用表中。这种方法在使用具有受限事务模型的NoSQL数据库时特别有用,因为NoSQL数据库通常不支持将两个表的更新作为数据库事务。处理事务性消息传递服务通常需要在更新数据库的事务中发布消息,数据库更新和消息发送都必须在事务中发生,否则服务可能会更新数据库,然后在发送消息之前崩溃。如果服务不以原子方式执行这两个操作,则此类故障可能会使系统处于不一致状态。接下来我们来看两种常用的事务性消息保障方案,最后看看现代消息组件RocketMQ的事务性消息方案。使用数据库表作为消息队列如果你的应用使用的是关系型数据库,可以直接使用事务性发件箱模式,TransactionalOutbox,保证数据更新和消息发送之间的事务。此模式使用数据库表作为临时消息队列。如上图所示,发送消息的服务有一个OUTBOX数据表。在执行INSERT、UPDATE、DELETE业务操作时,还会向OUTBOX数据表INSERT发送一条消息记录,由于是基于本地ACID事务,可以保证原子性。.OUTBOX表作为一个临时的消息队列,然后我们引入一个消息中继(MessageRelay)服务,从OUTBOX表中读取数据,发布消息到消息组件。消息中继的实现可以非常简单。只需要通过定时任务定时从OUTBOX表中拉取最新未发布的数据,获取数据后发送给消息组件即可。最后,从OUTBOX表中删除已发送的消息。能。另一种使用事务日志来发布事件以确保事务消息的方法是基于数据库事务日志,这就是所谓的数据更改捕获,ChangeDataCapture,简称CDC。一般数据库会在数据发生变化时记录事务日志(TransactionLog),比如MySQL的binlog。事务日志可以简单理解为数据库的一个本地文件队列,主要按照时间顺序记录数据库表的变化。这里我们使用阿里巴巴的开源组件canal结合MySQL来说明这种模式的工作原理。更多操作说明请参考官方文档:https://github.com/alibaba/canalCanal的工作原理是模拟MySQLslave的交互协议,伪装成MySQLslave节点,向MySQLmaster发送dump协议;MySQLmaster收到dump请求后,开始向slave(即canal)推送二进制日志;canal解析二进制日志对象(原本是字节流),然后将解析后的数据直接发送给消息组件。RocketMQ事务消息解决方案ApacheRocketMQ在4.3.0版本已经支持分布式事务消息。RocketMQ采用2PC的思想来实现事务消息的提交,同时增加了一个补偿逻辑来处理二阶段超时或者失败的消息,如下图所示。RocketMQ对事务消息的实现主要分为两个阶段:正常事务的发送和提交,以及事务信息的补偿过程。整体流程为:正常的事务发送和提交阶段1.生产者向MQServer发送半消息(半消息是指消费者暂时无法消费的消息)2.服务器响应消息写入结果,以及半消息发送成功3.开始执行本地事务4.根据本地事务的执行状态执行Commit或Rollback操作事务信息补偿过程1.如果MQServer还没有收到本地事务的执行状态时间长了,就会向生产者发起确认复查操作请求2.生产者收到确认复查请求后,检查本地事务的执行状态3.根据检查结果执行Commit或Rollback操作。补偿阶段主要用于解决生产者在发送Commit或Rollback操作时超时或失败的情况。当生产者使用RocketMQ发送事务消息时,我们也会借鉴第一种方案,即建立事务日志表,然后在执行本地事务时同时生成一条事务日志记录,使得本地事务和日志事务在同一个方法中,同时添加@Transactional注解,保证两个操作事务是一个原子操作。这样,如果事务日志表中有本地事务的信息,则说明本地事务执行成功,需要Commit。反之,如果没有对应的事务日志,则说明执行不成功,需要回滚。