1 enqueueMessage


A Handler sends a message:

mHandler.sendEmptyMessage(1);
After several layers of calls, execution reaches sendMessageAtTime, which finally calls enqueueMessage.

Handler.java

public boolean sendMessageAtTime(Message msg, long uptimeMillis) {
    MessageQueue queue = mQueue;
    if (queue == null) {
        RuntimeException e = new RuntimeException(
                this + " sendMessageAtTime() called with no mQueue");
        Log.w("Looper", e.getMessage(), e);
        return false;
    }
    return enqueueMessage(queue, msg, uptimeMillis);
}


This ends in Handler's private method enqueueMessage, which assigns the Handler itself to msg.target and then calls queue.enqueueMessage.

Handler.java
private boolean enqueueMessage(MessageQueue queue, Message msg, long uptimeMillis) {
    msg.target = this;
    if (mAsynchronous) {
        msg.setAsynchronous(true);
    }
    return queue.enqueueMessage(msg, uptimeMillis);
}


Below is the core code. After validating the message, it acquires the synchronization lock:

MessageQueue.java

boolean enqueueMessage(Message msg, long when) {
    if (msg.isInUse()) {
        throw new AndroidRuntimeException(msg + " This message is already in use.");
    }
    if (msg.target == null) {
        throw new AndroidRuntimeException("Message must have a target.");
    }

    synchronized (this) {
        if (mQuitting) {
            RuntimeException e = new RuntimeException(
                    msg.target + " sending message to a Handler on a dead thread");
            Log.w("MessageQueue", e.getMessage(), e);
            return false;
        }

        msg.when = when;
        Message p = mMessages;
        boolean needWake;
        if (p == null || when == 0 || when < p.when) {
            // New head, wake up the event queue if blocked.
            msg.next = p;
            mMessages = msg;
            needWake = mBlocked;
        } else {
            // Inserted within the middle of the queue.  Usually we don't have to wake
            // up the event queue unless there is a barrier at the head of the queue
            // and the message is the earliest asynchronous message in the queue.
            needWake = mBlocked && p.target == null && msg.isAsynchronous();
            Message prev;
            for (;;) {
                prev = p;
                p = p.next;
                if (p == null || when < p.when) {
                    break;
                }
                if (needWake && p.isAsynchronous()) {
                    needWake = false;
                }
            }
            msg.next = p; // invariant: p == prev.next
            prev.next = msg;
        }

        // We can assume mPtr != 0 because mQuitting is false.
        if (needWake) {
            nativeWake(mPtr);
        }
    }
    return true;
}

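First it acquires its own lock with synchronized (this). Then the trigger time of the new msg is compared with that of the head Message of this MessageQueue instance.

If the new message triggers earlier than the current head (or the queue is empty, or when == 0), it becomes the new head of the MessageQueue, and if the reading thread is blocked (mBlocked), that thread has to be woken so it can process the message right away.

If its trigger time is not earlier than the head's, it is inserted into the middle of the queue, in trigger-time order.

Finally, if a wake-up is needed, nativeWake is called.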
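The ordering rule above can be sketched in isolation. This is a minimal sketch, not the framework code: SketchMessage and SortedQueueSketch are hypothetical stand-ins for Message and MessageQueue, and barriers and asynchronous messages are deliberately left out.

```java
// Hypothetical, simplified stand-in for android.os.Message.
final class SketchMessage {
    final String name;   // label used only by this demo
    final long when;     // absolute trigger time in milliseconds
    SketchMessage next;  // singly linked list pointer

    SketchMessage(String name, long when) {
        this.name = name;
        this.when = when;
    }
}

// Hypothetical, simplified stand-in for android.os.MessageQueue.
final class SortedQueueSketch {
    SketchMessage head;      // plays the role of mMessages
    boolean blocked = true;  // plays the role of mBlocked

    // Inserts msg in trigger-time order and returns the needWake decision.
    boolean enqueue(SketchMessage msg) {
        SketchMessage p = head;
        if (p == null || msg.when == 0 || msg.when < p.when) {
            // New head: the reader must be woken if it is blocked.
            msg.next = p;
            head = msg;
            return blocked;
        }
        // Walk until the first node that triggers strictly later than msg.
        SketchMessage prev;
        for (;;) {
            prev = p;
            p = p.next;
            if (p == null || msg.when < p.when) {
                break;
            }
        }
        msg.next = p;
        prev.next = msg;
        return false; // no barrier handling in this sketch, so no wake-up here
    }

    // Names of the queued messages from head to tail, comma separated.
    String order() {
        StringBuilder sb = new StringBuilder();
        for (SketchMessage m = head; m != null; m = m.next) {
            if (sb.length() > 0) {
                sb.append(',');
            }
            sb.append(m.name);
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        SortedQueueSketch q = new SortedQueueSketch();
        q.enqueue(new SketchMessage("b", 200));
        q.enqueue(new SketchMessage("c", 300));
        q.enqueue(new SketchMessage("a", 100));
        System.out.println(q.order());
    }
}
```

Because the loop only stops at a node with a strictly later trigger time, a message with the same when as an existing one lands behind it, so messages with equal times keep their posting order.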


nativeWake is defined in android_os_MessageQueue.cpp:

static void android_os_MessageQueue_nativeWake(JNIEnv* env, jobject obj, jint ptr) {
    NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
    return nativeMessageQueue->wake();
}

This actually calls mLooper->wake();

android_os_MessageQueue.cpp

void NativeMessageQueue::wake() {
    mLooper->wake();
}
Here mLooper is the native (C++) Looper object:

framework/base/libs/utils/Looper.cpp

void Looper::wake() {
#if DEBUG_POLL_AND_WAKE
    LOGD("%p ~ wake", this);
#endif

#ifdef LOOPER_STATISTICS
    // FIXME: Possible race with awoken() but this code is for testing only and is rarely enabled.
    if (mPendingWakeCount++ == 0) {
        mPendingWakeTime = systemTime(SYSTEM_TIME_MONOTONIC);
    }
#endif

    ssize_t nWrite;
    do {
        nWrite = write(mWakeWritePipeFd, "W", 1);
    } while (nWrite == -1 && errno == EINTR);

    if (nWrite != 1) {
        if (errno != EAGAIN) {
            LOGW("Could not write wake signal, errno=%d", errno);
        }
    }
}

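Look familiar? It is essentially the wake-up function from the epoll prototype in the previous installment: it writes 1 byte to mWakeWritePipeFd, which wakes the epoll_wait that is blocked listening on mWakeReadPipeFd.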
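The same wake-by-writing-a-byte idea can be tried from plain Java. The following is a rough analogue rather than what the framework does: java.nio's Selector and Pipe stand in for epoll and the wake pipe, and WakePipeSketch and its methods are hypothetical names introduced for illustration.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;

// Hypothetical sketch: a Selector plays the role of epoll, a Pipe the role of the wake pipe.
final class WakePipeSketch {
    private Selector selector;
    private Pipe pipe;

    WakePipeSketch() {
        try {
            selector = Selector.open();
            pipe = Pipe.open();
            // The read end must be non-blocking before it can be registered.
            pipe.source().configureBlocking(false);
            pipe.source().register(selector, SelectionKey.OP_READ);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Counterpart of Looper::wake(): write a single byte to the write end.
    void wake() {
        try {
            pipe.sink().write(ByteBuffer.wrap(new byte[] {(byte) 'W'}));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Counterpart of epoll_wait: waits up to timeoutMillis (must be > 0,
    // because Selector.select(0) blocks indefinitely) and returns the ready count.
    int poll(long timeoutMillis) {
        try {
            int ready = selector.select(timeoutMillis);
            selector.selectedKeys().clear();
            return ready;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        WakePipeSketch sketch = new WakePipeSketch();
        System.out.println("ready before wake: " + sketch.poll(50));
        sketch.wake();
        System.out.println("ready after wake: " + sketch.poll(1000));
    }
}
```

In a real message loop the wake() call would come from a different thread than the one blocked in poll(); the sketch keeps both on one thread only to stay short.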



2 dequeueMessage


First, dequeueMessage is just a name I made up. Once the Java-level Looper is running loop(), it is already continuously reading Messages from the MessageQueue.

Looper.java

public static void loop() {
    final Looper me = myLooper();
    if (me == null) {
        throw new RuntimeException("No Looper; Looper.prepare() wasn't called on this thread.");
    }
    final MessageQueue queue = me.mQueue;

    // Make sure the identity of this thread is that of the local process,
    // and keep track of what that identity token actually is.
    Binder.clearCallingIdentity();
    final long ident = Binder.clearCallingIdentity();

    for (;;) {
        Message msg = queue.next(); // might block
        if (msg == null) {
            // No message indicates that the message queue is quitting.
            return;
        }

        // This must be in a local variable, in case a UI event sets the logger
        Printer logging = me.mLogging;
        if (logging != null) {
            logging.println(">>>>> Dispatching to " + msg.target + " " +
                    msg.callback + ": " + msg.what);
        }

        msg.target.dispatchMessage(msg);

        if (logging != null) {
            logging.println("<<<<< Finished to " + msg.target + " " + msg.callback);
        }

        // Make sure that during the course of dispatching the
        // identity of the thread wasn't corrupted.
        final long newIdent = Binder.clearCallingIdentity();
        if (ident != newIdent) {
            Log.wtf(TAG, "Thread identity changed from 0x"
                    + Long.toHexString(ident) + " to 0x"
                    + Long.toHexString(newIdent) + " while dispatching to "
                    + msg.target.getClass().getName() + " "
                    + msg.callback + " what=" + msg.what);
        }

        msg.recycle();
    }
}

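It calls queue.next() to read the next message (on the thread that called loop()). If a message is returned, it calls msg.target.dispatchMessage; if next() returns null, the queue is quitting and loop() returns.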
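Stripped of logging and Binder identity checks, the shape of that loop is small. Here is a minimal sketch under stated assumptions: MiniLooper, Msg, send and quit are hypothetical names, and a BlockingQueue with a sentinel replaces the real MessageQueue and its native polling.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

// Hypothetical sketch of the loop()/next()/dispatch shape; not the framework Looper.
final class MiniLooper {
    static final class Msg {
        final Consumer<Msg> target; // plays the role of msg.target
        final int what;

        Msg(Consumer<Msg> target, int what) {
            this.target = target;
            this.what = what;
        }
    }

    // Sentinel that makes next() return null, like a quitting MessageQueue.
    private static final Msg QUIT = new Msg(null, -1);
    private final BlockingQueue<Msg> queue = new LinkedBlockingQueue<>();

    void send(Consumer<Msg> target, int what) {
        queue.add(new Msg(target, what));
    }

    void quit() {
        queue.add(QUIT);
    }

    // Blocks until a message is available; returns null once the queue is quitting.
    private Msg next() {
        try {
            Msg m = queue.take();
            return m == QUIT ? null : m;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    void loop() {
        for (;;) {
            Msg msg = next(); // might block
            if (msg == null) {
                return;       // the queue is quitting
            }
            msg.target.accept(msg); // counterpart of msg.target.dispatchMessage(msg)
        }
    }

    public static void main(String[] args) {
        MiniLooper looper = new MiniLooper();
        looper.send(m -> System.out.println("handled what=" + m.what), 1);
        looper.quit();
        looper.loop();
    }
}
```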

Now let's look at how queue.next() is implemented:


MessageQueue.java

Message next() {
    int pendingIdleHandlerCount = -1; // -1 only during first iteration
    int nextPollTimeoutMillis = 0;

    for (;;) {
        if (nextPollTimeoutMillis != 0) {
            Binder.flushPendingCommands();
        }

        // We can assume mPtr != 0 because the loop is obviously still running.
        // The looper will not call this method after the loop quits.
        nativePollOnce(mPtr, nextPollTimeoutMillis);

        synchronized (this) {
            // Try to retrieve the next message.  Return if found.
            final long now = SystemClock.uptimeMillis();
            Message prevMsg = null;
            Message msg = mMessages;
            if (msg != null && msg.target == null) {
                // Stalled by a barrier.  Find the next asynchronous message in the queue.
                do {
                    prevMsg = msg;
                    msg = msg.next;
                } while (msg != null && !msg.isAsynchronous());
            }
            if (msg != null) {
                if (now < msg.when) {
                    // Next message is not ready.  Set a timeout to wake up when it is ready.
                    nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE);
                } else {
                    // Got a message.
                    mBlocked = false;
                    if (prevMsg != null) {
                        prevMsg.next = msg.next;
                    } else {
                        mMessages = msg.next;
                    }
                    msg.next = null;
                    if (false) Log.v("MessageQueue", "Returning message: " + msg);
                    msg.markInUse();
                    return msg;
                }
            } else {
                // No more messages.
                nextPollTimeoutMillis = -1;
            }

            // Process the quit message now that all pending messages have been handled.
            if (mQuitting) {
                dispose();
                return null;
            }

            // If first time idle, then get the number of idlers to run.
            // Idle handles only run if the queue is empty or if the first message
            // in the queue (possibly a barrier) is due to be handled in the future.
            if (pendingIdleHandlerCount < 0
                    && (mMessages == null || now < mMessages.when)) {
                pendingIdleHandlerCount = mIdleHandlers.size();
            }
            if (pendingIdleHandlerCount <= 0) {
                // No idle handlers to run.  Loop and wait some more.
                mBlocked = true;
                continue;
            }

            if (mPendingIdleHandlers == null) {
                mPendingIdleHandlers = new IdleHandler[Math.max(pendingIdleHandlerCount, 4)];
            }
            mPendingIdleHandlers = mIdleHandlers.toArray(mPendingIdleHandlers);
        }

        // Run the idle handlers.
        // We only ever reach this code block during the first iteration.
        for (int i = 0; i < pendingIdleHandlerCount; i++) {
            final IdleHandler idler = mPendingIdleHandlers[i];
            mPendingIdleHandlers[i] = null; // release the reference to the handler

            boolean keep = false;
            try {
                keep = idler.queueIdle();
            } catch (Throwable t) {
                Log.wtf("MessageQueue", "IdleHandler threw exception", t);
            }

            if (!keep) {
                synchronized (this) {
                    mIdleHandlers.remove(idler);
                }
            }
        }

        // Reset the idle handler count to 0 so we do not run them again.
        pendingIdleHandlerCount = 0;

        // While calling an idle handler, a new message could have been delivered
        // so go back and look again for a pending message without waiting.
        nextPollTimeoutMillis = 0;
    }
}

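First, this is a package-private method, so a Looper object in the same package (android.os) can call it.

nativePollOnce(mPtr, nextPollTimeoutMillis); will be expanded shortly; its job is to call the epoll_wait from the previous installment.

The nextPollTimeoutMillis timeout is the delay until the next Message is due. If there are no messages, it is set to -1 and the call blocks indefinitely until the queue is woken.

After waking up, we ignore barrier-type Messages for now (a feature added in Android 4.1 together with Choreographer, see http://blog.csdn.net/innost/article/details/8272867).

If the head msg is not null, check whether its trigger time has arrived.

If it has not, nextPollTimeoutMillis is set to the difference between the message's trigger time and the current time, for the next nativePollOnce call in the for loop.

If it has arrived or already passed, the msg is removed from the queue and returned, which leaves the for loop and lets the Looper above dispatch it to its Handler.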
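The three cases above boil down to one small decision. As a sketch only: PollTimeoutSketch is a hypothetical helper, a null headWhen models an empty queue, and the DUE_NOW value is an assumption of this sketch (the real next() returns the message instead of computing a timeout in that case).

```java
// Hypothetical helper that isolates the timeout decision made inside next().
final class PollTimeoutSketch {
    static final int BLOCK_FOREVER = -1; // same meaning as nextPollTimeoutMillis = -1
    static final int DUE_NOW = 0;        // sketch-only marker: the head message can be returned

    // headWhen is the trigger time of the head message, or null for an empty queue.
    static int nextPollTimeoutMillis(Long headWhen, long now) {
        if (headWhen == null) {
            return BLOCK_FOREVER; // no messages: block until woken
        }
        if (now < headWhen) {
            // Not due yet: sleep for the remaining delay, clamped to the int range.
            return (int) Math.min(headWhen - now, Integer.MAX_VALUE);
        }
        return DUE_NOW; // due or overdue
    }

    public static void main(String[] args) {
        System.out.println(nextPollTimeoutMillis(1500L, 1000L));
        System.out.println(nextPollTimeoutMillis(null, 1000L));
    }
}
```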


So how exactly is nativePollOnce implemented?

android_os_MessageQueue.cpp

static void android_os_MessageQueue_nativePollOnce(JNIEnv* env, jobject obj,
        jint ptr, jint timeoutMillis) {
    NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
    nativeMessageQueue->pollOnce(timeoutMillis);
}


This calls nativeMessageQueue->pollOnce:

android_os_MessageQueue.cpp

void NativeMessageQueue::pollOnce(int timeoutMillis) {
    mLooper->pollOnce(timeoutMillis);
}


This calls mLooper->pollOnce,

which is likewise in framework/base/libs/utils/Looper.cpp:

int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) {
    int result = 0;
    for (;;) {
        while (mResponseIndex < mResponses.size()) {
            const Response& response = mResponses.itemAt(mResponseIndex++);
            if (! response.request.callback) {
#if DEBUG_POLL_AND_WAKE
                LOGD("%p ~ pollOnce - returning signalled identifier %d: "
                        "fd=%d, events=0x%x, data=%p", this,
                        response.request.ident, response.request.fd,
                        response.events, response.request.data);
#endif
                if (outFd != NULL) *outFd = response.request.fd;
                if (outEvents != NULL) *outEvents = response.events;
                if (outData != NULL) *outData = response.request.data;
                return response.request.ident;
            }
        }

        if (result != 0) {
#if DEBUG_POLL_AND_WAKE
            LOGD("%p ~ pollOnce - returning result %d", this, result);
#endif
            if (outFd != NULL) *outFd = 0;
            if (outEvents != NULL) *outEvents = NULL;
            if (outData != NULL) *outData = NULL;
            return result;
        }

        result = pollInner(timeoutMillis);
    }
}
Since this flow has nothing to do with mResponses, ignore that part for now.

It calls pollInner:

framework/base/libs/utils/Looper.cpp

int Looper::pollInner(int timeoutMillis) {
#if DEBUG_POLL_AND_WAKE
    LOGD("%p ~ pollOnce - waiting: timeoutMillis=%d", this, timeoutMillis);
#endif

    int result = ALOOPER_POLL_WAKE;
    mResponses.clear();
    mResponseIndex = 0;

#ifdef LOOPER_STATISTICS
    nsecs_t pollStartTime = systemTime(SYSTEM_TIME_MONOTONIC);
#endif

#ifdef LOOPER_USES_EPOLL
    struct epoll_event eventItems[EPOLL_MAX_EVENTS];
    int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis);
    bool acquiredLock = false;
#else
    // Wait for wakeAndLock() waiters to run then set mPolling to true.
    mLock.lock();
    while (mWaiters != 0) {
        mResume.wait(mLock);
    }
    mPolling = true;
    mLock.unlock();

    size_t requestedCount = mRequestedFds.size();
    int eventCount = poll(mRequestedFds.editArray(), requestedCount, timeoutMillis);
#endif

    if (eventCount < 0) {
        if (errno == EINTR) {
            goto Done;
        }

        LOGW("Poll failed with an unexpected error, errno=%d", errno);
        result = ALOOPER_POLL_ERROR;
        goto Done;
    }

    if (eventCount == 0) {
#if DEBUG_POLL_AND_WAKE
        LOGD("%p ~ pollOnce - timeout", this);
#endif
        result = ALOOPER_POLL_TIMEOUT;
        goto Done;
    }

#if DEBUG_POLL_AND_WAKE
    LOGD("%p ~ pollOnce - handling events from %d fds", this, eventCount);
#endif

#ifdef LOOPER_USES_EPOLL
    for (int i = 0; i < eventCount; i++) {
        int fd = eventItems[i].data.fd;
        uint32_t epollEvents = eventItems[i].events;
        if (fd == mWakeReadPipeFd) {
            if (epollEvents & EPOLLIN) {
                awoken();
            } else {
                LOGW("Ignoring unexpected epoll events 0x%x on wake read pipe.", epollEvents);
            }
        } else {
            if (! acquiredLock) {
                mLock.lock();
                acquiredLock = true;
            }

            ssize_t requestIndex = mRequests.indexOfKey(fd);
            if (requestIndex >= 0) {
                int events = 0;
                if (epollEvents & EPOLLIN) events |= ALOOPER_EVENT_INPUT;
                if (epollEvents & EPOLLOUT) events |= ALOOPER_EVENT_OUTPUT;
                if (epollEvents & EPOLLERR) events |= ALOOPER_EVENT_ERROR;
                if (epollEvents & EPOLLHUP) events |= ALOOPER_EVENT_HANGUP;
                pushResponse(events, mRequests.valueAt(requestIndex));
            } else {
                LOGW("Ignoring unexpected epoll events 0x%x on fd %d that is "
                        "no longer registered.", epollEvents, fd);
            }
        }
    }
    if (acquiredLock) {
        mLock.unlock();
    }
Done: ;
#else
    for (size_t i = 0; i < requestedCount; i++) {
        const struct pollfd& requestedFd = mRequestedFds.itemAt(i);

        short pollEvents = requestedFd.revents;
        if (pollEvents) {
            if (requestedFd.fd == mWakeReadPipeFd) {
                if (pollEvents & POLLIN) {
                    awoken();
                } else {
                    LOGW("Ignoring unexpected poll events 0x%x on wake read pipe.", pollEvents);
                }
            } else {
                int events = 0;
                if (pollEvents & POLLIN) events |= ALOOPER_EVENT_INPUT;
                if (pollEvents & POLLOUT) events |= ALOOPER_EVENT_OUTPUT;
                if (pollEvents & POLLERR) events |= ALOOPER_EVENT_ERROR;
                if (pollEvents & POLLHUP) events |= ALOOPER_EVENT_HANGUP;
                if (pollEvents & POLLNVAL) events |= ALOOPER_EVENT_INVALID;
                pushResponse(events, mRequests.itemAt(i));
            }
            if (--eventCount == 0) {
                break;
            }
        }
    }

Done:
    // Set mPolling to false and wake up the wakeAndLock() waiters.
    mLock.lock();
    mPolling = false;
    if (mWaiters != 0) {
        mAwake.broadcast();
    }
    mLock.unlock();
#endif

#ifdef LOOPER_STATISTICS
    nsecs_t pollEndTime = systemTime(SYSTEM_TIME_MONOTONIC);
    mSampledPolls += 1;
    if (timeoutMillis == 0) {
        mSampledZeroPollCount += 1;
        mSampledZeroPollLatencySum += pollEndTime - pollStartTime;
    } else if (timeoutMillis > 0 && result == ALOOPER_POLL_TIMEOUT) {
        mSampledTimeoutPollCount += 1;
        mSampledTimeoutPollLatencySum += pollEndTime - pollStartTime
                - milliseconds_to_nanoseconds(timeoutMillis);
    }
    if (mSampledPolls == SAMPLED_POLLS_TO_AGGREGATE) {
        LOGD("%p ~ poll latency statistics: %0.3fms zero timeout, %0.3fms non-zero timeout", this,
                0.000001f * float(mSampledZeroPollLatencySum) / mSampledZeroPollCount,
                0.000001f * float(mSampledTimeoutPollLatencySum) / mSampledTimeoutPollCount);
        mSampledPolls = 0;
        mSampledZeroPollCount = 0;
        mSampledZeroPollLatencySum = 0;
        mSampledTimeoutPollCount = 0;
        mSampledTimeoutPollLatencySum = 0;
    }
#endif

    for (size_t i = 0; i < mResponses.size(); i++) {
        const Response& response = mResponses.itemAt(i);
        if (response.request.callback) {
#if DEBUG_POLL_AND_WAKE || DEBUG_CALLBACKS
            LOGD("%p ~ pollOnce - invoking callback: fd=%d, events=0x%x, data=%p", this,
                    response.request.fd, response.events, response.request.data);
#endif
            int callbackResult = response.request.callback(
                    response.request.fd, response.events, response.request.data);
            if (callbackResult == 0) {
                removeFd(response.request.fd);
            }

            result = ALOOPER_POLL_CALLBACK;
        }
    }
    return result;
}

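Focus on the #ifdef LOOPER_USES_EPOLL parts.

int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis);

This waits for events on all file descriptors attached to mEpollFd. It returns when an event such as the wake signal arrives or when timeoutMillis expires; until then it blocks.

Then comes the next #ifdef LOOPER_USES_EPOLL part: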

#ifdef LOOPER_USES_EPOLL
    for (int i = 0; i < eventCount; i++) {
        int fd = eventItems[i].data.fd;
        uint32_t epollEvents = eventItems[i].events;
        if (fd == mWakeReadPipeFd) {
            if (epollEvents & EPOLLIN) {
                awoken();
            } else {
                LOGW("Ignoring unexpected epoll events 0x%x on wake read pipe.", epollEvents);
            }
        } else {
            if (! acquiredLock) {
                mLock.lock();
                acquiredLock = true;
            }

            ssize_t requestIndex = mRequests.indexOfKey(fd);
            if (requestIndex >= 0) {
                int events = 0;
                if (epollEvents & EPOLLIN) events |= ALOOPER_EVENT_INPUT;
                if (epollEvents & EPOLLOUT) events |= ALOOPER_EVENT_OUTPUT;
                if (epollEvents & EPOLLERR) events |= ALOOPER_EVENT_ERROR;
                if (epollEvents & EPOLLHUP) events |= ALOOPER_EVENT_HANGUP;
                pushResponse(events, mRequests.valueAt(requestIndex));
            } else {
                LOGW("Ignoring unexpected epoll events 0x%x on fd %d that is "
                        "no longer registered.", epollEvents, fd);
            }
        }
    }
    if (acquiredLock) {
        mLock.unlock();
    }
Done: ;

It iterates over all events returned by epoll_wait; if an event's file descriptor is mWakeReadPipeFd, it calls awoken():

framework/base/libs/utils/Looper.cpp

void Looper::awoken() {
#if DEBUG_POLL_AND_WAKE
    LOGD("%p ~ awoken", this);
#endif

#ifdef LOOPER_STATISTICS
    if (mPendingWakeCount == 0) {
        LOGD("%p ~ awoken: spurious!", this);
    } else {
        mSampledWakeCycles += 1;
        mSampledWakeCountSum += mPendingWakeCount;
        mSampledWakeLatencySum += systemTime(SYSTEM_TIME_MONOTONIC) - mPendingWakeTime;
        mPendingWakeCount = 0;
        mPendingWakeTime = -1;
        if (mSampledWakeCycles == SAMPLED_WAKE_CYCLES_TO_AGGREGATE) {
            LOGD("%p ~ wake statistics: %0.3fms wake latency, %0.3f wakes per cycle", this,
                    0.000001f * float(mSampledWakeLatencySum) / mSampledWakeCycles,
                    float(mSampledWakeCountSum) / mSampledWakeCycles);
            mSampledWakeCycles = 0;
            mSampledWakeCountSum = 0;
            mSampledWakeLatencySum = 0;
        }
    }
#endif

    char buffer[16];
    ssize_t nRead;
    do {
        nRead = read(mWakeReadPipeFd, buffer, sizeof(buffer));
    } while ((nRead == -1 && errno == EINTR) || nRead == sizeof(buffer));
}

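awoken() is the awoken() function from the previous installment. It reads mWakeReadPipeFd until it is empty, because mWakeWritePipeFd may have been written to several times.

Once the pipe has been drained, the next epoll_wait waits for a new write to mWakeWritePipeFd; draining it is what tells epoll that the event on mWakeReadPipeFd has been handled.

If it were not drained, epoll_wait would keep reporting the event (returning immediately instead of blocking until a new write).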
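The effect of not draining can be observed with a small Java analogue. This is a sketch under assumptions, not the native code: java.nio's Selector and Pipe stand in for epoll and the wake pipe, DrainPipeSketch and its methods are hypothetical names, and the 16-byte buffer simply mirrors the one in awoken().

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;

// Hypothetical sketch: shows that readiness persists until the pipe is drained.
final class DrainPipeSketch {
    private Selector selector;
    private Pipe pipe;

    DrainPipeSketch() {
        try {
            selector = Selector.open();
            pipe = Pipe.open();
            pipe.source().configureBlocking(false);
            pipe.source().register(selector, SelectionKey.OP_READ);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Writes one wake byte, like Looper::wake().
    void wake() {
        try {
            pipe.sink().write(ByteBuffer.wrap(new byte[] {(byte) 'W'}));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // True if the read end is reported readable within timeoutMillis (must be > 0).
    boolean readable(long timeoutMillis) {
        try {
            int ready = selector.select(timeoutMillis);
            selector.selectedKeys().clear();
            return ready > 0;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Counterpart of Looper::awoken(): keep reading while the buffer comes back full.
    int drain() {
        try {
            ByteBuffer buffer = ByteBuffer.allocate(16);
            int total = 0;
            int n;
            do {
                buffer.clear();
                n = pipe.source().read(buffer); // non-blocking: 0 when nothing is left
                if (n > 0) {
                    total += n;
                }
            } while (n == buffer.capacity());
            return total;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        DrainPipeSketch sketch = new DrainPipeSketch();
        sketch.wake();
        sketch.wake();
        System.out.println("readable before drain: " + sketch.readable(1000));
        System.out.println("bytes drained: " + sketch.drain());
        System.out.println("readable after drain: " + sketch.readable(50));
    }
}
```

Several wake bytes collapse into a single readiness report, and one drain pass clears them all, which is why the native code can call wake() as often as it likes without queueing up extra wake-ups.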


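3 Summary

At this point, both enqueueMessage and what I have been calling dequeueMessage have been explained. Everything suddenly makes sense, doesn't it?

The next installment covers the thread message loop of a native app (mainly a walkthrough of android_native_app_glue.c).

Corrections are welcome!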


4 References

Android SDK source code

Android framework source code



