当前位置: 首页 > 科技观察

Java9异步编程-ReactiveStream使用

时间:2023-03-13 16:23:32 科技观察

本文主要研究Java9+中的ReactiveStream。简单的说就是使用Flow类,它包含了构建反应式流处理的主要模块。这里说的reactivestream其实就是一个非阻塞背压异步流处理标准(有点绕)。该规范在ReactiveManifesto中定义,并且有各种实现,例如RxJava或Akka-Streams。反应式API概述要构建流,使用三个抽象并将其组合到异步处理逻辑中。每个流都需要处理Publisher实例发布给它的事件;Publisher有一个subscribe()方法。如果订阅者想要接收发布者发布的事件,他需要使用subscribe()来订阅发布者。消息的接收者需要实现Subscriber接口。通常,Receiver是每个Flow处理的终点,因为它的实例不发送进一步的消息。您可以将Subscriber视为Sink。有四种方法需要重写onSubscribe()、onNext()、onError()和onComplete()。如果要转换传入消息并将其进一步传递给下一个订阅服务,则需要实现Processor接口。它既充当订阅服务(因为它接收消息)又充当发布服务(因为它处理这些消息并发送它们以供进一步处理)。发布和使用消息假设您要创建一个简单的流,其中发布者发布消息,而简单的订阅者在消息到达时使用它们。首先创建一个EndSubscriber类。需要实现订阅服务接口。接下来,覆盖所需的方法。onSubscribe()方法在处理开始之前被调用。订阅实例作为参数传递。订阅是一个类,它控制订阅服务和发布服务之间的消息流。1publicclassEndSubscriberimplementsSubscriber{2//需要消费多少条消息?3privatefinalAtomicIntegerhowMuchMessagesToConsume;4privateFlow.Subscriptionsubscription;consumedElements=newLinkedList<>();78publicEndSubscriber(IntegerhowMuchMessagesToConsume){9this.howMuchMessagesToConsume=newAtomicInteger(howMuchMessagesToConsume);10}1112@Override13publicvoidonSubscribe(Flow.Subscriptionsubscription){14this.subscription=subscription;15subscription.request(1);16}17}在测试中使用的消耗元素的空列表也在这里初始化。现在,其余的方法需要从订阅者接口中实现。这里主要的方法是onNext(),在发布者发布新消息时调用1@Override2publicvoidonNext(Titem){3System.out.println("Got:"+item);4consumedElements.add(item);5subscription。请求(1);6}这里需要注意的是,在onSubscribe()方法中启动订阅时,onNext()处理消息时,需要调用订阅上的request()方法通知当前订阅者已经准备好使用更多新闻。最后需要实现onError(),在处理过程中抛出异常时调用。OnComplete()在发布者关闭时被调用。1@Override2publicvoidonError(Throwablet){3t.printStackTrace();){8System.out.println("Done");9}接下来,为这个处理流程编写一个测试。我们将使用SubmissionPublisher类,它是java.util.concurrent中实现Publisher接口的类。测试中向发布者提交N个元素,我们的终端订阅者会收到这些元素。1@Test2publicvoidwhenSubscribeToIt_thenShouldConsumeAll()3throwsInterruptedException{45//given6SubmissionPublisherpublisher=newSubmissionPublisher<>();7EndSubscribersubscriber=newEndSubscriber<>();8publisher.subscribe(ListassertThat(subscriber.consumedElements)20.containsExactlyElementsOf(items)21);22}注意close()方法在发布者实例上被调用。它将在每个订阅者上调用onComplete()。程序的输出如下:1Got:12Got:x3Got:24Got:x5Got:36Got:x7Done消息转换假设你想在发布者和订阅者之间做一些数据转换。下面我创建了一个TransformProcessor类,它实现Processor并扩展SubmissionPublisher,因为它同时包含发布者和订阅者。并且将传入一个函数以将输入转换为输出。1importjava.util.concurrent.Flow;2importjava.util.concurrent.SubmissionPublisher;3importjava.util.function.Function;45publicclassTransformProcessorextendsSubmissionPublisherimplementsFlow.Processor{6privateFunction函数;7privateFlow.Subscriptionsubscription;89publicTransformProcessor(Functionfunction){10super();11this.function=function;12}1314@Override15publicvoidonSubscribe(Flow.Subscriptionsubscription){16this.subscription=subscription;17subscription.request(1);18}1920@Override21publicvoidonNext(Titem){22submit(function.apply(item));23subscription.request(1);24}2526@Override27publicvoidonError(Throwablet){28t.printStackTrace();29}3031@Override32publicvoidonComplete(){33close();34}35}这里的TransformProcessor将把String转换为两个String,看下面我写的测试用例。1@Test2publicvoidwhenSubscribeAndTransformElements_thenShouldConsumeAll(){3//given4SubmissionPublisherpublisher=newSubmissionPublisher<>();5Functiondup=x->x.concat(x);6TransformProcessortransformProcessor7=newTransformProcessor<>(dup);8EndSubscribersubscriber=newEndSubscriber<>(6);9Listitems=List.of("1","2","3");10ListexpectedResult=List.of("11","22","33");11//当12publisher.subscribe(transformProcessor);13transformProcessor.subscribe(subscriber);14items.forEach(publisher::submit);15publisher.close();1617await()。atMost(1000,TimeUnit.MILLISECONDS)18.untilAsserted(()->assertTrue(subscriber.consumedElements.containsAll(expectedResult)));19}使用订阅控制消息需求假设你只想消费第一条消息,应用一些逻辑并完成处理。这可以使用request()方法来实现。修改修改下:1publicClassendSubscriberstrumentflow.subscriber{2//////////////////nprivate.subscription;4private.subscription;5///////7<5/inpoblientsement;IntegerhowMuchMessagesToConsume){9this.howMuchMessagesToConsume=newAtomicInteger(howMuchMessagesToConsume);10}1112@Override13publicvoidonSubscribe(Flow.Subscriptionsubscription){14this.subscription=订阅;15subscription.request(1);16}1718@Override19publicvoidonNext(标题){20howMuchMessagesToConsume();//减一21System.out.println("Got:"+item);22consumedElements.add(item);23if(howMuchMessagesToConsume.get()>0){24subscription.request(1);25}26}2728@Override29publicvoidonError(Throwablet){30t.printStackTrace();31}3233@Override34publicvoidonComplete(){35System.out.println("Done");36}37}测试1@Test2publicvoidwhenRequestForOnlyOneElement_thenShouldConsumeOne(){3//given4SubmissionPublisherpublisher=newSubmissionPublisher<>();5EndSubscribersubscriber=newEndSubscriber<>(1);6publisher.subscribe(subscriber);7Listitems=List.of("1","x","2","x","3","x");8Listexpected=List.of("1");910//when11assertEquals(publisher.getNumberOfSubscribers(),1);12items.forEach(publisher::submit);13publisher.close();1415//then16await().atMost(1000,TimeUnit.MILLISECONDS)17.untilAsserted(()->18assertTrue(subscriber.consumedElements.containsAll(expected)))19);20}即使Publisher发布了6个元素,EndSubscriber也只会使用一个元素,因为它表示只需要处理这一个元素通过在Subscription上使用request()方法,我们可以实现更复杂的反压机制来控制消息消费的速度。