From 2865e40618b0e5d8e47132b54ff88a1fedf2ffa3 Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Mon, 13 Feb 2023 17:58:42 +0100 Subject: [PATCH] filter: support pw_filter_trigger_process() Add pw_filter_trigger_process() as a way to implement a driver node. Also add a function to check if a filter is a driver. --- src/pipewire/filter.c | 98 +++++++++++++++++++++++++++++-------------- src/pipewire/filter.h | 10 +++++ 2 files changed, 77 insertions(+), 31 deletions(-) diff --git a/src/pipewire/filter.c b/src/pipewire/filter.c index 7393e6067..89a820a09 100644 --- a/src/pipewire/filter.c +++ b/src/pipewire/filter.c @@ -133,6 +133,7 @@ struct filter { struct spa_node impl_node; struct spa_hook_list hooks; struct spa_callbacks callbacks; + struct spa_io_clock *clock; struct spa_io_position *position; struct { @@ -168,6 +169,7 @@ struct filter { unsigned int allow_mlock:1; unsigned int warn_mlock:1; unsigned int process_rt:1; + unsigned int driving:1; }; static int get_param_index(uint32_t id) @@ -475,6 +477,12 @@ static int impl_set_io(void *object, uint32_t id, void *data, size_t size) pw_log_debug("%p: io %d %p/%zd", impl, id, data, size); switch(id) { + case SPA_IO_Clock: + if (data && size >= sizeof(struct spa_io_clock)) + impl->clock = data; + else + impl->clock = NULL; + break; case SPA_IO_Position: if (data && size >= sizeof(struct spa_io_position)) impl->position = data; @@ -484,6 +492,7 @@ static int impl_set_io(void *object, uint32_t id, void *data, size_t size) do_set_position, 1, NULL, 0, true, impl); break; } + impl->driving = impl->clock && impl->position && impl->position->clock.id == impl->clock->id; pw_filter_emit_io_changed(&impl->this, NULL, id, data, size); return 0; @@ -1749,22 +1758,23 @@ error_cleanup: return NULL; } +static inline void free_port(struct filter *impl, struct port *port) +{ + spa_list_remove(&port->link); + spa_node_emit_port_info(&impl->hooks, port->direction, port->id, NULL); + pw_map_remove(&impl->ports[port->direction], port->id); + clear_buffers(port); + clear_params(impl, port, SPA_ID_INVALID); + pw_properties_free(port->props); + free(port); +} + SPA_EXPORT int pw_filter_remove_port(void *port_data) { struct port *port = SPA_CONTAINER_OF(port_data, struct port, user_data); struct filter *impl = port->filter; - - spa_node_emit_port_info(&impl->hooks, port->direction, port->id, NULL); - - spa_list_remove(&port->link); - pw_map_remove(&impl->ports[port->direction], port->id); - - clear_buffers(port); - clear_params(impl, port, SPA_ID_INVALID); - pw_properties_free(port->props); - free(port); - + free_port(impl, port); return 0; } @@ -1844,25 +1854,6 @@ int pw_filter_get_time(struct pw_filter *filter, struct pw_time *time) return 0; } -static int -do_process(struct spa_loop *loop, - bool async, uint32_t seq, const void *data, size_t size, void *user_data) -{ - struct filter *impl = user_data; - int res = impl_node_process(impl); - return spa_node_call_ready(&impl->callbacks, res); -} - -static inline int call_trigger(struct filter *impl) -{ - int res = 0; - if (SPA_FLAG_IS_SET(impl->flags, PW_FILTER_FLAG_DRIVER)) { - res = pw_loop_invoke(impl->context->data_loop, - do_process, 1, NULL, 0, false, impl); - } - return res; -} - SPA_EXPORT struct pw_buffer *pw_filter_dequeue_buffer(void *port_data) { @@ -1894,7 +1885,7 @@ int pw_filter_queue_buffer(void *port_data, struct pw_buffer *buffer) if ((res = push_queue(p, &p->queued, b)) < 0) return res; - return call_trigger(impl); + return res; } SPA_EXPORT @@ -1960,3 +1951,48 @@ int pw_filter_flush(struct pw_filter *filter, bool drain) drain ? do_drain : do_flush, 1, NULL, 0, true, impl); return 0; } + +SPA_EXPORT +bool pw_filter_is_driving(struct pw_filter *filter) +{ + struct filter *impl = SPA_CONTAINER_OF(filter, struct filter, this); + return impl->driving; +} + +static int +do_trigger_process(struct spa_loop *loop, + bool async, uint32_t seq, const void *data, size_t size, void *user_data) +{ + struct filter *impl = user_data; + int res = impl_node_process(impl); + return spa_node_call_ready(&impl->callbacks, res); +} + +static int trigger_request_process(struct filter *impl) +{ + uint8_t buffer[1024]; + struct spa_pod_builder b = { 0 }; + + spa_pod_builder_init(&b, buffer, sizeof(buffer)); + spa_node_emit_event(&impl->hooks, + spa_pod_builder_add_object(&b, + SPA_TYPE_EVENT_Node, SPA_NODE_EVENT_RequestProcess)); + return 0; +} + +SPA_EXPORT +int pw_filter_trigger_process(struct pw_filter *filter) +{ + struct filter *impl = SPA_CONTAINER_OF(filter, struct filter, this); + int res = 0; + + pw_log_trace_fp("%p", impl); + + if (!impl->driving) { + res = trigger_request_process(impl); + } else { + res = pw_loop_invoke(impl->context->data_loop, + do_trigger_process, 1, NULL, 0, false, impl); + } + return res; +} diff --git a/src/pipewire/filter.h b/src/pipewire/filter.h index 255608ab8..278413baf 100644 --- a/src/pipewire/filter.h +++ b/src/pipewire/filter.h @@ -239,6 +239,16 @@ int pw_filter_set_active(struct pw_filter *filter, bool active); * be called when all data is played or recorded */ int pw_filter_flush(struct pw_filter *filter, bool drain); +/** Check if the filter is driving. The filter needs to have the + * PW_FILTER_FLAG_DRIVER set. When the filter is driving, + * pw_filter_trigger_process() needs to be called when data is + * available (output) or needed (input). Since 0.3.66 */ +bool pw_filter_is_driving(struct pw_filter *filter); + +/** Trigger a push/pull on the filter. One iteration of the graph will + * be scheduled and process() will be called. Since 0.3.66 */ +int pw_filter_trigger_process(struct pw_filter *filter); + /** * \} */