当前位置: 首页 > 后端技术 > Java

Dubbo的简单使用与Triple协议Streaming通信的实现

时间:2023-04-01 18:49:04 Java

商业转载请联系作者授权,非商业转载请注明出处。如需商业使用,请联系作者授权。非商业用途请注明出处。协议(许可):署名-非商业使用-相同方式共享4.0国际(CCBY-NC-SA4.0)作者(Author):废码链接(URL):https://waste-code.tech/archi..来源(Source):WasteCode文章概览项目模块公共模块——实现实体类并声明暴露的api接口提供者模块——业务实现暴露的api接口消费者模块——请求接口的实现,暴露的api接口会用到GITHUB:Dubbo的简单使用和三重协议的流式通信的实现官方文档:三重协议博客目的:记录实现过程和问题Dubbo的简单使用在公共模块中定义实体类User在实现接口UserService的公共模块publicinterfaceUserService{/***获取用户信息*@paramname*@return*/UsergetUserInfo(Stringname);}在provider和consumer模块中引入相关依赖org.apache.dubbodubbo-spring-boot-starter3.0.7org.apache.dubbodubbo-registry-zookeeper3.0.7com.samplecommon0.0.1-SNAPSHOT在provider和consumer模块中创建application.yml文件并写入相关配置server:port:8082#这里填写端口号,provider和consumer不同,spring:application:name:consumerdubbo:protocol:name:dubbo#选择通信协议端口:-1registry:id:zk-zookeeper地址:zookeeper://127.0.0.1:2181在provider和中编写启动类consumer,这里以consumer模块为例,这里需要添加EnableDubbo注解}}ImplementUserServiceintheprovider//注意这里使用了注解@DubboService,也就是dubbo中的Service注解,主要是在对外提供服务的实现类上@DubboServicepublic类UserServiceImpl实现UserService{@OverridepublicUsergetUserInfo(Stringname){Useruser=newUser();user.setName("dubbo");user.setAge(12);返回用户;}}在consumer中实现request接口,引用provider模块暴露的接口,使用DubboReference注解@RestController@RequestMapping("/user")publicclassUserController{@DubboReferenceprivateUserServiceuserService;@GetMapping("/info")publicUsergetUserInfo(){returnuserService.getUserInfo("xxx");}}写完代码,启动provider和consumer模块,然后通过Postman工具调用接口,发现可以正常使用了。Triple协议的Streaming通信就完成了。Triple协议的Stream通信主要分为三种:服务端流式传输、客户端流式传输、双向流式应用场景接口,需要发送大量数据,不能放在一个请求中.流式场景需要分批发送,数据需要按照发送的先后顺序进行处理。数据本身是无边界推送的类场景下,多条消息在同一个调用上下文中发送和处理。流的语义保证(优势)提供了消息边界,可以方便消息的单独处理。严格有序,发送端和接收端的顺序一致全双工,无需等待发送支持取消和超时流式流通信实现服务端流(SERVER_STREAM)请求流程服务端流的Java实现(SERVER_STREAM)在provider和consumer模块中添加相关依赖com.谷歌.protobufprotobuf-java修改provider和consumer模块中的相关配置dubbo:#这里只拦截需要更改的配置,其他配置默认为原协议:name:tri#修改dubbo的通信协议。当然三重协议也支持之前dubbo的简单使用。在公共模块的UserService中声明相关的api接口/***serverstream*@paramname*@paramresponse*/voidsayHelloServerStream(Stringname,StreamObserverresponse)throwsInterruptedException;在provider模块中实现相关功能//StreamObserver是接收消息的观察者,//调用onNext方法后,consumer模块中的consumer会获取相关数据,//当调用onCompleted方法时,整个server流消费者模块执行完最终处理后会结束//这里的延迟是10s,主要测试从provider模块接收数据会不会有10s的延迟Thread.sleep(10*1000);response.onNext("你好,"+name+",thesecondtime");response.onCompleted();}在consumer模块写request方法/***测试服务端流程*@paramname*@return*@throwsInterruptedException*/@GetMapping("/sayHallo/{name}")publicListsayHallo(@PathVariable("name")Stringname)throwsInterruptedException{Listlist=newArrayList<>();用户服务。sayHelloServerStream(name,newStreamObserver(){//provider模块每次调用onNext时,该方法都会执行一次@OverridepublicvoidonNext(Stringdata){System.out.println("onNext:"+data);list.add(data);}@OverridepublicvoidonError(Throwablethrowable){System.out.println("Anerrorwasreported");}//当provider模块的onCompleted方法被调用时,执行这个方法@OverridepublicvoidonCompleted(){System.out.println("End");}});returnlist;}client(CLIENT_STREAM)streamrequestprocessbidirectionalstream(BIDIRECTIONAL_STREAM)requestprocessclientstream(CLIENT_STREAM)/bidirectionalstream(BIDIRECTIONAL_STREAM)的java实现clientstream和bidirectionalstream在Java中引用pom和修改的方式相同配置与公共模块中的服务器流相同说明相关接口/***客户端流/双向流,这里返回的StreamObserver类中的处理实际上是在provider模块中实现的,*而参数StreamObserver是在consumer模块中实现的,虽然consumer调用了这个方法*@paramresponse*@return*/StreamObserversayHelloStream(StreamObserverresponse);在提供者模块中实现相关方法@OverridepublicStreamObserversayHelloStream(StreamObserverresponse){returnnewStreamObserver(){@OverridepublicvoidonNext(Stringdata){System.out.println("Serverrequest参数:"+数据);response.onNext("你好,"+data);}@OverridepublicvoidonError(Throwablethrowable){}@OverridepublicvoidonCompleted(){System.out.println("providerclosed");响应.onCompleted();}};}在消费者模块中实现方法调用@PostMapping("/sayHallo")publicListsayHallo(@RequestBodyListnames){List;list=newArrayList<>();StreamObserverrequest=userService.sayHelloStream(newStreamObserver(){@OverridepublicvoidonNext(Stringdata){System.out.println("Whatdidyousay?"+data);list.add(data);}@OverridepublicvoidonError(Throwablethrowable){}@OverridepublicvoidonCompleted(){System.out.println("End");}});//上面定义了StreamObserver并调用方法后,通过下面的onNext方法调用发送请求names.forEach(item->{request.onNext(item);try{Thread.sleep(10*1000);}catch(InterruptedExceptione){thrownewRuntimeException(e);}});请求.onCompleted();返回列表;}