结构:
struct WATCHDOG {
unsigned timeout; /* our time resolution */
WATCHDOG_FN action; /* application routine */
char *context; /* application context */
int trip_run; /* number of successive timeouts */
WATCHDOG *saved_watchdog; /* saved state */
struct sigaction saved_action; /* saved state */
unsigned saved_time; /* saved state */
};
static WATCHDOG *watchdog_curr;watchdog 单例。
调用alarm函数,一个进程只能有一个闹钟时间。
Alarm(0)取消此闹钟并返回剩余时间。
Watchdog_start中开启定时器。
Postfix 事件机制模块
Event模块结构体
*/
typedef void (*EVENT_NOTIFY_FN) (int, void *);
#define EVENT_NOTIFY_TIME_FN EVENT_NOTIFY_FN /* legacy */
#define EVENT_NOTIFY_RDWR_FN EVENT_NOTIFY_FN /* legacy */
/*
* Event codes.
*/
定义了四种事件类型,读,写,异常事件和时间事件1
#define EVENT_READ (1<<0) /* read event */
#define EVENT_WRITE (1<<1) /* write event */
#define EVENT_XCPT (1<<2) /* exception */
#define EVENT_TIME (1<<3) /* timer event */
#define EVENT_ERROR EVENT_XCPT
/*
* Dummies.
*/
#define EVENT_NULL_TYPE (0)
#define EVENT_NULL_CONTEXT ((void *) 0)
#define EVENT_NULL_DELAY (0)
This module delivers I/O and timer events.
/* Multiple I/O streams and timers can be monitored simultaneously.
/* Events are delivered via callback routines provided by the
/* application. When requesting an event, the application can provide
/* private context that is passed back when the callback routine is
/* executed.
/*
/* event_time() returns a cached value of the current time.
因为事件在程序中只调用一次,调用EVENT_INIT_NEEDED()判断event_present 是否为空。判断时间是否执行过。并返回当前时间。
/* event_init - set up tables and such */
Static void event_init(void);
//根据事件类型机制,判断每个进程文件打开的数量。
#if (EVENTS_STYLE == EVENTS_STYLE_SELECT)
if ((event_fdlimit = open_limit(FD_SETSIZE)) < 0)
msg_fatal("unable to determine open file limit");
#else
if ((event_fdlimit = open_limit(INT_MAX)) < 0)
msg_fatal("unable to determine open file limit");
#endif
//初始化 event_fdslots, event_fdtable;
if (event_fdlimit < FD_SETSIZE / 2 && event_fdlimit < 256)
msg_warn("could allocate space for only %d open files", event_fdlimit);
event_fdslots = EVENT_ALLOC_INCR;
event_fdtable = (EVENT_FDTABLE *)
mymalloc(sizeof(EVENT_FDTABLE) * event_fdslots);
for (fdp = event_fdtable; fdp < event_fdtable + event_fdslots; fdp++) {
fdp->callback = 0;
fdp->context = 0;
再调用宏函数初始化event_rmask,event_wmask,event_xmask。
初始化event_timer_head结构体,使用双向链表管理EVENT_TIMER结构体,
1
ring_init(&event_timer_head);
(void) time(&event_present);
/* event_loop() monitors all I/O channels for which the application has
/* expressed interest, and monitors the timer request queue.
/* It notifies the application whenever events of interest happen.
/* A negative delay value causes the function to pause until something
/* happens; a positive delay value causes event_loop() to return when
/* the next event happens or when the delay time in seconds is over,
/* whatever happens first. A zero delay effectuates a poll.
/*
/* Note: in order to avoid race conditions, event_loop() cannot
/* not be called recursively.
/*
Event_loop,用于等待下一个事件,判断EVENT_STYLE,根据I/O复用模型不同定义不同变量
if (EVENTS_STYLE == EVENTS_STYLE_SELECT)
fd_set rmasank
fd_set wmask;
fd_set xmask;
struct timeval tv;
struct timeval *tvp;
int new_max_fd;
#else
EVENT_BUFFER event_buf[100];
EVENT_BUFFER *bp;
#endif
遍历双向环形链表,设定I/O多路复用的等待时间,event_loop的参数delay就是用来设定等待时间。由于多路复用函数会引发阻塞,所以确定等待时间的原则是:如果当前定时事件已经超时,来不及执行,则将select的等待时间为0,即检查后立即返回,以免妨碍下一个时间事件的执行。否则,如果当前时间事件还来得及执行,那多路复用函数可以阻塞到这个时间事件执行前,这样一方面多路复用函数可以有机会捕获事件,同时也不会影响定时事件的执行。
FOREACH_QUEUE_ENTRY(ring, &event_timer_head) {
timer = RING_TO_TIMER(ring);
msg_info("%s: time left %3d for 0x%lx 0x%lx", myname,
(int) (timer->when - event_present),
(long) timer->callback, (long) timer->context);
}
}
/*
* Find out when the next timer would go off. Timer requests are sorted.
* If any timer is scheduled, adjust the delay appropriately.
*/
if ((timer = FIRST_TIMER(&event_timer_head)) != 0) {
event_present = time((time_t *) 0);
if ((select_delay = timer->when - event_present) < 0) {
select_delay = 0;
} else if (delay >= 0 && select_delay > delay) {
select_delay = delay;
}
} else {
select_delay = delay;
}
if (msg_verbose > 2)
msg_info("event_loop: select_delay %d", select_delay);
除了select和epoll,还使用了kqueue,通过kevent函数和kqueue实例交互。
#define EVENT_BUFFER_READ(event_count, event_buf, buflen, delay) do { \
struct timespec ts; \
struct timespec *tsp; \
if ((delay) < 0) { \
tsp = 0; \
} else { \
tsp = &ts; \
ts.tv_nsec = 0; \
ts.tv_sec = (delay); \
} \
(event_count) = kevent(event_kq, (struct kevent *) 0, 0, (event_buf), \
(buflen), (tsp)); \
} while (0)
#define EVENT_BUFFER_READ_TEXT "kevent"
/* event_request_timer() causes the specified callback function to
/* be called with the specified context argument after \fIdelay\fR
/* seconds, or as soon as possible thereafter. The delay should
/* not be negative (the manifest EVENT_NULL_DELAY provides for
/* convenient zero-delay notification).
/* The event argument is equal to EVENT_TIME.
/* Only one timer request can be active per (callback, context) pair.
/* Calling event_request_timer() with an existing (callback, context)
/* pair does not schedule a new event, but updates the time of event
/* delivery. The result is the absolute time at which the timer is
/* scheduled to go off.
/*
time_t event_request_timer(EVENT_NOTIFY_TIME_FN callback, void *context, int delay);
/* event_request_timer - (re)set timer */
callback 为事件回调函数,context为上下文,delay是延时。遍历整个双向链表,
将callback context 和时间都保存在EVENT_TIMER 的结构体中。
timer = RING_TO_TIMER(ring);
if (timer->callback == callback && timer->context == context) {
timer->when = event_present + delay;
timer->loop_instance = event_loop_instance;
ring_detach(ring);
if (msg_verbose > 2)
msg_info("%s: reset 0x%lx 0x%lx %d", myname,
(long) callback, (long) context, delay);
break;
}
/* event_cancel_timer() cancels the specified (callback, context) request.
/* The application is allowed to cancel non-existing requests. The result
/* value is the amount of time left before the timer would have gone off,
/* or -1 in case of no pending timer.
/*
遍历事件列表,从列表中取消延时时间,如果在列表中没找到timer,或许它在别的线程中被取消了。
FOREACH_QUEUE_ENTRY(ring, &event_timer_head) {
timer = RING_TO_TIMER(ring);
if (timer->callback == callback && timer->context == context) {
if ((time_left = timer->when - event_present) < 0)
time_left = 0;
ring_detach(ring);
myfree((void *) timer);
break;
}
}
/* event_enable_read() (event_enable_write()) enables read (write) events
/* on the named I/O channel. It is up to the application to assemble
/* partial reads or writes.
/* An I/O channel cannot handle more than one request at the
/* same time. The application is allowed to enable an event that
/* is already enabled (same channel, same read or write operation,
/* but perhaps a different callback or context). On systems with
/* kernel-based event filters this is preferred usage, because
/* each disable and enable request would cost a system call.
/*
/* event_enable_write
将事件回调挂载到callback函数中。通过文件描述符fd,挂载回调函数,
if (EVENT_MASK_ISSET(fd, &event_wmask) == 0) {
EVENT_MASK_SET(fd, &event_xmask);
EVENT_MASK_SET(fd, &event_wmask);
if (event_max_fd < fd)
event_max_fd = fd;
#if (EVENTS_STYLE != EVENTS_STYLE_SELECT)
EVENT_REG_ADD_WRITE(err, fd);
if (err < 0)
msg_fatal("%s: %s: %m", myname, EVENT_REG_ADD_TEXT);
#endif
}
fdp = event_fdtable + fd;
if (fdp->callback != callback || fdp->context != context) {
fdp->callback = callback;
fdp->context = context;
}
Event_loop 根据时间执行其中的事件,调用回调函数,通过context 这个闭包保存的函数结构,执行其中的内容。