当前位置: 首页 > 后端技术 > Java

Java开发中常用的消息队列工具ActiveMQ

时间:2023-04-01 20:18:12 Java

一、ActiveMQ的同类产品介绍:RabbitMQ、Kafka、Redis(List)1.1与RabbitMQ最接近的同类产品相比,经常被用来做比较。互相替换。主要区别在于两者的协议不同。RabbitMQ的协议是AMQP(AdvancedMessageQueuingProtocol),而ActiveMQ使用的是JMS(JavaMessagingService)协议。顾名思义,JMS是Java系统的传输协议。队列的两端必须有JVM。因此,如果开发环境是Java,建议使用ActiveMQ。可以使用一些Java对象来传输,比如Map、Blob、Stream等。但是AMQP通用性强,常用于非java环境,传输内容为标准字符串。还有一点就是RabbitMQ是用Erlang开发的。安装前先安装Erlang环境比较麻烦。ActiveMQ解压后无需安装即可使用。1.2KafKa比较Kafka的性能超过ActiveMQ等传统MQ工具,集群具有良好的扩展性。缺点是:在传输过程中可能会出现消息重复,不保证发送顺序。一些传统的MQ功能是不存在的,比如消息的事务功能。所以通常使用Kafka来处理大数据日志。1.3对比Redis其实Redis本身可以使用List实现消息队列的功能,但是功能比较少,当队列大的时候性能会急剧下降。可用于数据量小、业务简单的场景。2安装ActiveMQ复制apache-activemq-5.14.4-bin.tar.gz到Linux服务器的/opt解压tar-zxvfapache-activemq-5.14.4-bin.tar.gz重命名mvapache-activemq-5.14.4activemqvim/opt/activemq/bin/activemq查看java环境:vim/etc/profile或者echo$JAVA_HOME添加两行JAVA_HOME="/opt/jdk1.8.0_152"JAVA_CMD="/opt/jdk1.8.0_152/bin"注册serviceln-s/opt/activemq/bin/activemq/etc/init.d/activemq(软连接必须使用绝对路径)chkconfig–addactivemq禁止使用cp/opt/activemq/bin/activemq/etc/init.dd/activemqstartserviceserviceactivemqstartcloseserviceserviceactivemqstop通过netstat查看端口netstat-tlnpt:表示tcpl:表示监听n:将ip和端口转换成域名和服务名p:显示程序名activemq两个重要的端口,一个是默认提供消息队列的端口:61616,另一个是console端口8161。通过console测试启动消费端,输入网页控制台账号/密码默认:admin/admin点击Queues,观察客户端3.在客户端使用消息队列Java3.1在gmall-service-util中导入依赖坐标.slf4j<艺术factId>slf4j-log4j12org.apache.activemqactivemq-pool<版本>5.15.2org.slf4jslf4j-log4j123.2在支付项目中添加ProducersidepublicclassProducerTest{publicstaticvoidmain(String[]args)throwsJMSException{//创建连接工厂ConnectionFactoryconnectionFactory=newActiveMQConnectionFactory("tcp://192.168.67.201:61616");连接connection=connectionFactory.createConnection();connection.start();//创建一个session第一个参数表示是否支持事务,当为false时,第二个参数Session.AUTO_ACKNOWLEDGE自动为其签名,Session.CLIENT_ACKNOWLEDGE为其手动签名,DUPS_OK_ACKNOWLEDGE在订阅时为其其中一个签名//第一次时第一个参数设置为true,第二个如果服务器设置为SESSION_TRANSACTED,则可以忽略此参数Sessionsession=connection.createSession(false,Session.AUTO_ACKNOWLEDGE);//创建一个队列Queuequeue=session.createQueue(“Atguigu”);MessageProducerproducer=session.createProducer(queue);//创建消息对象ActiveMQTextMessageactiveMQTextMessage=newActiveMQTextMessage();activeMQTextMessage.setText(“你好ActiveMq!”);//发送消息producer.send(activeMQTextMessage);生产者.close();connection.close();}}注意:如果有事务,需要先提交事务session.commit();NumberOfPendingMessages是等待使用的消息数。这是尚未排队的消息数。可以理解为接待总数-排队总数。NumberOfConsumersConsumers这是消费者端的消费MessagesEnqueued进入队列的消息总数,包括出队列的消息。这个数字只增不减。MessagesDequeued出队列的消息可以理解为消费者消费数量的汇总:当有消息进入这个队列时,等待被消费的消息为1,进入队列的消息为1。消息被消费后,等待消费的消息为0,进入队列的消息为1,离开队列的消息为1。当有消息到来时,等待消费的消息为1,进入队列的消息队列为2.3.3在支付项中添加消费者端publicclassConsumerTest{publicstaticvoidmain(String[]args)throwsJMSException{ActiveMQConnectionFactoryactiveMQConnectionFactory=newActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,ActiveMQConnection.DEFAULT_PASSWORD,“612.268.67”:1;//192.168.67)创建连接Connectionconnection=activeMQConnectionFactory.createConnection();connection.start();//创建会话Sessionsession=connection.createSession(false,Session.AUTO_ACKNOWLEDGE);//创建一个队列Queuequeue=session.createQueue(“Atguigu”);//创建消费者MessageConsumerconsumer=session.createConsumer(queue);//接收消息consumer.setMessageListener(newMessageListener(){@OverridepublicvoidonMessage(Messagemessage){//参数为接收到的消息if(messageinstanceofTextMessage){try{Stringtext=((TextMessage)消息).getText();System.out.println(text+"消息收到!");}catch(JMSExceptione){e.printStackTrace();}}}});消费者.close();session.close();}}3.4关于producer提交事务时的事务控制transaction开启为true,只执行send,不会提交到队列。只有在执行session.commit()的时候,消息才真正提交到队列中,如果队列中的事务没有开启false,只要执行send,就会进入队列。当消费者收到交易,交易开始,收据必须写Session.SESSION_TRANSACTED。收到消息后,并没有真正消费消息。消息只是被锁定。一旦线程死亡,抛出异常,或者程序执行了session.rollback(),消息就会被释放并返回到队列中,以供其他消费者再次消费。java训练事务没有开启,签到方式选择Session.AUTO_ACKNOWLEDGE,只要调用comsumer.receive方法,就会自动确认。如果事务未开启,登录方式为Session.CLIENT_ACKNOWLEDGE,客户端需要执行message.acknowledge(),否则视为未提交。线程结束后,其他线程仍然可以接收到。这种方式和事务模式很相似,不同的是不能手动回滚,可以单独确认一条消息。Topic模式下不启用手动签到事务,签到方式选择Session.DUPS_OK_ACKNOWLEDGE进行批量签到,可以提高性能。但是在某些情况下,消息可能会重复提交,使用这种模式的消费者必须能够处理重复提交的问题。3.5持久化与非持久化通过producer.setDeliveryMode(DeliveryMode.PERSISTENT)设置持久化的好处是当activemq宕机时,消息队列中的消息不会丢失。非持久化会丢失。但是会消耗一定的性能。持久性:当服务器宕机时,消息仍然存在。非持久化:当服务器宕机时,消息不存在。在zookeeper中,有持久化-非持久化。

最新推荐
猜你喜欢