当前位置: 首页 > 科技观察

《一起玩Dubbo》系列之四:服务是如何调用的

时间:2023-03-15 18:42:33 科技观察

了解过rpc的应该都听说过rpc就是解决本地调用远程方法的问题。执行过程,那么这个过程是怎样的呢?同样的,我继续在dubbo流程图中画出我的流程。首先我按照文章一起玩dubbo,先搭建一个demo,包括注册中心和服务。消费者和服务商,一起来看看整个过程吧。为了方便说明,我们先做个demo。这是服务提供者:"+RpcContext.getContext().getRemoteAddress());return"Hello"+name+",responsefromprovider:"+RpcContext.getContext().getLocalAddress();}}这是服务消费者publicclassConsumer{publicstaticvoidmain(String[]args){ClassPathXmlApplicationContext=newClassPathXmlApplicationContext(newString[]{"META-INF/spring"+"/dubbo-demo-consumer.xml"});context.start();DemoServicedemoService=(DemoService)context.getBean("demoService");while(true){try{Thread.sleep(1000);Stringhello=demoService.sayHello("world");System.out.println(hello);}catch(Throwablethrowable){throwable.printStackTrace();}}}}我在这里打破了观点image-20210714015553406到服务消费者的结尾在底层,我们可以看到在开始分析细节之前,我们在头脑风暴过程中需要经过哪些步骤。不看dubbo代码大概也能猜到:要知道远程服务的地址,把要调用的方法的具体信息告诉远程服务,让远程服务解析这些信息。远程服务根据信息找到对应的实现类并调用。调用完成后,同样返回调用结果,然后客户端解析并响应第一点,我们通过前面的文章已经知道了。消费者在发起调用时就已经知道了远程服务的地址,那么要调用的方法的具体信息是什么呢?客户端要告诉服务端调用哪个接口,所以需要方法名,方法参数类型,方法参数值,然后可能有多个版本,所以要带上版本号。有了这些数据之后,服务端就可以准确的调用具体的方法了。我将首先在mdata中发布上面调用的示例。就是我上面提到的数据。看到这个Request,远程调用的基本原理应该就清楚了。这时候很容易想到另一个问题,消费者和提供者如何沟通?消费者和提供者如何沟通?其实很简单,就是消费者和提供者通过协议进行通信。Dubbo的协议很常见的header+body的形式,还有一个特殊字符0xdabb,用来解决TCP网络粘包问题。这种header是定长的,比较常见的做法是在header中填入body的长度,包括我们的游戏框架也是采用这种模式。我们可以看一下dubbo协议的ghost,可以看到协议分为协议头和协议体。16字节的header主要携带magicnumber,也就是之前提到的0xdabb,然后是一些请求设置,消息体Length等。16字节之后是协议体,包括协议版本,接口名称,接口版本,方法名称,ETC。看到这里,很容易引出另一个问题。协议是如何序列化的?协议的序列化?序列化的概念实际上是一个简短的回答。消费者首先将Java对象转换为字节序列。这个过程也称为对象的序列化,然后在服务器端将字节序列还原为Java对象。这个过程称为对象的反序列化。Dubbo默认使用hessian2序列化协议。Hessian2是阿里Hessian的修改版,应该不错。粗略的总结一下,就是当消费者发起调用的时候,那一刻,其实是调用了代理类,最终是代理类调用了Client。Client将Java对象序列化生成协议体,然后通过网络传输给服务端。服务器Server收到这个请求后,分发给业务线程池,具体实现方法由业务线程调用。首先看官网图,分析消费者的调用链接。我们先看一下服务消费者的调用逻辑。你可以看看我的照片。继续说可以看到调用接口生成的代理类是的,在invoke的时候,一些不需要拦截的方法会先释放,比如toString。、参数类和参数值。接下来仔细看一下MockClusterInvoker#invoke的代码,先解释一下为什么要进来MockClusterInvoker。如果看了文章想学习dubbo,应该能看懂这个过程。这个过程可以认为是一个集合Baby,A集合B,B集合C,已经设置到最外层的invoker就是MockClusterInvoker。如果你不明白这个过程,你可以回头看我的文章。这是非常制服但非常实用。可以看这里来判断配置里面有没有。配置mock,mock的话后面会展开,继续看this.invoker.invoke的实现,其实会调用AbstractClusterInvoker#invoker,这里涉及到一个模板方法设计模式,其实很简单,就是就是,在抽象类中设置代码的执行骨架,然后将具体的实现延迟到子类中,由子类来决定逻辑,这样可以在不改变整体执行步骤的情况下修改步骤中的实现,减少重复代码,也有利于扩展。开闭原则。接下来,让我们看看我们做了什么。这一步相当重要。我们分开说吧。其实就是先通过路由过滤一波,然后返回调用者继续看doInvoke流程。我们默认使用FailoverClusterInvoker,即失败。自动切换的容错模式,这里为什么默认是这个。其实从实用的角度来说,失败后自动切换到下一个服务实例的场景更合适。如果要替换其他模式,可以在xml中配置,我们继续。看doInvokepublicResult的实现intlen=getUrl().getMethodParameter(methodName,Constants.RETRIES_KEY,Constants.DEFAULT_RETRIES)+1;if(len<=0){len=1;}//retryloop.RpcExceptionle=null;//lastexception.List>invoked=newArrayList>(copyinvokers.size());//invokedinvokers.Setproviders=newHashSet(len);//重试次数for(inti=0;i0){checkWhetherDestroyed();copyinvokers=list(invocation);//checkagaincheckInvokers(copyinvokers,invocation);}//通过负载选择了一个invokerInvokerinvoker=select(loadbalance,invocation,copyinvokers,invoked);invoked.add(invoker);//上文保留调用过的invokerRpcContext.getContext().setInvokers((List)invoked);try{//发起调用Resultresult=invoker.invoke(invocation);if(le!=null&&logger.isWarnEnabled()){logger.warn("虽然重试方法"+methodName+"intheservice"+getInterface().getName()+"wassuccessfulbytheprovider"+invoker.getUrl().getAddress()+",buttherehavebeenfailedproviders"+providers+"("+providers.size()+"/"+copyinvokers.size()+")fromtheregistry"+directory.getUrl().getAddress()+"ontheconsumer"+NetUtils.getLocalHost()+"usingthedubboversion"+Version.getVersion()+".Lasterroris:"+le.getMessage(),le);}returnresult;}catch(RpcExceptione){if(e.isBiz()){//bizexception.throwe;}le=e;}catch(Throwablee){le=newRpcException(e.getMessage(),e);}finally{providers.add(invoker.getUrl().getAddress());}}thrownewRpcException(le!=null?le.getCode():0,"Failedtoinvokethemethod"+methodName+"intheservice"+getInterface().getName()+".Tried"+len+"timesofttheproviders"+providers+"("+providers.size()+"/"+copyinvokers.size()+")fromtheregistry"+directory.getUrl().getAddress()+"ontheconsumer"+NetUtils.getLocalHost()+"usingthedubboversion"+Version.getVersion()+".Lasterroris:"+(le!=null?le.getMessage():""),le!=null&&le.getCause()!=null?le.getCause():le);}这个调用的简要总结是FailoverClusterInvoker得到返回从DirectoryInvoker列表中,经过路由后,通过LoadBalance从Invoker列表中选择一个Invoker,即负载均衡,最后FailoverClusterInvoker会将参数传递给所选Invoker实例的invoke方法,进行真正的远程调用发起调用的invoke后面在抽象类中调用invoke,然后在子类中调用doInvoker。抽象类中的方法很简单,就不展示了。我们直接看子类DubboInvoker的doInvoke方法。protectedResultdoInvoke(finalInvocationinvocation)throwsThrowable{RpcInvocationinv=(RpcInvocation)invocation;finalStringmethodName=RpcUtils.getMethodName(invocation);inv.setAttachment(Constants.PATH_KEY,getUrl().getPath());inv.setAttachment.ERSI。ExchangeClientcurrentClient;//selectclientif(clients.length==1){currentClient=clients[0];}else{currentClient=clients[index.getAndIncrement()%clients.length];}try{//是否异步booleanisAsync=RpcUtils.isAsync(getUrl(),invocation);//是否发送oneway,即是否需要返回值booleanisOneway=RpcUtils.isOneway(getUrl(),invocation);//超时时间inttimeout=getUrl().getMethodParameter(methodName,Constants.TIMEOUT_KEY,Constants.DEFAULT_TIMEOUT);//无需返回值,isSent);//future直接NullRpcContext.getContext().setFuture(null);//返回空结果returnnewRpcResult();}elseif(isAsync){//异步发送ResponseFuturefuture=currentClient.request(inv,timeout);//设置futureRpcContext.getContext().setFuture(newFutureAdapter(future));//返回空结果returnnewRpcResult();}else{//同步发送RpcContext.getContext().setFuture(null);//直接调用future.get等待return(Result)currentClient.request(inv,timeout).get();}}catch(TimeoutExceptione){thrownewRpcException(RpcException.TIMEOUT_EXCEPTION,"Invokeremotemethodtimeout.method:"+invocation.getMethodName()+",provider:"+getUrl()+",cause:"+e.getMessage(),e);}catch(RemotingExceptione){thrownewRpcException(RpcException.NETWORK_EXCEPTION,"Failedtoinvokeremotemethod:"+调用。getMethodName()+",provider:"+getUrl()+",cause:"+e.getMessage(),e);}}这里可以看到有三种调用方式,分别是oneway,asynchronous,synchronous,我先说oneway是一种比较常见的方式,就是当我们不关心请求是否发送成功的时候,我们使用oneway来发送。这种方式消耗最少的异步调用。我们可以看到Dubbo其实是支持异步的。客户端发送请求后,会得到一个ResponseFuture,然后将future包装到context中,这样用户就可以从context中获取到future,然后调用者可以在调用future.whenComplete或之前做一波操作某事异步订购某事。同步调用,Dubbo底层也帮了我们的忙。可以看到Dubbo源码中调用了future.get,所以我们感觉调用这个接口的方法后会阻塞,必须等待结果到达。返回,所以它是同步的。那么这个回调是如何工作的呢?其实很简单。就是在调用的时候生成一个唯一的id,把回调和这个id缓存起来,然后把这个id传给服务器。并将这个id发回给消费者,消费者可以拿到回调触发器。下面我们从代码层面来看一下DefaultFuture是什么东西。它生成一个唯一的id并将其放入Futures的并发容器中。让我们看看它用在什么地方。这里比较清楚。收到返回的协议后,将future取出来触发。基于这种思想,很多回调都可以采用这种设计思想。这基本上就是服务消费者触发rpc行为的方式。其实还是很清楚的。首先在启动服务订阅的时候对调用者进行层层封装,然后在我们的接口中注入一个代理对象,然后在调用接口的时候一个一个调用invoker,最后把约定发送给服务提供者。喜欢,喜欢,简单明了的逻辑。接下来说一下服务提供者的调用过程。分析提供商的调用电路。同样,我们先看看服务提供者的调用链。这个过程也很漫长。我在这里只强调几个关键点。我们先来看HeaderExchangeHandler和handleRequest,这两个比较容易理解,就是把request对象中的数据取出来,传递给DubboProtocol.requestHandler。此数据是解码后的DecodeableRpcInvocation对象,它是Invocation接口的实现。我们可以看到里面有什么,看到这里是调用信息。接下来就简单了,根据这些参数获取对应的对象反射调用即可,然后看DubboProtocol的核心回复方法@OverridepublicObjectreply(ExchangeChannelchannel,Objectmessage)throwsRemotingException{if(messageinstanceofInvocation){Invocationinv=(Invocation)message;//获取根据调用的参数对应的调用者。其实就是之前暴露服务时从Exporter拿来的Invoker。invoker=getInvoker(channel,inv);//这里是回调的一些处理,不管if(Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))){StringmethodsStr=invoker.getUrl().getParameters().get("方法");booleanhasMethod=false;if(methodsStr==null||methodsStr.indexOf(",")==-1){hasMethod=inv.getMethodName().equals(methodsStr);}else{String[]methods=methodsStr.split(",");for(Stringmethod:methods){if(inv.getMethodName().equals(method)){hasMethod=true;break;}}}if(!hasMethod){logger.warn(newIllegalStateException("ThemethodName"+inv.getMethodName()+"notfoundincallbackserviceinterface,invokewillbeignored."+"pleaseupdatetheapiinterface.urlis:"+invoker.getUrl())+",invocationis:"+inv);returnnull;}}RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());//最后调用returninvoker.invoke(inv);}thrownewRemotingException(channel,"Unsupportedrequest:"+(message==null?null:(message.getClass().getName()+":"+message))+",channel:consumer:"+channel.getRemoteAddress()+"-->provider:"+channel.getLocalAddress());}getInvoker的逻辑也很简单。之前的文章ServiceExposure已经提到了这个过程。其实就是从一个DubboProtocol.exporterMap中找一个Exporter。然后从里面取出invoker,那么key是什么呢?key其实就是一个由url生成的serviceKey。此时可以通过Invocation中的信息恢复serviceKey,找到对应的Exporter和Invoker。我们来看看前面提到的JavassistProxyFactory。这是一个为提供者的服务对象生成代理的工厂类。上面提到,在调用invoker.invoke时,通过反射调用最终服务,实现相关逻辑。入口在这里。.因为这篇之前的文章已经讲的比较详细了,这里就不再赘述了。此时,呼叫已经是技术性的。下面看看调用完成后如何将结果返回给服务消费者。调用完成后,服务提供者会创建一个Response对象返回给服务消费者,那么在执行服务时自然会有两种结果:成功和失败。如果成功,返回值设置为Response的结果,Response的状态设置为OK。如果失败,将失败异常设置为Response的errorMessage,状态设置为SERVICE_ERROR。我们回到HeaderExchangeHandler.received中的代码来看。handleRequest之后,调用channel.send将Response发送给Client,这个channel封装了client-server通信链路,最终调用Netty框架将response写回给client。在实践的总结中,终于把调用的过程讲完了。其实思路还是比较清晰的,但是最好仔细看看整个过程的断点,可以学到很多东西。说说后续的安排吧:SPIdubbo中的AOP机制服务治理....等等几个模块,最后给大家展示一个RPC框架。这是同一句话。想学习dubbo的可以继续关注本系列。