diff --git a/pinos/client/loop.c b/pinos/client/loop.c new file mode 100644 index 000000000..87d0c602c --- /dev/null +++ b/pinos/client/loop.c @@ -0,0 +1,591 @@ +/* Pinos + * Copyright (C) 2016 Wim Taymans + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +#include +#include + +struct _PinosSource { + SpaSource source; + SpaList link; + + bool close; + union { + PinosSourceIOFunc io; + PinosSourceIdleFunc idle; + PinosSourceEventFunc event; + PinosSourceTimerFunc timer; + PinosSourceSignalFunc signal; + } func; + int signal_number; +}; + +#define DATAS_SIZE (4096 * 8) + +typedef struct { + size_t item_size; + SpaInvokeFunc func; + uint32_t seq; + size_t size; + void *data; + void *user_data; +} InvokeItem; + +typedef struct { + PinosLoop this; + + SpaList source_list; + + PinosLoopHook pre_func; + PinosLoopHook post_func; + void *hook_data; + + pthread_t thread; + + SpaLoop loop; + + PinosSource *event; + + SpaRingbuffer buffer; + uint8_t buffer_data[DATAS_SIZE]; + + int epoll_fd; +} PinosLoopImpl; + +static SpaResult +loop_add_source (SpaLoop *loop, + SpaSource *source) +{ + PinosLoopImpl *impl = SPA_CONTAINER_OF (loop, PinosLoopImpl, loop); + struct epoll_event ep; + + source->loop = loop; + + if (source->fd != -1) { + spa_zero (ep); + if (source->mask & SPA_IO_IN) + ep.events |= EPOLLIN; + if (source->mask & SPA_IO_OUT) + ep.events |= EPOLLOUT; + if (source->mask & SPA_IO_ERR) + ep.events |= EPOLLERR; + if (source->mask & SPA_IO_HUP) + ep.events |= EPOLLHUP; + ep.data.ptr = source; + + if (epoll_ctl (impl->epoll_fd, EPOLL_CTL_ADD, source->fd, &ep) < 0) + return SPA_RESULT_ERRNO; + } + + return SPA_RESULT_OK; +} + +static SpaResult +loop_update_source (SpaSource *source) +{ + SpaLoop *loop = source->loop; + PinosLoopImpl *impl = SPA_CONTAINER_OF (loop, PinosLoopImpl, loop); + + if (source->fd != -1) { + struct epoll_event ep; + + spa_zero (ep); + if (source->mask & SPA_IO_IN) + ep.events |= EPOLLIN; + if (source->mask & SPA_IO_OUT) + ep.events |= EPOLLOUT; + if (source->mask & SPA_IO_ERR) + ep.events |= EPOLLERR; + if (source->mask & SPA_IO_HUP) + ep.events |= EPOLLHUP; + ep.data.ptr = source; + + if (epoll_ctl (impl->epoll_fd, EPOLL_CTL_MOD, source->fd, &ep) < 0) + return SPA_RESULT_ERRNO; + } + return SPA_RESULT_OK; +} + +static void +loop_remove_source (SpaSource *source) +{ + SpaLoop *loop = source->loop; + PinosLoopImpl *impl = SPA_CONTAINER_OF (loop, PinosLoopImpl, loop); + + if (source->fd != -1) + epoll_ctl (impl->epoll_fd, EPOLL_CTL_DEL, source->fd, NULL); + + source->loop = NULL; +} + +static SpaResult +loop_invoke (SpaLoop *loop, + SpaInvokeFunc func, + uint32_t seq, + size_t size, + void *data, + void *user_data) +{ + PinosLoopImpl *impl = SPA_CONTAINER_OF (loop, PinosLoopImpl, loop); + bool in_thread = pthread_equal (impl->thread, pthread_self()); + SpaRingbufferArea areas[2]; + InvokeItem *item; + SpaResult res; + + if (in_thread) { + res = func (loop, false, seq, size, data, user_data); + } else { + spa_ringbuffer_get_write_areas (&impl->buffer, areas); + if (areas[0].len < sizeof (InvokeItem)) { + pinos_log_warn ("data-loop %p: queue full", impl); + return SPA_RESULT_ERROR; + } + item = SPA_MEMBER (impl->buffer_data, areas[0].offset, InvokeItem); + item->func = func; + item->seq = seq; + item->size = size; + item->user_data = user_data; + + if (areas[0].len > sizeof (InvokeItem) + size) { + item->data = SPA_MEMBER (item, sizeof (InvokeItem), void); + item->item_size = sizeof (InvokeItem) + size; + if (areas[0].len < sizeof (InvokeItem) + item->item_size) + item->item_size = areas[0].len; + } else { + item->data = SPA_MEMBER (impl->buffer_data, areas[1].offset, void); + item->item_size = areas[0].len + 1 + size; + } + memcpy (item->data, data, size); + + spa_ringbuffer_write_advance (&impl->buffer, item->item_size); + + pinos_source_event_signal (impl->event); + + if (seq != SPA_ID_INVALID) + res = SPA_RESULT_RETURN_ASYNC (seq); + else + res = SPA_RESULT_OK; + } + return res; +} + +static void +event_func (PinosSource *source, + void *data) +{ + PinosLoopImpl *impl = data; + size_t offset; + + while (spa_ringbuffer_get_read_offset (&impl->buffer, &offset) > 0) { + InvokeItem *item = SPA_MEMBER (impl->buffer_data, offset, InvokeItem); + item->func (impl->this.loop, true, item->seq, item->size, item->data, item->user_data); + spa_ringbuffer_read_advance (&impl->buffer, item->item_size); + } +} + +PinosLoop * +pinos_loop_new (void) +{ + PinosLoopImpl *impl; + PinosLoop *this; + + impl = calloc (1, sizeof (PinosLoopImpl)); + this = &impl->this; + + impl->epoll_fd = epoll_create1 (EPOLL_CLOEXEC); + if (impl->epoll_fd == -1) { + free (impl); + return NULL; + } + + spa_list_init (&impl->source_list); + + pinos_signal_init (&this->destroy_signal); + + impl->loop.size = sizeof (SpaLoop); + impl->loop.add_source = loop_add_source; + impl->loop.update_source = loop_update_source; + impl->loop.remove_source = loop_remove_source; + impl->loop.invoke = loop_invoke; + this->loop = &impl->loop; + + spa_ringbuffer_init (&impl->buffer, DATAS_SIZE); + + impl->event = pinos_loop_add_event (this, + event_func, + impl); + return this; +} + +void +pinos_loop_destroy (PinosLoop *loop) +{ + PinosLoopImpl *impl = SPA_CONTAINER_OF (loop, PinosLoopImpl, this); + PinosSource *source, *tmp; + + pinos_signal_emit (&loop->destroy_signal, loop); + + spa_list_for_each_safe (source, tmp, &impl->source_list, link) + pinos_source_destroy (source); + + close (impl->epoll_fd); + free (impl); +} + +int +pinos_loop_get_fd (PinosLoop *loop) +{ + PinosLoopImpl *impl = SPA_CONTAINER_OF (loop, PinosLoopImpl, this); + + return impl->epoll_fd; +} + +void +pinos_loop_set_hooks (PinosLoop *loop, + PinosLoopHook pre_func, + PinosLoopHook post_func, + void *data) +{ + PinosLoopImpl *impl = SPA_CONTAINER_OF (loop, PinosLoopImpl, this); + + impl->pre_func = pre_func; + impl->post_func = post_func; + impl->hook_data = data; +} + +void +pinos_loop_set_thread (PinosLoop *loop, + void *thread) +{ + PinosLoopImpl *impl = SPA_CONTAINER_OF (loop, PinosLoopImpl, this); + impl->thread = *((pthread_t*)thread); +} + +SpaResult +pinos_loop_iterate (PinosLoop *loop, + int timeout) +{ + PinosLoopImpl *impl = SPA_CONTAINER_OF (loop, PinosLoopImpl, this); + struct epoll_event ep[32]; + int i, nfds, save_errno; + + if (SPA_UNLIKELY (impl->pre_func)) + impl->pre_func (loop, impl->hook_data); + + if (SPA_UNLIKELY ((nfds = epoll_wait(impl->epoll_fd, ep, SPA_N_ELEMENTS (ep), timeout)) < 0)) + save_errno = errno; + + if (SPA_UNLIKELY (impl->post_func)) + impl->post_func (loop, impl->hook_data); + + if (SPA_UNLIKELY (nfds < 0)) { + errno = save_errno; + return SPA_RESULT_ERRNO; + } + + for (i = 0; i < nfds; i++) { + SpaSource *source = ep[i].data.ptr; + + source->rmask = 0; + if (ep[i].events & EPOLLIN) + source->rmask |= SPA_IO_IN; + if (ep[i].events & EPOLLOUT) + source->rmask |= SPA_IO_OUT; + if (ep[i].events & EPOLLHUP) + source->rmask |= SPA_IO_HUP; + if (ep[i].events & EPOLLERR) + source->rmask |= SPA_IO_ERR; + + source->func (source); + } + return SPA_RESULT_OK; +} + +static void +source_io_func (SpaSource *source) +{ + PinosSource *s = SPA_CONTAINER_OF (source, PinosSource, source); + s->func.io (s, s->source.fd, s->source.rmask, s->source.data); +} + +PinosSource * +pinos_loop_add_io (PinosLoop *loop, + int fd, + SpaIO mask, + bool close, + PinosSourceIOFunc func, + void *data) +{ + PinosLoopImpl *impl = SPA_CONTAINER_OF (loop, PinosLoopImpl, this); + PinosSource *source; + + source = calloc (1, sizeof (PinosSource)); + + source->source.loop = loop->loop; + source->source.func = source_io_func; + source->source.data = data; + source->source.fd = fd; + source->source.mask = mask; + source->close = close; + source->func.io = func; + + spa_loop_add_source (loop->loop, &source->source); + + spa_list_insert (&impl->source_list, &source->link); + + return source; +} + +SpaResult +pinos_source_io_update (PinosSource *source, + SpaIO mask) +{ + source->source.mask = mask; + return spa_loop_update_source (source->source.loop, &source->source); +} + + +static void +source_idle_func (SpaSource *source) +{ + PinosSource *s = SPA_CONTAINER_OF (source, PinosSource, source); + s->func.idle (s, s->source.data); +} + +PinosSource * +pinos_loop_add_idle (PinosLoop *loop, + PinosSourceIdleFunc func, + void *data) +{ + PinosLoopImpl *impl = SPA_CONTAINER_OF (loop, PinosLoopImpl, this); + PinosSource *source; + + source = calloc (1, sizeof (PinosSource)); + + source->source.loop = loop->loop; + source->source.func = source_idle_func; + source->source.data = data; + source->source.fd = eventfd (0, EFD_CLOEXEC | EFD_NONBLOCK); + source->close = true; + source->source.mask = SPA_IO_IN; + source->func.idle = func; + + spa_loop_add_source (loop->loop, &source->source); + + spa_list_insert (&impl->source_list, &source->link); + + pinos_source_idle_enable (source, true); + + return source; +} + +void +pinos_source_idle_enable (PinosSource *source, + bool enabled) +{ + uint64_t count; + + if (enabled) { + count = 1; + if (write (source->source.fd, &count, sizeof (uint64_t)) != sizeof (uint64_t)) + pinos_log_warn ("loop %p: failed to write idle fd: %s", source, strerror (errno)); + } else { + if (read (source->source.fd, &count, sizeof (uint64_t)) != sizeof (uint64_t)) + pinos_log_warn ("loop %p: failed to read idle fd: %s", source, strerror (errno)); + } +} + +static void +source_event_func (SpaSource *source) +{ + PinosSource *s = SPA_CONTAINER_OF (source, PinosSource, source); + uint64_t count; + + if (read (source->fd, &count, sizeof (uint64_t)) != sizeof (uint64_t)) + pinos_log_warn ("loop %p: failed to read event fd: %s", source, strerror (errno)); + + s->func.event (s, s->source.data); +} + +PinosSource * +pinos_loop_add_event (PinosLoop *loop, + PinosSourceEventFunc func, + void *data) +{ + PinosLoopImpl *impl = SPA_CONTAINER_OF (loop, PinosLoopImpl, this); + PinosSource *source; + + source = calloc (1, sizeof (PinosSource)); + + source->source.loop = loop->loop; + source->source.func = source_event_func; + source->source.data = data; + source->source.fd = eventfd (0, EFD_CLOEXEC | EFD_NONBLOCK); + source->source.mask = SPA_IO_IN; + source->close = true; + source->func.event = func; + + spa_loop_add_source (loop->loop, &source->source); + + spa_list_insert (&impl->source_list, &source->link); + + return source; +} + +void +pinos_source_event_signal (PinosSource *source) +{ + uint64_t count = 1; + + if (write (source->source.fd, &count, sizeof(uint64_t)) != sizeof(uint64_t)) + pinos_log_warn ("loop %p: failed to write event fd: %s", source, strerror (errno)); +} + +static void +source_timer_func (SpaSource *source) +{ + PinosSource *s = SPA_CONTAINER_OF (source, PinosSource, source); + uint64_t expires; + + if (read (source->fd, &expires, sizeof (uint64_t)) != sizeof (uint64_t)) + pinos_log_warn ("loop %p: failed to read timer fd: %s", source, strerror (errno)); + + s->func.timer (s, s->source.data); +} + +PinosSource * +pinos_loop_add_timer (PinosLoop *loop, + PinosSourceTimerFunc func, + void *data) +{ + PinosLoopImpl *impl = SPA_CONTAINER_OF (loop, PinosLoopImpl, this); + PinosSource *source; + + source = calloc (1, sizeof (PinosSource)); + + source->source.loop = loop->loop; + source->source.func = source_timer_func; + source->source.data = data; + source->source.fd = timerfd_create (CLOCK_MONOTONIC, TFD_CLOEXEC | TFD_NONBLOCK); + source->source.mask = SPA_IO_IN; + source->close = true; + source->func.timer = func; + + spa_loop_add_source (loop->loop, &source->source); + + spa_list_insert (&impl->source_list, &source->link); + + return source; +} + +SpaResult +pinos_source_timer_update (PinosSource *source, + struct timespec *value, + struct timespec *interval, + bool absolute) +{ + struct itimerspec its; + int flags = 0; + + spa_zero (its); + if (value) + its.it_value = *value; + if (interval) + its.it_interval = *interval; + if (absolute) + flags |= TFD_TIMER_ABSTIME; + + if (timerfd_settime (source->source.fd, flags, &its, NULL) < 0) + return SPA_RESULT_ERRNO; + + return SPA_RESULT_OK; +} + +static void +source_signal_func (SpaSource *source) +{ + PinosSource *s = SPA_CONTAINER_OF (source, PinosSource, source); + struct signalfd_siginfo signal_info; + + if (read (source->fd, &signal_info, sizeof (signal_info)) != sizeof (signal_info)) + pinos_log_warn ("loop %p: failed to read signal fd: %s", source, strerror (errno)); + + s->func.signal (s, s->signal_number, s->source.data); +} + +PinosSource * +pinos_loop_add_signal (PinosLoop *loop, + int signal_number, + PinosSourceSignalFunc func, + void *data) +{ + PinosLoopImpl *impl = SPA_CONTAINER_OF (loop, PinosLoopImpl, this); + PinosSource *source; +#if 0 + sigset_t mask; +#endif + + source = calloc (1, sizeof (PinosSource)); + + source->source.loop = loop->loop; + source->source.func = source_signal_func; + source->source.data = data; +#if 0 + sigemptyset (&mask); + sigaddset (&mask, signal_number); + source->source.fd = signalfd (-1, &mask, SFD_CLOEXEC | SFD_NONBLOCK); + sigprocmask (SIG_BLOCK, &mask, NULL); +#endif + source->source.mask = SPA_IO_IN; + source->close = true; + source->func.signal = func; + source->signal_number = signal_number; + + spa_loop_add_source (loop->loop, &source->source); + + spa_list_insert (&impl->source_list, &source->link); + + return source; +} + +void +pinos_source_destroy (PinosSource *source) +{ + spa_list_remove (&source->link); + + spa_loop_remove_source (source->source.loop, &source->source); + + if (source->source.fd != -1 && source->close) + close (source->source.fd); + free (source); +} diff --git a/pinos/client/loop.h b/pinos/client/loop.h new file mode 100644 index 000000000..6c8029877 --- /dev/null +++ b/pinos/client/loop.h @@ -0,0 +1,124 @@ +/* Pinos + * Copyright (C) 2016 Wim Taymans + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ + +#ifndef __PINOS_LOOP_H__ +#define __PINOS_LOOP_H__ + +#ifdef __cplusplus +extern "C" { +#endif + +#include +#include +#include + +typedef struct _PinosLoop PinosLoop; + +typedef bool (*PinosCheckfunc) (PinosLoop *loop, + void *data); + +typedef void (*PinosLoopHook) (PinosLoop *loop, + void *data); + +typedef struct _PinosSource PinosSource; + +typedef void (*PinosSourceIOFunc) (PinosSource *source, + int fd, + SpaIO mask, + void *data); +typedef void (*PinosSourceIdleFunc) (PinosSource *source, + void *data); +typedef void (*PinosSourceEventFunc) (PinosSource *source, + void *data); +typedef void (*PinosSourceTimerFunc) (PinosSource *source, + void *data); +typedef void (*PinosSourceSignalFunc) (PinosSource *source, + int signal_number, + void *data); +/** + * PinosLoop: + * + * Pinos loop interface. + */ +struct _PinosLoop { + SpaLoop *loop; + + PINOS_SIGNAL (destroy_signal, (PinosListener *listener, + PinosLoop *loop)); +}; + +PinosLoop * pinos_loop_new (void); +void pinos_loop_destroy (PinosLoop *loop); + +int pinos_loop_get_fd (PinosLoop *loop); + +void pinos_loop_set_hooks (PinosLoop *loop, + PinosLoopHook pre_func, + PinosLoopHook post_func, + void *data); +void pinos_loop_set_thread (PinosLoop *loop, + void *thread); + +SpaResult pinos_loop_iterate (PinosLoop *loop, + int timeout); + +#define pinos_loop_add_source(l,s) ((l)->loop->add_source((l)->loop,s); +#define pinos_loop_update_source(l,s) ((l)->loop->update_source(s); +#define pinos_loop_remove_source(l,s) ((l)->loop->remove_source(s); + +PinosSource * pinos_loop_add_io (PinosLoop *loop, + int fd, + SpaIO mask, + bool close, + PinosSourceIOFunc func, + void *data); +SpaResult pinos_source_io_update (PinosSource *source, + SpaIO mask); + +PinosSource * pinos_loop_add_idle (PinosLoop *loop, + PinosSourceIdleFunc func, + void *data); +void pinos_source_idle_enable (PinosSource *source, + bool enabled); + +PinosSource * pinos_loop_add_event (PinosLoop *loop, + PinosSourceEventFunc func, + void *data); +void pinos_source_event_signal (PinosSource *source); + +PinosSource * pinos_loop_add_timer (PinosLoop *loop, + PinosSourceTimerFunc func, + void *data); +SpaResult pinos_source_timer_update (PinosSource *source, + struct timespec *value, + struct timespec *interval, + bool absolute); + +PinosSource * pinos_loop_add_signal (PinosLoop *loop, + int signal_number, + PinosSourceSignalFunc func, + void *data); + +void pinos_source_destroy (PinosSource *source); + +#ifdef __cplusplus +} +#endif + +#endif /* __PINOS_LOOP_H__ */ diff --git a/pinos/client/mem.c b/pinos/client/mem.c index 24f448c17..696dd255b 100644 --- a/pinos/client/mem.c +++ b/pinos/client/mem.c @@ -17,6 +17,10 @@ * Boston, MA 02110-1301, USA. */ +#ifndef _GNU_SOURCE +#define _GNU_SOURCE +#endif + #include #include #include diff --git a/pinos/client/meson.build b/pinos/client/meson.build index f3bcd21e3..883f7b9c9 100644 --- a/pinos/client/meson.build +++ b/pinos/client/meson.build @@ -4,6 +4,7 @@ pinos_headers = [ 'format.h', 'introspect.h', 'log.h', + 'loop.h', 'map.h', 'mem.h', 'pinos.h', @@ -22,6 +23,7 @@ pinos_sources = [ 'format.c', 'introspect.c', 'log.c', + 'loop.c', 'mapper.c', 'mem.c', 'properties.c', @@ -58,6 +60,7 @@ enumtypes_c = custom_target('enumtypes_c', libpinos_c_args = [ '-DHAVE_CONFIG_H', '-D_GNU_SOURCE', + '-D_POSIX_C_SOURCE', ] diff --git a/pinos/client/pinos.h b/pinos/client/pinos.h index 07f786a9d..9e9e77795 100644 --- a/pinos/client/pinos.h +++ b/pinos/client/pinos.h @@ -26,6 +26,7 @@ extern const char g_log_domain_pinos[]; #include #include #include +#include #include #include #include diff --git a/pinos/server/client-node.c b/pinos/server/client-node.c index 2b7b07d9c..b8d622508 100644 --- a/pinos/server/client-node.c +++ b/pinos/server/client-node.c @@ -24,7 +24,6 @@ #include #include #include -#include #include #include #include @@ -94,18 +93,16 @@ struct _SpaProxy SpaIDMap *map; SpaLog *log; - SpaPoll *main_loop; - SpaPoll *data_loop; + SpaLoop *main_loop; + SpaLoop *data_loop; SpaNodeEventCallback event_cb; void *user_data; - SpaPollFd ctrl_fds[1]; - SpaPollItem ctrl_poll; + SpaSource ctrl_source; PinosConnection *conn; - SpaPollFd data_fds[1]; - SpaPollItem data_poll; + SpaSource data_source; unsigned int max_inputs; unsigned int n_inputs; @@ -858,7 +855,7 @@ spa_proxy_node_port_reuse_buffer (SpaNode *node, rb.buffer_id = buffer_id; pinos_transport_add_event (pnode->transport, &rb.event); cmd = PINOS_TRANSPORT_CMD_HAVE_EVENT; - write (this->data_fds[0].fd, &cmd, 1); + write (this->data_source.fd, &cmd, 1); return SPA_RESULT_OK; } @@ -927,7 +924,7 @@ spa_proxy_node_process_input (SpaNode *node) return SPA_RESULT_ERROR; cmd = PINOS_TRANSPORT_CMD_HAVE_DATA; - write (this->data_fds[0].fd, &cmd, 1); + write (this->data_source.fd, &cmd, 1); return SPA_RESULT_OK; } @@ -1089,27 +1086,25 @@ parse_connection (SpaProxy *this) return SPA_RESULT_OK; } -static int -proxy_on_fd_events (SpaPollNotifyData *data) +static void +proxy_on_fd_events (SpaSource *source) { - SpaProxy *this = data->user_data; + SpaProxy *this = source->data; - if (data->fds[0].revents & POLLIN) { + if (source->rmask & SPA_IO_IN) parse_connection (this); - } - return 0; } -static int -proxy_on_data_fd_events (SpaPollNotifyData *data) +static void +proxy_on_data_fd_events (SpaSource *source) { - SpaProxy *this = data->user_data; + SpaProxy *this = source->data; PinosNode *pnode = this->pnode; - if (data->fds[0].revents & POLLIN) { + if (source->rmask & SPA_IO_IN) { uint8_t cmd; - read (this->data_fds[0].fd, &cmd, 1); + read (this->data_source.fd, &cmd, 1); if (cmd & PINOS_TRANSPORT_CMD_HAVE_EVENT) { SpaNodeEvent event; @@ -1134,7 +1129,6 @@ proxy_on_data_fd_events (SpaPollNotifyData *data) this->event_cb (&this->node, &ni.event, this->user_data); } } - return 0; } static const SpaNode proxy_node = { @@ -1176,9 +1170,9 @@ proxy_init (SpaProxy *this, for (i = 0; i < n_support; i++) { if (strcmp (support[i].uri, SPA_LOG_URI) == 0) this->log = support[i].data; - else if (strcmp (support[i].uri, SPA_POLL__MainLoop) == 0) + else if (strcmp (support[i].uri, SPA_LOOP__MainLoop) == 0) this->main_loop = support[i].data; - else if (strcmp (support[i].uri, SPA_POLL__DataLoop) == 0) + else if (strcmp (support[i].uri, SPA_LOOP__DataLoop) == 0) this->data_loop = support[i].data; } if (this->data_loop == NULL) { @@ -1190,29 +1184,17 @@ proxy_init (SpaProxy *this, this->node = proxy_node; - this->ctrl_fds[0].fd = -1; - this->ctrl_fds[0].events = POLLIN | POLLPRI | POLLERR; - this->ctrl_fds[0].revents = 0; - this->ctrl_poll.id = 0; - this->ctrl_poll.enabled = true; - this->ctrl_poll.fds = this->ctrl_fds; - this->ctrl_poll.n_fds = 1; - this->ctrl_poll.idle_cb = NULL; - this->ctrl_poll.before_cb = NULL; - this->ctrl_poll.after_cb = proxy_on_fd_events; - this->ctrl_poll.user_data = this; + this->ctrl_source.func = proxy_on_fd_events; + this->ctrl_source.data = this; + this->ctrl_source.fd = -1; + this->ctrl_source.mask = SPA_IO_IN | SPA_IO_ERR; + this->ctrl_source.rmask = 0; - this->data_fds[0].fd = -1; - this->data_fds[0].events = POLLIN | POLLPRI | POLLERR; - this->data_fds[0].revents = 0; - this->data_poll.id = 0; - this->data_poll.enabled = true; - this->data_poll.fds = this->data_fds; - this->data_poll.n_fds = 1; - this->data_poll.idle_cb = NULL; - this->data_poll.before_cb = NULL; - this->data_poll.after_cb = proxy_on_data_fd_events; - this->data_poll.user_data = this; + this->data_source.func = proxy_on_data_fd_events; + this->data_source.data = this; + this->data_source.fd = -1; + this->data_source.mask = SPA_IO_IN | SPA_IO_ERR; + this->data_source.rmask = 0; return SPA_RESULT_RETURN_ASYNC (this->seq++); } @@ -1243,7 +1225,7 @@ on_loop_changed (PinosListener *listener, PinosNode *node) { PinosClientNodeImpl *impl = SPA_CONTAINER_OF (listener, PinosClientNodeImpl, loop_changed); - impl->proxy.data_loop = &node->data_loop->poll; + impl->proxy.data_loop = node->data_loop->loop->loop; } static SpaResult @@ -1259,13 +1241,13 @@ proxy_clear (SpaProxy *this) if (this->out_ports[i].valid) clear_port (this, &this->out_ports[i], SPA_DIRECTION_OUTPUT, i); } - if (this->ctrl_fds[0].fd != -1) { - spa_poll_remove_item (this->main_loop, &this->ctrl_poll); - close (this->ctrl_fds[0].fd); + if (this->ctrl_source.fd != -1) { + spa_loop_remove_source (this->main_loop, &this->ctrl_source); + close (this->ctrl_source.fd); } - if (this->data_fds[0].fd != -1) { - spa_poll_remove_item (this->data_loop, &this->data_poll); - close (this->data_fds[0].fd); + if (this->data_source.fd != -1) { + spa_loop_remove_source (this->data_loop, &this->data_source); + close (this->data_source.fd); } return SPA_RESULT_OK; @@ -1380,9 +1362,9 @@ pinos_client_node_get_ctrl_socket (PinosClientNode *this, if (socketpair (AF_UNIX, SOCK_STREAM | SOCK_CLOEXEC | SOCK_NONBLOCK, 0, fd) != 0) return SPA_RESULT_ERRNO; - impl->proxy.ctrl_fds[0].fd = fd[0]; - impl->proxy.conn = pinos_connection_new (impl->proxy.ctrl_fds[0].fd); - spa_poll_add_item (impl->proxy.main_loop, &impl->proxy.ctrl_poll); + impl->proxy.ctrl_source.fd = fd[0]; + impl->proxy.conn = pinos_connection_new (impl->proxy.ctrl_source.fd); + spa_loop_add_source (impl->proxy.main_loop, &impl->proxy.ctrl_source); impl->ctrl_fd = fd[1]; } *fd = impl->ctrl_fd; @@ -1411,8 +1393,8 @@ pinos_client_node_get_data_socket (PinosClientNode *this, if (socketpair (AF_UNIX, SOCK_STREAM, 0, fd) != 0) return SPA_RESULT_ERRNO; - impl->proxy.data_fds[0].fd = fd[0]; - spa_poll_add_item (impl->proxy.data_loop, &impl->proxy.data_poll); + impl->proxy.data_source.fd = fd[0]; + spa_loop_add_source (impl->proxy.data_loop, &impl->proxy.data_source); impl->data_fd = fd[1]; } *fd = impl->data_fd; diff --git a/pinos/server/core.c b/pinos/server/core.c index d66f389d0..5b020a2e5 100644 --- a/pinos/server/core.c +++ b/pinos/server/core.c @@ -47,13 +47,15 @@ pinos_core_new (PinosMainLoop *main_loop) impl->support[0].data = this->registry.map; impl->support[1].uri = SPA_LOG_URI; impl->support[1].data = pinos_log_get (); - impl->support[2].uri = SPA_POLL__DataLoop; - impl->support[2].data = &this->data_loop->poll; - impl->support[3].uri = SPA_POLL__MainLoop; - impl->support[3].data = this->main_loop->poll; + impl->support[2].uri = SPA_LOOP__DataLoop; + impl->support[2].data = this->data_loop->loop->loop; + impl->support[3].uri = SPA_LOOP__MainLoop; + impl->support[3].data = this->main_loop->loop; this->support = impl->support; this->n_support = 4; + pinos_data_loop_start (this->data_loop); + spa_list_init (&this->global_list); spa_list_init (&this->client_list); spa_list_init (&this->node_list); diff --git a/pinos/server/core.h b/pinos/server/core.h index 5b3d5ed0a..885504256 100644 --- a/pinos/server/core.h +++ b/pinos/server/core.h @@ -28,6 +28,7 @@ typedef struct _PinosCore PinosCore; typedef struct _PinosGlobal PinosGlobal; #include + #include #include #include diff --git a/pinos/server/data-loop.c b/pinos/server/data-loop.c index f76cc22f4..f9d97def0 100644 --- a/pinos/server/data-loop.c +++ b/pinos/server/data-loop.c @@ -28,39 +28,15 @@ #include #include -#include "spa/include/spa/ringbuffer.h" #include "pinos/client/log.h" #include "pinos/client/rtkit.h" #include "pinos/server/data-loop.h" -#define DATAS_SIZE (4096 * 8) - -typedef struct { - size_t item_size; - SpaPollInvokeFunc func; - uint32_t seq; - size_t size; - void *data; - void *user_data; -} InvokeItem; - typedef struct { PinosDataLoop this; - SpaRingbuffer buffer; - uint8_t buffer_data[DATAS_SIZE]; - - unsigned int n_poll; - SpaPollItem poll[16]; - int idx[16]; - - bool rebuild_fds; - SpaPollFd fds[32]; - unsigned int n_fds; - - uint32_t counter; - uint32_t seq; + PinosSource *event; bool running; pthread_t thread; @@ -111,266 +87,31 @@ make_realtime (PinosDataLoop *this) } static void * -loop (void *user_data) +do_loop (void *user_data) { PinosDataLoopImpl *impl = user_data; PinosDataLoop *this = &impl->this; - SpaPoll *p = &this->poll; - unsigned int i, j; + SpaResult res; make_realtime (this); pinos_log_debug ("data-loop %p: enter thread", this); while (impl->running) { - SpaPollNotifyData ndata; - unsigned int n_idle = 0; - int r; - - /* prepare */ - for (i = 0; i < impl->n_poll; i++) { - SpaPollItem *p = &impl->poll[i]; - - if (p->enabled && p->idle_cb) { - ndata.fds = NULL; - ndata.n_fds = 0; - ndata.user_data = p->user_data; - if (SPA_RESULT_IS_ERROR (p->idle_cb (&ndata))) - p->enabled = false; - n_idle++; - } - } -// if (n_idle > 0) -// continue; - - /* rebuild */ - if (impl->rebuild_fds) { - impl->n_fds = 1; - for (i = 0; i < impl->n_poll; i++) { - SpaPollItem *p = &impl->poll[i]; - - if (!p->enabled) - continue; - - for (j = 0; j < p->n_fds; j++) - impl->fds[impl->n_fds + j] = p->fds[j]; - impl->idx[i] = impl->n_fds; - impl->n_fds += p->n_fds; - } - impl->rebuild_fds = false; - } - - /* before */ - for (i = 0; i < impl->n_poll; i++) { - SpaPollItem *p = &impl->poll[i]; - - if (p->enabled && p->before_cb) { - ndata.fds = &impl->fds[impl->idx[i]]; - ndata.n_fds = p->n_fds; - ndata.user_data = p->user_data; - if (SPA_RESULT_IS_ERROR (p->before_cb (&ndata))) - p->enabled = false; - } - } - - r = poll ((struct pollfd *) impl->fds, impl->n_fds, -1); - if (r < 0) { - if (errno == EINTR) - continue; - break; - } - if (r == 0) { - pinos_log_warn ("data-loop %p: select timeout should not happen", this); - continue; - } - - /* check wakeup */ - if (impl->fds[0].revents & POLLIN) { - uint64_t u; - size_t offset; - - if (read (impl->fds[0].fd, &u, sizeof(uint64_t)) != sizeof(uint64_t)) - pinos_log_warn ("data-loop %p: failed to read fd: %s", this, strerror (errno)); - - while (spa_ringbuffer_get_read_offset (&impl->buffer, &offset) > 0) { - InvokeItem *item = SPA_MEMBER (impl->buffer_data, offset, InvokeItem); - item->func (p, true, item->seq, item->size, item->data, item->user_data); - spa_ringbuffer_read_advance (&impl->buffer, item->item_size); - } - continue; - } - - /* after */ - for (i = 0; i < impl->n_poll; i++) { - SpaPollItem *p = &impl->poll[i]; - - if (p->enabled && p->after_cb && (p->n_fds == 0 || impl->fds[impl->idx[i]].revents != 0)) { - ndata.fds = &impl->fds[impl->idx[i]]; - ndata.n_fds = p->n_fds; - ndata.user_data = p->user_data; - if (SPA_RESULT_IS_ERROR (p->after_cb (&ndata))) - p->enabled = false; - } - } + if ((res = pinos_loop_iterate (this->loop, -1)) < 0) + pinos_log_warn ("data-loop %p: iterate error %d", this, res); } pinos_log_debug ("data-loop %p: leave thread", this); return NULL; } -static void -wakeup_thread (PinosDataLoopImpl *impl) -{ - uint64_t u = 1; - - if (write (impl->fds[0].fd, &u, sizeof(uint64_t)) != sizeof(uint64_t)) - pinos_log_warn ("data-loop %p: failed to write fd: %s", impl, strerror (errno)); -} static void -start_thread (PinosDataLoopImpl *impl) +do_stop (PinosSource *source, + void *data) { - int err; - - if (!impl->running) { - impl->running = true; - if ((err = pthread_create (&impl->thread, NULL, loop, impl)) != 0) { - pinos_log_warn ("data-loop %p: can't create thread: %s", impl, strerror (err)); - impl->running = false; - } - } -} - -static void -stop_thread (PinosDataLoopImpl *impl, bool in_thread) -{ - if (impl->running) { - impl->running = false; - if (!in_thread) { - wakeup_thread (impl); - pthread_join (impl->thread, NULL); - } - } -} - -static SpaResult -do_add_item (SpaPoll *poll, - SpaPollItem *item) -{ - PinosDataLoop *this = SPA_CONTAINER_OF (poll, PinosDataLoop, poll); - PinosDataLoopImpl *impl = SPA_CONTAINER_OF (this, PinosDataLoopImpl, this); - bool in_thread = pthread_equal (impl->thread, pthread_self()); - - item->id = ++impl->counter; - impl->poll[impl->n_poll] = *item; - impl->n_poll++; - if (item->n_fds) - impl->rebuild_fds = true; - - if (!in_thread) { - wakeup_thread (impl); - start_thread (impl); - } - return SPA_RESULT_OK; -} - - -static SpaResult -do_update_item (SpaPoll *poll, - SpaPollItem *item) -{ - PinosDataLoop *this = SPA_CONTAINER_OF (poll, PinosDataLoop, poll); - PinosDataLoopImpl *impl = SPA_CONTAINER_OF (this, PinosDataLoopImpl, this); - bool in_thread = pthread_equal (impl->thread, pthread_self()); - unsigned int i; - - for (i = 0; i < impl->n_poll; i++) { - if (impl->poll[i].id == item->id) - impl->poll[i] = *item; - } - if (item->n_fds) - impl->rebuild_fds = true; - - if (!in_thread) - wakeup_thread (impl); - - return SPA_RESULT_OK; -} - -static SpaResult -do_remove_item (SpaPoll *poll, - SpaPollItem *item) -{ - PinosDataLoop *this = SPA_CONTAINER_OF (poll, PinosDataLoop, poll); - PinosDataLoopImpl *impl = SPA_CONTAINER_OF (this, PinosDataLoopImpl, this); - bool in_thread = pthread_equal (impl->thread, pthread_self()); - unsigned int i; - - for (i = 0; i < impl->n_poll; i++) { - if (impl->poll[i].id == item->id) { - impl->n_poll--; - for (; i < impl->n_poll; i++) - impl->poll[i] = impl->poll[i+1]; - break; - } - } - if (item->n_fds) { - impl->rebuild_fds = true; - if (!in_thread) - wakeup_thread (impl); - } - return SPA_RESULT_OK; -} - -static SpaResult -do_invoke (SpaPoll *poll, - SpaPollInvokeFunc func, - uint32_t seq, - size_t size, - void *data, - void *user_data) -{ - PinosDataLoop *this = SPA_CONTAINER_OF (poll, PinosDataLoop, poll); - PinosDataLoopImpl *impl = SPA_CONTAINER_OF (this, PinosDataLoopImpl, this); - bool in_thread = pthread_equal (impl->thread, pthread_self()); - SpaRingbufferArea areas[2]; - InvokeItem *item; - SpaResult res; - - if (in_thread) { - res = func (poll, false, seq, size, data, user_data); - } else { - spa_ringbuffer_get_write_areas (&impl->buffer, areas); - if (areas[0].len < sizeof (InvokeItem)) { - pinos_log_warn ("queue full"); - return SPA_RESULT_ERROR; - } - item = SPA_MEMBER (impl->buffer_data, areas[0].offset, InvokeItem); - item->seq = seq; - item->func = func; - item->user_data = user_data; - item->size = size; - - if (areas[0].len > sizeof (InvokeItem) + size) { - item->data = SPA_MEMBER (item, sizeof (InvokeItem), void); - item->item_size = sizeof (InvokeItem) + size; - if (areas[0].len < sizeof (InvokeItem) + item->item_size) - item->item_size = areas[0].len; - } else { - item->data = SPA_MEMBER (impl->buffer_data, areas[1].offset, void); - item->item_size = areas[0].len + 1 + size; - } - memcpy (item->data, data, size); - - spa_ringbuffer_write_advance (&impl->buffer, item->item_size); - - wakeup_thread (impl); - - if (seq != SPA_ID_INVALID) - res = SPA_RESULT_RETURN_ASYNC (seq); - else - res = SPA_RESULT_OK; - } - return res; + PinosDataLoopImpl *impl = data; + impl->running = false; } /** @@ -387,38 +128,64 @@ pinos_data_loop_new (void) PinosDataLoop *this; impl = calloc (1, sizeof (PinosDataLoopImpl)); - this = &impl->this; - pinos_log_debug ("data-loop %p: new", impl); - this->poll.size = sizeof (SpaPoll); - this->poll.info = NULL; - this->poll.add_item = do_add_item; - this->poll.update_item = do_update_item; - this->poll.remove_item = do_remove_item; - this->poll.invoke = do_invoke; - - impl->fds[0].fd = eventfd (0, 0); - impl->fds[0].events = POLLIN | POLLPRI | POLLERR; - impl->fds[0].revents = 0; - impl->n_fds = 1; - - spa_ringbuffer_init (&impl->buffer, DATAS_SIZE); + this = &impl->this; + this->loop = pinos_loop_new (); + pinos_signal_init (&this->destroy_signal); + impl->event = pinos_loop_add_event (this->loop, + do_stop, + impl); return this; } void -pinos_data_loop_destroy (PinosDataLoop * loop) +pinos_data_loop_destroy (PinosDataLoop *loop) { PinosDataLoopImpl *impl = SPA_CONTAINER_OF (loop, PinosDataLoopImpl, this); pinos_log_debug ("data-loop %p: destroy", impl); - stop_thread (impl, false); - close (impl->fds[0].fd); + pinos_signal_emit (&loop->destroy_signal, loop); + + pinos_data_loop_stop (loop); + + pinos_source_destroy (impl->event); + pinos_loop_destroy (loop->loop); free (impl); } +SpaResult +pinos_data_loop_start (PinosDataLoop *loop) +{ + PinosDataLoopImpl *impl = SPA_CONTAINER_OF (loop, PinosDataLoopImpl, this); + + if (!impl->running) { + int err; + + impl->running = true; + if ((err = pthread_create (&impl->thread, NULL, do_loop, impl)) != 0) { + pinos_log_warn ("data-loop %p: can't create thread: %s", impl, strerror (err)); + impl->running = false; + return SPA_RESULT_ERROR; + } + pinos_loop_set_thread (impl->this.loop, &impl->thread); + } + return SPA_RESULT_OK; +} + +SpaResult +pinos_data_loop_stop (PinosDataLoop *loop) +{ + PinosDataLoopImpl *impl = SPA_CONTAINER_OF (loop, PinosDataLoopImpl, this); + + pinos_source_event_signal (impl->event); + + pthread_join (impl->thread, NULL); + + return SPA_RESULT_OK; +} + bool pinos_data_loop_in_thread (PinosDataLoop *loop) { diff --git a/pinos/server/data-loop.h b/pinos/server/data-loop.h index 04a18946e..5cee3a9ba 100644 --- a/pinos/server/data-loop.h +++ b/pinos/server/data-loop.h @@ -24,7 +24,7 @@ extern "C" { #endif -#include +#include typedef struct _PinosDataLoop PinosDataLoop; @@ -34,12 +34,18 @@ typedef struct _PinosDataLoop PinosDataLoop; * Pinos rt-loop object. */ struct _PinosDataLoop { - SpaPoll poll; + PinosLoop *loop; + + PINOS_SIGNAL (destroy_signal, (PinosListener *listener, + PinosDataLoop *loop)); }; PinosDataLoop * pinos_data_loop_new (void); void pinos_data_loop_destroy (PinosDataLoop *loop); +SpaResult pinos_data_loop_start (PinosDataLoop *loop); +SpaResult pinos_data_loop_stop (PinosDataLoop *loop); + bool pinos_data_loop_in_thread (PinosDataLoop *loop); #ifdef __cplusplus diff --git a/pinos/server/link.c b/pinos/server/link.c index 20ee3f229..a6b259608 100644 --- a/pinos/server/link.c +++ b/pinos/server/link.c @@ -732,7 +732,7 @@ clear_port_buffers (PinosLink *link, PinosPort *port) } static SpaResult -do_link_remove_done (SpaPoll *poll, +do_link_remove_done (SpaLoop *loop, bool async, uint32_t seq, size_t size, @@ -776,7 +776,7 @@ do_link_remove_done (SpaPoll *poll, } static SpaResult -do_link_remove (SpaPoll *poll, +do_link_remove (SpaLoop *loop, bool async, uint32_t seq, size_t size, @@ -795,7 +795,7 @@ do_link_remove (SpaPoll *poll, this->rt.output = NULL; } - res = spa_poll_invoke (this->core->main_loop->poll, + res = spa_loop_invoke (this->core->main_loop->loop, do_link_remove_done, seq, 0, @@ -826,7 +826,7 @@ pinos_link_destroy (PinosLink * this) pinos_signal_remove (&impl->input_port_destroy); pinos_signal_remove (&impl->input_async_complete); - res = spa_poll_invoke (&this->input->node->data_loop->poll, + res = spa_loop_invoke (this->input->node->data_loop->loop->loop, do_link_remove, impl->seq++, 0, @@ -837,7 +837,7 @@ pinos_link_destroy (PinosLink * this) pinos_signal_remove (&impl->output_port_destroy); pinos_signal_remove (&impl->output_async_complete); - res = spa_poll_invoke (&this->output->node->data_loop->poll, + res = spa_loop_invoke (this->output->node->data_loop->loop->loop, do_link_remove, impl->seq++, 0, diff --git a/pinos/server/main-loop.c b/pinos/server/main-loop.c index a4b6e04ea..cb3e5f8ac 100644 --- a/pinos/server/main-loop.c +++ b/pinos/server/main-loop.c @@ -20,7 +20,6 @@ #include #include #include -#include #include #include @@ -34,12 +33,12 @@ #define DATAS_SIZE (4096 * 8) typedef struct { - size_t item_size; - SpaPollInvokeFunc func; - uint32_t seq; - size_t size; - void *data; - void *user_data; + size_t item_size; + SpaInvokeFunc func; + uint32_t seq; + size_t size; + void *data; + void *user_data; } InvokeItem; typedef struct _WorkItem WorkItem; @@ -59,18 +58,17 @@ typedef struct { PinosMainLoop this; - SpaPoll poll; + SpaLoop loop; GMainContext *context; - GMainLoop *loop; + GMainLoop *main_loop; uint32_t counter; SpaRingbuffer buffer; uint8_t buffer_data[DATAS_SIZE]; - SpaPollFd fds[1]; - SpaPollItem wakeup; + SpaSource wakeup; SpaList work_list; SpaList free_list; @@ -79,103 +77,109 @@ typedef struct } PinosMainLoopImpl; typedef struct { - PinosMainLoop *loop; - SpaPollItem item; -} PollData; + PinosMainLoopImpl *impl; + SpaSource *source; + guint id; +} LoopData; static bool -poll_event (GIOChannel *source, - GIOCondition condition, - void * user_data) +poll_event (GIOChannel *source, + GIOCondition condition, + void *user_data) { - PollData *data = user_data; - SpaPollNotifyData d; + LoopData *data = user_data; + SpaSource *s = data->source; - d.user_data = data->item.user_data; - d.fds = data->item.fds; - d.fds[0].revents = condition; - d.n_fds = data->item.n_fds; - data->item.after_cb (&d); + s->rmask = 0; + if (condition & G_IO_IN) + s->rmask |= SPA_IO_IN; + if (condition & G_IO_OUT) + s->rmask |= SPA_IO_OUT; + if (condition & G_IO_ERR) + s->rmask |= SPA_IO_ERR; + if (condition & G_IO_HUP) + s->rmask |= SPA_IO_HUP; + s->func (s); return TRUE; } static SpaResult -do_add_item (SpaPoll *poll, - SpaPollItem *item) +do_add_source (SpaLoop *loop, + SpaSource *source) { - PinosMainLoop *this = SPA_CONTAINER_OF (poll, PinosMainLoop, poll); + PinosMainLoopImpl *impl = SPA_CONTAINER_OF (loop, PinosMainLoopImpl, loop); GIOChannel *channel; - GSource *source; - PollData data; + GSource *s; + LoopData *data; - channel = g_io_channel_unix_new (item->fds[0].fd); - source = g_io_create_watch (channel, G_IO_IN); + channel = g_io_channel_unix_new (source->fd); + s = g_io_create_watch (channel, G_IO_IN); g_io_channel_unref (channel); - data.loop = this; - data.item = *item; + data = g_new0 (LoopData, 1); + data->impl = impl; + data->source = source; - g_source_set_callback (source, (GSourceFunc) poll_event, g_slice_dup (PollData, &data) , NULL); - item->id = g_source_attach (source, g_main_context_get_thread_default ()); - g_source_unref (source); + g_source_set_callback (s, (GSourceFunc) poll_event, data, g_free); + data->id = g_source_attach (s, g_main_context_get_thread_default ()); + g_source_unref (s); - pinos_log_debug ("added main poll %d", item->id); + source->loop_private = data; + source->loop = loop; + + pinos_log_debug ("added main poll %d", data->id); return SPA_RESULT_OK; } static SpaResult -do_update_item (SpaPoll *poll, - SpaPollItem *item) +do_update_source (SpaSource *source) { - pinos_log_debug ("update main poll %d", item->id); + LoopData *data = source->loop_private; + pinos_log_debug ("update main poll %d", data->id); return SPA_RESULT_OK; } -static SpaResult -do_remove_item (SpaPoll *poll, - SpaPollItem *item) +static void +do_remove_source (SpaSource *source) { - GSource *source; + GSource *gsource; + LoopData *data = source->loop_private; - pinos_log_debug ("remove main poll %d", item->id); - source = g_main_context_find_source_by_id (g_main_context_get_thread_default (), item->id); - g_source_destroy (source); - - return SPA_RESULT_OK; + pinos_log_debug ("remove main poll %d", data->id); + gsource = g_main_context_find_source_by_id (g_main_context_get_thread_default (), data->id); + g_source_destroy (gsource); } -static int -main_loop_dispatch (SpaPollNotifyData *data) +static void +main_loop_dispatch (SpaSource *source) { - PinosMainLoopImpl *impl = data->user_data; - SpaPoll *p = &impl->poll; + LoopData *data = source->loop_private; + PinosMainLoopImpl *impl = data->impl; uint64_t u; size_t offset; InvokeItem *item; - if (read (impl->fds[0].fd, &u, sizeof(uint64_t)) != sizeof(uint64_t)) + if (read (impl->wakeup.fd, &u, sizeof(uint64_t)) != sizeof(uint64_t)) pinos_log_warn ("main-loop %p: failed to read fd", strerror (errno)); while (spa_ringbuffer_get_read_offset (&impl->buffer, &offset) > 0) { item = SPA_MEMBER (impl->buffer_data, offset, InvokeItem); - item->func (p, true, item->seq, item->size, item->data, item->user_data); + item->func (&impl->loop, true, item->seq, item->size, item->data, item->user_data); spa_ringbuffer_read_advance (&impl->buffer, item->item_size); } - - return 0; } static SpaResult -do_invoke (SpaPoll *poll, - SpaPollInvokeFunc func, - uint32_t seq, - size_t size, - void *data, - void *user_data) +do_invoke (SpaLoop *loop, + SpaInvokeFunc func, + uint32_t seq, + size_t size, + void *data, + void *user_data) { - PinosMainLoopImpl *impl = SPA_CONTAINER_OF (poll, PinosMainLoopImpl, poll); + PinosMainLoopImpl *impl = SPA_CONTAINER_OF (loop, PinosMainLoopImpl, loop); bool in_thread = false; SpaRingbufferArea areas[2]; InvokeItem *item; @@ -183,7 +187,7 @@ do_invoke (SpaPoll *poll, SpaResult res; if (in_thread) { - res = func (poll, false, seq, size, data, user_data); + res = func (loop, false, seq, size, data, user_data); } else { spa_ringbuffer_get_write_areas (&impl->buffer, areas); if (areas[0].len < sizeof (InvokeItem)) { @@ -191,10 +195,10 @@ do_invoke (SpaPoll *poll, return SPA_RESULT_ERROR; } item = SPA_MEMBER (impl->buffer_data, areas[0].offset, InvokeItem); - item->seq = seq; item->func = func; - item->user_data = user_data; + item->seq = seq; item->size = size; + item->user_data = user_data; if (areas[0].len > sizeof (InvokeItem) + size) { item->data = SPA_MEMBER (item, sizeof (InvokeItem), void); @@ -209,10 +213,14 @@ do_invoke (SpaPoll *poll, spa_ringbuffer_write_advance (&impl->buffer, item->item_size); - if (write (impl->fds[0].fd, &u, sizeof(uint64_t)) != sizeof(uint64_t)) + if (write (impl->wakeup.fd, &u, sizeof(uint64_t)) != sizeof(uint64_t)) pinos_log_warn ("data-loop %p: failed to write fd", strerror (errno)); - res = SPA_RESULT_RETURN_ASYNC (seq); + if (seq != SPA_ID_INVALID) + res = SPA_RESULT_RETURN_ASYNC (seq); + else + res = SPA_RESULT_OK; + } return res; } @@ -360,7 +368,7 @@ main_loop_quit (PinosMainLoop *loop) { PinosMainLoopImpl *impl = SPA_CONTAINER_OF (loop, PinosMainLoopImpl, this); pinos_log_debug ("main-loop %p: quit", impl); - g_main_loop_quit (impl->loop); + g_main_loop_quit (impl->main_loop); } static void @@ -368,7 +376,7 @@ main_loop_run (PinosMainLoop *loop) { PinosMainLoopImpl *impl = SPA_CONTAINER_OF (loop, PinosMainLoopImpl, this); pinos_log_debug ("main-loop %p: run", impl); - g_main_loop_run (impl->loop); + g_main_loop_run (impl->main_loop); } /** @@ -396,33 +404,25 @@ pinos_main_loop_new (void) this->defer_complete = main_loop_defer_complete; this->sync = main_loop_sync; - impl->poll.size = sizeof (SpaPoll); - impl->poll.info = NULL; - impl->poll.add_item = do_add_item; - impl->poll.update_item = do_update_item; - impl->poll.remove_item = do_remove_item; - impl->poll.invoke = do_invoke; - this->poll = &impl->poll; + impl->loop.size = sizeof (SpaLoop); + impl->loop.add_source = do_add_source; + impl->loop.update_source = do_update_source; + impl->loop.remove_source = do_remove_source; + impl->loop.invoke = do_invoke; + this->loop = &impl->loop; spa_list_init (&impl->work_list); spa_list_init (&impl->free_list); spa_ringbuffer_init (&impl->buffer, DATAS_SIZE); - impl->loop = g_main_loop_new (impl->context, false); + impl->main_loop = g_main_loop_new (impl->context, false); - impl->fds[0].fd = eventfd (0, 0); - impl->fds[0].events = POLLIN | POLLPRI | POLLERR; - impl->fds[0].revents = 0; - - impl->wakeup.id = SPA_ID_INVALID; - impl->wakeup.enabled = false; - impl->wakeup.fds = impl->fds; - impl->wakeup.n_fds = 1; - impl->wakeup.idle_cb = NULL; - impl->wakeup.before_cb = NULL; - impl->wakeup.after_cb = main_loop_dispatch; - impl->wakeup.user_data = impl; - do_add_item (&impl->poll, &impl->wakeup); + impl->wakeup.func = main_loop_dispatch; + impl->wakeup.data = impl; + impl->wakeup.fd = eventfd (0, 0); + impl->wakeup.mask = SPA_IO_IN | SPA_IO_ERR; + impl->wakeup.rmask = 0; + do_add_source (&impl->loop, &impl->wakeup); return this; } @@ -435,9 +435,9 @@ pinos_main_loop_destroy (PinosMainLoop *loop) pinos_log_debug ("main-loop %p: destroy", impl); - g_main_loop_unref (impl->loop); + g_main_loop_unref (impl->main_loop); - close (impl->fds[0].fd); + close (impl->wakeup.fd); spa_list_for_each_safe (item, tmp, &impl->free_list, link) free (item); diff --git a/pinos/server/main-loop.h b/pinos/server/main-loop.h index dea6295be..a0df71880 100644 --- a/pinos/server/main-loop.h +++ b/pinos/server/main-loop.h @@ -24,7 +24,7 @@ extern "C" { #endif -#include +#include typedef struct _PinosMainLoop PinosMainLoop; @@ -39,7 +39,7 @@ typedef void (*PinosDeferFunc) (void *obj, * Pinos main-loop interface. */ struct _PinosMainLoop { - SpaPoll *poll; + SpaLoop *loop; void (*run) (PinosMainLoop *loop); void (*quit) (PinosMainLoop *loop); diff --git a/pinos/server/node.c b/pinos/server/node.c index be99ef3de..bd20d94f2 100644 --- a/pinos/server/node.c +++ b/pinos/server/node.c @@ -251,7 +251,7 @@ send_clock_update (PinosNode *this) } static SpaResult -do_read_link (SpaPoll *poll, +do_read_link (SpaLoop *loop, bool async, uint32_t seq, size_t size, @@ -315,7 +315,7 @@ on_node_event (SpaNode *node, SpaNodeEvent *event, void *user_data) continue; link->rt.in_ready++; - spa_poll_invoke (&link->rt.input->node->data_loop->poll, + spa_loop_invoke (link->rt.input->node->data_loop->loop->loop, do_read_link, SPA_ID_INVALID, sizeof (PinosLink *), @@ -348,7 +348,7 @@ on_node_event (SpaNode *node, SpaNodeEvent *event, void *user_data) link->queue[offset] = po->buffer_id; spa_ringbuffer_write_advance (&link->ringbuffer, 1); - spa_poll_invoke (&link->rt.input->node->data_loop->poll, + spa_loop_invoke (link->rt.input->node->data_loop->loop->loop, do_read_link, SPA_ID_INVALID, sizeof (PinosLink *), @@ -473,7 +473,7 @@ pinos_node_new (PinosCore *core, } static SpaResult -do_node_remove_done (SpaPoll *poll, +do_node_remove_done (SpaLoop *loop, bool async, uint32_t seq, size_t size, @@ -502,7 +502,7 @@ do_node_remove_done (SpaPoll *poll, } static SpaResult -do_node_remove (SpaPoll *poll, +do_node_remove (SpaLoop *loop, bool async, uint32_t seq, size_t size, @@ -530,7 +530,7 @@ do_node_remove (SpaPoll *poll, } } - res = spa_poll_invoke (this->core->main_loop->poll, + res = spa_loop_invoke (this->core->main_loop->loop, do_node_remove_done, seq, 0, @@ -559,7 +559,7 @@ pinos_node_destroy (PinosNode * this) spa_list_remove (&this->link); pinos_global_destroy (this->global); - res = spa_poll_invoke (&this->data_loop->poll, + res = spa_loop_invoke (this->data_loop->loop->loop, do_node_remove, impl->seq++, 0, diff --git a/pinos/server/node.h b/pinos/server/node.h index 4f8672d09..6507779f8 100644 --- a/pinos/server/node.h +++ b/pinos/server/node.h @@ -31,7 +31,6 @@ typedef struct _PinosNode PinosNode; #include -#include #include #include diff --git a/pinos/server/port.c b/pinos/server/port.c index f0b5b680f..77eaa8303 100644 --- a/pinos/server/port.c +++ b/pinos/server/port.c @@ -73,7 +73,7 @@ pinos_port_destroy (PinosPort *port) } static SpaResult -do_add_link (SpaPoll *poll, +do_add_link (SpaLoop *loop, bool async, uint32_t seq, size_t size, @@ -169,13 +169,13 @@ pinos_port_link (PinosPort *output_port, output_node->n_used_output_links++; input_node->n_used_input_links++; - spa_poll_invoke (&output_node->data_loop->poll, + spa_loop_invoke (output_node->data_loop->loop->loop, do_add_link, SPA_ID_INVALID, sizeof (PinosLink *), &link, output_port); - spa_poll_invoke (&input_node->data_loop->poll, + spa_loop_invoke (input_node->data_loop->loop->loop, do_add_link, SPA_ID_INVALID, sizeof (PinosLink *), @@ -210,7 +210,7 @@ pinos_port_pause (PinosPort *port) } static SpaResult -do_remove_link_done (SpaPoll *poll, +do_remove_link_done (SpaLoop *loop, bool async, uint32_t seq, size_t size, @@ -259,7 +259,7 @@ do_remove_link_done (SpaPoll *poll, } static SpaResult -do_remove_link (SpaPoll *poll, +do_remove_link (SpaLoop *loop, bool async, uint32_t seq, size_t size, @@ -284,7 +284,7 @@ do_remove_link (SpaPoll *poll, pinos_port_pause (port); #endif - res = spa_poll_invoke (this->core->main_loop->poll, + res = spa_loop_invoke (this->core->main_loop->loop, do_remove_link_done, seq, sizeof (PinosLink *), @@ -301,7 +301,7 @@ pinos_port_unlink (PinosPort *port, PinosLink *link) pinos_log_debug ("port %p: start unlink %p", port, link); - res = spa_poll_invoke (&port->node->data_loop->poll, + res = spa_loop_invoke (port->node->data_loop->loop->loop, do_remove_link, impl->seq++, sizeof (PinosLink *), @@ -311,7 +311,7 @@ pinos_port_unlink (PinosPort *port, PinosLink *link) } static SpaResult -do_clear_buffers_done (SpaPoll *poll, +do_clear_buffers_done (SpaLoop *loop, bool async, uint32_t seq, size_t size, @@ -339,7 +339,7 @@ do_clear_buffers_done (SpaPoll *poll, } static SpaResult -do_clear_buffers (SpaPoll *poll, +do_clear_buffers (SpaLoop *loop, bool async, uint32_t seq, size_t size, @@ -352,7 +352,7 @@ do_clear_buffers (SpaPoll *poll, pinos_port_pause (port); - res = spa_poll_invoke (node->core->main_loop->poll, + res = spa_loop_invoke (node->core->main_loop->loop, do_clear_buffers_done, seq, 0, NULL, @@ -367,7 +367,7 @@ pinos_port_clear_buffers (PinosPort *port) PinosPortImpl *impl = SPA_CONTAINER_OF (port, PinosPortImpl, this); pinos_log_debug ("port %p: clear buffers", port); - res = spa_poll_invoke (&port->node->data_loop->poll, + res = spa_loop_invoke (port->node->data_loop->loop->loop, do_clear_buffers, impl->seq++, 0, NULL, diff --git a/spa/include/spa/defs.h b/spa/include/spa/defs.h index 96c3fb6d6..a5cf6edf7 100644 --- a/spa/include/spa/defs.h +++ b/spa/include/spa/defs.h @@ -27,6 +27,7 @@ extern "C" { #endif #include #include +#include #include typedef enum { diff --git a/spa/include/spa/loop.h b/spa/include/spa/loop.h new file mode 100644 index 000000000..22338dc1b --- /dev/null +++ b/spa/include/spa/loop.h @@ -0,0 +1,95 @@ +/* Simple Plugin API + * Copyright (C) 2016 Wim Taymans + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ + +#ifndef __SPA_LOOP_H__ +#define __SPA_LOOP_H__ + +#ifdef __cplusplus +extern "C" { +#endif + +typedef struct _SpaLoop SpaLoop; +typedef struct _SpaSource SpaSource; + +#define SPA_LOOP_URI "http://spaplug.in/ns/loop" +#define SPA_LOOP_PREFIX SPA_LOOP_URI "#" +#define SPA_LOOP__MainLoop SPA_LOOP_PREFIX "MainLoop" +#define SPA_LOOP__DataLoop SPA_LOOP_PREFIX "DataLoop" + +#include + +typedef void (*SpaSourceFunc) (SpaSource *source); + +typedef enum { + SPA_IO_IN = (1 << 0), + SPA_IO_OUT = (1 << 1), + SPA_IO_HUP = (1 << 2), + SPA_IO_ERR = (1 << 3), +} SpaIO; + +struct _SpaSource { + SpaLoop *loop; + SpaSourceFunc func; + void *data; + int fd; + SpaIO mask; + SpaIO rmask; + void *loop_private; +}; + +typedef SpaResult (*SpaInvokeFunc) (SpaLoop *loop, + bool async, + uint32_t seq, + size_t size, + void *data, + void *user_data); +/** + * SpaLoop: + * + * Register sources to an event loop + */ +struct _SpaLoop { + /* the total size of this structure. This can be used to expand this + * structure in the future */ + size_t size; + + SpaResult (*add_source) (SpaLoop *loop, + SpaSource *source); + SpaResult (*update_source) (SpaSource *source); + + void (*remove_source) (SpaSource *source); + + SpaResult (*invoke) (SpaLoop *loop, + SpaInvokeFunc func, + uint32_t seq, + size_t size, + void *data, + void *user_data); +}; + +#define spa_loop_add_source(l,...) (l)->add_source((l),__VA_ARGS__) +#define spa_loop_update_source(l,...) (l)->update_source(__VA_ARGS__) +#define spa_loop_remove_source(l,...) (l)->remove_source(__VA_ARGS__) +#define spa_loop_invoke(l,...) (l)->invoke((l),__VA_ARGS__) + +#ifdef __cplusplus +} /* extern "C" */ +#endif + +#endif /* __SPA_LOOP_H__ */ diff --git a/spa/include/spa/node-event.h b/spa/include/spa/node-event.h index 090a00d49..4833bc102 100644 --- a/spa/include/spa/node-event.h +++ b/spa/include/spa/node-event.h @@ -27,7 +27,6 @@ extern "C" { typedef struct _SpaNodeEvent SpaNodeEvent; #include -#include #include #define SPA_NODE_EVENT_URI "http://spaplug.in/ns/node-event" diff --git a/spa/include/spa/poll.h b/spa/include/spa/poll.h deleted file mode 100644 index 00153d8e5..000000000 --- a/spa/include/spa/poll.h +++ /dev/null @@ -1,183 +0,0 @@ -/* Simple Plugin API - * Copyright (C) 2016 Wim Taymans - * - * This library is free software; you can redistribute it and/or - * modify it under the terms of the GNU Library General Public - * License as published by the Free Software Foundation; either - * version 2 of the License, or (at your option) any later version. - * - * This library is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Library General Public License for more details. - * - * You should have received a copy of the GNU Library General Public - * License along with this library; if not, write to the - * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, - * Boston, MA 02110-1301, USA. - */ - -#ifndef __SPA_POLL_H__ -#define __SPA_POLL_H__ - -#ifdef __cplusplus -extern "C" { -#endif - -typedef struct _SpaPoll SpaPoll; - -#define SPA_POLL_URI "http://spaplug.in/ns/poll" -#define SPA_POLL_PREFIX SPA_POLL_URI "#" -#define SPA_POLL__MainLoop SPA_POLL_PREFIX "MainLoop" -#define SPA_POLL__DataLoop SPA_POLL_PREFIX "DataLoop" - -#include -#include -#include - -/** - * SpaPollFd: - * @fd: a file descriptor - * @events: events to watch - * @revents: events after poll - */ -typedef struct { - int fd; - short events; - short revents; -} SpaPollFd; - -/** - * SpaPollNotifyData: - * @user_data: user data - * @fds: array of file descriptors - * @n_fds: number of elements in @fds - * - * Data passed to #SpaPollNotify. - */ -typedef struct { - void *user_data; - SpaPollFd *fds; - unsigned int n_fds; -} SpaPollNotifyData; - -typedef SpaResult (*SpaPollNotify) (SpaPollNotifyData *data); - -/** - * SpaPollItem: - * @id: id of the poll item. This will be set when - * adding the item to #SpaPoll. - * @enabled: if the item is enabled - * @fds: array of file descriptors to watch - * @n_fds: number of elements in @fds - * @idle_cb: callback called when there is no other work - * @before_cb: callback called before starting the poll - * @after_cb: callback called after the poll loop - * @user_data: user data passed to callbacks - */ -typedef struct { - uint32_t id; - bool enabled; - SpaPollFd *fds; - unsigned int n_fds; - SpaPollNotify idle_cb; - SpaPollNotify before_cb; - SpaPollNotify after_cb; - void *user_data; -} SpaPollItem; - - -/** - * SpaPollInvokeFunc: - * @poll: a #SpaPoll - * @async: If this function was called async - * @seq: sequence number - * @size: size of data - * @data: data - * @user_data: extra user data - * - * Function called from SpaPoll::invoke. If @async is %true, invoke returned - * an async return value and this function should possibly schedule an async - * reply. - * - * Returns: the result of the invoke function - */ -typedef SpaResult (*SpaPollInvokeFunc) (SpaPoll *poll, - bool async, - uint32_t seq, - size_t size, - void *data, - void *user_data); - - -/** - * SpaPoll: - * - * Register poll events - */ -struct _SpaPoll { - /* the total size of this structure. This can be used to expand this - * structure in the future */ - size_t size; - /** - * SpaPoll::info - * - * Extra information - */ - const SpaDict *info; - /** - * SpaPoll::add_item: - * @poll: a #SpaPoll - * @item: a #SpaPollItem - * - * Add @item to the list of polled items. - * - * The id in @item will be set and must be passed when updating or removing - * the @item. - * - * Returns: #SPA_RESULT_OK on success - * #SPA_RESULT_INVALID_ARGUMENTS when @poll or @item is %NULL - */ - SpaResult (*add_item) (SpaPoll *poll, - SpaPollItem *item); - - SpaResult (*update_item) (SpaPoll *poll, - SpaPollItem *item); - - SpaResult (*remove_item) (SpaPoll *poll, - SpaPollItem *item); - - /** - * SpaPoll::invoke: - * @poll: a #SpaPoll - * @func: function to call - * @seq: sequence number - * @size: size of data - * @data: data - * @user_data: extra user data - * - * Invoke @func from the poll context of @poll. @seq, @data, @size and - * @user_data are passed to @func. - * - * Returns: The result of @func when this function is already called in - * the @poll context, otherwise an async return value with @seq or, when - * @seq is %SPA_ID_INVALID, %SPA_RETURN_OK. - */ - SpaResult (*invoke) (SpaPoll *poll, - SpaPollInvokeFunc func, - uint32_t seq, - size_t size, - void *data, - void *user_data); -}; - -#define spa_poll_add_item(n,...) (n)->add_item((n),__VA_ARGS__) -#define spa_poll_update_item(n,...) (n)->update_item((n),__VA_ARGS__) -#define spa_poll_remove_item(n,...) (n)->remove_item((n),__VA_ARGS__) -#define spa_poll_invoke(n,...) (n)->invoke((n),__VA_ARGS__) - -#ifdef __cplusplus -} /* extern "C" */ -#endif - -#endif /* __SPA_POLL_H__ */ diff --git a/spa/plugins/alsa/alsa-monitor.c b/spa/plugins/alsa/alsa-monitor.c index 886318612..75f2930fe 100644 --- a/spa/plugins/alsa/alsa-monitor.c +++ b/spa/plugins/alsa/alsa-monitor.c @@ -29,7 +29,7 @@ #include #include -#include +#include #include #include @@ -56,7 +56,7 @@ struct _SpaALSAMonitor { URI uri; SpaIDMap *map; SpaLog *log; - SpaPoll *main_loop; + SpaLoop *main_loop; SpaMonitorEventCallback event_cb; void *user_data; @@ -68,8 +68,7 @@ struct _SpaALSAMonitor { ALSAItem uitem; int fd; - SpaPollFd fds[1]; - SpaPollItem poll; + SpaSource source; }; static SpaResult @@ -244,17 +243,17 @@ fill_item (SpaALSAMonitor *this, ALSAItem *item, struct udev_device *udevice) return 0; } -static int -alsa_on_fd_events (SpaPollNotifyData *data) +static void +alsa_on_fd_events (SpaSource *source) { - SpaALSAMonitor *this = data->user_data; + SpaALSAMonitor *this = source->data; struct udev_device *dev; const char *str; SpaMonitorItem *item; dev = udev_monitor_receive_device (this->umonitor); if (fill_item (this, &this->uitem, dev) < 0) - return 0; + return; if ((str = udev_device_get_action (dev)) == NULL) str = "change"; @@ -270,8 +269,6 @@ alsa_on_fd_events (SpaPollNotifyData *data) } item->event.size = sizeof (this->uitem); this->event_cb (&this->monitor, &item->event, this->user_data); - - return 0; } static SpaResult @@ -303,22 +300,15 @@ spa_alsa_monitor_set_event_callback (SpaMonitor *monitor, NULL); udev_monitor_enable_receiving (this->umonitor); - this->fd = udev_monitor_get_fd (this->umonitor);; - this->fds[0].fd = this->fd; - this->fds[0].events = POLLIN | POLLPRI | POLLERR; - this->fds[0].revents = 0; - this->poll.id = 0; - this->poll.enabled = true; - this->poll.fds = this->fds; - this->poll.n_fds = 1; - this->poll.idle_cb = NULL; - this->poll.before_cb = NULL; - this->poll.after_cb = alsa_on_fd_events; - this->poll.user_data = this; - spa_poll_add_item (this->main_loop, &this->poll); + this->source.func = alsa_on_fd_events; + this->source.data = this; + this->source.fd = udev_monitor_get_fd (this->umonitor);; + this->source.mask = SPA_IO_IN | SPA_IO_ERR; + + spa_loop_add_source (this->main_loop, &this->source); } else { - spa_poll_remove_item (this->main_loop, &this->poll); + spa_loop_remove_source (this->main_loop, &this->source); } return SPA_RESULT_OK; @@ -433,7 +423,7 @@ alsa_monitor_init (const SpaHandleFactory *factory, this->map = support[i].data; else if (strcmp (support[i].uri, SPA_LOG_URI) == 0) this->log = support[i].data; - else if (strcmp (support[i].uri, SPA_POLL__MainLoop) == 0) + else if (strcmp (support[i].uri, SPA_LOOP__MainLoop) == 0) this->main_loop = support[i].data; } if (this->map == NULL) { diff --git a/spa/plugins/alsa/alsa-sink.c b/spa/plugins/alsa/alsa-sink.c index b81ce2f71..736ab3b20 100644 --- a/spa/plugins/alsa/alsa-sink.c +++ b/spa/plugins/alsa/alsa-sink.c @@ -152,7 +152,7 @@ spa_alsa_sink_node_set_props (SpaNode *node, } static SpaResult -do_send_event (SpaPoll *poll, +do_send_event (SpaLoop *loop, bool async, uint32_t seq, size_t size, @@ -167,7 +167,7 @@ do_send_event (SpaPoll *poll, } static SpaResult -do_command (SpaPoll *poll, +do_command (SpaLoop *loop, bool async, uint32_t seq, size_t size, @@ -197,7 +197,7 @@ do_command (SpaPoll *poll, ac.event.size = sizeof (SpaNodeEventAsyncComplete); ac.seq = seq; ac.res = res; - spa_poll_invoke (this->main_loop, + spa_loop_invoke (this->main_loop, do_send_event, SPA_ID_INVALID, sizeof (ac), @@ -231,7 +231,7 @@ spa_alsa_sink_node_send_command (SpaNode *node, if (this->n_buffers == 0) return SPA_RESULT_NO_BUFFERS; - return spa_poll_invoke (this->data_loop, + return spa_loop_invoke (this->data_loop, do_command, ++this->seq, command->size, @@ -790,9 +790,9 @@ alsa_sink_init (const SpaHandleFactory *factory, this->map = support[i].data; else if (strcmp (support[i].uri, SPA_LOG_URI) == 0) this->log = support[i].data; - else if (strcmp (support[i].uri, SPA_POLL__DataLoop) == 0) + else if (strcmp (support[i].uri, SPA_LOOP__DataLoop) == 0) this->data_loop = support[i].data; - else if (strcmp (support[i].uri, SPA_POLL__MainLoop) == 0) + else if (strcmp (support[i].uri, SPA_LOOP__MainLoop) == 0) this->main_loop = support[i].data; } if (this->map == NULL) { diff --git a/spa/plugins/alsa/alsa-source.c b/spa/plugins/alsa/alsa-source.c index 359de198e..9c79791d6 100644 --- a/spa/plugins/alsa/alsa-source.c +++ b/spa/plugins/alsa/alsa-source.c @@ -153,7 +153,7 @@ spa_alsa_source_node_set_props (SpaNode *node, } static SpaResult -do_send_event (SpaPoll *poll, +do_send_event (SpaLoop *loop, bool async, uint32_t seq, size_t size, @@ -168,7 +168,7 @@ do_send_event (SpaPoll *poll, } static SpaResult -do_start (SpaPoll *poll, +do_start (SpaLoop *loop, bool async, uint32_t seq, size_t size, @@ -188,7 +188,7 @@ do_start (SpaPoll *poll, ac.event.size = sizeof (SpaNodeEventAsyncComplete); ac.seq = seq; ac.res = res; - spa_poll_invoke (this->main_loop, + spa_loop_invoke (this->main_loop, do_send_event, SPA_ID_INVALID, sizeof (ac), @@ -199,7 +199,7 @@ do_start (SpaPoll *poll, } static SpaResult -do_pause (SpaPoll *poll, +do_pause (SpaLoop *loop, bool async, uint32_t seq, size_t size, @@ -219,7 +219,7 @@ do_pause (SpaPoll *poll, ac.event.size = sizeof (SpaNodeEventAsyncComplete); ac.seq = seq; ac.res = res; - spa_poll_invoke (this->main_loop, + spa_loop_invoke (this->main_loop, do_send_event, SPA_ID_INVALID, sizeof (ac), @@ -252,7 +252,7 @@ spa_alsa_source_node_send_command (SpaNode *node, if (this->n_buffers == 0) return SPA_RESULT_NO_BUFFERS; - return spa_poll_invoke (this->data_loop, + return spa_loop_invoke (this->data_loop, do_start, ++this->seq, 0, @@ -268,7 +268,7 @@ spa_alsa_source_node_send_command (SpaNode *node, if (this->n_buffers == 0) return SPA_RESULT_NO_BUFFERS; - return spa_poll_invoke (this->data_loop, + return spa_loop_invoke (this->data_loop, do_pause, ++this->seq, 0, @@ -888,9 +888,9 @@ alsa_source_init (const SpaHandleFactory *factory, this->map = support[i].data; else if (strcmp (support[i].uri, SPA_LOG_URI) == 0) this->log = support[i].data; - else if (strcmp (support[i].uri, SPA_POLL__DataLoop) == 0) + else if (strcmp (support[i].uri, SPA_LOOP__DataLoop) == 0) this->data_loop = support[i].data; - else if (strcmp (support[i].uri, SPA_POLL__MainLoop) == 0) + else if (strcmp (support[i].uri, SPA_LOOP__MainLoop) == 0) this->main_loop = support[i].data; } if (this->map == NULL) { diff --git a/spa/plugins/alsa/alsa-utils.c b/spa/plugins/alsa/alsa-utils.c index 4071c9a1c..4fa72e06f 100644 --- a/spa/plugins/alsa/alsa-utils.c +++ b/spa/plugins/alsa/alsa-utils.c @@ -12,7 +12,7 @@ #define CHECK(s,msg) if ((err = (s)) < 0) { spa_log_error (state->log, msg ": %s", snd_strerror(err)); return err; } -static int alsa_on_fd_events (SpaPollNotifyData *data); +static void alsa_on_fd_events (SpaSource *source); static int spa_alsa_open (SpaALSAState *state) @@ -35,16 +35,6 @@ spa_alsa_open (SpaALSAState *state) SND_PCM_NO_AUTO_FORMAT), "open failed"); - state->poll.id = 0; - state->poll.enabled = false; - state->poll.fds = state->fds; - state->poll.n_fds = 0; - state->poll.idle_cb = NULL; - state->poll.before_cb = NULL; - state->poll.after_cb = alsa_on_fd_events; - state->poll.user_data = state; - spa_poll_add_item (state->data_loop, &state->poll); - state->opened = true; return 0; @@ -58,8 +48,6 @@ spa_alsa_close (SpaALSAState *state) if (!state->opened) return 0; - spa_poll_remove_item (state->data_loop, &state->poll); - spa_log_info (state->log, "Device closing"); CHECK (snd_pcm_close (state->hndl), "close failed"); @@ -508,44 +496,48 @@ mmap_read (SpaALSAState *state) return 0; } -static int -alsa_on_fd_events (SpaPollNotifyData *data) +static void +alsa_on_fd_events (SpaSource *source) { - SpaALSAState *state = data->user_data; + SpaALSAState *state = source->data; snd_pcm_t *hndl = state->hndl; int err; unsigned short revents = 0; +#if 0 snd_pcm_poll_descriptors_revents (hndl, - (struct pollfd *)data->fds, - data->n_fds, + state->fds, + state->n_fds, &revents); - if (revents & POLLERR) { + + spa_log_debug (state->log, "revents: %d %d", revents, state->n_fds); +#endif + revents = source->rmask; + + if (revents & SPA_IO_ERR) { if ((err = xrun_recovery (state, hndl, err)) < 0) { spa_log_error (state->log, "error: %s", snd_strerror (err)); - return -1; + return; } } if (state->stream == SND_PCM_STREAM_CAPTURE) { - if (!(revents & POLLIN)) - return 0; + if (!(revents & SPA_IO_IN)) + return; mmap_read (state); } else { - if (!(revents & POLLOUT)) - return 0; + if (!(revents & SPA_IO_OUT)) + return; mmap_write (state); } - - return 0; } SpaResult spa_alsa_start (SpaALSAState *state, bool xrun_recover) { - int err; + int err, i; if (state->started) return SPA_RESULT_OK; @@ -559,19 +551,33 @@ spa_alsa_start (SpaALSAState *state, bool xrun_recover) return SPA_RESULT_ERROR; } - if ((state->poll.n_fds = snd_pcm_poll_descriptors_count (state->hndl)) <= 0) { - spa_log_error (state->log, "Invalid poll descriptors count %d", state->poll.n_fds); + for (i = 0; i < state->n_fds; i++) + spa_loop_remove_source (state->data_loop, &state->sources[i]); + + if ((state->n_fds = snd_pcm_poll_descriptors_count (state->hndl)) <= 0) { + spa_log_error (state->log, "Invalid poll descriptors count %d", state->n_fds); return SPA_RESULT_ERROR; } - if ((err = snd_pcm_poll_descriptors (state->hndl, (struct pollfd *)state->fds, state->poll.n_fds)) < 0) { + if ((err = snd_pcm_poll_descriptors (state->hndl, state->fds, state->n_fds)) < 0) { spa_log_error (state->log, "snd_pcm_poll_descriptors: %s", snd_strerror(err)); return SPA_RESULT_ERROR; } - if (!xrun_recover) { - state->poll.enabled = true; - spa_poll_update_item (state->data_loop, &state->poll); + for (i = 0; i < state->n_fds; i++) { + state->sources[i].func = alsa_on_fd_events; + state->sources[i].data = state; + state->sources[i].fd = state->fds[i].fd; + state->sources[i].mask = 0; + if (state->fds[i].events & POLLIN) + state->sources[i].mask |= SPA_IO_IN; + if (state->fds[i].events & POLLOUT) + state->sources[i].mask |= SPA_IO_OUT; + if (state->fds[i].events & POLLERR) + state->sources[i].mask |= SPA_IO_ERR; + if (state->fds[i].events & POLLHUP) + state->sources[i].mask |= SPA_IO_HUP; + spa_loop_add_source (state->data_loop, &state->sources[i]); } if (state->stream == SND_PCM_STREAM_PLAYBACK) { @@ -591,15 +597,14 @@ spa_alsa_start (SpaALSAState *state, bool xrun_recover) SpaResult spa_alsa_pause (SpaALSAState *state, bool xrun_recover) { - int err; + int err, i; if (!state->started) return SPA_RESULT_OK; - if (!xrun_recover) { - state->poll.enabled = false; - spa_poll_update_item (state->data_loop, &state->poll); - } + for (i = 0; i < state->n_fds; i++) + spa_loop_remove_source (state->data_loop, &state->sources[i]); + state->n_fds = 0; if ((err = snd_pcm_drop (state->hndl)) < 0) spa_log_error (state->log, "snd_pcm_drop %s", snd_strerror (err)); diff --git a/spa/plugins/alsa/alsa-utils.h b/spa/plugins/alsa/alsa-utils.h index b6f7fdf1a..4ad324fe4 100644 --- a/spa/plugins/alsa/alsa-utils.h +++ b/spa/plugins/alsa/alsa-utils.h @@ -32,6 +32,7 @@ extern "C" { #include #include #include +#include #include #include @@ -73,8 +74,8 @@ struct _SpaALSAState { URI uri; SpaIDMap *map; SpaLog *log; - SpaPoll *main_loop; - SpaPoll *data_loop; + SpaLoop *main_loop; + SpaLoop *data_loop; snd_pcm_stream_t stream; snd_output_t *output; @@ -114,8 +115,9 @@ struct _SpaALSAState { size_t ready_offset; bool started; - SpaPollFd fds[16]; - SpaPollItem poll; + int n_fds; + struct pollfd fds[16]; + SpaSource sources[16]; int64_t sample_count; int64_t last_ticks; diff --git a/spa/plugins/audiotestsrc/audiotestsrc.c b/spa/plugins/audiotestsrc/audiotestsrc.c index 774cec2c3..11a024b6c 100644 --- a/spa/plugins/audiotestsrc/audiotestsrc.c +++ b/spa/plugins/audiotestsrc/audiotestsrc.c @@ -22,11 +22,11 @@ #include #include #include -#include #include #include #include +#include #include #include #include @@ -71,15 +71,15 @@ struct _SpaAudioTestSrc { URI uri; SpaIDMap *map; SpaLog *log; - SpaPoll *data_loop; + SpaLoop *data_loop; SpaAudioTestSrcProps props[2]; SpaNodeEventCallback event_cb; void *user_data; - SpaPollItem timer; - SpaPollFd fds[1]; + bool timer_enabled; + SpaSource timer_source; struct itimerspec timerspec; SpaPortInfo info; @@ -256,20 +256,19 @@ fill_buffer (SpaAudioTestSrc *this, ATSBuffer *b) return SPA_RESULT_OK; } -static int audiotestsrc_on_output (SpaPollNotifyData *data); -static SpaResult update_poll_enabled (SpaAudioTestSrc *this, bool enabled); +static SpaResult update_loop_enabled (SpaAudioTestSrc *this, bool enabled); -static int -audiotestsrc_on_output (SpaPollNotifyData *data) +static void +audiotestsrc_on_output (SpaSource *source) { - SpaAudioTestSrc *this = data->user_data; + SpaAudioTestSrc *this = source->data; ATSBuffer *b; if (spa_list_is_empty (&this->empty)) { if (!this->props[1].live) - update_poll_enabled (this, false); - return 0; + update_loop_enabled (this, false); + return; } b = spa_list_first (&this->empty, ATSBuffer, list); spa_list_remove (&b->list); @@ -288,19 +287,17 @@ audiotestsrc_on_output (SpaPollNotifyData *data) if (this->props[1].live) { uint64_t expirations, next_time; - if (read (this->fds[0].fd, &expirations, sizeof (uint64_t)) < sizeof (uint64_t)) + if (read (this->timer_source.fd, &expirations, sizeof (uint64_t)) < sizeof (uint64_t)) perror ("read timerfd"); next_time = this->start_time + this->elapsed_time; this->timerspec.it_value.tv_sec = next_time / SPA_NSEC_PER_SEC; this->timerspec.it_value.tv_nsec = next_time % SPA_NSEC_PER_SEC; - timerfd_settime (this->fds[0].fd, TFD_TIMER_ABSTIME, &this->timerspec, NULL); + timerfd_settime (this->timer_source.fd, TFD_TIMER_ABSTIME, &this->timerspec, NULL); } spa_list_insert (this->ready.prev, &b->list); send_have_output (this); - - return 0; } static void @@ -310,10 +307,15 @@ update_state (SpaAudioTestSrc *this, SpaNodeState state) } static SpaResult -update_poll_enabled (SpaAudioTestSrc *this, bool enabled) +update_loop_enabled (SpaAudioTestSrc *this, bool enabled) { if (this->event_cb) { - this->timer.enabled = enabled; + this->timer_enabled = enabled; + if (enabled) + this->timer_source.mask = SPA_IO_IN; + else + this->timer_source.mask = 0; + if (this->props[1].live) { if (enabled) { uint64_t next_time = this->start_time + this->elapsed_time; @@ -324,18 +326,11 @@ update_poll_enabled (SpaAudioTestSrc *this, bool enabled) this->timerspec.it_value.tv_sec = 0; this->timerspec.it_value.tv_nsec = 0; } - timerfd_settime (this->fds[0].fd, TFD_TIMER_ABSTIME, &this->timerspec, NULL); - this->timer.fds = this->fds; - this->timer.n_fds = 1; - this->timer.idle_cb = NULL; - this->timer.after_cb = audiotestsrc_on_output; - } else { - this->timer.fds = NULL; - this->timer.n_fds = 0; - this->timer.idle_cb = audiotestsrc_on_output; - this->timer.after_cb = NULL; + timerfd_settime (this->timer_source.fd, TFD_TIMER_ABSTIME, &this->timerspec, NULL); + + this->timer_source.func = audiotestsrc_on_output; } - spa_poll_update_item (this->data_loop, &this->timer); + spa_loop_update_source (this->data_loop, &this->timer_source); } return SPA_RESULT_OK; } @@ -377,7 +372,7 @@ spa_audiotestsrc_node_send_command (SpaNode *node, this->elapsed_time = 0; this->started = true; - update_poll_enabled (this, true); + update_loop_enabled (this, true); update_state (this, SPA_NODE_STATE_STREAMING); break; } @@ -393,7 +388,7 @@ spa_audiotestsrc_node_send_command (SpaNode *node, return SPA_RESULT_OK; this->started = false; - update_poll_enabled (this, false); + update_loop_enabled (this, false); update_state (this, SPA_NODE_STATE_PAUSED); break; } @@ -419,14 +414,14 @@ spa_audiotestsrc_node_set_event_callback (SpaNode *node, this = SPA_CONTAINER_OF (node, SpaAudioTestSrc, node); if (event_cb == NULL && this->event_cb) { - spa_poll_remove_item (this->data_loop, &this->timer); + spa_loop_remove_source (this->data_loop, &this->timer_source); } this->event_cb = event_cb; this->user_data = user_data; if (this->event_cb) { - spa_poll_add_item (this->data_loop, &this->timer); + spa_loop_add_source (this->data_loop, &this->timer_source); } return SPA_RESULT_OK; } @@ -801,7 +796,7 @@ spa_audiotestsrc_node_port_reuse_buffer (SpaNode *node, spa_list_insert (this->empty.prev, &b->list); if (!this->props[1].live) - update_poll_enabled (this, true); + update_loop_enabled (this, true); return SPA_RESULT_OK; } @@ -958,7 +953,7 @@ audiotestsrc_clear (SpaHandle *handle) this = (SpaAudioTestSrc *) handle; - close (this->fds[0].fd); + close (this->timer_source.fd); return SPA_RESULT_OK; } @@ -986,7 +981,7 @@ audiotestsrc_init (const SpaHandleFactory *factory, this->map = support[i].data; else if (strcmp (support[i].uri, SPA_LOG_URI) == 0) this->log = support[i].data; - else if (strcmp (support[i].uri, SPA_POLL__DataLoop) == 0) + else if (strcmp (support[i].uri, SPA_LOOP__DataLoop) == 0) this->data_loop = support[i].data; } if (this->map == NULL) { @@ -1009,21 +1004,15 @@ audiotestsrc_init (const SpaHandleFactory *factory, spa_list_init (&this->empty); spa_list_init (&this->ready); - this->fds[0].fd = timerfd_create (CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC); - this->fds[0].events = POLLIN | POLLPRI | POLLERR; - this->fds[0].revents = 0; + this->timer_source.func = audiotestsrc_on_output; + this->timer_source.data = this; + this->timer_source.fd = timerfd_create (CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC); + this->timer_source.mask = SPA_IO_IN | SPA_IO_ERR; this->timerspec.it_value.tv_sec = 0; this->timerspec.it_value.tv_nsec = 0; this->timerspec.it_interval.tv_sec = 0; this->timerspec.it_interval.tv_nsec = 0; - this->timer.id = 0; - this->timer.enabled = false; - this->timer.idle_cb = NULL; - this->timer.before_cb = NULL; - this->timer.after_cb = NULL; - this->timer.user_data = this; - this->info.flags = SPA_PORT_INFO_FLAG_CAN_USE_BUFFERS | SPA_PORT_INFO_FLAG_NO_REF; if (this->props[1].live) diff --git a/spa/plugins/v4l2/v4l2-monitor.c b/spa/plugins/v4l2/v4l2-monitor.c index e8357a352..6fc666338 100644 --- a/spa/plugins/v4l2/v4l2-monitor.c +++ b/spa/plugins/v4l2/v4l2-monitor.c @@ -22,13 +22,12 @@ #include #include #include -#include #include #include #include -#include +#include #include #include @@ -54,7 +53,7 @@ struct _SpaV4l2Monitor { URI uri; SpaIDMap *map; SpaLog *log; - SpaPoll *main_loop; + SpaLoop *main_loop; SpaMonitorEventCallback event_cb; void *user_data; @@ -65,9 +64,7 @@ struct _SpaV4l2Monitor { V4l2Item uitem; - int fd; - SpaPollFd fds[1]; - SpaPollItem poll; + SpaSource source; }; static SpaResult @@ -180,19 +177,18 @@ fill_item (V4l2Item *item, struct udev_device *udevice) } -static int -v4l2_on_fd_events (SpaPollNotifyData *data) +static void +v4l2_on_fd_events (SpaSource *source) { - SpaV4l2Monitor *this = data->user_data; + SpaV4l2Monitor *this = source->data; struct udev_device *dev; const char *action; SpaMonitorItem *item; - dev = udev_monitor_receive_device (this->umonitor); fill_item (&this->uitem, dev); if (dev == NULL) - return 0; + return; if ((action = udev_device_get_action (dev)) == NULL) action = "change"; @@ -208,8 +204,6 @@ v4l2_on_fd_events (SpaPollNotifyData *data) } item->event.size = sizeof (this->uitem); this->event_cb (&this->monitor, &item->event, this->user_data); - - return 0; } static SpaResult @@ -241,23 +235,14 @@ spa_v4l2_monitor_set_event_callback (SpaMonitor *monitor, NULL); udev_monitor_enable_receiving (this->umonitor); - this->fd = udev_monitor_get_fd (this->umonitor);; + this->source.func = v4l2_on_fd_events; + this->source.data = this; + this->source.fd = udev_monitor_get_fd (this->umonitor);; + this->source.mask = SPA_IO_IN | SPA_IO_ERR; - this->fds[0].fd = this->fd; - this->fds[0].events = POLLIN | POLLPRI | POLLERR; - this->fds[0].revents = 0; - - this->poll.id = 0; - this->poll.enabled = true; - this->poll.fds = this->fds; - this->poll.n_fds = 1; - this->poll.idle_cb = NULL; - this->poll.before_cb = NULL; - this->poll.after_cb = v4l2_on_fd_events; - this->poll.user_data = this; - spa_poll_add_item (this->main_loop, &this->poll); + spa_loop_add_source (this->main_loop, &this->source); } else { - spa_poll_remove_item (this->main_loop, &this->poll); + spa_loop_remove_source (this->main_loop, &this->source); } return SPA_RESULT_OK; @@ -372,7 +357,7 @@ v4l2_monitor_init (const SpaHandleFactory *factory, this->map = support[i].data; else if (strcmp (support[i].uri, SPA_LOG_URI) == 0) this->log = support[i].data; - else if (strcmp (support[i].uri, SPA_POLL__MainLoop) == 0) + else if (strcmp (support[i].uri, SPA_LOOP__MainLoop) == 0) this->main_loop = support[i].data; } if (this->map == NULL) { diff --git a/spa/plugins/v4l2/v4l2-source.c b/spa/plugins/v4l2/v4l2-source.c index 006df9c4b..9dafb5f92 100644 --- a/spa/plugins/v4l2/v4l2-source.c +++ b/spa/plugins/v4l2/v4l2-source.c @@ -28,6 +28,7 @@ #include #include #include +#include #include #include #include @@ -88,8 +89,8 @@ typedef struct { typedef struct { SpaLog *log; - SpaPoll *main_loop; - SpaPoll *data_loop; + SpaLoop *main_loop; + SpaLoop *data_loop; bool export_buf; bool started; @@ -115,8 +116,8 @@ typedef struct { unsigned int n_buffers; SpaList ready; - SpaPollFd fds[1]; - SpaPollItem poll; + bool source_enabled; + SpaSource source; SpaPortInfo info; SpaAllocParam *params[2]; @@ -228,7 +229,7 @@ spa_v4l2_source_node_set_props (SpaNode *node, } static SpaResult -do_pause_done (SpaPoll *poll, +do_pause_done (SpaLoop *loop, bool async, uint32_t seq, size_t size, @@ -253,7 +254,7 @@ do_pause_done (SpaPoll *poll, } static SpaResult -do_pause (SpaPoll *poll, +do_pause (SpaLoop *loop, bool async, uint32_t seq, size_t size, @@ -275,7 +276,7 @@ do_pause (SpaPoll *poll, ac.event.size = sizeof (SpaNodeEventAsyncComplete); ac.seq = seq; ac.res = res; - spa_poll_invoke (this->state[0].main_loop, + spa_loop_invoke (this->state[0].main_loop, do_pause_done, seq, sizeof (ac), @@ -286,7 +287,7 @@ do_pause (SpaPoll *poll, } static SpaResult -do_start_done (SpaPoll *poll, +do_start_done (SpaLoop *loop, bool async, uint32_t seq, size_t size, @@ -307,7 +308,7 @@ do_start_done (SpaPoll *poll, } static SpaResult -do_start (SpaPoll *poll, +do_start (SpaLoop *loop, bool async, uint32_t seq, size_t size, @@ -329,7 +330,7 @@ do_start (SpaPoll *poll, ac.event.size = sizeof (SpaNodeEventAsyncComplete); ac.seq = seq; ac.res = res; - spa_poll_invoke (this->state[0].main_loop, + spa_loop_invoke (this->state[0].main_loop, do_start_done, seq, sizeof (ac), @@ -372,7 +373,7 @@ spa_v4l2_source_node_send_command (SpaNode *node, if ((res = spa_v4l2_stream_on (this)) < 0) return res; - return spa_poll_invoke (this->state[0].data_loop, + return spa_loop_invoke (this->state[0].data_loop, do_start, ++this->seq, command->size, @@ -392,7 +393,7 @@ spa_v4l2_source_node_send_command (SpaNode *node, if (!state->started) return SPA_RESULT_OK; - return spa_poll_invoke (this->state[0].data_loop, + return spa_loop_invoke (this->state[0].data_loop, do_pause, ++this->seq, command->size, @@ -1007,9 +1008,9 @@ v4l2_source_init (const SpaHandleFactory *factory, this->map = support[i].data; else if (strcmp (support[i].uri, SPA_LOG_URI) == 0) this->log = support[i].data; - else if (strcmp (support[i].uri, SPA_POLL__MainLoop) == 0) + else if (strcmp (support[i].uri, SPA_LOOP__MainLoop) == 0) this->state[0].main_loop = support[i].data; - else if (strcmp (support[i].uri, SPA_POLL__DataLoop) == 0) + else if (strcmp (support[i].uri, SPA_LOOP__DataLoop) == 0) this->state[0].data_loop = support[i].data; } if (this->map == NULL) { diff --git a/spa/plugins/v4l2/v4l2-utils.c b/spa/plugins/v4l2/v4l2-utils.c index 9fbf02449..de291aa10 100644 --- a/spa/plugins/v4l2/v4l2-utils.c +++ b/spa/plugins/v4l2/v4l2-utils.c @@ -9,7 +9,7 @@ #define CLEAR(x) memset(&(x), 0, sizeof(x)) -static int v4l2_on_fd_events (SpaPollNotifyData *data); +static void v4l2_on_fd_events (SpaSource *source); static int xioctl (int fd, int request, void *arg) @@ -69,19 +69,11 @@ spa_v4l2_open (SpaV4l2Source *this) return -1; } - state->fds[0].fd = state->fd; - state->fds[0].events = POLLIN | POLLPRI | POLLERR; - state->fds[0].revents = 0; - - state->poll.id = 0; - state->poll.enabled = false; - state->poll.fds = state->fds; - state->poll.n_fds = 1; - state->poll.idle_cb = NULL; - state->poll.before_cb = NULL; - state->poll.after_cb = v4l2_on_fd_events; - state->poll.user_data = this; - spa_poll_add_item (state->data_loop, &state->poll); + state->source.func = v4l2_on_fd_events; + state->source.data = this; + state->source.fd = state->fd; + state->source.mask = SPA_IO_IN | SPA_IO_ERR; + state->source.rmask = 0; state->opened = true; @@ -158,7 +150,8 @@ spa_v4l2_close (SpaV4l2Source *this) spa_log_info (state->log, "v4l2: close"); - spa_poll_remove_item (state->data_loop, &state->poll); + if (state->source_enabled) + spa_loop_remove_source (state->data_loop, &state->source); if (close(state->fd)) perror ("close"); @@ -890,27 +883,25 @@ mmap_read (SpaV4l2Source *this) return SPA_RESULT_OK; } -static int -v4l2_on_fd_events (SpaPollNotifyData *data) +static void +v4l2_on_fd_events (SpaSource *source) { - SpaV4l2Source *this = data->user_data; + SpaV4l2Source *this = source->data; SpaNodeEventHaveOutput ho; - if (data->fds[0].revents & POLLERR) - return -1; + if (source->rmask & SPA_IO_ERR) + return; - if (!(data->fds[0].revents & POLLIN)) - return 0; + if (!(source->rmask & SPA_IO_IN)) + return; if (mmap_read (this) < 0) - return 0; + return; ho.event.type = SPA_NODE_EVENT_TYPE_HAVE_OUTPUT; ho.event.size = sizeof (ho); ho.port_id = 0; this->event_cb (&this->node, &ho.event, this->user_data); - - return 0; } static SpaResult @@ -1150,8 +1141,13 @@ static SpaResult spa_v4l2_port_set_enabled (SpaV4l2Source *this, bool enabled) { SpaV4l2State *state = &this->state[0]; - state->poll.enabled = enabled; - spa_poll_update_item (state->data_loop, &state->poll); + if (state->source_enabled != enabled) { + state->source_enabled = enabled; + if (enabled) + spa_loop_add_source (state->data_loop, &state->source); + else + spa_loop_remove_source (state->data_loop, &state->source); + } return SPA_RESULT_OK; } diff --git a/spa/plugins/videotestsrc/videotestsrc.c b/spa/plugins/videotestsrc/videotestsrc.c index a29349bd8..2016a7ad0 100644 --- a/spa/plugins/videotestsrc/videotestsrc.c +++ b/spa/plugins/videotestsrc/videotestsrc.c @@ -23,10 +23,10 @@ #include #include #include -#include #include #include +#include #include #include #include @@ -74,15 +74,15 @@ struct _SpaVideoTestSrc { URI uri; SpaIDMap *map; SpaLog *log; - SpaPoll *data_loop; + SpaLoop *data_loop; SpaVideoTestSrcProps props[2]; SpaNodeEventCallback event_cb; void *user_data; - SpaPollItem timer; - SpaPollFd fds[1]; + SpaSource timer_source; + bool timer_enabled; struct itimerspec timerspec; SpaPortInfo info; @@ -205,18 +205,18 @@ fill_buffer (SpaVideoTestSrc *this, VTSBuffer *b) return SPA_RESULT_OK; } -static SpaResult update_poll_enabled (SpaVideoTestSrc *this, bool enabled); +static SpaResult update_loop_enabled (SpaVideoTestSrc *this, bool enabled); -static int -videotestsrc_on_output (SpaPollNotifyData *data) +static void +videotestsrc_on_output (SpaSource *source) { - SpaVideoTestSrc *this = data->user_data; + SpaVideoTestSrc *this = source->data; VTSBuffer *b; if (spa_list_is_empty (&this->empty)) { if (!this->props[1].live) - update_poll_enabled (this, false); - return 0; + update_loop_enabled (this, false); + return; } b = spa_list_first (&this->empty, VTSBuffer, list); spa_list_remove (&b->list); @@ -235,19 +235,17 @@ videotestsrc_on_output (SpaPollNotifyData *data) if (this->props[1].live) { uint64_t expirations, next_time; - if (read (this->fds[0].fd, &expirations, sizeof (uint64_t)) < sizeof (uint64_t)) + if (read (this->timer_source.fd, &expirations, sizeof (uint64_t)) < sizeof (uint64_t)) perror ("read timerfd"); next_time = this->start_time + this->elapsed_time; this->timerspec.it_value.tv_sec = next_time / SPA_NSEC_PER_SEC; this->timerspec.it_value.tv_nsec = next_time % SPA_NSEC_PER_SEC; - timerfd_settime (this->fds[0].fd, TFD_TIMER_ABSTIME, &this->timerspec, NULL); + timerfd_settime (this->timer_source.fd, TFD_TIMER_ABSTIME, &this->timerspec, NULL); } spa_list_insert (this->ready.prev, &b->list); send_have_output (this); - - return 0; } static void @@ -257,10 +255,15 @@ update_state (SpaVideoTestSrc *this, SpaNodeState state) } static SpaResult -update_poll_enabled (SpaVideoTestSrc *this, bool enabled) +update_loop_enabled (SpaVideoTestSrc *this, bool enabled) { - if (this->event_cb && this->timer.enabled != enabled) { - this->timer.enabled = enabled; + if (this->event_cb && this->timer_enabled != enabled) { + this->timer_enabled = enabled; + if (enabled) + this->timer_source.mask = SPA_IO_IN; + else + this->timer_source.mask = 0; + if (this->props[1].live) { if (enabled) { uint64_t next_time = this->start_time + this->elapsed_time; @@ -271,18 +274,9 @@ update_poll_enabled (SpaVideoTestSrc *this, bool enabled) this->timerspec.it_value.tv_sec = 0; this->timerspec.it_value.tv_nsec = 0; } - timerfd_settime (this->fds[0].fd, TFD_TIMER_ABSTIME, &this->timerspec, NULL); - this->timer.fds = this->fds; - this->timer.n_fds = 1; - this->timer.idle_cb = NULL; - this->timer.after_cb = videotestsrc_on_output; - } else { - this->timer.fds = NULL; - this->timer.n_fds = 0; - this->timer.idle_cb = videotestsrc_on_output; - this->timer.after_cb = NULL; + timerfd_settime (this->timer_source.fd, TFD_TIMER_ABSTIME, &this->timerspec, NULL); } - spa_poll_update_item (this->data_loop, &this->timer); + spa_loop_update_source (this->data_loop, &this->timer_source); } return SPA_RESULT_OK; } @@ -324,7 +318,7 @@ spa_videotestsrc_node_send_command (SpaNode *node, this->elapsed_time = 0; this->started = true; - update_poll_enabled (this, true); + update_loop_enabled (this, true); update_state (this, SPA_NODE_STATE_STREAMING); break; } @@ -340,7 +334,7 @@ spa_videotestsrc_node_send_command (SpaNode *node, return SPA_RESULT_OK; this->started = false; - update_poll_enabled (this, false); + update_loop_enabled (this, false); update_state (this, SPA_NODE_STATE_PAUSED); break; } @@ -365,15 +359,14 @@ spa_videotestsrc_node_set_event_callback (SpaNode *node, this = SPA_CONTAINER_OF (node, SpaVideoTestSrc, node); - if (event_cb == NULL && this->event_cb) { - spa_poll_remove_item (this->data_loop, &this->timer); - } + if (event_cb == NULL && this->event_cb) + spa_loop_remove_source (this->data_loop, &this->timer_source); this->event_cb = event_cb; this->user_data = user_data; if (this->event_cb) { - spa_poll_add_item (this->data_loop, &this->timer); + spa_loop_add_source (this->data_loop, &this->timer_source); } return SPA_RESULT_OK; } @@ -748,7 +741,7 @@ spa_videotestsrc_node_port_reuse_buffer (SpaNode *node, spa_list_insert (this->empty.prev, &b->list); if (!this->props[1].live) - update_poll_enabled (this, true); + update_loop_enabled (this, true); return SPA_RESULT_OK; } @@ -910,7 +903,7 @@ videotestsrc_clear (SpaHandle *handle) this = (SpaVideoTestSrc *) handle; - close (this->fds[0].fd); + close (this->timer_source.fd); return SPA_RESULT_OK; } @@ -938,7 +931,7 @@ videotestsrc_init (const SpaHandleFactory *factory, this->map = support[i].data; else if (strcmp (support[i].uri, SPA_LOG_URI) == 0) this->log = support[i].data; - else if (strcmp (support[i].uri, SPA_POLL__DataLoop) == 0) + else if (strcmp (support[i].uri, SPA_LOOP__DataLoop) == 0) this->data_loop = support[i].data; } if (this->map == NULL) { @@ -957,21 +950,16 @@ videotestsrc_init (const SpaHandleFactory *factory, spa_list_init (&this->empty); spa_list_init (&this->ready); - this->fds[0].fd = timerfd_create (CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC); - this->fds[0].events = POLLIN | POLLPRI | POLLERR; - this->fds[0].revents = 0; + this->timer_source.func = videotestsrc_on_output; + this->timer_source.data = this; + this->timer_source.fd = timerfd_create (CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC); + this->timer_source.mask = SPA_IO_IN; + this->timer_source.rmask = 0; this->timerspec.it_value.tv_sec = 0; this->timerspec.it_value.tv_nsec = 0; this->timerspec.it_interval.tv_sec = 0; this->timerspec.it_interval.tv_nsec = 0; - this->timer.id = 0; - this->timer.enabled = false; - this->timer.idle_cb = NULL; - this->timer.before_cb = NULL; - this->timer.after_cb = NULL; - this->timer.user_data = this; - this->info.flags = SPA_PORT_INFO_FLAG_CAN_USE_BUFFERS | SPA_PORT_INFO_FLAG_NO_REF; if (this->props[1].live) diff --git a/spa/tests/meson.build b/spa/tests/meson.build index db474b142..faf05f43a 100644 --- a/spa/tests/meson.build +++ b/spa/tests/meson.build @@ -1,11 +1,11 @@ -executable('test-mixer', 'test-mixer.c', - include_directories : [spa_inc, spa_libinc ], - dependencies : [dl_lib, pthread_lib], - link_with : spalib, - install : false) - -executable('test-v4l2', 'test-v4l2.c', - include_directories : [spa_inc, spa_libinc ], - dependencies : [dl_lib, sdl_dep, pthread_lib], - link_with : spalib, - install : false) +#executable('test-mixer', 'test-mixer.c', +# include_directories : [spa_inc, spa_libinc ], +# dependencies : [dl_lib, pthread_lib], +# link_with : spalib, +# install : false) +# +#executable('test-v4l2', 'test-v4l2.c', +# include_directories : [spa_inc, spa_libinc ], +# dependencies : [dl_lib, sdl_dep, pthread_lib], +# link_with : spalib, +# install : false) diff --git a/spa/tests/test-v4l2.c b/spa/tests/test-v4l2.c index dc300171b..662227df0 100644 --- a/spa/tests/test-v4l2.c +++ b/spa/tests/test-v4l2.c @@ -31,6 +31,7 @@ #include #include #include +#include #include #include #include diff --git a/spa/tools/spa-monitor.c b/spa/tools/spa-monitor.c index 55c54e3df..6fe7e3109 100644 --- a/spa/tools/spa-monitor.c +++ b/spa/tools/spa-monitor.c @@ -28,7 +28,7 @@ #include #include #include -#include +#include #include #include @@ -41,16 +41,16 @@ typedef struct { SpaIDMap *map; SpaLog *log; - SpaPoll main_loop; + SpaLoop main_loop; SpaSupport support[3]; unsigned int n_support; - unsigned int n_poll; - SpaPollItem poll[16]; + unsigned int n_sources; + SpaSource sources[16]; bool rebuild_fds; - SpaPollFd fds[16]; + struct pollfd fds[16]; unsigned int n_fds; } AppData; @@ -97,31 +97,28 @@ on_monitor_event (SpaMonitor *monitor, } static SpaResult -do_add_item (SpaPoll *poll, - SpaPollItem *item) +do_add_source (SpaLoop *loop, + SpaSource *source) { - AppData *data = SPA_CONTAINER_OF (poll, AppData, main_loop); + AppData *data = SPA_CONTAINER_OF (loop, AppData, main_loop); - data->poll[data->n_poll] = *item; - data->n_poll++; - if (item->n_fds) - data->rebuild_fds = true; + data->sources[data->n_sources] = *source; + data->n_sources++; + data->rebuild_fds = true; return SPA_RESULT_OK; } static SpaResult -do_update_item (SpaPoll *poll, - SpaPollItem *item) +do_update_source (SpaSource *source) { return SPA_RESULT_OK; } -static SpaResult -do_remove_item (SpaPoll *poll, - SpaPollItem *item) +static void +do_remove_source (SpaSource *source) { - return SPA_RESULT_OK; } + static void handle_monitor (AppData *data, SpaMonitor *monitor) { @@ -145,23 +142,16 @@ handle_monitor (AppData *data, SpaMonitor *monitor) spa_monitor_set_event_callback (monitor, on_monitor_event, &data); while (true) { - SpaPollNotifyData ndata; - int i, j, r; + int i, r; /* rebuild */ if (data->rebuild_fds) { - data->n_fds = 0; - for (i = 0; i < data->n_poll; i++) { - SpaPollItem *p = &data->poll[i]; - - if (!p->enabled) - continue; - - for (j = 0; j < p->n_fds; j++) - data->fds[data->n_fds + j] = p->fds[j]; - p->fds = &data->fds[data->n_fds]; - data->n_fds += p->n_fds; + for (i = 0; i < data->n_sources; i++) { + SpaSource *p = &data->sources[i]; + data->fds[i].fd = p->fd; + data->fds[i].events = p->mask; } + data->n_fds = data->n_sources; data->rebuild_fds = false; } @@ -177,15 +167,9 @@ handle_monitor (AppData *data, SpaMonitor *monitor) } /* after */ - for (i = 0; i < data->n_poll; i++) { - SpaPollItem *p = &data->poll[i]; - - if (p->enabled && p->after_cb) { - ndata.fds = p->fds; - ndata.n_fds = p->n_fds; - ndata.user_data = p->user_data; - p->after_cb (&ndata); - } + for (i = 0; i < data->n_sources; i++) { + SpaSource *p = &data->sources[i]; + p->func (p); } } } @@ -201,17 +185,16 @@ main (int argc, char *argv[]) data.map = spa_id_map_get_default (); data.log = NULL; - data.main_loop.size = sizeof (SpaPoll); - data.main_loop.info = NULL; - data.main_loop.add_item = do_add_item; - data.main_loop.update_item = do_update_item; - data.main_loop.remove_item = do_remove_item; + data.main_loop.size = sizeof (SpaLoop); + data.main_loop.add_source = do_add_source; + data.main_loop.update_source = do_update_source; + data.main_loop.remove_source = do_remove_source; data.support[0].uri = SPA_ID_MAP_URI; data.support[0].data = data.map; data.support[1].uri = SPA_LOG_URI; data.support[1].data = data.log; - data.support[2].uri = SPA_POLL__MainLoop; + data.support[2].uri = SPA_LOOP__MainLoop; data.support[2].data = &data.main_loop; data.n_support = 3;