当前位置: 首页 > 后端技术 > Java

RocketMQ学习三种消息发送方式

时间:2023-04-01 19:38:10 Java

RocketMQ支持同步、异步、Oneway三种消息发送方式。同步:客户端发送消息后,会同步等待服务器的响应。异步:客户端发起消息请求后,不等待服务端响应,而是立即返回,这样客户端子线程就不会被阻塞。当客户端收到服务器(Broker)的响应后,会自动调用回调函数。Oneway:客户端发起消息发送请求后,不会等待服务端的响应结果,也不会调用回调函数,即不关心消息最终的发送结果。这里我们主要关注异步和同步。异步消息每个消息发送者实例(DefaultMQProducer)都会创建一个异步消息发送线程池。默认的线程数是CPU核心数。线程池持有一个默认长度为5W的有界队列,并控制异步调用。最大并发数,默认为65536,可以通过参数clientAsyncSemaphoreValue进行配置。客户端使线程池将消息发送到服务器。服务端处理完成后返回该结构体,并根据是否发生异常调用SendCallback回调函数。以上就是发送异步消息的过程。接下来,我们将分析源代码。publicvoidstart()throwsMQClientException{this.defaultMQProducerImpl.start();if(null!=traceDispatcher){try{traceDispatcher.start(this.getNamesrvAddr());}catch(MQClientExceptione){log.warn("tracedispatcherstartfailed",e);}}}这是Producer服务的启动入口。然后查看DefaultMQProducerImpl类:publicvoidstart(finalbooleanstartFactory)throwsMQClientException{...if(startFactory){//启动MQClientInstancemQClientFactory.start();}...}publicvoidstart(finalbooleanstartFactory)throwsMQClientException{...this.mQClientFactory=MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQProducer,rpcHook);...}在getAndCreateMQClientInstance方法中会创建一个MQClientInstance实例,然后在MQClientInstance创建过程中会创建一个DefaultMQProducerImpl对象。创建异步消息发送线程池。this.asyncSenderThreadPoolQueue=newLinkedBlockingQueue(50000);this.defaultAsyncSenderExecutor=newThreadPoolExecutor(Runtime.getRuntime().availableProcessors(),Runtime.getRuntime().availableProcessors(),1000*60,TimeUnit.MILLISECONDS,this.asyncSenderThreadPoolQueue,newThreadFactory(){privateAtomicIntegerthreadIndex=newAtomicInteger(0);@OverridepublicThreadnewThread(Runnabler){returnnewThread(r,"AsyncSenderExecutor_"+this.threadIndex.incrementAndGet());}});下面看下面异常回调的地方,摸及MQClientAPIImpl#sendMessageAsync方法:this.remotingClient.invokeAsync(addr,request,timeoutMillis,newInvokeCallback(){@OverridepublicvoidoperationComplete(ResponseFutureresponseFuture){RemotingCommandresponse=responseFuture.getResponseCommand();...if(response!=null){try{SendResultsendResult=MQClientAPIImpl.this.processSendResponse(brokerName,msg,response);断言sendResult!=null;if(context!=null){context.setSendResult(sendResult);context.getProducer().executeSendMessageHookAfter(context);}尝试{sendCallback.onSuccess(sendResult);}catch(Throwablee){}...}}});其中sendCallback.onSuccess(sendResult)是代理处理完请求后的回调。上面说的限制65535并发是通过NettyRemotingAbstract#invokeAsyncImpl()中设置的Semaphore来实现的。它默认为65535,可以通过clientAsyncSemaphoreValue进行调整。异步发送还能保证消息的顺序吗?如果做到了以下几点,就可以保证顺序:需要单线程异步传输。发件人需要记录每条消息的序号。假设有1、2、3、4、5的5条消息,如果在3发送失败,则需要从3重新发送,即4、5会发送两次。消费者需要处理重复的消息。比如第2步,如果你没有收到消息3,但是收到了后续的消息4,54和5也不能处理,只能在收到3之后往回处理。参考:消息生产的实现过程(答)同步发送因为RocketMQ使用Netty进行IO读写,而Netty是多主从Ractor模型,所以同步调用其实是异步的,只是RocketMQ用了个小技巧,将异步转化为Synchronize。我们看代码:try{finalResponseFuseopaque,timeoutMillis,null,null);this.responseTable.put(不透明,responseFuture);finalSocketAddress地址=channel.remoteAddress();//响应后回调,这是异步的channel.writeAndFlush(request).addListener(newChannelFutureListener(){//相关点1@OverridepublicvoidoperationComplete(ChannelFuturef)throwsException{if(f.isSuccess()){responseFuture.setSendRequestOK(true);返回;}else{responseFuture.setSendRequestOK(错误的);}responseTable.remove(不透明);responseFuture.setCause(f.cause());responseFuture.putResponse(null);//相关点2log.warn("发送请求命令到通道<"+addr+">失败。");}});RemotingCommandresponseCommand=responseFuture.waitResponse(timeoutMillis);//相关点3if(null==responseCommand){if(responseFuture.isSendRequestOK()){thrownewRemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr),timeoutMillis,responseFuture.getCause());}else{thrownewRemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr),responseFuture.getCause());}}返回响应命令;}最后{this.responseTable.remove(不透明);}}'相关点1'中的回调是broker处理完后,用另一个线程做回调让主线程等待。‘相关点2’外面的代码是RemotingCommandresponseCommand=responseFuture.waitResponse(timeoutMillis)里面用到了countDownLatch这个工具,countDown是在哪里进行的呢?答案是回调中“关联点2”处的responseFuture.putResponse(null)。这就是异步转同步的方法。Oneway方法Oneway方法通常用于发送一些不重要的消息,例如操作日志。偶发的消息丢失对业务没有影响,这里就不多说了。小结本文主要讲了RocketMQ的三种消息发送方式,着重介绍了异步发送逻辑和同步方法中如何将异步转为同步。根据我在网上查到的一些资料,使用异步发送的方法并不多。如果想提高消息发送效率,一般可以从刷盘策略和复制策略入手进行优化。使用同步发送的方式基本是可以的。满足需求,当然一切都要从实际的业务场景出发。最后要提到的是失败重试。三种发送方式中,如果SendStatus不为SEND_OK,只有同步方式会重试。也就是说在补偿机制和容错机制上,如果是异步或者Oneway,我们在使用的时候也是要考虑的问题。