由于andoid项目中有大量需要用到消息推送,平台端在实现消息推送选择activemq,为了能使用消息推送,决定研究一下如果在android端实现activemq的消息推送。
       这个问题说难不难,说易也不易,平台端开发人员选择activemq并且认为activemq是用java的,所以认为android使用它应该是很容易的,其实我很想告诉他们:"you are wrong!"。android其实能直接把activemq中的lib库导入android并不能正常使用,虽然编译通过了,但是执行时会报一些类没有找到的错误!在这个问题上我纠结了好久,网上也找过相关资料,根本就找不到。通过仅一天的查找,调试,到网络找相关项目,最后终于让我找到一个能用的项目,毫无疑问,老外写的。 Android-Paho-Mqtt-Service。使用这个项目库终于可以正常使用activemq的消息推送了。
     activemq下载地址:http://activemq.apache.org/download-archives.html,我使用的是activemq5.9.1。为什么不使用最新版本,因为我们服务器就是使用的这个版本。

     Android-Paho-Mqtt-Service 下载地址:https://github.com/JesseFarebro/Android-Mqtt

     测试项目相关文件

         1.MainActivity类

   

public class MainActivity extends Activity { @Override protected void onCreate(Bundle savedInstanceState) {  super.onCreate(savedInstanceState);  setContentView(R.layout.activity_main);  MqttService.actionStart(this); }  @Override protected void onDestroy() {  // TODO Auto-generated method stub  super.onDestroy();  MqttService.actionStop(this); }}

2 MqttService类,这个类文件是我从Android-Paho-Mqtt-Service项目中下载的文件,修改了服务器地址

public class MqttService extends Service implements MqttCallback{ public static final String DEBUG_TAG = "MqttService"; // Debug TAG  private static final String MQTT_THREAD_NAME = "MqttService[" + DEBUG_TAG + "]"; // Handler Thread ID  private static final String MQTT_BROKER = "192.168.11.62"; // Broker URL or IP Address private static final int MQTT_PORT = 1883; // Broker Port  public static final int MQTT_QOS_0 = 0; // QOS Level 0 ( Delivery Once no confirmation ) public static final int MQTT_QOS_1 = 1; // QOS Level 1 ( Delevery at least Once with confirmation ) public static final int MQTT_QOS_2 = 2; // QOS Level 2 ( Delivery only once with confirmation with handshake )  private static final int MQTT_KEEP_ALIVE = 240000; // KeepAlive Interval in MS private static final String MQTT_KEEP_ALIVE_TOPIC_FORAMT = "/users/%s/keepalive"; // Topic format for KeepAlives private static final byte[] MQTT_KEEP_ALIVE_MESSAGE = { 0 }; // Keep Alive message to send private static final int MQTT_KEEP_ALIVE_QOS = MQTT_QOS_0; // Default Keepalive QOS  private static final boolean MQTT_CLEAN_SESSION = true; // Start a clean session?  private static final String MQTT_URL_FORMAT = "tcp://%s:%d"; // URL Format normally don't change  private static final String ACTION_START = DEBUG_TAG + ".START"; // Action to start private static final String ACTION_STOP = DEBUG_TAG + ".STOP"; // Action to stop private static final String ACTION_KEEPALIVE= DEBUG_TAG + ".KEEPALIVE"; // Action to keep alive used by alarm manager private static final String ACTION_RECONNECT= DEBUG_TAG + ".RECONNECT"; // Action to reconnect   private static final String DEVICE_ID_FORMAT = "ljf_%s"; // Device ID Format, add any prefix you'd like                  // Note: There is a 23 character limit you will get                  // An NPE if you go over that limit private boolean mStarted = false; // Is the Client started? private String mDeviceId; // Device ID, Secure.ANDROID_ID private Handler mConnHandler; // Seperate Handler thread for networking  private MqttDefaultFilePersistence mDataStore; // Defaults to FileStore private MemoryPersistence mMemStore; // On Fail reverts to MemoryStore private MqttConnectOptions mOpts; // Connection Options  private MqttTopic mKeepAliveTopic; // Instance Variable for Keepalive topic  private MqttClient mClient; // Mqtt Client  private AlarmManager mAlarmManager; // Alarm manager to perform repeating tasks private ConnectivityManager mConnectivityManager; // To check for connectivity changes /**  * Start MQTT Client  * @param Context context to start the service with  * @return void  */ public static void actionStart(Context ctx) {  Intent i = new Intent(ctx,MqttService.class);  i.setAction(ACTION_START);  ctx.startService(i); } /**  * Stop MQTT Client  * @param Context context to start the service with  * @return void  */ public static void actionStop(Context ctx) {  Intent i = new Intent(ctx,MqttService.class);  i.setAction(ACTION_STOP);  ctx.startService(i); } /**  * Send a KeepAlive Message  * @param Context context to start the service with  * @return void  */ public static void actionKeepalive(Context ctx) {  Intent i = new Intent(ctx,MqttService.class);  i.setAction(ACTION_KEEPALIVE);  ctx.startService(i); }  /**  * Initalizes the DeviceId and most instance variables  * Including the Connection Handler, Datastore, Alarm Manager  * and ConnectivityManager.  */ @Override public void onCreate() {  super.onCreate();    mDeviceId = String.format(DEVICE_ID_FORMAT,     Secure.getString(getContentResolver(), Secure.ANDROID_ID));    HandlerThread thread = new HandlerThread(MQTT_THREAD_NAME);  thread.start();    mConnHandler = new Handler(thread.getLooper());    try {   mDataStore = new MqttDefaultFilePersistence(getCacheDir().getAbsolutePath());  } catch(MqttPersistenceException e) {   e.printStackTrace();   mDataStore = null;   mMemStore = new MemoryPersistence();  }    mOpts = new MqttConnectOptions();  mOpts.setCleanSession(MQTT_CLEAN_SESSION);  // Do not set keep alive interval on mOpts we keep track of it with alarm's    mAlarmManager = (AlarmManager) getSystemService(ALARM_SERVICE);  mConnectivityManager = (ConnectivityManager) getSystemService(CONNECTIVITY_SERVICE); }  /**  * Service onStartCommand  * Handles the action passed via the Intent  *   * @return START_REDELIVER_INTENT  */ @Override public int onStartCommand(Intent intent, int flags, int startId) {  super.onStartCommand(intent, flags, startId);    String action = intent.getAction();    Log.i(DEBUG_TAG,"Received action of " + action);    if(action == null) {   Log.i(DEBUG_TAG,"Starting service with no action\n Probably from a crash");  } else {   if(action.equals(ACTION_START)) {    Log.i(DEBUG_TAG,"Received ACTION_START");    start();   } else if(action.equals(ACTION_STOP)) {    stop();   } else if(action.equals(ACTION_KEEPALIVE)) {     keepAlive();   } else if(action.equals(ACTION_RECONNECT)) {    if(isNetworkAvailable()) {     reconnectIfNecessary();    }   }  }    return START_REDELIVER_INTENT; }  /**  * Attempts connect to the Mqtt Broker  * and listen for Connectivity changes  * via ConnectivityManager.CONNECTVITIY_ACTION BroadcastReceiver  */ private synchronized void start() {  if(mStarted) {   Log.i(DEBUG_TAG,"Attempt to start while already started");   return;  }    if(hasScheduledKeepAlives()) {   stopKeepAlives();  }    connect();    registerReceiver(mConnectivityReceiver,new IntentFilter(ConnectivityManager.CONNECTIVITY_ACTION)); } /**  * Attempts to stop the Mqtt client  * as well as halting all keep alive messages queued  * in the alarm manager  */ private synchronized void stop() {  if(!mStarted) {   Log.i(DEBUG_TAG,"Attemtpign to stop connection that isn't running");   return;  }    if(mClient != null) {   mConnHandler.post(new Runnable() {    @Override    public void run() {     try {      mClient.disconnect();     } catch(MqttException ex) {      ex.printStackTrace();     }     mClient = null;     mStarted = false;          stopKeepAlives();    }   });  }    unregisterReceiver(mConnectivityReceiver); } /**  * Connects to the broker with the appropriate datastore  */ private synchronized void connect() {  String url = String.format(Locale.US, MQTT_URL_FORMAT, MQTT_BROKER, MQTT_PORT);  Log.i(DEBUG_TAG,"Connecting with URL: " + url);  try {   if(mDataStore != null) {    Log.i(DEBUG_TAG,"Connecting with DataStore");    mClient = new MqttClient(url,mDeviceId,mDataStore);   } else {    Log.i(DEBUG_TAG,"Connecting with MemStore");    mClient = new MqttClient(url,mDeviceId,mMemStore);   }  } catch(MqttException e) {   e.printStackTrace();  }    mConnHandler.post(new Runnable() {   @Override   public void run() {    try {     mClient.connect(mOpts);          mClient.subscribe("hello", 0);          mClient.setCallback(MqttService.this);          mStarted = true; // Service is now connected          Log.i(DEBUG_TAG,"Successfully connected and subscribed starting keep alives");          startKeepAlives();    } catch(MqttException e) {     e.printStackTrace();    }   }  }); } /**  * Schedules keep alives via a PendingIntent  * in the Alarm Manager  */ private void startKeepAlives() {  Intent i = new Intent();  i.setClass(this, MqttService.class);  i.setAction(ACTION_KEEPALIVE);  PendingIntent pi = PendingIntent.getService(this, 0, i, 0);  mAlarmManager.setRepeating(AlarmManager.RTC_WAKEUP,    System.currentTimeMillis() + MQTT_KEEP_ALIVE,    MQTT_KEEP_ALIVE, pi); } /**  * Cancels the Pending Intent  * in the alarm manager  */ private void stopKeepAlives() {  Intent i = new Intent();  i.setClass(this, MqttService.class);  i.setAction(ACTION_KEEPALIVE);  PendingIntent pi = PendingIntent.getService(this, 0, i , 0);  mAlarmManager.cancel(pi); } /**  * Publishes a KeepALive to the topic  * in the broker  */ private synchronized void keepAlive() {  if(isConnected()) {   try {    sendKeepAlive();    return;   } catch(MqttConnectivityException ex) {    ex.printStackTrace();    reconnectIfNecessary();   } catch(MqttPersistenceException ex) {    ex.printStackTrace();    stop();   } catch(MqttException ex) {    ex.printStackTrace();    stop();   }  } } /**  * Checkes the current connectivity  * and reconnects if it is required.  */ private synchronized void reconnectIfNecessary() {  if(mStarted && mClient == null) {   connect();  } } /**  * Query's the NetworkInfo via ConnectivityManager  * to return the current connected state  * @return boolean true if we are connected false otherwise  */ private boolean isNetworkAvailable() {  NetworkInfo info = mConnectivityManager.getActiveNetworkInfo();    return (info == null) ? false : info.isConnected(); } /**  * Verifies the client State with our local connected state  * @return true if its a match we are connected false if we aren't connected  */ private boolean isConnected() {  if(mStarted && mClient != null && !mClient.isConnected()) {   Log.i(DEBUG_TAG,"Mismatch between what we think is connected and what is connected");  }    if(mClient != null) {   return (mStarted && mClient.isConnected()) ? true : false;  }    return false; } /**  * Receiver that listens for connectivity chanes  * via ConnectivityManager  */ private final BroadcastReceiver mConnectivityReceiver = new BroadcastReceiver() {  @Override  public void onReceive(Context context, Intent intent) {   Log.i(DEBUG_TAG,"Connectivity Changed...");  } }; /**  * Sends a Keep Alive message to the specified topic  * @see MQTT_KEEP_ALIVE_MESSAGE  * @see MQTT_KEEP_ALIVE_TOPIC_FORMAT  * @return MqttDeliveryToken specified token you can choose to wait for completion  */ private synchronized MqttDeliveryToken sendKeepAlive()  throws MqttConnectivityException, MqttPersistenceException, MqttException {  if(!isConnected())   throw new MqttConnectivityException();    if(mKeepAliveTopic == null) {   mKeepAliveTopic = mClient.getTopic(     String.format(Locale.US, MQTT_KEEP_ALIVE_TOPIC_FORAMT,mDeviceId));  }    Log.i(DEBUG_TAG,"Sending Keepalive to " + MQTT_BROKER);    MqttMessage message = new MqttMessage(MQTT_KEEP_ALIVE_MESSAGE);  message.setQos(MQTT_KEEP_ALIVE_QOS);    return mKeepAliveTopic.publish(message); } /**  * Query's the AlarmManager to check if there is  * a keep alive currently scheduled  * @return true if there is currently one scheduled false otherwise  */ private synchronized boolean hasScheduledKeepAlives() {  Intent i = new Intent();  i.setClass(this, MqttService.class);  i.setAction(ACTION_KEEPALIVE);  PendingIntent pi = PendingIntent.getBroadcast(this, 0, i, PendingIntent.FLAG_NO_CREATE);    return (pi != null) ? true : false; }   @Override public IBinder onBind(Intent arg0) {  return null; } /**  * Connectivity Lost from broker  */ @Override public void connectionLost(Throwable arg0) {  stopKeepAlives();    mClient = null;    if(isNetworkAvailable()) {   reconnectIfNecessary();  } } /**  * Publish Message Completion  */ @Override public void deliveryComplete(MqttDeliveryToken arg0) {   } /**  * Received Message from broker  */ @Override public void messageArrived(MqttTopic topic, MqttMessage message)   throws Exception {  Log.i(DEBUG_TAG," Topic:\t" + topic.getName() +                " Message:\t" + new String(message.getPayload()) +                " QoS:\t" + message.getQos());  Toast.makeText(this, topic.getName(), Toast.LENGTH_LONG).show(); } /**  * MqttConnectivityException Exception class  */ private class MqttConnectivityException extends Exception {  private static final long serialVersionUID = -7385866796799469420L;  }}

3.manifest 文件

                                                                                                                                                                      

测试项目源码下载地址: http://download.csdn.net/detail/junfeng120125/7582209


更多相关文章

  1. 微软发布 mircosft remote desktop for android
  2. Handler消息传递机制
  3. Eclipse项目导入Android Studio,.9图片报错解决办法
  4. Android几种消息推送方案总结
  5. 将Android项目导入Android Studio
  6. Android socket 编程 实现消息推送(二)

随机推荐

  1. android ListView 去除下划线和选中模式
  2. Android:Material Design(三) 动画
  3. Android系统自带样式Android:theme
  4. android Style属性介绍
  5. Android(安卓)textAppearance的属性设置
  6. android:inputType参数类型说明
  7. Android应用开发——系统自带样式Android
  8. Google的Android设备别名
  9. Android应用开发——系统自带样式Android
  10. android:inputType参数类型说明