客户端:

package com.example.fc.activity;import android.app.AlarmManager;import android.app.PendingIntent;import android.app.Service;import android.content.BroadcastReceiver;import android.content.Context;import android.content.Intent;import android.content.IntentFilter;import android.net.ConnectivityManager;import android.net.NetworkInfo;import android.os.Handler;import android.os.HandlerThread;import android.os.IBinder;import android.provider.Settings.Secure;import android.util.Log;import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;import org.eclipse.paho.client.mqttv3.MqttCallback;import org.eclipse.paho.client.mqttv3.MqttClient;import org.eclipse.paho.client.mqttv3.MqttDeliveryToken;import org.eclipse.paho.client.mqttv3.MqttException;import org.eclipse.paho.client.mqttv3.MqttMessage;import org.eclipse.paho.client.mqttv3.MqttPersistenceException;import org.eclipse.paho.client.mqttv3.MqttTopic;import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence;import java.util.Locale;/** * 推送服务 */public class MqttService extends Service implements MqttCallback {    public static final String DEBUG_TAG = "MqttService"; // Log标记    public static String            MQTT_CLIENT_ID = "Fangchao";    private static final String MQTT_THREAD_NAME = "MqttService[" + DEBUG_TAG + "]"; // Handler Thread ID    private static final String MQTT_BROKER_TEST = "192.168.0.183"; //测试地址    private static final String MQTT_BROKER_ONLINE = "mqtt.supumall.com"; //正式地址    private static final String MQTT_BROKER = MQTT_BROKER_TEST;    private static final int MQTT_PORT = 1883;                // 服务器推送端口    public static final int MQTT_QOS_0 = 0; //消息投放级别 QOS Level 0 (最多一次,有可能重复或丢失。 )    public static final int MQTT_QOS_1 = 1; //消息投放级别 QOS Level 1 (至少一次,有可能重复。 )    public static final int MQTT_QOS_2 = 2; //消息投放级别 QOS Level 2 (只有一次,确保消息只到达一次(用于比较严格的计费系统)。)    public static final String[] topicFilters = {"Fangchao"};//订阅的主题    public static int[] qos = {MQTT_QOS_0};//订阅级别    private static final int MQTT_KEEP_ALIVE = 4 * 60 * 1000; //心跳包时间,毫秒    private static final String MQTT_KEEP_ALIVE_TOPIC_FORAMT = "/users/%s/keepalive"; // Topic format for KeepAlives    private static final byte[] MQTT_KEEP_ALIVE_MESSAGE = {0}; // 心跳包发送内容    private static final int MQTT_KEEP_ALIVE_QOS = MQTT_QOS_0; //心跳包的发送级别默认最低    private static final boolean MQTT_CLEAN_SESSION = true; // Start a clean session?    private static final String MQTT_URL_FORMAT = "tcp://%s:%d"; // 推送url格式组装    private static final String ACTION_START = MQTT_CLIENT_ID + ".START"; // Action to start 启动    private static final String ACTION_STOP = MQTT_CLIENT_ID + ".STOP"; // Action to stop 停止    private static final String ACTION_KEEPALIVE = MQTT_CLIENT_ID + ".KEEPALIVE"; // Action to keep alive used by alarm manager保持心跳闹钟使用    private static final String ACTION_RECONNECT = MQTT_CLIENT_ID + ".RECONNECT"; // Action to reconnect 重新连接    private static final String DEVICE_ID_FORMAT = "an_%s"; // 设备id的前缀    // Note:设备id限制长度为23个 字符    // An NPE if you go over that limit    private boolean mStarted = false; //推送client是否启动    private String mDeviceId;          // Device ID, Secure.ANDROID_ID    private Handler mConnHandler;      // Seperate Handler thread for networking    private MqttDefaultFilePersistence mDataStore; // Defaults to FileStore// private MqttConnectOptions mOpts; //连接参数    private MqttTopic mKeepAliveTopic;            // Instance Variable for Keepalive topic    private MqttClient mClient;                    // Mqtt Client    private AlarmManager mAlarmManager;            //闹钟    private ConnectivityManager mConnectivityManager; //网络改变接收器    /** * 启动推送服务 * * @param ctx context to start the service with * @return void */    public static void actionStart(Context ctx) {        Intent i = new Intent(ctx, MqttService.class);        i.setAction(ACTION_START);        ctx.startService(i);    }    /** * 停止推送服务 * * @param ctx context to start the service with * @return void */    public static void actionStop(Context ctx) {        Intent i = new Intent(ctx, MqttService.class);        i.setAction(ACTION_STOP);        ctx.startService(i);    }    /** * 发送心跳包 * * @param ctx context to start the service with * @return void */    public static void actionKeepalive(Context ctx) {        Intent i = new Intent(ctx, MqttService.class);        i.setAction(ACTION_KEEPALIVE);        ctx.startService(i);    }    /** * Initalizes the DeviceId and most instance variables * Including the Connection Handler, Datastore, Alarm Manager * and ConnectivityManager. * 初始化设备id和请求参数包含连接处理、数据存储、闹钟警报、网络接收器 */    @Override    public void onCreate() {        super.onCreate();        Log.e("fc","oncreate");        //初始化设备id,长度不能超过23        mDeviceId = String.format(DEVICE_ID_FORMAT,                Secure.getString(getContentResolver(), Secure.ANDROID_ID));        HandlerThread thread = new HandlerThread(MQTT_THREAD_NAME);        thread.start();        mConnHandler = new Handler(thread.getLooper());        mDataStore = new MqttDefaultFilePersistence(getCacheDir().getAbsolutePath());// mOpts = new MqttConnectOptions();// mOpts.setCleanSession(MQTT_CLEAN_SESSION);        // Do not set keep alive interval on mOpts we keep track of it with alarm's        mAlarmManager = (AlarmManager) getSystemService(ALARM_SERVICE);        mConnectivityManager = (ConnectivityManager) getSystemService(CONNECTIVITY_SERVICE);    }    @Override    public boolean isRestricted() {        return super.isRestricted();    }    /** * Service onStartCommand * Handles the action passed via the Intent * 通过意图处理服务 * * @return START_REDELIVER_INTENT */    @Override    public int onStartCommand(Intent intent, int flags, int startId) {        super.onStartCommand(intent, flags, startId);        String action = intent.getAction();        Log.e(DEBUG_TAG, "推送服务接收到一个请求 " + action);        if (action == null) {            Log.e(DEBUG_TAG, "推送服务接收到的请求为null!推送服务不执行任何操作");        } else {            if (action.equals(ACTION_START)) {                       Log.e(DEBUG_TAG, "接收到《启动》推送服务命令");                start();            } else if (action.equals(ACTION_STOP)) {                       Log.e(DEBUG_TAG, "接收到《停止》推送服务命令");                stop();            } else if (action.equals(ACTION_KEEPALIVE)) {                       Log.e(DEBUG_TAG, "接收到《发送心跳包》推送服务命令");                keepAlive();            } else if (action.equals(ACTION_RECONNECT)) {                       Log.e(DEBUG_TAG, "接收到《重启》推送服务命令");                if (isNetworkAvailable()) {                    reconnectIfNecessary();                }            }        }        return START_REDELIVER_INTENT;    }    /** * 尝试启动推送服务器,并注册网络改变接收器 */    private synchronized void start() {        if (mStarted) {                   Log.e(DEBUG_TAG, "尝试启动推送服务,但推送服务已经启动");            return;        }        if (hasScheduledKeepAlives()) {            stopKeepAlives();        }        connect();        registerReceiver(mConnectivityReceiver, new IntentFilter(ConnectivityManager.CONNECTIVITY_ACTION));    }    /** * 停止推送服务 */    private synchronized void stop() {        if (!mStarted) {                   Log.e(DEBUG_TAG, "试图停止推送服务器但是推送服务并没有运行");            return;        }        if (mClient != null) {            mConnHandler.post(new Runnable() {                @Override                public void run() {                    try {                        mClient.disconnect();                    } catch (MqttException ex) {                        ex.printStackTrace();                    }                    mClient = null;                    mStarted = false;                    stopKeepAlives();                }            });        }        unregisterReceiver(mConnectivityReceiver);    }    /** * Connects to the broker with the appropriate datastore * 连接到推送服务器与适当的数据存储 */    private synchronized void connect() {        String url = String.format(Locale.US, MQTT_URL_FORMAT, MQTT_BROKER, MQTT_PORT);               Log.e(DEBUG_TAG, "连接推送服务器 设备id:" + mDeviceId + " with URL: " + url);        try {            mClient = new MqttClient(url, mDeviceId, mDataStore);        } catch (MqttException e) {            e.printStackTrace();        }        mConnHandler.post(new Runnable() {            @Override            public void run() {                try {                    mClient.connect();                    mClient.subscribe(topicFilters, qos);                    mClient.setCallback(MqttService.this);                    mStarted = true; // Service is now connected                           Log.e(DEBUG_TAG, "成功连接推送服务器并启动心跳包闹钟");                    startKeepAlives();                } catch (MqttException e) {                    e.printStackTrace();                }            }        });    }    /** * 启动心跳包闹钟 */    private void startKeepAlives() {        Intent i = new Intent();        i.setClass(this, MqttService.class);        i.setAction(ACTION_KEEPALIVE);        PendingIntent pi = PendingIntent.getService(this, 0, i, 0);        mAlarmManager.setRepeating(AlarmManager.RTC_WAKEUP,                System.currentTimeMillis() + MQTT_KEEP_ALIVE,                MQTT_KEEP_ALIVE, pi);    }    /** * 取消已经存在的闹钟 */    private void stopKeepAlives() {        Intent i = new Intent();        i.setClass(this, MqttService.class);        i.setAction(ACTION_KEEPALIVE);        PendingIntent pi = PendingIntent.getService(this, 0, i, 0);        mAlarmManager.cancel(pi);    }    /** * 发送心跳数据到服务器 */    private synchronized void keepAlive() {        if (isConnected()) {            try {                sendKeepAlive();                return;            } catch (MqttConnectivityException ex) {                ex.printStackTrace();                reconnectIfNecessary();            } catch (MqttPersistenceException ex) {                ex.printStackTrace();                stop();            } catch (MqttException ex) {                ex.printStackTrace();                stop();            }        }    }    /** * 重新连接如果他是必须的 */    private synchronized void reconnectIfNecessary() {        if (mStarted && mClient == null) {            connect();        } else {                   Log.e(DEBUG_TAG, "重新连接没有启动,mStarted:" + String.valueOf(mStarted) + " mClient:" + mClient);        }    }    @Override    public void onStart(Intent intent, int startId) {        super.onStart(intent, startId);    }    /** * 通过ConnectivityManager查询网络连接状态 * * @return 如果网络状态正常则返回true反之flase */    private boolean isNetworkAvailable() {        NetworkInfo info = mConnectivityManager.getActiveNetworkInfo();        return (info == null) ? false : info.isConnected() && info.isAvailable();    }    /** * 判断推送服务是否连接 * * @return 如果是连接的则返回true反之false */    private boolean isConnected() {        if (mStarted && mClient != null && !mClient.isConnected()) {                   Log.e(DEBUG_TAG, "判断推送服务已经断开");        }        if (mClient != null) {            return (mStarted && mClient.isConnected()) ? true : false;        }        return false;    }    /** * 网络状态发生变化接收器 */    private final BroadcastReceiver mConnectivityReceiver = new BroadcastReceiver() {        @Override        public void onReceive(Context context, Intent intent) {            if (isNetworkAvailable()) {                       Log.e(DEBUG_TAG, "网络连接发生了变化--网络连接");                reconnectIfNecessary();            } else {                       Log.e(DEBUG_TAG, "网络连接发生了变化--网络断开");                stopKeepAlives();                mClient = null;            }        }    };    /** * 发送保持连接的指定的主题 * * @return MqttDeliveryToken specified token you can choose to wait for completion */    private synchronized MqttDeliveryToken sendKeepAlive()            throws MqttConnectivityException, MqttPersistenceException, MqttException {        if (!isConnected())            throw new MqttConnectivityException();        if (mKeepAliveTopic == null) {            mKeepAliveTopic = mClient.getTopic(                    String.format(Locale.US, MQTT_KEEP_ALIVE_TOPIC_FORAMT, mDeviceId));        }               Log.e(DEBUG_TAG, "向服务器发送心跳包url: " + MQTT_BROKER);        MqttMessage message = new MqttMessage(MQTT_KEEP_ALIVE_MESSAGE);        message.setQos(MQTT_KEEP_ALIVE_QOS);        return mKeepAliveTopic.publish(message);    }    /** * 查询是否已经有一个心跳包的闹钟 * * @return 如果已经有一个心跳包的闹钟则返回true反之false */    private synchronized boolean hasScheduledKeepAlives() {        Intent i = new Intent();        i.setClass(this, MqttService.class);        i.setAction(ACTION_KEEPALIVE);        PendingIntent pi = PendingIntent.getBroadcast(this, 0, i, PendingIntent.FLAG_NO_CREATE);        return (pi != null) ? true : false;    }    @Override    public IBinder onBind(Intent arg0) {        return null;    }    /** * 连接丢失回调 */    @Override    public void connectionLost(Throwable arg0) {               Log.e(DEBUG_TAG, "推送回调函数连接丢失connectionLost方法执行");        stopKeepAlives();        mClient = null;        if (isNetworkAvailable()) {            reconnectIfNecessary();        }    }    /** * 收到推送信息 */    @Override    public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {               Log.e(DEBUG_TAG, "收到推送信息如下\n Topic:\t" + topic +                " Message:\t" + new String(mqttMessage.getPayload()) +                " QoS:\t" + mqttMessage.getQos());        com.example.fc.activity.MqttNotifier.notify(this, new String(mqttMessage.getPayload()));    }    @Override    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {               Log.e(DEBUG_TAG, "推送回调函数deliveryComplete方法执行");    }    /** * MqttConnectivityException Exception class */    private class MqttConnectivityException extends Exception {        private static final long serialVersionUID = -7385866796799469420L;    }}

服务端:

package com.fc;import javax.jms.Connection;  import javax.jms.ConnectionFactory;  import javax.jms.DeliveryMode;  import javax.jms.Destination;  import javax.jms.JMSException;import javax.jms.MessageProducer;  import javax.jms.Session;  import javax.jms.TextMessage;  import javax.jms.TopicConnection;import javax.jms.TopicConnectionFactory;import javax.jms.TopicSession;import javax.swing.JScrollBar;import org.apache.activemq.ActiveMQConnection;  import org.apache.activemq.ActiveMQConnectionFactory;  import org.apache.commons.logging.Log;import org.apache.commons.logging.LogFactory;import com.ibm.mqtt.MqttClient;import com.ibm.mqtt.MqttException;import com.ibm.mqtt.MqttSimpleCallback;@SuppressWarnings("unused")public class MqttBroker {      private static final int SEND_NUMBER = 5;      static TopicConnection  connection;    static TopicSession session;/* public static void main(String[] args) throws JMSException { // ConnectionFactory :连接工厂,JMS 用它创建连接 ConnectionFactory connectionFactory; // Connection :JMS 客户端到JMS Provider 的连接 Connection connection = null; // Session: 一个发送或接收消息的线程 Session session; // Destination :消息的目的地;消息发送给谁. ActiveMQConnectionFactory Destination destination; // MessageProducer:消息发送者 MessageProducer producer; // TextMessage message; // 构造ConnectionFactory实例对象,此处采用ActiveMq的实现jar connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, "tcp://192.168.0.183:61616"); try { // 构造从工厂得到连接对象 connection = connectionFactory.createConnection(); // 启动 connection.start(); // 获取操作连接 session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); // 获取session注意参数值xingbo.xu-queue是一个服务器的queue,须在在ActiveMq的console配置 destination = session.createQueue("FirstQueue"); // 得到消息生成者【发送者】 producer = session.createProducer(destination); // 设置不持久化,此处学习,实际根据项目决定 producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); // 构造消息,此处写死,项目就是参数,或者方法获取 sendMessage(session, producer); session.commit(); } catch (Exception e) { e.printStackTrace(); } finally { try { if (null != connection) connection.close(); } catch (Throwable ignore) { } } init(); sendMessage("client123456","11111111111111111111111111111111"); } private static void init() throws JMSException { // 创建链接工厂 TopicConnectionFactory factory = new ActiveMQConnectionFactory( ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, "tcp://192.168.0.183:1883"); connection = factory.createTopicConnection(); // 启动连接 connection.start(); System.out.println("启动成功"); // 创建一个session会话 transacted session = connection.createTopicSession( Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); } *//** * 关闭connection和session * * @throws Exception *//* private void close() throws Exception { // 关闭释放资源 if (session != null) { session.close(); session = null; } if (connection != null) { connection.close(); connection = null; } System.out.println("断开连接"); } public static int sendMessage( String DESTINATION,String message)throws Exception { for (int i = 1; i <= 1; i++) { TextMessage message = session.createTextMessage("ActiveMq 发送的消息" + i); // 发送消息到目的地方 System.out.println("发送消息:" + "ActiveMq 发送的消息" + i); producer.send(message); } javax.jms.Topic topic; try { // 创建一个消息队列 topic = session.createTopic(DESTINATION); } catch (Exception e) { init(); topic = session.createTopic(DESTINATION); } // 创建消息发送者 javax.jms.TopicPublisher publisher = session.createPublisher(topic); // 设置持久化模式 publisher.setDeliveryMode(DeliveryMode.PERSISTENT); // flag=0,表示成功;flag=1,表示失败 int flag = 0; System.out.println(message); TextMessage textMessage = session.createTextMessage(message); publisher.send(textMessage); return flag; } */     private final static Log logger = LogFactory.getLog(MqttBroker.class);// 日志对象         // 连接参数         private final static String CONNECTION_STRING = "tcp://192.168.0.183:1883";  // private final static String CONNECTION_STRING = "tcp://192.168.0.183:1883";         private final static boolean CLEAN_START = true;          private final static short KEEP_ALIVE = 30;// 低耗网络,但是又需要及时获取数据,心跳30s         private final static String CLIENT_ID = "master";// 客户端标识         private final static int[] QOS_VALUES = { 0, 0, 2, 0 };// 对应主题的消息级别         private final static String[] TOPICS = { "Test/TestTopics/Topic1",                  "Test/TestTopics/Topic2", "Test/TestTopics/Topic3",                  "client/keepalive" };          private static MqttBroker instance = new MqttBroker();          private MqttClient mqttClient;          /** * 返回实例对象 * * @return */          public static MqttBroker getInstance() {              return instance;          }          /** * 重新连接服务 */          private void connect() throws MqttException {              logger.info("connect to mqtt broker.");              System.out.println("connect to mqtt broker.");              mqttClient = new MqttClient(CONNECTION_STRING);              logger.info("***********register Simple Handler***********");              System.out.println("***********register Simple Handler***********");              SimpleCallbackHandler simpleCallbackHandler = new SimpleCallbackHandler();              mqttClient.registerSimpleHandler(simpleCallbackHandler);// 注册接收消息方法             mqttClient.connect(CLIENT_ID, CLEAN_START, KEEP_ALIVE);              logger.info("***********subscribe receiver topics***********");              System.out.println("***********subscribe receiver topics***********");              mqttClient.subscribe(TOPICS, QOS_VALUES);// 订阅接主题             logger.info("***********CLIENT_ID:" + CLIENT_ID);              System.out.println("***********CLIENT_ID:" + CLIENT_ID);              /** * 完成订阅后,可以增加心跳,保持网络通畅,也可以发布自己的消息 */              mqttClient.publish("keepalive", "keepalive".getBytes(), QOS_VALUES[0],                      true);// 增加心跳,保持网络通畅         }          /** * 发送消息 * * @param clientId 客户端主题 不是id * @param messageId */          public void sendMessage(String clientId, String message) {              try {                  if (mqttClient == null || !mqttClient.isConnected()) {                      connect();                  }                  System.out.println("send message to " + clientId + ", message is "                          + message);                  logger.info("send message to " + clientId + ", message is "                          + message);                  // 发布自己的消息                 mqttClient.publish(clientId, message.getBytes(),                          0, false);                  /*mqttClient.publish("GMCC/tokudu/" + clientId, message.getBytes(), 0, false); */               System.out.println(" #####################" + CLIENT_ID);              } catch (MqttException e) {                  logger.error(e.getCause());                  e.printStackTrace();              }          }          /** * 简单回调函数,处理server接收到的主题消息 * * @author Join * */          class SimpleCallbackHandler implements MqttSimpleCallback {              /** * 当客户机和broker意外断开时触发 可以再此处理重新订阅 */              @Override              public void connectionLost() throws Exception {                  // TODO Auto-generated method stub                 System.out.println("客户机和broker已经断开");              }              /** 和broker已 * 客户端订阅消息后,该方法负责回调接收处理消息 */              @Override              public void publishArrived(String topicName, byte[] payload, int Qos,                      boolean retained) throws Exception {                  // TODO Auto-generated method stub                 System.out.println("订阅主题: " + topicName);                  System.out.println("消息数据: " + new String(payload));                  System.out.println("消息级别(0,1,2): " + Qos);                  System.out.println("是否是实时发送的消息(false=实时,true=服务器上保留的最后消息): "                          + retained);              }          }          public static void main(String[] args) {              String ss="{\"title\": \"ghjklffff\",\"content\":\"aaaaaaaaaaaaaaaaaaa\",\"type\":1}";            new MqttBroker().sendMessage("Fangchao",ss.replace("\\", ""));          }  //一定要用tokudu/a06e51a5f424fb70 不要用tokudu.aasdf 用反斜杠 不要用点    } 

客户端 服务器 所用到的jar 包 。只用wmqtt.jar 就可以进行发送与接收消息,但notification弹窗跳转有问题,所以客户端 改用org.eclipse ….jar 服务器还用wmqtt.jar 就可以.客户端有两个接受推送服务:第一个是pushservice 用的是wmqtt.jar 第二个是Mqttservice 用到的是org.eclipse…那个。这两个服务可以在pushactivity中切换,去掉注释即可。下载链接

客户端源码 只看PushClient

更多相关文章

  1. Android(安卓)网络管理类的使用(一)
  2. Android(安卓)Eventbus发送消息
  3. 蓝牙后台长连接 服务
  4. Notification 最简单的使用
  5. android 获取wifi信息
  6. Android(安卓)Studio 蓝牙配对
  7. EventBus 3 for Android
  8. android bluetooth开发基础-7管理连接
  9. Android(安卓)通知提示功能

随机推荐

  1. 一句实现jquery导航栏
  2. 百花齐放 日月同辉――采用JSI解决不同类
  3. 基于AdminLTE的jquery头像更新
  4. jQuery Validate 表单验证插件
  5. css3和jquery实现的可折叠导航菜单(适合
  6. jquery 如何同时选择多个不同的id执行同
  7. Struts2 json jQuery ajax 的集成配置与
  8. Yii就地crud文本小部件
  9. jQuery遍历----------(遍历、祖先、后代
  10. day049--jQuery文档操作示例