分类: 2011 February

libevent源码浅析: 定时器和信号

上一篇文章介绍了libevent下基本的I/O事件,这篇文章将讲讲libevent对定时器和信号事件的处理.

Timer事件

反应堆event_base包含了一个最小堆min_heap结构体的实例,以此维护注册到这个反应堆实例的定时器事件:

struct event_base {
        //其他成员
	struct min_heap timeheap;
};

回顾一下最小堆min_heap:

typedef struct min_heap
{
    //p指向一个动态分配的数组,数组元素是event指针.
    struct event** p; 
    unsigned n, a; // n表示目前保存了多少元素,a表示p指向的内存能够存储event指针的个数
} min_heap_t;

可以看到,它包含一个连续的内存块用于存储定时器事件.针对min_heap的操作主要有:

static inline int min_heap_push(min_heap_t* s, struct event* e);
static inline struct event*  min_heap_pop(min_heap_t* s);

其中,min_heap_push()用于插入节点,min_heap_pop()用于弹出节点.其内部逻辑很简单,不必描述了.

现在看看libevent处理定时器事件的例子:

static void timeout_cb(int fd, short event, void *arg) {...}

int main (int argc, char **argv)
{
	struct event timeout;
	struct timeval tv;
        
        event_init();

	evtimer_set(&timeout, timeout_cb, &timeout);

	evutil_timerclear(&tv);
	tv.tv_sec = 2;
	event_add(&timeout, &tv);

	lasttime = time(NULL);
	
	event_dispatch();
}

首先,和上篇文章例子一样的,event_init()初始化一个event_base(反应堆实例),然后由evtimer_set()设置定时器事件的回调函数,接着event_add()把定时器事件加入反应堆实例中.最后进入event_dispatch()主循环.

在这里,evtimer_set定义如下:

#define evtimer_set(ev, cb, arg)	event_set(ev, -1, 0, cb, arg)

至于event_set(),没有什么好说的,就是对一个event结构体做初始化罢了.

上一篇文章已经从I/O事件的角度介绍了event_add(),这里看看它是如何处理定时器事件的:

int event_add(struct event *ev, const struct timeval *tv)
{	
    struct event_base *base = ev->ev_base;

    ....//处理IO事件或者信号事件的逻辑.

    //如果tv不为0
    if (tv != NULL)
    {
        event_queue_insert(base, ev, EVLIST_TIMEOUT);
    }
}

可以看到,event_add()会把一个定时器事件压入到其对应的反应堆实例下的定时器最小堆timeheap中(&ev->base.timeheap).

回到event_dispatch(),它会调用event_base_loop(),此函数对定时器事件处理如下:


//事件主循环
int
event_base_loop(struct event_base *base, int flags)
{	
    ...//不必多虑的其他代码

    done = 0;
    while (!done)
    {
	//检查heap中的timer events,将就绪的timer event从heap上删除,并插入到激活链表中,
	//这意味着base->event_count_active会增加
        timeout_process(base);

	//有就绪事件了
        if (base->event_count_active)
        {
            //处理就绪事件吧.
            event_process_active(base);
        }
    }
}

其中,timeout_process()会将已超时的定时器事件插入到反应堆实例下的已就绪事件队列中,接着由event_process_active()处理已就绪事件.event_process_active()代码在上一篇文章中已经介绍过了,这里看一下timeout_process():

/时间到~~~
//开始处理base里面的定时器堆里的事件鸟.
//检查heap中的timer events,将就绪的timer event从heap上删除,并插入到激活链表中 
void timeout_process(struct event_base *base)
{
    struct timeval now;
    struct event *ev;

   while ((ev = min_heap_top(&base->timeheap)))
    {
    	//ev超时时间的比现在的时间大,也就是说,这个ev还没有超时,那么while循环结束
        if (evutil_timercmp(&ev->ev_timeout, &now, >))
            break;

	//else 意味着 evutil_timercmp(&ev->ev_timeout, &now, <=)为真
	//也就说明定时器最小堆的根超时了
		
        //从定时器堆删除
        event_del(ev);

	//把它插到激活链表吧.
        event_active(ev, EV_TIMEOUT, 1);
    }
}

Signal事件

signal事件的处理时libevent中比较难懂的地方,前人之述不详,本文重点讲解之.

反应堆event_base包含了一个evsignal_info结构体的实例,来维护注册到这个反应堆实例的信号事件:

struct event_base {
        //其他成员
	struct evsignal_info sig;
};

这里仔细研究一下evsignal_info结构体的定义:


struct evsignal_info {

	//为 socket pair 的读 socket向 event_base 注册读事件时使用的 event 结构体
	//这个是所有信号事件共用的.
	struct event ev_signal; 

	//这个也是所有信号事件共用的.
	int ev_signal_pair[2]; 

	//记录ev_signal 事件是否已经注册了
	int ev_signal_added;

	//是否有信号发生的标记
	//只在evsignal_handler()中被修改为1
	volatile sig_atomic_t evsignal_caught;

	//evsigevents[signo]表示注册到信号 signo 的事件链表
	struct event_list evsigevents[NSIG];

	//具体记录每个信号触发的次数,evsigcaught[signo]是记录信号signo被触发的次数
	sig_atomic_t evsigcaught[NSIG];
	
	//记录了原来的signal处理函数指针,当信号signo注册的event被清空时,需要重新设置其处理函数
	struct sigaction **sh_old;
};

要了解evsignal_info为何是这样设计的,首先需要明白int ev_signal_pair[2];的作用.它实际上表示两个文件描述符,在libevent中一个用于写,一个用于读,它们在event_init()是被初始化.好吧,其实更确切点说,event_init()会调用event_base_new(),而event_base_new()调用封装好I/O多路复用技术的结构体eventop实例(&event_base->evsel)的init函数(&event_base->evsel),这个init函数会初始化eventop实例的内部数据结构,然后调用evsignal_init()对evsignal_info结构体实例(&event_base->sig)做初始化.而在初始化实例的过程中,对其内部的ev_signal_pair[2]数组的初始化是通过调用evutil_socketpair()函数来实现的.够了,上面这段话已经够恶心了,图示如下:

看看evutil_socketpair()代码:

int
evutil_socketpair(int family, int type, int protocol, int fd[2])
{
#ifndef WIN32
	return socketpair(family, type, protocol, fd);
#else
       ...//山寨一个socketpair函数
}

它使用socketpair系统调用创建一对全双工管道(如果有时间的话,可以读一下evutil_socketpair()后半部分的代码,它在WIN32环境下如何山寨了一个socketpair函数,熟悉之可以加深不少理解.).这个全双工管道有什么用呢? 这里先卖个关子,我们看看evsignal_info结构体下的成员struct event ev_signal是如何被初始化的.

evsignal_init()调用event_set()函数,event_set()将&event_base->sig.ev_signal.ev_fd设置为&event_base->sig.ev_signal_pair[1],其回调函数为evsignal_cb(). ([1]).

至此,铺垫基本上做好了.我们看一个使用libevent处理信号事件的例子吧:

static void signal_cb(int fd, short event, void *arg) {...}

int main (int argc, char **argv)
{
	/* Initalize the event library */
        event_init();

	struct event signal_int;
	event_set(&signal_int, SIGINT, EV_SIGNAL|EV_PERSIST, signal_cb, &signal_int);
	
	event_add(&signal_int, NULL);

	event_dispatch();
}

首先是由event_init()创建一个反应堆实例(在此背后,对维护信号事件的结构体evsigal_info的实例(&event_base.sig)如何被初始化在上文已经做了介绍了.),然后由event_set()设置一个事件,将其标志&signal_int.events设为EV_SIGNAL|EV_PERSIST,文件描述符&signal_int.ev_fd设置对应的信号(在例子中是SIGIN,即中断信号,中断下可以用ctrl-c触发).然后设置好这个信号事件对应的回调函数 ([2]注意,回调函数对应的是信号事件,而非信号.注意与[3]的不同.).

之后,调用event_add()将信号事件注册到反应堆实例中,event_add()对信号事件的处理如下:

int event_add(struct event *ev, const struct timeval *tv)
{
    struct event_base *base = ev->ev_base;
    const struct eventop *evsel = base->evsel;
    void *evbase = base->evbase;
    int res = 0;

    //ev->ev_events表示事件类型
    //如果ev->ev_events是 读/写/信号 事件,而且ev不在 已注册链表 或 已激活链表,那么调用evbase注册ev事件
    if ((ev->ev_events & (EV_READ|EV_WRITE|EV_SIGNAL)) &&
            !(ev->ev_flags & (EVLIST_INSERTED|EVLIST_ACTIVE)))
    {
	//实际执行操作的是evbase
        res = evsel->add(evbase, ev);
		
        if (res != -1) //注册成功,把事件ev插入 已注册链表 中
            event_queue_insert(base, ev, EVLIST_INSERTED);
    }
}

为了描述方便,我们假定libevent使用的I/O多路复用技术是select,看看select_add()代码吧:

static int select_add(void *arg, struct event *ev)
{
	if (ev->ev_events & EV_SIGNAL)
		return (evsignal_add(ev));
}

对于信号事件,它转手给evsignal_add()函数处理,evsignal_add()代码如下:

//将信号事件ev下的描述符ev_fd(也就是信号)添加到&ev->ev_base->sig->evsigevents[ev_fd]队列中
int evsignal_add(struct event *ev)
{
    int evsignal;
    struct event_base *base = ev->ev_base;
    struct evsignal_info *sig = &ev->ev_base->sig;

    //拿到event下的信号标号
    evsignal = EVENT_SIGNAL(ev);

    if (TAILQ_EMPTY(&sig->evsigevents[evsignal]))
    {
		//设置这个事件对应的信号对应的处理函数

		//watch out!!!!针对的是信号,不是事件
	if (_evsignal_set_handler(
                    base, evsignal, evsignal_handler) == -1)
            return (-1);

	//这里注册的sig本身,而不是信号事件
	//也就是就是说,sig是在真正有信号事件时才注册的.
        if (!sig->ev_signal_added)
        {
            //注册这个信号对应的事件
            if (event_add(&sig->ev_signal, NULL))
                return (-1);
            sig->ev_signal_added = 1;
        }
    }

    //多个事件可能对应同一信号
    TAILQ_INSERT_TAIL(&sig->evsigevents[evsignal], ev, ev_signal_next);
}

evsignal_add()函数先获得信号事件对应的信号,通过_evsignal_set_handler()函数将此信号相应的信号处理函数设置为evsignal_handler(). ([3]注意,[2]设置的回调函数是针对信号事件的,这里设置的处理函数才是针对信号的.) 接着,evsignal_add()判断sig->ev_signal_added是否为0,为0则将&sig->ev_signal事件注册到反应堆实例中,然后将sig->ev_signal_added置1。;如果不为0,那么跳过这段代码.需要指出的是,sig->ev_signal_added唯一一次被置1就是在这段代码中,这保证了&sig->ev_signal事件只被注册到反应堆实例中一次.其实也就是说,只有在第1次有信号事件需要通过event_add()被注册到反应堆实例时,&sig->ev_signal事件才会被一起注册,这是libevent对&event_base->sig的延后处理.

<hr/>

接下来,貌似应该讲讲event_dispatch()对信号事件的处理了.且慢,我们回头把 [1], [2], [3] 整理一下:

(1) 在调用event_init()新建一个反应堆实例(以base表示)时,evsignal_info结构体(libevent用它来管理信号事件集合) base->sig被初始化,base->sig->ev_signal的回调函数总是被设置为evsignal_cb(),而evsignal_cb()是定义在libevent内部的,对libevent用户完全透明,其代码如下:

static void
evsignal_cb(int fd, short what, void *arg)
{
    recv(fd, signals, sizeof(signals), 0);
}

它从一个文件描述符(后文会看到,这个文件描述符总是&event_base->sig.ev_signal_pair[1])读1比特的数据.

(2) 在已经通过调用event_init()获得一个反应堆实例后,通过event_set()设置一个信号事件signal_int的文件描述符signal_int.ev_fd(其实对于信号事件而言,ev_fd也就是此信号事件对应的信号),event_set()还设置了这个信号事件的回调函数.很明显,对于同一个信号,可以有不同的信号事件,这些信号事件的回调函数也可以完全不同.在这里,回调函数是由用户设计的,表示信号被触发时希望作出的反馈函数.

(3) 为了将一个事件(这个事件可以是I/O事件,也可以是定时器事件,也可以是信号事件)注册到反应堆实例中,我们必须调用event_add(),而event_add()通过重重调用,最终由evsignal_add()来完成将信号事件注册.回顾一下evsignal_add():

它通过_evsignal_set_handler总是将信号事件对应的信号的处理函数设置为evsignal_handler(),evsignal_handler()代码如下:

//通知event_base有信号发生的技巧,往sig.ev_signal_pair[0]写1字节数据
//会设置sig.evsignal_caught = 1,标记有信号产生.
static void evsignal_handler(int sig)
{
    evsignal_base->sig.evsigcaught[sig]++;
    evsignal_base->sig.evsignal_caught = 1; //将信号发生标志至1

    send(evsignal_base->sig.ev_signal_pair[0], "a", 1, 0);
}

它将信号发生标志evsignal_base->sig.evsignal_caught置1,以此通过libevent有信号发生.然后往&event_base->sig.ev_signal_pair[0]写1比特数据.

<hr/>

好吧,现在终于可以看看libevent是如何处理信号事件的了:

libevent先进入event_base_loop()主循环,等待已经准备好(可读可写或异常)的事件(通过select_dispatch找出已准备好的文件描述符).当有一个信号产生时,由于这个信号的信号处理函数(总是evsignal_handler())总是会往&event_base->sig.ev_signal_pair[0]写1比特数据(这是由操作系统调用的,对libevent是透明的,对libevent的用户就更加透明了).此时,根据前面的描述,由于ev_signal_pair[0]与ev_signal_pair[1]是一对全双工管道,所以,ev_signal_pair[1]将变得可读.而&event_base->sig.ev_signal事件的文件描述符正是ev_signal_pair[1],所以libevent可以知道&event_base->sig.ev_signal事件准备好了.为此,&event_base->sig.ev_signal事件被移入反应堆实例下的已就绪事件队列.接着在event_base_loop()的后续部分代码中被处理,通过event_process_active()调用其回调函数,也就是evsignal_cb(),从&event_base->sig.ev_signal_pair[1])读1比特的数据.[4]我们把信号被捕捉到的这个while()循环记为第1次while()循环.

写到这里,仍然有一个疑惑没有解开,上面都是讲libevent内部定义的&event_base->sig.ev_signal如何如何,可是我们希望的是自己定义的信号事件signal_int如何如何啊.

答案是,正如(3)描述的那样,在操作系统调用信号处理函数evsignal_handler()时,它会将信号发生标志置1.然后将evsignal_info结构体中用于记录信号被捕捉次数的evsigcaught[id]++,id也就是这个信号.

在第2此while()循环时(参考[4]),它还是调用select_dispatch(),这时,由于信号发生标志为1,所以select_dispatch()会调用函数evsignal_process().select_dispatch()相关代码如下:


static int
select_dispatch(struct event_base *base, void *arg, struct timeval *tv)
{
        if (base->sig.evsignal_caught){
		evsignal_process(base);
}

evsignal_process()代码如下:

void evsignal_process(struct event_base *base)
{
    struct evsignal_info *sig = &base->sig;
    struct event *ev, *next_ev;
    sig_atomic_t ncalls;
    int i;

    base->sig.evsignal_caught = 0;
    for (i = 1; i < NSIG; ++i)
    {
        ncalls = sig->evsigcaught[i];
        if (ncalls == 0)
            continue;
        sig->evsigcaught[i] -= ncalls;

        for (ev = TAILQ_FIRST(&sig->evsigevents[i]);
                ev != NULL; ev = next_ev)
        {
            next_ev = TAILQ_NEXT(ev, ev_signal_next);
            if (!(ev->ev_events & EV_PERSIST))
                event_del(ev);
			
	    //移到已就绪事件队列,ncalls回调函数将会被调用多少次
            event_active(ev, EV_SIGNAL, ncalls);
        }
    }
}

总结一下,反应堆结构体event_base有一个数据成员evsignal_info结构体,它维护信号事件集.之所以evsignal_info会有一个event事件成员ev_signal,是因为libevent通过socket pair让操作系统通知自己有信号发生,在信号处理函数中将信号发生标志置1,并使该信号被捕捉的次数自增,然后ev_signal被移到已就绪事件队列,接着被处理.然后libevent检查到信号发生标志已经被置1,遍历所有信号事件,找出信号被捕捉次数不为0的那个信号事件集,将它们移到已就绪事件队列,然后处理之.

以上,就是libevent处理信号事件的逻辑.

libevent源码浅析: 事件处理框架

本文将从一个使用libevent的小例子出发,解释libevent处理事件的流程.

例子如下:

static void fifo_read(int fd, short event, void *arg) {...}

int main (int argc, char **argv)
{
	int socket = open ("/tmp/event.fifo", O_RDONLY | O_NONBLOCK, 0);

	fprintf(stdout, "Please write data to %s\n", fifo);	
	
	event_init();
	
	struct event evfifo;
	event_set(&evfifo, socket, EV_READ, fifo_read, &evfifo);
	
	event_add(&evfifo, NULL);

	event_dispatch();
}

libevent库的使用方法大体上就像例子展示的那样,先由event_init()得到一个event_base实例(也就是反应堆实例),然后由 event_set()初始化一个event,接着用event_add()将event绑定到event_base,最后调用event_dispatch()进入时间主循环.

event_init()和event_set()功能都很简单,它们分别对event_base结构体和event结构体做初始化.我们直接看看event_add():

//为了思路清晰,这里分析的是I/O事件,暂不考虑信号和定时器相关处理代码.

//函数将ev注册到ev->ev_base上,事件类型由ev->ev_events指明.如果注册成功,ev将被插入到已注册链表中.
int event_add(struct event *ev, const struct timeval *tv)
{	
	//得到ev对应的反应堆实例event_base
    struct event_base *base = ev->ev_base;
	
	//得到libevent选择的已封装的I/O多路复用技术
    const struct eventop *evsel = base->evsel;
	
    void *evbase = base->evbase;
    int res = 0;

    //ev->ev_events表示事件类型
    //如果ev->ev_events是 读/写/信号 事件,而且ev不在 已注册队列 或 已就绪队列,
    //那么调用evbase注册ev事件
    if ((ev->ev_events & (EV_READ|EV_WRITE|EV_SIGNAL)) &&
            !(ev->ev_flags & (EVLIST_INSERTED|EVLIST_ACTIVE)))
    {

		//实际执行操作的是evbase
        res = evsel->add(evbase, ev);
		
		 //注册成功,把事件ev插入已注册队列中
        if (res != -1)
            event_queue_insert(base, ev, EVLIST_INSERTED);
    }

    return (res);
}

注释已经很明了了,如果一个事件不在已注册队列或者已激活队列,而且它是I/O事件或者信号事件,那么调用select_add()将ev插入到selectop的内部数据结构中(本文以select为例,下文不再说明.).select_add()代码如下:

//略去信号处理的相关代码

static int select_add(void *arg, struct event *ev)
{
    struct selectop *sop = arg;
	
    //如果是读类型事件,把该事件的文件描述符加入到selectop维护的读fd_set集
    //event_readset_in中,并且把该事件插入到读事件队列.
    if (ev->ev_events & EV_READ)
    {
        FD_SET(ev->ev_fd, sop->event_readset_in);
        sop->event_r_by_fd[ev->ev_fd] = ev;
    }

    //略去对写事件的处理
}

小结一下,结合event_add()代码和select_add()代码,可以看出在调用event_add()时,事件将被插入其对应的反应堆实例event_base的已注册事件队列中,而且还会被加入到selectop维护的内部数据结构中进行监视.

现在可以看看event_dispatch()代码了:

//略去信号事件和定时器事件处理的相关代码

int event_dispatch(void)
{
    return (event_loop(0));
}

int event_loop(int flags)
{
    return event_base_loop(current_base, flags);
}

//事件主循环
int event_base_loop(struct event_base *base, int flags)
{
    const struct eventop *evsel = base->evsel;
    void *evbase = base->evbase;
    struct timeval *tv_p;
    int res, done;

    done = 0;
    while (!done)
    {
	//从定时器最小堆中取出根节点,其时间值作为select最大等待时间
	//如果定时器最小堆没有元素,那么select最大等待时间为0
	timeout_next(base, &tv_p);
		
	//调用select_dispatch(),它会将已经准备好的事件移到已就绪事件队列中
        res = evsel->dispatch(base, evbase, tv_p);

	//有就绪事件了,那就处理就绪事件吧.
        if (base->event_count_active)
            event_process_active(base);
    }
}

event_base_loop()先从定时器最小堆中取出根节点作为select的最大等待时间,然后调用select_dispatch()将已经准备好的事件移到已就绪事件队列中,最后调用event_process_active()处理已就绪事件队列.

//略去信号事件和定时器事件处理的相关代码

static int select_dispatch(struct event_base *base, void *arg, struct timeval *tv)
{
    int res, j;
    struct selectop *sop = arg;

    //根据前面对select_add()的解释,事件fd已被加入到fd_set集中进行监视.
    res = select(sop->event_fds + 1, sop->event_readset_out,
                 sop->event_writeset_out, NULL, tv);

    for (j = 0, res = 0; j <= sop->event_fds; ++j, res = 0)
    {
        struct event *r_ev = NULL, *w_ev = NULL;

       //找出已经准备好读的事件
        if (FD_ISSET(j, sop->event_readset_out))
        {
            r_ev = sop->event_r_by_fd[i];
            res |= EV_READ;
        }
	
       //将已经准备好读的事件移到已就绪事件队列
        if (r_ev && (res & r_ev->ev_events))
            event_active(r_ev, res & r_ev->ev_events, 1);

	//略去对已经准备好写的事件的处理
    }
}

看看在event_base_loop()中被调用的event_process_active()代码:

static void event_process_active(struct event_base *base)
{
    struct event *ev;
    struct event_list *activeq = NULL;
    int i;
    short ncalls;

    //寻找最高优先级(priority值越小优先级越高)的已就绪事件队列
    for (i = 0; i < base->nactivequeues; ++i)
    {
        if (TAILQ_FIRST(base->activequeues[i]) != NULL)
        {
            activeq = base->activequeues[i];
            break;
        }
    }

    for (ev = TAILQ_FIRST(activeq); ev; ev = TAILQ_FIRST(activeq))
    {
	//如果有persist标志,则只从激活队列中移除此事件,
        if (ev->ev_events & EV_PERSIST)
            event_queue_remove(base, ev, EVLIST_ACTIVE);
		
        else //否则则从激活事件列表,以及已注册事件中双杀此事件
            event_del(ev);

        ncalls = ev->ev_ncalls;
        ev->ev_pncalls = &ncalls;

	//每个事件的回调函数的调用次数 
        while (ncalls)
        {
            ncalls--;
            ev->ev_ncalls = ncalls;

	    //调用回调函数
            (*ev->ev_callback)((int)ev->ev_fd, ev->ev_res, ev->ev_arg);
        }
    }
}

现在,看看这个被阉割的只考虑I/O事件的libevent主循环框架:

event_base_loop:
	
	while()
	{
		//从定时器最小堆取出select最大等待时间
		
		//select出已准备事件,将它们移到已就绪事件队列中
		
		//处理已就绪事件
	}

这真是篇节能环保的文章啊,哈哈.因为libevent代码太恶心了,描述出来都觉得恶心,有空得拿来重构下..下篇文章会讲讲libevent中非常恶心的信号集成处理.

libevent源码浅析: 主要的结构体

About

libevent是一个开源的跨平台网络库,属于事件驱动机制,支持多种I/O多路复用技术,从其主页 http://www.monkey.org/~provos/libevent/ 可以看到libevent使用者众多,其中甚至包括 Memcached.

本文参照的版本是主版本号为1的最后一个版本1.4.14(没办法,2.0.1版本代码量都在4W+行以上了, :-(

File Tree

先看一下libevent的文件组织, 源文件数目是:

$ find . -name “*\.[hc]” | wc -l
51

源代码总行数是:

$ find . -name “*\.[hc]” | xargs wc -l | tail -n1
25562 总计

Structure Call Graph

先送上一张高清无码大图:

[虚线表示包含关系,其label表示是通过哪个成员变量来包含另一个结构体的.]

从上图的虚线指向可以看出, event_base 是绝对的核心,它包含了定时器事件最小堆结构 min_heap ,信号事件队列结构 evsignal_info 和I/O事件队列结构 eventqueue ,也就是说,libevent把定时器,I/O,信号这些事件都统一到event_base管理.另外,既然libevent是事件驱动的,那么事件属性也很自然的自成一个结构体了,就是 event .

最后,结构体 eventop 是拿来封装多种I/O多路复用技术的,主要是起隐藏底层系统机制的作用.

Tail Queue

evsignal_info 和 eventqueue 都是tail queue结构,其定义如下:

A tail queue is headed by a pair of pointers, one to the head of the list and the other to the tail of the list. The elements are doubly linked so that an arbitrary element can be removed without a need to traverse the list. New elements can be added to the list before or after an existing element, at the head of the list, or at the end of the list. A tail queue may be traversed in either direction.

tail queue有两个指针,分别指向头部和尾部.它的元素是双向链接的,所以无须遍历整个序列就可以删除任意一个元素,而且,新的元素也可以在任意位置插入,不管是在一个已存在元素的前后,还是在队列头部,还是尾部.一个tail queue可以双向遍历.

很自然的,一个tail queue有多种实现方式,在libevent中,使用的是linux下的/usr/include/sys/queue.h,我们看看linux的实现(这个纯宏实现了4种数据结构,任何一行除了注释就是宏,非常牛逼.):

#define TAILQ_HEAD(name, type)						\
struct name {								\
	struct type *tqh_first;	/* first element */			\
	struct type **tqh_last;	/* addr of last next element */		\
}

#define TAILQ_ENTRY(type)						\
struct {								\
	struct type *tqe_next;	/* next element */			\
	struct type **tqe_prev;	/* address of previous next element */	\
}

使用方式如下:

struct QUEUE_ITEM //定义节点的结构体
{
	int value; //在这里可以定义一系列的数据
	TAILQ_ENTRY(QUEUE_ITEM) entries; //作为队列入口
};   

//定义一个匿名队列的头部,这个队列的节点结构为 QUEUE_ITEM
TAILQ_HEAD(,TAILQ_ITEM) queue_head; 

从上面对这些宏的使用可以看出,节点的数据结构是自由定义的(废话),但是一定得包含有这个节点相应于队列的入口.当然,我们很容易就可以将一个节点扩展为对应多个队列,只要在这个节点的定义中有多个队列入口成员就可以了.下文将会讲到,事件结构体event就做了这种扩展.

这篇文章 对tail queue有比较详细的介绍,强烈建议仔细看完,因为Tail Queue在libevent中几乎是无处不在的.

event

首先看看event的成员:

//event提供了函数接口,供Reactor在事件发生时调用,以执行相应的事件处理,
//通常它会绑定一个有效的句柄(ev_fd)做为回调函数的参数.
struct event {

	//已注册事件队列入口
	TAILQ_ENTRY (event) ev_next;

	//已激活事件队列入口
	TAILQ_ENTRY (event) ev_active_next;

	//信号事件队列入口
	TAILQ_ENTRY (event) ev_signal_next;

	//表示该event在定时器事件最小堆min_heap的索引
	unsigned int min_heap_idx;	/* for managing timeouts */

	//该事件所属的反应堆实例
	struct event_base *ev_base;

	//对于I/O事件,是绑定的文件描述符; 对于signal事件,是绑定的信号.
	int ev_fd;

	//表示事件类型: I/O,定时器或者信号
	short ev_events;

	//事件就绪执行时,将要调用ev_callback 的次数,通常为1
	short ev_ncalls;

	//该事件的超时时间,在定时器最小堆min_heap操作中作为节点值进行比较.
	struct timeval ev_timeout;

	//该事件的优先级,越小越优先.
	int ev_pri;		/* smaller numbers are higher priority */

	//该事件被激活时的回调函数
	void (*ev_callback)(int, short, void *arg);

	//该事件的标记信息,表示其当前的状态,即它在哪个链表中
	int ev_flags;

	... //其他成员.
};

可以看到,一个事件是可以插入到多个队列的,当它与一个反应堆实例(event_base)关联时,这个事件被插入到反应堆实例下的已注册事件队列 event_base -> eventqueue ,当它处于就绪状态时,会被插入到反应堆实例下的已激活事件队列 event_base -> activequeues[id], id = event -> ev_pri .同时,如果此事件是信号事件,那么它会被插入到反应堆结构体下的信号事件结构体下的信号队列 event_base -> evsignal_info -> evsigevents[id], id = event -> ev_fd .

需要指出的,每个事件都保持了一个成员 struct event_base *ev_base; ,它表示该事件属于哪个反应堆实例.

还有一个成员需要注意, short ev_events; ,它表明此事件的事件类型,libevent正是基于此实现对I/O,信号,定时 3种事件的封装的.

min_heap

min_heap是存储定时事件的最小堆,它应该是libevent里最简单的结构体了:

typedef struct min_heap
{
    struct event** p; //p指向一个动态分配的数组,数组元素是event指针.
    unsigned n, a; // n表示目前保存了多少元素,a表示p指向的内存能够存储event指针的个数.
} min_heap_t;

之所以会有这个结构体,是因为I/O多路复用机制,比如说select,往往要求一个最大等待时间,而最小堆的根节点表示的就是最小超时时间,所以把根节点时间值传给select作为其最大等待时间就可以了.而在堆中取出根节点复杂度为O(1).

btw,从min_heap的成员p可以看出,libevent使用的是开辟一块连续存储区(即数组)来实现堆的策略.这很容易就可以办到,无非就是shift_up和shift_down操作罢了,但是libevent做了小小的优化,这导致了算法的不清晰,有点得不偿失的感觉,不提为妙.

eventop

eventop实现了对系统I/O多路复用机制的封装,这些机制包括 select poll epoll evport kqueue devpoll (别忘了libevent是跨平台的).

看看eventop的成员吧:

struct eventop {
	const char *name; //表示哪种I/O多路复用机制
	void *(*init)(struct event_base *); //初始化
	int (*add)(void *, struct event *); //注册事件
	int (*del)(void *, struct event *); //删除事件
	int (*dispatch)(struct event_base *, void *, struct timeval *); //事件分发
	void (*dealloc)(struct event_base *, void *); //注销,释放资源

	//是否需要重新初始化
	int need_reinit;
};

可以看到,eventop里面包含了5中操作的函数指针,libevent就是通过这一点来实现封装的.不同的机制定义不同的操作,但这些操作的接口却是保持一致的,以select接口为例:

const struct eventop selectops =
{
    "select",
    select_init,
    select_add,
    select_del,
    select_dispatch,
    select_dealloc,
    0
};

其中,5个select_* 成员都是函数指针,不同机制的函数以static形势封装在不同的文件下 ,举个例子, static void *
select_init(struct event_base *base) 函数的声明和定义在select.c文件下,而epoll机制的初始化函数 static void *
epoll_init(struct event_base *base) 声明和定义在epoll.c文件下.

好了,做好了这些准备,各机制的信息已经隐藏起来了,那么到底该怎样封装呢? libevent借助了一个static数组来保存这些I/O复用机制的结构体指针,代码如下:

#ifdef HAVE_EVENT_PORTS
extern const struct eventop evportops;
#endif
#ifdef HAVE_SELECT
extern const struct eventop selectops;
#endif
...
#ifdef WIN32
extern const struct eventop win32ops;
#endif

static const struct eventop *eventops[] =
{
#ifdef HAVE_EVENT_PORTS
    &evportops,
#endif
#ifdef HAVE_SELECT
    &selectops, //在select.c中
#endif
...
#ifdef WIN32
    &win32ops,
#endif
    NULL
};

其中, HAVE_* 宏是在configure时保存在config.h中的.注意在声明 evportops selectops 等变量时使用的是extern,由此使编译器知道这是外部变量,在别处寻找,或者在链接时寻找其符号名.(在libevent中,是在链接时才寻找到symbol的,封装嘛..)

封装已经ok了,那么程序运行时应该选择哪种I/O多路复用机制呢? libevent很不人性化的一点就在这里,它没有用配置文件或者config.h信息来保证用户可以灵活的选择,而是写死在代码里了(当然,手动修改代码重编译就是唯一的方法了.说到这里,还有一点要吐槽的是,libevent用的autoconf,configure文件和makefile文件难懂异常,在这方面完败于手工编写makefile的nginx.). 代码正是上面的 struct eventop *eventops[] 数组,此数组最后一个有效的元素就是libevent选择的机制.选择代码如下:

//初始化一个反应堆实例
struct event_base *event_base_new(void)
{
    ...
    for (i = 0; eventops[i] && !base->evbase; i++)
    {
        //选定I/O多路复用机制
        base->evsel = eventops[i];
    }
    ...
}

evsignal_info

evsignal_info 是用来管理信号事件的,代码如下:

struct evsignal_info {

	//是否有信号发生的标记
	volatile sig_atomic_t evsignal_caught;

	//evsigevents[signo]表示注册到信号 signo 的事件链表
	struct event_list evsigevents[NSIG];

	//具体记录每个信号触发的次数,evsigcaught[signo]是记录信号signo被触发的次数
	sig_atomic_t evsigcaught[NSIG];

	... //其他成员
};

其中, struct event_list evsigevents[NSIG]; 成员是一个数组,它的元素 evsigevents[id] 表示注册到信号id的事件链表.

关于evsignal_info 还有很多很多要说的,这留给下篇文章,本文对其描述到此为止.

event_base

终于到了最最核心的 event_base 了.秒杀之,代码如下:

struct event_base {

	//保存I/O机制
	const struct eventop *evsel; 

	//有多少个event
	int event_count;		/* counts number of total events */ 

	//有多少个活动的event
	int event_count_active;	/* counts number of active events */ 

	//存储已就绪事件队列的数组
	struct event_list **activequeues;

        //已就绪事件队列数组的元素个数
	int nactivequeues;

	//保存信号事件队列的结构体
	struct evsignal_info sig;

	//已注册事件队列
	struct event_list eventqueue; 

	//这是定时器事件最小堆
	struct min_heap timeheap;

	...//其他成员
};

重要的数据成员的说明已经在注释中给出了.相信我,回头看看前面event的描述吧.

Conclusion and next steps

本文介绍了libevent主要的结构体,接下来会分析事件处理框架或者是信号/定时器如何集成到I/O处理中的.我还没决定呢.