来源:https://zhenbianshu.github.io/将上游系统中相似或重复的请求合并后发送给下游系统,可以大大降低下游系统的负载,提高整体系统的吞吐量。文章介绍了hystrixcollapser、ConcurrentHashMultiset和自实现的BatchCollapser三种请求合并技术,并通过它们的具体实现比较了它们的适用场景。前言在我们的工作中,我们常见的请求模型是“请求-响应”,即在一个请求中,服务为请求分配一个独立的线程和独立的内存空间,所有的操作都是独立的,包括资源和系统。手术。我们也知道在请求中处理一个系统I/O的消耗是非常大的。如果有很多请求执行相同类型的I/O操作,是否可以将这些I/O操作组合起来执行一个I/O操作,是否可以大大减轻下游资源服务器的负担?最近,我把大部分业余时间都花在了探索这个问题上。对比了现有的几个类库,为了解决一个小问题,翻了一下hystrixjavanica的代码。业务需求实现了一个简单的合并类,收获还是挺大的。也许这个要求有点“人迹罕至”。网上搜索的结果不多,也没有全面的资料。自己简单总结分享出来,希望能帮助到后面遇到这个问题的朋友。HystrixCollapserHystrix开源的请求合并库(知名度很高)貌似只有Netflix开源的Hystrix。hystrix专注于在高并发环境下保持WEB服务器的系统稳定。我们经常使用它的断路器(CircuitBreaker)来实现服务在灾难时的服务隔离和降级。有了它,整个系统就不会因为某个接口的高并发洪水而崩溃。即使接口宕机,也可以降级服务,返回人性化的响应。作为保证下游服务稳定性的利器,在hystrix中实现请求合并也就不足为奇了。我们在使用hystrix的时候,往往会使用它的javanica模块,以注解的形式编写hystrix代码,让代码更简洁,对业务代码的侵入性更小。所以在项目中我们一般至少需要引用两个包hystrix-core和hystrix-javanica。另外,hystrix是通过AOP实现的,我们需要在项目xml中显式配置HystrixAspectbean来开启。collapserhystrixcollapser是hystrix中的请求合并,它有自定义的那里是BatchMethod和注解的两个实现方法。网上有各种自定义BatchMethod的教程。实现起来非常复杂,需要大量的手写代码。注解方法只需要加两行注解即可,但是我在官方文档中找不到配置方法。看吧,这篇文章应该是唯一的中文了。实现需要注意:我们在需要合并的方法上加上@HystrixCollapser注解,在定义的合并方法上加上@HystrixCommand注解;single方法只能传入一个参数,如果有多个参数,我们需要自己包装一个参数类,而batch方法需要java.util.List;single方法返回java.util.concurrent.Future,batch方法返回java.util.List,返回结果个数和传入参数个数相同。下面是一个简单的例子://single方法不会执行到}publicListbatch(Listinputs){returninputs.stream().map(it->Boolean.TRUE).collect(Collectors.toList());}}源码实现为了解决hystrixcollazer的配置问题,阅读hystrixjavanica这里简单总结一下hystrixrequestcombiner的具体实现。详细的源码分析在我的笔记:Hystrixcollasper源码分析。在spring-boot中注册切面类的bean,其中包含@HystrixCollapser注解的切面;方法执行过程中检测到方法被HystrixCollapser注解后,spring调用methodsAnnotatedWithHystrixCommand方法执行hystrix代理;hystrix获取一个collazer实例(在当前scope中检测不到,则创建);hystrix将当前请求的参数提交给collapser,collapser将它们存储在一个concurrentHashMap(RequestArgumentType->CollapsedRequest)中。该方法会创建一个Observable对象,并返回一个观察该对象的Future给业务线程;在创建collpser时,它会创建一个定时器线程来定期消费存储的请求。定时器会将多个请求构造成一个合并请求,调用批量执行,将结果依次映射到输出参数,并通知Future任务已经完成。需要注意的是,因为需要等待定时器执行真正的请求操作,collazer会导致所有请求的开销增加大约timerInterval/2ms;hystrixcollapser的配置需要用在@HystrixCollapser注解上,主要包括两部分,hystrixCommand的专有配置和通用配置;专有配置包括:collazerKey,这个可以不用配置,hystrix会默认使用当前的方法名;batchMethod,配置批处理方法名,我们一般将单方法和批处理方法定义在同一个类中,直接填方法名即可;scope,最差的配置项,也是逼我读源码的罪魁祸首,com.netflix.hystrix.HystrixCollapser.Scope枚举类,有两个选项:REQUEST和GLOBAL。当scope为REQUEST时,hystrix会为每个请求创建一个collazer,你会发现执行batch方法时,传入的请求数始终为1。而且REQUEST项还是默认项,不明白requestmerging是什么意思;collapserProperties,我们可以在这个选项中配置hystrixCommand的通用配置;通用配置包括:maxRequestsInBatch,构造批量请求时使用的最大单次请求数;timerDelayInMilliseconds,这个选项配置collazer的定时器线程多久合并一次请求;requestCache.enabled,配置提交请求时是否缓存;一个完整的配置如下:@HystrixCollapser(batchMethod="batch",collazerKey="single",scope=com.netflix.hystrix.HystrixCollapser.Scope.GLOBAL,collapserProperties={@HystrixProperty(name="maxRequestsInBatch",value="100"),@HystrixProperty(name="timerDelayInMilliseconds",value="1000"),@HystrixProperty(name="requestCache.enabled",value="true")})BatchCollapserdesign由于业务需求,我们不'不太关心合并请求的返回值,我们认为hystrix没有必要保留那么多Futures,所以我们实现了一个简单的requestcombiner,业务线程简单的把请求放到一个容器中,当请求数量累积到一定数量或延迟一定时间,将容器中的数据取出,统一发送给下游系统。设计思想类似于hystrix。combiner有一个字段作为存储请求的容器,设置一个定时器线程定时消费容器中的请求,业务线程将请求参数提交给combiner的容器。不同的是,业务线程向容器提交请求后,立即同步返回成功,而不管请求的消费结果,从而实现了时间维度上的合并触发。此外,我还添加了另一个维度触发条件。每次将请求参数添加到容器中,都会检查容器中的请求数。如果数量达到一定的阈值,就会在业务线程中合并执行一次。由于有两个维度触发了merge,所以难免会有线程安全问题。为了保证容器中的请求不会被多个线程重复消费或遗漏,我需要一个满足以下条件的容器:它是一个Collection,类似于ArrayList或者Queue,可以存储重复的元素,并且有一个命令;在多线程环境下,无需自己实现锁,就可以安全地将里面的所有数据取出来消费。java.util.concurrent包中的LinkedBlockingDeque正好满足要求。首先,它实现了BlockingDeque接口,多线程环境下的访问操作是安全的;另外,它还提供了drainTo(Collectionc,intmaxElements)方法,可以放心的取出容器中的maxElements个元素放到Collectionc中。实现以下是具体的代码实现:publicclassBatchCollapserimplementsInitializingBean{privatestaticfinalLoggerlogger=LoggerFactory.getLogger(BatchCollapser.class);privatestaticvolatileMapinstance=Maps.newConcurrentMap();privatestaticfinalScheduledExecutorServiceSCHEDULE_EXECUTOR=Executors.newScheduledThreadPool(1);privatevolatileLinkedBlockingDequebatchContainer=newLinkedBlockingDeque<>();privateHandler,Boolean>清洁器;私有长间隔;私有intthreshHold;privateBatchCollapser(Handler,Boolean>cleaner,intthreshHold,longinterval){this.cleaner=cleaner;this.threshHold=threshHold;this.interval=间隔;}@OverridepublicvoidafterPropertiesSet()throwsException{SCHEDULE_EXECUTOR.scheduleAtFixedRate(()->{尝试{这个.clean();}catch(Exceptione){logger.error("cleancontainerexception",e);}},0,时间间隔,TimeUnit.MILLISECONDS);}publicvoidsubmit(Eevent){batchContainer.add(event);如果(batchContainer.size()>=threshHold){clean();}}privatevoidclean(){ListtransferList=Lists.newArrayListWithExpectedSize(threshHold);batchContainer.drainTo(transferList,100);如果(CollectionUtils.isEmpty(transferList)){返回;}试试{cleaner.handle(transferList);}catch(Exceptione){logger.error("批量执行错误,transferList:{}",transferList,e);}}publicstaticBatchCollapsergetInstance(Handler,Boolean>cleaner,intthreshHold,longinterval){类jobClass=cleaner.getClass();if(instance.get(jobClass)==null){synchronized(BatchCollapser.class){if(instance.get(jobClass)==null){instance.put(jobClass,newBatchCollapser<>(cleaner,threshHold,interval));}}}returninstance.get(jobClass);以下代码需要注意的地方:由于combiner的全局要求,需要将combiner实现为单例,为了提高其通用性,内部使用concurrentHashMap和doublecheck实现了一个简单的单例工厂。为了区分不同用途的组合器,工厂需要传入一个实现了Handler的实例,并按照实例的类对请求进行分组存储。由于java.util.Timer的阻塞特性,一个Timer线程在阻塞时不会启动另一个相同的Timer线程,所以使用ScheduledExecutorService定时启动Timer线程。ConcurrentHashMultiset设计上面说的请求合并就是一次发送多个请求,下游服务器在处理的时候本质上还是有多个请求。最好的请求合并是在内存中进行的,请求结果简单的合并为一个发送给下游服务器。比如我们经常遇到需求:元素分数累加或者数据统计,可以先在内存中累加某一项的分值或者数据,周期性的请求数据库保存。Guava提供了这样一个数据结构:ConcurrentHashMultiset,不同于普通的set结构在存储相同元素时直接覆盖原元素,而是为每个元素保留一个计数,插入重复时元素的计数值加1.并且在增删改查时不用加锁就可以保证线程安全。具体实现是通过一个while(true)循环尝试操作,直到需要的操作次数足够为止。ConcurrentHashMultiset的特性非常适合数据统计在短时间内重复率很高的场景。统计重复次数后,可以大大减轻下游服务器的压力。即使重复率不高,可以用少量的内存空间来换取系统可用性的提升,也是非常划算的。使用ConcurrentHashMultiset进行请求合并和使用普通容器在整体结构上没有太大区别,具体类似于:elementSet().forEach(request->{intcount=ConcurrentHashMultiset.count(request);if(count<=0){return;}transferList.add(count==1?request:newRequest(request.getIncrement()*count));ConcurrentHashMultiset.remove(request,count);});总结最后总结一下每种技术的适用场景:hystrixcollapser:需要每次请求的结果,并不关心每次请求的成本会增加;BatchCollapser:不关心请求的结果,需要在时间和数量两个维度合并请求触发;ConcurrentHashMultiset:统计请求重复率较高的场景;另外,如果选择自己实现,可以结合BatchCollapser和ConcurrentHashMultiset,在BatchCollapser中使用ConcurrentHashMultiset作为容器,这样可以结合两者的优点。近期热点文章推荐:1.1000+Java面试题及答案(2022最新版)2.厉害了!Java协程来了。..3.SpringBoot2.x教程,太全面了!4.不要用爆破爆满画面,试试装饰者模式,这才是优雅的方式!!5.《Java开发手册(嵩山版)》最新发布,赶快下载吧!感觉不错,别忘了点赞+转发!