概述
Linux
服务器程序必须处理三类事件:I/O
事件- 信号
- 定时事件
- 在处理这三类事件时通常需要考虑三个问题:
- 统一事件源
- 可移植性
- 对并发编程的支持
I/O框架概述
-
I/O
框架库以库函数的形式,封装了较为底层的系统调用,给应用程序提供了一组方便使用的接口- 这些库函数往往比程序员自己实现的同样功能更合理,更高效,且更健壮
-
各种
I/O
框架库的实现原理基本相似,要么以Reactor
模式实现,要么以Proactor
模式实现,要么同时以这两种实现 -
基于
Reactor
的I/O
框架库包含:- 句柄
- 事件多路分发器
- 事件处理器
- 具体的事件处理器
Reactor
句柄
I/O
框架要处理的对象,即I/O
事件、信号和定时事件,统一称为事件源- 一个事件源通常和一个句柄绑定在一起
- 句柄的作用是,当内核检测到就绪事件时,他将通过句柄来通知应用程序这一事件
- 在
Linux
环境下,I/O
事件对应的句柄是文件描述符,信号事件对应的句柄就是信号值
事件多路分发器
-
事件的到来是随机的、异步的
- 我们无法预知程序何时收到一个客户连接请求,又或者收到一个暂停信号,所以程序需要循环地等待并处理事件,这就是事件循环
-
在事件循环中,等待事件一般使用
I/O
复用技术实现- 而
I/O
框架库一般将系统支持的各种I/O
复用系统调用封装成统一的接口,称为事件多路分发器
- 而
-
事件多路分发器的
demultiplex
方法是等待事件的核心函数,内部调用的是select
、poll
、epoll_wait
函数 -
此外,事件多路分发器还需要实现
register_event
和remove_event
方法,以供调用者往多路分发器中添加事件和从事件多路分发器中删除事件
事件处理器和具体事件处理器
-
事件处理器执行事件对应的业务逻辑,通常包含一个或多个的
handle_event
回调函数,这些回调函数在事件循环中被执行 -
I/O
框架库提供的事件处理器通常是一个接口,用户需要继承它来实现自己的事件处理器,即具体事件处理器- 因此,事件处理器中的回调函数一般被声明为虚函数,以支持用户扩展
-
事件处理器一般还提供一个
get_handle
方法,它返回与该事件处理器关联的句柄- 当事件多路分发器检测到有事件发生时,它是通过句柄来通知应用程序的
- 因此,必须将事件处理器和句柄绑定,才能在事件发生时获取到正确的事件处理器
Reactor
Reactor
是I/O
框架库的核心,提供几个主要方法:handle_events
:用于执行事件循环,重复以下过程:
等待事件
依次处理所有就绪事件对应的事件处理器register_handler
:调用事件多路分发器的register_event
方法来往事件多路分发器中注册一个事件remove_handler
:调用事件多路分发器的remove_event
方法来删除事件多路分发器中的一个事件
Libevent源码分析
理解
event_base
可以理解为一个管理器- 这个管理器可以处理它从属的事件对象
event
- 这个管理器可以处理它从属的事件对象
event
事件对象,里面会有事件消息的类型,如EV_READ
,以及对该消息的回调函数- 也就是说,事件对象将事件消息以及回调函数进行了关联
- 事件对象创建后可以通过
event_add
注册到管理器里面,管理器发现是自己从属的事件对象,就对里面的事件消息类型以及回调函数进行注册 - 管理器有了,事件对象也有了,然后用
event_base_dispatch
将管理器的事件循环跑起来- 管理器就开始一直监控注册的事件消息类型有没有发生,有,就调用对应的回调函数
- 没有发生,如果一直阻塞等待,那效率就不高,所以就需要用
I/O
复用模型,来高效地监控多个文件描述符
确保事件循环在等待事件时不会浪费资源 - 至于
I/O
复用模型,Libevent
把这些常见的I/O
复用机制进行了抽象,提供了统一的接口 - 然后,
Libevent
的事件基(event base
)会在内部使用适当的I/O
复用机制来监控文件描述符上的事件
特点
- 跨平台
- 统一事件源
- 线程安全
- 基于
Reactor
模式
实例
- 调用
event_init
函数创建了event_base
对象
一个event_base
相当于一个Reactor
实例。 - 创建具体的事件处理器,并设置它们所从属的
Reactor
实例
evsignal_new
和evtimer_new
分别用于创建信号事件处理器和定时事件处理器,它们是定义在include/event2/event.h
中的宏,它们的统一入口是event_new
函数,用于创建通用事件处理器- 其中,
base
参数指定新创建的事件处理器从属的Reactor
fd
参数指定与该事件处理器关联的句柄
创建I/O
事件处理器时,应该给fd
参数传递文件描述符值
创建信号事件处理器时,应该给fd
参数传递信号值
创建定时事件处理器时,应该给fd
参数传递-1
event
参数指定事件类型(下图)
EV_PERSIST
的作用是:事件被触发后,自动重新对这个event
调用event_add
函数cb
参数指定目标事件对应的回调函数arg
参数则是Reactor
传递给回调函数的参数
- 其中,
1 2 3 4 |
#define evsignal_new(b, x ,cb, arg) \ event_new((b), (x), EV_SIGNAL|EV_PERSIST, (cb), (arg)) #define evtimer_new(b, cb, arg) event_new((b), -1, 0, (cb), (arg)) |
1 2 3 4 5 |
struct event* event_new(struct event_base* base, evutil_socket_t fd, short events, void (*cb)(evutil_socket_t, short, void*), void * arg); |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 |
#include <sys/signal.h> #include <event.h> void signal_cb(int fd, short event, void* argc) { struct event_base* base = (event_base*)argc; struct timeval delay = {2,0}; printf("aught an iterrupt signal; exiting cleanly in two seconds\n"); event_base_loopexit(base, &delay); } void timeout_cb(int fd, short event, void* argc) { printf("timeout\n"); } int main() { struct event_base* base = event_init(); struct event* signal_event = evsignal_new(base, SIGINT, signal_cb, base); event_add(signal_event, NULL); timeval tv = {1,0}; struct event* timeout_event = evtimer_new(base, timeout_cb, NULL); event_add(timeout_event, &tv); event_base_dispatch(base); event_free(timeout_event); event_free(signal_event); event_base_free(base); } |
event_new
成功时返回一个event
类型对象,也就是Libevent
的事件处理器libevent
用“event
"描述事件处理器,而不是事件:- 事件指的是一个句柄上绑定的事件,比如文件描述符上的可读事件
- 事件处理器,也就是
event
结构体类型的对象,除了包含事件必备的两个要素(句柄和事件类型)之外,还有很多其他成员,比如回调函数。 - 事件由事件多路分发器管理,事件处理器则由事件队列管理。事件队列包括多种,比如
event_base
中的注册事件队列、活动事件队列和通用定时器队列,以及evmap
中的I/O
事件队列、信号事件队列 - 事件循环对一个被激活事件(就绪事件)的处理,指的是执行该事件对应的事件处理器中回调函数
event_add
,将事件处理器添加到注册事件队列中,并将该事件处理器对应的事件添加到事件多路分发器中
event_add
相当于Reactor
中的register_handler
方法event_base_dispatch
函数来执行事件循环- 事件循环结束后,使用
*_free
系列函数来释放系统资源。
源代码组织架构
include/event2
- 提供给应用程序使用,比如
event.h
提供核心函数,http.h
提供HTTP
协议相关服务,rpc.h
提供了远程过程调用支持 - 根目录下的头文件,一类是对
include/event2
目录下的部分头文件的包装,另一类是供libevent
内部使用的辅助性头文件
- 提供给应用程序使用,比如
compat/sys
queue.h
封装了跨平台的基础数据结构,包括单向链表、双向链表、队列、尾队列和循环队列。
sample
- 示例程序
test
- 测试代码
WIN32-Code
- 提供了
Windows
平台上的一些专用代码
- 提供了
event.c
- 实现了
libevent
的整体框架,主要是event
和event_base
两个结构体相关操作
- 实现了
devpoll.c;kqueue.c;evport.c;select.c;win32select.c;poll.c;epoll.c
- 分别封装了
/dev/poll;kqueue;event_ports;POSIX select;Windows select;poll;epoll
- 这些文件主要内容相似,都是针对
eventop
结构体所定义的接口函数的具体实现
- 分别封装了
minheap-internal.h
- 实现了一个时间堆,以提供对定时时间的支持
signal.c
- 提供了对信号的支持
- 其内容也是针对结构体
eventop
所定义的接口函数的具体实现
evmap.c
- 维护句柄(文件描述符或信号)与事件处理器的映射关系
event_tagging.c
- 提供了往缓冲区中添加标记数据(比如一个整数),以及从缓冲区中读取标记数据的函数
event_iocp.c
- 提供了对
Windows IOCP
(输入输出完成端口)的支持
- 提供了对
buffer*.c
- 提供了对网络
I/O
缓冲的控制,包括:输入输出数据过滤,传输速率限制,使用SSL
协议对应用数据进行保护,以及零拷贝文件传输等
- 提供了对网络
evthread*.c
- 提供了对多线程的支持
listener.c
- 封装了对监听
socket
的操作,包括监听连接和接受连接
- 封装了对监听
logs.c
- 是
libevent
的日志系统
- 是
evutil.c;evutil_rand.c;strlcpy.c;arc4random.c
- 提供了一些基础操作,比如生成随机数、获取
socket
地址信息,读取文件,设置socket
属性等
- 提供了一些基础操作,比如生成随机数、获取
evdns.c;http.c;evrpc.c
- 分别提供了对
DNS
协议、HTTP
协议和RPC
协议的支持
- 分别提供了对
epoll_sub.c
event-internal.h;include/event2/event_struct.h;event.c;evmap.c
- 整个源码中,这
4
个文件最重要
- 整个源码中,这
event结构体
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 |
struct event { TAILQ_ENTRY(event) ev_active_next; TAILQ_ENTRY(event) ev_next; union { TAILQ_ENTRY(event) ev_next_with_common_timeout; int min_heap_idx; } ev_timeout_pos; struct event_base* ev_base; union { struct { TAILQ_ENTRY(event) ev_io_next; struct timeval ev_timeout; } ev_io; struct { TAILQ_ENTRY(event) ev_signal_next; short ev_ncalls; short *ev_pncalls; } ev_signal; } _ev; short ev_events; short ev_res; short ev_flags; ev_uint8_t ev_pri; ev_uint8_t ev_closure; struct timeval ev_timeout; void (*ev_callback)(evutil_socket_t, short, void *arg); void *ev_arg; }; |
ev_events
- 代表事件类型
ev_next
- 所有以及注册的事件处理器(包括
I/O
事件处理器和信号事件处理器)通过该成员串联成一个尾队列,称之为注册事件队列
TAILQ_ENTRY
是尾队列中的节点类型,定义在compat/sys/queue.h
- 所有以及注册的事件处理器(包括
1 2 3 4 5 |
#define TAILQ_ENTRY(type) \ struct {\ struct type* tqe_next; \ // 下一个元素 struct type** dqe_prev; \ // 前一个元素的地址 } |
ev_active_next
- 所有被激活的事件处理器通过该成员串联成一个尾队列,称之为活动事件队列。
- 活动事件队列不止一个,不同优先级的事件处理器被激活后将插入不同的活动事件队列中。
- 在事件循环中,
Reactor
将按优先级从高到低遍历所有活动事件队列,并依次处理其中的事件处理器。
ev_timeout_pos
- 这个是一个联合体。仅用于定时事件处理器。
- 老版本的
libevent
中,定时器都是由时间堆来管理的。但开发者认为有时候使用简单的链表来管理定时器将有更高的效率。
新版本libevent
引入了所谓的“通用定时器”概念。这些定时器不是存储在时间堆中,而是存储在尾队列中,称之为通用定时器队列。 - 对于通用定时器而言,
ev_timeout_pos
联合体的ev_next_with_common_timeout
成员指出了该定时器在通用定时器队列中的位置。 - 对于其他定时器而言,
ev_timeout_pos
联合体的min_heap_idx
成员指出了该定时器在时间堆中的位置。 - 一个定时器是否是通用定时器取决于其超时值大小,具体判断原则由
event.c
的is_common_timeout
函数决定。
_ev
- 这是一个联合体。
- 所有具有相同文件描述符值的
I/O
事件处理器通过ev.ev_io.ev_io_next
成员串联成一个尾队列,称之为信号事件队列 ev.ev_signal.ev_ncalls
成员指定信号事件发生时,Reactor
需要执行多少次该事件对应的事件处理器中的回调函数ev.ev_signal.ev_pncalls
指针成员要么是NULL
,要么指向ev.ev_signal.ev_ncalls
- 在程序中,可能针对同一个
socket
文件描述符上的可读、可写事件创建多个事件处理器(它们拥有不同的回调函数)
当该文件描述符上有可读或可写事件发生时,所有这些事件处理器都应该被处理
所以,Libevent
使用I/O
事件队列将具有相同文件描述符值的事件处理器组织在一起
这样,当一个文件描述符上有事件发生时,事件多路分发器就能很快地把所有相关的事件处理器添加到活动事件队列中
信号事件队列的存在也是同样的原因
可见,I/O
事件队列和信号事件队列并不是注册事件队列的细致分类,而是另有用处 ev_fd
- 对于
I/O
事件处理器,它是文件描述符;对于信号事件处理器,它是信号值
- 对于
ev_base
- 该事件处理器从属的
event_base
实例
- 该事件处理器从属的
ev_res
- 它记录当前激活事件的类型
ev_flags
- 它是一些事件标志,值在
include/event2/event_struct.h
- 它是一些事件标志,值在
ev_pri
- 指定事件处理器优先级,值越小优先级越高
ev_closure
- 指定
event_base
执行事件处理器的回调函数时的行为
- 指定
ev_timeout
- 仅对定时器有效,指定定时器的超时值
ev_callback
- 事件处理器的回调函数,由
event_base
调用
- 事件处理器的回调函数,由
ev_arg
- 回调函数的参数
往注册事件队列中添加事件处理器
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 |
static inline int event_add_internal(struct event* ev, const struct timeval* tv, int tv_is_absolute) { struct event_base* base = ev->ev_base; int res = 0; int notify = 0; EVENT_BASE_ASSERT_LOCKED(base); _event_debug_assert_is_setup(ev); event_debug(("event_add: event:%p (fd %d),%s%s%scall %p", ev, (int)ev->ev_fd, ev->ev_events & EV_READ ? "EV_READ" : "", ev->ev_events & EV_WRITE ? " EV_WRITE" : "", tv ? "EV_TIMEOUT" : "", ev->ev_callback)); EVUTIL_ASSERT(!(ev->ev_flags & !EVLIST_ALL)); // 如果新添加的事件处理器是定时器,且它尚未被添加到通用定时器队列或时间堆中, // 则为该定时器在时间堆上预留一个位置 if (tv != NULL && !(ev->ev_flags & EVLIST_TIMEOUT)) { if (min_heap_reserve(&base->timeheap, 1 + min_heap_size(&base->timeheap)) == -1) { return -1; } #ifndef _EVENT_DISABLE_THREAD_SUPPORT if (base->current_event == ev && (ev->ev_events & EV_SIGNAL) && !EVBASE_IN_THREAD(base)) { ++base->current_event_waiters; EVTHREAD_COND_WAIT(base->current_cond, base->th_base_lock); } #endif if ((ev->ev_events & (EV_READ|EV_WRITE|EV_SIGNAL)) && !(ev->ev_flags & (EVLIST_INSERTED|EVLIST_ACTIVE))) { if (ev->ev_events & (EV_READ|EV_WRITE)) { // 添加I/O事件和I/O事件处理器的映射关系 res = evmap_io_add(base, ev->ev_fd, ev); } else if(ev->ev_events & EV_SIGNAL) { // 添加信号事件和信号事件处理器的映射关系 res = evmap_signal_add(base, (int)ev->ev_fd, ev); } if(res != -1) { // 将事件处理器插入注册事件队列 event_queue_insert(base, ev, EVLIST_INSERTED); } if (res == 1) { // 事件多路分发器中添加了新的事件,所有要通知主线程 notify = 1; res = 0; } } // 下面将事件处理器添加至通用定时器队列或事件堆中。 // 对于信号事件处理器和I/O事件处理器,根据evmap_*_add函数的结果决定是否添加(为了给事件设置超时) // 而对于定时器,则始终应该添加 if (res != -1 && tv != NULL) { struct timeval timeout; int common_timeout; // 对于永久性事件处理器,如果其超时时间不是绝对时间 // 则将该事件处理器的超时事件记录在变量ev->ev_io_timeout中 // ev_io_timeout是定义在event-internal.h文件中的宏 if (ev->ev_closure == EV_CLOSURE_PERSIST && !tv_is_absolute) { ev->ev_io_timeout = *tv; // 如果该事件处理器已经被激活,且原因是超时,则从活动事件队列中删除它,以避免其回调函数被执行, // 对于信号事件处理器,必要时还需要将其ncalls成员设置为0(ev_pncalls如果不为NULL,它指向ncalls) // 信号事件被触发时,ncalls指定其回调函数被执行的次数,将ncalls设置为0,可以干净地终止信号事件的处理 if ((ev->ev_flags & EVLIST_ACTIVE) && (ev->ev_res & EV_TIMEOUT)) { if (ev->ev_events & EV_SIGNAL) { if (ev->ev_ncalls && ev->ev_pncalls) { *ev->ev_pncalls = 0; } } event_queue_remove(base, ev, EVLIST_ACTIVE); } gettime(base, &now); common_timeout = is_common_timeout(tv, base); if (tv_is_absolute) { ev->ev_timeout = *tv; // 判断应该将定时器插入通用定时器队列,还是插入事件堆 } else if (common_timeout) { struct timeval tmp = *tv; tmp.tv_usec &= MICROSECONDS_MASK; evutil_timeradd(&now, &tmp, &ev->ev_timeout); ev->ev_timeout.tv_usec |= (tv->tv_usec & ~MICROSECONDS_MASK); } else { // 加上当前系统时间,以取得定时器超时的绝对时间 evutil_timeradd(&now, tv, &ev->ev_timeout); } event_debug(("event_add: timeout in %d seconds, call %p", (int)tv_tv_sec, ev->ev_callback)); // 插入定时器 event_queue_insert(base, ev, EVLIST_TIMEOUT); // 如果被插入的事件处理器是通过定时器队列中的第一个元素,则通过common_timeout_schedule函数 // 将其转移到事件堆中,这样,通用定时器链表和时间堆中的定时器就得到了统一的处理 if (common_timeout) { struct common_timeout_list* lctl = get_common_timeout_list(base, &ev->ev_timeout); if (ev == TAILQ_FIRST(&ctl->events)) { common_timeout_schedule(ctl, &now, ev); } } else { if (min_heap_eit_is_top(ev)) { notify = 1; } } } // 如果必要,唤醒主线程 if (res != -1 && notify && EVBASE_NEED_NOTIFY(base)) { evthread_notify_base(base); } _event_debug_note_add(ev); return res; } |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 |
static void qvent_queue_insert(struct event_base* base, struct event* ev, int queue) { EVENT_BASE_ASSERT_LOCKED(base); // 比秒重复插入 if (ev->ev_flags & queue) { if (queue & EVLIST_ACTIVE) { return; } event_errx(1, "%s:%p(fd %d) already on queue %x", __func__, ev, ev->ev_fd, queue); return; } if(~ev->ev_flags & EVLIST_INTERNAL) { base->event_count++; } ev->ev_flags |= queue; switch(queue) { case EVLIST_INSERTED: TAILQ_INSERT_TAIL(&base->eventqueue, ev, ev_next); break; case EVLIST_ACTIVE: base->event_count_active++; TAILQ_INSERT_TAIL(&base->activequeues[ev->ev_pri], ev, ev_active_next); break; case EVLIST_TIMEOUT:{ if (is_common_timeout(&ev->ev_timeout, base)) { struct common_timeout_list *ctl = get_common_list(base, &ev->ev_timeout); insert_common_timeout_inorder(ctl, ev); } else { min_heap_push(&base->timeheap, ev); } } default: event_errx(1, "%s: unknown queue %x", __func__, queue); } } |
往事件多路分发器中注册事件
event_queue_insert
函数做的仅仅是将一个事件处理器加入event_base
的某个事件队列中.- 对于添加的
I/O
事件处理器和信号事件处理器,我们还需要让事件多路分发器来监听其对应的事件,同时建立文件描述符、信号值与事件处理器之间的映射关系
通过evmap_io_add
和evmap_signal_add
两个函数来完成。这两函数相当于事件多路分发器中的register_event
方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 |
#ifdef EVMAP_USE_HT #incldue "ht-internal.h" struct event_map_entry; // 如果定义了EVMAP_USE_HT,则将event_io_map定义为哈希表 // 该哈希表存储event_map_entry对象和I/O事件队列之间的映射关系, // 实际上也就是存储了文件描述符和I/O事件处理器之间的映射关系。 HT_HEAD(event_io_map, event_map_entry); #else #define event_io_map event_signal_map #endif struct event_signal_map { void** entries; // 存储evmap_io或evmap_signal数组 int nentries; }; // 如果定义了EVMAP_USE_HT,则哈希表event_io_map中的成员具有如下类型 struct event_map_entry { HT_ENTRY(event_map_entry) map_node; evutil_socket_t fd; union { struct evmap_io_evmap_io; } ent; }; // event_list是由event组成的尾队列 TAILQ_HEAD(event_list, event); // I/O事件队列 struct evmap_io { struct event_list events; ev_util16_t nread; ev_util16_t nwrite; }; // 信号事件队列 struct evmap_signal { struct event_list events; }; int evmap_io_add(struct event_base* base, evutil_socket_t fd, struct event* ev) { // 获得event_base的后端I/O复用机制 const struct eventop* evsel = base->evsel; // 获得event_base中文件描述符与I/O事件队列的映射表(哈希表或数组) struct event_io_map* io = &base->io; // fd参数对应的I/O事件队列 struct evmap_io * ctx = NULL; int nread, nwrite, retval = 0; short res = 0, old = 0; struct event * old_ev; EVUTIL_ASSERT(fd == ev->ev_fd); if (fd < 0) { return 0; } #ifndef EVMAP_USE_HT // I/O事件队列数据io.entries中,每个文件描述符占用一项 // 如果fd大于当前数组的大小则增加数组的大小 if (fd >= io->nentries) { if (evmap_map_space(io,fd,sizeof(struct evmap_io*)) == -1) { return (-1); } } #endif // 下面这个宏根据EVMAP_USE_HT是否被定义而有不同的实现,但目的都是创建ctx,在映射表io中为fd和ctx添加映射关系。 GET_IO_SLOT_CTOR(ctx, io, fd, evmap_io, evmap_io_init, evsel->fdinfo_len); nread = ctx->nread; nwrite = ctx->nwrite; if(nread) { old |= EV_READ; } if (nwrite) { old |= EV_WRITE; } if(ev->ev_events & EV_READ) { if (++nread == 1) { res |= EV_READ; } } if (ev->ev_events & EV_WRITE) { if (++nwrite == 1) { res |= EV_WRITE; } } if (EVUTIL_UNLIKELY(nread > 0xffff || nwrite > 0xffff)) { event_warnx("too many events reading or writing on fd %d", (int)fd); return -1; } if (EVENT_DEBUG_MODE_IS_ON() && (old_ev = TAILQ_FIRST(&ctx->events)) && (old_ev->ev_events&EV_ET) != (ev-?ev_events&EV_ET)) { event_warnx("tried to mix edge_triggered and non-edge-triggered event on fd %d", (int)fd); return -1; } if (res) { void* extra = ((char*)ctx) + sizeof(struct evmap_io); // 往事件多路分发器中注册事件,add是事件多路分发器的接口函数之一 if (evsel->add(base, ev->ev_fd, old, (ev->ev_events&EV_ET)|red, extra) == -1) { retval = 1; } } ctx->nread = (ev_uint16_t)nread; ctx->nwrite = (ev_uint16_t)nwrite; // 将ev插到I/O事件队列ctx的尾部 TAILQ_INSERT(&ctx->events, ev, ev_io_next); return (retval); } |
eventop结构体
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 |
struct eventop { // 后端I/O复用技术的名称 const char* name; // 初始化函数 void *(*init)(struct event_base*); // 注册事件 int (*add)(struct event_base*, evutil_socket_t fd, short old, short events, void* fdinfo); //删除事件 int (*del)(struct event_base*, evutil_socket_t fd, short old, short events, void* fdinfo); //等待事件 int (*dispatch)(struct event_base*, struct timeval*); //释放I/O复用机制使用的资源 void (*dealloc)(struct event_base*); // 调用fork之后是否需要重新初始化event_base int need_reinit; // I/O复用技术支持的一些特性 enum event_mathod_feature features; // I/O复用机制需要为每个I/O事件队列和信号事件队列分配额外的内存,避免同一个文件描述符被重复插入I/O复用机制的事件表中 size_t fdinfo_len; }; |
event_base结构体
事件循环
libevent
实现事件循环的函数是event_base_loop
- 该函数首选调用
I/O
事件多路分发器的事件监听函数,以等待事件。当有事件发生时,就依次处理。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 |
int event_base_loop(struct event_base* base, int flags) { const struct eventop* evsel = base->evsel; struct timeval tv; struct timeval * tv_p; int res, done, retval = 0; EVBASE_ACQUIRE_LOCK(base, th_base_lock); // 一个event_base仅允许运行一个事件循环 if (base->running_loop) { event_warnx("%s: reentrant invocation: only one event_base_loop can run on each event_base at once.", __func__); EVBASE_RELEASE_LOCK(base, th_base_lock); return -1; } base->running_loop = 1; // 标记已经开始运行 clear_time_cache(base); // 清除event_base的系统时间缓存 if (base->sig.ev_signal_added && base->sig.ev_n_signals_added) { evsig_set_base(base); } done = 0; #ifnef _EVENT_DISABLE_THREAD_SUPPORT base->th_owner_id = EVTHREAD_GET_ID(); #endif base->event_gotterm = base->event_break = 0; while(!done) { base->event_continue = 0; if (base->event_gotterm) { break; } if (base->event_break) { break; } timeout_correct(base, &tv); // 校准系统时间 tv_p = &tv; if (!N_ACTIVE_CALLBACKS(base) && !(flags & EVLOOP_NONBLOCK)) { // 获取时间堆顶元素的超时值 timeout_next(base, &tv_p); } else { // 有事件就绪尚未处理,将I/O系统调用的超时时间置0 evutil_timerclear(&tv); } if (!event_haveevents(base) && !N_ACTIVE_CALLBACKS(base)) { event_debug(("%s: no events registered.", __func__)); retval = 1; goto done; } // 更新系统时间 gettime(base, &base->event_tv); clear_time_cache(base); res = evsel->dispatch(base, tv_p); if (res == -1) { event_debug(("%s: disaptch returned unsuccessfully.", __func__)); retval = -1; goto done; } // 将时间缓存更新为当前系统时间 update_time_cache(base); // 检测时间堆上的到期事件并依次处理 timeout_process(base); if (N_ACTIVE_CALLBACKS(base)) { int n = event_process_active(base); if ((flags & EVLOOP_ONCE) && N_ACTIVE_CALLBACKS(base) == 0 && n != 0) { done = 1; } else if(flags & EVLOOP_NONBLOCK) { done = 1; } } event_debug(("%s: asked to terminate loop.", __func__)); done: clear_time_cache(base); base->running_loop = 0; EVBASE_RELEASE_LOCK(base, th_base_lock); return (retval); } |
本文为原创文章,版权归Aet所有,欢迎分享本文,转载请保留出处!
你可能也喜欢
- ♥ Linux 高性能服务器编程:I/O复用一12/11
- ♥ Cef:介绍06/29
- ♥ 包管理器:各平台安装卸载相关记述09/17
- ♥ 51CTO:Linux C++网络编程五08/20
- ♥ 各平台调试方法总结记述一09/25
- ♥ Linux » Linux 高性能服务器编程:信号12/12