这是我参加8月更新挑战的第12天。主要基于Netty和ProjectReactor实现的异步连接池。因为是基于ProjectReactor的,所以可以直接在spring-webflux的异步项目中使用。当然也提供同步接口。在我们的微服务项目中,使用了SpringBoot和SpringCloud。并使用spring-data-redis作为连接Redis的库。而连接池使用的是Lettuce。同时我们线上的JDK是OpenJDK11LTS版本,每个进程都开启了JFR记录。关于JFR,可以参考这个系列:JFRFullSolution在Lettuce6.1之后,Lettuce也引入了基于JFR的监听事件。参考:events.flight-recorderRedis连接相关事件:ConnectEvent:尝试与Redis建立连接时发出此事件。ConnectedEvent:连接建立时会发送的事件,包括建立连接使用的远程IP和端口,使用的RedisURI。对应Netty,其实就是ChannelHandler中channelActive回调开始时会发送的事件。ConnectionActivatedEvent:完成对Redis连接的一系列初始化操作(如SSL握手、发送PING心跳命令等)后,可以使用该连接在执行Redis命令时发出事件。ConnectionDeactivatedEvent:在没有任何命令被处理且isOpen()为false的情况下,连接不活跃,即将关闭。这个时候就会发出这个事件。DisconnectedEvent:当连接实际关闭或重置时,会发出此事件。ReconnectAttemptEvent:Lettuce中的Redis连接会保持为长连接。当连接丢失时,它会自动重新连接。当需要重新连接时,将发送此事件。ReconnectFailedEvent:重连失败时,发送该事件。Redis集群相关事件:AskRedirectionEvent:当Redis槽处于迁移状态时会返回ASK,此时会发出该事件。MovedRedirectionEvent:当Redis槽不在当前节点上时会返回MOVED,此时会发出该事件。TopologyRefreshEvent:如果开启集群拓扑刷新定时任务,查询集群拓扑时会发送该事件。不过这个需要在配置中开启定时检查集群拓扑的任务,参考cluster-topology-refreshClusterTopologyChangedEvent:当Lettuce发现Redis集群的拓扑发生变化时,会发送这个事件。Redis命令相关事件:CommandLatencyEvent:Lettuce会统计每个命令的响应时间,并定时发出这个事件。这个也需要手动配置开启,后面会讲到如何开启。CommandStartedEvent:当命令开始执行时发送此事件。CommandSucceededEvent:当命令执行成功时会发出此事件。CommandFailedEvent:当命令执行失败时发送该事件。Lettuce的监控是基于事件分发和监控机制的设计。它的核心接口是EventBus:EventBus.javapublicinterfaceEventBus{//获取Flux,通过Flux订阅,允许多个订阅者Fluxget();//发布事件voidpublish(Eventevent);}复制代码默认实现是DefaultEventBus,publicclassDefaultEventBusimplementsEventBus{privatefinalDirectProcessorbus;privatefinalFluxSinksink;privatefinalSchedulerscheduler;privatefinalEvent=RecorderEventRecorder.getInstance();publicDefaultEventBus(Schedulerscheduler){this.总线=DirectProcessor.create();this.sink=bus.sink();this.scheduler=scheduler;}@OverridepublicFluxget(){//如果消费失败,直接丢弃returnbus.onBackpressureDrop().publishOn(scheduler);}@Overridepublicvoidpublish(Eventevent){//调用记录器进行记录recorder.record(event);//调用recorder记录后,然后Publisheventsink.next(event);}}复制代码在默认实现中,我们发现要发布一个事件,必须先调用recorder记录,然后再放进入FluxSink进行事件发布。目前recorder的实际实现是基于JFR的JfrEventRecorder。查看源码:JfrEventRecorderpublicvoidrecord(Eventevent){LettuceAssert.notNull(event,"Eventmustnotbenull");//使用Event创建对应的JFREvent,然后直接commit,即提交这个JFR事件到JVM的JFR记录jdk.jfr.EventjfrEvent=createEvent(event);if(jfrEvent!=null){jfrEvent.commit();}}privatejdk.jfr.EventcreateEvent(Eventevent){try{//获取构造函数,如果构造函数是Object的构造函数,说明还没有找到这个Event对应的JFREvent构造函数。构造函数>构造函数=getEventConstructor(事件);if(constructor.getDeclaringClass()==Object.class){返回null;}//使用构造函数创建JFREventreturn(jdk.jfr.Event)constructor.newInstance(event);}catch(ReflectiveOperationExceptione){thrownewIllegalStateException(e);}}//Event对应JFREvent构造函数缓存privatefinalMap,Constructor>>constructorMap=newHashMap<>();privateConstructor>getEventConstructor(Eventevent)抛出NoSuchMethodExceptionn{Constructor>constructor;//简单来说就是检查缓存Map中是否有该类对应的JFREvent构造函数,有则返回,没有则尝试寻找synchronized(constructorMap){constructor=constructorMap.get(event.getClass());}if(constructor==null){//这个发现的方法比较粗糙,直接找Jfr开头后面跟当前类名的类与当前Event同包路径下的Event是否存在//如果存在则获取其第一个构造函数(无参构造函数),如果不存在则返回ObjectString的构造函数jfrClassName=event.getClass().getPackage().getName()+".Jfr"+事件.getClass().getSimpleName();类>eventClass=LettuceClassUtils.findClass(jfrClassName);如果(eventClass==null){constructor=Object.class.getConstructor();}else{constructor=eventClass.getDeclaredConstructors()[0];构造函数.setAccessible(true);}synchronized(constructorMap){constructorMap.put(event.getClass(),constructor);}}returnconstructor;}复制代码,发现这段代码不是很好,每次读取都需要获取锁,于是做了一些修改,提交了一个PullRequest:reformgetEventConstructorforJfrEventRecordernottosynchronizeforeachread从这里可以知道一个Event是否有对应的JFREvent,通过查看是否有路径相同的以Jfr开头,后面跟着自己名字的class。目前可以发现:io.lettuce.core.event.connection包:ConnectedEvent->JfrConnectedEventConnectEvent->JfrConnectedEventConnectionActivatedEvent->JfrConnectionActivatedEventConnectionCreatedEvent->JfrConnectionCreatedEventConnectionDeactivatedEvent->JfrConnectionDeactivatedEventDisconnectedEvent->JfrDisconnectedEventReconnectAttemptEvent->JfrReconnectAttemptEventReconnectFailedEvent->JfrReconnectFailedEventio.lettuce.core.cluster.event包:AskRedirectionEvent->JfrAskRedirectionEventClusterTopologyChangedEvent->JfrClusterTopologyChangedEventMovedRedirectionEvent->JfrMovedRedirectionEventAskRedirectionEvent->JfrTopologyRefreshEventio.lettuce.core.event.command包:CommandStartedEvent->无CommandSucceededEvent->无CommandFailedEvent->无io.lettuce.core.event.metrics包:、CommandLatencyEvent->无可以看到目前JFR对指令的监控是没有的,但是对我们来说,指令监控是最重要的。我们考虑为命令相关的事件添加JFR对应的事件。如果在io.lettuce.core.event.command包下为命令事件生成相应的JFR,那么事件的数量就有点多了(我们的应用实例可能会执行几十万条Redis指令)。所以我们倾向于为CommandLatencyEvent添加JFR事件。CommandLatencyEvent包含一个Map:privateMaplatencies;复制代码,其中CommandLatencyId包含Redis连接信息和执行的命令。CommandMetrics是时间统计,包括:接收Redis服务器响应的时间指标,通过它可以判断Redis服务器响应是否慢。处理完Redis服务器响应的时间索引后,可能由于应用实例繁忙,响应还没有处理完。通过与Redis服务器收到响应的时间指标进行比较,我们可以判断应用处理??所花费的时间。这两个指标包括以下信息:最短时间和最长时间百分位时间,默认为top50%、top90%、top95%、top99%、top99.9%,对应源码:MicrometerOptions:publicstaticfinaldouble[]DEFAULT_TARGET_PERCENTILES=newdouble[]{0.50,0.90,0.95,0.99,0.999};我们想通过JFR查看各个不同Redis服务器的各个命令在一段时间内响应时间指标的统计情况,可以这样实现:packageio.lettuce.core.event.metrics;importjdk.jfr。类别;导入jdk.jfr.Event;导入jdk.jfr.Label;导入jdk.jfr.StackTrace;@Category({“Lettuce”,“命令事件”})@Label(“命令延迟触发器”)@StackTrace(false)publicclassJfrCommandLatencyEventextendsEvent{privatefinalintsize;publicJfrCommandLatencyEvent(CommandLatencyEventcommandLatencyEvent){this.size=commandLatencyEvent.getLatencies()commandLatencyEvent.getLatencies().forEach((commandLatencyId,commandMetrics)->{JfrCommandLatencyjfrCommandLatency=newJfrCommandLatency(commandLatencyId,commandMetrics);jfr.Command();延迟});}}复制代码包io.lettuce.core.event。;进口io.lettuce.core.metrics.CommandLatencyId;导入io.lettuce.core.metrics.CommandMetrics;导入jdk.jfr.Category;导入jdk.jfr.Event;导入jdk.jfr.Label;导入jdk.jfr.StackTrace;导入java.util.concurrent.TimeUnit;@Category({"Lettuce","CommandEvents"})@Label("CommandLatency")@StackTrace(false)publicclassJfrCommandLatencyextendsEvent{privatefinalStringremoteAddress;privatefinalStringcommandType;私有最终长计数;私有最终TimeUnittimeUnit;私有最终长firstResponseMin;私有最终长firstResponseMax;私有最终字符串firstResponsePercentiles;私有最终长完成响应最小值;私有最终长完成响应最大值;私有最终字符串完成响应百分比;公共JfrCommandLatency(CommandLatencyIdcommandLatencyId,CommandMetricscommandMetrics){this.remoteAddress=commandLatencyId.remoteAddress().toString();this.commandType=commandLatencyId.commandType().toString();这个.c计数=commandMetrics.getCount();this.timeUnit=commandMetrics.getTimeUnit();this.firstResponseMin=commandMetrics.getFirstResponse().getMin();this.firstResponseMax=commandMetrics.getFirstResponse().getMax();commandMetrics.getFirstResponse().getPercentiles().toString();this.completionResponseMin=commandMetrics.getCompletion().getMin();this.completionResponseMax=commandMetrics.getCompletion().getMax();this.completionResponsePercentiles=commandMetrics.getCompletion().getPercentiles().toString();}}复制代码这样,我们可以这样分析这些事件:首先,在事件浏览器中,选择Lettuce->CommandEvents->CommandLatency,右键使用事件创建新页面:在创建的事件页面中,按commandType分组,图表中显示感兴趣的指标:对于这些修改,我也提交了一个PullRequest到community:fix#1820为SpringBoot中的命令延迟添加JFR事件(即添加了spring-boot-starter-redis依赖),我们需要手动打开CommandLatencyEvent的集合:@Configuration(proxyBeanMethods=false)@Import({LettuceConfiguration.class})//需要在RedisAuto中进行强制自动加载配置@AutoConfigureBefore(RedisAutoConfiguration.class)publicclassLettuceAutoConfiguration{}复制代码;导入io.lettuce.core.resource.DefaultClientResources;导入org.springframework.context.annotation.Bean;导入org.springframework.context.annotation.Configuration;导入java.time.Duration;@Configuration(proxyBeanMethods=false)公共类LettuceConfiguration{/***每10s收集一次命令统计信息*@return*/@BeanpublicDefaultClientResourcesgetDefaultClientResources(){DefaultClientResourcesbuild=DefaultClientResources.builder().commandLatencyRecorder(newDefaultCommandLatencyCollector(//启用CommandLatency事件收集,并配置每次收集清除数据毕竟DefaultCommandLatencyCollectorOptions.builder().enable().resetLatenciesAfterEvent(true).build())).commandLatencyPublisherOptions(//每10s收集一次命令统计DefaultEventPublisherOptions.builder().eventEmitInterval(Duration.ofSeconds(10)).build()).建造();returnbuild;}}如果您觉得这篇文章对您有用,请给我们的开源项目一个小星星:http://github.crmeb.net/u/defu非常感谢!