您现在的位置是:首页 > 后台技术 > JavaJava
java 实现MQTT客户端(图文)
第十三双眼睛2021-05-30【Java】人已围观
简介前端时间,总结了以下java使用MQTT客户端的使用方法,特地记录一下,方便以后使用
MQTT是一种基于发布/订阅(publish/subscribe)模式的"轻量级"通讯协议,该协议构建于TCP/IP协议上,由IBM在1999年发布。MQTT最大优点在于,可以以极少的代码和有限的带宽,为连接远程设备提供实时可靠的消息服务。作为一种低开销、低带宽占用的即时通讯协议,使其在物联网、小型设备、移动应用等方面有较广泛的应用
我们使用的maven,因此首先引入依赖
发布端:
订阅端:
回调:
注意,在一个服务中,不能有两个客户端的clientId一样,但是有时候,明明没有一样的客户端,但是还是会报断开连接,这时候可能是缓存导致的,将服务器重启一下就好了。
我们使用的maven,因此首先引入依赖
<dependencies> <dependency> <groupId>org.eclipse.paho</groupId> <artifactId>org.eclipse.paho.client.mqttv3</artifactId> <version>1.2.0</version> </dependency> </dependencies> |
package com.zyb.mqtt; import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; public class PublishSample { public static void main(String[] args) { String topic = "test"; String content = online(); System.out.println(content); int qos = 1; String broker = "tcp://127.0.0.1:1883"; String userName = "test"; String password = "123"; String clientId = "pub"; // 内存存储 MemoryPersistence persistence = new MemoryPersistence(); try { // 创建客户端 MqttClient sampleClient = new MqttClient(broker, clientId, persistence); // 创建链接参数 MqttConnectOptions connOpts = new MqttConnectOptions(); // 在重新启动和重新连接时记住状态 connOpts.setCleanSession(false); // 设置连接的用户名 connOpts.setUserName(userName); connOpts.setPassword(password.toCharArray()); // 建立连接 sampleClient.connect(connOpts); // 创建消息 MqttMessage message = new MqttMessage(content.getBytes()); // 设置消息的服务质量 message.setQos(qos); // 发布消息 sampleClient.publish(topic, message); // 断开连接 sampleClient.disconnect(); // 关闭客户端 sampleClient.close(); } catch (MqttException me) { System.out.println("reason " + me.getReasonCode()); System.out.println("msg " + me.getMessage()); System.out.println("loc " + me.getLocalizedMessage()); System.out.println("cause " + me.getCause()); System.out.println("excep " + me); me.printStackTrace(); } } public static String online() { return "{\"value\":" + true +"}"; } } |
package com.zyb.mqtt; import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; public class SubscribeSample { public static void main(String[] args) throws MqttException { String HOST = "tcp://127.0.0.1:1883"; String TOPIC = "mqtt/test"; int qos = 1; String clientid = "subClient"; String userName = "test"; String passWord = "test"; try { // host为主机名,test为clientid即连接MQTT的客户端ID,一般以客户端唯一标识符表示,MemoryPersistence设置clientid的保存形式,默认为以内存保存 MqttClient client = new MqttClient(HOST, clientid, new MemoryPersistence()); // MQTT的连接设置 MqttConnectOptions options = new MqttConnectOptions(); // 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,这里设置为true表示每次连接到服务器都以新的身份连接 options.setCleanSession(true); // 设置连接的用户名 options.setUserName(userName); // 设置连接的密码 options.setPassword(passWord.toCharArray()); // 设置超时时间 单位为秒 options.setConnectionTimeout(10); // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制 options.setKeepAliveInterval(20); // 设置回调函数 client.setCallback(new CallBack()); client.connect(options); //订阅消息 client.subscribe(TOPIC, qos); System.out.println("客户端启动成功"); } catch (Exception e) { e.printStackTrace(); } } } |
package com.zyb.mqtt; import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttCallbackExtended; import org.eclipse.paho.client.mqttv3.MqttMessage; public class CallBack implements MqttCallbackExtended{ @Override public void connectionLost(Throwable cause) { System.out.println("connectionLost"); } @Override public void messageArrived(String topic, MqttMessage message) throws Exception { System.out.println("messageArrived"); } @Override public void deliveryComplete(IMqttDeliveryToken token) { System.out.println("deliveryComplete"); } @Override public void connectComplete(boolean reconnect, String serverURI) { System.out.println("connectComplete"); //这里实现重连后的重新订阅 } } |
Tags:java mqtt
很赞哦! ()