




class Thread_B extends Thread{    private final static String TAG = "Thread_B:";    public Handler mHandler;    public Handler GetHandler(){        return mHandler;    }    public void run(){        Looper.prepare();        mHandler = new Handler(){            public void handleMessage(Message msg){                Log.d(TAG,"get msg=" + msg.what);            }        };        Looper.loop();    }}public class MainActivity extends AppCompatActivity {    @Override    protected void onCreate(Bundle savedInstanceState) {        super.onCreate(savedInstanceState);        setContentView(R.layout.activity_main);        //创建Thread_B线程,在run()方法中打印thread_a线程发送过来的消息.        final Thread_B thread_b = new Thread_B();        thread_b.start();        //间隔500ms向thread_b发送消息.        Thread thread_a = new Thread(){            public void run() {                int what = 1;                while(true){                    try {                        Thread.sleep(500);                    } catch (InterruptedException e) {                        e.printStackTrace();                    }                    thread_b.GetHandler().sendEmptyMessage(what++);                }            }        };        thread_a.start();    }}







        // Excerpt of Thread_B.run(): the three steps that turn a plain thread
        // into a message-processing thread.

        // Step 1: create the Looper for the current thread. This has to happen
        // before the Handler is constructed.
        Looper.prepare();
        // Step 2: create the Handler whose handleMessage() receives the
        // messages; here it just logs msg.what.
        mHandler = new Handler(){
            public void handleMessage(Message msg){
                Log.d(TAG,"get msg=" + msg.what);
            }
        };
        // Step 3: enter the message loop.
        Looper.loop();


    /**
     * Creates a Looper and stores it in the thread-local slot of the calling
     * thread.
     *
     * @param quitAllowed forwarded unchanged to the Looper constructor
     * @throws RuntimeException if the thread-local slot already holds a Looper
     */
    private static void prepare(boolean quitAllowed) {
        final boolean alreadyPrepared = sThreadLocal.get() != null;
        if (alreadyPrepared) {
            throw new RuntimeException("Only one Looper may be created per thread");
        }
        final Looper created = new Looper(quitAllowed);
        sThreadLocal.set(created);
    }


    private Looper(boolean quitAllowed) {        mQueue = new MessageQueue(quitAllowed);        mThread = Thread.currentThread();    }


    //构造函数1    public Handler() {        this(null, false);    }    //构造函数2    public Handler(Callback callback, boolean async) {        //...        mLooper = Looper.myLooper();        if (mLooper == null) {            throw new RuntimeException(                "Can't create handler inside thread that has not called Looper.prepare()");        }        mQueue = mLooper.mQueue;        mCallback = callback;        mAsynchronous = async;    }


    /**
     * Looper.loop(): runs the message loop of the calling thread. Repeatedly
     * takes the next message from the Looper's queue and hands it to the
     * message's target handler. Returns only when the queue yields null.
     * Throws RuntimeException if the calling thread has no Looper.
     * (Excerpt: parts of the loop body are elided with "//...".)
     */
    public static void loop() {
        final Looper me = myLooper();
        if (me == null) {
            throw new RuntimeException("No Looper; Looper.prepare() wasn't called on this thread.");
        }
        final MessageQueue queue = me.mQueue;
        for (;;) {
            Message msg = queue.next(); // might block
            if (msg == null) {
                // No message indicates that the message queue is quitting.
                return;
            }
           //...
           // Deliver the message to the Handler that enqueued it.
           msg.target.dispatchMessage(msg);
          //...
        }
    }

    /**
     * MessageQueue.next(): returns the next message that is due, or null once
     * the queue is quitting. (Excerpt: the beginning and end of the method are
     * elided with "//...", so the braces shown here are not balanced.)
     */
    Message next() {
        //...
        for (;;) {
            // Wait in native code for up to nextPollTimeoutMillis.
            nativePollOnce(ptr, nextPollTimeoutMillis);
            synchronized (this) {
                // Try to retrieve the next message.  Return if found.
                final long now = SystemClock.uptimeMillis();
                Message prevMsg = null;
                Message msg = mMessages;
                if (msg != null && msg.target == null) {
                    // Stalled by a barrier.  Find the next asynchronous message in the queue.
                    do {
                        prevMsg = msg;
                        msg = msg.next;
                    } while (msg != null && !msg.isAsynchronous());
                }
                if (msg != null) {
                    if (now < msg.when) {
                        // Next message is not ready.  Set a timeout to wake up when it is ready.
                        nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE);
                    } else {
                        // Got a message.
                        mBlocked = false;
                        // Unlink msg from the singly linked list.
                        if (prevMsg != null) {
                            prevMsg.next = msg.next;
                        } else {
                            mMessages = msg.next;
                        }
                        msg.next = null;
                        if (DEBUG) Log.v(TAG, "Returning message: " + msg);
                        msg.markInUse();
                        return msg;
                    }
                } else {
                    // No more messages.
                    // NOTE(review): -1 presumably means "wait indefinitely" in the
                    // native poll; confirm against nativePollOnce.
                    nextPollTimeoutMillis = -1;
                }
                // Process the quit message now that all pending messages have been handled.
                if (mQuitting) {
                    dispose();
                    return null;
                }
          //...
    }


    /**
     * Routes a message to exactly one consumer, in priority order:
     * the message's own callback, then this handler's mCallback (which may
     * claim the message by returning true), then handleMessage().
     *
     * @param msg the message to deliver
     */
    public void dispatchMessage(Message msg) {
        // 1. A callback attached to the message itself wins outright.
        if (msg.callback != null) {
            handleCallback(msg);
            return;
        }
        // 2. The handler-level callback may consume the message.
        if (mCallback != null && mCallback.handleMessage(msg)) {
            return;
        }
        // 3. Otherwise fall through to the overridable hook.
        handleMessage(msg);
    }






    // Handler send path: sendEmptyMessage -> sendEmptyMessageDelayed ->
    // sendMessageDelayed -> sendMessageAtTime -> Handler.enqueueMessage ->
    // MessageQueue.enqueueMessage.

    // Sends a message that carries only a "what" code, with no delay.
    public final boolean sendEmptyMessage(int what)
    {
        return sendEmptyMessageDelayed(what, 0);
    }

    // Obtains a Message, fills in "what", and sends it after delayMillis.
    public final boolean sendEmptyMessageDelayed(int what, long delayMillis) {
        Message msg = Message.obtain();
        msg.what = what;
        return sendMessageDelayed(msg, delayMillis);
    }

    // Converts the relative delay into an absolute uptime deadline; negative
    // delays are treated as 0.
    public final boolean sendMessageDelayed(Message msg, long delayMillis)
    {
        if (delayMillis < 0) {
            delayMillis = 0;
        }
        return sendMessageAtTime(msg, SystemClock.uptimeMillis() + delayMillis);
    }

    // Enqueues msg for delivery at uptimeMillis. Logs a warning and returns
    // false when this handler has no queue.
    public boolean sendMessageAtTime(Message msg, long uptimeMillis) {
        MessageQueue queue = mQueue;
        if (queue == null) {
            RuntimeException e = new RuntimeException(
                    this + " sendMessageAtTime() called with no mQueue");
            Log.w("Looper", e.getMessage(), e);
            return false;
        }
        return enqueueMessage(queue, msg, uptimeMillis);
    }

    // Stamps the message with this handler as its target (the loop later calls
    // msg.target.dispatchMessage) and hands it to the queue.
    private boolean enqueueMessage(MessageQueue queue, Message msg, long uptimeMillis) {
        msg.target = this;
        if (mAsynchronous) {
            msg.setAsynchronous(true);
        }
        return queue.enqueueMessage(msg, uptimeMillis);
    }

    // MessageQueue.enqueueMessage(): inserts msg into the list ordered by
    // "when" and wakes the native poll if needed. (Excerpt: the start of the
    // method is elided with "//...", so the braces shown are not balanced.)
    boolean enqueueMessage(Message msg, long when) {
            //...
            msg.markInUse();
            msg.when = when;
            Message p = mMessages;
            boolean needWake;
            if (p == null || when == 0 || when < p.when) {
                // New head, wake up the event queue if blocked.
                msg.next = p;
                mMessages = msg;
                needWake = mBlocked;
            } else {
                // Inserted within the middle of the queue.  Usually we don't have to wake
                // up the event queue unless there is a barrier at the head of the queue
                // and the message is the earliest asynchronous message in the queue.
                needWake = mBlocked && p.target == null && msg.isAsynchronous();
                Message prev;
                // Walk to the first entry that is due later than msg.
                for (;;) {
                    prev = p;
                    p = p.next;
                    if (p == null || when < p.when) {
                        break;
                    }
                    if (needWake && p.isAsynchronous()) {
                        needWake = false;
                    }
                }
                msg.next = p; // invariant: p == prev.next
                prev.next = msg;
            }
            // We can assume mPtr != 0 because mQuitting is false.
            if (needWake) {
                nativeWake(mPtr);
            }
        }
        return true;
    }




我们从这里开始对 Native 层代码进行分析。MessageQueue 通过 JNI 调用到 Native 层的 Looper。手上是 Android 7.1.2 的代码,可以看到 MessageQueue 共有 6 个 JNI 接口:

// JNI registration table for MessageQueue: maps each Java native method name
// and JNI type signature to the C++ function that implements it.
// Signature key: J = long, I = int, Z = boolean, V = void.
static const JNINativeMethod gMessageQueueMethods[] = {
    /* name, signature, funcPtr */
    // long nativeInit()
    { "nativeInit", "()J", (void*)android_os_MessageQueue_nativeInit },
    // void nativeDestroy(long)
    { "nativeDestroy", "(J)V", (void*)android_os_MessageQueue_nativeDestroy },
    // void nativePollOnce(long, int)
    { "nativePollOnce", "(JI)V", (void*)android_os_MessageQueue_nativePollOnce },
    // void nativeWake(long)
    { "nativeWake", "(J)V", (void*)android_os_MessageQueue_nativeWake },
    // boolean nativeIsPolling(long)
    { "nativeIsPolling", "(J)Z", (void*)android_os_MessageQueue_nativeIsPolling },
    // void nativeSetFileDescriptorEvents(long, int, int)
    { "nativeSetFileDescriptorEvents", "(JII)V",
            (void*)android_os_MessageQueue_nativeSetFileDescriptorEvents },
};



// JNI entry point for MessageQueue.nativeInit().
// Allocates a NativeMessageQueue, takes a strong reference on it, and returns
// the pointer to Java packed into a jlong handle. Throws a Java
// RuntimeException and returns 0 if the allocation yields a null pointer.
static jlong android_os_MessageQueue_nativeInit(JNIEnv* env, jclass clazz) {
    NativeMessageQueue* nativeMessageQueue = new NativeMessageQueue();
    if (!nativeMessageQueue) {
        jniThrowRuntimeException(env, "Unable to allocate native queue");
        return 0;
    }

    // Keep the object alive while the Java side holds the handle.
    nativeMessageQueue->incStrong(env);
    // The cast needs its target type spelled out; a bare reinterpret_cast(...)
    // does not compile.
    return reinterpret_cast<jlong>(nativeMessageQueue);
}


// Binds this queue to the calling thread's native Looper, creating and
// registering one (with allowNonCallbacks = false) when the thread has none.
NativeMessageQueue::NativeMessageQueue() :
        mPollEnv(NULL), mPollObj(NULL), mExceptionObj(NULL) {
    mLooper = Looper::getForThread();
    if (mLooper != NULL) {
        // The thread already owns a Looper; reuse it.
        return;
    }

    mLooper = new Looper(false);
    Looper::setForThread(mLooper);
}


// Returns the Looper stored in the calling thread's thread-specific slot.
// The result of pthread_getspecific is returned as-is, so it may be NULL.
sp<Looper> Looper::getForThread() {
    // Make sure the TLS key has been created exactly once before it is used.
    int result = pthread_once(& gTLSOnce, initTLSKey);
    LOG_ALWAYS_FATAL_IF(result != 0, "pthread_once failed");

    return (Looper*)pthread_getspecific(gTLSKey);
}

// Creates the TLS key used to store the per-thread Looper and registers
// threadDestructor as the key's destructor. Aborts on failure.
void Looper::initTLSKey() {
    int result = pthread_key_create(& gTLSKey, threadDestructor);
    LOG_ALWAYS_FATAL_IF(result != 0, "Could not allocate TLS key.");
}

// Stores looper in the calling thread's slot, replacing any previous one.
// The new Looper's reference is taken before the old one is released.
void Looper::setForThread(const sp<Looper>& looper) {
    sp<Looper> old = getForThread(); // also has side-effect of initializing TLS

    if (looper != NULL) {
        // The TLS slot holds a raw pointer, so take a strong reference for it.
        looper->incStrong((void*)threadDestructor);
    }

    pthread_setspecific(gTLSKey, looper.get());

    if (old != NULL) {
        // Drop the reference that the slot held on the previous Looper.
        old->decStrong((void*)threadDestructor);
    }
}


// Constructs a Looper: initializes its bookkeeping fields, creates the eventfd
// used to wake it, and builds the epoll set while holding mLock.
// Aborts if the eventfd cannot be created.
Looper::Looper(bool allowNonCallbacks) :
        mAllowNonCallbacks(allowNonCallbacks), mSendingMessage(false),
        mPolling(false), mEpollFd(-1), mEpollRebuildRequired(false),
        mNextRequestSeq(0), mResponseIndex(0), mNextMessageUptime(LLONG_MAX) {
    // Non-blocking, close-on-exec eventfd with an initial counter of 0.
    mWakeEventFd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
    LOG_ALWAYS_FATAL_IF(mWakeEventFd < 0, "Could not make wake event fd: %s",
                        strerror(errno));

    AutoMutex _l(mLock);
    rebuildEpollLocked();
}


void Looper::rebuildEpollLocked() {    // Close old epoll instance if we have one.    if (mEpollFd >= 0) {#if DEBUG_CALLBACKS        ALOGD("%p ~ rebuildEpollLocked - rebuilding epoll set", this);#endif        close(mEpollFd);    }    // Allocate the new epoll instance and register the wake pipe.    mEpollFd = epoll_create(EPOLL_SIZE_HINT);    LOG_ALWAYS_FATAL_IF(mEpollFd < 0, "Could not create epoll instance: %s", strerror(errno));    struct epoll_event eventItem;    memset(& eventItem, 0, sizeof(epoll_event)); // zero out unused members of data field union    eventItem.events = EPOLLIN;    eventItem.data.fd = mWakeEventFd;    int result = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, mWakeEventFd, & eventItem);    LOG_ALWAYS_FATAL_IF(result != 0, "Could not add wake event fd to epoll instance: %s",                        strerror(errno));    for (size_t i = 0; i < mRequests.size(); i++) {        const Request& request = mRequests.valueAt(i);        struct epoll_event eventItem;        request.initEventItem(&eventItem);        int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, request.fd, & eventItem);        if (epollResult < 0) {            ALOGE("Error adding epoll events for fd %d while rebuilding epoll set: %s",                  request.fd, strerror(errno));        }    }}



// JNI entry point for MessageQueue.nativeSetFileDescriptorEvents().
// ptr is the jlong handle that carries the NativeMessageQueue pointer; the
// call is forwarded to NativeMessageQueue::setFileDescriptorEvents(fd, events).
static void android_os_MessageQueue_nativeSetFileDescriptorEvents(JNIEnv* env, jclass clazz,
        jlong ptr, jint fd, jint events) {
    // The cast needs its target type spelled out; a bare reinterpret_cast(ptr)
    // does not compile.
    NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
    nativeMessageQueue->setFileDescriptorEvents(fd, events);
}


// Starts, updates, or stops watching fd on this queue's Looper.
// events == 0 removes the fd. Otherwise the CALLBACK_EVENT_* bits are
// translated to Looper::EVENT_* bits and the fd is registered with this object
// as the callback and the original event mask as the callback data.
void NativeMessageQueue::setFileDescriptorEvents(int fd, int events) {
    ALOGE("setFileDescriptorEvents[oeh]:fd=%d,events=%d",fd,events);

    if (!events) {
        mLooper->removeFd(fd);
        return;
    }

    const bool wantsInput = (events & CALLBACK_EVENT_INPUT) != 0;
    const bool wantsOutput = (events & CALLBACK_EVENT_OUTPUT) != 0;

    int looperEvents = 0;
    if (wantsInput) {
        looperEvents |= Looper::EVENT_INPUT;
    }
    if (wantsOutput) {
        looperEvents |= Looper::EVENT_OUTPUT;
    }

    void* callbackData = reinterpret_cast<void*>(events);
    mLooper->addFd(fd, Looper::POLL_CALLBACK, looperEvents, this, callbackData);
}


int Looper::addFd(int fd, int ident, int events, const sp& callback, void* data) {#if DEBUG_CALLBACKS    ALOGD("%p ~ addFd - fd=%d, ident=%d, events=0x%x, callback=%p, data=%p", this, fd, ident,            events, callback.get(), data);#endif    if (!callback.get()) {        if (! mAllowNonCallbacks) {            ALOGE("Invalid attempt to set NULL callback but not allowed for this looper.");            return -1;        }        if (ident < 0) {            ALOGE("Invalid attempt to set NULL callback with ident < 0.");            return -1;        }    } else {        ident = POLL_CALLBACK;    }    { // acquire lock        AutoMutex _l(mLock);        Request request;        request.fd = fd;        request.ident = ident;        request.events = events;        request.seq = mNextRequestSeq++;        request.callback = callback;        request.data = data;        if (mNextRequestSeq == -1) mNextRequestSeq = 0; // reserve sequence number -1        struct epoll_event eventItem;        request.initEventItem(&eventItem);        ssize_t requestIndex = mRequests.indexOfKey(fd);        if (requestIndex < 0) {            int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, fd, & eventItem);            if (epollResult < 0) {                ALOGE("Error adding epoll events for fd %d: %s", fd, strerror(errno));                return -1;            }            mRequests.add(fd, request);        } else {            int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_MOD, fd, & eventItem);            if (epollResult < 0) {                if (errno == ENOENT) {                    // Tolerate ENOENT because it means that an older file descriptor was                    // closed before its callback was unregistered and meanwhile a new                    // file descriptor with the same number has been created and is now                    // being registered for the first time.  
This error may occur naturally                    // when a callback has the side-effect of closing the file descriptor                    // before returning and unregistering itself.  Callback sequence number                    // checks further ensure that the race is benign.                    //                    // Unfortunately due to kernel limitations we need to rebuild the epoll                    // set from scratch because it may contain an old file handle that we are                    // now unable to remove since its file descriptor is no longer valid.                    // No such problem would have occurred if we were using the poll system                    // call instead, but that approach carries others disadvantages.#if DEBUG_CALLBACKS                    ALOGD("%p ~ addFd - EPOLL_CTL_MOD failed due to file descriptor "                            "being recycled, falling back on EPOLL_CTL_ADD: %s",                            this, strerror(errno));#endif                    epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, fd, & eventItem);                    if (epollResult < 0) {                        ALOGE("Error modifying or adding epoll events for fd %d: %s",                                fd, strerror(errno));                        return -1;                    }                    scheduleEpollRebuildLocked();                } else {                    ALOGE("Error modifying epoll events for fd %d: %s", fd, strerror(errno));                    return -1;                }            }            mRequests.replaceValueAt(requestIndex, request);        }    } // release lock    return 1;}



static void android_os_MessageQueue_nativePollOnce(JNIEnv* env, jobject obj,        jlong ptr, jint timeoutMillis) {    NativeMessageQueue* nativeMessageQueue = reinterpret_cast(ptr);    nativeMessageQueue->pollOnce(env, obj, timeoutMillis);}void NativeMessageQueue::pollOnce(JNIEnv* env, jobject pollObj, int timeoutMillis) {    mPollEnv = env;    mPollObj = pollObj;    mLooper->pollOnce(timeoutMillis);    mPollObj = NULL;    mPollEnv = NULL;    if (mExceptionObj) {        env->Throw(mExceptionObj);        env->DeleteLocalRef(mExceptionObj);        mExceptionObj = NULL;    }}


// Excerpt of Looper::pollInner(): waits on the epoll set for up to
// timeoutMillis, then classifies each ready fd. The beginning and end of the
// function are elided with "//...", so the braces shown are not balanced.
int Looper::pollInner(int timeoutMillis) {
    //...
    int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis);
    for (int i = 0; i < eventCount; i++) {
        int fd = eventItems[i].data.fd;
        uint32_t epollEvents = eventItems[i].events;
        if (fd == mWakeEventFd) {
            // Activity on the wake fd: handle the wake-up itself.
            if (epollEvents & EPOLLIN) {
                awoken();
            } else {
                ALOGW("Ignoring unexpected epoll events 0x%x on wake event fd.", epollEvents);
            }
        } else {
            // Activity on a registered fd: translate the epoll bits into
            // Looper EVENT_* bits and queue a response for its request.
            ssize_t requestIndex = mRequests.indexOfKey(fd);
            if (requestIndex >= 0) {
                int events = 0;
                if (epollEvents & EPOLLIN) events |= EVENT_INPUT;
                if (epollEvents & EPOLLOUT) events |= EVENT_OUTPUT;
                if (epollEvents & EPOLLERR) events |= EVENT_ERROR;
                if (epollEvents & EPOLLHUP) events |= EVENT_HANGUP;
                pushResponse(events, mRequests.valueAt(requestIndex));
            } else {
                ALOGW("Ignoring unexpected epoll events 0x%x on fd %d that is "
                        "no longer registered.", epollEvents, fd);
            }
    //...
}



// JNI entry point for MessageQueue.nativeWake().
// ptr is the jlong handle that carries the NativeMessageQueue pointer.
static void android_os_MessageQueue_nativeWake(JNIEnv* env, jclass clazz, jlong ptr) {
    // The cast needs its target type spelled out; a bare reinterpret_cast(ptr)
    // does not compile.
    NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
    nativeMessageQueue->wake();
}

// Wakes the Looper this queue is bound to.
void NativeMessageQueue::wake() {
    mLooper->wake();
}


// Signals the wake eventfd by writing a 64-bit 1 to it, retrying the write if
// it is interrupted. A short or failed write is logged as a warning unless
// errno is EAGAIN.
void Looper::wake() {
#if DEBUG_POLL_AND_WAKE
    ALOGD("%p ~ wake", this);
#endif

    uint64_t increment = 1;
    const ssize_t written =
            TEMP_FAILURE_RETRY(write(mWakeEventFd, &increment, sizeof(uint64_t)));
    if (written == sizeof(uint64_t)) {
        return;
    }

    if (errno != EAGAIN) {
        ALOGW("Could not write wake signal: %s", strerror(errno));
    }
}



// JNI entry point for MessageQueue.nativeIsPolling().
// ptr is the jlong handle that carries the NativeMessageQueue pointer.
// Returns the polling flag of the Looper bound to that queue.
static jboolean android_os_MessageQueue_nativeIsPolling(JNIEnv* env, jclass clazz, jlong ptr) {
    // The cast needs its target type spelled out; a bare reinterpret_cast(ptr)
    // does not compile.
    NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
    return nativeMessageQueue->getLooper()->isPolling();
}

// Returns the current value of the mPolling flag.
bool Looper::isPolling() const {
    return mPolling;
}



// JNI entry point for MessageQueue.nativeDestroy().
// ptr is the jlong handle that carries the NativeMessageQueue pointer.
// Drops one strong reference on the queue, using env as the reference id.
static void android_os_MessageQueue_nativeDestroy(JNIEnv* env, jclass clazz, jlong ptr) {
    // The cast needs its target type spelled out; a bare reinterpret_cast(ptr)
    // does not compile.
    NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
    nativeMessageQueue->decStrong(env);
}



