From 165bd7b219c769179f9dcf13a53112f0989b2646 Mon Sep 17 00:00:00 2001 From: Robert Rosengren Date: Thu, 18 Dec 2025 10:51:07 +0100 Subject: [PATCH] pipewiresrc: fix race when node suspended moving from PAUSED to PLAYING If in PAUSED state, the node can move from idle to suspended resulting in format cleared and state is no longer negotiated. To avoid returning not-negotiated error upon basesrc calling create callback, wait for new format to be provided and negotiated state is back. --- src/gst/gstpipewiresrc.c | 70 ++++++++++++++++++++++++++++------------ 1 file changed, 49 insertions(+), 21 deletions(-) diff --git a/src/gst/gstpipewiresrc.c b/src/gst/gstpipewiresrc.c index 7eeecc768..8bfd799a1 100644 --- a/src/gst/gstpipewiresrc.c +++ b/src/gst/gstpipewiresrc.c @@ -1056,6 +1056,44 @@ wait_started (GstPipeWireSrc *this) return state; } + +static enum pw_stream_state +wait_negotiated (GstPipeWireSrc *this) +{ + enum pw_stream_state state; + const char *error = NULL; + struct timespec abstime; + + pw_thread_loop_get_time (this->stream->core->loop, &abstime, + GST_PIPEWIRE_DEFAULT_TIMEOUT * SPA_NSEC_PER_SEC); + + while (TRUE) { + state = pw_stream_get_state (this->stream->pwstream, &error); + + GST_DEBUG_OBJECT (this, "waiting for NEGOTIATED, now %s", pw_stream_state_as_string (state)); + if (state == PW_STREAM_STATE_ERROR) + break; + if (this->flushing) { + state = PW_STREAM_STATE_ERROR; + break; + } + + if (this->negotiated) + break; + + if (this->autoconnect) { + if (pw_thread_loop_timed_wait_full (this->stream->core->loop, &abstime) < 0) { + state = PW_STREAM_STATE_ERROR; + break; + } + } else { + pw_thread_loop_wait (this->stream->core->loop); + } + } + GST_DEBUG_OBJECT (this, state != PW_STREAM_STATE_ERROR ? "got negotiated signal" : "error during negotiation"); + return state; +} + static gboolean gst_pipewire_src_negotiate (GstBaseSrc * basesrc) { @@ -1067,7 +1105,6 @@ gst_pipewire_src_negotiate (GstBaseSrc * basesrc) g_autoptr (GPtrArray) possible = NULL; gboolean result = FALSE; const char *error = NULL; - struct timespec abstime; uint32_t target_id; /* first see what is possible on our source pad */ @@ -1177,26 +1214,8 @@ gst_pipewire_src_negotiate (GstBaseSrc * basesrc) (const struct spa_pod **)possible->pdata, possible->len); - pw_thread_loop_get_time (pwsrc->stream->core->loop, &abstime, - GST_PIPEWIRE_DEFAULT_TIMEOUT * SPA_NSEC_PER_SEC); - - while (TRUE) { - enum pw_stream_state state = pw_stream_get_state (pwsrc->stream->pwstream, &error); - - GST_DEBUG_OBJECT (basesrc, "waiting for NEGOTIATED, now %s", pw_stream_state_as_string (state)); - if (state == PW_STREAM_STATE_ERROR || pwsrc->flushing) - goto connect_error; - - if (pwsrc->negotiated) - break; - - if (pwsrc->autoconnect) { - if (pw_thread_loop_timed_wait_full (pwsrc->stream->core->loop, &abstime) < 0) - goto connect_error; - } else { - pw_thread_loop_wait (pwsrc->stream->core->loop); - } - } + if (wait_negotiated(pwsrc) == PW_STREAM_STATE_ERROR) + goto connect_error; negotiated_caps = g_steal_pointer (&pwsrc->caps); pw_thread_loop_unlock (pwsrc->stream->core->loop); @@ -1724,12 +1743,21 @@ gst_pipewire_src_change_state (GstElement * element, GstStateChange transition) break; case GST_STATE_CHANGE_PAUSED_TO_PLAYING: /* uncork and start recording */ + GST_DEBUG_OBJECT (this, "activating stream"); + pw_thread_loop_lock (this->stream->core->loop); pw_stream_set_active (this->stream->pwstream, true); + /* if state have been paused for longer time, the underlying node might + * be moved from idle to suspended, which would mean format cleared via + * handle_format_change. Wait for new format to avoid basesrc calling + * create() and get not-negotiated error as response. */ + if (wait_negotiated(this) == PW_STREAM_STATE_ERROR) + goto open_failed; pw_thread_loop_unlock (this->stream->core->loop); break; case GST_STATE_CHANGE_PLAYING_TO_PAUSED: /* stop recording ASAP by corking */ + GST_DEBUG_OBJECT (this, "in-activating stream"); pw_thread_loop_lock (this->stream->core->loop); pw_stream_set_active (this->stream->pwstream, false); pw_thread_loop_unlock (this->stream->core->loop);