热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

(转)Android消息处理机制(Handler、Looper、MessageQueue与Message)

转自http:www.cnblogs.comangeldevilp3340644.htmlAndroid消息处理机制(Handler、Looper、MessageQueue与Mes

转自 http://www.cnblogs.com/angeldevil/p/3340644.html

Android消息处理机制(Handler、Looper、MessageQueue与Message)

Android是消息驱动的,实现消息驱动有几个要素:

  1. 消息的表示:Message
  2. 消息队列:MessageQueue
  3. 消息循环,用于循环取出消息进行处理:Looper
  4. 消息处理,消息循环从消息队列中取出消息后要对消息进行处理:Handler

平时我们最常使用的就是Message与Handler了,如果使用过HandlerThread或者自己实现类似HandlerThread的东西可能还会接触到Looper,而MessageQueue是Looper内部使用的,对于标准的SDK,我们是无法实例化并使用的(构造函数是包可见性)。

我们平时接触到的Looper、Message、Handler都是用JAVA实现的,Android做为基于Linux的系统,底层用C、C++实现的,而且还有NDK的存在,消息驱动的模型怎么可能只存在于JAVA层,实际上,在Native层存在与Java层对应的类如Looper、MessageQueue等。

 初始化消息队列

首先来看一下如果一个线程想实现消息循环应该怎么做,以HandlerThread为例:

public void run() {mTid = Process.myTid();Looper.prepare();synchronized (this) {mLooper = Looper.myLooper();notifyAll();}Process.setThreadPriority(mPriority);onLooperPrepared();Looper.loop();mTid = -1;

主要是红色标明的两句,首先调用prepare初始化MessageQueue与Looper,然后调用loop进入消息循环。先看一下Looper.prepare。

public static void prepare() {prepare(true);
}private static void prepare(boolean quitAllowed) {if (sThreadLocal.get() != null) {throw new RuntimeException("Only one Looper may be created per thread");}sThreadLocal.set(new Looper(quitAllowed));
}

重载函数,quitAllowed默认为true,从名字可以看出来就是消息循环是否可以退出,默认是可退出的,Main线程(UI线程)初始化消息循环时会调用prepareMainLooper,传进去的是false。使用了ThreadLocal,每个线程可以初始化一个Looper。

再来看一下Looper在初始化时都做了什么:

private Looper(boolean quitAllowed) {mQueue = new MessageQueue(quitAllowed);mRun = true;mThread = Thread.currentThread();
}MessageQueue(boolean quitAllowed) {mQuitAllowed = quitAllowed;nativeInit();

在Looper初始化时,新建了一个MessageQueue的对象保存了在成员mQueue中。MessageQueue的构造函数是包可见性,所以我们是无法直接使用的,在MessageQueue初始化的时候调用了nativeInit,这是一个Native方法:

static void android_os_MessageQueue_nativeInit(JNIEnv* env, jobject obj) {NativeMessageQueue* nativeMessageQueue = new NativeMessageQueue();if (!nativeMessageQueue) {jniThrowRuntimeException(env, "Unable to allocate native queue");return;}nativeMessageQueue->incStrong(env);android_os_MessageQueue_setNativeMessageQueue(env, obj, nativeMessageQueue);
}static void android_os_MessageQueue_setNativeMessageQueue(JNIEnv* env, jobject messageQueueObj,NativeMessageQueue* nativeMessageQueue) {env->SetIntField(messageQueueObj, gMessageQueueClassInfo.mPtr,reinterpret_cast(nativeMessageQueue));
}

在nativeInit中,new了一个Native层的MessageQueue的对象,并将其地址保存在了Java层MessageQueue的成员mPtr中,Android中有好多这样的实现,一个类在Java层与Native层都有实现,通过JNI的GetFieldID与SetIntField把Native层的类的实例地址保存到Java层类的实例的mPtr成员中,比如Parcel。

再看NativeMessageQueue的实现:

NativeMessageQueue::NativeMessageQueue() : mInCallback(false), mExceptionObj(NULL) {mLooper = Looper::getForThread();if (mLooper == NULL) {mLooper = new Looper(false);Looper::setForThread(mLooper);}
}

在NativeMessageQueue的构造函数中获得了一个Native层的Looper对象,Native层的Looper也使用了线程本地存储,注意new Looper时传入了参数false。

Looper::Looper(bool allowNonCallbacks) :mAllowNonCallbacks(allowNonCallbacks), mSendingMessage(false),mResponseIndex(0), mNextMessageUptime(LLONG_MAX) {int wakeFds[2];int result &#61; pipe(wakeFds);LOG_ALWAYS_FATAL_IF(result !&#61; 0, "Could not create wake pipe. errno&#61;%d", errno);mWakeReadPipeFd &#61; wakeFds[0];mWakeWritePipeFd &#61; wakeFds[1];result &#61; fcntl(mWakeReadPipeFd, F_SETFL, O_NONBLOCK);LOG_ALWAYS_FATAL_IF(result !&#61; 0, "Could not make wake read pipe non-blocking. errno&#61;%d",errno);result &#61; fcntl(mWakeWritePipeFd, F_SETFL, O_NONBLOCK);LOG_ALWAYS_FATAL_IF(result !&#61; 0, "Could not make wake write pipe non-blocking. errno&#61;%d",errno);// Allocate the epoll instance and register the wake pipe.mEpollFd &#61; epoll_create(EPOLL_SIZE_HINT);LOG_ALWAYS_FATAL_IF(mEpollFd <0, "Could not create epoll instance. errno&#61;%d", errno);struct epoll_event eventItem;memset(& eventItem, 0, sizeof(epoll_event)); // zero out unused members of data field unioneventItem.events &#61; EPOLLIN;eventItem.data.fd &#61; mWakeReadPipeFd;result &#61; epoll_ctl(mEpollFd, EPOLL_CTL_ADD, mWakeReadPipeFd, & eventItem);LOG_ALWAYS_FATAL_IF(result !&#61; 0, "Could not add wake read pipe to epoll instance. errno&#61;%d",errno);
}

Native层的Looper使用了epoll。初始化了一个管道&#xff0c;用mWakeWritePipeFd与mWakeReadPipeFd分别保存了管道的写端与读端&#xff0c;并监听了读端的EPOLLIN事件。注意下初始化列表的值&#xff0c;mAllowNonCallbacks的值为false。

mAllowNonCallback是做什么的&#xff1f;使用epoll仅为了监听mWakeReadPipeFd的事件&#xff1f;其实Native Looper不仅可以监听这一个描述符&#xff0c;Looper还提供了addFd方法&#xff1a;

int addFd(int fd, int ident, int events, ALooper_callbackFunc callback, void* data);
int addFd(int fd, int ident, int events, const sp& callback, void* data);

fd表示要监听的描述符。ident表示要监听的事件的标识&#xff0c;值必须>&#61;0或者为ALOOPER_POLL_CALLBACK(-2)&#xff0c;event表示要监听的事件&#xff0c;callback是事件发生时的回调函数&#xff0c;mAllowNonCallbacks的作用就在于此&#xff0c;当mAllowNonCallbacks为true时允许callback为NULL&#xff0c;在pollOnce中ident作为结果返回&#xff0c;否则不允许callback为空&#xff0c;当callback不为NULL时&#xff0c;ident的值会被忽略。还是直接看代码方便理解&#xff1a;

int Looper::addFd(int fd, int ident, int events, const sp& callback, void* data) {
#if DEBUG_CALLBACKSALOGD("%p ~ addFd - fd&#61;%d, ident&#61;%d, events&#61;0x%x, callback&#61;%p, data&#61;%p", this, fd, ident,events, callback.get(), data);
#endifif (!callback.get()) {if (! mAllowNonCallbacks) {ALOGE("Invalid attempt to set NULL callback but not allowed for this looper.");return -1;}if (ident <0) {ALOGE("Invalid attempt to set NULL callback with ident <0.");return -1;}} else {ident &#61; ALOOPER_POLL_CALLBACK;}int epollEvents &#61; 0;if (events & ALOOPER_EVENT_INPUT) epollEvents |&#61; EPOLLIN;if (events & ALOOPER_EVENT_OUTPUT) epollEvents |&#61; EPOLLOUT;{ // acquire lockAutoMutex _l(mLock);Request request;request.fd &#61; fd;request.ident &#61; ident;request.callback &#61; callback;request.data &#61; data;struct epoll_event eventItem;memset(& eventItem, 0, sizeof(epoll_event)); // zero out unused members of data field unioneventItem.events &#61; epollEvents;eventItem.data.fd &#61; fd;ssize_t requestIndex &#61; mRequests.indexOfKey(fd);if (requestIndex <0) {int epollResult &#61; epoll_ctl(mEpollFd, EPOLL_CTL_ADD, fd, & eventItem);if (epollResult <0) {ALOGE("Error adding epoll events for fd %d, errno&#61;%d", fd, errno);return -1;}mRequests.add(fd, request);} else {int epollResult &#61; epoll_ctl(mEpollFd, EPOLL_CTL_MOD, fd, & eventItem);if (epollResult <0) {ALOGE("Error modifying epoll events for fd %d, errno&#61;%d", fd, errno);return -1;}mRequests.replaceValueAt(requestIndex, request);}} // release lockreturn 1;
}

如果callback为空会检查mAllowNonCallbacks看是否允许callback为空&#xff0c;如果允许callback为空还会检测ident是否>&#61;0。如果callback不为空会把ident的值赋值为ALOOPER_POLL_CALLBACK&#xff0c;不管传进来的是什么值。

接下来把传进来的参数值封装到一个Request结构体中&#xff0c;并以描述符为键保存到一个KeyedVector mRequests中&#xff0c;然后通过epoll_ctl添加或替换&#xff08;如果这个描述符之前有调用addFD添加监听&#xff09;对这个描述符事件的监听。

类图&#xff1a;

  

发送消息

通过Looper.prepare初始化好消息队列后就可以调用Looper.loop进入消息循环了&#xff0c;然后我们就可以向消息队列发送消息&#xff0c;消息循环就会取出消息进行处理&#xff0c;在看消息处理之前&#xff0c;先看一下消息是怎么被添加到消息队列的。

在Java层&#xff0c;Message类表示一个消息对象&#xff0c;要发送消息首先就要先获得一个消息对象&#xff0c;Message类的构造函数是public的&#xff0c;但是不建议直接new Message&#xff0c;Message内部保存了一个缓存的消息池&#xff0c;我们可以用obtain从缓存池获得一个消息&#xff0c;Message使用完后系统会调用recycle回收&#xff0c;如果自己new很多Message&#xff0c;每次使用完后系统放入缓存池&#xff0c;会占用很多内存的&#xff0c;如下所示&#xff1a;

public static Message obtain() {synchronized (sPoolSync) {if (sPool !&#61; null) {Message m &#61; sPool;sPool &#61; m.next;m.next &#61; null;sPoolSize--;return m;}}return new Message();}public void recycle() {clearForRecycle();synchronized (sPoolSync) {if (sPoolSize

Message内部通过next成员实现了一个链表&#xff0c;这样sPool就了为了一个Messages的缓存链表。

消息对象获取到了怎么发送呢&#xff0c;大家都知道是通过Handler的post、sendMessage等方法&#xff0c;其实这些方法最终都是调用的同一个方法sendMessageAtTime:

public boolean sendMessageAtTime(Message msg, long uptimeMillis) {MessageQueue queue &#61; mQueue;if (queue &#61;&#61; null) {RuntimeException e &#61; new RuntimeException(this &#43; " sendMessageAtTime() called with no mQueue");Log.w("Looper", e.getMessage(), e);return false;}return enqueueMessage(queue, msg, uptimeMillis);} 

sendMessageAtTime获取到消息队列然后调用enqueueMessage方法&#xff0c;消息队列mQueue是从与Handler关联的Looper获得的。

private boolean enqueueMessage(MessageQueue queue, Message msg, long uptimeMillis) {msg.target &#61; this;if (mAsynchronous) {msg.setAsynchronous(true);}return queue.enqueueMessage(msg, uptimeMillis);}

enqueueMessage将message的target设置为当前的handler&#xff0c;然后调用MessageQueue的enqueueMessage&#xff0c;在调用queue.enqueueMessage之前判断了mAsynchronous&#xff0c;从名字看是异步消息的意思&#xff0c;要明白Asynchronous的作用&#xff0c;需要先了解一个概念Barrier。

Barrier与Asynchronous Message

Barrier是什么意思呢&#xff0c;从名字看是一个拦截器&#xff0c;在这个拦截器后面的消息都暂时无法执行&#xff0c;直到这个拦截器被移除了&#xff0c;MessageQueue有一个函数叫enqueueSyncBarier可以添加一个Barrier。

int enqueueSyncBarrier(long when) {// Enqueue a new sync barrier token.// We don&#39;t need to wake the queue because the purpose of a barrier is to stall it.synchronized (this) {final int token &#61; mNextBarrierToken&#43;&#43;;final Message msg &#61; Message.obtain();msg.arg1 &#61; token;Message prev &#61; null;Message p &#61; mMessages;if (when !&#61; 0) {while (p !&#61; null && p.when <&#61; when) {prev &#61; p;p &#61; p.next;}}if (prev !&#61; null) { // invariant: p &#61;&#61; prev.nextmsg.next &#61; p;prev.next &#61; msg;} else {msg.next &#61; p;mMessages &#61; msg;}return token;}}

在enqueueSyncBarrier中&#xff0c;obtain了一个Message&#xff0c;并设置msg.arg1&#61;token&#xff0c;token仅是一个每次调用enqueueSyncBarrier时自增的int值&#xff0c;目的是每次调用enqueueSyncBarrier时返回唯一的一个token&#xff0c;这个Message同样需要设置执行时间&#xff0c;然后插入到消息队列&#xff0c;特殊的是这个Message没有设置target&#xff0c;即msg.target为null。

进入消息循环后会不停地从MessageQueue中取消息执行&#xff0c;调用的是MessageQueue的next函数&#xff0c;其中有这么一段&#xff1a;

Message msg &#61; mMessages;
if (msg !&#61; null && msg.target &#61;&#61; null) {// Stalled by a barrier. Find the next asynchronous message in the queue.do {prevMsg &#61; msg;msg &#61; msg.next;} while (msg !&#61; null && !msg.isAsynchronous());
}

如果队列头部的消息的target为null就表示它是个Barrier&#xff0c;因为只有两种方法往mMessages中添加消息&#xff0c;一种是enqueueMessage&#xff0c;另一种是enqueueBarrier&#xff0c;而enqueueMessage中如果mst.target为null是直接抛异常的&#xff0c;后面会看到。

所谓的异步消息其实就是这样的&#xff0c;我们可以通过enqueueBarrier往消息队列中插入一个Barrier&#xff0c;那么队列中执行时间在这个Barrier以后的同步消息都会被这个Barrier拦截住无法执行&#xff0c;直到我们调用removeBarrier移除了这个Barrier&#xff0c;而异步消息则没有影响&#xff0c;消息默认就是同步消息&#xff0c;除非我们调用了Message的setAsynchronous&#xff0c;这个方法是隐藏的。只有在初始化Handler时通过参数指定往这个Handler发送的消息都是异步的&#xff0c;这样在Handler的enqueueMessage中就会调用Message的setAsynchronous设置消息是异步的&#xff0c;从上面Handler.enqueueMessage的代码中可以看到。

 所谓异步消息&#xff0c;其实只有一个作用&#xff0c;就是在设置Barrier时仍可以不受Barrier的影响被正常处理&#xff0c;如果没有设置Barrier&#xff0c;异步消息就与同步消息没有区别&#xff0c;可以通过removeSyncBarrier移除Barrier&#xff1a;

void removeSyncBarrier(int token) {// Remove a sync barrier token from the queue.// If the queue is no longer stalled by a barrier then wake it.final boolean needWake;synchronized (this) {Message prev &#61; null;Message p &#61; mMessages;while (p !&#61; null && (p.target !&#61; null || p.arg1 !&#61; token)) {prev &#61; p;p &#61; p.next;}if (p &#61;&#61; null) {throw new IllegalStateException("The specified message queue synchronization "&#43; " barrier token has not been posted or has already been removed.");}if (prev !&#61; null) {prev.next &#61; p.next;needWake &#61; false;} else {mMessages &#61; p.next;needWake &#61; mMessages &#61;&#61; null || mMessages.target !&#61; null;}p.recycle();}if (needWake) {nativeWake(mPtr);}
}

参数token就是enqueueSyncBarrier的返回值&#xff0c;如果没有调用指定的token不存在是会抛异常的。

enqueueMessage

接下来看一下是怎么MessageQueue的enqueueMessage。

final boolean enqueueMessage(Message msg, long when) {if (msg.isInUse()) {throw new AndroidRuntimeException(msg &#43; " This message is already in use.");}if (msg.target &#61;&#61; null) {throw new AndroidRuntimeException("Message must have a target.");}boolean needWake;synchronized (this) {if (mQuiting) {RuntimeException e &#61; new RuntimeException(msg.target &#43; " sending message to a Handler on a dead thread");Log.w("MessageQueue", e.getMessage(), e);return false;}msg.when &#61; when;Message p &#61; mMessages;if (p &#61;&#61; null || when &#61;&#61; 0 || when

注意上面代码红色的部分&#xff0c;当msg.target为null时是直接抛异常的。

在enqueueMessage中首先判断&#xff0c;如果当前的消息队列为空&#xff0c;或者新添加的消息的执行时间when是0&#xff0c;或者新添加的消息的执行时间比消息队列头的消息的执行时间还早&#xff0c;就把消息添加到消息队列头&#xff08;消息队列按时间排序&#xff09;&#xff0c;否则就要找到合适的位置将当前消息添加到消息队列。

Native发送消息

消息模型不只是Java层用的&#xff0c;Native层也可以用&#xff0c;前面也看到了消息队列初始化时也同时初始化了Native层的Looper与NativeMessageQueue&#xff0c;所以Native层应该也是可以发送消息的。与Java层不同的是&#xff0c;Native层是通过Looper发消息的&#xff0c;同样所有的发送方法最终是调用sendMessageAtTime&#xff1a;

void Looper::sendMessageAtTime(nsecs_t uptime, const sp& handler,const Message& message) {
#if DEBUG_CALLBACKSALOGD("%p ~ sendMessageAtTime - uptime&#61;%lld, handler&#61;%p, what&#61;%d",this, uptime, handler.get(), message.what);
#endifsize_t i &#61; 0;{ // acquire lockAutoMutex _l(mLock);size_t messageCount &#61; mMessageEnvelopes.size();while (i &#61; mMessageEnvelopes.itemAt(i).uptime) {i &#43;&#61; 1;}MessageEnvelope messageEnvelope(uptime, handler, message);mMessageEnvelopes.insertAt(messageEnvelope, i, 1);// Optimization: If the Looper is currently sending a message, then we can skip// the call to wake() because the next thing the Looper will do after processing// messages is to decide when the next wakeup time should be. In fact, it does// not even matter whether this code is running on the Looper thread.if (mSendingMessage) {return;}} // release lock// Wake the poll loop only when we enqueue a new message at the head.if (i &#61;&#61; 0) {wake();}
}

 Native Message只有一个int型的what字段用来区分不同的消息&#xff0c;sendMessageAtTime指定了Message&#xff0c;Message要执行的时间when&#xff0c;与处理这个消息的Handler&#xff1a;MessageHandler&#xff0c;然后用MessageEnvelope封装了time, MessageHandler与Message&#xff0c;Native层发的消息都保存到了mMessageEnvelopes中&#xff0c;mMessageEnvelopes是一个Vector。Native层消息同样是按时间排序&#xff0c;与Java层的消息分别保存在两个队列里。

消息循环

消息队列初始化好了&#xff0c;也知道怎么发消息了&#xff0c;下面就是怎么处理消息了&#xff0c;看Handler.loop函数&#xff1a;

public static void loop() {final Looper me &#61; myLooper();if (me &#61;&#61; null) {throw new RuntimeException("No Looper; Looper.prepare() wasn&#39;t called on this thread.");}final MessageQueue queue &#61; me.mQueue;// Make sure the identity of this thread is that of the local process,// and keep track of what that identity token actually is.Binder.clearCallingIdentity();final long ident &#61; Binder.clearCallingIdentity();for (;;) {Message msg &#61; queue.next(); // might blockif (msg &#61;&#61; null) {// No message indicates that the message queue is quitting.return;}// This must be in a local variable, in case a UI event sets the loggerPrinter logging &#61; me.mLogging;if (logging !&#61; null) {logging.println(">>>>> Dispatching to " &#43; msg.target &#43; " " &#43;msg.callback &#43; ": " &#43; msg.what);}msg.target.dispatchMessage(msg);if (logging !&#61; null) {logging.println("<<<<

loop每次从MessageQueue取出一个Message&#xff0c;调用msg.target.dispatchMessage(msg)&#xff0c;target就是发送message时跟message关联的handler&#xff0c;这样就调用到了熟悉的dispatchMessage&#xff0c;Message被处理后会被recycle。当queue.next返回null时会退出消息循环&#xff0c;接下来就看一下MessageQueue.next是怎么取出消息的&#xff0c;又会在什么时候返回null。

final Message next() {int pendingIdleHandlerCount &#61; -1; // -1 only during first iterationint nextPollTimeoutMillis &#61; 0;for (;;) {if (nextPollTimeoutMillis !&#61; 0) {Binder.flushPendingCommands();}nativePollOnce(mPtr, nextPollTimeoutMillis);synchronized (this) {if (mQuiting) {return null;}// Try to retrieve the next message. Return if found.final long now &#61; SystemClock.uptimeMillis();Message prevMsg &#61; null;Message msg &#61; mMessages;if (msg !&#61; null && msg.target &#61;&#61; null) {// Stalled by a barrier. Find the next asynchronous message in the queue.do {prevMsg &#61; msg;msg &#61; msg.next;} while (msg !&#61; null && !msg.isAsynchronous());}if (msg !&#61; null) {if (now

MessageQueue.next首先会调用nativePollOnce&#xff0c;然后如果mQuiting为true就返回null&#xff0c;Looper就会退出消息循环。

接下来取消息队列头部的消息&#xff0c;如果头部消息是Barrier&#xff08;target&#61;&#61;null&#xff09;就往后遍历找到第一个异步消息&#xff0c;接下来检测获取到的消息&#xff08;消息队列头部的消息或者第一个异步消息&#xff09;&#xff0c;如果为null表示没有消息要执行&#xff0c;设置nextPollTimeoutMillis &#61; -1&#xff1b;否则检测这个消息要执行的时间&#xff0c;如果到执行时间了就将这个消息markInUse并从消息队列移除&#xff0c;然后从next返回到loop&#xff1b;否则设置nextPollTimeoutMillis &#61; (int) Math.min(msg.when - now, Integer.MAX_VALUE)&#xff0c;即距离最近要执行的消息还需要多久&#xff0c;无论是当前消息队列没有消息可以执行&#xff08;设置了Barrier并且没有异步消息或消息队列为空&#xff09;还是队列头部的消息未到执行时间&#xff0c;都会执行后面的代码&#xff0c;看有没有设置IdleHandler&#xff0c;如果有就运行IdleHandler&#xff0c;当IdleHandler被执行之后会设置nextPollTimeoutMillis &#xff1d; 0。

首先看一下nativePollOnce&#xff0c;native方法&#xff0c;调用JNI&#xff0c;最后调到了Native Looper::pollOnce&#xff0c;并从Java层传进去了nextPollTimeMillis&#xff0c;即Java层的消息队列中执行时间最近的消息还要多久到执行时间。

int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) {int result &#61; 0;for (;;) {while (mResponseIndex &#61; 0) {int fd &#61; response.request.fd;int events &#61; response.events;void* data &#61; response.request.data;
#if DEBUG_POLL_AND_WAKEALOGD("%p ~ pollOnce - returning signalled identifier %d: ""fd&#61;%d, events&#61;0x%x, data&#61;%p",this, ident, fd, events, data);
#endifif (outFd !&#61; NULL) *outFd &#61; fd;if (outEvents !&#61; NULL) *outEvents &#61; events;if (outData !&#61; NULL) *outData &#61; data;return ident;}}if (result !&#61; 0) {
#if DEBUG_POLL_AND_WAKEALOGD("%p ~ pollOnce - returning result %d", this, result);
#endifif (outFd !&#61; NULL) *outFd &#61; 0;if (outEvents !&#61; NULL) *outEvents &#61; 0;if (outData !&#61; NULL) *outData &#61; NULL;return result;}result &#61; pollInner(timeoutMillis);}
}

先不看开始的一大串代码&#xff0c;先看一下pollInner&#xff1a;

int Looper::pollInner(int timeoutMillis) {
#if DEBUG_POLL_AND_WAKEALOGD("%p ~ pollOnce - waiting: timeoutMillis&#61;%d", this, timeoutMillis);
#endif// Adjust the timeout based on when the next message is due.if (timeoutMillis !&#61; 0 && mNextMessageUptime !&#61; LLONG_MAX) {nsecs_t now &#61; systemTime(SYSTEM_TIME_MONOTONIC);int messageTimeoutMillis &#61; toMillisecondTimeoutDelay(now, mNextMessageUptime);if (messageTimeoutMillis >&#61; 0&& (timeoutMillis <0 || messageTimeoutMillis #if DEBUG_POLL_AND_WAKEALOGD("%p ~ pollOnce - next message in %lldns, adjusted timeout: timeoutMillis&#61;%d",this, mNextMessageUptime - now, timeoutMillis);
#endif}// Poll.int result &#61; ALOOPER_POLL_WAKE;mResponses.clear();mResponseIndex &#61; 0;struct epoll_event eventItems[EPOLL_MAX_EVENTS];int eventCount &#61; epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis);// Acquire lock.mLock.lock();// Check for poll error.if (eventCount <0) {if (errno &#61;&#61; EINTR) {goto Done;}ALOGW("Poll failed with an unexpected error, errno&#61;%d", errno);result &#61; ALOOPER_POLL_ERROR;goto Done;}// Check for poll timeout.if (eventCount &#61;&#61; 0) {
#if DEBUG_POLL_AND_WAKEALOGD("%p ~ pollOnce - timeout", this);
#endifresult &#61; ALOOPER_POLL_TIMEOUT;goto Done;}// Handle all events.
#if DEBUG_POLL_AND_WAKEALOGD("%p ~ pollOnce - handling events from %d fds", this, eventCount);
#endiffor (int i &#61; 0; i &#61; 0) {int events &#61; 0;if (epollEvents & EPOLLIN) events |&#61; ALOOPER_EVENT_INPUT;if (epollEvents & EPOLLOUT) events |&#61; ALOOPER_EVENT_OUTPUT;if (epollEvents & EPOLLERR) events |&#61; ALOOPER_EVENT_ERROR;if (epollEvents & EPOLLHUP) events |&#61; ALOOPER_EVENT_HANGUP;pushResponse(events, mRequests.valueAt(requestIndex));} else {ALOGW("Ignoring unexpected epoll events 0x%x on fd %d that is ""no longer registered.", epollEvents, fd);}}}
Done: ;// Invoke pending message callbacks.mNextMessageUptime &#61; LLONG_MAX;while (mMessageEnvelopes.size() !&#61; 0) {nsecs_t now &#61; systemTime(SYSTEM_TIME_MONOTONIC);const MessageEnvelope& messageEnvelope &#61; mMessageEnvelopes.itemAt(0);if (messageEnvelope.uptime <&#61; now) {// Remove the envelope from the list.// We keep a strong reference to the handler until the call to handleMessage// finishes. Then we drop it so that the handler can be deleted *before*// we reacquire our lock.{ // obtain handlersp handler &#61; messageEnvelope.handler;Message message &#61; messageEnvelope.message;mMessageEnvelopes.removeAt(0);mSendingMessage &#61; true;mLock.unlock();#if DEBUG_POLL_AND_WAKE || DEBUG_CALLBACKSALOGD("%p ~ pollOnce - sending message: handler&#61;%p, what&#61;%d",this, handler.get(), message.what);
#endifhandler->handleMessage(message);} // release handlermLock.lock();mSendingMessage &#61; false;result &#61; ALOOPER_POLL_CALLBACK;} else {// The last message left at the head of the queue determines the next wakeup time.mNextMessageUptime &#61; messageEnvelope.uptime;break;}}// Release lock.mLock.unlock();// Invoke all response callbacks.for (size_t i &#61; 0; i #if DEBUG_POLL_AND_WAKE || DEBUG_CALLBACKSALOGD("%p ~ pollOnce - invoking fd event callback %p: fd&#61;%d, events&#61;0x%x, data&#61;%p",this, response.request.callback.get(), fd, events, data);
#endifint callbackResult &#61; response.request.callback->handleEvent(fd, events, data);if (callbackResult &#61;&#61; 0) {removeFd(fd);}// Clear the callback reference in the response structure promptly because we// will not clear the response vector itself until the next poll.response.request.callback.clear();result &#61; ALOOPER_POLL_CALLBACK;}}return result;
}

Java层的消息都保存在了Java层MessageQueue的成员mMessages中&#xff0c;Native层的消息都保存在了Native Looper的mMessageEnvelopes中&#xff0c;这就可以说有两个消息队列&#xff0c;而且都是按时间排列的。timeOutMillis表示Java层下个要执行的消息还要多久执行&#xff0c;mNextMessageUpdate表示Native层下个要执行的消息还要多久执行&#xff0c;如果timeOutMillis为0&#xff0c;epoll_wait不设置TimeOut直接返回&#xff1b;如果为-1说明Java层无消息直接用Native的time out&#xff1b;否则pollInner取这两个中的最小值作为timeOut调用epoll_wait。当epoll_wait返回时就可能有以下几种情况&#xff1a;

  1. 出错返回。

  2. Time Out

  3. 正常返回&#xff0c;描述符上有事件产生。

如果是前两种情况直接goto DONE。

否则就说明FD上有事件发生了&#xff0c;如果是mWakeReadPipeFd的EPOLLIN事件就调用awoken&#xff0c;如果不是mWakeReadPipeFd&#xff0c;那就是通过addFD添加的fd&#xff0c;在addFD中将要监听的fd及其events&#xff0c;callback,data封装成了Request对象&#xff0c;并以fd为键保存到了KeyedVector mRequests中&#xff0c;所以在这里就以fd为键获得在addFD时关联的Request&#xff0c;并连同events通过pushResonse加入mResonse队列&#xff08;Vector&#xff09;&#xff0c;Resonse仅是对events与Request的封装。如果是epoll_wait出错或timeout&#xff0c;就没有描述符上有事件&#xff0c;就不用执行这一段代码&#xff0c;所以直接goto DONE了。

void Looper::pushResponse(int events, const Request& request) {Response response;response.events &#61; events;response.request &#61; request;mResponses.push(response);
}

接下来进入DONE部分&#xff0c;从mMessageEnvelopes取出头部的Native消息&#xff0c;如果到达了执行时间就调用它内部保存的MessageeHandler的handleMessage处理并从Native 消息队列移除&#xff0c;设置result为ALOOPER_POLL_CALLBACK&#xff0c;否则计算mNextMessageUptime表示Native消息队列下一次消息要执行的时间。如果未到头部消息的执行时间有可能是Java层消息队列消息的执行时间小于Native层消息队列头部消息的执行时间&#xff0c;到达了Java层消息的执行时间epoll_wait TimeOut返回了&#xff0c;或都通过addFd添加的描述符上有事件发生导致epoll_wait返回&#xff0c;或者epoll_wait是出错返回。Native消息是没有Barrier与Asynchronous的。

最后&#xff0c;遍历mResponses&#xff08;前面刚通过pushResponse存进去的&#xff09;&#xff0c;如果response.request.ident &#61;&#61;ALOOPER_POLL_CALLBACK&#xff0c;就调用注册的callback的handleEvent(fd, events, data)进行处理&#xff0c;然后从mResonses队列中移除&#xff0c;这次遍历完之后&#xff0c;mResponses中保留来来的就都是ident>&#61;0并且callback为NULL的了。在NativeMessageQueue初始化Looper时传入了mAllowNonCallbacks为false&#xff0c;所以这次处理完后mResponses一定为空。

接下来返回到pollOnce。pollOnce是一个for循环&#xff0c;pollInner中处理了所有response.request.ident&#61;&#61;ALOOPER_POLL_CALLBACK的Response&#xff0c;在第二次进入for循环后如果mResponses不为空就可以找到ident>0的Response&#xff0c;将其ident作为返回值返回由调用pollOnce的函数自己处理&#xff0c;在这里我们是在NativeMessageQueue中调用的Loope的pollOnce&#xff0c;没对返回值进行处理&#xff0c;而且mAllowNonCallbacks为false也就不可能进入这个循环。pollInner返回值不可能是0&#xff0c;或者说只可能是负数&#xff0c;所以pollOnce中的for循环只会执行两次&#xff0c;在第二次就返回了。

Native Looper可以单独使用&#xff0c;也有一个prepare函数&#xff0c;这时mAllowNonCallbakcs值可能为true&#xff0c;pollOnce中对mResponses的处理就有意义了。

 wake与awoken

在Native Looper的构造函数中&#xff0c;通过pipe打开了一个管道&#xff0c;并用mWakeReadPipeFd与mWakeWritePipeFd分别保存了管道的读端与写端&#xff0c;然后用epoll_ctl(mEpollFd, EPOLL_CTL_ADD, mWakeReadPipeFd,& eventItem)监听了读端的EPOLLIN事件&#xff0c;在pollInner中通过epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis)读取事件&#xff0c;那是在什么时候往mWakeWritePipeFd写&#xff0c;又是在什么时候读的mWakeReadPipeFd呢&#xff1f;

在Looper.cpp中我们可以发现如下两个函数&#xff1a;

void Looper::wake() {
#if DEBUG_POLL_AND_WAKEALOGD("%p ~ wake", this);
#endifssize_t nWrite;do {nWrite &#61; write(mWakeWritePipeFd, "W", 1);} while (nWrite &#61;&#61; -1 && errno &#61;&#61; EINTR);if (nWrite !&#61; 1) {if (errno !&#61; EAGAIN) {ALOGW("Could not write wake signal, errno&#61;%d", errno);}}
}void Looper::awoken() {
#if DEBUG_POLL_AND_WAKEALOGD("%p ~ awoken", this);
#endifchar buffer[16];ssize_t nRead;do {nRead &#61; read(mWakeReadPipeFd, buffer, sizeof(buffer));} while ((nRead &#61;&#61; -1 && errno &#61;&#61; EINTR) || nRead &#61;&#61; sizeof(buffer));
}

wake函数向mWakeWritePipeFd写入了一个“W”字符&#xff0c;awoken从mWakeReadPipeFd读&#xff0c;往mWakeWritePipeFd写数据只是为了在pollInner中的epoll_wait可以监听到事件返回。在pollInner也可以看到如果是mWakeReadPipeFd的EPOLLIN事件只是调用了awoken消耗掉了写入的字符就往后处理了。

那什么时候调用wake呢&#xff1f;这个只要找到调用的地方分析一下就行了&#xff0c;先看Looper.cpp&#xff0c;在sendMessageAtTime即发送Native Message的时候&#xff0c;根据发送的Message的执行时间查找mMessageEnvelopes计算应该插入的位置&#xff0c;如果是在头部插入&#xff0c;就调用wake唤醒epoll_wait&#xff0c;因为在进入pollInner时根据Java层消息队列头部消息的执行时间与Native层消息队列头部消息的执行时间计算出了一个timeout&#xff0c;如果这个新消息是在头部插入&#xff0c;说明执行时间至少在上述两个消息中的一个之前&#xff0c;所以应该唤醒epoll_wait&#xff0c;epoll_wait返回后&#xff0c;检查Native消息队列&#xff0c;看头部消息即刚插入的消息是否到执行时间了&#xff0c;到了就执行&#xff0c;否则就可能需要设置新的timeout。同样在Java层的MessageQueue中&#xff0c;有一个函数nativeWake也同样可以通过JNI调用wake&#xff0c;调用nativeWake的时机与在Native调用wake的时机类似&#xff0c;在消息队列头部插入消息&#xff0c;还有一种情况就是&#xff0c;消息队列头部是一个Barrier&#xff0c;而且插入的消息是第一个异步消息。

if (p &#61;&#61; null || when &#61;&#61; 0 || when } else {// Inserted within the middle of the queue. Usually we don&#39;t have to wake// up the event queue unless there is a barrier at the head of the queue// and the message is the earliest asynchronous message in the queue.needWake &#61; mBlocked && p.target &#61;&#61; null && msg.isAsynchronous();//如果头部是Barrier并且新消息是异步消息则“有可能”需要唤醒Message prev;for (;;) {prev &#61; p;p &#61; p.next;if (p &#61;&#61; null || when }

在头部插入消息不一定调用nativeWake&#xff0c;因为之前可能正在执行IdleHandler&#xff0c;如果执行了IdleHandler&#xff0c;就在IdleHandler执行后把nextPollTimeoutMillis设置为0&#xff0c;下次进入for循环就用0调用nativePollOnce&#xff0c;不需要wake&#xff0c;只有在没有消息可以执行&#xff08;消息队列为空或没到执行时间&#xff09;并且没有设置IdleHandler时mBlocked才会为true。

如果Java层的消息队列被Barrier Block住了并且当前插入的是一个异步消息有可能需要唤醒Looper&#xff0c;因为异步消息可以在Barrier下执行&#xff0c;但是这个异步消息一定要是执行时间最早的异步消息。

退出Looper也需要wake&#xff0c;removeSyncBarrier时也可能需要。


转:https://www.cnblogs.com/lwlapp/p/5444364.html



推荐阅读
author-avatar
yi品天下
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有