Android使用的MQTT客户端,支持订阅、发送消息;

支持创建连接并保存到本地;

支持话题消息筛选;


使用视频:https://dwz.cn/undJFEnq
小米应用商店也有 【蘑菇IoT】~

核心代码贴一下,做个记录

import android.app.Service;import android.content.Context;import android.content.Intent;import android.os.IBinder;import androidx.annotation.Nullable;import com.annimon.stream.Collectors;import com.annimon.stream.Stream;import com.freddon.android.snackkit.extension.regex.RegexHelper;import com.freddon.android.snackkit.extension.tools.NetSuit;import com.freddon.android.snackkit.log.Loger;import com.qiniu.util.StringUtils;import com.sagocloud.ntworker.agent.App;import com.sagocloud.ntworker.agent.RxEventBus;import com.sagocloud.ntworker.mqtt.ActionEventType;import com.sagocloud.ntworker.mqtt.EventType;import com.sagocloud.ntworker.mqtt.bean.MQTTConnectUserEntity;import com.sagocloud.ntworker.mqtt.bean.MQTTMessage;import com.sagocloud.ntworker.mqtt.bean.MqttConnectPoint;import com.sagocloud.ntworker.mqtt.event.MQTTClientActionEvent;import com.sagocloud.ntworker.mqtt.event.MQTTTransferEvent;import com.sagocloud.ntworker.mqtt.event.MQTTMessageEvent;import com.sagocloud.ntworker.mqtt.event.MQTTStateEvent;import com.sagocloud.ntworker.mqtt.event.MQTTTraceEvent;import org.eclipse.paho.android.service.MqttAndroidClient;import org.eclipse.paho.android.service.MqttTraceHandler;import org.eclipse.paho.client.mqttv3.DisconnectedBufferOptions;import org.eclipse.paho.client.mqttv3.IMqttActionListener;import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;import org.eclipse.paho.client.mqttv3.IMqttToken;import org.eclipse.paho.client.mqttv3.MqttCallback;import org.eclipse.paho.client.mqttv3.MqttConnectOptions;import org.eclipse.paho.client.mqttv3.MqttException;import org.eclipse.paho.client.mqttv3.MqttMessage;import java.util.ArrayList;import java.util.Arrays;import java.util.List;import java.util.Locale;import java.util.concurrent.TimeUnit;import io.reactivex.Observable;import io.reactivex.disposables.CompositeDisposable;import io.reactivex.disposables.Disposable;public class MQTTService extends Service {    public final static String CONN = "CON_MQTT_CF";    private 
MqttAndroidClient mqttAndroidClient;    private MQTTMessageEvent mQTTConnectEvent;    private MQTTConnectUserEntity connectPoint;    private MqttConnectOptions mMqttConnectOptions;    private MqttCallback mqttCallback = new MqttCallback() {        @Override        public void connectionLost(Throwable cause) {            mQTTConnectEvent = new MQTTMessageEvent();            mQTTConnectEvent.setType(EventType.connectionLost);            Loger.e("?connectionLost:", cause.getMessage());            RxEventBus.post(mQTTConnectEvent);            RxEventBus.post(new MQTTStateEvent(App.mqttIsConnected = false));        }        @Override        public void messageArrived(String topic, MqttMessage message) throws Exception {            mQTTConnectEvent = new MQTTMessageEvent();            mQTTConnectEvent.setType(EventType.messageArrived);            mQTTConnectEvent.setTopic(topic);            mQTTConnectEvent.setMessage(message);            Loger.e("✉️messageArrived:", topic);            RxEventBus.post(mQTTConnectEvent);        }        @Override        public void deliveryComplete(IMqttDeliveryToken token) {            mQTTConnectEvent = new MQTTMessageEvent();            mQTTConnectEvent.setType(EventType.deliveryComplete);            try {                mQTTConnectEvent.setTopic(StringUtils.join(token.getTopics(), ","));                mQTTConnectEvent.setMessage(token.getMessage());            } catch (MqttException e) {                e.printStackTrace();            }            Loger.e("?deliveryComplete:", token.toString());            RxEventBus.post(mQTTConnectEvent);        }    };    private IMqttActionListener iMqttActionListener = new IMqttActionListener() {        @Override        public void onSuccess(IMqttToken asyncActionToken) {            RxEventBus.post(new MQTTTransferEvent(asyncActionToken, null));            Loger.e("?onSuccess:", "" + Arrays.toString(asyncActionToken.getTopics()));            App.mqttIsConnected = true;            
RxEventBus.post(new MQTTStateEvent(true));            DisconnectedBufferOptions disconnectedBufferOptions = new DisconnectedBufferOptions();            disconnectedBufferOptions.setBufferEnabled(true);            disconnectedBufferOptions.setBufferSize(100);            disconnectedBufferOptions.setPersistBuffer(false);            disconnectedBufferOptions.setDeleteOldestMessages(false);            mqttAndroidClient.setBufferOpts(disconnectedBufferOptions);        }        @Override        public void onFailure(IMqttToken asyncActionToken, Throwable exception) {            RxEventBus.post(new MQTTTransferEvent(asyncActionToken, exception.getMessage()));            Loger.e("?onFailure:", "" + exception.getMessage());            App.mqttIsConnected = false;            RxEventBus.post(new MQTTStateEvent(false));        }    };    private CompositeDisposable subscription;    private MqttTraceHandler traceCallback = new MqttTraceHandler() {        @Override        public void traceDebug(String tag, String message) {            Loger.e("?traceDebug:" + tag, "" + message);//            LiveDataBus.post(MQTTTraceEvent.class,new MQTTTraceEvent(MQTTTraceEvent.Type.DEBUG, tag, message));            RxEventBus.post(new MQTTTraceEvent(MQTTTraceEvent.Type.DEBUG, tag, message));        }        @Override        public void traceError(String tag, String message) {            Loger.e("?traceError:" + tag, "" + message);//            LiveDataBus.post(MQTTTraceEvent.class,new MQTTTraceEvent(MQTTTraceEvent.Type.ERROR, tag, message));            RxEventBus.post(new MQTTTraceEvent(MQTTTraceEvent.Type.ERROR, tag, message));        }        @Override        public void traceException(String tag, String message, Exception e) {            Loger.e("?traceException:" + tag, "" + message + e.getMessage());            RxEventBus.post(new MQTTTraceEvent(MQTTTraceEvent.Type.EXCEPTION, tag, message));//            LiveDataBus.post(MQTTTraceEvent.class,new 
MQTTTraceEvent(MQTTTraceEvent.Type.EXCEPTION, tag, message));        }    };    private Disposable actSubscription;    private Disposable actTimerSubscription;    public static void startService(Context context, MQTTConnectUserEntity point) {        Intent service = new Intent();        service.setClass(context, MQTTService.class);        service.putExtra(CONN, point);        context.startService(service);    }    private void $prepareActionHandler() {        if (subscription == null) {            subscription = new CompositeDisposable();        }        subscription.clear();        if (actSubscription != null && actSubscription.isDisposed()) {            actSubscription.dispose();        }        if (actTimerSubscription != null && actTimerSubscription.isDisposed()) {            actTimerSubscription.dispose();        }        actSubscription = RxEventBus.subscribeIOEvent(                MQTTClientActionEvent.class,                event -> {                    ActionEventType type = event.getEventType();                    Object payload = event.getPayload();                    switch (type) {                        case connect:                            connect();                            break;                        case publish:                            if (payload instanceof MQTTMessage) {                                publish((MQTTMessage) payload);                            }                            break;                        case subscribe:                            if (payload instanceof MQTTMessage) {                                subscribe((MQTTMessage) payload);                            }                            break;                        case unsubscribe:                            if (payload instanceof MQTTMessage) {                                unsubscribe((MQTTMessage) payload);                            }                            break;                        case unsubscribe_all:                            if (payload 
instanceof String[]) {                                unsubscribeAll((String[]) payload);                            }                            break;                        case close:                            disconnect();                            mqttAndroidClient = null;                            App.mqttIsConnected = false;                            RxEventBus.post(new MQTTStateEvent(false));                            stopSelf();                            break;                    }                },                error -> {                    Loger.d("error", error.getMessage());                }        );        actTimerSubscription = Observable.interval(2000, TimeUnit.MILLISECONDS)                .subscribe((i) -> {                    App.mqttIsConnected = mqttAndroidClient != null && mqttAndroidClient.isConnected();                    RxEventBus.post(new MQTTStateEvent(App.mqttIsConnected));                });        subscription.add(actSubscription);        subscription.add(actTimerSubscription);    }    @Override    public void onDestroy() {        disconnect();        RxEventBus.unsubscribeEvent(subscription);        super.onDestroy();    }    @Nullable    @Override    public IBinder onBind(Intent intent) {        return null;    }    @Override    public int onStartCommand(Intent intent, int flags, int startId) {        if (intent != null) {            connectPoint = intent.getParcelableExtra(CONN);            if (connectPoint != null) {                $prepareActionHandler();                connect();            } else {                if (mqttAndroidClient != null && mqttAndroidClient.isConnected()) {                    connect();                }            }        }        return super.onStartCommand(intent, flags, startId);    }    private void $prepared(MqttConnectPoint connectPoint) {        String serverURI = String.format(Locale.ENGLISH, "%s://%s:%s", connectPoint.isUseSSL() ? 
"ssl" : "tcp", connectPoint.getHost(), connectPoint.getPort());        if (mqttAndroidClient != null) {            disconnect();        }        mqttAndroidClient = new MqttAndroidClient(this, serverURI, connectPoint.getClientId());        mqttAndroidClient.setCallback(mqttCallback); //设置监听订阅消息的回调        mqttAndroidClient.setTraceEnabled(true);        mqttAndroidClient.setTraceCallback(traceCallback);        mMqttConnectOptions = new MqttConnectOptions();        mMqttConnectOptions.setMqttVersion(connectPoint.getVersion());        mMqttConnectOptions.setMaxInflight(connectPoint.getMaxInflight());        mMqttConnectOptions.setAutomaticReconnect(connectPoint.isAutoReconnect());        mMqttConnectOptions.setCleanSession(connectPoint.isClearSession()); //设置是否清除缓存        mMqttConnectOptions.setConnectionTimeout(connectPoint.getConnectTimeout()); //设置超时时间,单位:秒        mMqttConnectOptions.setKeepAliveInterval(connectPoint.getTickTime()); //设置心跳包发送间隔,单位:秒        if (RegexHelper.isAllNotEmpty(connectPoint.getUserName(), connectPoint.getUserPasswort())) {            mMqttConnectOptions.setUserName(connectPoint.getUserName()); //设置用户名            mMqttConnectOptions.setPassword(connectPoint.getUserPasswort().toCharArray()); //设置密码        }        if (connectPoint.isUseSSL() && connectPoint.getSslProperties() != null) {            mMqttConnectOptions.setSSLProperties(connectPoint.getSslProperties());        }        if (RegexHelper.isNotEmpty(connectPoint.getLwt())) {            mMqttConnectOptions.setWill(connectPoint.getLwt().getTopic(), connectPoint.getLwt().getMessage().getBytes(), connectPoint.getLwt().getQos(), connectPoint.getLwt().isRetained());        }    }    private void connect() {        if (mqttAndroidClient == null) {            $prepared(connectPoint);        }        if (!mqttAndroidClient.isConnected() && NetSuit.checkEnable(this)) {            try {                mqttAndroidClient.connect(mMqttConnectOptions, null, iMqttActionListener);            } catch 
(MqttException e) {                e.printStackTrace();            }        }    }    private void disconnect() {        try {            if (mqttAndroidClient == null) return;            mqttAndroidClient.unregisterResources();            mqttAndroidClient.disconnect();            mqttAndroidClient.close();        } catch (MqttException e) {            e.printStackTrace();        } finally {            mqttAndroidClient = null;        }    }    private void subscribe(MQTTMessage subscribe) {        try {            if (mqttAndroidClient == null || subscribe == null || subscribe.getTopic() == null)                return;            MqttMessage.validateQos(subscribe.getQos());            List sub = connectPoint.getSubTopics();            if (sub == null) {                sub = new ArrayList<>();            }            Long count = Stream.of(sub)                    .filter(item -> subscribe.getTopic().equalsIgnoreCase(item.getTopic()))                    .collect(Collectors.counting());            if (count > 0) {                return;            }            sub.add(subscribe);            connectPoint.setSubTopics(sub);            mqttAndroidClient.subscribe(subscribe.getTopic(), subscribe.getQos());        } catch (IllegalArgumentException e) {            e.printStackTrace();        } catch (MqttException e) {            e.printStackTrace();        }    }    private void subscribeAll(String[] topics, int[] qos) {        if (RegexHelper.isAnyEmpty(topics, qos)) return;        if (mqttAndroidClient == null) return;        if (topics.length != qos.length) return;        try {            mqttAndroidClient.subscribe(topics, qos);        } catch (MqttException e) {            e.printStackTrace();        }    }    private void unsubscribe(MQTTMessage subscribe) {        try {            if (mqttAndroidClient == null || connectPoint == null) return;            List sub = connectPoint.getSubTopics();            if (sub != null) {                List filtered = 
Stream.of(sub)                        .filter(item -> !subscribe.getTopic().equalsIgnoreCase(item.getTopic()))                        .collect(Collectors.toList());                connectPoint.setSubTopics(filtered);            }            mqttAndroidClient.unsubscribe(subscribe.getTopic());        } catch (MqttException e) {            e.printStackTrace();        }    }    private void unsubscribeAll(String[] topics) {        try {            if (mqttAndroidClient == null) return;            if (topics == null) mqttAndroidClient.unsubscribe("#");            else {                mqttAndroidClient.unsubscribe(topics);            }            if (connectPoint != null) {                connectPoint.setSubTopics(null);            }        } catch (MqttException e) {            e.printStackTrace();        }    }    private void publish(MQTTMessage subscribe) {        try {            MqttMessage.validateQos(subscribe.getQos());            mqttAndroidClient.publish(subscribe.getTopic(), subscribe.getMessage().getBytes(), subscribe.getQos(), subscribe.isRetained());        } catch (IllegalArgumentException e) {            e.printStackTrace();        } catch (MqttException e) {            e.printStackTrace();        }    }}

 

更多相关文章

  1. USB UMS MTP设置过程 (一)
  2. 【EditText】Android(安卓)中设置 EditText 光标颜色
  3. Android(安卓)高通代码预制apk可卸载,恢复出厂设置apk可恢复 Andr
  4. Android(安卓)permission 访问权限大全
  5. android-----EditText
  6. android 闹钟提醒并且在锁屏下弹出Dialog对话框并播放铃声和震动
  7. android 开机不弹出Launcher选择,直接进入需要默认的Launcher,
  8. Android(安卓)图片设置圆角
  9. android 调用 SharedPreferences 实现偏好信息设置

随机推荐

  1. Android呼出电话流程(原)
  2. android关于轮询的一种解决方案
  3. react-native 键盘遮挡输入框
  4. Android(安卓)- adb - Linux - 程序“adb
  5. Android(安卓)Wifi 启动过程分析
  6. android 开发工具相关
  7. Android琐碎(壹)
  8. Android中HAL如何向上层提供接口总结-hw_
  9. Cocos2d-x官方中文文档
  10. Android(安卓)各种功能代码收集