每日一句军人天生就是放弃战斗的意义!概述RabitMQ发布确认以确保消息在磁盘上。前提条件1.队列必须是持久化的队列持久化2.队列中的消息必须是持久化的。消息持久化使用三种发布确认方式:1.单次发布确认2.批量发布确认3。异步批量发布确认方法,开启发布确认//创建连接工厂ConnectionFactoryfactory=newConnectionFactory();factory.setHost("127.0.0.1");factory.setUsername("guest");factory.setPassword("guest");Connectionconnection=factory.newConnection();Channelchannel=connection.createChannel();**//开启释放确认channel.confirmSelect();**最简单的单确认方式,是同步释放确认的方式,也就是说,发送消息后,只有确认后,后续的消息才能继续发布。最大的缺点是:释放速度特别饱满。吞吐量:每秒不超过数百条已发布的消息/***单次确认*/publicstaticvoidpublishSingleMessage()throwsException{Channelchannel=RabbitMqUtils.getChannel();//生命队列StringqueueName=UUID.randomUUID().toString();channel.queueDeclare(queueName,true,false,false,null);**//开启释放确认channel.confirmSelect();**//开始时间longbegin=System.currentTimeMillis();for(inti=0;i<1000;i++){字符串消息=i+"";channel.basicPublish("",queueName,null,message.getBytes());//立即确认一条消息**booleanb=channel.waitForConfirms();**if(b){System.out.println("消息发送成功!!!");}}//结束时间longend=System.currentTimeMillis();System.out.println("发送1000条消息,单条发布确认时间:"+(end-begin)+"ms");}相比单条消息等待确认,批量确认可以大大提高吞吐量发布一批消息,然后一起确认。当然,这种方式的缺点就是:当故障导致发布出现问题时,我们不知道是哪条消息出了问题。我们必须将整个批次保存在内存中以记录重要信息,然后重新发布消息。当然这个方案还是同步的,也会阻塞消息的发布//生命队列StringqueueName=UUID.randomUUID().toString();channel.queueDeclare(queueName,true,false,false,null);**//开启发布确认通道.confirmSelect();//批量确认消息大小intbatchSize=100;//否已确认消息数intoutstandingMessageCount=0;**//开始时间longbegin=System.currentTimeMillis();for(inti=0;i<1000;i++){字符串消息=i+"";channel.basicPublish("",queueName,null,message.getBytes());**outstandingMessageCount++;//发送的消息==批量确认前确认消息的大小if(outstandingMessageCount==batchSize){channel.waitForConfirms();优秀消息计数=0;}**}**//为了保证没有留下确认信息,再次确认if(outstandingMessageCount>0){channel.waitForConfirms();}**//结束时间longend=System.currentTimeMillis();System.out.println("发送1000条消息,确认100批次发布时间:"+(end-begin)+"ms");}异步确认使用回调函数实现消息可靠性通过,这个中间件也使用函数回调来确保是否发送成功/***异步批量确认**@throwsException*/publicstaticvoidpublishAsyncMessage()throwsException{try(Channelchannel=RabbitMqUtils.getChannel()){StringqueueName=UUID.randomUUID().toString();channel.queueDeclare(queueName,false,false,false,null);**//开启释放确认channel.confirmSelect();**//线程安全有序哈希表,适用于高并发情况//1.轻松关联序号和消息2.轻松批量删除item只要给定序号3.支持并发访问ConcurrentSkipListMapoutstandingConfirms=newConcurrentSkipListMap<>();**//确认收到消息的回调**//1.消息序号//2.multiple是否为批量确认//false确认当前序号消息ConfirmCallbackackCallback=(sequenceNumber,multiple)->{if(multiple){//返回的未确认消息小于或等于当前序号的是一张地图ConcurrentNavigableMapconfirmed=outstandingConfirms.headMap(sequenceNumber,true);//清除这部分未确认消息confirmed.clear();}else{//只清除当前序号的消息outstandingConfirms.remove(sequenceNumber);}};//未确认消息的回调ConfirmCallbacknackCallback=(sequenceNumber,multiple)->{Stringmessage=outstandingConfirms.get(sequenceNumber);System.out.println("发布消息"+message+"未确认,序列号"+sequenceNumber);};**//添加异步确认的监听器//1.确认收到消息的回调//2.收不到消息的回调channel.addConfirmListener(ackCallback,nackCallback);**longbegin=System.currentTimeMillis();for(inti=0;i<1000;i++){Stringmessage="message"+i;**//channel.getNextPublishSeqNo()获取下一条消息的序号//将消息体与序号关联起来,都是未确认的消息体//要发布的序号保存号码并发布消息到地图outstandingConfirms.put(channel.getNextPublishSeqNo(),message);**channel.basicPublish("",queueName,null,message.getBytes());}longend=System.currentTimeMillis();System.out.println("发布"+1000+"异步确认消息,耗时"+(end-begin)+"ms");}}如何处理异步未确认消息的最佳解决方案是将未确认消息放入发布线程可以访问的基于内存的队列中。适用于ConcurrentLinkedQueue等高并发队列。该队列在确认回调和发布线程之间传输消息。ConcurrentSkipListMap等都有。如何保证面试题中消息不丢失?就市面上常见的消息队列而言,只要配置得当,我们的消息是不会丢失的。消息队列主要有三个阶段:1.生产消息2.存储消息3.消费消息1.生产消息的生产者向Broker发送消息,需要处理Broker的响应。无论消息是同步发送还是异步发送,同步和异步回调都需要做一次try-catch才能正确处理响应。如果Broker返回写入失败等错误信息,则需要重试发送。当多次发送失败需要告警,日志记录等,保证消息生产阶段不会丢失消息。2.存储消息在存储消息阶段,需要在消息flush后响应producer。假设消息写入缓存并返回响应,那么机器突然断电,消息没有了,生产者认为已经发送成功。如果Broker部署在集群中,存在多副本机制,即消息不仅要写入当前Broker,还需要写入副本机器。它被配置为在响应生产者之前写入至少两台机器。这样就可以基本保证存储的可靠性。一个挂了,一个还在(如果怕两个都挂了..那就多了)。3.我们应该在消费者真正执行完业务逻辑后,再将消费消息发送给Broker。这才是真正的消费。所以只要我们在消息业务逻辑处理完成后响应Broker,消费阶段的消息就不会丢失。总结:1、producer需要妥善处理Broker的response,在error2时使用retry、alarm等手段。Broker需要控制响应的时间。单机情况下,刷新消息后返回response。在集群有多个副本的情况下,发送到两个或多个副本后返回响应。3、消费者执行完真正的业务逻辑后需要返回对Brokervolatile关键字的响应?1.保证内存可见性1.1基本概念可见性是指线程之间的可见性,一个线程修改的状态对另一个线程可见。也就是说,一个线程修改的结果可以立即被另一个线程看到。1.2实现原理在读写非易失性变量时,每个线程首先将变量从主存中复制到CPU缓存中。如果计算机有多个CPU,每个线程可能在不同的CPU上处理,这意味着每个线程都可以复制到不同的CPU缓存中。volatile变量不会缓存在寄存器或者其他处理器不可见的地方,保证变量每次读写都是从主存中读取,跳过CPU缓存这一步。当一个线程修改这个变量的值时,新值立即为其他线程所知。2.禁止指令重排序2.1基本概念指令重排序是通过JVM优化指令,提高程序运行效率,在不影响单线程程序执行结果的情况下,尽可能提高并行度。指令重排序包括编译器重排序和运行时重排序。在JDK1.5之后,可以使用volatile变量来禁用指令重排序。对于volatile修饰的变量,会在读写操作指令前后插入内存屏障。当指令重新排序时,后续指令不能重新排序到内存屏幕。示例说明:doubler=2.1;//(1)doublepi=3.14;//(2)doublearea=pi*r*r;//(3)虽然代码语句的定义顺序是1->2->3,但是计算顺序是1->2->3和2->1->3对结果没有影响,所以语句1和2可以在编译时和运行时根据需要重新排序。2.2指令重排带来的问题在线程A{context=loadContext();inited=true;}在线程B中{if(inited)fun(context);}如果线程A中的指令被重新排序,那么B很可能会得到一个未初始化或未初始化的上下文,这将导致程序错误。2.3禁止指令重排原则olatile关键字提供了一个内存屏障来防止指令重排。当编译器生成字节码文件时,它会在指令序列中插入内存屏障,以禁止特定类型的处理器重新排序。JVM内存屏障插入策略:在每次volatile写操作之前插入一个StoreStore屏障;在每次易失性写入操作后插入一个StoreLoad屏障;在每个易失性读取操作之后插入一个LoadLoad屏障;在每个易失性读取操作之后插入一个LoadLoad屏障在.3.适用场景(1)volatile关键字不能同时保证内存可见性和原子性。锁机制可以保证可见性和原子性。(2)volatile屏蔽了JVM中必要的代码优化,所以效率比较低,所以必须只在必要的时候使用这个关键字。告诉我有关Netty的信息?Netty是一个高性能、异步事件驱动的NIO框架。简化和优化TCP、UDP套接字等网络编程,优化性能、安全等多方面。3.支持多种协议,如FTP、SMTP、HTTP和各种二进制和基于文本的传统协议。在网络编程中,Netty是绝对的王者。有许多使用Netty的开源项目。1、市面上很多消息推送系统都是基于Netty的。2、我们常用的框架:Dubbo、RocketMQ、ES等都是用的Netty。使用Netty进行项目统计:https://netty.io/wiki/related-projects.html大家好,我是yltrcc,每天分享一些技术细节。欢迎关注我:ylcoder