Android消息机制Handler的实现原理解析
Android的主线程为什么可以一直存在?
线程是一个动态执行的过程,从产生到死亡包括五个状态:新建、就绪、运行、死亡和堵塞。只要线程没有执行完毕或者没有被其它线程杀死,线程就不会进入死亡状态。Android中的主线程一直存在是因为主线程中一直在监听消息,从而使线程无法被执行完毕。
线程的五种状态:
- 新建
new Thread
当创建Thread类的一个实例对象
时,此线程进入新建状态未被启动
。 - 就绪
runnable
线程已经被启动,正在等待被分配给CPU时间片,也就是说此时线程正在就绪队列中排队等候得到CPU资源。 - 运行
running
线程获得CPU资源正在执行任务run()方法
,此时除非此线程自动放弃CPU资源或者有优先级更高的线程进入,线程将一直运行到结束。 - 死亡
dead
当线程执行完毕或被其它线程杀死,线程就进入死亡状态,这时线程不可能再进入就绪状态等待执行。 - 堵塞
blocked
由于某种原因导致正在运行的线程让出CPU并暂停自己的执行,即进入堵塞状态。
什么是Handler?
Android是消息驱动的,消息驱动包括Java层实现和Native层实现。Java层的消息机制包括Message、MessageQueue、Looper、Handler四要素, Native层的消息机制包括Looper、MessageQueue两要素。Handler是Android消息机制的统称,它主要是解决手机中进程内两个线程之间如何通讯。
消息驱动是围绕消息的产生与处理展开的,并依靠消息循环机制来实现。
消息的四要素:
- Message
消息的主体 - MessageQueue
消息队列 - Looper
消息循环,用于循环取出消息进行处理 - Handler
消息处理,Looper从MessageQueue取出Message后要对Message进行处理
Hanlder消息循环机制原理图
主线程通过调用Looper.prepare()初始化Looper、MessageQueue、NativeMessageQueue(Native)和Looper(Native);
Looper和MessageQueue是Java层的类,NativeMessageQueue(Native)和Looper(Native)是JNI层的类。Looper是在prepare()方法被调用的时候初始化的,初始化Looper的时候会跟着初始化MessageQueue,MessageQueue的初始化工作会通过JNI方法将NativeMessageQueue(Native)和MessageQueue关联起来,关联也就是在Native层初始化NativeMessageQueue,然后将NativeMessageQueue对象的收地址存放到MessageQueue对象中的mPtr变量中,NativeMessageQueue初始化的时候会跟着初始化Looper(Native)。
主线程通过调用Looper.loop()进入无限消息循环,保证主线程一直存活;
消息循环是通过Looper.loop()开始的,这个方法可以理解为是一个死循环的方法,可以通过阻塞和非阻塞两种方式去获取消息。当使用阻塞方式获取消息的时候跟Socket客户端的用法很相似。Looper(Native)是实现阻塞的地方,实现的原理是 I/O 多路复用中的epoll,这里我不详细讲它实现的原理,epoll你大致可以理解为它可以支持多个socket客户端连接,当某个socket客户端收到消息后阻塞过程被恢复,MessageQueue通过检测消息队列可以知道当前是不是有消息可以返回给loop方法。这个阻塞操作只是为了让线程空闲下来,没有其它的实际意义。
消息循环机制的建立
一、消息循环机制的初始化
不管是主线程还是子线程,都可以建立消息循环机制,主线程的消息循环机制是由系统替我们建立的,如果我们需要在子线程中建立消息循环机制,那么就要调用相关的API函数去建立,下面我们以主线程的消息循环机制为例讲解系统是如何为主线程初始化消息循环机制的。首先,每一个Activity创建都是从ActivityThread.main()方法开始的。
frameworks\base\core\java\android\app\ActivityThread.java
public static void main(String[] args) { ...... Looper.prepareMainLooper(); ActivityThread thread = new ActivityThread(); thread.attach(false); ...... Looper.loop(); throw new RuntimeException("Main thread loop unexpectedly exited"); }
从上面的代码可以看出loop()方法的后面直接抛出异常,说明正常情况Android是不允许主线程退出的,也就是说正常情况下下来loop()方法不可能执行完,接下来先看 Looper.prepareMainLooper()是如何实现的。
frameworks\base\core\java\android\os\Looper.java
/** * Initialize the current thread as a looper, marking it as an * application's main looper. The main looper for your application * is created by the Android environment, so you should never need * to call this function yourself. See also: {@link #prepare()} */ public static void prepareMainLooper() { prepare(false); synchronized (Looper.class) { if (sMainLooper != null) { throw new IllegalStateException("The main Looper has already been prepared."); } sMainLooper = myLooper(); } }
其它的代码不用看,只看 prepare(false);这一行就行了,我们来看一下prepare的实现
frameworks\base\core\java\android\os\Looper.java
private static void prepare(boolean quitAllowed) { if (sThreadLocal.get() != null) { throw new RuntimeException("Only one Looper may be created per thread"); } sThreadLocal.set(new Looper(quitAllowed)); }
prepare只是初始化了Looper并交给ThreadLocal去管理,,注意这里有一个quitAllowed参数,它表示当前Looper消息循环机制是否可以退出,true表示可以退出,false表示不能退出,主线程的Looper默认是不能退出的,在上面prepareMainLooper中调用 prepare(false)就可以知道,我们来看Looper的构造函数干了啥。
frameworks\base\core\java\android\os\Looper.java
private Looper(boolean quitAllowed) { mQueue = new MessageQueue(quitAllowed); mThread = Thread.currentThread(); }
在这里对MessageQueue进行了初始化,接着看MessageQueue的实现
frameworks\base\core\java\android\os\MessageQueue.java
MessageQueue(boolean quitAllowed) { mQuitAllowed = quitAllowed; mPtr = nativeInit(); }
这里直接调用Native层的方法nativeInit()来初始化Native层的对象,继续看Native层是如何初始化的
frameworks\base\core\jni\android_os_MessageQueue.cpp
static jlong android_os_MessageQueue_nativeInit(JNIEnv* env, jclass clazz) { NativeMessageQueue* nativeMessageQueue = new NativeMessageQueue(); if (!nativeMessageQueue) { jniThrowRuntimeException(env, "Unable to allocate native queue"); return 0; } nativeMessageQueue->incStrong(env); return reinterpret_cast(nativeMessageQueue);}
第一行直接初始化了一个Native层的NativeMessageQueue对象,并将其首地址返回给Java层,从上一步可以知道是保存在MessageQueue的mPtr变量中。至此,整个初始化过程就已经完毕了。
二、消息循环机制
消息循环机制主要是实现了一个循环监听,通过linux中的IO多路复用里面的epoll机制来实现阻塞,不断的从MessageQueue中取出消息,然后执行消息,下面接着来看Looper.loop()是如何建立消息循环机制的。
frameworks\base\core\java\android\os\Looper.java
/** * Run the message queue in this thread. Be sure to call * {@link #quit()} to end the loop. */ public static void loop() { final Looper me = myLooper(); if (me == null) { throw new RuntimeException("No Looper; Looper.prepare() wasn't called on this thread."); } final MessageQueue queue = me.mQueue; // Make sure the identity of this thread is that of the local process, // and keep track of what that identity token actually is. Binder.clearCallingIdentity(); final long ident = Binder.clearCallingIdentity(); for (;;) { Message msg = queue.next(); // might block if (msg == null) { // No message indicates that the message queue is quitting. return; } ...... msg.target.dispatchMessage(msg); ...... msg.recycleUnchecked(); } }
在这里方法中我们可以看到loop方法永远不会退出,只有当获取到的Message是null的时候才退出,queue.next()是获取Message的方法,我们等会再说,继续看下面的代码,当获取到的Message不为null的是就会调用 msg.target.dispatchMessage(msg)来分发消息,这里的msg.target指向的实际上就是我们代码中的Handler,我们来看Handler中的dispatchMessage的实现。
frameworks\base\core\java\android\os\Handler.java
/** * Handle system messages here. */ public void dispatchMessage(Message msg) { if (msg.callback != null) { handleCallback(msg); } else { if (mCallback != null) { if (mCallback.handleMessage(msg)) { return; } } handleMessage(msg); } }
这里会首先判断msg.callback是不是null,msg.callback实际上就是我们调用handler的post方法传入的Runnable,而handleCallback方法也就是执行Runnable.run(),当msg.callback不为null时执行handleCallback,当msg.callback为null时会先判断mCallback是不是null,mCallback是我们在初始化Handler时传入的参数,如果外部mCallback.handleMessage(msg)返回true,表示不执行Handler中的handleMessage,否则就继续往下执行。消息的处理就讲到这里,接着上一步的queue.next(),我们来看这个函数是怎么实现的。
frameworks\base\core\java\android\os\MessageQueue.java
Message next() { // Return here if the message loop has already quit and been disposed. // This can happen if the application tries to restart a looper after quit // which is not supported. final long ptr = mPtr; if (ptr == 0) { return null; } int pendingIdleHandlerCount = -1; // -1 only during first iteration int nextPollTimeoutMillis = 0; for (;;) { if (nextPollTimeoutMillis != 0) { Binder.flushPendingCommands(); } nativePollOnce(ptr, nextPollTimeoutMillis); synchronized (this) { // Try to retrieve the next message. Return if found. final long now = SystemClock.uptimeMillis(); Message prevMsg = null; Message msg = mMessages; if (msg != null && msg.target == null) { // Stalled by a barrier. Find the next asynchronous message in the queue. do { prevMsg = msg; msg = msg.next; } while (msg != null && !msg.isAsynchronous()); } if (msg != null) { if (now < msg.when) { // Next message is not ready. Set a timeout to wake up when it is ready. nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE); } else { // Got a message. mBlocked = false; if (prevMsg != null) { prevMsg.next = msg.next; } else { mMessages = msg.next; } msg.next = null; if (false) Log.v("MessageQueue", "Returning message: " + msg); return msg; } } else { // No more messages. nextPollTimeoutMillis = -1; } // Process the quit message now that all pending messages have been handled. if (mQuitting) { dispose(); return null; } // If first time idle, then get the number of idlers to run. // Idle handles only run if the queue is empty or if the first message // in the queue (possibly a barrier) is due to be handled in the future. if (pendingIdleHandlerCount < 0 && (mMessages == null || now < mMessages.when)) { pendingIdleHandlerCount = mIdleHandlers.size(); } if (pendingIdleHandlerCount <= 0) { // No idle handlers to run. Loop and wait some more. mBlocked = true; continue; } if (mPendingIdleHandlers == null) { mPendingIdleHandlers = new IdleHandler[Math.max(pendingIdleHandlerCount, 4)]; } mPendingIdleHandlers = mIdleHandlers.toArray(mPendingIdleHandlers); } // Run the idle handlers. // We only ever reach this code block during the first iteration. for (int i = 0; i < pendingIdleHandlerCount; i++) { final IdleHandler idler = mPendingIdleHandlers[i]; mPendingIdleHandlers[i] = null; // release the reference to the handler boolean keep = false; try { keep = idler.queueIdle(); } catch (Throwable t) { Log.wtf("MessageQueue", "IdleHandler threw exception", t); } if (!keep) { synchronized (this) { mIdleHandlers.remove(idler); } } } // Reset the idle handler count to 0 so we do not run them again. pendingIdleHandlerCount = 0; // While calling an idle handler, a new message could have been delivered // so go back and look again for a pending message without waiting. nextPollTimeoutMillis = 0; } }
首先判断mPtr是不是为0,为0表示跟Native层NativeMessageQueue对象关联失败,直接返回null,从上一步可以知道这里返回null loop方法就会退出,同样线程也就跟着退出了。nextPollTimeoutMillis表示下一个消息将要在未来多久执行,nextPollTimeoutMillis的取值很重要!在下面通过nativePollOnce(ptr, nextPollTimeoutMillis)将nextPollTimeoutMillis的值传入了底层,nextPollTimeoutMillis的取值有三种情况,0,-1,和大于0。当等于0时,nativePollOnce不会阻塞,当等于-1时,nativePollOnce会一直阻塞知道被wake,大于0时,nativePollOnce会等待nativePollOnce指定值的时间,如果一直没有wake那么就会超时返回。nativePollOnce的实现我们暂时先不分析,下面我们继续分析nativePollOnce返回之后的流程
final long now = SystemClock.uptimeMillis(); Message prevMsg = null; Message msg = mMessages; if (msg != null && msg.target == null) { // Stalled by a barrier. Find the next asynchronous message in the queue. do { prevMsg = msg; msg = msg.next; } while (msg != null && !msg.isAsynchronous()); } if (msg != null) { if (now < msg.when) { // Next message is not ready. Set a timeout to wake up when it is ready. nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE); } else { // Got a message. mBlocked = false; if (prevMsg != null) { prevMsg.next = msg.next; } else { mMessages = msg.next; } msg.next = null; if (false) Log.v("MessageQueue", "Returning message: " + msg); return msg; } } else { // No more messages. nextPollTimeoutMillis = -1; }
上面这段代码主要是做逻辑判断,从消息队列中取出一条可以执行的消息,如果当前消息队列为空,那么nextPollTimeoutMillis = -1,nativePollOnce就会一直被阻塞,直到有新的Message被添加到队列并调用wake,wake是让Looper(Native)取消阻塞的,具体怎么实现后面再讲。如果消息队列不为空,那么就去找到当前可以执行的消息,找到了就直接返回这条消息,并将其从消息队列中移除。如果没找到可以执行的消息,就算出离现在执行时间最近的时间,将这个时间赋值给nextPollTimeoutMillis。 if (msg != null && msg.target == null) 这个if语句用于处理barrier消息的,barrier是什么我们在后面发送消息的时候再介绍,继续往下分析
// Process the quit message now that all pending messages have been handled. if (mQuitting) { dispose(); return null; } // If first time idle, then get the number of idlers to run. // Idle handles only run if the queue is empty or if the first message // in the queue (possibly a barrier) is due to be handled in the future. if (pendingIdleHandlerCount < 0 && (mMessages == null || now < mMessages.when)) { pendingIdleHandlerCount = mIdleHandlers.size(); } if (pendingIdleHandlerCount <= 0) { // No idle handlers to run. Loop and wait some more. mBlocked = true; continue; } if (mPendingIdleHandlers == null) { mPendingIdleHandlers = new IdleHandler[Math.max(pendingIdleHandlerCount, 4)]; } mPendingIdleHandlers = mIdleHandlers.toArray(mPendingIdleHandlers);
这里先判断有没有退出,即mQuitting是不是true,是true就返回一个null消息,至此Looper消息循环机制就终止了。mQuitting前面介绍初始化的时候有介绍这个值。pendingIdleHandlerCount表示的是IdleHandler的数目,这里大致可以理解为当有IdleHandler的时候就会往下执行IdleHandler,没有就继续回到上面执行nativePollOnce。
// Run the idle handlers. // We only ever reach this code block during the first iteration. for (int i = 0; i < pendingIdleHandlerCount; i++) { final IdleHandler idler = mPendingIdleHandlers[i]; mPendingIdleHandlers[i] = null; // release the reference to the handler boolean keep = false; try { keep = idler.queueIdle(); } catch (Throwable t) { Log.wtf("MessageQueue", "IdleHandler threw exception", t); } if (!keep) { synchronized (this) { mIdleHandlers.remove(idler); } } } // Reset the idle handler count to 0 so we do not run them again. pendingIdleHandlerCount = 0; // While calling an idle handler, a new message could have been delivered // so go back and look again for a pending message without waiting. nextPollTimeoutMillis = 0;
这里就是执行IdleHandler的地方,注意如果IdleHandler的queueIdle方法返回false就会从队列中移除,表示这个IdleHandler只会执行一次,注意执行完IdleHandler后会将nextPollTimeoutMillis置为0,前面讲了0表示不阻塞直接返回。到这里Java层的消息循环就完成了,下面接着讲Native层是如何实现阻塞的,即nativePollOnce的实现。
frameworks\base\core\jni\android_os_MessageQueue.cpp
static void android_os_MessageQueue_nativePollOnce(JNIEnv* env, jclass clazz, jlong ptr, jint timeoutMillis) { NativeMessageQueue* nativeMessageQueue = reinterpret_cast(ptr); nativeMessageQueue->pollOnce(env, timeoutMillis);}
ptr是我们前面初始化NativeMessageQueue的时候存放其对象收地址的变量,这里通过首地址直接获取到NativeMessageQueue对象,获取到之后调用pollOnce,注意这里的timeoutMillis就是前面讲到的nextPollTimeoutMillis。
system\core\libutils\Looper.cpp
int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) { int result = 0; for (;;) { while (mResponseIndex < mResponses.size()) { const Response& response = mResponses.itemAt(mResponseIndex++); int ident = response.request.ident; if (ident >= 0) { int fd = response.request.fd; int events = response.events; void* data = response.request.data;#if DEBUG_POLL_AND_WAKE ALOGD("%p ~ pollOnce - returning signalled identifier %d: " "fd=%d, events=0x%x, data=%p", this, ident, fd, events, data);#endif if (outFd != NULL) *outFd = fd; if (outEvents != NULL) *outEvents = events; if (outData != NULL) *outData = data; return ident; } } if (result != 0) {#if DEBUG_POLL_AND_WAKE ALOGD("%p ~ pollOnce - returning result %d", this, result);#endif if (outFd != NULL) *outFd = 0; if (outEvents != NULL) *outEvents = 0; if (outData != NULL) *outData = NULL; return result; } result = pollInner(timeoutMillis); }}
pollOnce函数里面前面所有的方法都是对Native层消息的处理,Native层消息的处理我们以后再讲,我们来看关键阻塞的地方最后那一句pollInner的实现。
int Looper::pollInner(int timeoutMillis) {#if DEBUG_POLL_AND_WAKE ALOGD("%p ~ pollOnce - waiting: timeoutMillis=%d", this, timeoutMillis);#endif // Adjust the timeout based on when the next message is due. if (timeoutMillis != 0 && mNextMessageUptime != LLONG_MAX) { nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC); int messageTimeoutMillis = toMillisecondTimeoutDelay(now, mNextMessageUptime); if (messageTimeoutMillis >= 0 && (timeoutMillis < 0 || messageTimeoutMillis < timeoutMillis)) { timeoutMillis = messageTimeoutMillis; }#if DEBUG_POLL_AND_WAKE ALOGD("%p ~ pollOnce - next message in %lldns, adjusted timeout: timeoutMillis=%d", this, mNextMessageUptime - now, timeoutMillis);#endif } // Poll. int result = POLL_WAKE; mResponses.clear(); mResponseIndex = 0; // We are about to idle. mIdling = true; struct epoll_event eventItems[EPOLL_MAX_EVENTS]; int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis); // No longer idling. mIdling = false; // Acquire lock. mLock.lock(); // Check for poll error. if (eventCount < 0) { if (errno == EINTR) { goto Done; } ALOGW("Poll failed with an unexpected error, errno=%d", errno); result = POLL_ERROR; goto Done; } // Check for poll timeout. if (eventCount == 0) {#if DEBUG_POLL_AND_WAKE ALOGD("%p ~ pollOnce - timeout", this);#endif result = POLL_TIMEOUT; goto Done; } // Handle all events.#if DEBUG_POLL_AND_WAKE ALOGD("%p ~ pollOnce - handling events from %d fds", this, eventCount);#endif for (int i = 0; i < eventCount; i++) { int fd = eventItems[i].data.fd; uint32_t epollEvents = eventItems[i].events; if (fd == mWakeReadPipeFd) { if (epollEvents & EPOLLIN) { awoken(); } else { ALOGW("Ignoring unexpected epoll events 0x%x on wake read pipe.", epollEvents); } } else { ssize_t requestIndex = mRequests.indexOfKey(fd); if (requestIndex >= 0) { int events = 0; if (epollEvents & EPOLLIN) events |= EVENT_INPUT; if (epollEvents & EPOLLOUT) events |= EVENT_OUTPUT; if (epollEvents & EPOLLERR) events |= EVENT_ERROR; if (epollEvents & EPOLLHUP) events |= EVENT_HANGUP; pushResponse(events, mRequests.valueAt(requestIndex)); } else { ALOGW("Ignoring unexpected epoll events 0x%x on fd %d that is " "no longer registered.", epollEvents, fd); } } }Done: ; // Invoke pending message callbacks. mNextMessageUptime = LLONG_MAX; while (mMessageEnvelopes.size() != 0) { nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC); const MessageEnvelope& messageEnvelope = mMessageEnvelopes.itemAt(0); if (messageEnvelope.uptime <= now) { // Remove the envelope from the list. // We keep a strong reference to the handler until the call to handleMessage // finishes. Then we drop it so that the handler can be deleted *before* // we reacquire our lock. { // obtain handler sp handler = messageEnvelope.handler; Message message = messageEnvelope.message; mMessageEnvelopes.removeAt(0); mSendingMessage = true; mLock.unlock();#if DEBUG_POLL_AND_WAKE || DEBUG_CALLBACKS ALOGD("%p ~ pollOnce - sending message: handler=%p, what=%d", this, handler.get(), message.what);#endif handler->handleMessage(message); } // release handler mLock.lock(); mSendingMessage = false; result = POLL_CALLBACK; } else { // The last message left at the head of the queue determines the next wakeup time. mNextMessageUptime = messageEnvelope.uptime; break; } } // Release lock. mLock.unlock(); // Invoke all response callbacks. for (size_t i = 0; i < mResponses.size(); i++) { Response& response = mResponses.editItemAt(i); if (response.request.ident == POLL_CALLBACK) { int fd = response.request.fd; int events = response.events; void* data = response.request.data;#if DEBUG_POLL_AND_WAKE || DEBUG_CALLBACKS ALOGD("%p ~ pollOnce - invoking fd event callback %p: fd=%d, events=0x%x, data=%p", this, response.request.callback.get(), fd, events, data);#endif int callbackResult = response.request.callback->handleEvent(fd, events, data); if (callbackResult == 0) { removeFd(fd); } // Clear the callback reference in the response structure promptly because we // will not clear the response vector itself until the next poll. response.request.callback.clear(); result = POLL_CALLBACK; } } return result;}
找到epoll_wait, epoll_wait调用之后线程就会阻塞在这个地方,前面介绍的三种阻塞方式其实就是epoll_wait最后一个参数timeoutMillis设置的,等0会直接返回,等-1会一直阻塞,大于0会一直阻塞直到超时。Native层大致就将这么多,以后再专门写一篇文章详细分析Native层的实现。
三、发送消息
Java层发送消息是通过Handler实现的,有三种类型的消息,第一种是发送一个空的消息,只需要指定int what这个值就行,第二种是可以发送Runnable为参数的消息,第三种是直接发送Message msg为参数的消息,其实前面两种方式跟第三种方是一样的,都会构建一个Message对象,然后添加到MessageQueue队列中。下面来看Handler中三种方式的具体实现。
frameworks\base\core\java\android\os\Handler.java
第一种方式:
/** * Sends a Message containing only the what value. * * @return Returns true if the message was successfully placed in to the * message queue. Returns false on failure, usually because the * looper processing the message queue is exiting. */ public final boolean sendEmptyMessage(int what) { return sendEmptyMessageDelayed(what, 0); } /** * Sends a Message containing only the what value, to be delivered * after the specified amount of time elapses. * @see #sendMessageDelayed(android.os.Message, long) * * @return Returns true if the message was successfully placed in to the * message queue. Returns false on failure, usually because the * looper processing the message queue is exiting. */ public final boolean sendEmptyMessageDelayed(int what, long delayMillis) { Message msg = Message.obtain(); msg.what = what; return sendMessageDelayed(msg, delayMillis); }
当我们调用sendEmptyMessage发送消息时,实际上就是调用的sendEmptyMessageDelayed,默认将delayMillis设置为0,在sendEmptyMessageDelayed可以看到通过Message.obtain()获取了一个消息体,然后将what变量赋值,最终调用sendMessageDelayed来发送这条Message。sendMessageDelayed的实现我们在介绍第三种方式的时候再将,这里我们先来看一下为什么要调用Message.obtain()去获取一条消息,而不是直接new Message的方式,事实上我们也不建议new Message的方式来获取消息,先来看Message.obtain()的代码实现。
frameworks\base\core\java\android\os\Message.java
/** * Return a new Message instance from the global pool. Allows us to * avoid allocating new objects in many cases. */ public static Message obtain() { synchronized (sPoolSync) { if (sPool != null) { Message m = sPool; sPool = m.next; m.next = null; m.flags = 0; // clear in-use flag sPoolSize--; /// M: Add message protect mechanism m.hasRecycle = false; return m; } } return new Message(); } /** * Recycles a Message that may be in-use. * Used internally by the MessageQueue and Looper when disposing of queued Messages. */ void recycleUnchecked() { // Mark the message as in use while it remains in the recycled object pool. // Clear out all other details. flags = FLAG_IN_USE; what = 0; arg1 = 0; arg2 = 0; obj = null; replyTo = null; sendingUid = -1; when = 0; target = null; callback = null; data = null; hasRecycle = true; synchronized (sPoolSync) { if (sPoolSize < MAX_POOL_SIZE) { next = sPool; sPool = this; sPoolSize++; } } }
从上面的代码可以看出调用obtain获取Message的流程是,首先判断消息池里面有没有消息,如果有Message就把消息池中的第一个Message取出来,并把这个Message从消息池中移除,然后把后面的Message放到消息池的第一个位置,做完这些操作后就直接把取出的Message返回给调用者。如果消息池中没有Message的话就直接new Message。看到这里可能大家还是没明白这个消息池到底是怎么一回事,没事,我们接着看下面那个函数recycleUnchecked的实现,这个函数是在Message执行完后被调用的,我们通过Handler发送一个Message后,通过Handler的handleMessage处理完这条Message后,recycleUnchecked就会被调用。分析代码,recycleUnchecked其实就是将这条已经使用过的Message重新初始化,如果当前消息池中的数目小于MAX_POOL_SIZE系统限定的值,就将其加入消息池的第一个位置,原消息池第一个位置的Message会被移动到第二个位置,大于就不会添加了。分析完这里我们就明白了为什么不建议new Message去创建一条消息,因为我们的Message使用完后会将其添加到消息池,而不会被虚拟机回收,达不到Message复用的效果。 消息的创建就讲到这里了,接下来继续将发送消息的第二种方式,先看代码。
frameworks\base\core\java\android\os\Handler.java
第二种方式:
/** * Causes the Runnable r to be added to the message queue. * The runnable will be run on the thread to which this handler is * attached. * * @param r The Runnable that will be executed. * * @return Returns true if the Runnable was successfully placed in to the * message queue. Returns false on failure, usually because the * looper processing the message queue is exiting. */ public final boolean post(Runnable r) { return sendMessageDelayed(getPostMessage(r), 0); } private static Message getPostMessage(Runnable r) { Message m = Message.obtain(); m.callback = r; return m; }
第二种方式可以接受一个Runnable类型的参数,其实也就是将Runnable参数传递给了Message的callback变量,这里代码很简单,我就不做详细分析了。看第三种方式吧。
frameworks\base\core\java\android\os\Handler.java
第三种方式:
/** * Pushes a message onto the end of the message queue after all pending messages * before the current time. It will be received in {@link #handleMessage}, * in the thread attached to this handler. * * @return Returns true if the message was successfully placed in to the * message queue. Returns false on failure, usually because the * looper processing the message queue is exiting. */ public final boolean sendMessage(Message msg) { return sendMessageDelayed(msg, 0); } /** * Enqueue a message into the message queue after all pending messages * before (current time + delayMillis). You will receive it in * {@link #handleMessage}, in the thread attached to this handler. * * @return Returns true if the message was successfully placed in to the * message queue. Returns false on failure, usually because the * looper processing the message queue is exiting. Note that a * result of true does not mean the message will be processed -- if * the looper is quit before the delivery time of the message * occurs then the message will be dropped. */ public final boolean sendMessageDelayed(Message msg, long delayMillis) { if (delayMillis < 0) { delayMillis = 0; } return sendMessageAtTime(msg, SystemClock.uptimeMillis() + delayMillis); } /** * Enqueue a message into the message queue after all pending messages * before the absolute time (in milliseconds) uptimeMillis. * The time-base is {@link android.os.SystemClock#uptimeMillis}. * Time spent in deep sleep will add an additional delay to execution. * You will receive it in {@link #handleMessage}, in the thread attached * to this handler. * * @param uptimeMillis The absolute time at which the message should be * delivered, using the * {@link android.os.SystemClock#uptimeMillis} time-base. * * @return Returns true if the message was successfully placed in to the * message queue. Returns false on failure, usually because the * looper processing the message queue is exiting. Note that a * result of true does not mean the message will be processed -- if * the looper is quit before the delivery time of the message * occurs then the message will be dropped. */ public boolean sendMessageAtTime(Message msg, long uptimeMillis) { MessageQueue queue = mQueue; if (queue == null) { RuntimeException e = new RuntimeException( this + " sendMessageAtTime() called with no mQueue"); Log.w("Looper", e.getMessage(), e); return false; } return enqueueMessage(queue, msg, uptimeMillis); } private boolean enqueueMessage(MessageQueue queue, Message msg, long uptimeMillis) { msg.target = this; if (mAsynchronous) { msg.setAsynchronous(true); } return queue.enqueueMessage(msg, uptimeMillis); }
sendMessage和sendMessageDelayed很简单,这里我就不多做分析了,sendMessageAtTime会先判断当前消息队列存不存在,不存在直接返回,不继续往下执行了。在enqueueMessage中的第一行代码就是将当前的Handler赋值给了msg.target,这个赋值很关键,后面执行Handler的handleMessage方法就是通过这个msg.target执行的,下面判断mAsynchronous是不是真,为true表示这是异步消息,false表示非异步消息。那么什么又是异步消息和非异步消息呢?要明白Asynchronous的作用,需要先了解一个概念Barrier。
Barrier与Asynchronous Message
Barrier是什么意思呢,从名字看是一个拦截器,在这个拦截器后面的消息都暂时无法执行,直到这个拦截器被移除了,MessageQueue有一个函数叫enqueueSyncBarier可以添加一个Barrier。
frameworks\base\core\java\android\os\Looper.java
/** * Posts a synchronization barrier to the Looper's message queue. * * Message processing occurs as usual until the message queue encounters the * synchronization barrier that has been posted. When the barrier is encountered, * later synchronous messages in the queue are stalled (prevented from being executed) * until the barrier is released by calling {@link #removeSyncBarrier} and specifying * the token that identifies the synchronization barrier. * * This method is used to immediately postpone execution of all subsequently posted * synchronous messages until a condition is met that releases the barrier. * Asynchronous messages (see {@link Message#isAsynchronous} are exempt from the barrier * and continue to be processed as usual. * * This call must be always matched by a call to {@link #removeSyncBarrier} with * the same token to ensure that the message queue resumes normal operation. * Otherwise the application will probably hang! * * @return A token that uniquely identifies the barrier. This token must be * passed to {@link #removeSyncBarrier} to release the barrier. * * @hide */ public int postSyncBarrier() { return mQueue.enqueueSyncBarrier(SystemClock.uptimeMillis()); }
frameworks\base\core\java\android\os\MessageQueue.java
int enqueueSyncBarrier(long when) { // Enqueue a new sync barrier token. // We don't need to wake the queue because the purpose of a barrier is to stall it. synchronized (this) { final int token = mNextBarrierToken++; final Message msg = Message.obtain(); msg.markInUse(); msg.when = when; msg.arg1 = token; Message prev = null; Message p = mMessages; if (when != 0) { while (p != null && p.when <= when) { prev = p; p = p.next; } } if (prev != null) { // invariant: p == prev.next msg.next = p; prev.next = msg; } else { msg.next = p; mMessages = msg; } return token; } }
从上面的代码分析,我们要设置一个Barrier,只能调用Looper中的postSyncBarrier方法,在enqueueSyncBarrier中,obtain了一个Message,并设置msg.arg1=token,token仅是一个每次调用enqueueSyncBarrier时自增的int值,目的是每次调用enqueueSyncBarrier时返回唯一的一个token,这个Message同样需要设置执行时间,然后插入到消息队列,特殊的是这个Message没有设置target,即msg.target为null。 进入消息循环后会不停地从MessageQueue中取消息执行,调用的是MessageQueue的next函数,其中有这么一段:
Message msg = mMessages;if (msg != null && msg.target == null) { // Stalled by a barrier. Find the next asynchronous message in the queue. do { prevMsg = msg; msg = msg.next; } while (msg != null && !msg.isAsynchronous());}
如果队列头部的消息的target为null就表示它是个Barrier,因为只有两种方法往mMessages中添加消息,一种是enqueueMessage,另一种是enqueueBarrier,而enqueueMessage中如果mst.target为null是直接抛异常的,后面会看到。 所谓的异步消息其实就是这样的,我们可以通过enqueueBarrier往消息队列中插入一个Barrier,那么队列中执行时间在这个Barrier以后的同步消息都会被这个Barrier拦截住无法执行,直到我们调用removeBarrier移除了这个Barrier,而异步消息则没有影响,消息默认就是同步消息,除非我们调用了Message的setAsynchronous,这个方法是隐藏的。只有在初始化Handler时通过参数指定往这个Handler发送的消息都是异步的,这样在Handler的enqueueMessage中就会调用Message的setAsynchronous设置消息是异步的,从上面Handler.enqueueMessage的代码中可以看到。 所谓异步消息,其实只有一个作用,就是在设置Barrier时仍可以不受Barrier的影响被正常处理,如果没有设置Barrier,异步消息就与同步消息没有区别,可以通过Looper中的removeSyncBarrier移除Barrier。
enqueueMessage的实现
frameworks\base\core\java\android\os\MessageQueue.java
boolean enqueueMessage(Message msg, long when) { if (msg.target == null) { throw new IllegalArgumentException("Message must have a target."); } if (msg.isInUse()) { throw new IllegalStateException(msg + " This message is already in use."); } synchronized (this) { if (mQuitting) { IllegalStateException e = new IllegalStateException( msg.target + " sending message to a Handler on a dead thread"); Log.w("MessageQueue", e.getMessage(), e); msg.recycle(); return false; } msg.markInUse(); msg.when = when; Message p = mMessages; boolean needWake; if (p == null || when == 0 || when < p.when) { // New head, wake up the event queue if blocked. msg.next = p; mMessages = msg; needWake = mBlocked; } else { // Inserted within the middle of the queue. Usually we don't have to wake // up the event queue unless there is a barrier at the head of the queue // and the message is the earliest asynchronous message in the queue. needWake = mBlocked && p.target == null && msg.isAsynchronous(); Message prev; for (;;) { prev = p; p = p.next; if (p == null || when < p.when) { break; } if (needWake && p.isAsynchronous()) { needWake = false; } } msg.next = p; // invariant: p == prev.next prev.next = msg; } // We can assume mPtr != 0 because mQuitting is false. if (needWake) { nativeWake(mPtr); } } return true; }
enqueueMessage首先会检测msg.target是不是为null和msg.isInUse()是不是成立,如果条件成立就会抛出异常,然后再判断消息循环机制是不是退出了,如果退出了就直接返回。下面的逻辑我梳理一下,可以这么去理解,首先要明白一点是消息队列中的Message排序是通过Message将要执行的时间间隔实现的,间隔越小的排在队列前面。enqueueMessage在插入新Message的情况是消息队列中没有消息或者消息队列中第一个Message的执行间隔大于当前要设置Message的执行间隔。 如果不满足这些条件就会插入消息队列中的合适的位置。注意 nativeWake(mPtr)这个函数的作用是唤醒Native层的阻塞。使MessageQueue中next方法的nativePollOnce得到返回,可以继续执行nativePollOnce后面的代码。nativeWake唤醒的原理就是向管道发送一个字符,kernel层检测到管道中有数据之后,epoll_wait就能够返回了。epoll有三种唤醒方式,可读,可写和超时。这里就是可读的方式来唤醒epoll_wait。
更多相关文章
- 浅析android线程模型【Android】
- Android消息机制,从Java层到Native层剖析
- Umeng推送消息的坑,Android Service的android:exported详解
- Android消息机制-深入理解消息队列的工作模式
- Android应用程序的消息处理机制
- Android(线程二) 线程池详解
- Android消息机制初步分析