Android(安卓)基于paho的mqtt service的工具类
16lz
2021-01-26
由来
在项目中正好用到了mqtt技术,最简单的方式就是使用现成的实现啦。所以使用了paho的mqtt service
依赖
//mqtt依赖 compile 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.1.0' compile 'org.eclipse.paho:org.eclipse.paho.android.service:1.1.1'
下面直接上代码
package com.sunvua.android.atlas.mqtt;import android.content.Context;import android.util.Log;import com.sunvua.android.library.util.AndroidUniqueIdUtil;import com.sunvua.android.library.util.RxBus;import org.eclipse.paho.android.service.MqttAndroidClient;import org.eclipse.paho.client.mqttv3.IMqttActionListener;import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;import org.eclipse.paho.client.mqttv3.IMqttToken;import org.eclipse.paho.client.mqttv3.MqttCallback;import org.eclipse.paho.client.mqttv3.MqttConnectOptions;import org.eclipse.paho.client.mqttv3.MqttException;import org.eclipse.paho.client.mqttv3.MqttMessage;import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;/** * Created by leo on 2017/6/9. */public class Mqtt { private MqttAndroidClient mqttAndroidClient; private MqttConnectOptions options; private static final String BROKER_IP = "tcp://192.168.1.157:61613"; public static final String TOPIC1 = "topic1"; public static final String TOPIC2 = "topic2"; private String clientId; private static Mqtt instance; private static final String USERNAME = "admin"; private static final String PASSWORD = "password"; private static final String TAG = Mqtt.class.getName(); private Mqtt(Context context) {// clientId = UUID.randomUUID().toString(); clientId = AndroidUniqueIdUtil.getUniqueID(context); options = new MqttConnectOptions(); options.setCleanSession(false);// // 设置连接的用户名 options.setUserName(USERNAME); // 设置连接的密码 options.setPassword(PASSWORD.toCharArray()); // 设置超时时间 单位为秒 options.setConnectionTimeout(10); // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制 options.setKeepAliveInterval(20); options.setAutomaticReconnect(true); try { mqttAndroidClient = new MqttAndroidClient(context, BROKER_IP, clientId, new MemoryPersistence()); } catch (Exception e) { e.printStackTrace(); Log.i(TAG, "create mqtt client error"); } mqttAndroidClient.setCallback(new MqttCallback() { @Override public void connectionLost(Throwable cause) { Log.i(TAG, "lost:" + cause); } @Override public void messageArrived(String topic, MqttMessage message) throws Exception { Log.i(TAG, "receiveMsg:topic" + topic + ",msg:" + message.toString()); RxBus.getDefault().post(message); } @Override public void deliveryComplete(IMqttDeliveryToken token) { Log.i(TAG, "msg published"); } }); } public static Mqtt getInstance(Context context) { if (null == instance) { synchronized (Mqtt.class) { instance = new Mqtt(context); } } return instance; } /** * 连接服务器 * MqttService有自己的重连机制,在断线情况下会重连,但是首次连接失败后,需要再调用connect方法 */ public void connect() { try { mqttAndroidClient.connect(options, this, new IMqttActionListener() { @Override public void onSuccess(IMqttToken asyncActionToken) { Log.i(TAG, "connected"); try { mqttAndroidClient.subscribe(new String[]{TOPIC1, TOPIC2}, new int[]{2, 2}); } catch (MqttException e) { e.printStackTrace(); } } @Override public void onFailure(IMqttToken asyncActionToken, Throwable exception) { Log.i(TAG, "connect error:" + exception); connect();//当发生连接失败的情况时继续连接。通常只发生在服务器未在线情况,一旦服务器上线,将立刻连接。 } }); } catch (MqttException e) { e.printStackTrace(); } } /** * 断开服务器链接 */ public void disConnect() { if (null == mqttAndroidClient) { return; } try { mqttAndroidClient.disconnect(); } catch (MqttException e) { e.printStackTrace(); } Log.i(TAG, "disconnected"); } /** * 发布消息 * * @param topic topic * @param msg 消息内容 * @param qos 0:最多一次的传输;1:至少一次的传输;2: 只有一次的传输 */ public void publish(String topic, String msg, int qos) { MqttMessage mqttMessage = new MqttMessage(); mqttMessage.setPayload(msg.getBytes()); mqttMessage.setRetained(true); mqttMessage.setQos(qos); try { IMqttDeliveryToken token = mqttAndroidClient.publish(topic, mqttMessage); token.waitForCompletion(); } catch (MqttException e) { e.printStackTrace(); } }}
使用时直接在 application 中调用即可。
Mqtt.getInstance(this).connect();
还有许多不完善,大家用的时候自己根据自己的业务调整吧。当收到消息的时候是用rxbus进行的异步通讯,大家自己修改自己需要的方式
更多相关文章
- Android(安卓)通过http访问服务器
- Android(安卓)网络连接是否可用的
- android选择视频文件上传到后台服务器
- android开发环境 国内镜像 及Android(安卓)SDK manager使用国内
- Android(安卓)中监听WIFI连接状态变化
- android中 异步消息处理机制及Handler
- Android(安卓)连接Mysql数据库步骤(新手步骤)
- unable to connect to 192.168.1.110:5555 解决办法
- Android(安卓)面试题总结(一)