当前位置: 首页 > 后端技术 > Java

xxl-job的惊人设计,怎能不爱

时间:2023-04-01 18:28:53 Java

通信底层介绍xxl-job使用nettyhttp进行通信,虽然也支持mina、jetty、nettytcp等方式,但是代码固定为nettyhttp.整体通信流程我以调度器通知执行器执行任务为例。我画的活动图:活动图的惊人设计。现将这些设计的亮点总结如下:|使用动态代理方式隐藏通讯细节xxl-job定义了两个接口ExecutorBiz、AdminBiz,ExecutorBiz接口封装了心跳、暂停、触发执行等操作,AdminBiz封装了回调、注册、注销操作,没有通讯相关的处理在接口的实现类中。XxlRpcReferenceBean类的getObject()方法将生成一个将进行远程通信的代理类。|全异步处理执行器接收消息并反序列化。它不同步执行任务代码,而是将任务信息存储在LinkedBlockingQueue中。异步线程从这个队列中获取任务信息,然后执行。任务的处理结果并不是说处理完就同步返回,而是也放在回调线程的阻塞队列中,处理结果异步返回。这样处理的好处是减少了netty工作线程的处理时间,提高了吞吐量。|Wrapperforasynchronousprocessing将异步处理包装起来,代码看起来像是同步调用的。再看调度器,XxlJobTrigger类触发任务执行代码:publicstaticReturnTrunExecutor(TriggerParamtriggerParam,Stringaddress){ReturnTrunResult=null;尝试{ExecutorBizexecutorBiz=XxlJobScheduler.getExecutorBiz(地址);//这里做了很多异步处理,最后同步得到处理结果runResult=executorBiz.run(triggerParam);}catch(Exceptione){logger.error(">>>>>>>>>>xxl-job触发错误,请检查executor[{}]是否正在运行。",address,e);runResult=newReturnT(ReturnT.FAIL_CODE,ThrowableUtil.toString(e));}StringBufferrunResultSB=newStringBuffer(I18nUtil.getString("jobconf_trigger_run")+":");runResultSB.append("
地址:").append(地址);runResultSB.append("
code:").append(runResult.getCode());runResultSB.append("
msg:").append(runResult.getMsg());runResult.setMsg(runResultSB.toString());returnrunResult;}ExecutorBiz.run方法,正如我们所说的,是一个与执行者通信的动态代理。执行器的执行结果在返回之前也是异步处理的。这里看到的run方法是同步等待处理结果返回。看看xxl-job是如何同步获取处理结果的:调度器向执行器发送消息后,线程阻塞。执行器处理完成后,返回处理结果,唤醒阻塞线程,在调用处获取返回值。动态代理代码如下://在代理类中触发调用try{//调用客户端。asyncSend(finalAddress,xxlRpcRequest);//未来得到XxlRpcResponsexxlRpcResponse=futureResponse.get(timeout,TimeUnit.MILLISECONDS);如果(xxlRpcResponse.getErrorMsg()!=null){thrownewXxlRpcException(xxlRpcException(xxlRpcException;M}RgResponse)(xxlRpcResponse.getResult();}catch(Exceptione){logger.info(">>>>>>>>>>>>xxl-rpc,invokeerror,address:{},XxlRpcRequest{}",finalAddress,xxlRpcRequest);throw(einstanceofXxlRpcException)?e:newXxlRpcException(e);}finally{//未来响应删除futureResponse.removeInvokerFuture();}}XxlRpcFutureResponse类实现线程等待和线程唤醒处理://返回结果并唤醒线程publicvoidsetResponse(XxlRpcResponse响应){this.response=响应;同步(锁定){完成=真;lock.notifyAll();}}@OverridepublicXxlRpcResponseget(longtimeout,TimeUnitunit)throwsInterruptedException,ExecutionException,TimeoutException{if(!done){synchronized(lock){try{if(timeout<0){//线程阻塞lock.wait();}else{longtimeoutMillis=(TimeUnit.MILLISECONDS==unit)?timeout:TimeUnit.MILLISECONDS.convert(timeout,unit);lock.wait(timeoutMillis);}}catch(InterruptedExceptione){抛出e;}}}if(!done){thrownewXxlRpcException("xxl-rpc,请求超时时间:"+System.currentTimeMillis()+",request:"+request.toString());}返回响应感觉;}有同学可能会问,调度器是如何确定收到返回结果后唤醒哪个线程的呢?每次远程调用都会生成一个uuidrequestid,这个uuidrequestid在整个调用过程中传递,就像一把钥匙,回家的时候拿着它用requestid打开这里的门,用钥匙就可以找到对应的XxlRpcFutureResponse,然后调用setResponse方法,设置返回值,唤醒线程。publicvoidnotifyInvokerFuture(StringrequestId,finalXxlRpcResponsexxlRpcResponse){//通过requestId找到XxlRpcFutureResponse,finalXxlRpcFutureResponsefutureResponse=futureResponsePool.get(requestId);如果(futureResponse==null){返回;}if(futureResponse.getInvokeCallback()!=null){//回调类型).onFailure(newXxlRpcException(xxlRpcResponse.getErrorMsg()));}else{futureResponse.getInvokeCallback().onSuccess(xxlRpcResponse.getResult());}}});}catch(Exceptione){logger.error(e.getMessage(),e);}}else{//里面调用锁的通知方法futureResponse.setResponse(xxlRpcResponse);}//删除futureResponsePool.remove(requestId);}