对于跨进程IPC通信,Android提供了一个完整的框架Binder,而对于线程之间的通信,Android同样提供了一个强大的消息机制:Handler/Looper/MessageQueue,通过Handler我们很容易的实现在UI线程与其他线程之间的消息传递。这篇文章,就来看一看Android消息机制的具体应用以及原理。主要有3个方面:

  • Android消息机制相关的基本概念;
  • Android Handler的具体应用实例;
  • 从源码的角度直接深入了解Handler/Looper/MessageQueue的机制;

首先来看下相关的概念。

基本概念

Handler

一个Handler用于发送或者处理与一个线程的消息队列(MessageQueue)相关联的消息或者可执行对象(Runnable)。每个Handler实例跟一个线程以及线程消息队列相关联。当新建一个Handler实例时,该Handler就跟创建它的线程以及线程队列绑定在一起。此后,它就会将消息以及可执行对象传送到消息队列中,并且处理从消息队列中出来的消息。

Handler有两个用途:(1)对消息以及可执行对象进行调度,以便未来某个时间点执行之;(2)向另一个线程发送一个消息或者执行对象。

当应用程序进程创建完成后,主线程负责运行一个用于管理应用内的对象,如activities,broadcast receivers的消息队列。用户可以创建自己的线程,通过一个Handler跟主线程进行通信。

Looper

Looper用于为线程运行一个消息循环;线程默认是没有消息循环的,要建立一个新的Looper,首先需要在线程内调用prepare,然后调用loop进入消息处理。

MessageQueue

MessageQueue用于保存被looper分发出来的消息。消息并不是直接添加到MessageQueue中的,而是通过跟looper相关联的Handler进行传递。

Android应用实例

在Android应用开发过程中,为了保持UI的响应,通常需要将一些耗时的操作放到非UI线程,然后将结果返回到UI线程。这里,假如我们需要从网络上下载一副图片,然后在ImageView中显示。对于网络访问这种耗时的操作,启动一个新的线程来运行。因此,首先我们自定义一个线程类用于下载图片:

    /**     * thread to download image from a given URL     */    public class DownloadThread extends Thread {        private static final String LOG_TAG = "DownloadThread";        private static final int MSG_DOWNLOAD_IMAGE = 0x01;        private static final int DEFAULT_CONNECT_TIMEOUT = 10*1000;        private static EventHandler mHandler;        private Handler mMainHandler; // main thread handler        public DownloadThread(Handler main){            super(LOG_TAG);            mMainHandler = main;        }        @Override        public void run() {            //启动线程消息循环            Looper.prepare();            // 新建一个线程Handler            mHandler = new EventHandler();            //进入消息循环            Looper.loop();        }        public void addNewTask(String url){            if(URLUtil.isNetworkUrl(url)){                Message msg = mHandler.obtainMessage(MSG_DOWNLOAD_IMAGE,url);                mHandler.sendMessage(msg);            }else{                //向主线程发送错误消息                String error = "illegal format image url";                Message msg = mMainHandler.obtainMessage(MainActivity.MSG_IMAGE_DOWNLOAD_FAIL,error);                msg.sendToTarget();            }        }        private void downloadImage(String imgUrl){            Log.v(LOG_TAG, "downloadImage(): url = " + imgUrl);            // notify main thread that it starts            Message msg = mMainHandler.obtainMessage(MainActivity.MSG_IMAGE_DOWNLOAD_START);            msg.sendToTarget();            InputStream in = null;            try {                // configure Http connection                URL myUrl = new URL(imgUrl);                HttpURLConnection urlCnn = (HttpURLConnection)myUrl.openConnection();                urlCnn.setDoInput(true);                urlCnn.setConnectTimeout(DEFAULT_CONNECT_TIMEOUT);                // get input stream from the given url                in = urlCnn.getInputStream();                Bitmap bitmap = BitmapFactory.decodeStream(in);                Bundle bundle = new Bundle();                bundle.putParcelable("bitmap",bitmap);                // send message containing data to the main thread                Message msg1 = mMainHandler.obtainMessage(MainActivity.MSG_IMAGE_DOWNLOAD_SUCCESS);                msg1.setData(bundle);                msg1.sendToTarget();            }catch (MalformedURLException e){                e.printStackTrace();                String error = "wrong format url";                Message message = mMainHandler.obtainMessage(MainActivity.MSG_IMAGE_DOWNLOAD_FAIL,error);                message.sendToTarget();            }catch (IOException e){                e.printStackTrace();                String error = "network connection error";                Message message = mMainHandler.obtainMessage(MainActivity.MSG_IMAGE_DOWNLOAD_FAIL,error);                message.sendToTarget();            }        }        // 自定义一个事件处理Handler        private final class EventHandler extends Handler{            @Override            public void handleMessage(Message msg){                int w = msg.what;                switch (w){                    case MSG_DOWNLOAD_IMAGE:                        String url = (String)msg.obj;                        downloadImage(url);                        break;                    default:                        break;                }            }        }    }

注意,启动一个新的线程时,需要调用prepareloop两个函数,以确保消息循环处于运行状态。接着,在MainActivity中启动该线程用于下载图片:

    public class MainActivity extends AppCompatActivity {        private static final String TAG = MainActivity.class.getSimpleName();        public static final int MSG_IMAGE_DOWNLOAD_START = 0x01;        public static final int MSG_IMAGE_DOWNLOAD_FAIL = 0x02;        public static final int MSG_IMAGE_DOWNLOAD_SUCCESS = 0x03;        private Handler mH;        @Override        protected void onCreate(Bundle savedInstanceState) {            super.onCreate(savedInstanceState);            setContentView(R.layout.activity_main);            Toolbar toolbar = (Toolbar) findViewById(R.id.toolbar);            setSupportActionBar(toolbar);            mIvFilm = (ImageView)findViewById(R.id.iv_film);            mH = new H();        }        @Override        public void onResume(){            super.onResume();            DownloadThread downloadThread = new DownloadThread(mH);            downloadThread.start();            String url = "http://image.baidu.com/search/redirect?tn=redirect&word=j&juid=9127CC&sign=ciwziioaoz&url=http%3A%2F%2Fwww.4493.com%2Fmotemeinv%2F6156%2F1.htm&objurl=http%3A%2F%2Fh.hiphotos.baidu.com%2Fimage%2Fpic%2Fitem%2Fdc54564e9258d109a4d1165ad558ccbf6c814d23.jpg";            // 下载图片            downloadThread.addNewTask(url);        }        // 自定义一个Handler对象,用于处理UI线程的消息        private final class H extends Handler{            @Override            public void handleMessage(Message msg){                int w = msg.what;                switch (w){                    case MSG_IMAGE_DOWNLOAD_START:                        Toast.makeText(MainActivity.this,"start download",Toast.LENGTH_SHORT).show();                        break;                    case MSG_IMAGE_DOWNLOAD_SUCCESS:                        Bitmap bitmap = msg.getData().getParcelable("bitmap");                        mIvFilm.setImageBitmap(bitmap);                        Toast.makeText(MainActivity.this,"download complete",Toast.LENGTH_SHORT).show();                        break;                    case MSG_IMAGE_DOWNLOAD_FAIL:                        Toast.makeText(MainActivity.this,"download failure",Toast.LENGTH_SHORT).show();                        break;                    default:                        break;                }            }        }    }

可见,对于一个新的线程来说,使用Handler来处理消息或者可执行对象时,需要做如下几件事情:

  1. 自定义一个Handler,用于处理Looper发送过来的消息;
  2. 启动线程消息循环: Looper.prepare(),将线程与Looper进行绑定;
  3. 进入消息循环: Looper.loop(),等待接收并处理消息。

Handler原理详解

如下图所示,为HandlerLooper以及MessageQueue三者之间的关系。

  • Looper不断查询消息队列中的消息,如果发现有新的消息,则将其发送给对应的Handler执行;
  • Handler接收到来自Looper的消息后,对其进行处理;
  • MessageQueue接收发自从Handler产生的消息,将其放入队列;

明白了三者的作用,接下来就来看一看具体的实现细节。

线程与Looper的绑定

调用prepare()函数后,新建一个Looper实例与线程进行绑定,并将其保存在线程本地变量中。

    public static void prepare() {        prepare(true);    }    private static void prepare(boolean quitAllowed) {        if (sThreadLocal.get() != null) {            throw new RuntimeException("Only one Looper may be created per thread");        }        sThreadLocal.set(new Looper(quitAllowed));    }    ...    // 新建一个与该线程对应的MessageQueue    private Looper(boolean quitAllowed) {        mQueue = new MessageQueue(quitAllowed);        mThread = Thread.currentThread();    }

源码:/android/frameworks/base/core/java/android/os/Looper.java

同时,创建一个消息队列MessageQueue

    public final class MessageQueue {        // True if the message queue can be quit.        private final boolean mQuitAllowed;        @SuppressWarnings("unused")        private long mPtr; // used by native code        private boolean mQuitting;        ....        private native static long nativeInit();        private native static void nativeDestroy(long ptr);        private native void nativePollOnce(long ptr, int timeoutMillis); /*non-static for callbacks*/        private native static void nativeWake(long ptr);        private native static boolean nativeIsPolling(long ptr);        private native static void nativeSetFileDescriptorEvents(long ptr, int fd, int events);        MessageQueue(boolean quitAllowed) {            mQuitAllowed = quitAllowed;            //本地初始化            mPtr = nativeInit();        }        ....    }

源码: /android/frameworks/base/core/java/android/os/MessageQueue.java

初始化本地消息队列(新建一个本地消息队列),该消息队列包含了一个本地的Looper, 消息循环开始之前,NativeMessageQueue中没有Looper对象,因此创建一个并将其保存到线程私有变量中:

    static jlong android_os_MessageQueue_nativeInit(JNIEnv* env, jclass clazz) {        NativeMessageQueue* nativeMessageQueue = new NativeMessageQueue();        if (!nativeMessageQueue) {            jniThrowRuntimeException(env, "Unable to allocate native queue");            return 0;        }        nativeMessageQueue->incStrong(env);        return reinterpret_cast(nativeMessageQueue);    }    // 本地消息队列    NativeMessageQueue::NativeMessageQueue() :            mPollEnv(NULL), mPollObj(NULL), mExceptionObj(NULL) {        //首次为空        mLooper = Looper::getForThread();        if (mLooper == NULL) {            mLooper = new Looper(false);            Looper::setForThread(mLooper);        }    }    // 将looper保存为线程私有变量    void Looper::setForThread(const sp& looper) {        sp old = getForThread(); // also has side-effect of initializing TLS        if (looper != NULL) {            looper->incStrong((void*)threadDestructor);        }        pthread_setspecific(gTLSKey, looper.get());        if (old != NULL) {            old->decStrong((void*)threadDestructor);        }    }

源码: /android/frameworks/base/core/jni/android_os_MessageQueue.cpp

新建一个本地Looper,创建一个eventfd用于监听I/O事件:

    Looper::Looper(bool allowNonCallbacks) :            mAllowNonCallbacks(allowNonCallbacks), mSendingMessage(false),            mPolling(false), mEpollFd(-1), mEpollRebuildRequired(false),            mNextRequestSeq(0), mResponseIndex(0), mNextMessageUptime(LLONG_MAX) {        mWakeEventFd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);        LOG_ALWAYS_FATAL_IF(mWakeEventFd < 0, "Could not make wake event fd: %s",                            strerror(errno));        AutoMutex _l(mLock);        rebuildEpollLocked();    }    void Looper::rebuildEpollLocked() {        // Close old epoll instance if we have one.        if (mEpollFd >= 0) {            close(mEpollFd);        }        // Allocate the new epoll instance and register the wake pipe.        mEpollFd = epoll_create(EPOLL_SIZE_HINT);        struct epoll_event eventItem;        memset(& eventItem, 0, sizeof(epoll_event));         eventItem.events = EPOLLIN;        eventItem.data.fd = mWakeEventFd;        int result = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, mWakeEventFd, & eventItem);        //如果添加了其他fd,则将其添加到epoll的监听列表中        for (size_t i = 0; i < mRequests.size(); i++) {            const Request& request = mRequests.valueAt(i);            struct epoll_event eventItem;            request.initEventItem(&eventItem);            int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, request.fd, & eventItem);            if (epollResult < 0) {                ALOGE("Error adding epoll events for fd %d while rebuilding epoll set: %s",                      request.fd, strerror(errno));            }        }    }

源码: /android/system/core/libutils/Looper.cpp

有关异步I/O EPOLL相关资料:

  1. http://davmac.org/davpage/linux/async-io.html
  2. poll/select/epoll 性能比较
  3. https://linux.die.net/man/4/epoll
  4. select-poll-epoll-practical-difference-for-system-architects/
  5. http://blog.lucode.net/linux/epoll-tutorial.html
  6. evenfd Linux manpage

线程进入消息循环

线程与Looper绑定后,运行消息队列,准备处理消息:

    public static void loop() {        final Looper me = myLooper();        if (me == null) {            throw new RuntimeException("No Looper; Looper.prepare() wasn't called on this thread.");        }        final MessageQueue queue = me.mQueue;        for (;;) {            // 查询消息            Message msg = queue.next(); // might block            if (msg == null) {                // No message indicates that the message queue is quitting.                return;            }            ....            try {                // 发送消息到目标Handler                msg.target.dispatchMessage(msg);            } finally {                if (traceTag != 0) {                    Trace.traceEnd(traceTag);                }            }            ....            msg.recycleUnchecked();        }    }

通过调用本地方法nativePollOnce不断查询本地消息队列是否有消息,有消息时则返回。这里可以看到,如果没有IdleHandler,nextPollTimeoutMillis == -1,则nativePollOnce()一直处于阻塞状态:

    public final class MessageQueue {        // 查询可用的消息        Message next() {            // mPtr指向本地的nativeMessageQueue            final long ptr = mPtr;            int pendingIdleHandlerCount = -1; // -1 only during first iteration            int nextPollTimeoutMillis = 0;            for (;;) {                // 查询本地消息,如果有则返回,没有则阻塞直到超时                nativePollOnce(ptr, nextPollTimeoutMillis);                synchronized (this) {                    Message prevMsg = null;                    Message msg = mMessages;                    if (msg != null && msg.target == null) {                        do {                            prevMsg = msg;                            msg = msg.next;                        } while (msg != null && !msg.isAsynchronous());                    }                    if (msg != null) {                        if (now < msg.when) {                            // Next message is not ready.  Set a timeout to wake up when it is ready.                            nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE);                        } else {                            // 获得消息,返回                            mBlocked = false;                            if (prevMsg != null) {                                prevMsg.next = msg.next;                            } else {                                mMessages = msg.next;                            }                            msg.next = null;                            msg.markInUse();                            return msg;                        }                    } else {                        // No more messages.                        nextPollTimeoutMillis = -1;                    }                    ....                    //消息队列为空,或者第一个消息尚未就绪                    if (pendingIdleHandlerCount < 0                            && (mMessages == null || now < mMessages.when)) {                        pendingIdleHandlerCount = mIdleHandlers.size();                    }                    // 没有IdleHandler则设置为阻塞状态,继续监听本地消息队列                    if (pendingIdleHandlerCount <= 0) {                        // No idle handlers to run.  Loop and wait some more.                        mBlocked = true;                        continue;                    }                    if (mPendingIdleHandlers == null) {                        mPendingIdleHandlers = new IdleHandler[Math.max(pendingIdleHandlerCount, 4)];                    }                    mPendingIdleHandlers = mIdleHandlers.toArray(mPendingIdleHandlers);                }                for (int i = 0; i < pendingIdleHandlerCount; i++) {                    final IdleHandler idler = mPendingIdleHandlers[i];                    mPendingIdleHandlers[i] = null; // release the reference to the handler                    boolean keep = false;                    try {                        keep = idler.queueIdle();                    } catch (Throwable t) {                        Log.wtf(TAG, "IdleHandler threw exception", t);                    }                    if (!keep) {                        synchronized (this) {                            mIdleHandlers.remove(idler);                        }                    }                }                // Reset the idle handler count to 0 so we do not run them again.                pendingIdleHandlerCount = 0;            }        }    }

调用本地方法,查看队列中是否有消息,实际是利用Looper中的epoll机制来监听是否有IO事件:

    static void android_os_MessageQueue_nativePollOnce(JNIEnv* env, jobject obj,            jlong ptr, jint timeoutMillis) {        NativeMessageQueue* nativeMessageQueue = reinterpret_cast(ptr);        nativeMessageQueue->pollOnce(env, obj, timeoutMillis);    }    void NativeMessageQueue::pollOnce(JNIEnv* env, jobject pollObj, int timeoutMillis) {        mPollEnv = env;        mPollObj = pollObj;        mLooper->pollOnce(timeoutMillis);        mPollObj = NULL;        mPollEnv = NULL;        if (mExceptionObj) {            env->Throw(mExceptionObj);            env->DeleteLocalRef(mExceptionObj);            mExceptionObj = NULL;        }    }

Looper::pollOnce不断通过epoll_wait来监听之前创建的eventfd的IO事件,查看是否有消息,没有消息则直接返回:

    int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) {        int result = 0;        for (;;) {            while (mResponseIndex < mResponses.size()) {                const Response& response = mResponses.itemAt(mResponseIndex++);                int ident = response.request.ident;                if (ident >= 0) {                    int fd = response.request.fd;                    int events = response.events;                    void* data = response.request.data;                    if (outFd != NULL) *outFd = fd;                    if (outEvents != NULL) *outEvents = events;                    if (outData != NULL) *outData = data;                    return ident;                }            }            // 第二次循环时,由于result == POLL_TIMEOUT,因此返回            if (result != 0) {                if (outFd != NULL) *outFd = 0;                if (outEvents != NULL) *outEvents = 0;                if (outData != NULL) *outData = NULL;                return result;            }            // 超时时间为0, 如果没有消息则返回POLL_TIMEOUT            result = pollInner(timeoutMillis);        }    }

将消息发送到队列

Handler.java中,有很多方法用于创建消息,并将其发送到对应目标线程的消息队列中,以sendMessage(Message msg)为例:

    public final boolean sendMessage(Message msg)    {        return sendMessageDelayed(msg, 0);    }    public final boolean sendMessageDelayed(Message msg, long delayMillis)    {        if (delayMillis < 0) {            delayMillis = 0;        }        return sendMessageAtTime(msg, SystemClock.uptimeMillis() + delayMillis);    }    public boolean sendMessageAtTime(Message msg, long uptimeMillis) {        MessageQueue queue = mQueue;        if (queue == null) {            RuntimeException e = new RuntimeException(                    this + " sendMessageAtTime() called with no mQueue");            Log.w("Looper", e.getMessage(), e);            return false;        }        return enqueueMessage(queue, msg, uptimeMillis);    }    //发送消息到消息队列    private boolean enqueueMessage(MessageQueue queue, Message msg, long uptimeMillis) {        msg.target = this;        if (mAsynchronous) {            msg.setAsynchronous(true);        }        return queue.enqueueMessage(msg, uptimeMillis);    }

调用MessageQueue中的enqueueMessage()将消息发送到对应线程消息队列,由于之前消息队列为空,线程处于阻塞状态,因此需要通过调用本地方法nativeWake来唤醒线程:

    boolean enqueueMessage(Message msg, long when) {        if (msg.target == null) {            throw new IllegalArgumentException("Message must have a target.");        }        if (msg.isInUse()) {            throw new IllegalStateException(msg + " This message is already in use.");        }        synchronized (this) {            if (mQuitting) {                IllegalStateException e = new IllegalStateException(                        msg.target + " sending message to a Handler on a dead thread");                Log.w(TAG, e.getMessage(), e);                msg.recycle();                return false;            }            msg.markInUse();            msg.when = when;            Message p = mMessages;            boolean needWake;            if (p == null || when == 0 || when < p.when) {                // New head, wake up the event queue if blocked.                msg.next = p;                mMessages = msg;                needWake = mBlocked; // 当前队列为空,阻塞状态            } else {                // Inserted within the middle of the queue.  Usually we don't have to wake                // up the event queue unless there is a barrier at the head of the queue                // and the message is the earliest asynchronous message in the queue.                needWake = mBlocked && p.target == null && msg.isAsynchronous();                Message prev;                for (;;) {                    prev = p;                    p = p.next;                    if (p == null || when < p.when) {                        break;                    }                    if (needWake && p.isAsynchronous()) {                        needWake = false;                    }                }                msg.next = p; // invariant: p == prev.next                prev.next = msg;            }            // We can assume mPtr != 0 because mQuitting is false.            if (needWake) {                nativeWake(mPtr);            }        }        return true;    }

nativeWake通过Looper::wakemWakeEventFd这个文件描述写入一个整型值,从而唤醒等到线程,这样之前的epoll_wait就会返回一个大于0的值,表示有IO事件发生了。

    static void android_os_MessageQueue_nativeWake(JNIEnv* env, jclass clazz, jlong ptr) {        NativeMessageQueue* nativeMessageQueue = reinterpret_cast(ptr);        nativeMessageQueue->wake();    }    void NativeMessageQueue::wake() {        mLooper->wake();    }    void Looper::wake() {        uint64_t inc = 1;        ssize_t nWrite = TEMP_FAILURE_RETRY(write(mWakeEventFd, &inc, sizeof(uint64_t)));        if (nWrite != sizeof(uint64_t)) {            if (errno != EAGAIN) {                LOG_ALWAYS_FATAL("Could not write wake signal to fd %d: %s",                        mWakeEventFd, strerror(errno));            }        }    }

将消息发送给目标线程Handler

目标线程唤醒后,Looper::loop中的que.next()返回一个消息,接着会将该消息发送个对应线程的Handler处理:

    public static void loop() {        final Looper me = myLooper();        if (me == null) {            throw new RuntimeException("No Looper; Looper.prepare() wasn't called on this thread.");        }        final MessageQueue queue = me.mQueue;        for (;;) {            // 查询消息            Message msg = queue.next(); // might block            if (msg == null) {                // No message indicates that the message queue is quitting.                return;            }            ....            try {                // 发送消息到目标Handler                msg.target.dispatchMessage(msg);            } finally {                if (traceTag != 0) {                    Trace.traceEnd(traceTag);                }            }            ....            msg.recycleUnchecked();        }    }

目标Handler接收到消息,直接调用dispatchMessage(),这样一个线程消息循环就结束了。接着通过Looper::loop线程有进入新的消息循环。

    public class Handler {        public void dispatchMessage(Message msg) {            // 有回调,直接调用回调函数            if (msg.callback != null) {                handleCallback(msg);            } else {                // handler本身有回调函数,调用回调函数处理消息                if (mCallback != null) {                    if (mCallback.handleMessage(msg)) {                        return;                    }                }                // 调用自定义的函数处理消息                handleMessage(msg);            }        }        // 执行回调        private static void handleCallback(Message message) {            message.callback.run();        }    }

参考文献

  • http://gityuan.com/2015/12/26/handler-message-framework/

更多相关文章

  1. Android即时消息介绍
  2. android Java 笔试考题
  3. Android(安卓)核心分析(13) -----Android(安卓)GWES之Android窗
  4. Android线程学习
  5. 图解Android(安卓)- System Service 概论 和 Android(安卓)GUI
  6. 图解Android(安卓)- Android(安卓)GUI 系统 (5) - Android的Even
  7. Android-线程笔记
  8. Android进程与线程基本知识
  9. Android,谁动了我的内存(1)

随机推荐

  1. Android(安卓)AsyncTask 源码解析
  2. Android客户端三步完成支付宝支付SDK接入
  3. Android自定义控件的实现
  4. Android按两次返回键退出应用
  5. Android(安卓)Java压缩Zlib,Gzip,Zip支持
  6. S5PV210 三个Camera Interface/CAMIF/FIM
  7. Android基于UDP的局域网聊天通信(有完整De
  8. 【android】自定义ViewGroup的onLayout()
  9. Android的几种按钮控件: Options Menu / C
  10. Handler: 更新UI的方法