当前位置: 首页 > 后端技术 > Java

如何在Java中使用MQTT

时间:2023-04-01 15:58:08 Java

MQTT是一种基于发布/订阅模型的轻量级物联网消息协议,可以在严重受限的硬件设备和低带宽、高延迟的网络上实现稳定传输。易于实现,支持QoS,支持小包,占据物联网协议的半壁江山。本文主要介绍如何在Java项目中使用MQTT实现客户端与服务器的连接、订阅、消息收发等功能。客户端库介绍本文开发环境为:构建工具:MavenIDE:IntelliJIDEAJava版本:JDK1.8.0本文将使用EclipsePahoJavaClient作为客户端,这是Java中使用最广泛的MQTT客户端库语言。将以下依赖项添加到项目pom.xml文件中。<依赖项><依赖项>org.eclipse.pahoorg.eclipse.paho.client.mqttv31.2.5创建MQTT连接MQTT服务器本文将使用EMQX提供的免费公共MQTT服务器,它是基于EMQX的MQTT云平台创建的。服务器访问信息如下:Broker:broker.emqx.io(中国用户可以使用broker-cn.emqx.io)TCP端口:1883SSL/TLS端口:8883普通TCP连接设置MQTTBroker基本连接参数,用户名和密码是可选参数。Stringbroker="tcp://broker.emqx.io:1883";//TLS/SSL//Stringbroker="ssl://broker.emqx.io:8883";Stringusername="emqx";Stringpassword="public";Stringclientid="publish_client";然后创建MQTT客户端并连接。MqttClientclient=newMqttClient(broker,clientid,newMemoryPersistence());MqttConnectOptionsoptions=newMqttConnectOptions();options.setUserName(username);options.setPassword(password.toCharArray());client.connect(options);MqttClient:同步调用客户端,采用阻塞方式进行通信。MqttClientPersistence:表示一个持久化的数据存储,用于存储传输过程中的出站和入站信息,以便可以将其传递到指定的QoS。MqttConnectOptions:连接选项,用于指定连接参数,下面列出一些常用的方法。setUserName:设置用户名setPassword:设置密码setCleanSession:设置是否清除会话setKeepAliveInterval:设置心跳间隔setConnectionTimeout:设置连接超时时间setAutomaticReconnect:设置是否自动重连TLS/SSL连接如果要使用TLS自签名证书/SSL连接,需要在pom.xml文件中加入bcpkix-jdk15on。org.bouncycastlebcpkix-jdk15on1.70然后使用下面的代码创建SSLUtils.java文件。包io.emqx.mqtt;导入org.bouncycastle.jce.provider.BouncyCastleProvider;导入org.bouncycastle.openssl.PEMKeyPair;导入org.bouncycastle.openssl.PEMParser;导入org.bouncycastle.openssl.jcajce.JcaPEMKeyConverter;导入javax。net.ssl.KeyManagerFactory;导入javax.net.ssl.SSLContext;导入javax.net.ssl.SSLSocketFactory;导入javax.net.ssl.TrustManagerFactory;导入java.io.BufferedInputStream;导入java.io.FileInputStream;导入java。io.FileReader;导入java.security.KeyPair;导入java.security.KeyStore;导入java.security.Security;导入java.security.cert.CertificateFactory;导入java.security.cert.X509Certificate;公共类SSLUtils{publicstaticSSLSocketFactorygetSocketFactory(finalStringcaCrtFile,finalStringcrtFile,finalStringkeyFile,finalStringpassword)抛出异常{Security.addProvider(newBouncyCastleProvider());//加载CAcertificateX509CertificatecaCert=null;FileInputStreamfis=newFileInputStream(caCrtFile);BufferedInputStreambis=newBufferedInputStream(fis);CertificateFactorycf=CertificateFactory.getInstance("X.509");while(bis.available()>0){caCert=(X509Certificate)cf.generateCertificate(bis);}//加载客户端证书bis=newBufferedInputStream(newFileInputStream(crtFile));X509Certificatecert=null;while(bis.available()>0){cert=(X509Certificate)cf.generateCertificate(bis);}//加载客户端私钥PEMParserpemParser=newPEMParser(newFileReader(keyFile));对象对象=pemParser.readObject();JcaPEMKeyConverter转换器=newJcaPEMKeyConverter().setProvider("BC");KeyPairkey=converter.getKeyPair((PEMKeyPair)object);pemParser.close();//CA证书是你sed验证服务器KeyStorecaKs=KeyStore.getInstance(KeyStore.getDefaultType());caKs.load(null,null);caKs.setCertificateEntry("ca-certificate",caCert);TrustManagerFactorytmf=TrustManagerFactory.getInstance("X509");tmf.init(caKs);//客户端密钥和证书被发送到服务器,因此它可以验证KeyStoreks=KeyStore.getInstance(KeyStore.getDefaultType());ks.load(null,null);ks.setCertificateEntry("证书",cert);ks.setKeyEntry("私钥",key.getPrivate(),password.toCharArray(),newjava.security.cert.Certificate[]{cert});KeyManagerFactorykmf=KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());kmf.init(ks,password.toCharArray());//最后,创建SSL套接字工厂SSLContextcontext=SSLContext.getInstance("TLSv1.2");context.init(kmf.getKeyManagers(),tmf.getTrustManagers(),null);返回context.getSocketFactory();}}参考以下设置选项//设置SSL/TLS连接地址Stringbroker="ssl://broker.emqx.io:8883";//设置socketfactoryStringcaFilePath="/cacert.pem";StringclientCrtFilePath="/client.pem";StringclientKeyFilePath="/client.key";SSLSocketFactorysocketFactory=getSocketFactory(caFilePath,clientCrtFilePath,clientKeyFilePath,"");options.setSocketFactory(socketFactory);发布MQTT消息创建一个发布客户端类PublishSample,它将向主题mqtt/test发布一条HelloMQTT消息。包io.emqx.mqtt;导入org.eclipse.paho.client.mqttv3.MqttClient;导入org.eclipse.paho.client.mqttv3.MqttConnectOptions;导入org.eclipse.paho.client.mqttv3.MqttException;导入org.eclipse.paho.client.mqttv3.MqttMessage;importorg.eclipse.paho.client.mqttv3.persist.MemoryPersistence;publicclassPublishSample{publicstaticvoidmain(String[]args){Stringbroker="tcp://broker.emqx.io:1883";Stringtopic="mqtt/test";String用户名="emqx";字符串密码=“公共”;Stringclientid="publish_client";字符串内容="你好MQTT";国际服务质量=0;try{MqttClientclient=newMqttClient(broker,clientid,newMemoryPersistence());//连接参数MqttConnectOptionsoptions=newMqttConnectOptions();//设置用户名和密码options.setUserName(username);选项.setPassword(密码。toCharArray());options.setConnectionTimeout(60);options.setKeepAliveInterval(60);//连接client.connect(options);//创建消息并设置QoSMqttMessagemessage=newMqttMessage(content.getBytes());消息.setQos(qos);//发布消息client.publish(topic,message);System.out.println("消息发布");System.out.println("主题:"+主题);System.out.println("消息内容:"+content);//关闭连接client.disconnect();//关闭客户端client.close();}catch(MqttExceptione){thrownewRuntimeException(e);}}}订阅MQTT主题创建订阅客户端类SubscribeSample将订阅主题mqtt/testpackageio.emqx.mqtt;importorg.eclipse.paho.client.mqttv3.*;importorg.eclipse.paho.client.mqttv3.persist.MemoryPersistence;publicclassSubscribeSample{publicstaticvoidmain(String[]args){Stringbroker="tcp://broker.emqx.io:1883";Stringtopic="mqtt/test";String用户名="emqx";字符串密码=“公共”;Stringclientid="subscribe_client";国际服务质量=0;try{MqttClientclient=newMqttClient(broker,clientid,newMemoryPersistence());//连接参数MqttConnectOptionsoptions=newMqttConnectOptions();options.setUserName(用户名);options.setPassword(password.toCharArray());options.setConnectionTimeout(60);options.setKeepAliveInterval(60);//设置回调client.setCallback(newMqttCallback(){publicvoidconnectionLost(Throwablecause){System.out.println("connectionLost:"+cause.getMessage());}publicvoidmessageArrived(Stringtopic,MqttMessagemessage){System.out.println("topic:"+topic);System.out.println("Qos:"+message.getQos());System.out.println("消息内容:"+newString(message.getPayload()));}publicvoiddeliveryComplete(IMqttDeliveryTokentoken){System.out.println("deliveryComplete--------"+token.isComplete());}});客户端连接(选项);client.subscribe(topic,qos);}catch(Exceptione){e.printStackTrace();}}}MqttCallback解释:connectionLost(Throwablecause):连接丢失时调用messageArrived(Stringtopic,MqttMessagemessage):收到消息时调用deliveryComplete(IMqttDeliveryTokentoken):消息传递完成时调用TestNextrunsubscribeSample,订阅mqtt/test主题,然后运行PublishSample向mqtt/test主题发布消息。我们会看到发布者成功发布消息,订阅者收到消息。至此,我们就完成了在Java中使用PahoJavaClient作为MQTT客户端连接公共MQTT服务器,实现了测试客户端与MQTT服务器的连接、消息发布和订阅。版权声明:本文为EMQ原创,转载请注明出处。