当前位置: 首页 > 科技观察

实践学校-在Java项目中玩转Redis6.0客户端缓存!

时间:2023-03-18 14:08:46 科技观察

大家好,我是九头蛇。在上一篇文章中,我们介绍了Redis6.0中客户端缓存的新特性,通过telnet连接模拟客户端,测试了客户端缓存的三种工作模式。查战看看java项目中客户端缓存应该怎么实现。铺垫首先,我们今天要用到的工具lettuce是一个可扩展的线程安全的redis客户端。多个线程可以共享同一个RedisConnection,使用nio框架Netty高效管理多个连接。再看常用的redis客户端开发包,虽然可以用的很多,但是目前最先拥抱redis6.0的,支持客户端缓存功能的并不多,lettuce是其中的佼佼者。我们先在项目中引入最新版本的依赖,然后正式开始实战:io.lettucelettuce-core6.1.8.RELEASE在项目中实际应用lettuce,启用并使用客户端缓存功能,只需要如下一段代码:publicstaticvoidmain(String[]args)throwsInterruptedException{//创建RedisClient连接信息RedisURIredisURI=RedisURI.builder().withHost("127.0.0.1").withPort(6379).build();RedisClient客户端=RedisClient.create(redisURI);StatefulRedisConnectionconnect=client.connect();Mapmap=newHashMap<>();CacheFrontendfrontend=ClientSideCaching.enable(CacheAccessor.forMap(map),connect,TrackingArgs.Builder.enabled().noloop());字符串键=“用户”;while(true){Stringvalue=frontend.get(key);System.out.println(值);时间单位.SECONDS.睡眠(10);}}上面的代码主要完成几个任务:通过RedisURI配置redis连接的标准信息,建立连接并创建一个Map作为本地缓存,开启客户端缓存功能,创建缓存访问器CacheFrontend。在循环中使用CacheFrontend,不断查询并打印同一个key对应的value。启动上面的程序,控制台会不断打印用户对应的缓存。启动一段时间后,我们在其他客户端修改该用户对应的值。运行结果如下:可以看到在其他客户端修改了key,对应的value修改后,打印结果也随之改变。但是到这里为止,我们并不知道lettuce是否真的使用了客户端缓存。虽然结果是正确的,但也许它每次都重新执行get命令?那么我们就来看看源码,分析一下具体的代码执行流程。分析上面代码中,最关键的类是CacheFrontend。我们仔细看一下上面具体的实例化语句:首先调用ClientSideCaching的enable()方法,我们看一下它的源码:解释一下传入的3个参数:CacheAccessor:一个客户端缓存访问接口的定义,上面调用它的forMap方法返回一个MapCacheAccessor,其中底层使用我们自定义的Map来存放本地缓存,并提供了get、put、evict等方法来操作Map。StatefulRedisConnection:使用的redis连接。TrackingArgs:客户端缓存的参数配置。使用noloop后,当前连接修改key后不会收到通知。向redis服务器发送启用跟踪的命令后,继续向下调用create()方法:这个过程中实例化了一个重要的对象,就是实现了RedisCache接口的DefaultRedisCache对象,实际查询redis时的get请求,写的put请求都是由它完成的。实例化完成后,继续调用同名的create()方法:在该方法中,实例化了ClientSideCaching对象。注意传入的两个参数,通过前面的介绍也很容易理解它们的分工:当本地缓存存在时,直接从CacheAccessor读取。当本地缓存不存在时,使用RedisCache从服务器读取。需要特别注意的是返回前的两行代码,先看第一句(第114行)。在这里,为RedisCache添加了一个监视器。当监听到invalidate类型的失效消息时,获取待失效的key传递给消费者。一般来说,keys中只会有一个元素。消费时,会遍历当前ClientSideCaching消费者列表invalidationListeners:,而这个列表中的所有内容都是在上面第二行代码(115行)添加的,看方法的定义:而实际传入的方法引用是下面的MapCacheAccessor的evict()方法,也就是说当收到key失效报文时,本地缓存Map中缓存的数据会被移除。客户端缓存的失效逻辑我们已经梳理好了,接下来看什么时候写的,直接看ClientSideCaching的get()方法:可以看到,get方法会先尝试从本地获取缓存MapCacheAccessor,如果获取到则直接返回,如果不使用RedisCache则读取redis中的缓存,将返回的结果存储到MapCacheAccessor中。看到这里的图源码,是不是基本逻辑串联了?再画两张图梳理一下流程。先看get过程:再看缓存失败通知客户端的过程:怎么样?让我们通过这两张图来理解它。完美吗?其实不是……回想一下我们之前使用二级缓存Caffeine+Redis的时候,当时使用的通知机制会在redis缓存修改后通知所有主机修改本地缓存,修改会变成最新值。从目前的lettuce来看,它显然不满足这个功能,它只能使缓存失效和删除而不会主动更新。扩展那么,如果我们要实现本地客户端缓存的实时更新,在现有基础上应该如何扩展呢?仔细想想,思路也很简单:首先,去掉lettuce本身自带的客户端缓存。Invalidatedmessagelistener然后,添加我们自己的invalidatedmessagelistener回头看上面的源码分析图,在调用DefaultRedisCache的addInvalidationListener()方法时,其实是在调用StatefulRedisConnection的addListener()方法,也就是说这个监听器服务器实际上是添加到redis连接。如果我们再看这个方法的源码,就会发现在它附近有一个对应的removeListener()方法。乍一看,这就是我们要找的东西,我们准备用它来解除消息监听。但是仔细一看,这个方法是需要传参的。我们显然不知道里面已经存在什么PushListener,所以不能直接使用,只好继续往下看这个pushHandler是什么了。。。从注释中可以知道这个PushHandler是一个操作PushListeners的处理工具。虽然我们不知道要移除哪个PushListener,但令人惊讶的是它提供了一个getPushListeners()方法来获取当前的所有PushListener。听众。这样,就简单了。我直接上去清除了这个集合中的所有监听器,问题就解决了~不过StatefulRedisConnectionImpl中的pushHandler是私有对象,没有对外暴露。运营它仍然需要很多钱。努力一点。接下来,我们根据分析结果修改代码。首先,我们需要自定义一个工具类。它的主要功能是操作监听器,所以取名为ListenerChanger。它主要完成三个功能:清除所有原有消息的监听。添加新的自定义消息侦听器。更新本地缓存MapCacheAccessor中的数据。首先定义构造方法,需要传入StatefulRedisConnection和CacheAccessor作为参数,在后面的方法中会用到,后面创建一个RedisCommands向redis服务器发送get命令请求。公共类ListenerChanger{私有StatefulRedisConnection连接;私有缓存访问器映射缓存访问器;私有RedisCommands命令;publicListenerChanger(StatefulRedisConnectionconnection,CacheAccessormapCacheAccessor){this.connection=connection;this.mapCacheAccessor=mapCacheAccessor;this.command=connection.sync();}//先省略其他方法...}去掉监听前面说了,pushHandler是私有对象,我们不能直接获取和操作,只能先用反射获取。PushHandler中的侦听器列表存储在一个CopyOnWriteArrayList中,我们只是使用迭代器删除所有内容。publicvoidremoveAllListeners(){try{ClassconnectionClass=StatefulRedisConnectionImpl.class;字段pushHandlerField=connectionClass.getDeclaredField("pushHandler");pushHandlerField.setAccessible(true);PushHandlerpushHandler=(PushHandler)pushHandlerCollectionField.get);Array.connectpushListeners=(CopyOnWriteArrayList)pushHandler.getPushListeners();Iteratorit=pushListeners.iterator();while(it.hasNext()){PushListener监听器=it.next();pushListeners.remove(监听器);}}catch(NoSuchFieldException|IllegalAccessExceptione){e.printStackTrace();}}添加监听这里模仿DefaultRedisCache中addInvalidationListener()方法的写法,添加监听,只是最后的处理代码基本相同。对于要失效的监控键集,启动另一个线程来更新本地数据。publicvoidaddNewListener(){this.connection.addListener(newPushListener(){@OverridepublicvoidonPushMessage(PushMessagemessage){if(message.getType().equals("invalidate")){Listcontent=message.getContent(StringCodec.UTF8::decodeKey);Listkeys=(List)content.get(1);System.out.println("modifyKeys:"+keys);//开始一个新线程toupdatecacheAccessornewThread(()->updateMap(keys)).start();}}});}本地更新使用RedisCommands重新从redis服务器获取最新数据,更新本地缓存mapCacheAccessor中的数据.privatevoidupdateMap(Listkeys){for(Kkey:keys){VnewValue=this.command.get(key);System.out.println("新值:"+新值);mapCacheAccessor.put(key,newValue);}}至于为什么这个方法执行的时候会启动一个新的线程,是因为我在测试中发现在PushListener的onPushMessage方法中执行RedisCommands的get()方法时,是永远获取不到值的。但是像这样开启新线程是没有问题的。测试接下来,让我们编写一个测试代码来测试上面的变化。publicstaticvoidmain(String[]args)throwsInterruptedException{//省略前面的连接创建代码...Mapmap=newHashMap<>();CacheAccessormapCacheAccessor=CacheAccessor.forMap(地图);CacheFrontendfrontend=ClientSideCaching.enable(mapCacheAccessor,connect,TrackingArgs.Builder.enabled().noloop());ListenerChangerlistenerChanger=newListenerChanger<>(connect,mapCacheAccessor);//移除原来的监听器listenerChanger.removeAllListeners();//添加一个新的监听器listenerChanger.addNewListener();字符串键=“用户”;while(true){Stringvalue=frontend.get(key);系统。out.println(值);TimeUnit.SECONDS.sleep(30);}}可以看出,代码在之前的基础上基本没有改动,只是在创建ClientSideCaching之后,执行了自己实现的ListenerChanger两种方式。在添加新的侦听器之前删除所有侦听器。接下来我们以debug模式启动测试代码,简单看一下代码的执行逻辑。首先,在执行remove操作之前,pushHandler中的listener列表中有一个listener:remove之后,listener列表为空:添加自定义监听器并执行第一次查询操作后,在另一个redis客户端修改user的值.这时PushListener会收到一个obsolete类型的消息监听:启动一个新的线程,在redis中查询user对应的最新值,放入cacheAccessor:当循环中CacheFrontend的get()方法为再次执行,会直接从cacheAccessor中获取刷新后的值,不需要再次访问redis服务器:综上所述,我们对基于lettuce的客户端缓存的基本使用,并在此基础上进行的魔改,基本就是完全的。可以看出,lettuce客户端在底层封装了比较成熟的API,可以让我们在redis升级到6.0后开箱即用地使用客户端缓存的新特性。在使用中,我们不需要关注底层原理,也不需要对业务逻辑做任何改造。总的来说,使用起来还是比较愉快的。