众所周知,redis是一个高性能的分布式key-value存储系统。在NoSQL数据库市场,redis本身就占据了近半壁江山,足以见得其实力。同时,由于redis的单线程特性,我们可以将其作为消息队列来使用。本文将讲讲如何将redis集成到springboot中,并作为消息队列使用...1.什么是消息队列?“消息队列”是消息传输过程中存储消息的容器。——《百度百科》Message可以理解为在计算机中或者在整个计算机网络中传输的数据。队列是我们学习数据结构时学习的基本数据结构之一,它具有先进先出的特点。所以,消息队列是一个存放消息的容器,具有先进先出的特点。为什么会有消息队列?异步:在常见的B/S架构下,客户端向服务器发送请求,但服务器处理消息需要很长时间。如果客户端等待服务器处理完消息,会造成客户端系统资源的浪费;使用消息队列后,服务端直接将消息推送到消息队列中,由专门的消息处理程序处理消息,这样客户端就不用花费大量时间等待服务端的响应;解耦:传统软件在开发模式下,模块之间的调用都是直接调用。这样的系统不利于系统的扩展。同时,模块之间的相互调用和数据的共享也很成问题。每个模块都要时刻考虑其他模块会不会挂掉?使用消息队列后,模块不直接调用,而是传递数据,当某个模块挂掉时,数据仍会保存在消息队列中。最典型的就是生产者消费者模型,本案例中使用的就是这种模型;削峰填谷:在某个时刻,系统的并发请求急剧增加,远远超过了系统的最大处理能力,如果不做任何处理,系统就会崩溃;使用消息队列后,服务端将请求推送到消息队列,专门的消息处理程序以合理的速度消费消息,减轻服务端的压力。我们简单看一下下图中的消息队列。从上图可以看出,消息队列起到了中间人的作用。我们可以通过操作这个消息队列来保证我们系统的稳定性。2.环境准备Java环境:jdk1.8springboot版本:2.2.1.RELEASEredis-server版本:3.2.1003.相关依赖这里只展示redis相关的依赖,org.springframework.bootspring-boot-starter-data-redisorg.springframework.integrationspring-integration-redis这里解释一下这两个依赖:第一个依赖是对redisNoSQL的支持,第二个依赖是springintegration和redis的结合。这里添加这段代码主要是为了实现分布式锁。四、这里的配置文件只显示redis相关的配置#redis所在地址spring.redis.host=localhost#redis数据库索引,从0开始,可以查看spring.redis.database=1的端口#redis来自redis的可视化客户端,默认是6379spring.redis.port=6379#redispasswordspring.redis.password=#连接redis的超时时间(ms),默认是2000spring.redis.timeout=5000#连接池最大连接数spring.redis.jedis.pool.max-active=16#连接池最小空闲连接spring.redis.jedis.pool.min-idle=0#连接池最大空闲连接spring.redis.jedis.pool.max-idle=16#连接池最大阻塞等待时间(负数表示无限制)spring.redis.jedis.pool.max-wait=-1#连接redis的客户端名称为spring.redis.client-name=mall5.代码配置redis作为消息队列。它在springboot中的主要表现是一个RedisTemplate.convertAndSend()方法和一个MessageListener接口。所以我们需要在IOC容器中注入一个RedisTemplate和一个MessageListener接口的类。话不多说,先看代码配置RedisTemplate配置RedisTemplate的主要目的是配置序列化方式,解决乱码问题。同时,合理配置序列化方式也能减少一点性能开销。/***配置RedisTemplate解决乱码*/@BeanpublicRedisTemplateredisTemplate(RedisConnectionFactoryfactory){LOGGER.debug("redis序列化配置开始");RedisTemplatetemplate=newRedisTemplate<>();template.setConnectionFactory(factory);//字符串序列化方式RedisSerializerserializer=newGenericJackson2JsonRedisSerializer();//设置默认序列化方式;LOGGER.debug("redis序列化配置结束");returnttemplate;}代码第12行,我们配置默认序列化方式为GenericJackson2JsonRedisSerializer代码第13行,我们配置key序列化方式为StringRedisSerializer代码第14行,我们配置哈希表的value的序列化方式为GenericJackson2JsonRedisSerializerRedisTemplate几种序列化方式的简单介绍六、redis队列监听器(consumer)上面提到了redis队列监听器相关的类是一个名为MessageListener的接口,下面是该接口的源码publicinterfaceMessageListener{voidonMessage(Messagemessage,@Nullablebyte[]pattern);}可以看到接口只有一个onMessage(Messagemessage,@Nullablebyte[]pattern)方法,这是监听到队列中的消息后的回调方法。下面对这两个参数进行解释:message:Redis消息类。这个类只有两个方法:byte[]getBody()和Getthemessagebodybyte[]inbinaryformgetChannel()Getthemessagechannelpatterninbinaryform:themessagechannelinbinaryform,同上message.getChannel()的返回值介绍完接口,我们来实现一个简单的redis队列监听器@ComponentpublicclassRedisListenerimplementMessageListener"监听消息通道={}",newString(pattern));LOGGER.debug("监听消息通道={}",newString(message.getChannel()));LOGGER.debug("metamessage={}",newString(message.getBody()));//新建一个用于反射的序列化对象,注意这里的对象要和之前配置的一致//因为我前面设置的默认序列化方式是GenericJackson2JsonRedisSerializer//所以这里的实现方式是GenericJackson2JsonRedisSerializerRedisSerializerserializer=newGenericJackson2JsonRedisSerializer();LOGGER.debug("反序列化下面的message={}",serializer.deserialize(message.getBody()));}}代码很简单,就是输出参数中包含的关键信息。需要注意的是,RedisSerializer的实现要和上面配置的序列化方式保持一致。队列监听器实现之后,我们还需要将这个监听器添加到redis队列监听器容器中,代码如下:redisListener,newPatternTopic("demo-channel"));returncontainer;}这几行代码大致意思是创建一个Redis消息监听器容器,然后将监听器绑定到管道名上,最后返回到这个容器中。这里需要注意的是,这个管道名必须和下面会提到的推送消息时的管道名一致,否则监听器将无法监听到消息。7、Redis队列推送服务(生产者)上面我们配置了RedisTemplate在这里使用。代码如下:@ServicepublicclassPublisher{@AutowriteprivateRedisTemplateredis;publicvoidpublish(Objectmsg){redis.convertAndSend("demo-channel",msg);}}关键代码在第7行,redis.convertAndSend()方法的作用是发送一个Channel(参数1)来推送一条消息(第二个参数)。这里还是要注意上面的,生产者和消费者的频道名一定要一样。至此,消息队列的生产者和消费者都写完了。八、遇到的问题及解决方案1、springboot使用log4j2日志框架问题我在spring-boot-starter-web中添加了spring-boot-starter-log4j2依赖并排除了spring-boot-starter-logging后,运行项目,如下还是会提示错误:SLF4J:ClasspathcontainsmultipleSLF4Jbindings.SLF4J:Foundbindingin[jar:file:.....m2/repository/ch/qos/logback/logback-classic/1.2.3/logback-classic-1.2.3.jar!/org/slf4j/impl/StaticLoggerBinder.class]SLF4J:Foundbindingin[jar:file:.....m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.12.1/log4j-slf4j-impl-2.12.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]SLF4J:参见http://www.slf4j.org/codes.html#multiple_bindingsforanexplanation.SLF4J:Actualbindingisoftype[ch.qos.logback.classic.util.ContextSelectorStaticBinder]这个错误是maven中多个日志框架导致的。后来通过依赖分析发现spring-boot-starter-data-redis也依赖spring-boot-starter-logging。解决方法也很简单。下面贴出详细代码org.springframework。bootspring-boot-starter-data-redisorg.springframework.bootspring-boot-starter-loggingorg.springframework.bootspring-boot-starter-weborg.springframework.bootspring-boot-starter-loggingorg.springframework.bootspring-boot-starter-log4j2<groupId>org.springframework.integrationspring-integration-redis2.Redis队列监听线程安全问题redis队列监听的监听机制是:使用线程监听到队列中,如果队列中有未被消费的消息,则取出该消息,并产生一个新的线程来消费该消息。如果你还记得的话,我一开始说的是因为redis的单线程特性,所以我们把它当成一个消息队列,但是如果监听器每收到一条消息,就产生一个新的线程来消费信息,这样redis的单线程特性就完全用不到了,线程安全问题也会出现。单个consumer(一个channel只有一个consumer)最简单的解决方法就是给onMessage()方法加锁,简单粗暴但是有用,但是这个方法无法控制队列监听的速率,也没有创建线程的限制最终会导致系统资源耗尽。那么如何解决这种情况呢?线程池。在为容器的配置添加监听时,RedisMessageListenerContainer类中有一个方法setTaskExecutor(ExecutortaskExecutor)来为监听容器配置线程池。配置线程池后,所有线程都会由线程池生成。因此,我们可以通过调整线程池来控制队列监听的速率。多个消费者的解决方案(一个通道有多个消费者)单个消费者的问题相对于多个消费者来说比较简单,因为Java内置的锁只能控制自己程序的运行,不能干扰其他的运行程式;然而,现在很多时候我们是在分布式环境中开发,处理多个消费者是有意义的。那么如何解决这个问题呢?分布式锁。简单介绍一下什么是分布式锁:分布式锁是指在分布式环境下,同一时刻只有一个客户端可以从共享环境(如redis)获取锁,只有获取到锁的客户端才能执行程序。那么分布式锁一般需要满足:独占性(即同一时间只能有一个客户端获取锁)、避免死锁(即超时后自动释放)、高可用(即获取或获取锁的机制)释放锁一定要高可用,性能好。)上面讲依赖的时候,我们导入了一个spring-integration-redis依赖,里面包含了很多有用的工具类,而接下来要讲的分布式锁,就是下面的一个工具包RedisLockRegistry这种依赖。先说说怎么用吧。导入依赖后,首先配置一个Bean@BeanpublicRedisLockRegistryredisLockRegistry(RedisConnectionFactoryfactory){returnnewRedisLockRegistry(factory,"demo-lock",60);}RedisLockRegistry构造函数,第一个参数是redis连接池,第一个第二个参数是lock,即要取出的锁,键名是“demo-lock:KEY_NAME”,第三个参数是锁的过期时间(秒),默认为60秒,当锁持有时间为超过这个时间自动失效。使用锁的方法,下面是修改监听器@ComponentpublicclassRedisListenerimplementMessageListener{@AutowriteprivateRedisLockRegistryredisLockRegistry;privatestaticfinalLoggerLOGGER=LoggerFactory.getLogger(RedisListener.class);@OverridepublicvoidonMessage(Messagemessage,byte[]pattern){Lockist"locktain"ry;try{lock.lock();//锁定LOGGER.debug("从消息通道监听的消息={}",newString(pattern));LOGGER.debug("从消息通道监听的消息={}",newString(message.getChannel()));LOGGER.debug("metamessage={}",newString(message.getBody()));//新建一个对象进行反序列化,注意这里的对象必须和之前配置的一样//因为我前面设置的默认序列化方式是GenericJackson2JsonRedisSerializer//这里的实现方式是GenericJackson2JsonRedisSerializerRedisSerializerserializer=newGenericJackson2JsonRedisSerializer();LOGGER.debug("Deserializedmessage={}",serializer.deserialize(message.getBody()));}catch(Exceptione){e.printStackTrace();}finally{lock.unlock();//unlock}}}与前面的监听代码相比,上面代码的代码就是多了一个注入的RedisLockRegistry,一个通过redisLockRegistry.obtain()方法获取锁,一个锁住另一个Unlock,然后这样就完成了分布式锁的使用。注意,获取锁的方法redisLockRegistry.obtain()返回的是一个名为RedisLock的锁,它是一个私有的内部类,实现了Lock接口,所以我们不能从代码外部创建它的实例,只能获取这个锁通过obtian()方法。这就是本文的全部内容。本文来自作者投稿,原作者:小胖儿,转载请注明出处和作者。【本文为专栏作家霍利斯原创文章,作者微信公众号Hollis(ID:hollishuang)】点此阅读更多本作者好文