当前位置: 首页 > 后端技术 > Java

看到一个神奇修改过的线程池,面试素材加一!

时间:2023-04-02 09:17:12 Java

大家好,我是伟伟。今天给大家分享一个扩展的线程池,我觉得扩展的思路很好。别担心,我是头条党。我想没有人会在面试中测试这个东西,但是在工作中遇到相应的场景是有可能的。为了介绍这个线程池,我来创建一个场景,方便大家理解。以下面的表情符号为例。假设我们有两个程序员,姑且称他们为富贵和旺财吧。上面这个表情包就是这两个程序员一天工作的写照,用程序表达是这样的。首先,我们创建一个对象来表示当时程序员正在做什么:publicclassCoderDoSomeThing{privateStringname;私有字符串doSomeThing;publicCoderDoSomeThing(Stringname,StringdoSomeThing){this.name=name;this.doSomeThing=doSomeThing;}}然后,用代码来描述富贵和旺财做了什么:CoderDoSomeThingrich2=newCoderDoSomeThing("Rich","搞数据库,连tomcat和crud都输出");CoderDoSomeThingrich3=newCoderDoSomeThing("有钱","疯狂嘴角");CoderDoSomeThingrich4=newCoderDoSomeThing("Rich","接口访问错误");CoderDoSomeThingrich5=newCoderDoSomeThing("富贵","心态崩了,卸载Idea");CoderDoSomeThingwww1=newCoderDoSomeThing("旺财","开始创意");CoderDoSomeThingwww2=newCoderDoSomeThing("旺财","搞数据库,连tomcat和crud都输出");CoderDoSomeThingwww3=newCoderDoSomeThing("旺财","疯狂嘴角");CoderDoSomeThingwww4=newCoderDoSomeThing("旺财","接口访问错误");CoderDoSomeThingwww5=newCoderDoSomeThing("旺财","心态崩了,卸载Idea");}}简单解释一下变量名,说明我还是仔细想想Toberich就是有钱,所以变量名有钱。wangcai就是旺旺旺,所以变量名是www。看我的类名NbThreadPoolTest,就知道我要用到线程池了。实际情况下,富贵和旺财可以各自做自己的事情,互不干扰,也就是应该是各自的线程。各干各的,互不干扰,好像可以用线程池。因此,我使用线程池将程序修改为如下所示:publicclassNbThreadPoolTest{publicstaticvoidmain(String[]args){ExecutorServiceexecutorService=Executors.newFixedThreadPool(5);ListcoderDoSomeThingList=newArrayList<>();coderDoSomeThingList.add(newCoderDoSomeThing("Fugui","StartIdea"));coderDoSomeThingList.add(newCoderDoSomeThing("富贵","搞数据库,连tomcat和crud输出"));coderDoSomeThingList.add(newCoderDoSomeThing("富贵","疯狂嘴角"));coderDoSomeThingList.add(newCoderDoSomeThing("富贵","接口访问错误"));coderDoSomeThingList.add(newCoderDoSomeThing("富贵","心态崩了,卸载Idea"));coderDoSomeThingList.add(newCoderDoSomeThing("旺财","开始创意"));coderDoSomeThingList.add(newCoderDoSomeThing("旺财","搞数据库,连tomcat,crud输出"));coderDoSomeThingList.add(newCoderDoSomeThing("旺财","嘴角疯狂上扬"));coderDoSomeThingList.add(newCoderDoSomeThing("旺财","接口访问错误"));编码器DoSomeThingList.add(newCoderDoSomeThing("旺财","卸载创意"));coderDoSomeThingList.forEach(coderDoSomeThing->{executorService.execute(()->{System.out.println(coderDoSomeThing.toString());});});}}上面的程序就是把富贵和旺财做的事情封装成一个列表,然后遍历这个列表,把里面的所有东西都扔进去,也就是“做事情”到线程池上面的程序执行完后,一个可能的输出如下:.jpg)乍一看没有问题,财富和繁荣是同时做的事情。但是仔细一看,大家做事的顺序都是错的。比如旺财,看起来有点“精神分裂”。刚刚启动Idea,他的嘴角就开始疯狂上扬。所以,在这里我可以导致我想要的。我想要什么样的东西?就是要保证富贵是同时做事的,还要保证他们做事是按照一定的顺序的,也就是按照我放到线程池里的顺序执行。更正式一点的描述是这样的:我需要这样一个线程池,可以保证下发的任务按照一定的维度划分任务,然后按照任务的先后顺序依次执行提交。这个线程池可以通过并行处理(多线程)提高吞吐量,同时也保证一定范围内的任务严格按顺序运行。用我前面的例子,“按某次元”是人名,也就是富贵的次元。你怎么做呢?膳食分析后我会做什么?首先我可以肯定JDK的线程池做不到这一点。因为从线程池原理来看,并行和顺序是不能同时满足的。你明白我的意思吗?比如我要用线程池来保证顺序,那么就是这样:只有一个线程的线程池才能保证顺序。但这有意义吗?有道理,因为它不占用主线程,但是意义不大,毕竟阉割了重要的“多线程”能力。那么在这种场景下我们如何提高并行能力呢?等等,我们好像已经有一个可以保证顺序的线程池了。那如果我们横向扩展,再做几个,不就具备并行运行的能力了吗?那么前面提到的“按照某个维度”,如果有多个线程池只有一个线程,那我也可以按照这个维度来映射“维度”和“每个线程池”。程序上是这样的:标①的地方是设置多个线程池,只有一个线程,目的是为了保证消费的顺序。标②的地方是通过一个map来映射name和threadpool的关系。这只是一个提示。比如我们也可以通过用户号取模来定位对应的线程池。比如用户数是奇数就用一个线程池,用户数是偶数就用另一个线程。所以,“某一维度”有多少数据,就没有必要定义多少个只有一个线程的线程池。它们也可以重复使用。这个地方有个小转弯。标③的地方就是根据名字去映射中对应的线程池。从输出结果来看,没什么问题:看到这里有朋友会说:你这不是作弊吗?你不是说你有线程池吗?您已经制作了多个。如果你想从这个角度看事情,那就缩小路径。你得考虑有一个大的线程池,有很多线程池里面只有一个线程。这打开了模式。上面我写的方式是一个很简单的Demo,主要引出了这个方案的思路。我要介绍的是一个基于这个想法的开源项目。这是一家大公司的老板写的。看了下源码,惊呆了:写的真他妈的好。先给大家一个用例和输出结果:从案例来看,使用方法也很简单。和JDK原生用法不同的是我框架的部分。首先创建一个KeyAffinityExecutor对象来代替原生的线程池。KeyAffinityExecutor里面涉及到一个词,Affinity。翻译的意思差不多:所以KeyAffinityExecutor的翻译就是同key的线程池。当你了解它的作用和作用范围时,你会觉得名字是针而不是戳。然后调用KeyAffinityExecutor对象的executeEx方法,可以再传入一个参数,这个参数就是同一任务区分某类的维度。比如我这里给的是name字段。从用例来看,可以说是包装的非常好,开箱即用。KeyAffinityExecutor用法先说一下这个类的用法。对应的开源项目地址是这样的:https://github.com/PhantomThi...如果要使用,必须导入如下maven地址:com.github.phantomthiefmore-lambdas0.1.55核心代码就是这个接口:com.github.phantomthief.pool.KeyAffinityExecutor这个里面有很多注释界面,大家可以拉下来看看。这里主要给大家展示一下作者在界面上写的评论。他就是这样介绍他的工具的。这是一个按照指定的Keyaffinityorder消费的线程池。KeyAffinityExecutor是一个特殊的任务线程池。可以保证下发的任务按照相同Key的任务提交顺序依次执行。非常适合需要通过并行处理提高吞吐量,保证一定范围内的任务严格有序运行的场景。KeyAffinityExecutor内置的实现是将指定的Key映射到一个固定的单线程线程池,它会在内部维护多个(数量可配置)这样的单线程线程池,以保持一定的任务并行度。需要注意的是,该接口定义的KeyAffinityExecutor并不要求具有相同Key的任务运行在同一个线程上。虽然实现类可以这样实现,但不是强制要求,所以在使用的时候也请不要依赖这样的假设。很多人问,这和使用线程池数组,简单取模实现有什么区别?其实大部分场景并没有太大区别,但是当出现数据倾斜时,由于热点倾斜数据,散列到同一位置的数据可能会延迟。当并发度较低时(可以设置阈值),本实现会选择最空闲的线程池进行下发,尽量隔离倾斜的数据,减少对其他数据的影响。在作者的介绍中,简单说明了项目的应用场景和内部原理,和我们前面分析的类似。另外,还有两个地方需要特别注意。第一个地方在这里:作为区分任务维度的对象,如果是自定义对象,那么必须重写它的hashCode和equals,保证它能起到识别作用。这里的提醒和HashMap的key是对象的时候hashCode和equals方法要重写的道理是一样的。只是说了编程的基础,就不赘述了。第二名就得慎重说了,属于他的核心思想。他没有使用简单取模的方法,因为在简单取模的场景下,数据可能会出现倾斜。我个人是这样理解作者的思路的。首先,让我解释一下取模的数据倾斜。举个简单的例子:在上面的代码片段中,我添加了一个新角色“钓鱼大师”。同时在对象中添加一个id字段。假设我们对id字段取模2:那么会出现master和rich对应的id取模结果都是1,他们会共享同一个线程池。显然,由于master的频繁操作,“钓鱼”成为了热点数据,导致0号连接池发生倾斜,进而影响了富贵的正常工作。KeyAffinityExecutor的策略是什么?它会选择最空闲的线程池进行传递。你怎么理解的?还是上面的例子,如果我们构建这样一个线程池:KeyAffinityExecutorexecutorService=KeyAffinityExecutor.newSerializingExecutor(3,200,"MY-POOL-%d");第一个参数3表示会在这个线程池中建立3个只有一个线程的线程池。然后在用它提交任务的时候,由于维度是id维度,我们正好有3个id,所以这个线程池恰好是满的:此时没有数据倾斜。但是,如果我把前面构建线程池的参数从3改成2呢?KeyAffinityExecutorexecutorService=KeyAffinityExecutor.newSerializingExecutor(2,200,"MY-POOL-%d");提交方式不变,增加延迟id为1和2的任务的逻辑,观察id为3的数据如何解决:无疑,提交执行master的钓鱼操作时线程池肯定不够用,怎么办?此时,根据作者的描述,“会选择最空闲的线程池进行下发”。我用这个数据来说明:所以,在进行高手钓鱼操作时,你会选择仅有的两个选项中的一个。如何选择?选择具有最低并发性的人。由于任务有延迟时间,我们可以观察到执行财富的线程并发为5,而执行旺财的线程并发为6。因此,在执行master的钓鱼操作时,并发为将选择5个进行处理。在这种情况下,会出现数据倾斜。但是倾斜的前提变了,变成了当前没有线程可用。所以,作者说“尽力隔离倾斜数据”。这两种方案最大的区别在于线程资源的利用。如果是纯取模,那么数据倾斜时可能会有可用的线程。如果是KeyAffinityExecutor的方式,可以保证当数据发生倾斜时,线程池中的线程一定已经用完了。然后,您会细细品味这两种方案之间的细微差别。KeyAffinityExecutor的源码并不多,就那么几类:但是它的大部分源码都是用lambdas写的,基本都是函数式编程,如果你这方面比较薄弱,看起来会有点吃力。如果你想掌握它的源码,我的建议是把工程拉到本地,从他的测试用例入手:https://github.com/PhantomThi...把我看到的一些关键点汇报给大家,方便方便大家自己去看的时候整理思路。首先要从它的构造方法说起。作者把每个入参的含义都标注的很清楚了:假设我们的构造函数是这样的,意思是只用一个线程建立3个线程池。队列大小为200:KeyAffinityExecutorexecutorService=KeyAffinityExecutor.newSerializingExecutor(3,200,"WHY-POOL-%d");首先,我们要找出“只有一个线程的线程池”的构建逻辑在哪里。这个方法隐藏在构造函数中:com.github.phantomthief.pool.KeyAffinityExecutorUtils#executor(java.lang.String,int)这里可以看到我们一直说的“只有一个线程的线程池”,队列的长度of也可以指定:该方法返回一个Supplier接口,后面会用到。接下来,我们要找出数字“3”体现在哪里?隐藏在构造函数的build方法中,最终会调用到这个方法:com.github.phantomthief.pool.impl.KeyAffinityImpl#KeyAffinityImpl可以在这个地方打个断点,然后Debug看看。很清楚了:下面我来解释一下盒子这部分的关键参数:首先是count参数,也就是我们定义的3。那么range(0,3)就是0,1,2。然后是supplier,也就是我们前面提到的executor方法返回的supplier接口。可以看到它封装了一个线程池。然后里面有一个很关键的操作:map(ValueRef::new)。这个操作中的ValueRef对象非常关键:com.github.phantomthief.pool.impl.KeyAffinityImpl.ValueRef的key就是这个对象中的并发变量。还记得前面提到的“选择最空闲的执行器(线程池)”这句话吗?如何判断是否空闲?依赖并发变量。对应的代码在这里:com.github.phantomthief.pool.impl.KeyAffinityImpl#select可以到断点,说明当前key之前没有映射过,所以需要为其指定一个线程池。指定这个线程池的操作是循环all集合,其中包含ValueRef对象:因此,compareInt(ValueRef::concurrency)方法是选择当前所有并发数最少的线程池。如果这个线程池从来没有被使用过或者当前没有任务在使用,那么并发度一定是0,都会被选中。如果所有线程池都被使用,将选择并发值最低的线程池。我只是在这里给你一个大概的想法。如果想深入了解,可以自己去看源码。如果你非常了解lambdas的用法,你会觉得写起来真的很优雅,很舒服。如果你不知道lambdas...你为什么不快点?此外,我还发现了两个熟悉的东西。朋友们请看这是什么:这不是线程池参数的动态调整吗?第二个是这样的:我也写过RabbitMQ中的动态调整,也强调了这三个地方:增加{@link#setCapacity(int)}和{@link#getCapacity()}{@link#capacity}判断边界由==改为>=部分signal()信号触发改为signalAll()另外,作者还提到在RabbitMQ版本中会存在导致NPE的bug。这个我没有详细研究过。有兴趣的可以对比一下代码,应该就能知道问题出在哪里了。聊聊Dubbo为什么要聊聊Dubbo?因为我好像在Dubbo中发现了KeyAffinityExecutor的踪迹。为什么说好像呢?因为最后没有合并到代码库中。对应的链接在这里:https://github.com/apache/dub...这次提交了这么多文件:你可以找我们熟悉的东西:其实思路是一样的,只是你会我发现即使思路一样,两个不同的人写出来的代码结构还是有很大区别的。在这里,Dubbo让代码的层次更加明显。例如,它定义了一个抽象的AbstractKeyAffinity对象,然后实现了两种方案:随机和最小并发。这些细节存在差异。但是这个代码的提供者最后并没有使用这些代码,而是想出了一个替代方案:https://github.com/apache/dub...在这次提交中,他主要提交了这个类:org.apache.dubbo.common.threadpool.serial.SerializingExecutor这个类,顾名思义,强调序列化。带大家看看它的测试用例,就知道它是怎么用的了:首先,它的构造方法以另一个线程池为参数。然后使用SerializingExecutor的execute方法提交任务。任务内部,所做的就是从map中取出val对应的key,然后加1放回去。大家都知道上面的操作在多线程的情况下是线程不安全的,最终的结果肯定小于循环次数。但是,如果是单线程的情况,肯定是没问题的。那么如何将线程池映射到单线程呢?SerializingExecutor正是这样做的。而且它的原理也很简单,核心代码就那么几行。首先,它自己建立了一个队列:所有提交的任务都被丢进队列中。接下来,一一执行。如何保证一一落实?方法有很多,这里用一个AtomicBoolean对象来控制:这样就实现了序列化多线程任务的场景。只是想知道Dubbo中目前没有使用SerializingExecutor类。但是,如果你要实现这样一个奇怪的功能,比如别人给你一个线程池,你自己的过程中出于一些考虑需要序列化任务,这时候一定不要去碰别人的线程池。是的,那么你可以把Dubbo看作是一个现成的、更优雅、更引人注目的解决方案。最后说一句,我在这里看到了,转推,看,点赞,排一个就好了,你都排我不介意。写文章很累,需要一点积极的反馈。在这里给各位读者朋友敲一下:本文已收录到我的个人博客中,欢迎大家前来游玩。https://www.whywhy.vip/