Handler源码分析之三 Native层实现机制

7/9/2021 Android源码Handler

前面两章我们讲解了Handler的基本使用,以及异步消息的作用,本章我们深入理解一下Handler的native层,我们在第一章知道Handler的构建需要先创建Looper,而创建Looper需要先创建MessageQueue,我们直接来看MessageQueue的构造,顺便放上native相关的变量。

# 1 构造函数

package android.os;
public final class MessageQueue {
    private long mPtr; //这个mPtr是native层返回的
    private native static long nativeInit();
    private native static void nativeDestroy(long ptr);
    private native void nativePollOnce(long ptr, int timeoutMillis);
    private native static void nativeWake(long ptr);
    private native static boolean nativeIsPolling(long ptr);
    private native static void nativeSetFileDescriptorEvents(long ptr, int fd, int events);
    MessageQueue(boolean quitAllowed) {
        mQuitAllowed = quitAllowed;
        mPtr = nativeInit(); //这里调用了第一个native函数,对应native层的android_os_MessageQueue.cpp的android_os_MessageQueue_nativeInit,记住这个返回值
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14

我们来看下对应的native函数:

static jlong android_os_MessageQueue_nativeInit(JNIEnv* env, jclass clazz) {
    //创建native层的MessageQueue
    NativeMessageQueue* nativeMessageQueue = new NativeMessageQueue();
    //...
    //计数器加1
    nativeMessageQueue->incStrong(env);
    //将native层的MessageQueue指针转换成long并返回给java层,也就是java层的mPtr
    return reinterpret_cast<jlong>(nativeMessageQueue);
}

//NativeMessateQueue的构造函数
NativeMessageQueue::NativeMessageQueue():mPollEnv(NULL), mPollObj(NULL), mExceptionObj(NULL) {
    mLooper = Looper::getForThread(); //从当前Thread中获取Looper
    if (mLooper == NULL) { 
        mLooper = new Looper(false); //没有就创建
        Looper::setForThread(mLooper); //放入当前线程
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

接着我们来看native层Looper的构造函数

Looper::Looper(bool allowNonCallbacks): mAllowNonCallbacks(allowNonCallbacks),
    //...建立文件描述符
    mWakeEventFd.reset(eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC));
    //...建立epoll机制
    rebuildEpollLocked();
}

1
2
3
4
5
6
7

# Linux的epoll机制

# 1 关于fd

在linux上,一切都是以文件的形式存在的,设备、驱动等都是文件,比如binder驱动,这些文件都是以文件描述符fd来描述的,fd是一个非负整数,代表一个文件的编号,linux上每一个进程控制块PCB都会有一个文件描述符映射表,fd就是这个表的索引,这个表的每个项有一个指向已打开的文件的指针。所以我们可以将fd理解为文件、驱动、设备等。

# 2 epoll的相关方法

    1. epoll_create: 创建一个epoll对象
    1. epoll_ctl(epollfd, option, fd, event): 这个方法根据参数可以分为很多作用 option可以是下面类型:

EPOLL_CTL_ADD 添加
EPOLL_CTL_DEL 删除
EPOLL_CTL_MOD 修改

event可以是下面类型:

EPOLLIN 文件描述符可读
EPOLLOUT 文件描述符可写
EPOLLERR 文件描述符错误
EPOLLHUP 文件描述符挂起

    1. epoll_wait(epollfd,...): 等待直到epollfd对应的事情发生

# 3 epoll的流程

可以简单的理解为EPOLLIN和EPOLLOUT,EPOLLIN表示可读,EPOLLOUT表示可写,写入端写入数据后,读出端会立刻感应到,从而去读数据,当读出端读出数据后,写入端会立刻感应到,从而去写数据。

接着我们来看一下epoll代码:

void Looper::rebuildEpollLocked() {
    //如果文件描述符已经存在就关闭
    if (mEpollFd >= 0) {
        mEpollFd.reset();
    }
    //创建一个新的epoll文件描述符,并添加到wake管道    
    mEpollFd.reset(epoll_create1(EPOLL_CLOEXEC));

    struct epoll_event eventItem; //创建一个epoll事件eventItem
    memset(& eventItem, 0, sizeof(epoll_event)); //将eventItem清空
    eventItem.events = EPOLLIN; //设置这个事件类型为EPOLLIN,也就是可读事件
    eventItem.data.fd = mWakeEventFd.get(); 设置文件描述符为唤醒事件

    //注册唤醒事件到epoll
    int result = epoll_ctl(mEpollFd.get(), EPOLL_CTL_ADD, mWakeEventFd.get(), &eventItem);

    //注册其他事件
    for (size_t i = 0; i < mRequests.size(); i++) {
        const Request& request = mRequests.valueAt(i);
        struct epoll_event eventItem;
        request.initEventItem(&eventItem);

        int epollResult = epoll_ctl(mEpollFd.get(), EPOLL_CTL_ADD, request.fd, &eventItem);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25

到这里我们总结一下:

  • 1 我们要创建Handler;
  • 2 因为创建Handler需要Looper,所以我们去创建Looper;
  • 3 因为创建Looper,需要创建MessageQueue,所以我们去创建MessageQueue;
  • 4 在创建MessageQueue的时候,我们发现他调用了nativeInit();
  • 5 于是我们进入native层的nativeInit();这里开始进入native了
  • 6 我们在nativeInit中发现创建了native的MessageQueue
  • 7 然后我们跟着进入MessageQueue的构造发现创建了native的Looper
  • 8 然后我们进入Looper的构造发现 1 建立文件描述符 2 建立epoll
  • 9 然后我们进入建立epoll的代码发现注册了一个唤醒事件
  • 10 逻辑结束

我们来看post一个消息后发生了什么?

# 2 发消息

我们通过第一章直到发送消息最后是要进入MessageQueue的,直接看enqueue即可

boolean enqueueMessage(Message msg, long when) {
    synchronized (this) {
        //...
        boolean needWake;
        if (p == null || when == 0 || when < p.when) {
            //...
            needWake = mBlocked;
        } else {
            //是否需要唤醒
            needWake = mBlocked && p.target == null && msg.isAsynchronous();
            //...
        }
        //本次逻辑在这里,如果需要唤醒,就调用nativeWake(mPtr),这个mPtr就是构造函数nativeInit()的那个
        if (needWake) {
            nativeWake(mPtr);
        }
    }
    return true;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

我们看下native层的nativeWake():

static void android_os_MessageQueue_nativeWake(JNIEnv* env, jclass clazz, jlong ptr) {
    //根据ptr取出对应的MessageQueue
    NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
    //调用wake()函数
    nativeMessageQueue->wake();
}

void NativeMessageQueue::wake() {
    //直接调用了Looper的wake()
   mLooper->wake();
}

void Looper::wake() {
    uint64_t inc = 1;
    //将唤醒事件的文件描述符mWakeEventFd写入inc,我们通过epoll机制知道,当写入数据的时候,读出端会立刻感知到,
    //也就是说,此时读出端已经知道有数据写入了,也就是有message消息需要处理了,那么在哪处理呢,对了,就在MessageQueue的next()里面,我们去看
    ssize_t nWrite = TEMP_FAILURE_RETRY(write(mWakeEventFd, &inc, sizeof(uint64_t)));
    //...
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

总结一下:

  • 1 我们在Java层send一个message
  • 2 最终会调用到java层的MessageQueue的enqueue中
  • 3 如果需要唤醒就会去调用nativeWake() 这里开始进入native
  • 4 nativeWake最终会调用到native Looper的wake()
  • 5 这个wake()最终写入了一个唤醒事件,用于通知有消息写入
  • 6 那么我们来看java层怎么收到这个消息

# 3 收消息

经过第一章我们知道消息是在Looper.loop()中处理的,又是从MessageQueue.next()中取出的,我们来看MessageQueue.next():

Message next() {
    final long ptr = mPtr;
    if (ptr == 0) {
        return null;
    }
    int nextPollTimeoutMillis = 0;
    for (; ; ) {
        //...
        //ptr就是上面的mPtr,nextPollTimeoutMillis就是休眠时间
        nativePollOnce(ptr, nextPollTimeoutMillis);
        //...
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13

这里大幅简化了代码,可以看到在开始就调用了nativePollOnce(ptr,time)这个函数,我们看下这个函数:

static void android_os_MessageQueue_nativePollOnce(JNIEnv* env, jobject obj, jlong ptr, jint timeoutMillis) {
    //又是这一行,根据ptr取MessageQueue的
    NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
    //调用MessageQueue的pollOnce()
    nativeMessageQueue->pollOnce(env, obj, timeoutMillis);
}

void NativeMessageQueue::pollOnce(JNIEnv* env, jobject pollObj, int timeoutMillis) {
    //...
    //看这句就行,又是调用了Looper的pollOnce,看来native的MessageQueue就是个传话的,真正干事的是Looper,这难道就是传说中的代理模式?
    mLooper->pollOnce(timeoutMillis);
    //...
}

int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) {
    int result = 0;
    //死循环...
    for (;;) {
        //死循环结束的条件是result != 0,那就是说如果等于0就一直跑
        if (result != 0) {
            //...
            return result;
        }

        //等于0就一直跑这个,那就来看这个,pollInner其实是不会返回0的,那也就是说,只要pollInner有返回,就能结束循环
        result = pollInner(timeoutMillis);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28

来看下最核心的pollInner方法,参数就是java层传递的那个nextPollTimeoutMillis参数:

int Looper::pollInner(int timeoutMillis) {    
    //...
    //result的取值有四种: POLL_WAKE = -1 POLL_CALLBACK = -2 POLL_TIMEOUT = -3 POLL_ERROR = -4
    int result = POLL_WAKE; 
    //创建事件集合eventItems,EPOLL_MAX_EVENTS=16
    struct epoll_event eventItems[EPOLL_MAX_EVENTS];
    
    //调用epoll_wait()来等待事件,如果有事件,就放入事件集合eventItems中,并返回事件数量,如果没有,就一直等,超时时间为我们传入的timeoutMillis
    int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis);  
    
    //加锁
    mLock.lock();
    
    //如果发生的事件小于0,也就是说没有事件处理,就跳转到Done
    if (eventCount <= 0) {
        if (errno == EINTR) {
            goto Done;
        }
        result = POLL_ERROR;
        goto Done;
    }

    //没有goto到Done,也就是有事件发生,就跑到这里
    //遍历事件集合eventItems
    for (int i = 0; i < eventCount; i++) {
        //取出文件描述符
        int fd = eventItems[i].data.fd;
        //取出事件类型
        uint32_t epollEvents = eventItems[i].events;
        if (fd == mWakeEventFd) {//如果文件描述符为mWakeEventFd
            if (epollEvents & EPOLLIN) {//并且事件类型为EPOLLIN(可读事件)
                //说明有数据可读,就调用awoken()读数据,直到读完为止,
                awoken();
            } else {
                ALOGW("Ignoring unexpected epoll events 0x%x on wake event fd.", epollEvents);
            }
        } else {
            //...
        }
    }
    
    //这里就是Done,是从if(eventCount<=0)跳转过来的
    Done:;
    mNextMessageUptime = LLONG_MAX;
    //mMessageEnvelopes是一个Vector,存放native层的消息
    while (mMessageEnvelopes.size() != 0) {
        nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
        //取出第一个MessageEnvelope,MessageEnvelop有收件人Hanlder和消息内容Message,可以理解为jave层的Message 
        const MessageEnvelope& messageEnvelope = mMessageEnvelopes.itemAt(0);
        //判断消息的执行时间,跟java层的那个 if(message.when <= now)类似
        if (messageEnvelope.uptime <= now) { //表示消息已经到了执行时间
            {
                //取出Handler
                sp<MessageHandler> handler = messageEnvelope.handler;
                //取出Message
                Message message = messageEnvelope.message;
                mMessageEnvelopes.removeAt(0); //删除,因为接下来就处理了
                mSendingMessage = true;
                //释放锁
                mLock.unlock();
                //处理消息
                handler->handleMessage(message);
            }
            mLock.lock();
            mSendingMessage = false;
            result = POLL_CALLBACK;
        } else {//消息还没到执行时间
            mNextMessageUptime = messageEnvelope.uptime;
            //跳出循环,进入下一次轮询,java层是计算等待时间nextPollTimeoutMillis
            break;
        }
    }
    //释放锁
    mLock.unlock();
    //...
    return result;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77

好,来个小结:

  • 1 我们要处理消息,所以先取消息,所以进入了java层的MessageQueue
  • 2 如果当前消息还不到执行,怎么办,就计算还有多久执行,得出timeout,然后调用nativePollOnce(timeout);
  • 3 然后我们进入native的MessageQueue 此处进入native
  • 4 我们一路追踪,发现native的MessageQueue就是个靠嘴吃饭的,毛都没干,最终进入了native的Looper里
  • 5 我们发现Looper的pollOnce()是个死循环,内部调用了pollInner(),只要pollInner()返回,就跳出循环从而回到java层继续向下执行
  • 6 进入pollInner(),我们发现他有三个逻辑
    等待事件,超时时间就是java层传递过来的timeout,如果有事件发生,则会返回发生的事件,并去处理
    有事件发生,就取出事件并处理
    没事件发生,就处理native的Message
    返回result,且result!=0
  • 7 返回到jave层,继续向下处理

这里有几个问题,我们先来捋一下:

  • 1 java层的Message如果不该处理,就会调用nativePollOnce()等待,在代码中哪里等待呢? 答: 就在native Looper的pollOnce里面的epoll_wait(timeout),只要时间不到timeout,就会一直等待,等价于卡在这里,不向下执行
  • 2 如果我的timeout传递-1,就是一直等待,那不就卡死在这里了吗 答: 如果在等待期间,有事件发生,那么会立刻返回事件数目,此时向下执行,发现有事件,就会去读取事件并处理,比如java层enqueue事件的时候,就会跑到nativeWake()里面,此时写入一个唤醒事件,此时 epoll_wait()就会监听到唤醒事件,就会立刻返回,向下执行。
  • 3 如果一直没有事件处理呢? 答: 那么就在等待timeout后,直接返回0,此时就会去执行native的message,代码中的goto Done就是这个逻辑

# 总结

java层通过nativePollOnce(timeout)来实现等待,唤醒的方法要么是过了timeout时间后,超时了,要么是通过nativeWake()来写入一个事件,让native监听到这个事件从而返回。

  • 1 现在我们调用handler.sendMessageDelay(msg,3000);让一个message在3秒后执行。
  • 2 MessageQueue.next()查看此message,发现msg.when > now,还不该执行,于是就将nextPollTimeoutMillis = 3000,并且将mBlockd = true,然后调用nativePollOnce(nextPollTimeoutMillis)去等待,此时native先跑到pollOnce(),result被初始化为0,所以一直在for里面,然后进入pollOnce(),在epoll_wait()死等着。
  • 3 此时我们又调用handler.sendMessage(msg),让一个message立即执行。
  • 4 此时在MessageQueue.enqueueMessage()中,先将这个消息插入到延时3000ms的那个message前面,然后发现when==0,于是就将needwake = mBlocked,也就是true,于是接着就调用了nativeWake(); 于是接着进入native层的Looper的wake()里面写入了一个唤醒事件,此时Looper的epoll_wait()就监听到了,于是立刻返回,处理监听事件后就return result,此时pollOnce()里面因为result!=0,就返回到了java层, 于是MessageQueue的next就立刻活跃起来,马上拿出刚刚sendMessage(msg)的这个msg开始处理,处理完了到下一个(也就是延时3000ms那个),发现msg.when > now,就继续第2步的等待,后面如果还有新消息,就重复前面的过程,没有新消息就等待epoll_wait()超时返回,超时时间是3000ms,返回后跟上述步骤基本一样。
Last Updated: 1/30/2022, 3:38:46 PM