一起来学习管道模式,你知道吗?转载本文请联系JavaKeeper公众号。流水线模式不属于23种设计模式之一(它是责任链模式的变种),但是在我们实际的业务架构中还是有很多场景适用的,主要用于将复杂的流程分解成多个独立的子任务像流水线一样执行,让我们理解。1、开篇假设我们有这样一个需求,读取文件内容,过滤掉包含“hello”的字符串,然后逆向。linux一行获取cathello.txt|grep"hello"|rev用世界上最好的语言实现也??很简单JavaFilefile=newFile("/Users/starfish/Documents/hello.txt");Stringcontent=FileUtils.readFileToString(file,"UTF-8");ListhelloStr=Stream.of(content).filter(s->s.contains("hello")).collect(Collectors.toList());System.out.println(newStringBuilder(String.join("",helloStr)).reverse().toString());假设我们上面的场景是在一个大型系统中。这样的数据流,需要多次复杂的逻辑处理,或者一系列的处理,是不是像上面那样放在一个大组件里面?这种设计完全违背了单一职责原则。当我们增加或减少一些处理逻辑时,我们必须改变整个组件。几乎没有可扩展性和可重用性~~有没有一种模式可以把整个处理流程进行细化划分,并且划分出来的各个小模块相互独立,负责一小段逻辑处理。这些小模块可以依次连接起来那么,上一个模块的输出作为下一个模块的输入,最后一个模块的输出就是最终的处理结果?这样,在修改逻辑时,只针对某个模块进行修改,处理逻辑的增减也可以细化到某个模块粒度,每个模块都可以复用,大大增强了复用性。好了,这就是我们要说的管道模式2.定义管道模式(PipelinePattern)是责任链模式(ChainofResponsibilityPattern)的常用变体之一。顾名思义,管道模式就像一个连接多个对象的管道。整体看起来就像几个阀门嵌套在管道中,处理逻辑就放在阀门上。被处理物进入管道后,分别通过各个阀门,每个阀门都会对传入的对象进行一些逻辑处理,逐层处理后,从管道的末端出来。此时的对象就是已经处理过的目标对象。管道模式用于将复杂的流程分解为多个独立的子任务。每个单独的任务都是可重用的,因此可以将这些任务组合成复杂的流程。PS:在纯责任链模式下,链上只有一个处理器处理数据,而在流水线模式下,多个处理器会处理数据。3.角色管道模式:对于管道模式,有3个对象:Valve:处理数据的节点,或者叫filter,stagepipeline:组织各个valveclient:构造管道,调用它代码消化更快。我们使用流水线方式来实现文章开头的小需求1.Processor(流水线的各个阶段)publicinterfaceHandler{Oprocess(Iinput);}2.定义具体的processor(valve)publicclassFileProcessHandlerimplementsHandler{@OverridepublicStringprocess(Filefile){System.out.println("===fileprocessing===");try{returnFileUtils.readFileToString(file,"UTF-8");}catch(IOExceptione){e.printStackTrace();}returnnull;}}publicclassCharacterFilterHandlerimplementsHandler{@OverridepublicStringprocess(Stringinput){System.out.println("===characterfilter===");Listhello=Stream。of(input).filter(s->s.contains("hello")).collect(Collectors.toList());returnString.join("",hello);}}publicclassCharacterReverseHandlerimplementsHandler{@OverridepublicStringprocess(Stringinput){System.out.println("===反向字符串===");returnnewStringBuilder(input).rreverse().toString();}}3.流水线addHandler(HandlernewHandler){returnnewPipeline<>(input->newHandler.process(currentHandler.process(input)));}Oexecute(Iinput){returncurrentHandler.process(input);}}4。客户端使用importlombok.val;publicclassClientTest{publicstaticvoidmain(String[]args){Filefile=newFile("/Users/apple/Documents/hello.txt");valfilters=newPipeline<>(newFileProcessHandler()).addHandler(newCharacterFilterHandler()).addHandler(newCharacterReverseHandler());System.out.println(filters.execute(file));}}5.结果UML类图产品又来了,这次要删掉hello.txt里面的world字符是三,五,和两个。我精通shell编程,我已经想通了cathello.txt|grephello|rev|tr-d'world'Java的做法,你应该很清楚吧。就是将一个任务处理分解成若干个处理阶段(Stage),其中每个处理阶段的输出作为下一个处理阶段的输入,每个处理阶段都有相应的工作线程进行相应的计算因此,在处理一批任务时,每个任务的每个处理阶段都是并行的(Parallel)。Pipeline模式通过并行计算,使应用程序能够充分利用多核CPU资源,提高计算效率。——?优点将复杂的处理流程分解成独立的子任务,解耦上下游处理逻辑,也方便你对每个子任务的测试分解后的子任务也可以在复杂的不同处理流程中复用非常容易添加、删除和替换流程中的子任务,而不会对现有流程产生任何影响,这增加了模式的可扩展性和灵活性。每个处理单元都可以被修补和监控。(这就是切面编程)Pipelinedepth模式需要注意的事情:Pipeline中Pipes的个数称为Pipelinedepth。所以我们是利用Pipeline的深度和JVM主机的CPU数量之间的关系。如果Pipeline实例的任务大多是CPU密集型的,深度最好不要超过Ncpu。如果Pipeline处理的任务多为I/O密集型,那么Pipeline的深度最好不要超过2*Ncpu。基于线程池的Pipe:如果Pipe实例使用线程池,由于存在多个Pipe实例,更容易出现线程死锁,需要慎重考虑。错误处理:当Pipe实例执行其任务时发生的异常可能需要在相应的Pipe实例之外进行处理。此时通常有两种处理方式:一种是在每个Pipe实例捕获到异常后调用PipeContext实例的handleError处理错误。另一种是创建负责错误处理的Pipe实例。其他Pipe实例捕获异常并将相关数据提交给该Pipe实例进行处理。ConfigurablePipeline:Pipeline模式可以通过代码添加多个Pipe实例,也可以通过配置文件动态添加Pipe。6.JavaFunction如果你的流水线逻辑真的很简单,可以直接使用Java8提供的Function。具体实现如下:Filefile=newFile("/Users/apple/Documents/hello.txt");FunctionreadFile=input->{System.out.println("===fileprocessing===");try{returnFileUtils.readFileToString(input,"UTF-8");}catch(IOExceptione){e.printStackTrace();}returnull;};FunctionfilterCharacter=input->{System.out.println("===characterfilter===");Listhello=Stream.of(input).filter(s->s.contains("hello")).collect(Collectors.toList());returnString.join("",hello);};FunctionreverseCharacter=input->{System.out.println("===reversestring===");返回newStringBuilder(input).reverse().toString();};finalFunctionpipe=readFile.andThen(filterCharacter).andThen(reverseCharacter);System.out.println(pipe.apply(文件));不过最后,并不是一遇到这种流式处理任务就马上使用管道。Pipeline模式中每个处理阶段使用的工作线程或线程池,代表每个阶段输入/输出对象的创建和确定(in和outQueue)有自己的时间和空间开销,所以在使用Pipeline模式时,需要考虑它所付出的代价。建议处理大型任务,否则得不偿失。参考https://java-design-patterns.com/patterns/pipeline/https://developer.aliyun.com/article/778865https://yasinshaw.com/articles/108《Java多线程编程实战指南(设计模式篇)》