当前位置: 首页 > 科技观察

java手写RPC从零开始基于Websocket实现

时间:2023-03-12 13:31:37 科技观察

RPC解决问题RPC主要解决两个问题:(1)解决分布式系统中服务之间的调用问题。(2)进行远程调用时,要像本地调用一样方便,让调用者感知不到远程调用的逻辑。本节我们将学习如何基于websocket实现最简单的rpc调用,后面会实现基于netty4的版本。开源地址:https://github.com/houbb/rpc左边的Client对应前面的ServiceA,右边的Server对应ServiceB的完整流程,下面一步步详细解释.(1)在ServiceA的应用层代码中,调用了Calculator的一个实现类的add方法,希望进行加法运算;(2)这个Calculator实现类并没有直接实现计算器的加减乘除逻辑,而是通过远程调用ServiceB的RPC接口获取运算结果,所以称为Stub;(3)Stub如何与ServiceB建立远程通信?这时候就需要一个远程通信的工具,就是图中那个Run-timeLibrary,这个工具会帮你实现远程通信的功能,比如JavaSocket,就是这样一个库,当然你可以也可以使用基于Http协议的HttpClient,或者其他通信工具,都可以,RPC并没有规定你要使用哪种协议进行通信;(4)Stub通过调用通信工具提供的方法与ServiceB建立通信,然后将请求数据发送给ServiceB。需要注意的是,由于底层网络通信是基于二进制格式的,所以Stub传递给通信工具类的数据也必须是二进制的,比如calculator.add(1,2),必须把参数值1而2变成一个Request对象(这个Request对象当然不仅仅是这个信息,还包括调用哪个服务的哪个RPC接口等其他信息),然后序列化成二进制,再传递给通信工具类,这也将在下面的代码实现中得到体现;(5)将二进制数据传输到ServiceB端,当然ServiceB也有自己的通信工具,通过这个通信工具接收二进制请求;(6)既然数据是二进制的,自然要反序列化,将二进制数据反序列化成一个请求对象,然后把请求对象交给ServiceB的Stub处理;(7)和之前ServiceA的Stub一样,这里的Stub也是一个“假的东西”,它负责的只是解析请求对象,知道调用者要调用哪个RPC接口,参数是什么传入,然后将这些参数传递给对应的RPC接口,也就是Calculator真正要执行的实现类。很显然,如果是Java,那么这里肯定要用到反射。(8)RPC接口执行后,返回执行结果。现在轮到ServiceB向ServiceA发送数据了,怎么发送呢?同样的道理,同样的流程,只是现在ServiceB变成了Client,而ServiceA变成了Serveronly:ServiceB反序列化执行结果->传递给ServiceA->ServiceA反序列化执行结果->返回结果给Application,完全的。简单实现假设服务A要调用服务B的一个方法,因为不在同一个内存中,所以不能直接使用。如何实现类似Dubbo的功能?这里不需要使用HTTP级别的通信,直接使用TCP协议即可。公共公共模块,公共对象的定义。Rpc常量publicinterfaceRpcConstant{/***地址*/StringADDRESS="127.0.0.1";/***端口号*/intPORT=12345;}请求输入publicclassRpcCalculateRequestimplementsSerializable{privatestaticfinallongserialVersionUID=6420751004355300996L;/***参数一*/privateintone;/***参数二*/privateinttwo;//getter&setter&toString()}服务接口publicinterfaceCalculator{/***计算加法*@paramone参数一*@paramtwo参数二*@return返回结果*/intadd(intone,inttwo);}server服务接口实现publicclassCalculatorImplimplementsCalculator{@Overridepublicintadd(intone,inttwo){returnone+two;}}启动服务publicstaticvoidmain(String[]args)throwsIOException{Calculatorcalculator=newCalculatorImpl();try(ServerSocketverlistenerock(RpcConstant.PORT)){System.out.println("服务器启动:"+RpcConstant.ADDRESS+":"+RpcConstant.PORT);while(true){try(Socketsocket=listener.accept()){//将请求反序列化ObjectInputStreamobjectInputStream=newObjectInputStream(socket.getInputStream());Objectobject=objectInputStream.readObject();System.out.println("Requestis:"+object);//调用服务intresult=0;if(objectinstanceofRpcCalculateRequest){RpcCalculateRequestcalculateRpcRequest=(RpcCalculateRequest)object;result=calculator.add(calculateRpcRequest.getOne(),calculateRpcRequest.getTwo());}//返回结果ObjectOutputStreamobjectOutputStream=newObjectOutputStream(socket.getOutputStream());objectOutputStream.writeObject(result);}catch(Exceptione){e.printStackTrace();}}}}启动日志:ServerClient启动:127.0.0.1:12345clientClient调用publicstaticvoidmain(String[]args){Calculatorcalculator=newCalculatorProxy();intresult=calculator.add(1,2);System.out.println(result);}计算代理类publicclassCalculatorProxyimplementsCalculator{@Overridepublicintadd(intone,inttwo){try{Socketsocket=newSocket(RpcConstant.ADDRESS,RpcConstant.PORT);//将请求顺序列表化RpcCalculateRequestcalculateRpcRequest=newRpcCalculateRequest(one,two);ObjectOutputStreamobjectOutputStream=newObjectOutputStream(socket.;//发送请求向服务提供者请求对象tputStream.writeObject(calculateRpcRequest);//反序列化响应体;}}catch(IOException|ClassNotFoundExceptione){thrownewRuntimeException(e);}}}调用日志客户端3server端服务器端启动:127.0.0.1:12345Requestis:RpcCalculateRequest{one=1,two=2}