前言Active是一个开源的、面向消息(MOM)的中间件,实现了JMS1.1规范,为应用程序提供高效、可扩展、稳定、安全的企业级消息传递。AC-tiveMQ使用Apache提供的授权,任何人都可以修改其实现代码。ActiveMQ的设计目标是提供标准的、面向消息的、应用集成的、可以跨越多种语言和系统的消息通信中间件。ActiveMQ实现JMS标准并提供许多附加功能。本文将为您详细介绍ActiveMQAPI的使用。1.JMS的概念?“什么是JMS?”JMS--------JAVAMessageServiceJAVA的消息服务是Sun公司提供的接口,只是一种规范,类似于JDBC。是的,使用时需要当前规范的实现产品。《JMS能做什么:》可以向目的地发布信息,也可以消费来自目的地的消息2.两种通信模型《队列的通信概念:》特点:当我们在同一个队列中有多个消费者时,多个消费者的数据是原始队列中所有数据队列的通信模型最大的适用场景:流量调峰,高并发处理《主题通信模型:》特点:当我们的队列有多个消费者时,那么这多个消费者消费的数据是同一个topic消费者通信模型的适用场景:微服务下服务之间的异步通信3.MQ实现产品《实现产品:》ActiveMQRabbitMQRockerMQKafka(本设计的初衷是为了做分布式logs.后来因为日志顺序严格,这时候人们就用kafka做消息队列)4.JMS中的常用名词“Commonnouns:"ActiveMQConnectionFactory:this是一个创建连接的工厂ConnectionFactory:连接工厂Connection:连接JAVA到MQ的连接Destination:目的地Producer(生产者)Consumer(消费者)Session:Session(对MQ的每一次操作都称为一个会话)Queue:Queue题目:题目5.什么是消息队列《简单来说,消息队列就是一个存放临时数据的地方:》Producer------------>存储介质上的Consumer------------>存储介质上的“消息队列类似于快递公司:”。您可以把东西交给快递公司的目标人,也可以到快递公司自取。6、什么是ActiveMQ《含义:》ActiveMQ是一个JMS的实现产物,可以实现JMS下的所有功能7、ActiveMQ能做什么《主要功能:》流量调峰处理模块高并发下的异步通信处理订单处理三-party平台高并发辅助消息表可以完成分布式事务的最终一致性八、ActiveMQ的安装《ActiveMQ的安装与配置:》1、从官网下载Linux版本的ActiveMQ(最新版本为5.13.4)https://activemq.apache.org/下载。html2,解压安装tar-zxvfapache-activemq-5.13.4-bin.tar.gz3,配置(这里使用默认配置,无需修改)vim/usr/lical/activemq-1/conf/activemq.xml4,启动cd/usr/local/activemq-1/bin./activemqstart5.打开管理界面(管理界面可以查看和管理所有队列和消息)http://192.168.1.100:8161启动成功后可以浏览http://localhost:8161/admin/默认用户名和密码:admin/admin管理界面使用jetty作为容器。如果要修改管理接口的端口,可以编辑../conf/jetty.xml,找到下面这段:用户名/密码在../conf/jetty-realm.properties中。比如要添加管理员jimmy/123456,可以参考如下修改:123admin:admin,adminjimmy:123456,adminuser:user,user注意:管理界面有个小洞。ActiveMQ5.13.2与jdk1.8存在一些兼容性问题。如果使用jdk1.8,管理界面进入Queues选项卡时,偶尔会报错,但不影响消息收发的正常,只是无法从界面查看队列状态。如果出现这个问题,可以将jdk版本降到1.7。同时最好清除data目录下的所有数据,然后重启activemq。9.ActiveMQAPI用法《AcatveMQAPI用法:》队列用法(生产者)packagecom.qy.mq.queue;importorg.apache.activemq.ActiveMQConnectionFactory;importorg.apache.activemq.Message;importjavax.jms.*;/***@Auther:qianyu*@Date:2020/11/0414:12*@Description:Producer*/publicclassProducer{//要发布的地址privatestaticfinalStringPATH="tcp://10.7.182.87:61616";//下的用户名ActiveMQprivatestaticfinalStringuserName="admin";//ActiveMQ下的密码privatestaticfinalStringpassword="admin";publicstaticvoidmain(String[]args)throwsJMSException{//第一步:创建连接工厂ActiveMQConnectionFactoryactiveMQConnectionFactory=newActiveMQConnectionFactory(userName,password,PATH);//获取通过这个工厂连接Connectionconnection=activeMQConnectionFactory.createConnection();//第三步:打开连接connection.start();//第四步:创建操作MQ的会话/***第一个参数:是否使用transaction*第二个参数:交易的响应方式client*第一种:自动响应*第二种:客户端手动响应*/Sessionsession=connection.createSession(false,Session.AUTO_ACKNOWLEDGE);//需要发送消息的目的地(队列操作在队列)Destinationdestination=session.createQueue(“wqqq");//生产这条消息的生产者//必须有一个生产者MessageProducermessageProducer=session.createProducer(destination);//发送大量的消息到消息队列//for(inti=0;i<100;i++){//需要发送的消息//TextMessagetextMessage=session.createTextMessage("我是朝日:"+i);//研究消息类型/*BytesMessagebytesMessage=session.createBytesMessage();bytesMessage.setByteProperty("www",newByte("123"));//然后就可以发送消息了messageProducer.send(bytesMessage);*///创建一个map类型的消息/*MapMessagemapMessage=session.createMapMessage();mapMessage.setInt("www1",123);messageProducer.send(mapMessage);*/ObjectMessageobjectMessage=session.createObjectMessage(newUser(1,"qianyu","123"));messageProducer.send(objectMessage);//}}}队列usage(Consumer)packagecom.qy.mq.queue;importorg.apache.activemq.ActiveMQConnectionFactory;importjavax.jms.*;importjava.io.Serializable;/***@Auther:qianyu*@Date:2020/11/0414:13*@描述:消费者*/publicclassConsumer{//要发布的地址privatestaticfinalStringPATH="tcp://10.7.182.87:61616";//ActiveMQ下的用户名privatestaticfinalStringuserName="admin";//ActiveMQ下的密码privatestaticfinalStringpassword="admin";publicstaticvoidmain(String[]args)throwsJMSException{//第一步:创建连接factoryActiveMQConnectionFactoryactiveMQConnectionFactory=newActiveMQConnectionFactory(userName,password,PATH);//通过这个factory获取连接Connectionconnection=activeMQConnectionFactory.createConnection();//第三步:打开连接connection.start();//第四步:创建操作MQ的本次会话/***第一个参数:是否使用事务*第二个参数:客户端响应方式*第一种:自动响应*第二种:客户端手动响应*/Sessionsession=connection.createSession(false,Session.CLIENT_ACKNOWLEDGE);//需要发送消息的目的地(队列操作队列)Destinationdestination=session.createQueue("wqqq");//创建我们的consumerMessageConsumermessageConsumer=session.createConsumer(destination);//然后就可以接收到我们的消息了//Messagemessage=messageConsumer.receive();//接收消息并指定超时时间//Messagemessage=messageConsumer.receive(5000);//接收消息不用等待直接过来不接收//Messagemessage=messageConsumer.receiveNoWait();//给定当前路径设置监听messageConsumer.setMessageListener(newMessageListener(){@OverridepublicvoidonMessage(Messagemessage){/*BytesMessagebytesMessage=(BytesMessage)message;try{System.out.println("获取到的数据为:"+bytesMessage.getByteProperty("www"));}catch(JMSExceptione){e.printStackTrace();}*//*MapMessagemapMessage=(MapMessage)message;try{System.out.println("获取的数据为:"+mapMessage.getInt("www1"));}catch(JMSExceptione){e.printStackTrace();}*///测试对象消息收发ObjectMessageobjectMessage=(ObjectMessage)message;try{Useruser=(User)objectMessage.getObject();System.out.println("Thereceiveddatais:"+user);}catch(JMSExceptione){e.printStackTrace();}/*//我们知道是字符串类型的消息TextMessagetextMessage=(TextMessage)message;//接下来可以打印这条消息.try{System.out.println("Consumer1---收到的消息是:"+textMessage.getText());}catch(JMSExceptione){e.printStackTrace();}*/try{//这句话表示客户端手动回复message.acknowledge();}catch(JMSExceptione){e.printStackTrace();}}});}}packageco主题模型的制作m.qy.mq.topic;importorg.apache.activemq.ActiveMQConnectionFactory;importjavax.jms.*;/***@Auther:qianyu*@Date:2020/11/0415:17*@Description:*/publicclassProducer{//要发布的地址privatestaticfinalStringPATH="tcp://10.7.182.87:61616";//ActiveMQ下的用户名privatestaticfinalStringuserName="admin";//ActiveMQ下的密码privatestaticfinalStringpassword="admin";publicstaticvoidmain(String[]args)throwsJMSException{//第一步:创建一个连接工厂ActiveMQConnectionFactoryactiveMQConnectionFactory=newActiveMQConnectionFactory(userName,password,PATH);//通过这个工厂获取一个连接Connectionconnection=activeMQConnectionFactory.createConnection();//第三步:打开这个连接connection。start();//第四步:创建这个会话来操作MQ/***第一个参数:是否使用事务*第二个参数:客户端的响应方式*第一种:自动应答*第二种:客户端手动响应*/SessionSession=connection.createSession(false,Session.CLIENT_ACKNOWLEDGE);//要发送的消息的目的地(下面创建的应该是主题模型的地址)生产者MessageProducermessageProducer=session.createProducer(destination);//向消息队列发送大量消息for(inti=0;i<100;i++){//需要发送的消息TextMessagetextMessage=session.createTextMessage("我是朝日:"+i);//然后就可以发送消息了messageProducer.send(textMessage);}}}Topicmodelconsumerpackagecom.qy.mq.topic;importorg.apache.activemq.ActiveMQConnectionFactory;importjavax.jms.*;/***@Auther:qianyu*@Date:2020/11/0415:19*@Description:*/publicclassConsumer{//要发布的地址privatestaticfinalStringPATH="tcp://10.7.182.87:61616";//ActiveMQ下的用户名privatestaticfinalStringuserName="admin";//ActiveMQ下的密码privatestaticfinalStringpassword="admin";publicstaticvoidmain(String[]args)throwsJMSException{//第一步:创建连接工厂ActiveMQConnectionFactoryactiveMQConnectionFactory=newActiveMQConnectionFactory(userName,password,PATH);//获取连接通过这个工厂Connectionconnection=activeMQConnectionFactory.createConnection();//第三步:打开连接connection.start();//第四步:创建操作MQ的会话/***第一个参数:是否使用事务*的第二个参数:客户端响应方式*第一个t类型:自动响应*第二种:客户端手动响应*/Sessionsession=connection.createSession(false,Session.CLIENT_ACKNOWLEDGE);//需要发送消息的目的地(队列操作队列)Destinationdestination=session.createTopic("topic222");//创建我们的consumerMessageConsumermessageConsumer=session.createConsumer(destination);//然后我们就可以收到我们的消息了//给定监听器的当前路径messageConsumer.setMessageListener(newMessageListener(){@OverridepublicvoidonMessage(Messagemessage){//我们知道是字符串类型的消息//然后就可以打印这条消息了{message.acknowledge();}catch(JMSExceptione){e.printStackTrace();}}});}}关于ActiveMQ的文章先到这里结束,后续发布更多关于ActiveMQ系列的文章,感谢大家支持!