当前位置: 首页 > 后端技术 > Java

ApachePulsar简单讲解(二):Pulsar消息机制

时间:2023-04-01 17:13:19 Java

消息机制Pulsar采用发布-订阅(pub-sub)设计模式。在这种设计模式中,生产者向主题发布消息,消费者订阅主题,处理发布的消息,处理完成后发送确认。创建订阅后,即使消费者断开连接,Pulsar仍然可以保存所有消息。在消费者确认消息已被成功处理之前,消息不会被删除。主题(Topic)从逻辑上讲,一个Topic就是一个日志结构,每条消息在这个日志结构中都有一个偏移量。ApachePulsar使用游标来跟踪偏移量(CursorTracking)。Pulsar支持两种基本主题类型:持久主题和非持久主题。{持久|非持久}://tenant/namespace/topicNon-Partitionedtopics$$PULSAR_HOME/bin/pulsar-admintopics\listpublic/default$$PULSAR_HOME/bin/pulsar-admintopics\createpersistent://public/default/input-seed-avro-topic$$PULSAR_HOME/bin/pulsar-admintopics\lookuppersistent://public/default/input-seed-avro-topic$$PULSAR_HOME/bin/pulsar-admintopics\deletepersistent//public/default/input-seed-avro-topic$$PULSAR_HOME/bin/pulsar-admintopics\statspersistent://public/default/input-seed-avro-topic$curlhttp://server-101:8080/admin/v2/persistent/public/default/exclamation-input/stats|python-mjson.toolPartitionedtopics$$PULSAR_HOME/bin/pulsar-admintopics\create-partitioned-topicpersistent://public/default/output-seed-avro-topic\--partitions2$$PULSAR_HOME/bin/pulsar目录-admin主题\list-partitioned-topicspublic/default$$PULSAR_HOME/bin/pulsar-admintopics\get-partitioned-topic-metadatapersistent://public/default/output-seed-avro-topic$$PULSAR_HOME/bin/pulsar-admintopics\delete-partitioned-topicpersistent://public/default/output-seed-avro-topic消息(Message)消息是Pulsar的基本“单元”。publicinterfaceMessage{MapgetProperties();布尔hasProperty(Stringvar1);字符串getProperty(Stringvar1);字节[]getData();TgetValue();MessageIdgetMessageId();长getPublishTime();长getEventTime();长getSequenceId();字符串getProducerName();布尔hasKey();字符串获取密钥();布尔hasBase64EncodedKey();byte[]getKeyBytes();布尔hasOrderingKey();byte[]getOrderingKey();字符串getTopicName();可选的getEncryptionCtx();intgetRedeliveryCount();byte[]getSchemaVersion();布尔isReplicated();StringgetReplicatedFrom();}生产者(Producer)publicvoidsend()throwsPulsarClientException{finalStringserviceUrl="pulsar://server-100:6650";//finalStringserviceUrl="pulsar://server-101:6650,server-102:6650,server-103:6650";//http://pulsar.apache.org/docs/en/client-libraries-java/#clientfinalPulsarClientclient=PulsarClient.builder().serviceUrl(serviceUrl).connectionTimeout(10000,TimeUnit.MILLISECONDS).build();finalStringtopic="persistent://public/default/topic-sensor-temp";//http://pulsar.apache.org/docs/en/client-libraries-java/#configure-producerfinalProducerproducer=client.newProducer().producerName("sensor-temp").topic(topic).compressionType(CompressionType.LZ4).enableChunking(true).enableBatching(true).batchingMaxBytes(1024).batchingMaxMessages(10).batchingMaxPublishDelay(10,TimeUnit.MILLISECONDS).blockIfQueueFull(真).maxPendingMessages(512).sendTimeout(1,TimeUnit.SECONDS).create();MessageIdmid=producer.send("sensor-temp".getBytes());System.out.printf("\nID为%s的消息发送成功",mid);mid=producer.newMessage().key("sensor-temp-key").value("sensor-temp-key".getBytes()).property("my-key","my-value").property(“我的其他键”,“我的其他值”).send();System.out.printf("ID为%s的消息密钥发送成功",mid);生产者.close();client.close();}消费者(Consumer)publicvoidconsume()throwsPulsarClientException{finalStringserviceUrl="pulsar://server-101:6650";finalStringtopic="input-seed-avro-topic";最终PulsarClient客户端=PulsarClient.builder().serviceUrl(serviceUrl).enableTcpNoDelay(真).build();finalConsumerconsumer=client.newConsumer().consumerName("seed-avro-consumer").subscriptionName("seed-avro-subscription").subscriptionType(SubscriptionType.Exclusive).subscriptionInitialPosition(订阅初始位置.Earliest).topic(主题).receiverQueueSize(10).subscribe();finalAvroSchema架构=AvroSchema.of(SeedEvent.class);while(true){try{finalMessagemsg=consumer.receive();LOG.info("接收消息:[{}]topic:{}mid:{}sid:{}event:{}publish:{}producer:{}key:{}value:{}",Thread.currentThread().getId(),msg.getTopicName(),msg.getMessageId(),msg.getSequenceId(),msg.getEventTime(),msg.getPublishTime(),msg.getProducerName(),msg.getKey(),schema.decode(msg.getValue()));尝试{consumer.acknowledge(msg);}catch(finalPulsarClientExceptione){consumer.negativeAcknowledge(msg);LOG.error("确认:"+e.getLocalizedMessage(),e);}}catch(finalPulsarClientExceptione){LOG.error("receive:"+e.getLocalizedMessage(),e);}}}订阅(Subscriptions)消费者通过订阅来消费Topic中的消息。订阅是游标的逻辑实体(跟踪偏移量)。可以将多个订阅添加到一个主题。订阅不包含消息的数据,仅包含元数据和游标。每个订阅存储一个游标。游标是日志中的当前偏移量。订阅将其Cursor存储在BookKeeper的Ledger中。这使光标跟踪能够像主题一样缩放。订阅类型(subscription-type)Exclusive一个订阅只能有一个messager来消费消息。Failover容灾一个订阅同时只有一个消费者,可以有多个备份消费者。一旦主消费者发生故障,备用消费者就会接管。永远不会同时有两个活跃的消费者。Shared一个订阅中可以同时有多个消费者,多个消费者共享Topic中的消息。Key_Shared顺序保证(Orderingguarante)如果有顺序要求,可以使用Exclusive和Failover的订阅模式,这样只有一个Consumer在消费同一个Topic,可以保证顺序。如果采用Shared订阅方式,多个Consumer可以同时消费同一个Topic。通过动态增加消费者的数量,可以加速topic的消费,减少服务器上的消息堆积。KeyShared模式保证在Shared模式下,相同Key的消息也会发送给同一个Consumer,并发的同时保证了顺序。多主题订阅(Multi-topicsubscriptions)模式:persistent://public/default/.*persistent://public/default/foo.*Readerpublicvoidread()throwsIOException{finalStringserviceUrl="pulsar://服务器-101:6650”;最终PulsarClient客户端=PulsarClient.builder().serviceUrl(serviceUrl).build();//http://pulsar.apache.org/docs/en/client-libraries-java/#readerfinalReaderreader=client.newReader().topic("my-topic").startMessageId(MessageId.earliest())//MessageId.latest.create();while(true)finalMessagemessage=reader.readNext();System.out.println(newString(message.getData()));}}分区主题(Partitionedtopics)消息保留和过期(Messageretentionandexpiration)如果没有Topic,设置数据保留策略。一旦一个主题的所有订阅游标都成功消费了一个偏移量,这个偏移量之前的消息将被自动删除。如果主题设置了数据保留策略,则消费确认的消息超过保留策略阈值(主题的消息存储大小,主题中的消息保留时间)后将被删除。conf/broker.conf#默认消息保留时间#默认0,修改为3天=60*24*3defaultRetentionTimeInMinutes=4320#默认保留大小#默认0,修改为10GdefaultRetentionSizeInMB=10240#如果ttl还没有,则命名空间的默认ttl在命名空间策略中配置。(禁用值为0的default-ttl)ttlDurationDefaultInSeconds=0retentionpolicy(对于命名空间)$$PULSAR_HOME/bin/pulsar-adminnamespaces\get-retentionpublic/default$curl-XGEThttp://server-101:8080/管理/v2/命名空间/公共/默认/保留|python-mjson.tool$$PULSAR_HOME/bin/pulsar-adminnamespaces\set-retentionpublic/default\--size1024M\--time5m$curl-XPOSThttp://server-101:8080/admin/v2/namespaces/public/default/retention\--header"Content-Type:application/json"\--data'{"retentionTimeInMinutes":5,"retentionSizeInMB":1024}'消息到期/消息-ttl$$PULSAR_HOME/bin/pulsar-admin命名空间\get-message-ttlpublic/default$curl-XGEThttp://server-101:8080/admin/v2/namespaces/public/default/messageTTL$$PULSAR_HOME/bin/pulsar-admin命名空间\set-message-ttlpublic/default\--messageTTL1800$curl-XPOSThttp://server-101:8080/admin/v2/namespaces/public/default/messageTTL\--header"Content-Type:application/json"\--data'1800'是最近几年写的,在AIOps领域快速发展的背景下,各个行业对IT工具、平台能力、解决方案、AI场景、可用数据集的迫切需求不断涌现。基于此,云智慧于2021年8月发布了AIOps社区,旨在树立一面开源旗帜,为各行业的客户、用户、研究人员和开发人员打造一个活跃的用户和开发者社区,贡献和解决行业问题,促进该领域技术的发展。社区先后开源了数据可视化与编排平台——FlyFish、运维管理平台OMP、云服务管理平台——Moore平台、Hours算法等产品。视觉编排平台-FlyFish:项目介绍:https://www.cloudwise.ai/flyF...Github地址:https://github.com/CloudWise-...Gitee地址:https://gitee.com/CloudWise/f...行业案例:https://www.bilibili.com/vide...部分大屏案例:可添加小助手(xiaoyuerwie)注:飞鱼。加入开发者交流群,与行业大咖1V1交流!您还可以通过小助手获取云智慧AIOps信息,了解飞鱼的最新进展!ApachePulsar系列阅读(一):PulsarvsKafka