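Android Handler, Looper, and MessageQueue: Source Code Analysis
Overview
Handler, Looper, and MessageQueue together form Android's mechanism for communication between threads. Many articles already cover them; this one collects my own study notes to help me understand the mechanism, and I hope it is useful to others as well.
In one sentence
“A Looper keeps taking Messages out of a MessageQueue, and a Handler processes them.”
In one diagram
Keep the sentence and the diagram in mind to get an intuitive feel for the three classes. A simple demo follows, and we use it to read through the Java-layer and native-layer code.
Example
A simple demo program: thread A sends messages to thread B.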
    import android.os.Bundle;
    import android.os.Handler;
    import android.os.Looper;
    import android.os.Message;
    import android.util.Log;
    // AppCompatActivity comes from the support library or AndroidX, depending on the project.

    class Thread_B extends Thread {
        private final static String TAG = "Thread_B:";
        // volatile so that thread A sees the handler once thread B has created it.
        public volatile Handler mHandler;

        public Handler GetHandler() {
            return mHandler;
        }

        public void run() {
            Looper.prepare();
            mHandler = new Handler() {
                public void handleMessage(Message msg) {
                    Log.d(TAG, "get msg=" + msg.what);
                }
            };
            Looper.loop();
        }
    }

    public class MainActivity extends AppCompatActivity {
        @Override
        protected void onCreate(Bundle savedInstanceState) {
            super.onCreate(savedInstanceState);
            setContentView(R.layout.activity_main);
            // Create the Thread_B thread; its handler prints the messages sent by thread_a.
            final Thread_B thread_b = new Thread_B();
            thread_b.start();
            // Send a message to thread_b every 500 ms.
            Thread thread_a = new Thread() {
                public void run() {
                    int what = 1;
                    while (true) {
                        try {
                            Thread.sleep(500);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        thread_b.GetHandler().sendEmptyMessage(what++);
                    }
                }
            };
            thread_a.start();
        }
    }
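Thread B receives and prints the messages sent by thread A; the log shows "get msg=1", "get msg=2", and so on, one entry every 500 ms.
“When things feel easy for you, someone else is carrying the load for you.” Let's follow the source code to see how the mechanism is implemented.
How it works
Java-layer code
Receiver
The key code in the receiver, Thread_B, is: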
    Looper.prepare();
    mHandler = new Handler() {
        public void handleMessage(Message msg) {
            Log.d(TAG, "get msg=" + msg.what);
        }
    };
    Looper.loop();
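prepare() is a static method of the Looper class. The public no-argument prepare() calls the private overload below with quitAllowed set to true.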
    private static void prepare(boolean quitAllowed) {
        if (sThreadLocal.get() != null) {
            throw new RuntimeException("Only one Looper may be created per thread");
        }
        sThreadLocal.set(new Looper(quitAllowed));
    }
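sThreadLocal is a ThreadLocal object. Looper uses its set/get methods: set associates a Looper object with the current thread, and get returns the Looper that belongs to the current thread. prepare() guarantees that a thread has at most one Looper: if the thread has none, it creates one and stores it with set; if the thread already has one, it throws a RuntimeException. (Looper.myLooper() is the method that later reads the stored Looper back with get.) Next, look at the Looper constructor.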
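The per-thread bookkeeping described above can be tried outside Android with plain Java. The following is a minimal sketch, not the framework code: MiniLooper and ThreadLocalDemo are hypothetical names, and only the ThreadLocal logic of prepare() and myLooper() is modeled.

```java
// Hypothetical stand-in for android.os.Looper; only the per-thread bookkeeping is modeled.
final class MiniLooper {
    private static final ThreadLocal<MiniLooper> sThreadLocal = new ThreadLocal<>();

    // Thread that created this looper, mirroring Looper.mThread.
    final Thread thread = Thread.currentThread();

    private MiniLooper() {}

    /** Creates the looper for the calling thread; a second call on the same thread throws. */
    static void prepare() {
        if (sThreadLocal.get() != null) {
            throw new RuntimeException("Only one Looper may be created per thread");
        }
        sThreadLocal.set(new MiniLooper());
    }

    /** Returns the calling thread's looper, or null if prepare() was never called on it. */
    static MiniLooper myLooper() {
        return sThreadLocal.get();
    }
}

class ThreadLocalDemo {
    public static void main(String[] args) throws InterruptedException {
        MiniLooper.prepare();
        final MiniLooper[] seenByOther = new MiniLooper[1];
        Thread other = new Thread(() -> {
            // This thread never called prepare(), so it has no looper of its own.
            seenByOther[0] = MiniLooper.myLooper();
        });
        other.start();
        other.join();
        System.out.println("main has looper: " + (MiniLooper.myLooper() != null));
        System.out.println("other has looper: " + (seenByOther[0] != null));
    }
}
```

Each thread sees only its own entry, which is why a Handler created on a thread that never called prepare() cannot find a Looper.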
    private Looper(boolean quitAllowed) {
        mQueue = new MessageQueue(quitAllowed);
        mThread = Thread.currentThread();
    }
The constructor creates the MessageQueue object that holds the messages. Next, look at the Handler constructors.
    // Constructor 1
    public Handler() {
        this(null, false);
    }

    // Constructor 2
    public Handler(Callback callback, boolean async) {
        //...
        mLooper = Looper.myLooper();
        if (mLooper == null) {
            throw new RuntimeException(
                "Can't create handler inside thread that has not called Looper.prepare()");
        }
        mQueue = mLooper.mQueue;
        mCallback = callback;
        mAsynchronous = async;
    }
Constructor 1 calls constructor 2, which calls myLooper() to obtain the Looper created in prepare() and, through it, the MessageQueue. Next, look at Looper.loop() and MessageQueue.next().
    // Looper.java
    public static void loop() {
        final Looper me = myLooper();
        if (me == null) {
            throw new RuntimeException("No Looper; Looper.prepare() wasn't called on this thread.");
        }
        final MessageQueue queue = me.mQueue;
        for (;;) {
            Message msg = queue.next(); // might block
            if (msg == null) {
                // No message indicates that the message queue is quitting.
                return;
            }
            //...
            msg.target.dispatchMessage(msg);
            //...
        }
    }

    // MessageQueue.java
    Message next() {
        //...
        for (;;) {
            nativePollOnce(ptr, nextPollTimeoutMillis);
            synchronized (this) {
                // Try to retrieve the next message. Return if found.
                final long now = SystemClock.uptimeMillis();
                Message prevMsg = null;
                Message msg = mMessages;
                if (msg != null && msg.target == null) {
                    // Stalled by a barrier. Find the next asynchronous message in the queue.
                    do {
                        prevMsg = msg;
                        msg = msg.next;
                    } while (msg != null && !msg.isAsynchronous());
                }
                if (msg != null) {
                    if (now < msg.when) {
                        // Next message is not ready. Set a timeout to wake up when it is ready.
                        nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE);
                    } else {
                        // Got a message.
                        mBlocked = false;
                        if (prevMsg != null) {
                            prevMsg.next = msg.next;
                        } else {
                            mMessages = msg.next;
                        }
                        msg.next = null;
                        if (DEBUG) Log.v(TAG, "Returning message: " + msg);
                        msg.markInUse();
                        return msg;
                    }
                } else {
                    // No more messages.
                    nextPollTimeoutMillis = -1;
                }
                // Process the quit message now that all pending messages have been handled.
                if (mQuitting) {
                    dispose();
                    return null;
                }
                //...
    }
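loop() keeps reading messages from the MessageQueue in an endless loop. When the queue has no further message, next() sets nextPollTimeoutMillis to -1 and calls nativePollOnce, which blocks (giving up the CPU) until a new message is enqueued; when a new message arrives, the native layer wakes the thread up. If the head message is not due yet, the timeout is the time remaining until msg.when. Once loop() gets a message, it calls dispatchMessage on the object that msg.target points to (the Handler). Here is Handler.dispatchMessage: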
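The blocking behaviour of next() can be modeled in plain Java. The sketch below is an illustration under assumptions, not the framework code: TimedQueue, Msg, and pollTimeout are hypothetical names, Object.wait/notifyAll stand in for nativePollOnce/nativeWake, and sync barriers and asynchronous messages are left out.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of MessageQueue.next(); monitor wait/notify replaces the native poll/wake pair.
final class TimedQueue {
    static final class Msg {
        final int what;
        final long when; // absolute due time in milliseconds
        Msg(int what, long when) { this.what = what; this.when = when; }
    }

    private final List<Msg> messages = new ArrayList<>(); // kept sorted by 'when'
    private boolean quitting;

    private static long now() { return System.nanoTime() / 1_000_000L; }

    /** -1 means wait indefinitely, 0 means the head is due, otherwise ms until the head is due. */
    static long pollTimeout(Msg head, long now) {
        if (head == null) return -1;
        return Math.max(0L, head.when - now);
    }

    synchronized void enqueue(int what, long delayMillis) {
        Msg msg = new Msg(what, now() + Math.max(0L, delayMillis));
        int i = 0;
        while (i < messages.size() && messages.get(i).when <= msg.when) i++;
        messages.add(i, msg);
        notifyAll(); // stands in for nativeWake()
    }

    synchronized void quit() {
        quitting = true;
        notifyAll();
    }

    /** Blocks until a message is due; returns null after quit() or if the thread is interrupted. */
    synchronized Msg next() {
        try {
            for (;;) {
                Msg head = messages.isEmpty() ? null : messages.get(0);
                long timeout = pollTimeout(head, now());
                if (timeout == 0) return messages.remove(0);
                if (quitting) return null;
                // Stands in for nativePollOnce(ptr, nextPollTimeoutMillis).
                if (timeout < 0) wait(); else wait(timeout);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }
}

class TimedQueueDemo {
    public static void main(String[] args) throws InterruptedException {
        TimedQueue queue = new TimedQueue();
        Thread consumer = new Thread(() -> {
            TimedQueue.Msg m;
            while ((m = queue.next()) != null) System.out.println("get msg=" + m.what);
        });
        consumer.start();
        queue.enqueue(2, 100); // due later
        queue.enqueue(1, 0);   // due immediately
        Thread.sleep(300);
        queue.quit();
        consumer.join();
    }
}
```

The consumer thread sleeps inside next() whenever nothing is due, which is the same idea as the Looper thread parking in nativePollOnce instead of spinning.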
    public void dispatchMessage(Message msg) {
        if (msg.callback != null) {
            handleCallback(msg);
        } else {
            if (mCallback != null) {
                if (mCallback.handleMessage(msg)) {
                    return;
                }
            }
            handleMessage(msg);
        }
    }
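As the code shows, the handling priority from highest to lowest is: the callback carried by the msg, the Callback object of the handler, and the handler's own handleMessage method. Note that handleMessage still runs when the handler's Callback returns false. The demo above uses the last of the three.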
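The three priority levels can be checked with a small plain-Java model. This is a sketch under assumptions: DispatchSketch, MiniHandler, Msg, and run are hypothetical names that only mirror the branching of dispatchMessage, and the trace list exists purely to record who handled the message.

```java
import java.util.ArrayList;
import java.util.List;

final class DispatchSketch {
    interface Callback { boolean handleMessage(Msg msg); }

    static final class Msg {
        int what;
        Runnable callback; // highest priority when set
    }

    static class MiniHandler {
        final List<String> trace = new ArrayList<>();
        private final Callback mCallback;

        MiniHandler(Callback callback) { mCallback = callback; }

        // Lowest priority: the handler's own method.
        void handleMessage(Msg msg) { trace.add("handleMessage"); }

        void dispatchMessage(Msg msg) {
            if (msg.callback != null) {
                msg.callback.run();
                return;
            }
            if (mCallback != null && mCallback.handleMessage(msg)) {
                return; // the Callback consumed the message
            }
            handleMessage(msg);
        }
    }

    /** Dispatches one message and returns the handlers that ran, in order. */
    static List<String> run(boolean withMsgCallback, Boolean callbackResult) {
        final MiniHandler[] holder = new MiniHandler[1];
        Callback cb = callbackResult == null ? null : m -> {
            holder[0].trace.add("mCallback");
            return callbackResult;
        };
        holder[0] = new MiniHandler(cb);
        Msg msg = new Msg();
        if (withMsgCallback) msg.callback = () -> holder[0].trace.add("msg.callback");
        holder[0].dispatchMessage(msg);
        return holder[0].trace;
    }

    public static void main(String[] args) {
        System.out.println(run(true, true));   // [msg.callback]
        System.out.println(run(false, true));  // [mCallback]
        System.out.println(run(false, false)); // [mCallback, handleMessage]
        System.out.println(run(false, null));  // [handleMessage]
    }
}
```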
Sender
The key code in the sender is:
thread_b.GetHandler().sendEmptyMessage(what++);
It obtains the receiver's handler object and calls sendEmptyMessage to send a message to the receiver. The code behind sendEmptyMessage is:
    // Handler.java
    public final boolean sendEmptyMessage(int what) {
        return sendEmptyMessageDelayed(what, 0);
    }

    public final boolean sendEmptyMessageDelayed(int what, long delayMillis) {
        Message msg = Message.obtain();
        msg.what = what;
        return sendMessageDelayed(msg, delayMillis);
    }

    public final boolean sendMessageDelayed(Message msg, long delayMillis) {
        if (delayMillis < 0) {
            delayMillis = 0;
        }
        return sendMessageAtTime(msg, SystemClock.uptimeMillis() + delayMillis);
    }

    public boolean sendMessageAtTime(Message msg, long uptimeMillis) {
        MessageQueue queue = mQueue;
        if (queue == null) {
            RuntimeException e = new RuntimeException(
                    this + " sendMessageAtTime() called with no mQueue");
            Log.w("Looper", e.getMessage(), e);
            return false;
        }
        return enqueueMessage(queue, msg, uptimeMillis);
    }

    private boolean enqueueMessage(MessageQueue queue, Message msg, long uptimeMillis) {
        msg.target = this;
        if (mAsynchronous) {
            msg.setAsynchronous(true);
        }
        return queue.enqueueMessage(msg, uptimeMillis);
    }

    // MessageQueue.java
    boolean enqueueMessage(Message msg, long when) {
        //...
            msg.markInUse();
            msg.when = when;
            Message p = mMessages;
            boolean needWake;
            if (p == null || when == 0 || when < p.when) {
                // New head, wake up the event queue if blocked.
                msg.next = p;
                mMessages = msg;
                needWake = mBlocked;
            } else {
                // Inserted within the middle of the queue. Usually we don't have to wake
                // up the event queue unless there is a barrier at the head of the queue
                // and the message is the earliest asynchronous message in the queue.
                needWake = mBlocked && p.target == null && msg.isAsynchronous();
                Message prev;
                for (;;) {
                    prev = p;
                    p = p.next;
                    if (p == null || when < p.when) {
                        break;
                    }
                    if (needWake && p.isAsynchronous()) {
                        needWake = false;
                    }
                }
                msg.next = p; // invariant: p == prev.next
                prev.next = msg;
            }
            // We can assume mPtr != 0 because mQuitting is false.
            if (needWake) {
                nativeWake(mPtr);
            }
        }
        return true;
    }
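A message sent with sendEmptyMessage travels through sendEmptyMessageDelayed -> sendMessageDelayed -> sendMessageAtTime -> enqueueMessage. MessageQueue.enqueueMessage finally inserts the message, ordered by its when time, into the linked list headed by mMessages. If needWake is true (typically because the message became the new head while the queue thread was blocked), it calls nativeWake to wake up the thread that is blocked inside Looper.loop().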
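The ordered insertion and the wake-up decision can be isolated in a few lines of plain Java. The following sketch makes simplifying assumptions: EnqueueSketch, Msg, and order are hypothetical names, the blocked field plays the role of mBlocked, and the barrier/asynchronous case is omitted, so an insertion in the middle of the list never requests a wake-up here.

```java
// Hypothetical model of the insertion logic in MessageQueue.enqueueMessage.
final class EnqueueSketch {
    static final class Msg {
        final int what;
        final long when; // absolute due time
        Msg next;
        Msg(int what, long when) { this.what = what; this.when = when; }
    }

    Msg head;        // earliest message first
    boolean blocked; // true while the consumer is parked in its poll call

    /** Inserts msg in 'when' order and returns true if the consumer should be woken. */
    boolean enqueue(Msg msg) {
        Msg p = head;
        if (p == null || msg.when == 0 || msg.when < p.when) {
            // New head: the consumer's current timeout may now be too long.
            msg.next = p;
            head = msg;
            return blocked;
        }
        Msg prev;
        for (;;) {
            prev = p;
            p = p.next;
            if (p == null || msg.when < p.when) break;
        }
        msg.next = p;
        prev.next = msg;
        return false; // head unchanged, so the consumer's timeout is still valid
    }

    /** Returns the 'what' values in queue order, e.g. "3,1,2". */
    String order() {
        StringBuilder sb = new StringBuilder();
        for (Msg m = head; m != null; m = m.next) {
            if (sb.length() > 0) sb.append(',');
            sb.append(m.what);
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        EnqueueSketch q = new EnqueueSketch();
        q.blocked = true;
        System.out.println(q.enqueue(new Msg(1, 100))); // true: new head while blocked
        System.out.println(q.enqueue(new Msg(2, 200))); // false: appended at the tail
        System.out.println(q.enqueue(new Msg(3, 50)));  // true: new head again
        System.out.println(q.order());                  // 3,1,2
    }
}
```

Because the comparison is strict (when < p.when), messages with equal due times keep their send order.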
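[Figure: lifecycle of the Java-layer Looper]
That completes the walk through the Java layer for the demo above. In short, the Looper keeps polling the MessageQueue, and when it gets a message it calls into the Handler to process it.
Native-layer code
MessageQueue is the core of the mechanism in the Java layer, and the native-layer Looper (which is unrelated to the Java-layer Looper) does much of the work behind MessageQueue. The native Looper class (Looper.cpp) is in turn built on epoll, so a basic understanding of epoll helps when reading the native Looper.
We start the native analysis here: MessageQueue calls into the native-layer Looper through JNI. The code at hand is Android 7.1.2, where MessageQueue has six JNI methods: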
    static const JNINativeMethod gMessageQueueMethods[] = {
        /* name, signature, funcPtr */
        { "nativeInit", "()J", (void*)android_os_MessageQueue_nativeInit },
        { "nativeDestroy", "(J)V", (void*)android_os_MessageQueue_nativeDestroy },
        { "nativePollOnce", "(JI)V", (void*)android_os_MessageQueue_nativePollOnce },
        { "nativeWake", "(J)V", (void*)android_os_MessageQueue_nativeWake },
        { "nativeIsPolling", "(J)Z", (void*)android_os_MessageQueue_nativeIsPolling },
        { "nativeSetFileDescriptorEvents", "(JII)V",
                (void*)android_os_MessageQueue_nativeSetFileDescriptorEvents },
    };
The rest of this section uses these six interfaces to analyze MessageQueue.
Interface 1: android_os_MessageQueue_nativeInit
    static jlong android_os_MessageQueue_nativeInit(JNIEnv* env, jclass clazz) {
        NativeMessageQueue* nativeMessageQueue = new NativeMessageQueue();
        if (!nativeMessageQueue) {
            jniThrowRuntimeException(env, "Unable to allocate native queue");
            return 0;
        }
        nativeMessageQueue->incStrong(env);
        return reinterpret_cast<jlong>(nativeMessageQueue);
    }
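It creates a NativeMessageQueue object, increments its strong reference count, and returns the pointer to the Java layer as a jlong.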
    NativeMessageQueue::NativeMessageQueue() :
            mPollEnv(NULL), mPollObj(NULL), mExceptionObj(NULL) {
        mLooper = Looper::getForThread();
        if (mLooper == NULL) {
            mLooper = new Looper(false);
            Looper::setForThread(mLooper);
        }
    }
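The constructor reuses the thread's Looper if the thread already has one and creates a new one otherwise. The following three functions implement this: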
    sp<Looper> Looper::getForThread() {
        int result = pthread_once(&gTLSOnce, initTLSKey);
        LOG_ALWAYS_FATAL_IF(result != 0, "pthread_once failed");
        return (Looper*)pthread_getspecific(gTLSKey);
    }

    void Looper::initTLSKey() {
        int result = pthread_key_create(&gTLSKey, threadDestructor);
        LOG_ALWAYS_FATAL_IF(result != 0, "Could not allocate TLS key.");
    }

    void Looper::setForThread(const sp<Looper>& looper) {
        sp<Looper> old = getForThread(); // also has side-effect of initializing TLS
        if (looper != NULL) {
            looper->incStrong((void*)threadDestructor);
        }
        pthread_setspecific(gTLSKey, looper.get());
        if (old != NULL) {
            old->decStrong((void*)threadDestructor);
        }
    }
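initTLSKey (pthread_once guarantees that it runs only once) calls pthread_key_create to create the key gTLSKey. setForThread calls pthread_setspecific to bind a Looper to gTLSKey for the current thread, replacing the old Looper if one was already bound, and pthread_getspecific later returns the Looper bound to gTLSKey. (This serves the same purpose as sThreadLocal in Looper.java.) Next, look at the Looper constructor.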
    Looper::Looper(bool allowNonCallbacks) :
            mAllowNonCallbacks(allowNonCallbacks), mSendingMessage(false),
            mPolling(false), mEpollFd(-1), mEpollRebuildRequired(false),
            mNextRequestSeq(0), mResponseIndex(0), mNextMessageUptime(LLONG_MAX) {
        mWakeEventFd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
        LOG_ALWAYS_FATAL_IF(mWakeEventFd < 0, "Could not make wake event fd: %s",
                strerror(errno));
        AutoMutex _l(mLock);
        rebuildEpollLocked();
    }
It calls eventfd to create mWakeEventFd, a descriptor dedicated to waking the Looper; the wake-up path described later depends on it. It then calls rebuildEpollLocked.
    void Looper::rebuildEpollLocked() {
        // Close old epoll instance if we have one.
        if (mEpollFd >= 0) {
    #if DEBUG_CALLBACKS
            ALOGD("%p ~ rebuildEpollLocked - rebuilding epoll set", this);
    #endif
            close(mEpollFd);
        }
        // Allocate the new epoll instance and register the wake pipe.
        mEpollFd = epoll_create(EPOLL_SIZE_HINT);
        LOG_ALWAYS_FATAL_IF(mEpollFd < 0, "Could not create epoll instance: %s", strerror(errno));
        struct epoll_event eventItem;
        memset(&eventItem, 0, sizeof(epoll_event)); // zero out unused members of data field union
        eventItem.events = EPOLLIN;
        eventItem.data.fd = mWakeEventFd;
        int result = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, mWakeEventFd, &eventItem);
        LOG_ALWAYS_FATAL_IF(result != 0, "Could not add wake event fd to epoll instance: %s",
                strerror(errno));
        for (size_t i = 0; i < mRequests.size(); i++) {
            const Request& request = mRequests.valueAt(i);
            struct epoll_event eventItem;
            request.initEventItem(&eventItem);
            int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, request.fd, &eventItem);
            if (epollResult < 0) {
                ALOGE("Error adding epoll events for fd %d while rebuilding epoll set: %s",
                        request.fd, strerror(errno));
            }
        }
    }
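It recreates the epoll instance and uses EPOLL_CTL_ADD to register every file descriptor that must be monitored: mWakeEventFd first, then each fd recorded in mRequests.
Interface 2: android_os_MessageQueue_nativeSetFileDescriptorEvents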
    static void android_os_MessageQueue_nativeSetFileDescriptorEvents(JNIEnv* env, jclass clazz,
            jlong ptr, jint fd, jint events) {
        NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
        nativeMessageQueue->setFileDescriptorEvents(fd, events);
    }
It calls NativeMessageQueue::setFileDescriptorEvents to add the file descriptor to be monitored.
    void NativeMessageQueue::setFileDescriptorEvents(int fd, int events) {
        ALOGE("setFileDescriptorEvents[oeh]:fd=%d,events=%d", fd, events);
        if (events) {
            int looperEvents = 0;
            if (events & CALLBACK_EVENT_INPUT) {
                looperEvents |= Looper::EVENT_INPUT;
            }
            if (events & CALLBACK_EVENT_OUTPUT) {
                looperEvents |= Looper::EVENT_OUTPUT;
            }
            mLooper->addFd(fd, Looper::POLL_CALLBACK, looperEvents, this,
                    reinterpret_cast<void*>(events));
        } else {
            mLooper->removeFd(fd);
        }
    }
setFileDescriptorEvents calls mLooper's addFd to add a file descriptor and removeFd to remove one. Here is Looper::addFd:
    int Looper::addFd(int fd, int ident, int events, const sp<LooperCallback>& callback, void* data) {
    #if DEBUG_CALLBACKS
        ALOGD("%p ~ addFd - fd=%d, ident=%d, events=0x%x, callback=%p, data=%p", this, fd, ident,
                events, callback.get(), data);
    #endif
        if (!callback.get()) {
            if (!mAllowNonCallbacks) {
                ALOGE("Invalid attempt to set NULL callback but not allowed for this looper.");
                return -1;
            }
            if (ident < 0) {
                ALOGE("Invalid attempt to set NULL callback with ident < 0.");
                return -1;
            }
        } else {
            ident = POLL_CALLBACK;
        }
        { // acquire lock
            AutoMutex _l(mLock);
            Request request;
            request.fd = fd;
            request.ident = ident;
            request.events = events;
            request.seq = mNextRequestSeq++;
            request.callback = callback;
            request.data = data;
            if (mNextRequestSeq == -1) mNextRequestSeq = 0; // reserve sequence number -1
            struct epoll_event eventItem;
            request.initEventItem(&eventItem);
            ssize_t requestIndex = mRequests.indexOfKey(fd);
            if (requestIndex < 0) {
                int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, fd, &eventItem);
                if (epollResult < 0) {
                    ALOGE("Error adding epoll events for fd %d: %s", fd, strerror(errno));
                    return -1;
                }
                mRequests.add(fd, request);
            } else {
                int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_MOD, fd, &eventItem);
                if (epollResult < 0) {
                    if (errno == ENOENT) {
                        // Tolerate ENOENT because it means that an older file descriptor was
                        // closed before its callback was unregistered and meanwhile a new
                        // file descriptor with the same number has been created and is now
                        // being registered for the first time. This error may occur naturally
                        // when a callback has the side-effect of closing the file descriptor
                        // before returning and unregistering itself. Callback sequence number
                        // checks further ensure that the race is benign.
                        //
                        // Unfortunately due to kernel limitations we need to rebuild the epoll
                        // set from scratch because it may contain an old file handle that we are
                        // now unable to remove since its file descriptor is no longer valid.
                        // No such problem would have occurred if we were using the poll system
                        // call instead, but that approach carries others disadvantages.
    #if DEBUG_CALLBACKS
                        ALOGD("%p ~ addFd - EPOLL_CTL_MOD failed due to file descriptor "
                                "being recycled, falling back on EPOLL_CTL_ADD: %s",
                                this, strerror(errno));
    #endif
                        epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, fd, &eventItem);
                        if (epollResult < 0) {
                            ALOGE("Error modifying or adding epoll events for fd %d: %s",
                                    fd, strerror(errno));
                            return -1;
                        }
                        scheduleEpollRebuildLocked();
                    } else {
                        ALOGE("Error modifying epoll events for fd %d: %s", fd, strerror(errno));
                        return -1;
                    }
                }
                mRequests.replaceValueAt(requestIndex, request);
            }
        } // release lock
        return 1;
    }
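If the file descriptor in the request is already registered in mRequests, addFd updates its registration with EPOLL_CTL_MOD; if it is not, addFd registers it with EPOLL_CTL_ADD so that it is monitored.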
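The same add-or-modify decision exists in Java's own I/O multiplexing API, which makes a convenient way to experiment with the pattern. The sketch below is an analogy under assumptions, not the native code: RegistrySketch, addOrModify, and demo are hypothetical names, a java.nio Selector plays the role of the epoll set, and a Pipe's source channel stands in for the monitored fd.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Pipe;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;

final class RegistrySketch {
    /**
     * Registers the channel on first use (comparable to EPOLL_CTL_ADD); on later calls it
     * only updates the interest set of the existing key (comparable to EPOLL_CTL_MOD).
     */
    static SelectionKey addOrModify(Selector selector, SelectableChannel channel, int ops) {
        try {
            SelectionKey key = channel.keyFor(selector);
            if (key == null) {
                return channel.register(selector, ops);
            }
            key.interestOps(ops);
            return key;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Returns true if the second call reused the first key and changed its interest set. */
    static boolean demo() {
        try (Selector selector = Selector.open()) {
            Pipe pipe = Pipe.open();
            try {
                pipe.source().configureBlocking(false); // selectors require non-blocking channels
                SelectionKey first = addOrModify(selector, pipe.source(), SelectionKey.OP_READ);
                SelectionKey second = addOrModify(selector, pipe.source(), 0);
                return first == second && second.interestOps() == 0;
            } finally {
                pipe.source().close();
                pipe.sink().close();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("key reused: " + demo());
    }
}
```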
Interface 3: android_os_MessageQueue_nativePollOnce
    static void android_os_MessageQueue_nativePollOnce(JNIEnv* env, jobject obj,
            jlong ptr, jint timeoutMillis) {
        NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
        nativeMessageQueue->pollOnce(env, obj, timeoutMillis);
    }

    void NativeMessageQueue::pollOnce(JNIEnv* env, jobject pollObj, int timeoutMillis) {
        mPollEnv = env;
        mPollObj = pollObj;
        mLooper->pollOnce(timeoutMillis);
        mPollObj = NULL;
        mPollEnv = NULL;
        if (mExceptionObj) {
            env->Throw(mExceptionObj);
            env->DeleteLocalRef(mExceptionObj);
            mExceptionObj = NULL;
        }
    }
It calls pollOnce on the native-layer Looper, and pollOnce mainly calls pollInner:
    int Looper::pollInner(int timeoutMillis) {
        //...
        int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis);
        for (int i = 0; i < eventCount; i++) {
            int fd = eventItems[i].data.fd;
            uint32_t epollEvents = eventItems[i].events;
            if (fd == mWakeEventFd) {
                if (epollEvents & EPOLLIN) {
                    awoken();
                } else {
                    ALOGW("Ignoring unexpected epoll events 0x%x on wake event fd.", epollEvents);
                }
            } else {
                ssize_t requestIndex = mRequests.indexOfKey(fd);
                if (requestIndex >= 0) {
                    int events = 0;
                    if (epollEvents & EPOLLIN) events |= EVENT_INPUT;
                    if (epollEvents & EPOLLOUT) events |= EVENT_OUTPUT;
                    if (epollEvents & EPOLLERR) events |= EVENT_ERROR;
                    if (epollEvents & EPOLLHUP) events |= EVENT_HANGUP;
                    pushResponse(events, mRequests.valueAt(requestIndex));
                } else {
                    ALOGW("Ignoring unexpected epoll events 0x%x on fd %d that is "
                            "no longer registered.", epollEvents, fd);
                }
        //...
    }
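epoll_wait blocks until an event arrives on one of the monitored file descriptors (or the timeout expires), and the events are then processed. (When the Java-layer MessageQueue.next() finds no next message, it passes timeoutMillis = -1, so the call blocks until it is woken.)
Interface 4: android_os_MessageQueue_nativeWake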
    static void android_os_MessageQueue_nativeWake(JNIEnv* env, jclass clazz, jlong ptr) {
        NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
        nativeMessageQueue->wake();
    }

    void NativeMessageQueue::wake() {
        mLooper->wake();
    }
android_os_MessageQueue_nativeWake calls the Looper's wake() method, which makes the blocked epoll_wait return.
    void Looper::wake() {
    #if DEBUG_POLL_AND_WAKE
        ALOGD("%p ~ wake", this);
    #endif
        uint64_t inc = 1;
        ssize_t nWrite = TEMP_FAILURE_RETRY(write(mWakeEventFd, &inc, sizeof(uint64_t)));
        if (nWrite != sizeof(uint64_t)) {
            if (errno != EAGAIN) {
                ALOGW("Could not write wake signal: %s", strerror(errno));
            }
        }
    }
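It writes to mWakeEventFd, which was created in the Looper constructor. Because mWakeEventFd is registered in the epoll set for EPOLLIN, the write makes epoll_wait return and thereby wakes the Looper.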
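The block-then-wake interplay can be reproduced in plain Java with a Selector, whose wakeup() method serves a purpose similar to writing to mWakeEventFd. This is a minimal sketch under assumptions: WakeSketch and blockThenWake are hypothetical names, select() with no timeout stands in for epoll_wait with a timeout of -1, and nothing here claims to show how the native Looper is implemented.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Selector;

final class WakeSketch {
    /** Parks a helper thread in select() and wakes it; returns true if the helper returned. */
    static boolean blockThenWake() {
        try (Selector selector = Selector.open()) {
            final boolean[] returned = {false};
            Thread poller = new Thread(() -> {
                try {
                    selector.select(); // no timeout: blocks until an event or a wakeup
                    returned[0] = true;
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
            poller.start();
            // If the poller has not reached select() yet, the wakeup is remembered and the
            // next select() returns immediately, so there is no lost-wakeup race.
            selector.wakeup();
            poller.join(5000);
            return returned[0];
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("poller woke up: " + blockThenWake());
    }
}
```

The remembered wakeup mirrors the eventfd counter: a write that happens before epoll_wait is entered still leaves the descriptor readable, so the wake-up is not lost.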
Interface 5: android_os_MessageQueue_nativeIsPolling
    static jboolean android_os_MessageQueue_nativeIsPolling(JNIEnv* env, jclass clazz, jlong ptr) {
        NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
        return nativeMessageQueue->getLooper()->isPolling();
    }

    bool Looper::isPolling() const {
        return mPolling;
    }
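It reports the Looper's polling state: mPolling is set to true just before epoll_wait is called and back to false once epoll_wait returns (for example, because an event fired).
Interface 6: android_os_MessageQueue_nativeDestroy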
    static void android_os_MessageQueue_nativeDestroy(JNIEnv* env, jclass clazz, jlong ptr) {
        NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
        nativeMessageQueue->decStrong(env);
    }
It decrements the NativeMessageQueue's strong reference count, balancing the incStrong call made in nativeInit.