adapt the event loop code to support watching the same

fd from more than one place (and with different conditions)
This commit is contained in:
Ray Strode 2007-06-03 20:09:48 -04:00
parent 6eeab19385
commit f8b5c8157e
2 changed files with 355 additions and 110 deletions

View file

@ -47,12 +47,20 @@ typedef void (* ply_event_loop_free_handler_t) (void *);
typedef struct
{
int fd;
ply_list_t *destinations;
uint32_t is_being_watched : 1;
} ply_event_source_t;
typedef struct
{
ply_event_source_t *source;
ply_event_loop_fd_status_t status;
ply_event_handler_t status_met_handler;
ply_event_handler_t disconnected_handler;
void *user_data;
ply_event_loop_free_handler_t free_function;
} ply_event_source_t;
} ply_event_destination_t;
typedef struct
{
@ -94,8 +102,7 @@ static void ply_event_loop_process_pending_events (ply_event_loop_t *loop);
static void ply_event_loop_remove_source (ply_event_loop_t *loop,
ply_event_source_t *source);
static ply_list_node_t *ply_event_loop_find_source_node (ply_event_loop_t *loop,
int fd,
ply_event_loop_fd_status_t status);
int fd);
static ply_list_node_t *
ply_signal_dispatcher_find_source_node (ply_signal_dispatcher_t *dispatcher,
@ -247,24 +254,49 @@ ply_signal_dispatcher_reset_signal_sources (ply_signal_dispatcher_t *dispatcher,
}
}
static ply_event_destination_t *
ply_event_destination_new (ply_event_loop_fd_status_t status,
ply_event_handler_t status_met_handler,
ply_event_handler_t disconnected_handler,
void *user_data,
ply_event_loop_free_handler_t free_function)
{
ply_event_destination_t *destination;
destination = calloc (1, sizeof (ply_event_destination_t));
destination->source = NULL;
destination->status = status;
destination->status_met_handler = status_met_handler;
destination->disconnected_handler = disconnected_handler;
destination->user_data = user_data;
destination->free_function = free_function;
return destination;
}
static void
ply_event_destination_free (ply_event_destination_t *destination)
{
if (destination == NULL)
return;
if (destination->free_function != NULL)
destination->free_function ((void *) destination->user_data);
free (destination);
}
static ply_event_source_t *
ply_event_source_new (int fd,
ply_event_loop_fd_status_t status,
ply_event_handler_t status_met_handler,
ply_event_handler_t disconnected_handler,
void *user_data,
ply_event_loop_free_handler_t free_function)
ply_event_source_new (int fd)
{
ply_event_source_t *source;
source = calloc (1, sizeof (ply_event_source_t));
source->fd = fd;
source->status = status;
source->status_met_handler = status_met_handler;
source->disconnected_handler = disconnected_handler;
source->user_data = user_data;
source->free_function = free_function;
source->destinations = ply_list_new ();
source->is_being_watched = false;
return source;
}
@ -275,12 +307,143 @@ ply_event_source_free (ply_event_source_t *source)
if (source == NULL)
return;
if (source->free_function != NULL)
source->free_function ((void *) source->user_data);
assert (ply_list_get_length (source->destinations) == 0);
ply_list_free (source->destinations);
free (source);
}
static void
ply_event_loop_update_source_event_mask (ply_event_loop_t *loop,
ply_event_source_t *source)
{
ply_list_node_t *node;
struct epoll_event event = { 0 };
int status;
assert (loop != NULL);
assert (source != NULL);
assert (source->destinations != NULL);
event.events = EPOLLERR | EPOLLHUP;
node = ply_list_get_first_node (source->destinations);
while (node != NULL)
{
ply_list_node_t *next_node;
ply_event_destination_t *destination;
destination = (ply_event_destination_t *) ply_list_node_get_data (node);
next_node = ply_list_get_next_node (source->destinations, node);
if (destination->status & PLY_EVENT_LOOP_FD_STATUS_HAS_DATA)
event.events |= EPOLLIN;
if (destination->status & PLY_EVENT_LOOP_FD_STATUS_HAS_CONTROL_DATA)
event.events |= EPOLLPRI;
if (destination->status & PLY_EVENT_LOOP_FD_STATUS_CAN_TAKE_DATA)
event.events |= EPOLLOUT;
node = next_node;
}
event.data.ptr = source;
if (source->is_being_watched)
{
status = epoll_ctl (loop->epoll_fd, EPOLL_CTL_MOD, source->fd, &event);
assert (status == 0);
}
}
static ply_fd_watch_t *
ply_event_loop_add_destination_for_source (ply_event_loop_t *loop,
ply_event_destination_t *destination,
ply_event_source_t *source)
{
ply_list_node_t *destination_node;
assert (loop != NULL);
assert (destination != NULL);
assert (destination->source == NULL);
assert (source != NULL);
destination->source = source;
destination_node = ply_list_append_data (source->destinations, destination);
assert (destination_node != NULL);
assert (destination->source == source);
ply_event_loop_update_source_event_mask (loop, source);
return (ply_fd_watch_t *) destination_node;
}
static ply_event_destination_t *
ply_event_loop_get_destination_from_fd_watch (ply_event_loop_t *loop,
ply_fd_watch_t *watch)
{
ply_list_node_t *destination_node;
ply_event_destination_t *destination;
assert (loop != NULL);
assert (watch != NULL);
destination_node = (ply_list_node_t *) watch;
destination = (ply_event_destination_t *)
ply_list_node_get_data (destination_node);
return destination;
}
static void
ply_event_loop_remove_destination_by_watch (ply_event_loop_t *loop,
ply_fd_watch_t *watch)
{
ply_list_node_t *destination_node;
ply_event_destination_t *destination;
ply_event_source_t *source;
assert (loop != NULL);
assert (watch != NULL);
destination_node = (ply_list_node_t *) watch;
destination = ply_event_loop_get_destination_from_fd_watch (loop, watch);
assert (destination != NULL);
source = destination->source;
assert (source != NULL);
ply_list_remove_node (source->destinations, destination_node);
ply_event_loop_update_source_event_mask (loop, source);
}
static void
ply_event_loop_remove_destinations_for_source (ply_event_loop_t *loop,
ply_event_source_t *source)
{
ply_list_node_t *destination_node;
destination_node = ply_list_get_first_node (source->destinations);
while (destination_node != NULL)
{
ply_list_node_t *next_node;
ply_event_destination_t *destination;
destination = (ply_event_destination_t *)
ply_list_node_get_data (destination_node);
assert (destination != NULL);
assert (destination->source == source);
next_node = ply_list_get_next_node (source->destinations,
destination_node);
ply_list_remove_node (source->destinations, destination_node);
ply_event_loop_update_source_event_mask (loop, source);
ply_event_destination_free (destination);
destination_node = next_node;
}
}
ply_event_loop_t *
ply_event_loop_new (void)
{
@ -393,6 +556,28 @@ ply_event_loop_free (ply_event_loop_t *loop)
free (loop);
}
static ply_list_node_t *
ply_event_loop_find_source_node (ply_event_loop_t *loop,
int fd)
{
ply_list_node_t *node;
node = ply_list_get_first_node (loop->sources);
while (node != NULL)
{
ply_event_source_t *source;
source = (ply_event_source_t *) ply_list_node_get_data (node);
if (source->fd == fd)
break;
node = ply_list_get_next_node (loop->sources, node);
}
return node;
}
static void
ply_event_loop_add_source (ply_event_loop_t *loop,
ply_event_source_t *source)
@ -400,22 +585,17 @@ ply_event_loop_add_source (ply_event_loop_t *loop,
struct epoll_event event = { 0 };
int status;
assert (ply_event_loop_find_source_node (loop, source->fd) == NULL);
assert (source->is_being_watched == false);
event.events = EPOLLERR | EPOLLHUP;
if (source->status & PLY_EVENT_LOOP_FD_STATUS_HAS_DATA)
event.events |= EPOLLIN;
if (source->status & PLY_EVENT_LOOP_FD_STATUS_HAS_CONTROL_DATA)
event.events |= EPOLLPRI;
if (source->status & PLY_EVENT_LOOP_FD_STATUS_CAN_TAKE_DATA)
event.events |= EPOLLOUT;
event.data.ptr = source;
status = epoll_ctl (loop->epoll_fd, EPOLL_CTL_ADD, source->fd, &event);
assert (status == 0);
source->is_being_watched = true;
ply_list_append_data (loop->sources, source);
}
@ -424,20 +604,18 @@ ply_event_loop_remove_source_node (ply_event_loop_t *loop,
ply_list_node_t *source_node)
{
ply_event_source_t *source;
struct epoll_event event = { 0 };
int status;
source = (ply_event_source_t *) ply_list_node_get_data (source_node);
assert (source != NULL);
event.events = EPOLLIN | EPOLLPRI | EPOLLERR | EPOLLHUP;
event.data.ptr = source;
status = epoll_ctl (loop->epoll_fd, EPOLL_CTL_DEL, source->fd, &event);
#ifdef EXPOSE_FD_ACCOUNTING_BUG
assert (status == 0);
#endif
if (source->is_being_watched)
{
status = epoll_ctl (loop->epoll_fd, EPOLL_CTL_DEL, source->fd, NULL);
assert (status == 0);
source->is_being_watched = false;
}
ply_list_remove_node (loop->sources, source_node);
}
@ -452,42 +630,45 @@ ply_event_loop_remove_source (ply_event_loop_t *loop,
assert (source_node != NULL);
ply_event_loop_remove_destinations_for_source (loop, source);
ply_event_loop_remove_source_node (loop, source_node);
}
static ply_list_node_t *
ply_event_loop_find_source_node (ply_event_loop_t *loop,
int fd,
ply_event_loop_fd_status_t status)
{
ply_list_node_t *node;
node = ply_list_get_first_node (loop->sources);
while (node != NULL)
{
ply_event_source_t *source;
source = (ply_event_source_t *) ply_list_node_get_data (node);
if ((source->fd == fd) && (source->status == status))
break;
node = ply_list_get_next_node (loop->sources, node);
}
return node;
}
static bool
ply_event_loop_fd_status_is_valid (ply_event_loop_fd_status_t status)
{
return (status & ~(PLY_EVENT_LOOP_FD_STATUS_HAS_DATA
return (status & ~(PLY_EVENT_LOOP_FD_STATUS_NONE
| PLY_EVENT_LOOP_FD_STATUS_HAS_DATA
| PLY_EVENT_LOOP_FD_STATUS_HAS_CONTROL_DATA
| PLY_EVENT_LOOP_FD_STATUS_CAN_TAKE_DATA)) == 0;
}
void
static ply_event_source_t *
ply_event_loop_get_source_from_fd (ply_event_loop_t *loop,
int fd)
{
ply_list_node_t *source_node;
ply_event_source_t *source;
source_node = ply_event_loop_find_source_node (loop, fd);
if (source_node == NULL)
{
source = ply_event_source_new (fd);
ply_event_loop_add_source (loop, source);
source_node = ply_list_get_last_node (loop->sources);
assert (source_node != NULL);
}
source = (ply_event_source_t *) ply_list_node_get_data (source_node);
assert (source->fd == fd);
return source;
}
ply_fd_watch_t *
ply_event_loop_watch_fd (ply_event_loop_t *loop,
int fd,
ply_event_loop_fd_status_t status,
@ -495,43 +676,48 @@ ply_event_loop_watch_fd (ply_event_loop_t *loop,
ply_event_handler_t disconnected_handler,
void *user_data)
{
ply_list_node_t *node;
ply_event_source_t *source;
ply_event_destination_t *destination;
ply_fd_watch_t *watch;
assert (loop != NULL);
assert (fd >= 0);
assert (ply_event_loop_fd_status_is_valid (status));
assert (status != PLY_EVENT_LOOP_FD_STATUS_NONE || status_met_handler == NULL);
node = ply_event_loop_find_source_node (loop, fd, status);
source = ply_event_loop_get_source_from_fd (loop, fd);
assert (source != NULL);
assert (node == NULL);
destination = ply_event_destination_new (status, status_met_handler,
disconnected_handler, user_data,
NULL);
watch = ply_event_loop_add_destination_for_source (loop, destination, source);
source = ply_event_source_new (fd,
status,
status_met_handler,
disconnected_handler,
user_data, NULL);
ply_event_loop_add_source (loop, source);
return watch;
}
void
ply_event_loop_stop_watching_fd (ply_event_loop_t *loop,
int fd,
ply_event_loop_fd_status_t status)
ply_event_loop_stop_watching_fd (ply_event_loop_t *loop,
ply_fd_watch_t *watch)
{
ply_list_node_t *node;
ply_event_destination_t *destination;
ply_event_source_t *source;
node = ply_event_loop_find_source_node (loop, fd, status);
destination = ply_event_loop_get_destination_from_fd_watch (loop, watch);
assert (destination != NULL);
assert (node != NULL);
source = destination->source;
assert (source != NULL);
assert (source->fd >= 0);
source = (ply_event_source_t *) ply_list_node_get_data (node);
ply_event_loop_remove_destination_by_watch (loop, watch);
ply_event_loop_remove_source_node (loop, node);
ply_event_source_free (source);
if (ply_list_get_length (source->destinations) == 0)
{
ply_event_loop_remove_source (loop, source);
ply_event_source_free (source);
}
}
static ply_list_node_t *
@ -546,6 +732,8 @@ ply_signal_dispatcher_find_source_node (ply_signal_dispatcher_t *dispatcher,
ply_signal_source_t *handler;
handler = (ply_signal_source_t *) ply_list_node_get_data (node);
assert (handler != NULL);
if (handler->signal_number == signal_number)
break;
@ -619,6 +807,71 @@ ply_event_loop_watch_for_exit (ply_event_loop_t *loop,
ply_list_append_data (loop->exit_closures, exit_closure);
}
static ply_event_loop_fd_status_t
ply_event_loop_get_fd_status_from_poll_mask (uint32_t mask)
{
ply_event_loop_fd_status_t status;
status = PLY_EVENT_LOOP_FD_STATUS_NONE;
if (mask & EPOLLIN)
status |= PLY_EVENT_LOOP_FD_STATUS_HAS_DATA;
if (mask & EPOLLPRI)
status |= PLY_EVENT_LOOP_FD_STATUS_HAS_CONTROL_DATA;
if (mask & EPOLLOUT)
status |= PLY_EVENT_LOOP_FD_STATUS_CAN_TAKE_DATA;
return status;
}
static void
ply_event_loop_dispatch_event_for_source (ply_event_loop_t *loop,
ply_event_source_t *source,
ply_event_loop_fd_status_t status,
bool is_disconnected)
{
ply_list_node_t *node;
node = ply_list_get_first_node (source->destinations);
while (node != NULL)
{
ply_list_node_t *next_node;
ply_event_destination_t *destination;
destination = (ply_event_destination_t *) ply_list_node_get_data (node);
next_node = ply_list_get_next_node (source->destinations, node);
if (((destination->status & status) != 0)
&& (destination->status_met_handler != NULL))
destination->status_met_handler (destination->user_data, source->fd);
node = next_node;
}
if (is_disconnected)
{
node = ply_list_get_first_node (source->destinations);
while (node != NULL)
{
ply_list_node_t *next_node;
ply_event_destination_t *destination;
destination = (ply_event_destination_t *) ply_list_node_get_data (node);
next_node = ply_list_get_next_node (source->destinations, node);
if (destination->disconnected_handler != NULL)
destination->disconnected_handler (destination->user_data,
source->fd);
node = next_node;
}
ply_event_loop_remove_source (loop, source);
}
}
static void
ply_event_loop_process_pending_events (ply_event_loop_t *loop)
{
@ -655,33 +908,21 @@ ply_event_loop_process_pending_events (ply_event_loop_t *loop)
{
ply_event_source_t *source;
ply_event_loop_fd_status_t status;
bool is_disconnected;
source = (ply_event_source_t *) (events[i].data.ptr);
status = PLY_EVENT_LOOP_FD_STATUS_NONE;
if (events[i].events & EPOLLIN)
status |= PLY_EVENT_LOOP_FD_STATUS_HAS_DATA;
if (events[i].events & EPOLLPRI)
status |= PLY_EVENT_LOOP_FD_STATUS_HAS_CONTROL_DATA;
if (events[i].events & EPOLLOUT)
status |= PLY_EVENT_LOOP_FD_STATUS_CAN_TAKE_DATA;
if (((source->status & status) != 0)
&& (source->status_met_handler != NULL))
source->status_met_handler (source->user_data, source->fd);
status = ply_event_loop_get_fd_status_from_poll_mask (events[i].events);
if ((events[i].events & EPOLLHUP) || (events[i].events & EPOLLERR))
{
if (source->disconnected_handler != NULL)
source->disconnected_handler (source->user_data, source->fd);
is_disconnected = true;
else
is_disconnected = false;
ply_event_loop_remove_source (loop, source);
ply_event_source_free (source);
source = NULL;
}
if (is_disconnected)
source->is_being_watched = false;
ply_event_loop_dispatch_event_for_source (loop, source, status,
is_disconnected);
if (loop->should_exit)
break;
@ -689,8 +930,11 @@ ply_event_loop_process_pending_events (ply_event_loop_t *loop)
}
void
ply_event_loop_exit (ply_event_loop_t *loop, int exit_code)
ply_event_loop_exit (ply_event_loop_t *loop,
int exit_code)
{
assert (loop != NULL);
loop->should_exit = true;
loop->exit_code = exit_code;
}

View file

@ -23,8 +23,10 @@
#define PLY_EVENT_LOOP_H
#include <stdbool.h>
#include <stdint.h>
typedef struct _ply_event_loop ply_event_loop_t;
typedef intptr_t ply_fd_watch_t;
typedef enum {
PLY_EVENT_LOOP_FD_STATUS_NONE = 0,
@ -43,15 +45,14 @@ typedef void (* ply_event_loop_exit_handler_t) (void *user_data,
#ifndef PLY_HIDE_FUNCTION_DECLARATIONS
ply_event_loop_t *ply_event_loop_new (void);
void ply_event_loop_free (ply_event_loop_t *loop);
void ply_event_loop_watch_fd (ply_event_loop_t *loop,
int fd,
ply_event_loop_fd_status_t status,
ply_event_handler_t status_met_handler,
ply_event_handler_t disconnected_handler,
void *user_data);
ply_fd_watch_t *ply_event_loop_watch_fd (ply_event_loop_t *loop,
int fd,
ply_event_loop_fd_status_t status,
ply_event_handler_t status_met_handler,
ply_event_handler_t disconnected_handler,
void *user_data);
void ply_event_loop_stop_watching_fd (ply_event_loop_t *loop,
int fd,
ply_event_loop_fd_status_t status);
ply_fd_watch_t *watch);
void ply_event_loop_watch_signal (ply_event_loop_t *loop,
int signal_number,
ply_event_handler_t signal_handler,