当前位置: 首页 > 科技观察

SpringBoot集成分布式消息平台Pulsar

时间:2023-03-13 18:06:56 科技观察

大家好,我是君哥,Pulsar作为一个优秀的消息流平台,用的越来越多。本文介绍Pulsar的Java客户端。Pulsar的部署Pulsar的部署方式主要有三种,本地安装二进制文件、docker部署、部署在Kubernetes上。本文使用docker部署单节点Pulsar集群。实验环境为2核CPU,4G内存。部署命令如下:dockerrun-it-p6650:6650-p8080:8080--mountsource=pulsardata,target=/pulsar/data--mountsource=pulsarconf,target=/pulsar/confapachepulsar/pulsar:2.9.1bin/pulsarstandalone的安装过程中可能会出现如下错误:unknownflag:--mount参见'dockerrun--help'。这是因为docker版本低,不支持挂载参数。将docker版本升级到17.06或以上即可。部署过程可能会因为网络原因失败,多试几次就会成功。如果看到如下日志,说明启动成功。2022-01-08T22:27:58,726+0000[main]INFOorg.apache.pulsar.broker.PulsarService-messagingserviceisready,bootstrapserviceport=8080,brokerurl=pulsar://localhost:6650,cluster=standalone本地单节点集群启动后,会创建一个命名空间命名为public/defaultPulsarclient目前Pulsar支持多种语言的客户端,包括:Java客户端Go客户端Python客户端C++客户端Node.js客户端WebSocket客户端C#客户端SpringBoot配置使用SpringBoot集成了Pulsar客户端,先介绍一下Pulsar客户端依赖,代码如下:org.apache.pulsarpulsar-client2.9.1然后添加在properties文件中配置:#Pulsar地址pulsar.url=pulsar://192.168.59.155:6650#topicpulsar.topic=testTopic#consumergrouppulsar.subscription=topicGroup创建客户端很简单,代码如下:client=PulsarClient.builder().serviceUrl(url).build();上面的url就是在properties文件中定义的pulsar.url。在创建Client时,即使集群启动失败,程序也不会报错,因为此时集群还没有真正连接上。创建生产者producer=client.newProducer().topic(topic).compressionType(CompressionType.LZ4).sendTimeout(0,TimeUnit.SECONDS).enableBatching(true).batchingMaxPublishDelay(10,TimeUnit.MILLISECONDS).batchingMaxMessages(1000)。maxPendingMessages(1000).blockIfQueueFull(true).roundRobinRouterBatchingPartitionSwitchFrequency(10).batcherBuilder(BatcherBuilder.DEFAULT).create();创建Producer实际上会连接到集群。如果集群有问题,会报连接错误。下面解释创建Producer的参数:topic:Producer要写的topic。compressionType:压缩策略,目前支持4种策略(NONE、LZ4、ZLIB、ZSTD)。从Pulsar2.3开始,只有Consumer版本在2.3以上,该策略才会生效。sendTimeout:超时时间,如果Producer在超时时间内没有收到ACK,就会重新发送。enableBatching:是否开启消息的批处理,这里默认为true,该参数只能在异步发送(sendAsync)时生效,选择同步发送会失败。batchingMaxPublishDelay:批量发送消息的时间周期,这里定义为10ms。需要注意的是,如果设置了分批时间,则不受消息条数的影响。批量发送会将要发送的批量消息放在一个网络包中发送,减少了网络IO次数,大大提高了网卡的发送效率。batchingMaxMessages:批量发送的最大消息数。maxPendingMessages:等待从broker接收ACK的消息队列的最大长度。如果队列已满,则生产者的所有sendAsync和发送都会失败,除非blockIfQueueFull设置为true。blockIfQueueFull:当Producer发送消息时,会先将消息放入本地的Queue缓存中。如果缓存已满,消息将被阻塞。roundRobinRouterBatchingPartition-SwitchFrequency:发送消息时如果不指定key,则默认以roundrobin方式发送消息。使用roundrobin方式,分区切换周期为(frequency*batchingMaxPublishDelay)。创建ConsumerPulsar的消费模型如下图所示:从图中可以看出,Consumer需要绑定一个订阅进行消费。consumer=client.newConsumer().topic(topic).subscriptionName(subscription).subscriptionType(SubscriptionType.Shared).subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).negativeAckRedeliveryDelay(60,TimeUnit.SECONDS).receiverQueueSize(1000).subscribe();下面解释创建Consumer的参数:topic:Consumer想要订阅的主题。subscriptionName:与消费者关联的订阅的名称。subscriptionType:订阅类型,Pulsar支持四种订阅类型:Exclusive:独占模式,同一个Topic只能有一个消费者,如果有多个消费者会报错。Failover:容灾模式,同一个Topic可以有多个消费者,但只能有一个消费者消费,其他消费者作为故障转移的备份。如果当前消费者失败,将选择其中一个备份消费者执行。消耗。如下图所示:Shared:共享模式,同一个Topic可以被多个消费者订阅消费。消息通过轮询机制分发给不同的消费者,每条消息只会分发给一个消费者。当消费者断??开连接时,如果发送给它的消息尚未被消费,则这些消息将重新分发给其他幸存的消费者。如下图所示:Key_Shared:消息和消费者都会绑定一个key,消息只会发送给绑定了同一个key的消费者。如果新的消费者建立连接或消费者断开连接,则需要更新一些消息密钥。与Shared模式相比,Key_Shared的优势在于既可以让消费者并发消费消息,又可以保证同一个Key下消息的顺序。如下图所示:subscriptionInitialPosition:创建新订阅时从哪里开始消费,有两个选项Latest:从最新的消息开始消费Earliest:从最早的消息开始消费negativeAckRedeliveryDelay:broker需要多长时间消费失败后重新发送。receiverQueueSize:在调用receive方法之前可以累积多少条消息。可以设置为0,以便一次只从代理拉取一条消息。在Shared模式下,receiverQueueSize设置为0,可以防止批量消息发送给一个Consumer,导致其他Consumer空闲。Consumer接收消息有四种方式:同步单、同步批、异步单、异步批,代码如下:.batchReceive();CompletableFuturemessage=consumer.batchReceiveAsync();对于批量接收,也可以设置一个批量接收策略,代码如下:consumer=client.newConsumer().topic(topic).subscriptionName(subscription).batchReceivePolicy(BatchReceivePolicy.builder().maxNumMessages(100).maxNumBytes(1024*1024).timeout(200,TimeUnit.MILLISECONDS).build()).subscribe();代码中的参数解释如下:maxNumMessages:批量接收的最大消息数。maxNumBytes:批量接收消息的大小,这里是1MB。测试首先编写Producer发送消息的代码,如下:.sendAsync();未来。handle((v,ex)->{if(ex==null){logger.info("发送消息成功,key:{},msg:{}",key,data);}else{logger.error("发送消息失败,key:{},msg:{}",key,data);}returnnull;});future.join();logger.info("发送消息完成,key:{},msg:{}",key,data);}然后写一段Consumer消费消息的代码,如下:publicvoidstart()throwsException{while(true){Messagemessage=consumer.receive();Stringkey=message.getKey();Stringdata=newString(message.getData());Stringtopic=message.getTopicName();if(StringUtils.isNotEmpty(data)){try{logger.info("收到消息,topic:{},key:{},data:{}",topic,key,data);}catch(Exceptione){logger.error("接收消息异常,topic:{},key:{},data:{}",topic,key,data,e);}}consumer.acknowledge(message);}}最后写一个Controller类,调用Producer发送消息。代码如下:@RequestMapping("/send")@ResponseBodypublicStringsend(@RequestParamStringkey,@RequestParamStringdata){logger.info("收到消息发送请求,key:{},value:{}",key,data);pulsarProducer.sendMsg(key,data);返回“成功”;}调用Producer发送消息,key=key1,data=data1,具体操作是在浏览器中输入如下url回车:http://192.168.157.1:8083/pulsar/send?key=key1&data=data1可以看到如下控制台输出日志:2022-01-0822:42:33,199[pulsar-client-io-6-1][INFO]boot.pulsar.PulsarProducer-sendmessagesuccessful,key:key1,msg:data12022-01-0822:42:33,200[http-nio-8083-exec-1][INFO]boot.pulsar.PulsarProducer-发送消息完成,key:key1,msg:data12022-01-0822:42:33,232[Thread-22][INFO]boot.pulsar.PulsarConsumer-receivedmessage,topic:persistent://public/default/testTopic,key:key1,data:data12022-01-0822:43:14,498[pulsar-timer-5-1][信息]org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl-[testTopic][topicGroup][7def6]Prefetchedmessages:0---Consumethputreceived:0.02msgs/s---0.00Mbit/s---Acksentrate:0.02ack/s---Failedmessages:0---batchmessages:0---Failedacks:02022-01-0822:43:14,961[pulsar-timer-9-1][INFO]org.apache.pulsar.client.impl.ProducerStatsRecorderImpl-[testTopic][standalone-9-0]Pendingmessages:0---Publishthroughput:0.02msg/s---0.00Mbit/s---Latency:med:69.000ms-95pct:69.000ms-99pct:69.000ms-99.9pct:69.000ms-max:69.000ms---Ackreceivedrate:0.02ack/s---Failedmessages:0从日志中可以看出,这里使用的命名空间是集群创建时生成的public/default。总结从SpringBoot对Java客户端的集成来看,Pulsar的API是非常友好易用的。Consumer的使用需要多考虑,需要考虑批处理、异步和订阅类型。