## Necessity

The previous posts implemented a working general-purpose RPC framework, but one gap remains: there is no timeout handling when the client synchronously waits for a response. If the server goes down, or is simply too slow, the client cannot be left waiting forever. Once a call exceeds its configured time limit, it should fail fast with an error instead of wasting resources.

The idea: when a call is issued, record its deadline; when the result is fetched, check whether the deadline has passed; and in parallel, run a thread that detects requests that have timed out.

## Timeout check thread

To keep the check from affecting the performance of normal calls, we set up a separate thread to detect timed-out calls.

```java
package com.github.houbb.rpc.client.invoke.impl;

import com.github.houbb.heaven.util.common.ArgUtil;
import com.github.houbb.rpc.common.rpc.domain.RpcResponse;
import com.github.houbb.rpc.common.rpc.domain.impl.RpcResponseFactory;
import com.github.houbb.rpc.common.support.time.impl.Times;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Timeout check thread
 * @author binbin.hou
 * @since 0.0.7
 */
public class TimeoutCheckThread implements Runnable {

    /**
     * Request information (seqId -> expire time)
     * @since 0.0.7
     */
    private final ConcurrentHashMap<String, Long> requestMap;

    /**
     * Response information (seqId -> response)
     * @since 0.0.7
     */
    private final ConcurrentHashMap<String, RpcResponse> responseMap;

    /**
     * Constructor
     * @param requestMap  request map
     * @param responseMap response map
     * @since 0.0.7
     */
    public TimeoutCheckThread(ConcurrentHashMap<String, Long> requestMap,
                              ConcurrentHashMap<String, RpcResponse> responseMap) {
        ArgUtil.notNull(requestMap, "requestMap");
        this.requestMap = requestMap;
        this.responseMap = responseMap;
    }

    @Override
    public void run() {
        for (Map.Entry<String, Long> entry : requestMap.entrySet()) {
            long expireTime = entry.getValue();
            long currentTime = Times.time();
            if (currentTime > expireTime) {
                final String key = entry.getKey();
                // Record a timeout as the result and remove the request.
                responseMap.putIfAbsent(key, RpcResponseFactory.timeout());
                requestMap.remove(key);
            }
        }
    }
}
```

The request map stores each request's deadline; the response map stores the results. When a request times out, it is removed from the request map and a timeout response is recorded in its place. The thread is started when DefaultInvokeService is initialized:

```java
final Runnable timeoutThread = new TimeoutCheckThread(requestMap, responseMap);
Executors.newScheduledThreadPool(1)
        .scheduleAtFixedRate(timeoutThread, 60, 60, TimeUnit.SECONDS);
```

## DefaultInvokeService

The original result handling ignored time entirely, so the corresponding checks are added here.

### Setting the request time: addRequest

addRequest puts the expiration time straight into the map. The `put` happens exactly once, while the result may be queried many times, so the deadline is computed up front, at insertion time:

```java
@Override
public InvokeService addRequest(String seqId, long timeoutMills) {
    LOG.info("[Client] start add request for seqId: {}, timeoutMills: {}", seqId, timeoutMills);
    // Compute the deadline once, when the request is registered.
    final long expireTime = Times.time() + timeoutMills;
    requestMap.putIfAbsent(seqId, expireTime);
    return this;
}
```

### Setting the result: addResponse

1. If the request no longer exists in requestMap, it has probably timed out already; the late result is simply ignored.
2. Otherwise, check whether the deadline has passed; if it has, substitute the timeout response.
3. After storing the result, notify all waiting threads.

```java
@Override
public InvokeService addResponse(String seqId, RpcResponse rpcResponse) {
    // 1. Validity check.
    Long expireTime = this.requestMap.get(seqId);
    // If null, the request probably timed out and was removed by the scheduled job
    // before this response arrived. Ignore the response.
    if (ObjectUtil.isNull(expireTime)) {
        return this;
    }

    // 2. Timeout check.
    if (Times.time() > expireTime) {
        LOG.info("[Client] seqId: {} has timed out, returning the timeout result.", seqId);
        rpcResponse = RpcResponseFactory.timeout();
    }

    // A guard could be added before this put: only store the response if the seqId
    // is still pending in the request map, otherwise silently discard it.
    responseMap.putIfAbsent(seqId, rpcResponse);
    LOG.info("[Client] got result for seqId: {}, rpcResponse: {}", seqId, rpcResponse);
    LOG.info("[Client] seqId: {} result stored, notifying all waiting threads", seqId);

    // Remove the corresponding entry from requestMap.
    requestMap.remove(seqId);
    LOG.info("[Client] seqId: {} removed from request map", seqId);

    // 3. Notify all waiting threads.
    synchronized (this) {
        this.notifyAll();
    }
    return this;
}
```

### Getting the result: getResponse

1. If the result already exists, return it directly.
2. Otherwise, wait.
3. Once woken, return the result.

```java
@Override
public RpcResponse getResponse(String seqId) {
    try {
        RpcResponse rpcResponse = this.responseMap.get(seqId);
        if (ObjectUtil.isNotNull(rpcResponse)) {
            LOG.info("[Client] seq {} got result: {}", seqId, rpcResponse);
            return rpcResponse;
        }

        // Wait until a result shows up.
        while (rpcResponse == null) {
            LOG.info("[Client] seq {} result is null, waiting", seqId);
            // Wait on the shared lock.
            synchronized (this) {
                this.wait();
            }
            rpcResponse = this.responseMap.get(seqId);
            LOG.info("[Client] seq {} got result: {}", seqId, rpcResponse);
        }
        return rpcResponse;
    } catch (InterruptedException e) {
        throw new RpcRuntimeException(e);
    }
}
```

Note that the fetching logic itself is unchanged: a timeout simply produces a timeout object via `RpcResponseFactory.timeout()`.
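To see the whole mechanism (request map, response map, sweeper, wait/notify) working end to end, here is a minimal, self-contained sketch of the same pattern. Everything in it (`TimeoutDemo`, `REQUEST_MAP`, the 500 ms sweep interval) is a hypothetical illustration, not project code. Note that, unlike `TimeoutCheckThread`, this sweeper notifies waiters, and the waiter uses a bounded `wait(100)` so it cannot miss a wake-up:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the project.
public class TimeoutDemo {

    private static final ConcurrentHashMap<String, Long> REQUEST_MAP = new ConcurrentHashMap<>();
    private static final ConcurrentHashMap<String, String> RESPONSE_MAP = new ConcurrentHashMap<>();
    private static final Object LOCK = new Object();

    public static void main(String[] args) throws InterruptedException {
        // Register a request whose deadline is 1000 ms from now; no response will ever arrive.
        REQUEST_MAP.put("seq-1", System.currentTimeMillis() + 1000);

        // Sweeper: every 500 ms, convert expired requests into a timeout marker.
        Executors.newScheduledThreadPool(1).scheduleAtFixedRate(() -> {
            for (Map.Entry<String, Long> entry : REQUEST_MAP.entrySet()) {
                if (System.currentTimeMillis() > entry.getValue()) {
                    RESPONSE_MAP.putIfAbsent(entry.getKey(), "TIMEOUT");
                    REQUEST_MAP.remove(entry.getKey());
                    synchronized (LOCK) {
                        LOCK.notifyAll();
                    }
                }
            }
        }, 500, 500, TimeUnit.MILLISECONDS);

        // Caller: block until a result (here: the timeout marker) shows up.
        String response = RESPONSE_MAP.get("seq-1");
        while (response == null) {
            synchronized (LOCK) {
                LOCK.wait(100); // bounded wait: re-check even if a notify was missed
            }
            response = RESPONSE_MAP.get("seq-1");
        }
        System.out.println("seq-1 -> " + response); // prints: seq-1 -> TIMEOUT
        System.exit(0); // stop the non-daemon scheduler thread
    }
}
```

The notification detail matters because `TimeoutCheckThread` above never calls `notifyAll()`: a thread blocked in `getResponse` is only woken when a real response triggers `addResponse`. That works for the test below, where the server does answer after 3 s, but a waiter would block indefinitely if the server never replied at all.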
`RpcResponseFactory.timeout()` itself is a very simple implementation:

```java
package com.github.houbb.rpc.common.rpc.domain.impl;

import com.github.houbb.rpc.common.exception.RpcTimeoutException;
import com.github.houbb.rpc.common.rpc.domain.RpcResponse;

/**
 * Response factory class
 * @author binbin.hou
 * @since 0.0.7
 */
public final class RpcResponseFactory {

    private RpcResponseFactory() {}

    /**
     * Timeout response
     * @since 0.0.7
     */
    private static final DefaultRpcResponse TIMEOUT;

    static {
        TIMEOUT = new DefaultRpcResponse();
        TIMEOUT.error(new RpcTimeoutException());
    }

    /**
     * Get the timeout response
     * @return the response
     * @since 0.0.7
     */
    public static RpcResponse timeout() {
        return TIMEOUT;
    }
}
```

The response carries the timeout exception, which the proxy throws when it processes the result: `return rpcResponse.result();`.

## Test code

### Server

We deliberately add a sleep to the server implementation; everything else stays the same:

```java
public class CalculatorServiceImpl implements CalculatorService {

    public CalculateResponse sum(CalculateRequest request) {
        int sum = request.getOne() + request.getTwo();

        // Deliberately sleep for 3 s.
        try {
            TimeUnit.SECONDS.sleep(3);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        return new CalculateResponse(true, sum);
    }
}
```

### Client

The client sets the timeout to 1 s; everything else is unchanged:

```java
public static void main(String[] args) {
    // Service configuration.
    ReferenceConfig<CalculatorService> config = new DefaultReferenceConfig<>();
    config.serviceId(ServiceIdConst.CALC);
    config.serviceInterface(CalculatorService.class);
    config.addresses("localhost:9527");
    // Set the timeout to 1 s.
    config.timeout(1000);

    CalculatorService calculatorService = config.reference();

    CalculateRequest request = new CalculateRequest();
    request.setOne(10);
    request.setTwo(20);

    CalculateResponse response = calculatorService.sum(request);
    System.out.println(response);
}
```
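Before reading the log, the expected behavior can be expressed in plain JDK terms. The sketch below is purely illustrative (not the project's code): the "server" sleeps 3 s, while the "client" waits at most 1 s, mirroring `config.timeout(1000)`:

```java
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical demo class, not part of the project.
public class TimeoutExpectation {

    public static void main(String[] args) throws Exception {
        // "Server": sleeps 3 s before producing 10 + 20.
        FutureTask<Integer> task = new FutureTask<>(() -> {
            TimeUnit.SECONDS.sleep(3);
            return 10 + 20;
        });
        new Thread(task).start();

        try {
            // "Client": 1 s deadline, like config.timeout(1000).
            System.out.println(task.get(1, TimeUnit.SECONDS));
        } catch (TimeoutException e) {
            // The deadline passes before the result arrives,
            // analogous to RpcTimeoutException in the RPC client.
            System.err.println("call timed out after 1s");
        }
    }
}
```

The hand-rolled map plus wait/notify in `DefaultInvokeService` plays the role that `get(1, TimeUnit.SECONDS)` plays here; the difference is that the RPC response arrives asynchronously on a Netty channel rather than from a thread we started ourselves.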
The log output is as follows:

```
...log.integration.adaptors.stdout.StdOutExImpl' adapter.
[INFO] [2021-10-05 14:59:40.974] [main] [c.g.h.r.c.c.RpcClient.connect] - RPC client starting...
[INFO] [2021-10-05 14:59:42.504] [main] [c.g.h.r.c.c.RpcClient.connect] - RPC client started, listening on localhost:9527
[INFO] [2021-10-05 14:59:42.533] [main] [c.g.h.r.c.p.ReferenceProxy.invoke] - [Client] start call remote with request: DefaultRpcRequest{seqId='62e126d9a0334399904509acf8dfe0bb', createTime=16334..., serviceId='calc', methodName='sum', paramTypeNames=[com.github.houbb.rpc.server.facade.model.CalculateRequest], paramValues=[CalculateRequest{one=10, two=20}]}
[INFO] [2021-10-05 14:59:42.534] [main] [c.g.h.r.c.i.i.DefaultInvokeService.addRequest] - [Client] start add request for seqId: 62e126d9a0334399904509acf8dfe0bb, timeoutMills: 1000
[INFO] [2021-10-05 14:59:42.535] [main] [c.g.h.r.c.p.ReferenceProxy.invoke] - [Client] start call channel id: 00e04cfffe360988-000004bc-00000000-1178e1265e903c4c-7975626f
...
Exception in thread "main" com.github.houbb.rpc.common.exception.RpcTimeoutException
    at com.github.houbb.rpc.common.rpc.domain.impl.RpcResponseFactory.<clinit>(RpcResponseFactory.java:23)
    at com.github.houbb.rpc.client.invoke.impl.DefaultInvokeService.addResponse(DefaultInvokeService.java:72)
    at com.github.houbb.rpc.client.handler.RpcClientHandler.channelRead0(RpcClientHandler.java:43)
    at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
    at io.netty.handler.logging.LoggingHandler.channelRead(LoggingHandler.java:241)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:310)
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:284)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1359)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:935)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:138)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:645)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
    at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
    at java.lang.Thread.run(Thread.java:748)
...
[INFO] [2021-10-05 14:59:45.617] [nioEventLoopGroup-2-1] [c.g.h.r.c.i.i.DefaultInvokeService.addResponse] - [Client] got result for seqId: 62e126d9a0334399904509acf8dfe0bb, rpcResponse: DefaultRpcResponse{seqId='null', error=com.github.houbb.rpc.common.exception.RpcTimeoutException, result=null}
[INFO] [2021-10-05 14:59:45.617] [nioEventLoopGroup-2-1] [c.g.h.r.c.i.i.DefaultInvokeService.addResponse] - [Client] seqId: 62e126d9a0334399904509acf8dfe0bb result stored, notifying all waiting threads
[INFO] [2021-10-05 14:59:45.618] [nioEventLoopGroup-2-1] [c.g.h.r.c.i.i.DefaultInvokeService.addResponse] - [Client] seqId: 62e126d9a0334399904509acf8dfe0bb removed from request map
[INFO] [2021-10-05 14:59:45.618] [nioEventLoopGroup-2-1] [c.g.h.r.c.c.RpcClient.channelRead0] - [Client] response: DefaultRpcResponse{seqId='62e126d9a0334399904509acf8dfe0bb', error=null, result=CalculateResponse{success=true, sum=30}}
[INFO] [2021-10-05 14:59:45.619] [main] [c.g.h.r.c.i.i.DefaultInvokeService.getResponse] - [Client] seq 62e126d9a0334399904509acf8dfe0bb got result: DefaultRpcResponse{seqId='null', error=com.github.houbb.rpc.common.exception.RpcTimeoutException, result=null}
...
```

The call fails with the expected timeout exception. Note the timing: the check thread only sweeps every 60 s, so the waiting main thread is actually woken at around the 3 s mark, when the real response arrives. addResponse sees that the 1 s deadline has passed, substitutes the timeout response, and the proxy throws RpcTimeoutException.

## Drawbacks

Timeout handling here is one-sided. It could be extended to be bidirectional: the server could also enforce a timeout limit, so that it stops wasting resources on requests the client has already given up on.
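As a rough illustration of that bidirectional idea, the server could compare the request's creation time plus the client's declared budget against the current clock before doing any work. A minimal sketch, with every name in it (`ServerDeadlineSketch`, `withinDeadline`, the `Request` fields) hypothetical rather than taken from the project:

```java
// Hypothetical sketch of server-side deadline enforcement, not project code:
// skip requests whose client deadline has already passed instead of doing the work.
public class ServerDeadlineSketch {

    static class Request {
        final long createTime;   // when the client issued the call
        final long timeoutMills; // client-declared time budget

        Request(long createTime, long timeoutMills) {
            this.createTime = createTime;
            this.timeoutMills = timeoutMills;
        }
    }

    /** Returns true if the request should still be processed. */
    static boolean withinDeadline(Request request) {
        return System.currentTimeMillis() <= request.createTime + request.timeoutMills;
    }

    public static void main(String[] args) {
        Request fresh = new Request(System.currentTimeMillis(), 1000);
        Request stale = new Request(System.currentTimeMillis() - 5000, 1000);
        System.out.println(withinDeadline(fresh)); // true: do the work
        System.out.println(withinDeadline(stale)); // false: client already gave up
    }
}
```

This assumes the client's deadline travels with the request (the `DefaultRpcRequest` in the log above already carries a `createTime`) and that the two sides' clocks are roughly in sync.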