当前位置: 首页 > 科技观察

揭开Raft的神秘面纱,ApacheRatis了解Raft组件的使用

时间:2023-03-16 16:24:40 科技观察

相对于Paxos,Raft一向以简单易懂着称。今天从一个用了一年Raft的人的角度,看一下别人基于Raft论文实现后,我们一般是怎么用的。俗话说,梨子的滋味,要亲口尝一尝。没吃过猪肉,就得看猪跑。不然不管别人怎么形容,你都可能觉得它长得像猫狗一样毛茸茸的。从Raft官网长长的列表中可以发现,目前实现Raft的框架有很多。在Java方面,我大概看了Ant的SOFARaft和Apache的Ratis。本次我们以Ratis为例揭开面纱,看看如何使用。当然,下面提到的例子也是这些组件自带的例子。1.编译g??ithub下载Ratis直接运行mvncleanpackage即可。如果在编译过程中出现错误,可以先cleaninstallratis-proto2.ExamplesRatis自带了三个example:arithmeticcounterfilestore在ratis-examples模块中,对于算术和filestore比较方便,可以快速启动Server和Client通过main/bin目录下的shell脚本进行测试。对于Raft,我们都知道需要多个实例组成一个集群来测试。开个instance也没用,连master的选型都是个问题。Bin目录下的start-all支持example的名字和对应的命令。例如,文件存储服务器代表启动文件存储应用程序的服务器。对应的命令参数会在对应的例子中的cli中解析出来。同时,同时启动三台服务器组成集群,并在一个周期内完成选举。对于counter这个例子,并没有对应的快速启动三台服务器的脚本,我们可以通过命令行启动,也可以在IDE中以参数的形式启动。3.分析下面举个例子来看看RaftServer是如何工作的。对于计数器的例子,我们在启动的时候,需要传入一个参数,这个参数代表当前服务器的编号。目的是知道使用哪个IP+端口从对等列表中启动它。这里我们可以发现,这个peers列表是在代码中提前设置好的。当然,如果你讲的是动态配置,那是没有问题的。另外两个例子是通过shell脚本中common的配置传入的。所以在第一步中,我们可以看到RaftServer在启动的时候,会通过“配置”的形式知道peer的存在,从而可以相互通信,让别人为自己投票或者为别人投票,并完成任期选举。另外,Leader传递过来的Log可以在本地接收和应用。第二步,我们看看Client是如何与集群通信的。整个Raft集群中可能存在多个实例,我们知道写操作必须通过Leader来完成。那你怎么知道谁是Leader呢?有什么办法吗?常见的思路是:在写之前先去集群里看看谁是Leader,然后随便写一个写。如果失败,请更换另一个并继续尝试。总会有成功的。当然,如果你这样尝试,第二种方法的效率不是很高。因此,在这次随机试验之后,集群会将当前的Leader信息返回给Client,然后Client就可以直接通过这个建立的连接进行通信了。在Ratis中,当客户端调用非领导节点时,会收到服务器抛出的异常。异常将包含称为suggestLeader的信息,它指示当前正确的领导者。只需单击此链接即可连接。当然,如果这个过程中Leader发生了变化,就会返回一个新的suggestLeader重新尝试。让我们看看这个例子中Counter的实现。Server和Client共享的Common代码包括peers的声明publicfinalclassCounterCommon{publicstaticfinalListPEERS=newArrayList<>(3);static{PEERS.add(newRaftPeer(RaftPeerId.getRaftPeerId("n1"),"127.0.0.1:6000"));PEERS.add(newRaftPeer(RaftPeerId.getRaftPeerId("n2"),"127.0.0.1:6001"));PEERS.add(newRaftPeer(RaftPeerId.getRaftPeerId("n3"),"127.0.0.1:6002"));}这里声明了三个节点。从命令行启动时,会直接传入index,index的值为1-3。java-cp*.jarorg.apache.ratis.examples.counter.server.CounterServer{serverIndex}并在Server启动时获取相应的配置信息。//根据应用参数RaftPeercurrentPeer=CounterCommon.PEERS.get(Integer.parseInt(args[0])-1);//找到当前对等对象然后设置存储目录//setthestoragedirectory(differentforeachpeer)inRaftPropertyobjectFileraftStorageDir=newFile("./"+currentPeertoString(Id().));RaftServerConfigKeys.setStorageDir(properties,Collections.singletonList(raftStorageDir))关注这个,每个Server都会有一个状态机“CounterStateMachine”,通常我们的“业务逻辑”都放在这里Log的形式,每个节点的Log都会在自己的状态机中执行,从而保证每个节点的状态。一致。最后根据这些配置生成并启动一个RaftServer实例。//createandstarttheRaftserverRaftServerserver=RaftServer.newBuilder().setGroup(CounterCommon.RAFT_GROUP).setProperties(properties).setServerId(currentPeer.getId()).setStateMachine(counterStateMachine).build();server.start();CounterStateMachine,应用这个计数的一小段代码,我们先检查命令是否合法,然后执行命令"INCREMENT")){returnCompletableFuture.completedFuture(Message.valueOf("InvalidCommand"));}//updatethelastappliedtermandindexfinallongindex=entry.getIndex();updateLastAppliedTermIndex(entry.getTerm(),index);//命令的实际执行:incrementthecounterGetter(incrementthecounterGetter);//返回计数器的新值给客户端trx.getServerRole()==RaftProtos.RaftPeerRole.LEADER){LOG.info("{}:Incrementto{}",index,counter.toString());}我们看Client的实现和Server类似,通过配置属性创建实例privatestaticRaftClientbuildClient(){RaftPropertiesraftProperties=newRaftProperties();RaftClient.Builderbuilder=RaftClient.newBuilder().setProperties(raftProperties).setRaftGroup(CounterCommon.RAFT_GROUP).setClientRpc(newGrpcFactory(newParameters()).newRaftClientRpc(ClientId.randomId(),raftPropertiens)build);returnbuild();}然后你可以向服务器发送命令开始工作。raftClient.send(Message.valueOf("INCREMENT"));Counter的状态机支持INCREMENT和GET命令。所以例子最后执行了一条GET命令,得到了最终的计数结果RaftClientReplycount=raftClient.sendReadOnly(Message.valueOf("GET"));4、在RaftClientImpl的内部实现中,一开始会从peers列表中选出一个作为leader。问。RaftClientImpl(ClientIdclientId,RaftGroupgroup,RaftPeerIdleaderId,RaftClientRpcclientRpc,RaftPropertiesproperties,RetryPolicyretryPolicy){this.clientId=clientId;this.clientRpc=clientRpc;this.peers=newConcurrentLinkedQueue<>(group).getGroup(Idgers).getPeers=this.leaderIdleaderId!=null?leaderId:!peers.isEmpty()?peers.iterator().next().getId():null;...}之后,处理。privateRaftClientReplysendRequest(RaftClientRequestrequest)throwsIOException{RaftClientReplyreply;try{reply=clientRpc.sendRequest(request);}catch(GroupMismatchExceptiongme){throwgme;}catch(IOExceptionioe){handleIOException(request,ioe);}reply=handleLeaderException(request,reply,null));reply=handleRaftException(reply,Function.identity());returnreply;}比如handleLeaderException中,有几种情况,因为通过Client与Server通信时,会随机从peer中选出一个作为leader去请求,如果服务端返回异常,说自己不是leader,用下面的代码从其他peer中随机选一个然后请求。finalRaftPeerIdoldLeader=request.getServerId();finalRaftPeerIdcurLeader=leaderId;finalbooleanstillLeader=oldLeader.equals(curLeader);if(newLeader==null&&stillLeader){newLeader=CollectionUtils.random(oldLeader,CollectionUtils.as(get,RaftId)staticTrandom(finalTgiven,Iterable迭代){Objects.requireNonNull(给定,"given==null");Objects.requireNonNull(迭代,"迭代==null");finalListlist=StreamSupport.stream(iteration.spliterator(),false).filter(e->!given.equals(e)).collect(Collectors.toList());finalintsize=list.size();returnsize==0?null:list.get(ThreadLocalRandom.current().nextInt(size));}是不是感觉很低效,如果这个时候server返回的信息告诉client谁是leader,那么client直接connect就可以了,正确的?/***@returnnullifthereplyisnullorithas*{@linkNotLeaderException}或{@linkLeaderNotReadyException}*otherwisereturnthesamereply.*/RaftClientReplyhandleLeaderException(RaftClientRequestrequest,RaftClientReplyreply,Consumerhandler){if(reply==null||reply.getException()NotyReaderException){Leaderreturnnull;}finalNotLeaderExceptionnle=reply.getNotLeaderException();if(nle==null){returnreply;}returnhandleNotLeaderException(request,nle,handler);}RaftClientReplyhandleNotLeaderException(RaftClientRequestrequest,NotLeaderExceptionnle,Consumer处理程序){refreshPeers(nle.getPeers)());finalRaftPeerIdnewLeader=nle.getSuggestedLeader()==null?null:nle.getSuggestedLeader().getId();handleIOException(request,nle,newLeader,handler);returnnull;}我们会在异常信息中看到,如果能提取出一个suggestedLeader,则此时作为新的leaderId,下次直接连接。本文转载自微信公众号“Tomcat物语”,可关注下方二维码。转载此文请联系Tomcat那些东西公众号。