概述

Android中的Handler、Looper、MessageQueue是用来解决线程间通讯问题的一种机制.介绍它们的文章非常多,这里做一些学习和整理,帮助自己理解,也希望能方便大家.
一句话
“Looper不断获取MessageQueue中的Message,然后由Handler来处理”
一张图

先记住一句话和一张图,对这三者有一个感性的认识,后面会介绍一个简单的demo,通过demo来阅读Java和Native层的代码.

例子

做一个简单的demo程序:线程A给线程B发送消息.

class Thread_B extends Thread{    private final static String TAG = "Thread_B:";    public Handler mHandler;    public Handler GetHandler(){        return mHandler;    }    public void run(){        Looper.prepare();        mHandler = new Handler(){            public void handleMessage(Message msg){                Log.d(TAG,"get msg=" + msg.what);            }        };        Looper.loop();    }}public class MainActivity extends AppCompatActivity {    @Override    protected void onCreate(Bundle savedInstanceState) {        super.onCreate(savedInstanceState);        setContentView(R.layout.activity_main);        //创建Thread_B线程,在run()方法中打印thread_a线程发送过来的消息.        final Thread_B thread_b = new Thread_B();        thread_b.start();        //间隔500ms向thread_b发送消息.        Thread thread_a = new Thread(){            public void run() {                int what = 1;                while(true){                    try {                        Thread.sleep(500);                    } catch (InterruptedException e) {                        e.printStackTrace();                    }                    thread_b.GetHandler().sendEmptyMessage(what++);                }            }        };        thread_a.start();    }}

线程B接收并打印线程A发送过来的数据,输出结果

“当你觉得轻松的时候,一定是有人在为你负重前行”,我们通过源码流程看一下它的实现原理

实现原理

JAVA层代码

接收方

上面代码中接收方Thread_B中主要代码如下

        Looper.prepare();        mHandler = new Handler(){            public void handleMessage(Message msg){                Log.d(TAG,"get msg=" + msg.what);            }        };        Looper.loop();

prepare()为Looper类的静态方法

    private static void prepare(boolean quitAllowed) {        if (sThreadLocal.get() != null) {            throw new RuntimeException("Only one Looper may be created per thread");        }        sThreadLocal.set(new Looper(quitAllowed));    }

sThreadLocal为ThreadLocal类型的对象,在Looper中使用ThreadLocal的set/get方法,set方法用来建立当前线程与Looper对象的Map,get方法用当前线程来获取looper对象.prepare方法确保一个线程中只能存在一个Looper对象,如果没有存在则创建Looper对象,并通过set方法建立当前线程和Looper的键/值关系,如果存在则通过get方法获取.接着来看Looper的构造函数.

    private Looper(boolean quitAllowed) {        mQueue = new MessageQueue(quitAllowed);        mThread = Thread.currentThread();    }

创建MessageQueue对象,用来存放各种消息接着看Handler的构造函数

    //构造函数1    public Handler() {        this(null, false);    }    //构造函数2    public Handler(Callback callback, boolean async) {        //...        mLooper = Looper.myLooper();        if (mLooper == null) {            throw new RuntimeException(                "Can't create handler inside thread that has not called Looper.prepare()");        }        mQueue = mLooper.mQueue;        mCallback = callback;        mAsynchronous = async;    }

构造函数1调用构造函数2,在构造函数2中,调用myLooper()方法获得prepare中创建的Looper对象和MessageQueue对象.接着看Looper.loop()和MessageQueue的next方法

    public static void loop() {        final Looper me = myLooper();        if (me == null) {            throw new RuntimeException("No Looper; Looper.prepare() wasn't called on this thread.");        }        final MessageQueue queue = me.mQueue;        for (;;) {            Message msg = queue.next(); // might block            if (msg == null) {                // No message indicates that the message queue is quitting.                return;            }           //...           msg.target.dispatchMessage(msg);          //...        }    }    Message next() {        //...        for (;;) {            nativePollOnce(ptr, nextPollTimeoutMillis);            synchronized (this) {                // Try to retrieve the next message.  Return if found.                final long now = SystemClock.uptimeMillis();                Message prevMsg = null;                Message msg = mMessages;                if (msg != null && msg.target == null) {                    // Stalled by a barrier.  Find the next asynchronous message in the queue.                    do {                        prevMsg = msg;                        msg = msg.next;                    } while (msg != null && !msg.isAsynchronous());                }                if (msg != null) {                    if (now < msg.when) {                        // Next message is not ready.  Set a timeout to wake up when it is ready.                        nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE);                    } else {                        // Got a message.                        mBlocked = false;                        if (prevMsg != null) {                            prevMsg.next = msg.next;                        } else {                            mMessages = msg.next;                        }                        msg.next = null;                        if (DEBUG) Log.v(TAG, "Returning message: " + msg);                        msg.markInUse();                        return msg;                    }                } else {                    // No more messages.                    nextPollTimeoutMillis = -1;                }                // Process the quit message now that all pending messages have been handled.                if (mQuitting) {                    dispose();                    return null;                }          //...    }

loop()函数中将会一直循环的从MessageQueue中读取消息,当下一条消息为空时,将nextPollTimeoutMillis设置为-1,调用nativePollOnce方法,会一直处于block状态(放弃CPU的占用),等待新消息进入MessageQueue,当信消息来时会通过Native机制来唤醒.当loop()获取到新的消息后,调用msg.target所指向的对象(Handler)的dispatchMessage方法来分发消息.这里看一下Handler的dispatchMessage方法

    public void dispatchMessage(Message msg) {        if (msg.callback != null) {            handleCallback(msg);        } else {            if (mCallback != null) {                if (mCallback.handleMessage(msg)) {                    return;                }            }            handleMessage(msg);        }    }

这里可以看到,执行处理的地方优先级从高到底分别是msg中的callback函数、handler对象的callback函数、handler对象的默认方法handlerMessage.上面示例代码是最后一种方式.

发送方

发送方的主要代码

thread_b.GetHandler().sendEmptyMessage(what++);

获取到接收方的handler对象,调用sendEmptyMessage接口向接收方发送数据,sendEmptyMessage代码如下

    public final boolean sendEmptyMessage(int what)    {        return sendEmptyMessageDelayed(what, 0);    }    public final boolean sendEmptyMessageDelayed(int what, long delayMillis) {        Message msg = Message.obtain();        msg.what = what;        return sendMessageDelayed(msg, delayMillis);    }    public final boolean sendMessageDelayed(Message msg, long delayMillis)    {        if (delayMillis < 0) {            delayMillis = 0;        }        return sendMessageAtTime(msg, SystemClock.uptimeMillis() + delayMillis);    }    public boolean sendMessageAtTime(Message msg, long uptimeMillis) {        MessageQueue queue = mQueue;        if (queue == null) {            RuntimeException e = new RuntimeException(                    this + " sendMessageAtTime() called with no mQueue");            Log.w("Looper", e.getMessage(), e);            return false;        }        return enqueueMessage(queue, msg, uptimeMillis);    }    private boolean enqueueMessage(MessageQueue queue, Message msg, long uptimeMillis) {        msg.target = this;        if (mAsynchronous) {            msg.setAsynchronous(true);        }        return queue.enqueueMessage(msg, uptimeMillis);    }    boolean enqueueMessage(Message msg, long when) {            //...            msg.markInUse();            msg.when = when;            Message p = mMessages;            boolean needWake;            if (p == null || when == 0 || when < p.when) {                // New head, wake up the event queue if blocked.                msg.next = p;                mMessages = msg;                needWake = mBlocked;            } else {                // Inserted within the middle of the queue.  Usually we don't have to wake                // up the event queue unless there is a barrier at the head of the queue                // and the message is the earliest asynchronous message in the queue.                needWake = mBlocked && p.target == null && msg.isAsynchronous();                Message prev;                for (;;) {                    prev = p;                    p = p.next;                    if (p == null || when < p.when) {                        break;                    }                    if (needWake && p.isAsynchronous()) {                        needWake = false;                    }                }                msg.next = p; // invariant: p == prev.next                prev.next = msg;            }            // We can assume mPtr != 0 because mQuitting is false.            if (needWake) {                nativeWake(mPtr);            }        }        return true;    }

sendEmptyMessage接口发送数据,会经过sendEmptyMessageDelayed—>sendMessageDelayed—>sendMessageAtTime—>enqueueMessage,最终调用enqueueMessage方法将消息放到mMessages所保存的队列中,并在最后调用nativeWake方法,来唤醒Looper的loop方法.
Java层的Looper的生命周期如下所示

到这里按照上面示例对JAVA层的完整流程做了一个说明.总之Looper不断轮询MessageQueue中的消息,当获取到消息后,调用Handler中的方法进行处理.

Native层代码

MessageQueue是JAVA层这套机制的核心,Native层的Looper(JAVA层的Looper没有关系)在MessageQueue中发挥了重大作用..Native层的Looper类(Looper.cpp)内部又是依赖于epoll来实现,这个链接对epoll做一个简介,对理解Native层的Looper有一定帮助.
我们从这里对Native层代码进行分析,MessageQueue 通过JNI调用到Native层的Looper.手上是Android7.1.2的代码,可以看到MessageQueue有6个JNI接口

static const JNINativeMethod gMessageQueueMethods[] = {    /* name, signature, funcPtr */    { "nativeInit", "()J", (void*)android_os_MessageQueue_nativeInit },    { "nativeDestroy", "(J)V", (void*)android_os_MessageQueue_nativeDestroy },    { "nativePollOnce", "(JI)V", (void*)android_os_MessageQueue_nativePollOnce },    { "nativeWake", "(J)V", (void*)android_os_MessageQueue_nativeWake },    { "nativeIsPolling", "(J)Z", (void*)android_os_MessageQueue_nativeIsPolling },    { "nativeSetFileDescriptorEvents", "(JII)V",            (void*)android_os_MessageQueue_nativeSetFileDescriptorEvents },};

接下用这六个接口来分析MessageQueue

接口1:android_os_MessageQueue_nativeInit

static jlong android_os_MessageQueue_nativeInit(JNIEnv* env, jclass clazz) {    NativeMessageQueue* nativeMessageQueue = new NativeMessageQueue();    if (!nativeMessageQueue) {        jniThrowRuntimeException(env, "Unable to allocate native queue");        return 0;    }    nativeMessageQueue->incStrong(env);    return reinterpret_cast(nativeMessageQueue);}

创建NativeMessageQueue对象,并增加引用计数.

NativeMessageQueue::NativeMessageQueue() :        mPollEnv(NULL), mPollObj(NULL), mExceptionObj(NULL) {    mLooper = Looper::getForThread();    if (mLooper == NULL) {        mLooper = new Looper(false);        Looper::setForThread(mLooper);    }}

在构造函数中,如果线程已有Looper则直接获取,如果没有则创建,这个过程主要由下面三个函数组成

sp<Looper> Looper::getForThread() {    int result = pthread_once(& gTLSOnce, initTLSKey);    LOG_ALWAYS_FATAL_IF(result != 0, "pthread_once failed");    return (Looper*)pthread_getspecific(gTLSKey);}void Looper::initTLSKey() {    int result = pthread_key_create(& gTLSKey, threadDestructor);    LOG_ALWAYS_FATAL_IF(result != 0, "Could not allocate TLS key.");}void Looper::setForThread(const sp<Looper>& looper) {    sp<Looper> old = getForThread(); // also has side-effect of initializing TLS    if (looper != NULL) {        looper->incStrong((void*)threadDestructor);    }    pthread_setspecific(gTLSKey, looper.get());    if (old != NULL) {        old->decStrong((void*)threadDestructor);    }}

initTLSKey(pthread_once确保此函数只会被执行一次)调用pthread_key_create创建gTLSKey键,setForThread中调用pthread_setspecific将gTLSKey与Looper绑定,如果gTLSKey对应旧的Looper则用新的Looper替换,之后通过pthread_getspecific来获取gTLSKey对应的Looper.(这个过程与Looper.java中sThreadLocal功能类似),接着看一下Looper的构造函数

Looper::Looper(bool allowNonCallbacks) :        mAllowNonCallbacks(allowNonCallbacks), mSendingMessage(false),        mPolling(false), mEpollFd(-1), mEpollRebuildRequired(false),        mNextRequestSeq(0), mResponseIndex(0), mNextMessageUptime(LLONG_MAX) {    mWakeEventFd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);    LOG_ALWAYS_FATAL_IF(mWakeEventFd < 0, "Could not make wake event fd: %s",                        strerror(errno));    AutoMutex _l(mLock);    rebuildEpollLocked();}

调用eventfd创建专门用来唤醒Looper的mWakeEventFd,后面唤醒事件依赖它.接着调用rebuildEpollLocked.

void Looper::rebuildEpollLocked() {    // Close old epoll instance if we have one.    if (mEpollFd >= 0) {#if DEBUG_CALLBACKS        ALOGD("%p ~ rebuildEpollLocked - rebuilding epoll set", this);#endif        close(mEpollFd);    }    // Allocate the new epoll instance and register the wake pipe.    mEpollFd = epoll_create(EPOLL_SIZE_HINT);    LOG_ALWAYS_FATAL_IF(mEpollFd < 0, "Could not create epoll instance: %s", strerror(errno));    struct epoll_event eventItem;    memset(& eventItem, 0, sizeof(epoll_event)); // zero out unused members of data field union    eventItem.events = EPOLLIN;    eventItem.data.fd = mWakeEventFd;    int result = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, mWakeEventFd, & eventItem);    LOG_ALWAYS_FATAL_IF(result != 0, "Could not add wake event fd to epoll instance: %s",                        strerror(errno));    for (size_t i = 0; i < mRequests.size(); i++) {        const Request& request = mRequests.valueAt(i);        struct epoll_event eventItem;        request.initEventItem(&eventItem);        int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, request.fd, & eventItem);        if (epollResult < 0) {            ALOGE("Error adding epoll events for fd %d while rebuilding epoll set: %s",                  request.fd, strerror(errno));        }    }}

重新创建epoll实例,通过EPOLL_CTL_ADD将要被监控的fd全部添加到epoll当中.

接口2:接口android_os_MessageQueue_nativeSetFileDescriptorEvents

static void android_os_MessageQueue_nativeSetFileDescriptorEvents(JNIEnv* env, jclass clazz, jlong ptr, jint fd, jint events) {    NativeMessageQueue* nativeMessageQueue = reinterpret_cast(ptr);    nativeMessageQueue->setFileDescriptorEvents(fd, events);}

调用NativeMessageQueue的setFileDescriptorEvents方法,来添加所要监控的文件描述符.

void NativeMessageQueue::setFileDescriptorEvents(int fd, int events) {    ALOGE("setFileDescriptorEvents[oeh]:fd=%d,events=%d",fd,events);    if (events) {        int looperEvents = 0;        if (events & CALLBACK_EVENT_INPUT) {            looperEvents |= Looper::EVENT_INPUT;        }        if (events & CALLBACK_EVENT_OUTPUT) {            looperEvents |= Looper::EVENT_OUTPUT;        }        mLooper->addFd(fd, Looper::POLL_CALLBACK, looperEvents, this,                reinterpret_cast<void*>(events));    } else {        mLooper->removeFd(fd);    }}

setFileDescriptorEvents函数调用mLooper的addFd方法来添加文件描述符,调用removeFd方法来删除文件描述符.我们来看一下Looper的addFd函数

int Looper::addFd(int fd, int ident, int events, const sp& callback, void* data) {#if DEBUG_CALLBACKS    ALOGD("%p ~ addFd - fd=%d, ident=%d, events=0x%x, callback=%p, data=%p", this, fd, ident,            events, callback.get(), data);#endif    if (!callback.get()) {        if (! mAllowNonCallbacks) {            ALOGE("Invalid attempt to set NULL callback but not allowed for this looper.");            return -1;        }        if (ident < 0) {            ALOGE("Invalid attempt to set NULL callback with ident < 0.");            return -1;        }    } else {        ident = POLL_CALLBACK;    }    { // acquire lock        AutoMutex _l(mLock);        Request request;        request.fd = fd;        request.ident = ident;        request.events = events;        request.seq = mNextRequestSeq++;        request.callback = callback;        request.data = data;        if (mNextRequestSeq == -1) mNextRequestSeq = 0; // reserve sequence number -1        struct epoll_event eventItem;        request.initEventItem(&eventItem);        ssize_t requestIndex = mRequests.indexOfKey(fd);        if (requestIndex < 0) {            int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, fd, & eventItem);            if (epollResult < 0) {                ALOGE("Error adding epoll events for fd %d: %s", fd, strerror(errno));                return -1;            }            mRequests.add(fd, request);        } else {            int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_MOD, fd, & eventItem);            if (epollResult < 0) {                if (errno == ENOENT) {                    // Tolerate ENOENT because it means that an older file descriptor was                    // closed before its callback was unregistered and meanwhile a new                    // file descriptor with the same number has been created and is now                    // being registered for the first time.  This error may occur naturally                    // when a callback has the side-effect of closing the file descriptor                    // before returning and unregistering itself.  Callback sequence number                    // checks further ensure that the race is benign.                    //                    // Unfortunately due to kernel limitations we need to rebuild the epoll                    // set from scratch because it may contain an old file handle that we are                    // now unable to remove since its file descriptor is no longer valid.                    // No such problem would have occurred if we were using the poll system                    // call instead, but that approach carries others disadvantages.#if DEBUG_CALLBACKS                    ALOGD("%p ~ addFd - EPOLL_CTL_MOD failed due to file descriptor "                            "being recycled, falling back on EPOLL_CTL_ADD: %s",                            this, strerror(errno));#endif                    epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, fd, & eventItem);                    if (epollResult < 0) {                        ALOGE("Error modifying or adding epoll events for fd %d: %s",                                fd, strerror(errno));                        return -1;                    }                    scheduleEpollRebuildLocked();                } else {                    ALOGE("Error modifying epoll events for fd %d: %s", fd, strerror(errno));                    return -1;                }            }            mRequests.replaceValueAt(requestIndex, request);        }    } // release lock    return 1;}

如果该请求时间的文件描述符已经存在,则使用EPOLL_CTL_MOD参数对文件描述符进行修改,如果不存在,则调用EPOLL_CTL_ADD来添加文件描述符,实现监听.

接口3:android_os_MessageQueue_nativePollOnce

static void android_os_MessageQueue_nativePollOnce(JNIEnv* env, jobject obj,        jlong ptr, jint timeoutMillis) {    NativeMessageQueue* nativeMessageQueue = reinterpret_cast(ptr);    nativeMessageQueue->pollOnce(env, obj, timeoutMillis);}void NativeMessageQueue::pollOnce(JNIEnv* env, jobject pollObj, int timeoutMillis) {    mPollEnv = env;    mPollObj = pollObj;    mLooper->pollOnce(timeoutMillis);    mPollObj = NULL;    mPollEnv = NULL;    if (mExceptionObj) {        env->Throw(mExceptionObj);        env->DeleteLocalRef(mExceptionObj);        mExceptionObj = NULL;    }}

调用Native层Looper的pollOnce,pollOnce里主要是调用pollInner,

int Looper::pollInner(int timeoutMillis) {    //...    int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis);    for (int i = 0; i < eventCount; i++) {        int fd = eventItems[i].data.fd;        uint32_t epollEvents = eventItems[i].events;        if (fd == mWakeEventFd) {            if (epollEvents & EPOLLIN) {                awoken();            } else {                ALOGW("Ignoring unexpected epoll events 0x%x on wake event fd.", epollEvents);            }        } else {            ssize_t requestIndex = mRequests.indexOfKey(fd);            if (requestIndex >= 0) {                int events = 0;                if (epollEvents & EPOLLIN) events |= EVENT_INPUT;                if (epollEvents & EPOLLOUT) events |= EVENT_OUTPUT;                if (epollEvents & EPOLLERR) events |= EVENT_ERROR;                if (epollEvents & EPOLLHUP) events |= EVENT_HANGUP;                pushResponse(events, mRequests.valueAt(requestIndex));            } else {                ALOGW("Ignoring unexpected epoll events 0x%x on fd %d that is "                        "no longer registered.", epollEvents, fd);            }    //...}

epoll_wait阻塞等待监听,当所监控的文件描述符有事件到来时,进行处理.(Java层MessageQueue调用next时,如果下一条消息为空,下发的timeoutMillis为-1,将会一直阻塞等待唤醒)

接口4:android_os_MessageQueue_nativeWake

static void android_os_MessageQueue_nativeWake(JNIEnv* env, jclass clazz, jlong ptr) {    NativeMessageQueue* nativeMessageQueue = reinterpret_cast(ptr);    nativeMessageQueue->wake();}void NativeMessageQueue::wake() {    mLooper->wake();}

android_os_MessageQueue_nativeWake用来调用Looper的wake接口,epoll_wait.

void Looper::wake() {#if DEBUG_POLL_AND_WAKE    ALOGD("%p ~ wake", this);#endif    uint64_t inc = 1;    ssize_t nWrite = TEMP_FAILURE_RETRY(write(mWakeEventFd, &inc, sizeof(uint64_t)));    if (nWrite != sizeof(uint64_t)) {        if (errno != EAGAIN) {            ALOGW("Could not write wake signal: %s", strerror(errno));        }    }}

往Looper构造时创建的mWakeEventFd写入数据,来达到唤醒Looper的目的.

接口5:android_os_MessageQueue_nativeIsPolling

static jboolean android_os_MessageQueue_nativeIsPolling(JNIEnv* env, jclass clazz, jlong ptr) {    NativeMessageQueue* nativeMessageQueue = reinterpret_cast(ptr);    return nativeMessageQueue->getLooper()->isPolling();}bool Looper::isPolling() const {    return mPolling;}

获取Looper的状态,Looper在调用epoll_wait之前为true,在epoll_wait之后(有事件触发)为false.

接口6:android_os_MessageQueue_nativeDestroy

static void android_os_MessageQueue_nativeDestroy(JNIEnv* env, jclass clazz, jlong ptr) {    NativeMessageQueue* nativeMessageQueue = reinterpret_cast(ptr);    nativeMessageQueue->decStrong(env);}

减少NativeMessageQueue引用计数.

参考资料

stackoverflow
what-is-the-purpose-of-looper-and-how-to-use-it
android-looper-and-toast-from
android_developer
Communicate with the UI thread
android-ui-thread
<<深入理解Android内核设计思想>>

更多相关文章

  1. Android(安卓)MVP 模式 项目初体验(一)
  2. 学习Android闹钟源代码(三)-AlarmClock类分析(part1)
  3. adb通过wifi连接方法
  4. Android抓包方法(一)之Fiddler代理
  5. Android(安卓)消息机制(Handler Looper Message )理解
  6. Android信息推送—AndroidPN的学习(上)
  7. Android(安卓)GPRS的自动打开与关闭
  8. 控制android弹出框不消失(用到反射的方法)
  9. Android(安卓)Native程序crash的一些定位方法简介

随机推荐

  1. Android(安卓)Service两种启动启动方式
  2. Android(安卓)布局转化为View对象的两种
  3. Android(安卓)recovery 下使用 updater-s
  4. Android的数据存储之SharedPreferences1
  5. Android用户界面优化-Android(安卓)Slidi
  6. Android(安卓)存储路径选择方法
  7. Android(安卓)五种布局模式
  8. android 截屏的三种方法
  9. Android(安卓)USB Host与HID通讯(二)
  10. Unable to get buffer of resource asset