Android(安卓)Stomp客户端
16lz
2021-01-25
STOMP 是一个简单的、可互操作的协议,用于通过中间服务器在客户端之间进行异步消息传递。它定义了一种在客户端与服务端之间传递消息的文本格式。
使用 StompProtocolAndroid 实现 Android 端的 WebSocket 长连接。
package com.kedacom.androidteam.trafficpoliceaffairs.view.activity.checkpoint

import android.arch.lifecycle.Observer
import android.content.Intent
import android.os.Bundle
import android.support.v4.content.ContextCompat
import android.support.v7.widget.LinearLayoutManager
import android.view.View
import com.google.gson.Gson
import com.kedacom.androidteam.trafficpoliceaffairs.BR
import com.kedacom.androidteam.trafficpoliceaffairs.R
import com.kedacom.androidteam.trafficpoliceaffairs.bean.checkpoint.ClientInfo
import com.kedacom.androidteam.trafficpoliceaffairs.bean.checkpoint.InspectItemInfo
import com.kedacom.androidteam.trafficpoliceaffairs.constants.CheckpointConstant
import com.kedacom.androidteam.trafficpoliceaffairs.databinding.ActivityCheckpointVehicleInspectionBinding
import com.kedacom.androidteam.trafficpoliceaffairs.net.NetConfig
import com.kedacom.androidteam.trafficpoliceaffairs.utils.ACache
import com.kedacom.androidteam.trafficpoliceaffairs.utils.GsonUtil
import com.kedacom.androidteam.trafficpoliceaffairs.view.activity.BaseActivity
import com.kedacom.androidteam.trafficpoliceaffairs.view.activity.process.VideoCollectActivity
import com.kedacom.androidteam.trafficpoliceaffairs.viewmodel.checkpoint.VehicleInspectionViewModel
import com.kedacom.lego.adapter.recyclerview.LegoBaseRecyclerViewAdapter
import com.kedacom.lego.annotation.ContentView
import com.kedacom.lego.annotation.ViewModel
import com.kedacom.lego.message.LegoEventBus
import com.kedacom.util.LegoLog
import com.kedacom.widget.keyboard.KeyboardUtil
import com.kedacom.widget.keyboard.platenum.PlateNumEditorKeyboardView
import io.reactivex.android.schedulers.AndroidSchedulers
import io.reactivex.disposables.CompositeDisposable
import io.reactivex.schedulers.Schedulers
import kotlinx.android.synthetic.main.activity_checkpoint_vehicle_inspection.*
import ua.naiksoftware.stomp.Stomp
import ua.naiksoftware.stomp.StompClient
import ua.naiksoftware.stomp.dto.LifecycleEvent
import ua.naiksoftware.stomp.dto.StompHeader
import ua.naiksoftware.stomp.dto.StompMessage
import java.util.*

/**
 * Activity that opens a STOMP-over-WebSocket connection when created, subscribes to the
 * per-user `/topic/photo/app/{userId}` topic and hands every received [InspectItemInfo]
 * to the view model.
 *
 * If the connection closes after an error was reported, it is re-established automatically,
 * up to [MAX_RECONNECT_ATTEMPTS] consecutive attempts. A successful open resets that budget.
 */
@ContentView(R.layout.activity_checkpoint_vehicle_inspection)
@ViewModel(VehicleInspectionViewModel::class)
class VehicleInspectionActivity : BaseActivity() {

    private var mStompClient: StompClient? = null

    /** Holds the lifecycle and topic subscriptions of the current connection attempt. */
    private var compositeDisposable: CompositeDisposable? = null

    /** Set when the STOMP lifecycle reports an error; cleared once a connection opens. */
    private var errorFlag = false

    /** Number of consecutive automatic reconnect attempts since the last successful open. */
    private var reconnectionNum = 0

    public override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        initWebsocket()
    }

    //////////////////////////////////// websocket-start ////////////////////////////////////

    /**
     * Creates the STOMP client and connects immediately.
     *
     * See https://www.jianshu.com/p/a60865393239
     */
    private fun initWebsocket() {
        mStompClient = Stomp.over(
            Stomp.ConnectionProvider.OKHTTP,
            NetConfig.HTTP_BASE_URL + "kits-jcz-server/ws/stomp/websocket"
        )
        // connectStomp() resets the subscriptions itself, so no separate reset is needed here.
        connectStomp()
    }

    /**
     * Subscribes to the client's lifecycle events and to the user's topic, then connects.
     *
     * Does nothing when the client has not been created yet.
     */
    private fun connectStomp() {
        val client = mStompClient ?: return

        // NOTE(review): both cached values are used as-is; if the cache can return null the
        // topic becomes "/topic/photo/app/null" - confirm against ACache.
        val userId = ACache.get(this).getAsString(NetConfig.CONSTANT_ID)
        val deptCode = ACache.get(this).getAsString(NetConfig.CONSTANT_DEPT_CODE)

        val clientInfo = mapOf(
            "userId" to userId,
            "stationCode" to deptCode,
            "topics" to arrayOf("/topic/kitsAlarm/app")
        )
        val headers: List<StompHeader> = listOf(
            StompHeader("clientId", clientInfo.toJson),
            StompHeader("onDisconnectTopic", "/topic")
        )

        client.withClientHeartbeat(1000).withServerHeartbeat(1000)
        resetSubscriptions()

        // Observe connection state changes on the main thread.
        val dispLifecycle = client.lifecycle()
            .doOnError { throwable: Throwable ->
                LegoLog.e("连接异常:${throwable.message}", throwable)
            }
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe({ lifecycleEvent: LifecycleEvent ->
                when (lifecycleEvent.type) {
                    LifecycleEvent.Type.OPENED -> {
                        LegoLog.d("Stomp connection opened")
                        // A healthy connection restores the full reconnect budget and clears
                        // the error state, so a later clean close does not reconnect.
                        errorFlag = false
                        reconnectionNum = 0
                    }
                    LifecycleEvent.Type.ERROR -> {
                        errorFlag = true
                        LegoLog.e("Stomp connection error", lifecycleEvent.exception)
                    }
                    LifecycleEvent.Type.CLOSED -> {
                        LegoLog.d("Stomp connection closed")
                        resetSubscriptions()
                        // Reconnect only after an error, and only while attempts remain.
                        if (errorFlag && reconnectionNum < MAX_RECONNECT_ATTEMPTS) {
                            reconnectionNum++
                            LegoLog.i("连接异常断开,第${reconnectionNum}次自动重连")
                            connectStomp()
                        }
                    }
                    LifecycleEvent.Type.FAILED_SERVER_HEARTBEAT ->
                        LegoLog.d("Stomp failed server heartbeat")
                }
            }, { throwable: Throwable? ->
                LegoLog.w("stomp连接时subscribe发生异常:${throwable?.message}")
            })
        compositeDisposable?.add(dispLifecycle)

        // Receive inspection messages pushed to this user's topic.
        val dispTopic = client.topic("/topic/photo/app/$userId")
            .doOnError { throwable: Throwable ->
                LegoLog.e("订阅异常:${throwable.message}", throwable)
            }
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe({ topicMessage: StompMessage ->
                val payloadStr = topicMessage.payload
                LegoLog.i("接收到核查消息: $payloadStr")
                if (payloadStr.isNullOrEmpty()) {
                    LegoLog.w("推送的数据为空")
                } else {
                    val itemInfo = GsonUtil.GsonToBean(payloadStr, InspectItemInfo::class.java)
                    showToast("核查车辆:${itemInfo.plateNo}")
                    // NOTE(review): `nViewModel` is not declared in this file; presumably it is
                    // provided by BaseActivity or is a typo for another field - confirm.
                    nViewModel.saveInfo(itemInfo)
                }
            }, { throwable: Throwable? ->
                LegoLog.w("stomp订阅时subscribe发生异常:${throwable?.message}")
            })
        compositeDisposable?.add(dispTopic)

        // Start connecting.
        client.connect(headers)
    }

    /** Disposes all current subscriptions and starts with a fresh, empty container. */
    private fun resetSubscriptions() {
        compositeDisposable?.dispose()
        compositeDisposable = CompositeDisposable()
    }

    ///////////////////////////////////// websocket-end /////////////////////////////////////

    override fun onDestroy() {
        // Dispose first so the CLOSED event caused by disconnect() cannot reach the lifecycle
        // handler and start an automatic reconnect while the activity is being destroyed.
        compositeDisposable?.dispose()
        mStompClient?.disconnect()
        super.onDestroy()
    }

    /** Extension property: the receiver serialized to a JSON string with Gson. */
    val Any.toJson: String
        get() = Gson().toJson(this)

    private companion object {
        /** Maximum number of consecutive automatic reconnect attempts. */
        const val MAX_RECONNECT_ATTEMPTS = 10
    }
}
参考:StompProtocolAndroid 使用方法
更多相关文章
- Nginx系列教程(二)| 一文带你读懂Nginx的正向与反向代理
- RHEL 6 下 DHCP+TFTP+FTP+PXE+Kickstart 实现无人值守安装
- Android(安卓)-- 与WEB交互在同一个会话Session中通信
- HttpURLConnection和HttpClient
- Android下的RTSP客户端搭建
- Android(安卓)apk安装的几种方法
- Android(安卓)LockScreen .
- Android(安卓)Activity的显示过程简述
- android 通过intent调用短消息的正确方法