From c296c52cae9a28a8b2a310541e24746d4c040a91 Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Thu, 18 Sep 2025 15:08:24 +0200 Subject: [PATCH] stream: avoid work at the end of the cycle Driver output streams will start the cycle with a _trigger() operation, which will call the process function (if necessary) to dequeue/queue a buffer before starting the graph cycle. At the end of the cycle, the internal stream process function is called again to recycle any buffers but we should not try to dequeue a new buffer (if there was any in the queue) and say that we have data. Do this by keeping track of when the internal process function was called because of trigger or because of the end of the cycle. At the end of the cycle, we can call the trigger_end() but we should not prepare a new buffer on the output io. --- src/pipewire/stream.c | 20 ++++++++++++++++---- 1 file changed, 16 insertions(+), 4 deletions(-) diff --git a/src/pipewire/stream.c b/src/pipewire/stream.c index b0192a7a9..c4b785017 100644 --- a/src/pipewire/stream.c +++ b/src/pipewire/stream.c @@ -157,6 +157,8 @@ struct stream { int in_set_param; int in_emit_param_changed; int pending_drain; + + int in_trigger; }; static int get_param_index(uint32_t id) @@ -1107,7 +1109,7 @@ static int impl_node_process_output(void *object) struct spa_io_buffers *io = impl->io; struct buffer *b; int res; - bool ask_more; + bool ask_more, driver_end; if (io == NULL) return -EIO; @@ -1117,6 +1119,8 @@ again: io->status, io->buffer_id); ask_more = false; + driver_end = stream->node->driving && !impl->in_trigger; + if ((res = io->status) != SPA_STATUS_HAVE_DATA) { /* recycle old buffer */ if ((b = get_buffer(stream, io->buffer_id)) != NULL) { @@ -1126,8 +1130,9 @@ again: ask_more = true; } - /* pop new buffer */ - if ((b = queue_pop(impl, &impl->queued)) != NULL) { + /* pop new buffer but only if we are not a driver and completing + * the cycle. */ + if (!driver_end && (b = queue_pop(impl, &impl->queued)) != NULL) { impl->drained = false; io->buffer_id = b->id; res = io->status = SPA_STATUS_HAVE_DATA; @@ -1145,6 +1150,9 @@ again: pw_log_trace_fp("%p: no more buffers %p", stream, io); ask_more = true; } + } else if (driver_end) { + /* if we are completing the cycle, don't say we have more data */ + res = SPA_STATUS_NEED_DATA; } copy_position(impl, impl->queued.outcount); @@ -1159,7 +1167,7 @@ again: pw_log_trace_fp("%p: res %d", stream, res); - if (stream->node->driving && impl->using_trigger && res != SPA_STATUS_HAVE_DATA) + if (driver_end && impl->using_trigger) call_trigger_done(impl); return res; @@ -2484,7 +2492,9 @@ do_trigger_deprecated(struct spa_loop *loop, bool async, uint32_t seq, const void *data, size_t size, void *user_data) { struct stream *impl = user_data; + impl->in_trigger++; int res = impl->node_methods.process(impl); + impl->in_trigger--; return spa_node_call_ready(&impl->callbacks, res); } @@ -2661,7 +2671,9 @@ do_trigger_driver(struct spa_loop *loop, int res; if (impl->direction == SPA_DIRECTION_OUTPUT) { call_process(impl); + impl->in_trigger++; res = impl->node_methods.process(impl); + impl->in_trigger--; } else { res = SPA_STATUS_NEED_DATA; }