HandlerEpoll

Looper的阻塞唤醒

简述:通过pipe/epoll机制,实现MessageQueue.next的无消息时阻塞,有消息时唤醒,pipe

原理

具体来说,当Looper在处理消息时,如果消息队列为空,那么它会调用MessageQueue的next方法来等待新的消息到来,通过Linux内核的epoll机制阻塞线程,等待新的消息到来。在阻塞期间,线程处于睡眠状态,不会占用CPU资源;当新的消息到来时,MessageQueue对象会将消息加入队列,并通知Looper对象,从而唤醒线程并继续执行消息的分发和处理。

epoll机制是Linux内核提供的一种高效的I/O多路复用机制,可以在多个文件描述符上等待,并在其中任何一个文件描述符有事件到达时立即返回,从而实现高效的I/O事件处理。

epoll机制主要分为以下三个步骤:

  1. 创建epoll句柄:在使用epoll机制之前,首先需要创建一个epoll句柄。这可以通过调用epoll_create函数来完成,它会返回一个整型的文件描述符,可以用于后续的epoll操作。
  2. 添加文件描述符到epoll:将需要监听的文件描述符添加到epoll中,可以通过调用epoll_ctl函数来完成。在添加文件描述符时,需要指定文件描述符的类型(例如管道、socket等)、事件类型(例如读事件、写事件等)以及回调函数等信息。
  3. 等待事件到达:等待事件到达是epoll机制的核心。可以通过调用epoll_wait函数来等待文件描述符上的事件。该函数会阻塞,直到有文件描述符上有事件到达或者超时时间到达。当有事件到达时,函数会立即返回,返回值为就绪文件描述符的个数,同时将就绪文件描述符的信息填充到一个事件数组中。

在处理就绪事件时,可以遍历事件数组,并根据每个事件的类型来进行相应的处理。例如,如果是读事件,可以使用read函数来读取文件描述符上的数据。

总之,通过使用epoll机制,可以高效地处理多个文件描述符上的I/O事件,并及时响应事件的到达。这种机制在Linux系统中得到了广泛的应用,包括网络编程、图形界面等领域。

为什么不用wait和notify实现Looper的阻塞唤醒机制

Android的Handler Framework中的Looper的阻塞唤醒机制是依靠Linux内核的轮询机制来实现的,而不是使用Java中的wait和notify方法。

在Android系统中,每个线程都有一个EventLoop,它由Looper来管理。Looper使用Linux内核的poll系统调用来阻塞线程,同时在有消息到来时唤醒线程。poll是Linux内核中的一个系统调用,它可以用来等待文件描述符的状态变化,比如数据是否可读或可写等。Looper将MessageQueue封装成一个管道(pipe),然后将管道的读端口添加到poll的监听列表中。当消息到达时,MessageQueue就会将消息写入管道的写端口,从而使得poll返回。然后Looper就可以从管道中读取消息,进行相应的处理。

使用Linux内核的poll系统调用可以避免使用Java中的wait和notify方法的缺陷,例如不会发生死锁、不会占用CPU资源等。此外,这种机制还可以支持更多的线程,因为每个线程都有自己的EventLoop和MessageQueue,它们之间相互独立,不会发生竞争问题。

Native Framework

img

早期的Android MessageQueue底层是select实现Looper唤醒的,后面由于select效率在频繁调用的情况下不如Linux 2.6内核正式引入的epoll

IO 多路复用的需求和原理。

IO 多路复用就是 1 个线程处理 多个 fd 的模式。我们的要求是:这个 “1” 就要尽可能的快,避免一切无效工作,要把所有的时间都用在处理句柄的 IO 上,不能有任何空转,sleep 的时间浪费。

img

这 3 种都能够管理 fd 的可读可写事件,在所有 fd 不可读不可写无所事事的时候,可以阻塞线程,切走 cpu 。fd 有情况的时候,都要线程能够要能被唤醒。

img

表 1. select、poll和epoll三种I/O复用模式的比较( 摘录自《linux高性能服务器编程》)
系统调用 select poll epoll
事件集合 通过3个参数分别传入感兴趣的可读,可写及异常等事件内核通过对这些参数的在线修改来反馈其中的就绪事件这使得用户每次调用select都要重置这3个参数 统一处理所有事件类型,因此只需要一个事件集参数。用户通过pollfd.events传入感兴趣的事件,内核通过修改pollfd.revents反馈其中就绪的事件 内核通过一个事件表直接管理用户感兴趣的所有事件。因此每次调用epoll_wait时,无需反复传入用户感兴趣的事件。epoll_wait系统调用的参数events仅用来反馈就绪的事件
应用程序索引就绪文件描述符的时间复杂度 O(n) O(n) O(1)
最大支持文件描述符数 一般有最大值限制 65535 65535
工作模式 LT LT 支持ET高效模式
内核实现和工作效率 采用轮询方式检测就绪事件,时间复杂度:O(n) 采用轮询方式检测就绪事件,时间复杂度:O(n) 采用回调方式检测就绪事件,时间复杂度:O(1)

select

在loop的一次循环中sleep 一段时间(cpu释放出去),在定时到达后进入下一次循环执行任务,此次任务中会遍历所有fd,查询哪个已经ready,效率很差

poll

相比于select机制,poll只是取消了最大监控文件描述符数限制,并没有从根本上解决select存在的问题。

poll和select库,它们的最大的问题就在于效率。它们的处理方式都是创建一个事件列表,然后把这个列表发给内核,返回的时候,再去轮询检查这个列表,这样在描述符比较多的应用中,效率就显得比较低下了。

epoll

执行完所有任务之后进入休眠,

挂了个钩子,设置了唤醒的回调路径。epoll 跟底层对接的回调函数是:ep_poll_callback,这个函数其实很简单,做两件事情:

  1. 把事件就绪的 fd 对应的结构体放到一个特定的队列(就绪队列,ready list);
  2. 唤醒 epoll ,活来啦!

当 fd 满足可读可写的时候就会经过层层回调,最终调用到这个回调函数,把对应 fd 的结构体放入就绪队列中,从而把 epoll 从 epoll_wait 出唤醒。

由epoll_wait() 唤醒的时候已经精准地拿到了ready的fd数组了,于是就能直接执行

poll是翻译轮询的意思,我们可以看到poll和epoll都有轮询的过程。
不同点在于:
poll轮询的是所有的socket。
而epoll只轮询就绪的socket

epoll是一种比较好的做法,它把描述符列表交给内核,一旦有事件发生,内核把发生事件的描述符列表通知给进程,这样就避免了轮询整个描述符列表。

epoll 之所以做到了高效,最关键的两点:

  1. 内部管理 fd 使用了高效的红黑树结构管理,做到了增删改之后性能的优化和平衡;
  2. epoll 池添加 fd 的时候,调用 file_operations->poll ,把这个 fd 就绪之后的回调路径安排好。通过事件通知的形式,做到最高效的运行;
  3. epoll 池核心的两个数据结构:红黑树和就绪列表。红黑树是为了应对用户的增删改需求,就绪列表是 fd 事件就绪之后放置的特殊地点,epoll 池只需要遍历这个就绪链表,就能给用户返回所有已经就绪的 fd 数组;

Java Framework

android-handler-looper-architecture-diagram

附录

NativeMessageQueue.cpp

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
/*
* Copyright (C) 2010 The Android Open Source Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#define LOG_TAG "MessageQueue-JNI"

#include <nativehelper/JNIHelp.h>
#include <android_runtime/AndroidRuntime.h>

#include <utils/Looper.h>
#include <utils/Log.h>
#include "android_os_MessageQueue.h"

#include "core_jni_helpers.h"

namespace android {

static struct {
jfieldID mPtr; // native object attached to the DVM MessageQueue
jmethodID dispatchEvents;
} gMessageQueueClassInfo;

// Must be kept in sync with the constants in Looper.FileDescriptorCallback
static const int CALLBACK_EVENT_INPUT = 1 << 0;
static const int CALLBACK_EVENT_OUTPUT = 1 << 1;
static const int CALLBACK_EVENT_ERROR = 1 << 2;


class NativeMessageQueue : public MessageQueue, public LooperCallback {
public:
NativeMessageQueue();
virtual ~NativeMessageQueue();

virtual void raiseException(JNIEnv* env, const char* msg, jthrowable exceptionObj);

void pollOnce(JNIEnv* env, jobject obj, int timeoutMillis);
void wake();
void setFileDescriptorEvents(int fd, int events);

virtual int handleEvent(int fd, int events, void* data);

private:
JNIEnv* mPollEnv;
jobject mPollObj;
jthrowable mExceptionObj;
};


MessageQueue::MessageQueue() {
}

MessageQueue::~MessageQueue() {
}

bool MessageQueue::raiseAndClearException(JNIEnv* env, const char* msg) {
if (env->ExceptionCheck()) {
jthrowable exceptionObj = env->ExceptionOccurred();
env->ExceptionClear();
raiseException(env, msg, exceptionObj);
env->DeleteLocalRef(exceptionObj);
return true;
}
return false;
}

NativeMessageQueue::NativeMessageQueue() :
mPollEnv(NULL), mPollObj(NULL), mExceptionObj(NULL) {
mLooper = Looper::getForThread();
if (mLooper == NULL) {
mLooper = new Looper(false);
Looper::setForThread(mLooper);
}
}

NativeMessageQueue::~NativeMessageQueue() {
}

void NativeMessageQueue::raiseException(JNIEnv* env, const char* msg, jthrowable exceptionObj) {
if (exceptionObj) {
if (mPollEnv == env) {
if (mExceptionObj) {
env->DeleteLocalRef(mExceptionObj);
}
mExceptionObj = jthrowable(env->NewLocalRef(exceptionObj));
ALOGE("Exception in MessageQueue callback: %s", msg);
jniLogException(env, ANDROID_LOG_ERROR, LOG_TAG, exceptionObj);
} else {
ALOGE("Exception: %s", msg);
jniLogException(env, ANDROID_LOG_ERROR, LOG_TAG, exceptionObj);
LOG_ALWAYS_FATAL("raiseException() was called when not in a callback, exiting.");
}
}
}

void NativeMessageQueue::pollOnce(JNIEnv* env, jobject pollObj, int timeoutMillis) {
mPollEnv = env;
mPollObj = pollObj;
mLooper->pollOnce(timeoutMillis);
mPollObj = NULL;
mPollEnv = NULL;

if (mExceptionObj) {
env->Throw(mExceptionObj);
env->DeleteLocalRef(mExceptionObj);
mExceptionObj = NULL;
}
}

void NativeMessageQueue::wake() {
mLooper->wake();
}

void NativeMessageQueue::setFileDescriptorEvents(int fd, int events) {
if (events) {
int looperEvents = 0;
if (events & CALLBACK_EVENT_INPUT) {
looperEvents |= Looper::EVENT_INPUT;
}
if (events & CALLBACK_EVENT_OUTPUT) {
looperEvents |= Looper::EVENT_OUTPUT;
}
mLooper->addFd(fd, Looper::POLL_CALLBACK, looperEvents, this,
reinterpret_cast<void*>(events));
} else {
mLooper->removeFd(fd);
}
}

int NativeMessageQueue::handleEvent(int fd, int looperEvents, void* data) {
int events = 0;
if (looperEvents & Looper::EVENT_INPUT) {
events |= CALLBACK_EVENT_INPUT;
}
if (looperEvents & Looper::EVENT_OUTPUT) {
events |= CALLBACK_EVENT_OUTPUT;
}
if (looperEvents & (Looper::EVENT_ERROR | Looper::EVENT_HANGUP | Looper::EVENT_INVALID)) {
events |= CALLBACK_EVENT_ERROR;
}
int oldWatchedEvents = reinterpret_cast<intptr_t>(data);
int newWatchedEvents = mPollEnv->CallIntMethod(mPollObj,
gMessageQueueClassInfo.dispatchEvents, fd, events);
if (!newWatchedEvents) {
return 0; // unregister the fd
}
if (newWatchedEvents != oldWatchedEvents) {
setFileDescriptorEvents(fd, newWatchedEvents);
}
return 1;
}


// ----------------------------------------------------------------------------

sp<MessageQueue> android_os_MessageQueue_getMessageQueue(JNIEnv* env, jobject messageQueueObj) {
jlong ptr = env->GetLongField(messageQueueObj, gMessageQueueClassInfo.mPtr);
return reinterpret_cast<NativeMessageQueue*>(ptr);
}

static jlong android_os_MessageQueue_nativeInit(JNIEnv* env, jclass clazz) {
NativeMessageQueue* nativeMessageQueue = new NativeMessageQueue();
if (!nativeMessageQueue) {
jniThrowRuntimeException(env, "Unable to allocate native queue");
return 0;
}

nativeMessageQueue->incStrong(env);
return reinterpret_cast<jlong>(nativeMessageQueue);
}

static void android_os_MessageQueue_nativeDestroy(JNIEnv* env, jclass clazz, jlong ptr) {
NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
nativeMessageQueue->decStrong(env);
}

static void android_os_MessageQueue_nativePollOnce(JNIEnv* env, jobject obj,
jlong ptr, jint timeoutMillis) {
NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
nativeMessageQueue->pollOnce(env, obj, timeoutMillis);
}

static void android_os_MessageQueue_nativeWake(JNIEnv* env, jclass clazz, jlong ptr) {
NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
nativeMessageQueue->wake();
}

static jboolean android_os_MessageQueue_nativeIsPolling(JNIEnv* env, jclass clazz, jlong ptr) {
NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
return nativeMessageQueue->getLooper()->isPolling();
}

static void android_os_MessageQueue_nativeSetFileDescriptorEvents(JNIEnv* env, jclass clazz,
jlong ptr, jint fd, jint events) {
NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
nativeMessageQueue->setFileDescriptorEvents(fd, events);
}

// ----------------------------------------------------------------------------

static const JNINativeMethod gMessageQueueMethods[] = {
/* name, signature, funcPtr */
{ "nativeInit", "()J", (void*)android_os_MessageQueue_nativeInit },
{ "nativeDestroy", "(J)V", (void*)android_os_MessageQueue_nativeDestroy },
{ "nativePollOnce", "(JI)V", (void*)android_os_MessageQueue_nativePollOnce },
{ "nativeWake", "(J)V", (void*)android_os_MessageQueue_nativeWake },
{ "nativeIsPolling", "(J)Z", (void*)android_os_MessageQueue_nativeIsPolling },
{ "nativeSetFileDescriptorEvents", "(JII)V",
(void*)android_os_MessageQueue_nativeSetFileDescriptorEvents },
};

int register_android_os_MessageQueue(JNIEnv* env) {
int res = RegisterMethodsOrDie(env, "android/os/MessageQueue", gMessageQueueMethods,
NELEM(gMessageQueueMethods));

jclass clazz = FindClassOrDie(env, "android/os/MessageQueue");
gMessageQueueClassInfo.mPtr = GetFieldIDOrDie(env, clazz, "mPtr", "J");
gMessageQueueClassInfo.dispatchEvents = GetMethodIDOrDie(env, clazz,
"dispatchEvents", "(II)I");

return res;
}

} // namespace android

Looper.cpp

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
//
// Copyright 2010 The Android Open Source Project
//
// A looper implementation based on epoll().
//
#define LOG_TAG "Looper"

//#define LOG_NDEBUG 0

// Debugs poll and wake interactions.
#define DEBUG_POLL_AND_WAKE 0

// Debugs callback registration and invocation.
#define DEBUG_CALLBACKS 0

#include <cutils/log.h>
#include <utils/Looper.h>
#include <utils/Timers.h>

#include <unistd.h>
#include <fcntl.h>
#include <limits.h>


namespace android {

// --- WeakMessageHandler ---

WeakMessageHandler::WeakMessageHandler(const wp<MessageHandler>& handler) :
mHandler(handler) {
}

WeakMessageHandler::~WeakMessageHandler() {
}

void WeakMessageHandler::handleMessage(const Message& message) {
sp<MessageHandler> handler = mHandler.promote();
if (handler != NULL) {
handler->handleMessage(message);
}
}


// --- SimpleLooperCallback ---

SimpleLooperCallback::SimpleLooperCallback(ALooper_callbackFunc callback) :
mCallback(callback) {
}

SimpleLooperCallback::~SimpleLooperCallback() {
}

int SimpleLooperCallback::handleEvent(int fd, int events, void* data) {
return mCallback(fd, events, data);
}


// --- Looper ---

// Hint for number of file descriptors to be associated with the epoll instance.
static const int EPOLL_SIZE_HINT = 8;

// Maximum number of file descriptors for which to retrieve poll events each iteration.
static const int EPOLL_MAX_EVENTS = 16;

static pthread_once_t gTLSOnce = PTHREAD_ONCE_INIT;
static pthread_key_t gTLSKey = 0;

Looper::Looper(bool allowNonCallbacks) :
mAllowNonCallbacks(allowNonCallbacks), mSendingMessage(false),
mResponseIndex(0), mNextMessageUptime(LLONG_MAX) {
int wakeFds[2];
int result = pipe(wakeFds);
LOG_ALWAYS_FATAL_IF(result != 0, "Could not create wake pipe. errno=%d", errno);

mWakeReadPipeFd = wakeFds[0];
mWakeWritePipeFd = wakeFds[1];

result = fcntl(mWakeReadPipeFd, F_SETFL, O_NONBLOCK);
LOG_ALWAYS_FATAL_IF(result != 0, "Could not make wake read pipe non-blocking. errno=%d",
errno);

result = fcntl(mWakeWritePipeFd, F_SETFL, O_NONBLOCK);
LOG_ALWAYS_FATAL_IF(result != 0, "Could not make wake write pipe non-blocking. errno=%d",
errno);

// Allocate the epoll instance and register the wake pipe.
mEpollFd = epoll_create(EPOLL_SIZE_HINT);
LOG_ALWAYS_FATAL_IF(mEpollFd < 0, "Could not create epoll instance. errno=%d", errno);

struct epoll_event eventItem;
memset(& eventItem, 0, sizeof(epoll_event)); // zero out unused members of data field union
eventItem.events = EPOLLIN;
eventItem.data.fd = mWakeReadPipeFd;
result = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, mWakeReadPipeFd, & eventItem);
LOG_ALWAYS_FATAL_IF(result != 0, "Could not add wake read pipe to epoll instance. errno=%d",
errno);
}

Looper::~Looper() {
close(mWakeReadPipeFd);
close(mWakeWritePipeFd);
close(mEpollFd);
}

void Looper::initTLSKey() {
int result = pthread_key_create(& gTLSKey, threadDestructor);
LOG_ALWAYS_FATAL_IF(result != 0, "Could not allocate TLS key.");
}

void Looper::threadDestructor(void *st) {
Looper* const self = static_cast<Looper*>(st);
if (self != NULL) {
self->decStrong((void*)threadDestructor);
}
}

void Looper::setForThread(const sp<Looper>& looper) {
sp<Looper> old = getForThread(); // also has side-effect of initializing TLS

if (looper != NULL) {
looper->incStrong((void*)threadDestructor);
}

pthread_setspecific(gTLSKey, looper.get());

if (old != NULL) {
old->decStrong((void*)threadDestructor);
}
}

sp<Looper> Looper::getForThread() {
int result = pthread_once(& gTLSOnce, initTLSKey);
LOG_ALWAYS_FATAL_IF(result != 0, "pthread_once failed");

return (Looper*)pthread_getspecific(gTLSKey);
}

sp<Looper> Looper::prepare(int opts) {
bool allowNonCallbacks = opts & ALOOPER_PREPARE_ALLOW_NON_CALLBACKS;
sp<Looper> looper = Looper::getForThread();
if (looper == NULL) {
looper = new Looper(allowNonCallbacks);
Looper::setForThread(looper);
}
if (looper->getAllowNonCallbacks() != allowNonCallbacks) {
ALOGW("Looper already prepared for this thread with a different value for the "
"ALOOPER_PREPARE_ALLOW_NON_CALLBACKS option.");
}
return looper;
}

bool Looper::getAllowNonCallbacks() const {
return mAllowNonCallbacks;
}

int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) {
int result = 0;
for (;;) {
while (mResponseIndex < mResponses.size()) {
const Response& response = mResponses.itemAt(mResponseIndex++);
int ident = response.request.ident;
if (ident >= 0) {
int fd = response.request.fd;
int events = response.events;
void* data = response.request.data;
#if DEBUG_POLL_AND_WAKE
ALOGD("%p ~ pollOnce - returning signalled identifier %d: "
"fd=%d, events=0x%x, data=%p",
this, ident, fd, events, data);
#endif
if (outFd != NULL) *outFd = fd;
if (outEvents != NULL) *outEvents = events;
if (outData != NULL) *outData = data;
return ident;
}
}

if (result != 0) {
#if DEBUG_POLL_AND_WAKE
ALOGD("%p ~ pollOnce - returning result %d", this, result);
#endif
if (outFd != NULL) *outFd = 0;
if (outEvents != NULL) *outEvents = 0;
if (outData != NULL) *outData = NULL;
return result;
}

result = pollInner(timeoutMillis);
}
}

int Looper::pollInner(int timeoutMillis) {
#if DEBUG_POLL_AND_WAKE
ALOGD("%p ~ pollOnce - waiting: timeoutMillis=%d", this, timeoutMillis);
#endif

// Adjust the timeout based on when the next message is due.
if (timeoutMillis != 0 && mNextMessageUptime != LLONG_MAX) {
nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
int messageTimeoutMillis = toMillisecondTimeoutDelay(now, mNextMessageUptime);
if (messageTimeoutMillis >= 0
&& (timeoutMillis < 0 || messageTimeoutMillis < timeoutMillis)) {
timeoutMillis = messageTimeoutMillis;
}
#if DEBUG_POLL_AND_WAKE
ALOGD("%p ~ pollOnce - next message in %lldns, adjusted timeout: timeoutMillis=%d",
this, mNextMessageUptime - now, timeoutMillis);
#endif
}

// Poll.
int result = ALOOPER_POLL_WAKE;
mResponses.clear();
mResponseIndex = 0;

struct epoll_event eventItems[EPOLL_MAX_EVENTS];
int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis);

// Acquire lock.
mLock.lock();

// Check for poll error.
if (eventCount < 0) {
if (errno == EINTR) {
goto Done;
}
ALOGW("Poll failed with an unexpected error, errno=%d", errno);
result = ALOOPER_POLL_ERROR;
goto Done;
}

// Check for poll timeout.
if (eventCount == 0) {
#if DEBUG_POLL_AND_WAKE
ALOGD("%p ~ pollOnce - timeout", this);
#endif
result = ALOOPER_POLL_TIMEOUT;
goto Done;
}

// Handle all events.
#if DEBUG_POLL_AND_WAKE
ALOGD("%p ~ pollOnce - handling events from %d fds", this, eventCount);
#endif

for (int i = 0; i < eventCount; i++) {
int fd = eventItems[i].data.fd;
uint32_t epollEvents = eventItems[i].events;
if (fd == mWakeReadPipeFd) {
if (epollEvents & EPOLLIN) {
awoken();
} else {
ALOGW("Ignoring unexpected epoll events 0x%x on wake read pipe.", epollEvents);
}
} else {
ssize_t requestIndex = mRequests.indexOfKey(fd);
if (requestIndex >= 0) {
int events = 0;
if (epollEvents & EPOLLIN) events |= ALOOPER_EVENT_INPUT;
if (epollEvents & EPOLLOUT) events |= ALOOPER_EVENT_OUTPUT;
if (epollEvents & EPOLLERR) events |= ALOOPER_EVENT_ERROR;
if (epollEvents & EPOLLHUP) events |= ALOOPER_EVENT_HANGUP;
pushResponse(events, mRequests.valueAt(requestIndex));
} else {
ALOGW("Ignoring unexpected epoll events 0x%x on fd %d that is "
"no longer registered.", epollEvents, fd);
}
}
}
Done: ;

// Invoke pending message callbacks.
mNextMessageUptime = LLONG_MAX;
while (mMessageEnvelopes.size() != 0) {
nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
const MessageEnvelope& messageEnvelope = mMessageEnvelopes.itemAt(0);
if (messageEnvelope.uptime <= now) {
// Remove the envelope from the list.
// We keep a strong reference to the handler until the call to handleMessage
// finishes. Then we drop it so that the handler can be deleted *before*
// we reacquire our lock.
{ // obtain handler
sp<MessageHandler> handler = messageEnvelope.handler;
Message message = messageEnvelope.message;
mMessageEnvelopes.removeAt(0);
mSendingMessage = true;
mLock.unlock();

#if DEBUG_POLL_AND_WAKE || DEBUG_CALLBACKS
ALOGD("%p ~ pollOnce - sending message: handler=%p, what=%d",
this, handler.get(), message.what);
#endif
handler->handleMessage(message);
} // release handler

mLock.lock();
mSendingMessage = false;
result = ALOOPER_POLL_CALLBACK;
} else {
// The last message left at the head of the queue determines the next wakeup time.
mNextMessageUptime = messageEnvelope.uptime;
break;
}
}

// Release lock.
mLock.unlock();

// Invoke all response callbacks.
for (size_t i = 0; i < mResponses.size(); i++) {
Response& response = mResponses.editItemAt(i);
if (response.request.ident == ALOOPER_POLL_CALLBACK) {
int fd = response.request.fd;
int events = response.events;
void* data = response.request.data;
#if DEBUG_POLL_AND_WAKE || DEBUG_CALLBACKS
ALOGD("%p ~ pollOnce - invoking fd event callback %p: fd=%d, events=0x%x, data=%p",
this, response.request.callback.get(), fd, events, data);
#endifALOOPER_POLL_CALLBACK
int callbackResult = response.request.callback->handleEvent(fd, events, data);
if (callbackResult == 0) {
removeFd(fd);
}
// Clear the callback reference in the response structure promptly because we
// will not clear the response vector itself until the next poll.
response.request.callback.clear();
result = ALOOPER_POLL_CALLBACK;
}
}
return result;
}

int Looper::pollAll(int timeoutMillis, int* outFd, int* outEvents, void** outData) {
if (timeoutMillis <= 0) {
int result;
do {
result = pollOnce(timeoutMillis, outFd, outEvents, outData);
} while (result == ALOOPER_POLL_CALLBACK);
return result;
} else {
nsecs_t endTime = systemTime(SYSTEM_TIME_MONOTONIC)
+ milliseconds_to_nanoseconds(timeoutMillis);

for (;;) {
int result = pollOnce(timeoutMillis, outFd, outEvents, outData);
if (result != ALOOPER_POLL_CALLBACK) {
return result;
}

nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
timeoutMillis = toMillisecondTimeoutDelay(now, endTime);
if (timeoutMillis == 0) {
return ALOOPER_POLL_TIMEOUT;
}
}
}
}

void Looper::wake() {
#if DEBUG_POLL_AND_WAKE
ALOGD("%p ~ wake", this);
#endif

ssize_t nWrite;
do {
nWrite = write(mWakeWritePipeFd, "W", 1);
} while (nWrite == -1 && errno == EINTR);

if (nWrite != 1) {
if (errno != EAGAIN) {
ALOGW("Could not write wake signal, errno=%d", errno);
}
}
}

void Looper::awoken() {
#if DEBUG_POLL_AND_WAKE
ALOGD("%p ~ awoken", this);
#endif

char buffer[16];
ssize_t nRead;
do {
nRead = read(mWakeReadPipeFd, buffer, sizeof(buffer));
} while ((nRead == -1 && errno == EINTR) || nRead == sizeof(buffer));
}

void Looper::pushResponse(int events, const Request& request) {
Response response;
response.events = events;
response.request = request;
mResponses.push(response);
}

int Looper::addFd(int fd, int ident, int events, ALooper_callbackFunc callback, void* data) {
return addFd(fd, ident, events, callback ? new SimpleLooperCallback(callback) : NULL, data);
}

int Looper::addFd(int fd, int ident, int events, const sp<LooperCallback>& callback, void* data) {
#if DEBUG_CALLBACKS
ALOGD("%p ~ addFd - fd=%d, ident=%d, events=0x%x, callback=%p, data=%p", this, fd, ident,
events, callback.get(), data);
#endif

if (!callback.get()) {
if (! mAllowNonCallbacks) {
ALOGE("Invalid attempt to set NULL callback but not allowed for this looper.");
return -1;
}

if (ident < 0) {
ALOGE("Invalid attempt to set NULL callback with ident < 0.");
return -1;
}
} else {
ident = ALOOPER_POLL_CALLBACK;
}

int epollEvents = 0;
if (events & ALOOPER_EVENT_INPUT) epollEvents |= EPOLLIN;
if (events & ALOOPER_EVENT_OUTPUT) epollEvents |= EPOLLOUT;

{ // acquire lock
AutoMutex _l(mLock);

Request request;
request.fd = fd;
request.ident = ident;
request.callback = callback;
request.data = data;

struct epoll_event eventItem;
memset(& eventItem, 0, sizeof(epoll_event)); // zero out unused members of data field union
eventItem.events = epollEvents;
eventItem.data.fd = fd;

ssize_t requestIndex = mRequests.indexOfKey(fd);
if (requestIndex < 0) {
int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, fd, & eventItem);
if (epollResult < 0) {
ALOGE("Error adding epoll events for fd %d, errno=%d", fd, errno);
return -1;
}
mRequests.add(fd, request);
} else {
int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_MOD, fd, & eventItem);
if (epollResult < 0) {
ALOGE("Error modifying epoll events for fd %d, errno=%d", fd, errno);
return -1;
}
mRequests.replaceValueAt(requestIndex, request);
}
} // release lock
return 1;
}

int Looper::removeFd(int fd) {
#if DEBUG_CALLBACKS
ALOGD("%p ~ removeFd - fd=%d", this, fd);
#endif

{ // acquire lock
AutoMutex _l(mLock);
ssize_t requestIndex = mRequests.indexOfKey(fd);
if (requestIndex < 0) {
return 0;
}

int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_DEL, fd, NULL);
if (epollResult < 0) {
ALOGE("Error removing epoll events for fd %d, errno=%d", fd, errno);
return -1;
}

mRequests.removeItemsAt(requestIndex);
} // release lock
return 1;
}

void Looper::sendMessage(const sp<MessageHandler>& handler, const Message& message) {
nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
sendMessageAtTime(now, handler, message);
}

void Looper::sendMessageDelayed(nsecs_t uptimeDelay, const sp<MessageHandler>& handler,
const Message& message) {
nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
sendMessageAtTime(now + uptimeDelay, handler, message);
}

void Looper::sendMessageAtTime(nsecs_t uptime, const sp<MessageHandler>& handler,
const Message& message) {
#if DEBUG_CALLBACKS
ALOGD("%p ~ sendMessageAtTime - uptime=%lld, handler=%p, what=%d",
this, uptime, handler.get(), message.what);
#endif

size_t i = 0;
{ // acquire lock
AutoMutex _l(mLock);

size_t messageCount = mMessageEnvelopes.size();
while (i < messageCount && uptime >= mMessageEnvelopes.itemAt(i).uptime) {
i += 1;
}

MessageEnvelope messageEnvelope(uptime, handler, message);
mMessageEnvelopes.insertAt(messageEnvelope, i, 1);

// Optimization: If the Looper is currently sending a message, then we can skip
// the call to wake() because the next thing the Looper will do after processing
// messages is to decide when the next wakeup time should be. In fact, it does
// not even matter whether this code is running on the Looper thread.
if (mSendingMessage) {
return;
}
} // release lock

// Wake the poll loop only when we enqueue a new message at the head.
if (i == 0) {
wake();
}
}

void Looper::removeMessages(const sp<MessageHandler>& handler) {
#if DEBUG_CALLBACKS
ALOGD("%p ~ removeMessages - handler=%p", this, handler.get());
#endif

{ // acquire lock
AutoMutex _l(mLock);

for (size_t i = mMessageEnvelopes.size(); i != 0; ) {
const MessageEnvelope& messageEnvelope = mMessageEnvelopes.itemAt(--i);
if (messageEnvelope.handler == handler) {
mMessageEnvelopes.removeAt(i);
}
}
} // release lock
}

void Looper::removeMessages(const sp<MessageHandler>& handler, int what) {
#if DEBUG_CALLBACKS
ALOGD("%p ~ removeMessages - handler=%p, what=%d", this, handler.get(), what);
#endif

{ // acquire lock
AutoMutex _l(mLock);

for (size_t i = mMessageEnvelopes.size(); i != 0; ) {
const MessageEnvelope& messageEnvelope = mMessageEnvelopes.itemAt(--i);
if (messageEnvelope.handler == handler
&& messageEnvelope.message.what == what) {
mMessageEnvelopes.removeAt(i);
}
}
} // release lock
}

} // namespace android
Author

white crow

Posted on

2022-01-24

Updated on

2024-03-25

Licensed under