Android Handler机制10之Native的实现
Android Handler机制系列文章整体内容如下:
- Android Handler机制1之Thread
- Android Handler机制2之ThreadLocal
- Android Handler机制3之SystemClock类
- Android Handler机制4之Looper与Handler简介
- Android Handler机制5之Message简介与消息对象对象池
- Android Handler机制6之MessageQueue简介
- Android Handler机制7之消息发送
- Android Handler机制8之消息的取出与消息的其他操作
- Android Handler机制9之Handler的Native实现前奏之Linux IO多路复用
- Android Handler机制10之Handdler的Native实现Native的实现
- Android Handler机制11之Handler机制总结
- Android Handler机制12之Callable、Future和FutureTask
- Android Handler机制13之AsyncTask源码解析
一、简述
前面的文章讲解了Java层的消息处理机制,其中MessageQueue类里面涉及到的多个Native方法,除了MessageQueue的native方法,native本身也有一套完整的消息机制,处理native消息。在整个消息机制中,MessageQueue是连接Java层和Native层的纽带,换而言之,Java层可以向MessageQueue消息队列中添加消息,Native层也可以向MessageQueue消息队列中添加消息。
Native的层关系图:
Native关系图.png二、MessageQueue
在MessageQueue的native方法如下:
private native static long nativeInit(); private native static void nativeDestroy(long ptr); private native void nativePollOnce(long ptr, int timeoutMillis); /*non-static for callbacks*/ private native static void nativeWake(long ptr); private native static boolean nativeIsPolling(long ptr); private native static void nativeSetFileDescriptorEvents(long ptr, int fd, int events);
在Android Handler机制6之MessageQueue简介中的五、native层代码的初始化中 说了MessaegQueue构造函数调用了nativeInit(),为了更好的理解,我们便从MessageQueue构造函数开始说起
(一)、 nativeInit() 函数
nativeInit() 的主要作用是初始化,是在MessageQueue的构造函数中调用
代码在MessageQueue.java 68行
MessageQueue(boolean quitAllowed) { mQuitAllowed = quitAllowed; // 通过JNI调用了Native层的相关函数,导致了NativeMessageQueue的创建 mPtr = nativeInit(); }
MessageQueue只是有一个构造函数,该构造函数是包内可见的,其内部就两行代码,分别是设置了MessageQueue是否可以退出和native层代码的相关初始化
在MessageQueue的构造函数里面调用 nativeInit(),我们来看下
代码在MessageQueue.java 61行
private native static long nativeInit();
根据Android跨进程通信IPC之3——关于"JNI"的那些事中知道,nativeInit这个native方法对应的是android_os_MessageQueue.cpp里面的android_os_MessageQueue_nativeInit(JNIEnv* , jclass )函数
1、jlong android_os_MessageQueue_nativeInit(JNIEnv* env, jclass clazz)方法
代码在android_os_MessageQueue.cpp 172 行
static jlong android_os_MessageQueue_nativeInit(JNIEnv* env, jclass clazz) { // 在Native层又创建了NativeMessageQueue NativeMessageQueue* nativeMessageQueue = new NativeMessageQueue(); if (!nativeMessageQueue) { jniThrowRuntimeException(env, "Unable to allocate native queue"); return 0; } nativeMessageQueue->incStrong(env); // 这里返回值是Java层的mPtr,因此mPtr实际上是Java层MessageQueue与NativeMessesageQueue的桥梁 return reinterpret_cast(nativeMessageQueue);}
此时Java层和Native层的MessageQueue被mPtr连接起来了,NativeMessageQueue只是Java层MessageQueue在Native层的体现,其本身并没有实现Queue的数据结构,而是从其父类MessageQueue中继承mLooper变量。与Java层类型,这个Looper也是线程级别的单列。
代码中是直接new 的NativeMessageQueue无参构造函数,那我们那就来看下
2、NativeMessageQueue无参构造函数
NativeMessageQueue是android_os_MessageQueue.cpp的内部类
代码在android_os_MessageQueue.cpp 78行
NativeMessageQueue::NativeMessageQueue() : mPollEnv(NULL), mPollObj(NULL), mExceptionObj(NULL) { // 获取TLS中的Looper对象 mLooper = Looper::getForThread(); if (mLooper == NULL) { // 创建native层的Looper对象 mLooper = new Looper(false); // 保存native 层的Looper到TLS中(线程级单例) Looper::setForThread(mLooper); }}
- Looper::getForThread():功能类比于Java层的Looper.myLooper();
- Looper::setForThread(mLooper):功能类比于Java层的ThreadLocal.set()
通过上述代码我们知道:
- 1、Java层的Looper的创建导致了MessageQueue的创建,而在Native层则刚刚相反,NativeMessageQueue的创建导致了Looper的创建
- 2、MessageQueue是在Java层与Native层有着紧密的联系,但是此次Native层的Looper与Java层的Looper没有任何关系。
- 3、Native层的Looper创建和Java层的也完全不一样,它利用了Linux的epoll机制检测了Input的fd和唤醒fd。从功能上来讲,这个唤醒fd才是真正处理Java Message和Native Message的钥匙。
PS:5.0以上的版本Loooer定义在System/core下
上面说了半天,那我们就来看下Native的Looper的构造函数
3、 Native层的Looper的构造函数
代码在Looper.cpp 71行
Looper::Looper(bool allowNonCallbacks) : mAllowNonCallbacks(allowNonCallbacks), mSendingMessage(false), mPolling(false), mEpollFd(-1), mEpollRebuildRequired(false), mNextRequestSeq(0), mResponseIndex(0), mNextMessageUptime(LLONG_MAX) { /** 这才是Linux后来才有的东西,负责线程通信,替换了老版本的pipe */ //构造唤醒时间的fd mWakeEventFd = eventfd(0, EFD_NONBLOCK); AutoMutex _l(mLock); rebuildEpollLocked(); }
这个方法重点就是调用了rebuildEpollLocked(); 函数
PS:这里说一下eventfd,event具体与pipe有点像,用来完成两个线程之间(现在也支持进程级别),能够用来作为线程之间通讯,类似于pthread_cond_t。
4、 Looper::rebuildEpollLocked() 函数
代码在Looper.cpp 140行
void Looper::rebuildEpollLocked() { // Close old epoll instance if we have one. if (mEpollFd >= 0) {#if DEBUG_CALLBACKS ALOGD("%p ~ rebuildEpollLocked - rebuilding epoll set", this);#endif // 关闭老的epoll实例 close(mEpollFd); } // Allocate the new epoll instance and register the wake pipe. // 创建一个epoll实例,并注册wake管道。 mEpollFd = epoll_create(EPOLL_SIZE_HINT); LOG_ALWAYS_FATAL_IF(mEpollFd < 0, "Could not create epoll instance. errno=%d", errno); struct epoll_event eventItem; // 清空,把未使用的数据区域进行置0操作 memset(& eventItem, 0, sizeof(epoll_event)); // zero out unused members of data field union //关注EPOLLIN事件,也就是可读 eventItem.events = EPOLLIN; //设置Fd eventItem.data.fd = mWakeEventFd; //将唤醒事件(mWakeEventFd)添加到epoll实例(mEpollFd),其实只是为epoll放置一个唤醒机制 int result = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, mWakeEventFd, & eventItem); LOG_ALWAYS_FATAL_IF(result != 0, "Could not add wake event fd to epoll instance. errno=%d", errno); // 这里主要添加的是Input事件,如键盘、传感器输入,这里基本上是由系统负责。 for (size_t i = 0; i < mRequests.size(); i++) { const Request& request = mRequests.valueAt(i); struct epoll_event eventItem; request.initEventItem(&eventItem); // 将request的队列事件,分别添加到epoll实例 int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, request.fd, & eventItem); if (epollResult < 0) { ALOGE("Error adding epoll events for fd %d while rebuilding epoll set, errno=%d", request.fd, errno); } }
这里一定要明白的是,添加这些fd除了mWakeEventFd负责解除阻塞让程序继续运行,从而处理Native Message和Java Message外,其他fd与Message的处理其实毫无关系。此时Java层与Native联系如下:
Java与Native.png这时候大家可能有点蒙,所以我下面补充1个知识点,希望能帮助大家
8、 小结
所有整个流程整理如下图:
流程.png
(二) nativeDestroy()
nativeDestroy是在MessageQueue的dispose()方法中调用,主要用于清空回收
代码在MessageQueue.java 84行
// Disposes of the underlying message queue. // Must only be called on the looper thread or the finalizer. private void dispose() { if (mPtr != 0) { // native方法 nativeDestroy(mPtr); mPtr = 0; } }
根据Android跨进程通信IPC之3——关于"JNI"的那些事中知道,nativeDestroy()这个native方法对应的是android_os_MessageQueue.cpp里面的android_os_MessageQueue_nativeDestroy()函数
1、android_os_MessageQueue_nativeDestroy()函数
代码在MessageQueue.java 183行
static void android_os_MessageQueue_nativeDestroy(JNIEnv* env, jclass clazz, jlong ptr) { // 强制类型转换为nativeMessageQueue NativeMessageQueue* nativeMessageQueue = reinterpret_cast(ptr); //调用nativeMessageQueue的decStrong()函数 nativeMessageQueue->decStrong(env);}
我们看到上面代码是
- 首先,将Java层传递下来的mPtr转换为nativeMessageQueue
- 其次,nativeMessageQueue调用decStrong(env)
nativeMessageQueue继承自RefBase类,所以decStrong最终调用的是RefBase.decStrong()。
Android跨进程通信IPC之4——AndroidIPC基础2的第五部分五、智能指针,中对智能指针有详细描述,这里就不过多介绍了
2、总体流程图
nativeDestroy()流程.png(三) nativePollOnce()
nativePollOnce()是在MessageQueue的next()方法中调用,用于提取消息的调用链
代码在MessageQueue.java 323行
Message next() { final long ptr = mPtr; if (ptr == 0) { return null; } for (;;) { ... //阻塞操作 nativePollOnce(ptr, nextPollTimeoutMillis); ... }
根据Android跨进程通信IPC之3——关于"JNI"的那些事中知道,nativeDestroy()这个native方法对应的是android_os_MessageQueue.cpp里面的android_os_MessageQueue_nativePollOnce()函数
1、nativePollOnce()
代码在MessageQueue.java 188行
static void android_os_MessageQueue_nativePollOnce(JNIEnv* env, jobject obj, jlong ptr, jint timeoutMillis) { //将Java层传递下来的mPtr转换为nativeMessageQueue NativeMessageQueue* nativeMessageQueue = reinterpret_cast(ptr); nativeMessageQueue->pollOnce(env, obj, timeoutMillis); }
我们看到上面代码是
- 首先,将Java层传递下来的mPtr转换为nativeMessageQueue
- 其次,nativeMessageQueue调用pollOnce(env, obj, timeoutMillis)
那我们就来看下pollOnce(env, obj, timeoutMillis)方法
2、 NativeMessageQueue::pollOnce(JNIEnv*, jobject, int)函数
void NativeMessageQueue::pollOnce(JNIEnv* env, jobject pollObj, int timeoutMillis) { mPollEnv = env; mPollObj = pollObj; // 重点函数 mLooper->pollOnce(timeoutMillis); mPollObj = NULL; mPollEnv = NULL; if (mExceptionObj) { env->Throw(mExceptionObj); env->DeleteLocalRef(mExceptionObj); mExceptionObj = NULL; }}
这个函数内容很简答, 主要就是进行赋值,并调用pollOnce(timeoutMillis)
那我们再来看一下pollOnce(timeoutMillis)函数
3、Looper::pollOnce()函数
代码在Looper.h 264 行
inline int pollOnce(int timeoutMillis) { return pollOnce(timeoutMillis, NULL, NULL, NULL); }
这个函数里面主要是调用的是ollOnce(timeoutMillis, NULL, NULL, NULL);
4、Looper::pollOnce(int, int, int, void**)函数
代码在Looper.cpp 264 行
int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) { int result = 0; // 对fd对应的Responses进行处理,后面发现Response里都是活动fd for (;;) { // 先处理没有Callback的Response事件 while (mResponseIndex < mResponses.size()) { const Response& response = mResponses.itemAt(mResponseIndex++); int ident = response.request.ident; if (ident >= 0) { // ident>=0则表示没有callback,因为POLL_CALLBACK=-2 int fd = response.request.fd; int events = response.events; void* data = response.request.data;#if DEBUG_POLL_AND_WAKE ALOGD("%p ~ pollOnce - returning signalled identifier %d: " "fd=%d, events=0x%x, data=%p", this, ident, fd, events, data);#endif if (outFd != NULL) *outFd = fd; if (outEvents != NULL) *outEvents = events; if (outData != NULL) *outData = data; return ident; } } // 注意这里处于循环内部,改变result的值在后面的pollInner if (result != 0) {#if DEBUG_POLL_AND_WAKE ALOGD("%p ~ pollOnce - returning result %d", this, result);#endif if (outFd != NULL) *outFd = 0; if (outEvents != NULL) *outEvents = 0; if (outData != NULL) *outData = NULL; return result; } // 再处理内部轮训 result = pollInner(timeoutMillis); }}
参数说明:
- timeoutMillis:超时时长
- outFd:发生事件的文件描述符
- outEvents:当前outFd上发生的事件,包含以下4类事件
- EVENT_INPUT:可读
- EVENT_OUTPUT:可写
- EVENT_ERROR:错误
- EVENT_HANGUP:中断
- outData:上下文数据
这个函数内部最后调用了pollInner(int),让我们来看一下
5、Looper::pollInner()函数
代码在Looper.cpp 220 行
int Looper::pollInner(int timeoutMillis) {#if DEBUG_POLL_AND_WAKE ALOGD("%p ~ pollOnce - waiting: timeoutMillis=%d", this, timeoutMillis);#endif // Adjust the timeout based on when the next message is due. if (timeoutMillis != 0 && mNextMessageUptime != LLONG_MAX) { nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC); int messageTimeoutMillis = toMillisecondTimeoutDelay(now, mNextMessageUptime); if (messageTimeoutMillis >= 0 && (timeoutMillis < 0 || messageTimeoutMillis < timeoutMillis)) { timeoutMillis = messageTimeoutMillis; }#if DEBUG_POLL_AND_WAKE ALOGD("%p ~ pollOnce - next message in %" PRId64 "ns, adjusted timeout: timeoutMillis=%d", this, mNextMessageUptime - now, timeoutMillis);#endif } // Poll. int result = POLL_WAKE; mResponses.clear(); mResponseIndex = 0; // We are about to idle. // 即将处于idle状态 mPolling = true; // fd最大的个数是16 struct epoll_event eventItems[EPOLL_MAX_EVENTS]; // 等待时间发生或者超时,在nativeWake()方法,向管道写端写入字符,则方法会返回。 int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis); // No longer idling. // 不再处于idle状态 mPolling = false; // 请求锁 ,因为在Native Message的处理和添加逻辑上需要同步 // Acquire lock. mLock.lock(); // Rebuild epoll set if needed. // 如果需要,重建epoll if (mEpollRebuildRequired) { mEpollRebuildRequired = false; // epoll重建,直接跳转到Done rebuildEpollLocked(); goto Done; } // Check for poll error. if (eventCount < 0) { if (errno == EINTR) { goto Done; } ALOGW("Poll failed with an unexpected error, errno=%d", errno); // epoll事件个数小于0,发生错误,直接跳转Done result = POLL_ERROR; goto Done; } // Check for poll timeout. //如果需要,重建epoll if (eventCount == 0) { //epoll事件个数等于0,发生超时,直接跳转Done#if DEBUG_POLL_AND_WAKE ALOGD("%p ~ pollOnce - timeout", this);#endif result = POLL_TIMEOUT; goto Done; } // Handle all events.#if DEBUG_POLL_AND_WAKE ALOGD("%p ~ pollOnce - handling events from %d fds", this, eventCount);#endif // 循环处理所有的事件 for (int i = 0; i < eventCount; i++) { int fd = eventItems[i].data.fd; uint32_t epollEvents = eventItems[i].events; //首先处理mWakeEventFd if (fd == mWakeEventFd) { //如果是唤醒mWakeEventFd有反应 if (epollEvents & EPOLLIN) { /**重点代码*/ // 已经唤醒了,则读取并清空管道数据 awoken(); // 该函数内部就是read,从而使FD可读状态被清除 } else { ALOGW("Ignoring unexpected epoll events 0x%x on wake event fd.", epollEvents); } } else { // 其他input fd处理,其实就是将活动放入response队列,等待处理 ssize_t requestIndex = mRequests.indexOfKey(fd); if (requestIndex >= 0) { int events = 0; if (epollEvents & EPOLLIN) events |= EVENT_INPUT; if (epollEvents & EPOLLOUT) events |= EVENT_OUTPUT; if (epollEvents & EPOLLERR) events |= EVENT_ERROR; if (epollEvents & EPOLLHUP) events |= EVENT_HANGUP; // 处理request,生成对应的response对象,push到响应数组 pushResponse(events, mRequests.valueAt(requestIndex)); } else { ALOGW("Ignoring unexpected epoll events 0x%x on fd %d that is " "no longer registered.", epollEvents, fd); } } }Done: ; // Invoke pending message callbacks. // 再处理Native的Message,调用相应回调方法 mNextMessageUptime = LLONG_MAX; while (mMessageEnvelopes.size() != 0) { nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC); const MessageEnvelope& messageEnvelope = mMessageEnvelopes.itemAt(0); if (messageEnvelope.uptime <= now) { // Remove the envelope from the list. // We keep a strong reference to the handler until the call to handleMessage // finishes. Then we drop it so that the handler can be deleted *before* // we reacquire our lock. { // obtain handler sp handler = messageEnvelope.handler; Message message = messageEnvelope.message; mMessageEnvelopes.removeAt(0); mSendingMessage = true; // 释放锁 mLock.unlock();#if DEBUG_POLL_AND_WAKE || DEBUG_CALLBACKS ALOGD("%p ~ pollOnce - sending message: handler=%p, what=%d", this, handler.get(), message.what);#endif // 处理消息事件 handler->handleMessage(message); } // release handler // 请求锁 mLock.lock(); mSendingMessage = false; // 发生回调 result = POLL_CALLBACK; } else { // The last message left at the head of the queue determines the next wakeup time. mNextMessageUptime = messageEnvelope.uptime; break; } } // Release lock. // 释放锁 mLock.unlock(); // Invoke all response callbacks. // 处理带有Callback()方法的response事件,执行Response相应的回调方法 for (size_t i = 0; i < mResponses.size(); i++) { Response& response = mResponses.editItemAt(i); if (response.request.ident == POLL_CALLBACK) { int fd = response.request.fd; int events = response.events; void* data = response.request.data;#if DEBUG_POLL_AND_WAKE || DEBUG_CALLBACKS ALOGD("%p ~ pollOnce - invoking fd event callback %p: fd=%d, events=0x%x, data=%p", this, response.request.callback.get(), fd, events, data);#endif // Invoke the callback. Note that the file descriptor may be closed by // the callback (and potentially even reused) before the function returns so // we need to be a little careful when removing the file descriptor afterwards. // 处理请求的回调方法 int callbackResult = response.request.callback->handleEvent(fd, events, data); if (callbackResult == 0) { // 移除fd removeFd(fd, response.request.seq); } // Clear the callback reference in the response structure promptly because we // will not clear the response vector itself until the next poll. // 清除response引用的回调方法 response.request.callback.clear(); // 发生回调 result = POLL_CALLBACK; } } return result;}
pollOnce返回值说明:
- POLL_WAKE: 表示由wake()出发,即pipe写端的write事件触发
- POLL_CALLBACK:表示某个被监听fd被触发
- POLL_TIMEOUT:表示等待超时
- POLL_ERROR:表示等待期间发生错误
pollInner()方法的处理流程:
- 1、先调用epoll_wait(),这是阻塞方法,用于等待事件发生或者超时。
- 2、对于epoll_wait()返回,当且仅当以下3种情况出现
- POLL_ERROR:发生错误,直接跳转Done
- POLL_TIMEOUT:发生超时,直接跳转到Done
- 检测到管道有事情发生,则再根据情况做相应处理:
- 如果检测到管道产生事件,则直接读取管道的数据
- 如果是其他事件,则处理request,生成对应的response对象,push到response数组
- 3、进入Done标记位的代码:
- 先处理Native的Message,调用Native的Handler来处理该Message
- 再处理Resposne数组,POLL_CALLBACK类型的事件
从上面的流程,可以发现对于Request先收集,一并放入response数组,而不是马上执行。真正在Done开始执行的时候,先处理Native Message,再处理Request,说明Native Message优先级高于Request请求的优先级。
PS:在polOnce()方法中,先处理Response数组不带Callback的事件,再调用了再调用了pollInner()函数。
6、Looper::awoken()函数
代码在Looper.cpp 418行
void Looper::awoken() {#if DEBUG_POLL_AND_WAKE ALOGD("%p ~ awoken", this);#endif uint64_t counter; // 不断的读取管道数据,目的就是为了清空管道内容 TEMP_FAILURE_RETRY(read(mWakeEventFd, &counter, sizeof(uint64_t)));}
7、小结
整体的流程图如下:
流程图.png
(四)、nativeDestroy()
nativeWake用于唤醒功能,在添加消息到消息队列enqueueMessage(),或者把消息从消息队列中全部移除quit(),再有需要时会调用nativeWake方法。包含唤醒过程的添加消息的调用链
下面来进一步来看看调用链的过程:
1、enqueueMessage(Message, long)
代码在MessageQueue.java 533行
boolean enqueueMessage(Message msg, long when) { .... //将Message按按时间插入MessageQueue if (needWake) { nativeWake(mPtr); } .... }
在向消息队列添加Message时,需要根据mBlocked情况来就决定是否需要调用nativeWake。
根据Android跨进程通信IPC之3——关于"JNI"的那些事中知道,nativeDestroy()这个native方法对应的是android_os_MessageQueue.cpp里面的android_os_MessageQueue_nativeWake(JNIEnv*, jclass, jlong ) 函数
2、android_os_MessageQueue_nativeWake()
代码在android_os_MessageQueue.cpp 194行
static void android_os_MessageQueue_nativeWake(JNIEnv* env, jclass clazz, jlong ptr) { // 将Java层传递下来的mPtr转换为nativeMessageQueue NativeMessageQueue* nativeMessageQueue = reinterpret_cast(ptr); //调用wake函数 nativeMessageQueue->wake();}
我们看到上面代码是
- 首先,将Java层传递下来的mPtr转换为nativeMessageQueue
- 其次,nativeMessageQueue调用wake()函数
3、NativeMessageQueue::wake()函数
代码在android_os_MessageQueue.cpp 121行
void NativeMessageQueue::wake() { mLooper->wake();}
这个方法很简单,就是直接调用Looper的wake()函数,
4、Looper::wake()函数
代码在Looper.cpp 404行
void Looper::wake() {#if DEBUG_POLL_AND_WAKE ALOGD("%p ~ wake", this);#endif uint64_t inc = 1; // 向管道mWakeEventFd写入字符1 ssize_t nWrite = TEMP_FAILURE_RETRY(write(mWakeEventFd, &inc, sizeof(uint64_t))); if (nWrite != sizeof(uint64_t)) { if (errno != EAGAIN) { ALOGW("Could not write wake signal, errno=%d", errno); } }}
Looper类的 wake()函数只是往mWakeEventfd中写了一些内容,这个fd只是通知而已,类似于pipi,最后会把epoll_wai唤醒,线程就不阻塞了继续发送
Native层的消息,然后处理之前的addFd事件,然后处理Java层的消息。
PS:其中TEMP_FAILURE_RETRY 是一个宏定义,当执行write失败后,会不断重复执行,直到执行成功为止。
5、小结
总结一下流程图如下:
流程图.png(五)、sendMessage()
前面几篇文章讲述了Java层如何向MessageQueue类添加消息,那么接下来讲讲Native层如何向MessageQueue发送消息。
1、Looper::sendMessage(const sp& handler, const Message& message) 函数
代码在Looper.cpp 583行
void Looper::sendMessage(const sp& handler, const Message& message) { nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC); sendMessageAtTime(now, handler, message);}
我们看到方法里面调用了sendMessageAtTime(now, handler, message) 函数
2、 Looper::sendMessageDelayed(nsecs_t uptimeDelay, const sp& handler,
const Message& message)函数
代码在Looper.cpp 588行
void Looper::sendMessageDelayed(nsecs_t uptimeDelay, const sp& handler, const Message& message) { nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC); sendMessageAtTime(now + uptimeDelay, handler, message);}
我们看到方法里面调用了sendMessageAtTime(now, handler, message) 函数
所以我们说:
sendMessage()和sendMessageDelayed()都是调用sendMessageAtTime()来完成消息插入。
那我们就来看一下sendMessageAtTime()
3、 Looper::sendMessageDelayed(nsecs_t uptimeDelay, const sp& handler,
const Message& message)函数
void Looper::sendMessageAtTime(nsecs_t uptime, const sp& handler, const Message& message) {#if DEBUG_CALLBACKS ALOGD("%p ~ sendMessageAtTime - uptime=%" PRId64 ", handler=%p, what=%d", this, uptime, handler.get(), message.what);#endif size_t i = 0; { // acquire lock // 请求锁 AutoMutex _l(mLock); size_t messageCount = mMessageEnvelopes.size(); // 找到message应该插入的位置i while (i < messageCount && uptime >= mMessageEnvelopes.itemAt(i).uptime) { i += 1; } MessageEnvelope messageEnvelope(uptime, handler, message); mMessageEnvelopes.insertAt(messageEnvelope, i, 1); // Optimization: If the Looper is currently sending a message, then we can skip // the call to wake() because the next thing the Looper will do after processing // messages is to decide when the next wakeup time should be. In fact, it does // not even matter whether this code is running on the Looper thread. // 如果当前正在发送消息,那么不再调用wake(),直接返回 if (mSendingMessage) { return; } } // release lock // 释放锁 // Wake the poll loop only when we enqueue a new message at the head. // 当消息加入到消息队列的头部时,需要唤醒poll循环 if (i == 0) { wake(); }}
(六)、sendMessage()
本节介绍了MessageQueue的native()方法,经过层层调用:
- nativeInit()方法,最终实现由epoll机制中的epoll_create()/epoll_ctl()完成
- nativeDestory()方法,最终实现由RefBase::decStrong()完成
- nativePollOnce()方法,最终实现由Looper::pollOnce()完成
- nativeWake()方法,最终实现由Looper::wake()调用write方法,向管道写入字符
- nativeIsPolling(),nativeSetFileDescriptorEvents()这两个方法类似,此处就不一一列举了。
三、Native结构体和类
Looper.h/Looper.cpp文件中定义了Message结构体,消息处理类,回调类,Looper类
(一)、Message结构体
代码在(http://androidxref.com/6.0.1_r10/xref/system/core/include/utils/Looper.h) 50行
struct Message { Message() : what(0) { } Message(int what) : what(what) { } /* The message type. (interpretation is left up to the handler) */ // 消息类型 int what;};
(二)、消息处理类
1、MessageHandler类
代码在Looper.h 67行
/** * Interface for a Looper message handler. * * The Looper holds a strong reference to the message handler whenever it has * a message to deliver to it. Make sure to call Looper::removeMessages * to remove any pending messages destined for the handler so that the handler * can be destroyed. */class MessageHandler : public virtual RefBase {protected: virtual ~MessageHandler() { }public: /** * Handles a message. */ virtual void handleMessage(const Message& message) = 0;};
这个类很简单,就不多说了,这里说下注释:
- 处理Looper消息程序的接口。
- 当一个消息要传递给其对应的Handler时候,Looper持有一个消息Handler的强引用。在这个Handler销毁之前,请确保调用Looper :: removeMessages来删除待处理的消息。
2、WeakMessageHandler类
代码在Looper.h 82行
/** * A simple proxy that holds a weak reference to a message handler. */class WeakMessageHandler : public MessageHandler {protected: virtual ~WeakMessageHandler();public: WeakMessageHandler(const wp& handler); virtual void handleMessage(const Message& message);private: wp mHandler;};
这里并没有handleMessage的代码,我们是不是忽略了什么?再找一下,果然这块的代码在
Looper.cpp 38行
void WeakMessageHandler::handleMessage(const Message& message) { sp handler = mHandler.promote(); if (handler != NULL) { 调用Mes handler->handleMessage(message); }}
(三)、回调类
1、LooperCallback类
代码在Looper.h 98行
/** * A looper callback. */class LooperCallback : public virtual RefBase {protected: virtual ~LooperCallback() { }public: /** * Handles a poll event for the given file descriptor. * It is given the file descriptor it is associated with, * a bitmask of the poll events that were triggered (typically EVENT_INPUT), * and the data pointer that was originally supplied. * * Implementations should return 1 to continue receiving callbacks, or 0 * to have this file descriptor and callback unregistered from the looper. */ // 用于处理指定的文件描述符poll事件 virtual int handleEvent(int fd, int events, void* data) = 0;};
简单翻译一下handleEvent方法的注释:
- 处理给定文件描述符的轮训事件。
- 用来 将 最初提供的数据指针和轮训事件的掩码(通常为EVENT_INPUT)来关联的文件描述符。
- 实现子类如果想继续接收回调则返回1,如果未注册文件描述符和回调则返回0
2、SimpleLooperCallback类
代码在Looper.cpp 118行
class SimpleLooperCallback : public LooperCallback {protected: virtual ~SimpleLooperCallback();public: SimpleLooperCallback(Looper_callbackFunc callback); virtual int handleEvent(int fd, int events, void* data);private: Looper_callbackFunc mCallback;};
它和WeakMessageHandler类一样handleEvent的方法在Looper.cpp 55行
int SimpleLooperCallback::handleEvent(int fd, int events, void* data) { // 调用回调方法 return mCallback(fd, events, data); }
(四)、Looper类
1 、 Native层的Looper类简介
Looper.cpp
2 、 Native层的Looper类常量
// 每个epoll实例默认的文件描述符个数static const int EPOLL_SIZE_HINT = 8; // 轮训事件的文件描述符个数上限static const int EPOLL_MAX_EVENTS = 16;
3、Native Looper类的常用方法:
方法 | 解释 |
---|---|
Looper(bool) | Looper的构造函数 |
static sp | 如果该线程没有绑定Looper,才创建Loopr,否则直接返回 |
int pollOnec(int ,int* int,void) | 轮训,等待事件发生 |
void wake() | 唤醒Looper |
void sendMessage(const sp | 发送消息 |
int addFd(int,int,int,Looper_callbackFunc,void*) | 添加要监听的文件描述符fd |
4、Request、Resposne、MessageEvent 三个结构体
Looper类的内部定义了Request、Resposne、MessageEnvelope这三个结构体
关系图如下:
4.1、Request 结构体
代码在Looper.h 420行
// 请求结构体struct Request { int fd; int ident; int events; int seq; sp callback; void* data; void initEventItem(struct epoll_event* eventItem) const;};
4.2、Resposne 结构体
代码在Looper.h 431行
// 响应结构体struct Response { int events; Request request;};
4.3、MessageEnvelope 结构体
代码在Looper.h 436行
// 信封结构体struct MessageEnvelope { MessageEnvelope() : uptime(0) { } MessageEnvelope(nsecs_t uptime, const sp handler, const Message& message) : uptime(uptime), handler(handler), message(message) { } nsecs_t uptime; sp handler; Message message;};
MessageEnvelope正如其名字,信封。MessageEnvelope里面记录着收信人(handler),发信时间(uptime),信件内容(message)。
5、Native Looper类的类图如下:
类图.png6 Native Looper的监听文件描述符
Native Looper除了提供message机制外,还提供监听文件描述符的方式。通过addFd()接口加入需要被监听的文件描述符。
代码在Looper.cpp 434行
int addFd(int fd, int ident, int events, Looper_callbackFunc callback, void* data); int addFd(int fd, int ident, int events, const sp& callback, void* data);
其中:
- fd:为所需要监听的文件描述符
- ident:表示为当前发生时间的标识符,必须>=0,或者为POLL_CALLBACK(-2)如果指定了callback
- events:表示为要监听的文件类型,默认是EVENT_INPUT。
- callback:当有事件发生时,会回调该callback函数。
- data:两种使用方式:
- 指定callback来处理事件:当该文件描述符上有事件来时,该callback会被执行,然后从fd读取数据。这个时候ident是被忽略的。
- 通过指定的ident来处理事件:当该文件描述符有数据来到时,pollOnce()会返回一个ident,调用者会判断该ident是否等于自己需要处理事件ident,如果是的话,则开始处理事件。
(####) 五、Java层的addFd
我之前一直以为只能在C层的Looper中才能addFd,原来在Java层也通过JNI做了这个功能。我们可以在MessageQueue中的addOnFileDescriptorEventListener来实现这个功能。
代码在MessageQueue.java 186行
/** * Adds a file descriptor listener to receive notification when file descriptor * related events occur. * * If the file descriptor has already been registered, the specified events * and listener will replace any that were previously associated with it. * It is not possible to set more than one listener per file descriptor. *
* It is important to always unregister the listener when the file descriptor * is no longer of use. *
* * @param fd The file descriptor for which a listener will be registered. * @param events The set of events to receive: a combination of the * {@link OnFileDescriptorEventListener#EVENT_INPUT}, * {@link OnFileDescriptorEventListener#EVENT_OUTPUT}, and * {@link OnFileDescriptorEventListener#EVENT_ERROR} event masks. If the requested * set of events is zero, then the listener is unregistered. * @param listener The listener to invoke when file descriptor events occur. * * @see OnFileDescriptorEventListener * @see #removeOnFileDescriptorEventListener */ public void addOnFileDescriptorEventListener(@NonNull FileDescriptor fd, @OnFileDescriptorEventListener.Events int events, @NonNull OnFileDescriptorEventListener listener) { if (fd == null) { throw new IllegalArgumentException("fd must not be null"); } if (listener == null) { throw new IllegalArgumentException("listener must not be null"); } synchronized (this) { updateOnFileDescriptorEventListenerLocked(fd, events, listener); } }
通过上面代码分析,我们知道这里面有两个重点
- 1 onFileDescriptorEventListener 这个回调
- 2 updateOnFileDescriptorEventListenerLocked()方法
8.1、OnFileDescriptorEventListener
代码在MessageQueue.java 186行
/** * A listener which is invoked when file descriptor related events occur. */ public interface OnFileDescriptorEventListener { /** * File descriptor event: Indicates that the file descriptor is ready for input * operations, such as reading. * * The listener should read all available data from the file descriptor * then return true
to keep the listener active or false
* to remove the listener. *
* In the case of a socket, this event may be generated to indicate * that there is at least one incoming connection that the listener * should accept. *
* This event will only be generated if the {@link #EVENT_INPUT} event mask was * specified when the listener was added. *
*/ public static final int EVENT_INPUT = 1 << 0; /** * File descriptor event: Indicates that the file descriptor is ready for output * operations, such as writing. * * The listener should write as much data as it needs. If it could not * write everything at once, then it should return true
to * keep the listener active. Otherwise, it should return false
* to remove the listener then re-register it later when it needs to write * something else. *
* This event will only be generated if the {@link #EVENT_OUTPUT} event mask was * specified when the listener was added. *
*/ public static final int EVENT_OUTPUT = 1 << 1; /** * File descriptor event: Indicates that the file descriptor encountered a * fatal error. * * File descriptor errors can occur for various reasons. One common error * is when the remote peer of a socket or pipe closes its end of the connection. *
* This event may be generated at any time regardless of whether the * {@link #EVENT_ERROR} event mask was specified when the listener was added. *
*/ public static final int EVENT_ERROR = 1 << 2; /** @hide */ @Retention(RetentionPolicy.SOURCE) @IntDef(flag=true, value={EVENT_INPUT, EVENT_OUTPUT, EVENT_ERROR}) public @interface Events {} /** * Called when a file descriptor receives events. * * @param fd The file descriptor. * @param events The set of events that occurred: a combination of the * {@link #EVENT_INPUT}, {@link #EVENT_OUTPUT}, and {@link #EVENT_ERROR} event masks. * @return The new set of events to watch, or 0 to unregister the listener. * * @see #EVENT_INPUT * @see #EVENT_OUTPUT * @see #EVENT_ERROR */ @Events int onFileDescriptorEvents(@NonNull FileDescriptor fd, @Events int events); } private static final class FileDescriptorRecord { public final FileDescriptor mDescriptor; public int mEvents; public OnFileDescriptorEventListener mListener; public int mSeq; public FileDescriptorRecord(FileDescriptor descriptor, int events, OnFileDescriptorEventListener listener) { mDescriptor = descriptor; mEvents = events; mListener = listener; } }
8.2、updateOnFileDescriptorEventListenerLocked()方法
代码在MessageQueue.java 222行
private void updateOnFileDescriptorEventListenerLocked(FileDescriptor fd, int events, OnFileDescriptorEventListener listener) { final int fdNum = fd.getInt$(); int index = -1; FileDescriptorRecord record = null; if (mFileDescriptorRecords != null) { index = mFileDescriptorRecords.indexOfKey(fdNum); if (index >= 0) { record = mFileDescriptorRecords.valueAt(index); if (record != null && record.mEvents == events) { return; } } } if (events != 0) { events |= OnFileDescriptorEventListener.EVENT_ERROR; if (record == null) { if (mFileDescriptorRecords == null) { mFileDescriptorRecords = new SparseArray(); } //fd保存在FileDescriptorRecord对象 record = new FileDescriptorRecord(fd, events, listener); // mFileDescriptorRecords 保存 mFileDescriptorRecords.put(fdNum, record); } else { record.mListener = listener; record.mEvents = events; record.mSeq += 1; } // 调用native函数 nativeSetFileDescriptorEvents(mPtr, fdNum, events); } else if (record != null) { record.mEvents = 0; mFileDescriptorRecords.removeAt(index); } }
8.2.1、android_os_MessageQueue_nativeSetFileDescriptorEvents()函数
根据Android跨进程通信IPC之3——关于"JNI"的那些事中知道,nativeInit这个native方法对应的是android_os_MessageQueue.cpp里面的android_os_MessageQueue_nativeSetFileDescriptorEvents(JNIEnv* env, jclass clazz, jlong ptr, jint fd, jint events)函数
代码在android_os_MessageQueue.cpp 204行
static void android_os_MessageQueue_nativeSetFileDescriptorEvents(JNIEnv* env, jclass clazz, jlong ptr, jint fd, jint events) { NativeMessageQueue* nativeMessageQueue = reinterpret_cast(ptr); nativeMessageQueue->setFileDescriptorEvents(fd, events);}
我们看到这个函数里面调用了nativeMessageQueue的setFileDescriptorEvents(fd, events);函数。
8.2.2、NativeMessageQueue::setFileDescriptorEvents(int fd, int events)函数
代码在android_os_MessageQueue.cpp 125行
void NativeMessageQueue::setFileDescriptorEvents(int fd, int events) { if (events) { int looperEvents = 0; if (events & CALLBACK_EVENT_INPUT) { looperEvents |= Looper::EVENT_INPUT; } if (events & CALLBACK_EVENT_OUTPUT) { looperEvents |= Looper::EVENT_OUTPUT; } // 重点代码 mLooper->addFd(fd, Looper::POLL_CALLBACK, looperEvents, this, reinterpret_cast(events)); } else { mLooper->removeFd(fd); }}
我们看到了在这个函数内部调用了mLooper的addFd函数。
大家注意一下Looper的addFd函数,中的倒数二个参数是this,侧面说明了NativeMessageQueue继承了LooperCallback。
代码在android_os_MessageQueue.cpp 41行
class NativeMessageQueue : public MessageQueue, public LooperCallback {public: NativeMessageQueue(); virtual ~NativeMessageQueue(); virtual void raiseException(JNIEnv* env, const char* msg, jthrowable exceptionObj); void pollOnce(JNIEnv* env, jobject obj, int timeoutMillis); void wake(); void setFileDescriptorEvents(int fd, int events); virtual int handleEvent(int fd, int events, void* data); ...}
所以说,需要实现handleEvent()函数。handleEvent()函数就是在looper中epoll_wait之后,当我们增加的fd有数据就会调用这个函数。
代码在android_os_MessageQueue.cpp 141行
int NativeMessageQueue::handleEvent(int fd, int looperEvents, void* data) { int events = 0; if (looperEvents & Looper::EVENT_INPUT) { events |= CALLBACK_EVENT_INPUT; } if (looperEvents & Looper::EVENT_OUTPUT) { events |= CALLBACK_EVENT_OUTPUT; } if (looperEvents & (Looper::EVENT_ERROR | Looper::EVENT_HANGUP | Looper::EVENT_INVALID)) { events |= CALLBACK_EVENT_ERROR; } int oldWatchedEvents = reinterpret_cast(data); // 调用回调 int newWatchedEvents = mPollEnv->CallIntMethod(mPollObj, gMessageQueueClassInfo.dispatchEvents, fd, events); / if (!newWatchedEvents) { return 0; // unregister the fd } if (newWatchedEvents != oldWatchedEvents) { setFileDescriptorEvents(fd, newWatchedEvents); } return 1;}
最后在java的MessageQueue中的dispatchEvent就是在jni层反调过来的,然后调用之前注册的回调函数
代码在MessageQueue.java259行
// Called from native code. private int dispatchEvents(int fd, int events) { // Get the file descriptor record and any state that might change. final FileDescriptorRecord record; final int oldWatchedEvents; final OnFileDescriptorEventListener listener; final int seq; synchronized (this) { record = mFileDescriptorRecords.get(fd); if (record == null) { return 0; // spurious, no listener registered } oldWatchedEvents = record.mEvents; events &= oldWatchedEvents; // filter events based on current watched set if (events == 0) { return oldWatchedEvents; // spurious, watched events changed } listener = record.mListener; seq = record.mSeq; } // Invoke the listener outside of the lock. int newWatchedEvents = listener.onFileDescriptorEvents( record.mDescriptor, events); if (newWatchedEvents != 0) { newWatchedEvents |= OnFileDescriptorEventListener.EVENT_ERROR; } // Update the file descriptor record if the listener changed the set of // events to watch and the listener itself hasn't been updated since. if (newWatchedEvents != oldWatchedEvents) { synchronized (this) { int index = mFileDescriptorRecords.indexOfKey(fd); if (index >= 0 && mFileDescriptorRecords.valueAt(index) == record && record.mSeq == seq) { record.mEvents = newWatchedEvents; if (newWatchedEvents == 0) { mFileDescriptorRecords.removeAt(index); } } } } // Return the new set of events to watch for native code to take care of. return newWatchedEvents; }
四、总结
(一)、Native与Java的对应关系
对应图.pngMessageQueue通过mPtr变量保存了NativeMessageQueue对象,从而使得MessageQueue成为Java层和Native层的枢纽,既能处理上层消息,也能处理Native消息,下图列举了Java层与Native层的对应图
图解:
- 1、红色虚线关系:Java层和Native层的MessageQueue通过JNI建立关联,彼此之间能相互调用,搞明白这个互调关系,也就搞明白Java如何调用C++代码,C++代码如何调用Java代码
- 2、蓝色虚线关系:Handler/Looper/Message这三大类Java层与Native层并没有任何真正的关系,只是分别在Java层和Native层的handler消息模型中具有相似的功能。都是彼此独立的,各自实现相应的逻辑。
- 3、WeakMessageHandler继承与MessageHandler类,NativeMessageQueue继承于MessageQueue类。
另外,消息处理流程是先处理NativeMessage,再处理Native Request,最后处理Java Message。理解了该流程也就明白了有时上层消息很少,但响应时间却比较长的真正原因。
(二)、Native的流程
整体流程如下:
整体流程.png四 总结
Handler机制中Native的实现主要涉及了两个类
- 1、NativeMessageQueue:在MessageQueue.java的构造函数中,调用了nativeInit创建了NativeMessageQueue对象,并且把指针变量返回给Java层的mPtr。而在NativeMessageQueue的构造函数中,会在当前线程中创建C++的Looper对象。
- 2、Looper:控制eventfd的读写,通过epoll监听eventfd的变化,来阻塞调用pollOnce和恢复调用wake当前线程
- 通过 epoll监听其他文件描述符的变化
- 通过 epoll处理C++层的消息机制,当调用Looper::sendMessageAtTime后,调用wake触发epoll
- Looper的构造函数,创建一个eventfd(以前版本是pipe),eventfd它的主要用于进程或者线程间的通信,然后创建epoll来监听该eventfd的变化
- Looper::pollOnce(int timeoutMillis) 内部调用了pollInner,再调用epoll_wait(mEpollFd, ..., timeoutMillis)阻塞timeoutMills时间,并监听文件描述符mEpollFd的变化,当时间到了或者消息到了,即eventfd被写入内容后,从epoll_wait继续往下执行,处理epoll_wait返回的消息,该消息既有可能是eventfd产生的,也可能是其他文件描述符产生的。处理顺序是,先处理普通的C++消息队列mMessageEnvelopes,然后处理之前addFd的事件,最后从pollOnce返回,会继续MessageQueue.java的next()函数,取得Java层的消息来处理;
- Looper类的wake,函数只是往mWakeEventfd中写了一些内容,这个fd只是通知而已,类似pipe,最后会把epoll_wait唤醒,线程就不阻塞了,继续先发送C层消息,然后处理之前addFd事件,然后处理Java层消息
更多相关文章
- C语言函数以及函数的使用
- Android Handler机制7之消息发送
- 多点触控测试代码 PointerLocation
- Android静态代码分析
- 【Android】附加Android源代码Androidandroid_gingerbread_javas
- Android中对NFC的实现代码分布在如下几个地方:
- Android简明开发教程二十四篇及示例代码下载
- 【Android 开发】: Android 消息处理机制之四: Android 消息循环
- Android ROM研究---如何在ubuntu下下载姜饼(Gingerbread)源代码