由来

在项目中正好用到了mqtt技术,最简单的方式就是使用现成的实现啦。所以使用了paho的mqtt service

依赖

 //mqtt依赖    compile 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.1.0'    compile 'org.eclipse.paho:org.eclipse.paho.android.service:1.1.1'

下面直接上代码

package com.sunvua.android.atlas.mqtt;import android.content.Context;import android.util.Log;import com.sunvua.android.library.util.AndroidUniqueIdUtil;import com.sunvua.android.library.util.RxBus;import org.eclipse.paho.android.service.MqttAndroidClient;import org.eclipse.paho.client.mqttv3.IMqttActionListener;import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;import org.eclipse.paho.client.mqttv3.IMqttToken;import org.eclipse.paho.client.mqttv3.MqttCallback;import org.eclipse.paho.client.mqttv3.MqttConnectOptions;import org.eclipse.paho.client.mqttv3.MqttException;import org.eclipse.paho.client.mqttv3.MqttMessage;import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;/** * Created by leo on 2017/6/9. */public class Mqtt {    private MqttAndroidClient mqttAndroidClient;    private MqttConnectOptions options;    private static final String BROKER_IP = "tcp://192.168.1.157:61613";    public static final String TOPIC1 = "topic1";    public static final String TOPIC2 = "topic2";    private String clientId;    private static Mqtt instance;    private static final String USERNAME = "admin";    private static final String PASSWORD = "password";    private static final String TAG = Mqtt.class.getName();    private Mqtt(Context context) {//        clientId = UUID.randomUUID().toString();        clientId = AndroidUniqueIdUtil.getUniqueID(context);        options = new MqttConnectOptions();        options.setCleanSession(false);//            // 设置连接的用户名        options.setUserName(USERNAME);        // 设置连接的密码        options.setPassword(PASSWORD.toCharArray());        // 设置超时时间 单位为秒        options.setConnectionTimeout(10);        // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制        options.setKeepAliveInterval(20);        options.setAutomaticReconnect(true);        try {            mqttAndroidClient = new MqttAndroidClient(context, BROKER_IP, clientId, new MemoryPersistence());        } catch (Exception e) {            e.printStackTrace();            Log.i(TAG, "create mqtt client error");        }        mqttAndroidClient.setCallback(new MqttCallback() {            @Override            public void connectionLost(Throwable cause) {                Log.i(TAG, "lost:" + cause);            }            @Override            public void messageArrived(String topic, MqttMessage message) throws Exception {                Log.i(TAG, "receiveMsg:topic" + topic + ",msg:" + message.toString());                RxBus.getDefault().post(message);            }            @Override            public void deliveryComplete(IMqttDeliveryToken token) {                Log.i(TAG, "msg published");            }        });    }    public static Mqtt getInstance(Context context) {        if (null == instance) {            synchronized (Mqtt.class) {                instance = new Mqtt(context);            }        }        return instance;    }    /**     * 连接服务器     * MqttService有自己的重连机制,在断线情况下会重连,但是首次连接失败后,需要再调用connect方法     */    public void connect() {        try {            mqttAndroidClient.connect(options, this, new IMqttActionListener() {                @Override                public void onSuccess(IMqttToken asyncActionToken) {                    Log.i(TAG, "connected");                    try {                        mqttAndroidClient.subscribe(new String[]{TOPIC1, TOPIC2}, new int[]{2, 2});                    } catch (MqttException e) {                        e.printStackTrace();                    }                }                @Override                public void onFailure(IMqttToken asyncActionToken, Throwable exception) {                    Log.i(TAG, "connect error:" + exception);                    connect();//当发生连接失败的情况时继续连接。通常只发生在服务器未在线情况,一旦服务器上线,将立刻连接。                }            });        } catch (MqttException e) {            e.printStackTrace();        }    }    /**     * 断开服务器链接     */    public void disConnect() {        if (null == mqttAndroidClient) {            return;        }        try {            mqttAndroidClient.disconnect();        } catch (MqttException e) {            e.printStackTrace();        }        Log.i(TAG, "disconnected");    }    /**     * 发布消息     *     * @param topic topic     * @param msg   消息内容     * @param qos   0:最多一次的传输;1:至少一次的传输;2: 只有一次的传输     */    public void publish(String topic, String msg, int qos) {        MqttMessage mqttMessage = new MqttMessage();        mqttMessage.setPayload(msg.getBytes());        mqttMessage.setRetained(true);        mqttMessage.setQos(qos);        try {            IMqttDeliveryToken token = mqttAndroidClient.publish(topic, mqttMessage);            token.waitForCompletion();        } catch (MqttException e) {            e.printStackTrace();        }    }}

使用时直接在 application 中调用即可。

  Mqtt.getInstance(this).connect(); 

还有许多不完善,大家用的时候自己根据自己的业务调整吧。当收到消息的时候是用rxbus进行的异步通讯,大家自己修改自己需要的方式

更多相关文章

  1. Android(安卓)通过http访问服务器
  2. Android(安卓)网络连接是否可用的
  3. android选择视频文件上传到后台服务器
  4. android开发环境 国内镜像 及Android(安卓)SDK manager使用国内
  5. Android(安卓)中监听WIFI连接状态变化
  6. android中 异步消息处理机制及Handler
  7. Android(安卓)连接Mysql数据库步骤(新手步骤)
  8. unable to connect to 192.168.1.110:5555 解决办法
  9. Android(安卓)面试题总结(一)

随机推荐

  1. Android系统移植与调试之------->如何修
  2. Android Toast小解
  3. Android Scroll分析 (一) 滑动效果是如何
  4. Android新浪客户端开发教程(完整版)
  5. Android Studio gradle打包可执行jar包(包
  6. ActivityTask: Android上的Async/Await小
  7. Android:No implementation found for na
  8. Android无线连接手机调试
  9. 关于ShapeDrawable应用的一些介绍(下)
  10. apk反向编译