上一篇中关于ThreadLocal的使用,遗漏了一个点:ThreadLocal的回收,使用不当会操作内存泄漏。通过上一篇的分析我们知道了ThreadLocalMap.Entry中的key时ThreadLocal的弱引用,而value需要在堆上分配的内存,当key的强引用断开之后,在gc的时候会被回收,就过时key为null,但是value还存在。如果处理不当,可能会引发内存泄露。
图片来自:彻底搞清楚ThreadLocal与弱引用
那么key为null了value何时被清除呐?
ThreadLocal的设计者考虑到这个问题,在每次调用ThreadLocal的set/get方法时会清除key为null的value。但是谁知道下次什么时候调用set/get,这是一种被动的清除方式。
除此之外ThreadLocal还提供了remove方法,可以有我们自动的进行清除。当ThreadLocal不再使用的时候,要记得调用remove。示例如下
public class ExampleUnitTest {
private static ThreadLocal sStrThreadlocal = new ThreadLocal();
private static ThreadLocal sIntegerThreadLocal = new ThreadLocal();
@Test
public void testThreadLocal(){
//在主线程给Threadlocal赋值,请取出输出
sStrThreadlocal.set("aaa");
String value = sStrThreadlocal.get();
sIntegerThreadLocal.set(1);
Integer intValue = sIntegerThreadLocal.get();
System.out.println("111 curThreadId="+Thread.currentThread()+" strthreadLocalValue="+value
+" intThreadLocalValue="+intValue);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
//最后在输出下主线程的ThreadLocal值
value = sStrThreadlocal.get();
intValue = sIntegerThreadLocal.get();
System.out.println("444 curThreadId="+Thread.currentThread()+" strthreadLocalValue="+value
+" intThreadLocalValue="+intValue);
//用完之后记得remove,虽然,ThreadLocalMap.Entry的key是ThreadLocal的弱引用,但是value占用的内存空间还在。
//被回收的场景有两个,再次调用set或者get方法是会坚持map中是否有key为空的Entry(由于key是弱引用,外部强引用依赖断开后,gc时就会回收该key,
// 回收后即可null),如果有则清除。
//但是不知道什么时候再次调用set或者get,这种被动的方式只能说是做了个保证,如果没有调用set/get,就可能引发内存泄露
//ThreadLocal提供了一个remove方法,由我们主动清除ThreadLocal对应的ThreadLocalMap.Entry中的堆内存
sStrThreadlocal.remove();
sIntegerThreadLocal.remove();
value = sStrThreadlocal.get();
intValue = sIntegerThreadLocal.get();
System.out.println("after remove 444 curThreadId="+Thread.currentThread()+" strthreadLocalValue="+value
+" intThreadLocalValue="+intValue);
}
}
对应的输出为
111 curThreadId=Thread[main,5,main] strthreadLocalValue=aaa intThreadLocalValue=1
444 curThreadId=Thread[main,5,main] strthreadLocalValue=aaa intThreadLocalValue=1
after remove 444 curThreadId=Thread[main,5,main] strthreadLocalValue=null intThreadLocalValue=null
下面开始本篇的分析学习
我们在前面两篇关于Android消息机制的分析学习中了解到MessageQueue中涉及到何Native的交互,比如nativeInit初始化Native层的MessageQueue;nativePollOnce阻塞;nativeWake唤醒。
问题是,为什么要这样设计?直接Java层不是一样可以管理维护消息队列以及消息的读取分发吗?nativePollOnce真正的意义是什么?这个问题一致困挠着我。通过反复查资料翻源码,终于了解了其流程。在消息机制Native层实现之前我们先了解下其中用到的一些Linux基础知识。否则的话,很容易被卡住。
Linux常用的进程/线程间通信机制有管道、信号量、消息队列、信号、共享内存、socket等等,其中主要作为进程/线程间通知/等待的有管道pipe和socket。从Linux 2.6.27版本开始增加了eventfd,主要用于进程或者线程间的通信,eventfd用于进程/线程间通信,效率比pipe高.
epoll是Linux中最高效的IO多路复用机制,可以同时监控多个描述符,当某个描述符就绪(读或写就绪),则立刻通知相应程序进行读或写操作
epoll有3个方法,epoll_create(), epoll_ctl(),epoll_wait()_
epoll_create :创建epoll句柄
int epoll_create(int size);
:用于创建一个epoll的句柄,size是指监听的描述符个数, 现在内核支持动态扩展,该值的意义仅仅是初次分配的fd个数,后面空间不够时会动态扩容。 当创建完epoll句柄后,占用一个fd值.
使用完epoll后,必须调用close()关闭,否则可能导致fd被耗尽。
epoll_ctl:执行对需要监听的文件描述符(fd)的操作
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
用于执行对需要监听的文件描述符(fd)的操作,比如EPOLL_CTL_ADD
fd:需要监听的文件描述符;
epoll_event:需要监听的事件
epoll_wait:等待事件的到来
int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);
等待事件的到来,有三种情况会触发返回
1. 发生错误 返回值为负数
2. 等待超时(timeout)
3. 监测到某些文件句柄上有事件发生
epoll和eventfd结合使用
eventfd中无数据,线程触发epoll_wait()的等待,该线程处于阻塞,另外一个线程通过往eventfd中write数据,使描述符可读,epoll返回,这就达到了唤醒的目的。_
我们先来回顾下消息机制中Java层MessageQueue的往队列里加消息和从队列里面取消息的调用
//android.os.MessageQueue#enqueueMessage
boolean enqueueMessage(Message msg, long when) {
......
synchronized (this) {
msg.markInUse();
msg.when = when;
Message p = mMessages;
boolean needWake;
//如果消息链表为空,或者插入的Message比消息链表第一个消息要执行的更早,直接插入到头部
if (p == null || when == 0 || when
//android.os.MessageQueue#next
Message next() {
//native层NativeMessageQueue的指针
final long ptr = mPtr;
if (ptr == 0) {
return null;
}
......
for (;;) {
//阻塞操作,当等待nextPollTimeoutMillis时长,或者消息队列被唤醒
//nativePollOnce用于“等待”, 直到下一条消息可用为止. 如果在此调用期间花费的时间很长, 表明对应线程没有实际工作要做,或者Native层的message有耗时的操作在执行
nativePollOnce(ptr, nextPollTimeoutMillis);
}
可以看到在MessageQueue#next会调用课程阻塞的native方法nativePollOnce
,在MessageQueue#enqueueMessage 中如果需要唤醒会调用native方法nativeWake
。
问题是怎么阻塞的,怎么唤醒的,为什么要这样设计,直接在Java层完成处理不可以吗?
带着这样的困惑,开始Native层消息机制的分析学习。
图片来自:Android消息机制2-Handler(Native层)
调用Native方法初始化,返回值为native层的NativeMessageQueue指针地址
//android.os.MessageQueue#MessageQueue
MessageQueue(boolean quitAllowed) {
mQuitAllowed = quitAllowed;
//调用Native方法初始化,返回值为native层的NativeMessageQueue指针地址
mPtr = nativeInit();
}
android_os_MessageQueue_nativeInit
//java native方法 nativeInit 的jni方法,返回类型long,即 mptr
static jlong android_os_MessageQueue_nativeInit(JNIEnv* env, jclass clazz) {
//NativeMessageQueue是一个内部类
NativeMessageQueue* nativeMessageQueue = new NativeMessageQueue();
//返回为NativeMessageQueue指针地址
......
return reinterpret_cast(nativeMessageQueue);
}
NativeMessageQueue构造方法
会进行Native层的Looper创建。Java层创建Looper然后再创建MessageQueue,而在Native层则刚刚相反,先创建NativeMessageQueue然后再创建Looper。
// core/jni/android_os_MessageQueue.cpp
//NativeMessageQueue构造方法
//Java层创建Looper然后再创建MessageQueue,
//而在Native层则刚刚相反,先创建NativeMessageQueue然后再创建Looper
//MessageQueue是在Java层与Native层有着紧密的联系,
//但是Native层的Looper与Java层的Looper没有任何关系
NativeMessageQueue::NativeMessageQueue() :
mPollEnv(NULL), mPollObj(NULL), mExceptionObj(NULL) {
//Looper::getForThread()获取当前线程的Looper,相当于Java层的Looper.myLooper()
mLooper = Looper::getForThread();
if (mLooper == NULL) {
//如果为空, new一个native层的Looper
mLooper = new Looper(false);
//相当于java层的ThreadLocal.set() ?
Looper::setForThread(mLooper);
}
}
Natvie层的Looper的构造
MessageQueue是在Java层与Native层有着紧密的联系,但是Native层的Looper与Java层的Looper没有任何关系
// libutils/Looper.cpp
Looper::Looper(bool allowNonCallbacks)
: mAllowNonCallbacks(allowNonCallbacks),
mSendingMessage(false),
mPolling(false),
mEpollRebuildRequired(false),
mNextRequestSeq(0),
mResponseIndex(0),
mNextMessageUptime(LLONG_MAX) {
//eventfd事件句柄,负责线程通信,替换了之前版本的pipe 构造唤醒事件fd
mWakeEventFd.reset(eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC));
LOG_ALWAYS_FATAL_IF(mWakeEventFd.get()
epoll句柄的创建、添加唤醒事件句柄到epoll
//libutils/Looper.cpp
void Looper::rebuildEpollLocked() {
......
//epoll_create1创建一个epoll句柄实例,并注册wake管道
mEpollFd.reset(epoll_create1(EPOLL_CLOEXEC));
......
//epoll事件结构体
struct epoll_event eventItem;
memset(& eventItem, 0, sizeof(epoll_event)); // zero out unused members of data field union
eventItem.events = EPOLLIN;
eventItem.data.fd = mWakeEventFd.get();
int result = epoll_ctl(mEpollFd.get(), EPOLL_CTL_ADD, mWakeEventFd.get(), &eventItem);
for (size_t i = 0; i
Native层的init流程主要内容如下:
图片来自:Android消息机制2-Handler(Native层)
nativePollOnce
//core/jni/android_os_MessageQueue.cpp
static void android_os_MessageQueue_nativePollOnce(JNIEnv* env, jobject obj,
jlong ptr, jint timeoutMillis) {
//将Java层传过来的mPtr 转换为 nativeMessageQueue
NativeMessageQueue* nativeMessageQueue = reinterpret_cast(ptr);
//调用nativeMessageQueue的pollOnce
nativeMessageQueue->pollOnce(env, obj, timeoutMillis);
}
NativeMessageQueue :: pollOnce
//core/jni/android_os_MessageQueue.cpp
void NativeMessageQueue::pollOnce(JNIEnv* env, jobject pollObj, int timeoutMillis) {
......
//又调用了Natvie的Looper的pollOnce
mLooper->pollOnce(timeoutMillis);
......
}
Native层Looper::pollOnce
//libutils/Looper.cpp
/**
* timeoutMillis:超时时长
* outFd:发生事件的文件描述符
* outEvents:当前outFd上发生的事件,包含以下4类事件
EVENT_INPUT:可读
EVENT_OUTPUT:可写
EVENT_ERROR:错误
EVENT_HANGUP:中断
*outData:上下文数据
**/
int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) {
int result = 0;
for (;;) {
......
//关键实现在pollInner中
result = pollInner(timeoutMillis);
}
}
Looper::pollInner
先会调用epoll_wait进入阻塞专题,唤醒的场景 想epoll中添加的epollevent等待事件发生或者超时触发nativeWake()方法,会向eventfd写入字符,进行唤醒。
然后进性要处理的事件收集,然后在做处理。Natvie的消息的处理顺序如下
//libutils/Looper.cpp
int Looper::pollInner(int timeoutMillis) {
......
// Poll.
int result = POLL_WAKE;
mResponses.clear();
mRespOnseIndex= 0;
// We are about to idle.
mPolling = true;
struct epoll_event eventItems[EPOLL_MAX_EVENTS];
//等待事件发生或者超时触发nativeWake()方法,会向eventfd写入字符
int eventCount = epoll_wait(mEpollFd.get(), eventItems, EPOLL_MAX_EVENTS, timeoutMillis);
// No longer idling.
mPolling = false;
// Acquire lock.
mLock.lock();
// Rebuild epoll set if needed.
if (mEpollRebuildRequired) {
mEpollRebuildRequired = false;
rebuildEpollLocked();
goto Done;
}
// Check for poll error.
if (eventCount = 0) {
int events = 0;
if (epollEvents & EPOLLIN) events |= EVENT_INPUT;
if (epollEvents & EPOLLOUT) events |= EVENT_OUTPUT;
if (epollEvents & EPOLLERR) events |= EVENT_ERROR;
if (epollEvents & EPOLLHUP) events |= EVENT_HANGUP;
pushResponse(events, mRequests.valueAt(requestIndex));
}
}
}
//上面是收集,Done中是处理的部分,很多设计都是采用这种设计,逻辑分离
Done: ;
//1. 先处理Native的Message,调用Native的Handler来处理该Message
mNextMessageUptime = LLONG_MAX;
while (mMessageEnvelopes.size() != 0) {
......
if (messageEnvelope.uptime handleMessage(message);
}
} else {
mNextMessageUptime = messageEnvelope.uptime;
break;
}
}
//2. 再处理Resposne数组,POLL_CALLBACK类型的事件,比如一些
// Invoke all response callbacks.
for (size_t i = 0; i handleEvent(fd, events, data);
}
}
return result;
}
消息获取流程主要如下
图片来自:Android消息机制2-Handler(Native层)
nativeWake
//core/jni/android_os_MessageQueue.cpp
static void android_os_MessageQueue_nativeWake(JNIEnv* env, jclass clazz, jlong ptr) {
NativeMessageQueue* nativeMessageQueue = reinterpret_cast(ptr);
//调用NativeMessageQueue的wake
nativeMessageQueue->wake();
}
NativeMessageQueue::wake
//core/jni/android_os_MessageQueue.cpp
void NativeMessageQueue::wake() {
//调用Native层Looper的wake
mLooper->wake();
}
Looper::wake
libutils/Looper.cpp
void Looper::wake() {
//write方法 向mWakeEventfd中写一些内容,会把epoll_wait唤醒,线程就不阻塞了继续发送
//Native层的消息,然后处理之前的addFd事件,然后处理Java层的消息
ssize_t nWrite = TEMP_FAILURE_RETRY(write(mWakeEventFd.get(), &inc, sizeof(uint64_t)));
......
}
消息发送流程如下
Android消息机制的分析到这里就结束了,最后来张消息处理家族类的关系图
图片来自:图书:《深入理解Android 卷3》
感谢你的阅读
Android消息机制就到这篇了,下篇我们进入Binder的分析学习,欢迎关注公众号“音视频开发之旅”,一起学习成长。
欢迎交流