From eef960b1ec552e71b8e73623cf559ab5649a73bd Mon Sep 17 00:00:00 2001 From: Sanchayan Maity Date: Wed, 12 Mar 2025 17:34:19 +0530 Subject: [PATCH 01/21] gst: Fix handling of video planar formats Tiled formats are not tested and supported yet. --- src/gst/gstpipewirepool.c | 10 +++++++++- src/gst/gstpipewiresink.c | 21 ++++++++++++++++++--- src/gst/gstpipewiresrc.c | 14 +++++++++++--- 3 files changed, 38 insertions(+), 7 deletions(-) diff --git a/src/gst/gstpipewirepool.c b/src/gst/gstpipewirepool.c index 78869e163..f00612fc8 100644 --- a/src/gst/gstpipewirepool.c +++ b/src/gst/gstpipewirepool.c @@ -94,13 +94,21 @@ void gst_pipewire_pool_wrap_buffer (GstPipeWirePool *pool, struct pw_buffer *b) } if (pool->add_metavideo) { - gst_buffer_add_video_meta_full (buf, GST_VIDEO_FRAME_FLAG_NONE, + GstVideoMeta *meta = gst_buffer_add_video_meta_full (buf, + GST_VIDEO_FRAME_FLAG_NONE, GST_VIDEO_INFO_FORMAT (&pool->video_info), GST_VIDEO_INFO_WIDTH (&pool->video_info), GST_VIDEO_INFO_HEIGHT (&pool->video_info), GST_VIDEO_INFO_N_PLANES (&pool->video_info), pool->video_info.offset, pool->video_info.stride); + + /* + * We need to set the video meta as pooled, else gst_buffer_pool_release_buffer + * will call reset_buffer and the default_reset_buffer implementation for + * GstBufferPool removes all metadata without the POOLED flag. + */ + GST_META_FLAG_SET (meta, GST_META_FLAG_POOLED); } data->pool = gst_object_ref (pool); diff --git a/src/gst/gstpipewiresink.c b/src/gst/gstpipewiresink.c index b79ae6a14..4cfc8bb87 100644 --- a/src/gst/gstpipewiresink.c +++ b/src/gst/gstpipewiresink.c @@ -315,6 +315,10 @@ gst_pipewire_sink_update_params (GstPipeWireSink *sink) spa_pod_builder_add (&b, SPA_PARAM_BUFFERS_size, SPA_POD_CHOICE_RANGE_Int(size, size, INT32_MAX), 0); + /* MUST have n_datas == n_planes */ + spa_pod_builder_add (&b, + SPA_PARAM_BUFFERS_blocks, + SPA_POD_Int(GST_VIDEO_INFO_N_PLANES (&pool->video_info))); spa_pod_builder_add (&b, SPA_PARAM_BUFFERS_stride, SPA_POD_CHOICE_RANGE_Int(0, 0, INT32_MAX), @@ -633,6 +637,9 @@ do_send_buffer (GstPipeWireSink *pwsink, GstBuffer *buffer) } } data->b->size = 0; + + spa_assert(b->n_datas == gst_buffer_n_memory(buffer)); + for (i = 0; i < b->n_datas; i++) { struct spa_data *d = &b->datas[i]; GstMemory *mem = gst_buffer_peek_memory (buffer, i); @@ -646,16 +653,24 @@ do_send_buffer (GstPipeWireSink *pwsink, GstBuffer *buffer) GstVideoMeta *meta = gst_buffer_get_video_meta (buffer); if (meta) { if (meta->n_planes == b->n_datas) { + uint32_t n_planes = GST_VIDEO_INFO_N_PLANES (&data->pool->video_info); + gboolean is_planar = n_planes > 1; gsize video_size = 0; - for (i = 0; i < meta->n_planes; i++) { + + for (i = 0; i < n_planes; i++) { struct spa_data *d = &b->datas[i]; - d->chunk->offset += meta->offset[i] - video_size; + d->chunk->stride = meta->stride[i]; + if (is_planar) + d->chunk->offset = meta->offset[i]; + else + d->chunk->offset += meta->offset[i] - video_size; video_size += d->chunk->size; } } else { - GST_ERROR_OBJECT (pwsink, "plane num not matching, meta:%u buffer:%u", meta->n_planes, b->n_datas); + GST_ERROR_OBJECT (pwsink, "plane num not matching, meta:%u buffer:%u", + meta->n_planes, b->n_datas); } } diff --git a/src/gst/gstpipewiresrc.c b/src/gst/gstpipewiresrc.c index 994534c60..8dc50c246 100644 --- a/src/gst/gstpipewiresrc.c +++ b/src/gst/gstpipewiresrc.c @@ -661,8 +661,12 @@ static GstBuffer *dequeue_buffer(GstPipeWireSrc *pwsrc) } if (pwsrc->is_video) { - gsize video_size = 0; GstVideoInfo *info = &pwsrc->video_info; + uint32_t n_datas = b->buffer->n_datas; + uint32_t n_planes = GST_VIDEO_INFO_N_PLANES (info); + gboolean is_planar = n_planes > 1; + gsize video_size = 0; + GstVideoMeta *meta = gst_buffer_add_video_meta_full (buf, GST_VIDEO_FRAME_FLAG_NONE, GST_VIDEO_INFO_FORMAT (info), GST_VIDEO_INFO_WIDTH (info), @@ -671,15 +675,19 @@ static GstBuffer *dequeue_buffer(GstPipeWireSrc *pwsrc) info->offset, info->stride); - for (i = 0; i < MIN (b->buffer->n_datas, GST_VIDEO_MAX_PLANES); i++) { + for (i = 0; i < MIN (n_datas, n_planes); i++) { struct spa_data *d = &b->buffer->datas[i]; - meta->offset[i] = video_size; + meta->offset[i] = is_planar ? d->chunk->offset : video_size; meta->stride[i] = d->chunk->stride; video_size += d->chunk->size; } } + if (b->buffer->n_datas != gst_buffer_n_memory(data->buf)) { + GST_ERROR_OBJECT(pwsrc, "n_datas != n_memory"); + } + for (i = 0; i < b->buffer->n_datas; i++) { struct spa_data *d = &b->buffer->datas[i]; From b0e0e4c9f4722ece1aa4e474fec40c89be548812 Mon Sep 17 00:00:00 2001 From: Sanchayan Maity Date: Fri, 14 Mar 2025 16:07:28 +0530 Subject: [PATCH 02/21] gst: Do not use video only info for SPA_PARAM_BUFFERS_blocks We mistakenly used video only info for setting SPA_PARAM_BUFFERS_blocks, which would be completely incorrect for audio. Fixes 6c9ada270. --- src/gst/gstpipewiresink.c | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/src/gst/gstpipewiresink.c b/src/gst/gstpipewiresink.c index 4cfc8bb87..54cc6f6ca 100644 --- a/src/gst/gstpipewiresink.c +++ b/src/gst/gstpipewiresink.c @@ -315,10 +315,12 @@ gst_pipewire_sink_update_params (GstPipeWireSink *sink) spa_pod_builder_add (&b, SPA_PARAM_BUFFERS_size, SPA_POD_CHOICE_RANGE_Int(size, size, INT32_MAX), 0); - /* MUST have n_datas == n_planes */ - spa_pod_builder_add (&b, - SPA_PARAM_BUFFERS_blocks, - SPA_POD_Int(GST_VIDEO_INFO_N_PLANES (&pool->video_info))); + if (sink->is_video) { + /* MUST have n_datas == n_planes */ + spa_pod_builder_add (&b, + SPA_PARAM_BUFFERS_blocks, + SPA_POD_Int(GST_VIDEO_INFO_N_PLANES (&pool->video_info)), 0); + } spa_pod_builder_add (&b, SPA_PARAM_BUFFERS_stride, SPA_POD_CHOICE_RANGE_Int(0, 0, INT32_MAX), From a838af6771c5dd79dd9263d4e907352614ae8552 Mon Sep 17 00:00:00 2001 From: Sanchayan Maity Date: Tue, 25 Mar 2025 16:24:48 +0530 Subject: [PATCH 03/21] gstpipewiresrc: Fix re-linking When using PW source, one might want to dynamically link PW source to a different source. Setting possible_caps to NULL prevents the caps intersect from returning a successful result on format change. Do not set possible_caps to NULL as we get that from peer caps which should stay the same ideally for the duration of pipeline run. That allows re-linking PW source any number of times with a pipeline like below. gst-launch-1.0 pipewiresrc autoconnect=false ! queue ! video/x-raw,format=YUY2 ! videoconvert ! xvimagesink The above pipeline can be made to switch between a camera source and a screen capture source like wf-recorder. Note that this fix only improves the status quo and won't work if the peer caps change due to a re-negotiation. --- src/gst/gstpipewiresrc.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/gst/gstpipewiresrc.c b/src/gst/gstpipewiresrc.c index 8dc50c246..a3018357c 100644 --- a/src/gst/gstpipewiresrc.c +++ b/src/gst/gstpipewiresrc.c @@ -990,7 +990,7 @@ gst_pipewire_src_negotiate (GstBaseSrc * basesrc) GST_DEBUG_OBJECT (basesrc, "connect capture with path %s, target-object %s", pwsrc->stream->path, pwsrc->stream->target_object); - pwsrc->possible_caps = possible_caps; + pwsrc->possible_caps = gst_caps_ref (possible_caps); pwsrc->negotiated = FALSE; enum pw_stream_flags flags; @@ -1027,7 +1027,6 @@ gst_pipewire_src_negotiate (GstBaseSrc * basesrc) } negotiated_caps = g_steal_pointer (&pwsrc->caps); - pwsrc->possible_caps = NULL; pw_thread_loop_unlock (pwsrc->stream->core->loop); if (negotiated_caps == NULL) @@ -1472,6 +1471,7 @@ gst_pipewire_src_stop (GstBaseSrc * basesrc) pwsrc->eos = false; gst_buffer_replace (&pwsrc->last_buffer, NULL); gst_caps_replace(&pwsrc->caps, NULL); + gst_caps_replace(&pwsrc->possible_caps, NULL); pwsrc->transform_value = UINT32_MAX; pw_thread_loop_unlock (pwsrc->stream->core->loop); From ecb2faeef85d01f7d1cec09539652ee59f37837f Mon Sep 17 00:00:00 2001 From: Arun Raghavan Date: Mon, 24 Mar 2025 22:11:36 -0400 Subject: [PATCH 04/21] gst: sink: Don't provide clock in provide mode Counter-intuitive as it seems, when we are driving the clock, we can't also provide a clock from PipeWire to the pipeline -- we need the pipeline to drive the graph. So we make the mode control whether we provide a clock or not. --- src/gst/gstpipewiresink.c | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/src/gst/gstpipewiresink.c b/src/gst/gstpipewiresink.c index 54cc6f6ca..ebc09ecac 100644 --- a/src/gst/gstpipewiresink.c +++ b/src/gst/gstpipewiresink.c @@ -242,7 +242,8 @@ gst_pipewire_sink_class_init (GstPipeWireSinkClass * klass) GST_TYPE_PIPEWIRE_SINK_MODE, DEFAULT_PROP_MODE, G_PARAM_READWRITE | - G_PARAM_STATIC_STRINGS)); + G_PARAM_STATIC_STRINGS | + GST_PARAM_MUTABLE_READY)); g_object_class_install_property (gobject_class, PROP_FD, @@ -1040,6 +1041,12 @@ gst_pipewire_sink_change_state (GstElement * element, GstStateChange transition) goto open_failed; break; case GST_STATE_CHANGE_READY_TO_PAUSED: + /* If we are a driver, we shouldn't try to also provide the clock, as we + * _are_ the clock for the graph. For that case, we rely on the pipeline + * clock to drive the pipeline (and thus the graph). */ + if (this->mode == GST_PIPEWIRE_SINK_MODE_PROVIDE) + GST_OBJECT_FLAG_UNSET (this, GST_ELEMENT_FLAG_PROVIDE_CLOCK); + /* the initial stream state is active, which is needed for linking and * negotiation to happen and the bufferpool to be set up. We don't know * if we'll go to plaing, so we deactivate the stream until that From fcde0749c4124a50eb1b71bfe2784ad17737b3a9 Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Wed, 26 Mar 2025 12:30:41 +0100 Subject: [PATCH 05/21] gst: fix video metadata offsets The offsets in GStreamer are always offsets into the buffer memory where the plane starts so set this to the accumulated plane sizes. --- src/gst/gstpipewiresrc.c | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/gst/gstpipewiresrc.c b/src/gst/gstpipewiresrc.c index a3018357c..03c622677 100644 --- a/src/gst/gstpipewiresrc.c +++ b/src/gst/gstpipewiresrc.c @@ -664,7 +664,6 @@ static GstBuffer *dequeue_buffer(GstPipeWireSrc *pwsrc) GstVideoInfo *info = &pwsrc->video_info; uint32_t n_datas = b->buffer->n_datas; uint32_t n_planes = GST_VIDEO_INFO_N_PLANES (info); - gboolean is_planar = n_planes > 1; gsize video_size = 0; GstVideoMeta *meta = gst_buffer_add_video_meta_full (buf, GST_VIDEO_FRAME_FLAG_NONE, @@ -677,7 +676,7 @@ static GstBuffer *dequeue_buffer(GstPipeWireSrc *pwsrc) for (i = 0; i < MIN (n_datas, n_planes); i++) { struct spa_data *d = &b->buffer->datas[i]; - meta->offset[i] = is_planar ? d->chunk->offset : video_size; + meta->offset[i] = video_size; meta->stride[i] = d->chunk->stride; video_size += d->chunk->size; From 99330effc9269cc46b4bd09d6e14ea445c9122ec Mon Sep 17 00:00:00 2001 From: Arun Raghavan Date: Wed, 26 Mar 2025 09:51:53 -0400 Subject: [PATCH 06/21] gst: sink: Minor style consistency fixup --- src/gst/gstpipewiresink.c | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/src/gst/gstpipewiresink.c b/src/gst/gstpipewiresink.c index ebc09ecac..c07a1c8e5 100644 --- a/src/gst/gstpipewiresink.c +++ b/src/gst/gstpipewiresink.c @@ -947,15 +947,12 @@ gst_pipewire_sink_render (GstBaseSink * bsink, GstBuffer * buffer) gboolean copied = FALSE; buf_size = 0; // to break from the loop - /* - splitting of buffers in the case of video might break the frame layout - and that seems to be causing issues while retrieving the buffers on the receiver - side. Hence use the video_frame_map to copy the buffer of bigger size into the - pipewirepool's buffer - */ + /* splitting of buffers in the case of video might break the frame layout + * and that seems to be causing issues while retrieving the buffers on the receiver + * side. Hence use the video_frame_map to copy the buffer of bigger size into the + * pipewirepool's buffer */ - if (!gst_video_frame_map (&dst, &pwsink->stream->pool->video_info, b, - GST_MAP_WRITE)) { + if (!gst_video_frame_map (&dst, &pwsink->stream->pool->video_info, b, GST_MAP_WRITE)) { GST_ERROR_OBJECT(pwsink, "Failed to map dest buffer"); return GST_FLOW_ERROR; } From 08cdd63255c1b73346af9b9894d864679cc62df7 Mon Sep 17 00:00:00 2001 From: Arun Raghavan Date: Wed, 26 Mar 2025 11:49:02 -0400 Subject: [PATCH 07/21] gst: sink: Correctly set size and offset on planar data We need to make sure the memory sizes are correctly initialised so the meta makes sense, and we don't copy the meta from the input buffer as that doesn't make sense given we have our own meta already. --- src/gst/gstpipewirepool.c | 12 ++++++++++++ src/gst/gstpipewiresink.c | 8 +------- 2 files changed, 13 insertions(+), 7 deletions(-) diff --git a/src/gst/gstpipewirepool.c b/src/gst/gstpipewirepool.c index f00612fc8..6c6e2dcab 100644 --- a/src/gst/gstpipewirepool.c +++ b/src/gst/gstpipewirepool.c @@ -102,6 +102,18 @@ void gst_pipewire_pool_wrap_buffer (GstPipeWirePool *pool, struct pw_buffer *b) GST_VIDEO_INFO_N_PLANES (&pool->video_info), pool->video_info.offset, pool->video_info.stride); + gsize plane_sizes[GST_VIDEO_MAX_PLANES]; + + if (!gst_video_meta_get_plane_size (meta, plane_sizes)) { + GST_ERROR_OBJECT (pool, "could not compute plane sizes"); + } else { + /* Set memory sizes to expected plane sizes, so we know the valid size, + * and the offsets in the meta make sense */ + for (i = 0; i < gst_buffer_n_memory (buf); i++) { + GstMemory *mem = gst_buffer_peek_memory (buf, i); + gst_memory_resize (mem, 0, plane_sizes[i]); + } + } /* * We need to set the video meta as pooled, else gst_buffer_pool_release_buffer diff --git a/src/gst/gstpipewiresink.c b/src/gst/gstpipewiresink.c index c07a1c8e5..2cba09aa7 100644 --- a/src/gst/gstpipewiresink.c +++ b/src/gst/gstpipewiresink.c @@ -657,17 +657,13 @@ do_send_buffer (GstPipeWireSink *pwsink, GstBuffer *buffer) if (meta) { if (meta->n_planes == b->n_datas) { uint32_t n_planes = GST_VIDEO_INFO_N_PLANES (&data->pool->video_info); - gboolean is_planar = n_planes > 1; gsize video_size = 0; for (i = 0; i < n_planes; i++) { struct spa_data *d = &b->datas[i]; d->chunk->stride = meta->stride[i]; - if (is_planar) - d->chunk->offset = meta->offset[i]; - else - d->chunk->offset += meta->offset[i] - video_size; + d->chunk->offset = meta->offset[i] - video_size; video_size += d->chunk->size; } @@ -972,8 +968,6 @@ gst_pipewire_sink_render (GstBaseSink * bsink, GstBuffer * buffer) GST_ERROR_OBJECT(pwsink, "Failed to copy the frame"); return GST_FLOW_ERROR; } - - gst_buffer_copy_into(b, buffer, GST_BUFFER_COPY_METADATA, 0, -1); } else { gst_buffer_map (b, &info, GST_MAP_WRITE); gsize extract_size = (buf_size <= info.maxsize) ? buf_size: info.maxsize; From a3e94b48fabaae7b954ae509e8286c1a476303ea Mon Sep 17 00:00:00 2001 From: Arun Raghavan Date: Wed, 26 Mar 2025 11:55:13 -0400 Subject: [PATCH 08/21] gst: pool: Some refinements to min/max handling A number of changes for correctness. 1) We expose the actualy min and max values we support in the allocation query. 2) We don't support max_buffers as 0, as unlimited buffers is not an option 3) In ParamBuffers, we request the max_buffers from bufferpool config, as we cannot dynamically allocate buffers --- src/gst/gstpipewirepool.c | 10 ++++++++++ src/gst/gstpipewirepool.h | 3 +++ src/gst/gstpipewiresink.c | 17 +++++++++++------ 3 files changed, 24 insertions(+), 6 deletions(-) diff --git a/src/gst/gstpipewirepool.c b/src/gst/gstpipewirepool.c index 6c6e2dcab..ec0b9bc59 100644 --- a/src/gst/gstpipewirepool.c +++ b/src/gst/gstpipewirepool.c @@ -255,6 +255,16 @@ set_config (GstBufferPool * pool, GstStructure * config) return FALSE; } + /* We don't support unlimited buffers */ + if (max_buffers == 0) + max_buffers = PIPEWIRE_POOL_MAX_BUFFERS; + /* Pick a sensible min to avoid starvation */ + if (min_buffers == 0) + min_buffers = PIPEWIRE_POOL_MIN_BUFFERS; + + if (min_buffers < PIPEWIRE_POOL_MIN_BUFFERS || max_buffers > PIPEWIRE_POOL_MAX_BUFFERS) + return FALSE; + structure = gst_caps_get_structure (caps, 0); if (g_str_has_prefix (gst_structure_get_name (structure), "video/") || g_str_has_prefix (gst_structure_get_name (structure), "image/")) { diff --git a/src/gst/gstpipewirepool.h b/src/gst/gstpipewirepool.h index fb00a100c..15b7f0d59 100644 --- a/src/gst/gstpipewirepool.h +++ b/src/gst/gstpipewirepool.h @@ -18,6 +18,9 @@ G_BEGIN_DECLS #define GST_TYPE_PIPEWIRE_POOL (gst_pipewire_pool_get_type()) G_DECLARE_FINAL_TYPE (GstPipeWirePool, gst_pipewire_pool, GST, PIPEWIRE_POOL, GstBufferPool) +#define PIPEWIRE_POOL_MIN_BUFFERS 2u +#define PIPEWIRE_POOL_MAX_BUFFERS 16u + typedef struct _GstPipeWirePoolData GstPipeWirePoolData; struct _GstPipeWirePoolData { GstPipeWirePool *pool; diff --git a/src/gst/gstpipewiresink.c b/src/gst/gstpipewiresink.c index 2cba09aa7..c40fd32ec 100644 --- a/src/gst/gstpipewiresink.c +++ b/src/gst/gstpipewiresink.c @@ -40,8 +40,6 @@ GST_DEBUG_CATEGORY_STATIC (pipewire_sink_debug); #define DEFAULT_PROP_SLAVE_METHOD GST_PIPEWIRE_SINK_SLAVE_METHOD_NONE #define DEFAULT_PROP_USE_BUFFERPOOL USE_BUFFERPOOL_AUTO -#define MIN_BUFFERS 8u - enum { PROP_0, @@ -167,7 +165,8 @@ gst_pipewire_sink_propose_allocation (GstBaseSink * bsink, GstQuery * query) GstPipeWireSink *pwsink = GST_PIPEWIRE_SINK (bsink); if (pwsink->use_bufferpool != USE_BUFFERPOOL_NO) - gst_query_add_allocation_pool (query, GST_BUFFER_POOL_CAST (pwsink->stream->pool), 0, 0, 0); + gst_query_add_allocation_pool (query, GST_BUFFER_POOL_CAST (pwsink->stream->pool), 0, + PIPEWIRE_POOL_MIN_BUFFERS, PIPEWIRE_POOL_MAX_BUFFERS); gst_query_add_allocation_meta (query, GST_VIDEO_META_API_TYPE, NULL); return TRUE; @@ -311,6 +310,12 @@ gst_pipewire_sink_update_params (GstPipeWireSink *sink) config = gst_buffer_pool_get_config (GST_BUFFER_POOL (pool)); gst_buffer_pool_config_get_params (config, &caps, &size, &min_buffers, &max_buffers); + /* We cannot dynamically grow the pool */ + if (max_buffers == 0) { + GST_WARNING_OBJECT (sink, "cannot support unlimited buffers in pool"); + max_buffers = PIPEWIRE_POOL_MAX_BUFFERS; + } + spa_pod_builder_init (&b, buffer, sizeof (buffer)); spa_pod_builder_push_object (&b, &f, SPA_TYPE_OBJECT_ParamBuffers, SPA_PARAM_Buffers); spa_pod_builder_add (&b, @@ -325,10 +330,10 @@ gst_pipewire_sink_update_params (GstPipeWireSink *sink) spa_pod_builder_add (&b, SPA_PARAM_BUFFERS_stride, SPA_POD_CHOICE_RANGE_Int(0, 0, INT32_MAX), + /* At this stage, we will request as many buffers as we _might_ need as + * the default, since we can't grow the pool once this is set */ SPA_PARAM_BUFFERS_buffers, SPA_POD_CHOICE_RANGE_Int( - SPA_MAX(MIN_BUFFERS, min_buffers), - SPA_MAX(MIN_BUFFERS, min_buffers), - max_buffers ? max_buffers : INT32_MAX), + max_buffers, min_buffers, max_buffers), SPA_PARAM_BUFFERS_dataType, SPA_POD_CHOICE_FLAGS_Int( (1< Date: Fri, 28 Mar 2025 16:08:57 +0100 Subject: [PATCH 09/21] improve debug and error reporting a little --- spa/plugins/videoconvert/videoconvert-ffmpeg.c | 4 ++-- src/gst/gstpipewirepool.c | 6 +++++- src/gst/gstpipewiresrc.c | 2 +- 3 files changed, 8 insertions(+), 4 deletions(-) diff --git a/spa/plugins/videoconvert/videoconvert-ffmpeg.c b/spa/plugins/videoconvert/videoconvert-ffmpeg.c index 3da0e5677..5d16326e9 100644 --- a/spa/plugins/videoconvert/videoconvert-ffmpeg.c +++ b/spa/plugins/videoconvert/videoconvert-ffmpeg.c @@ -1665,8 +1665,8 @@ impl_node_port_use_buffers(void *object, b->buf = buffers[i]; if (n_datas != port->blocks) { - spa_log_error(this->log, "%p: invalid blocks %d on buffer %d", - this, n_datas, i); + spa_log_error(this->log, "%p: invalid blocks %d on buffer %d, expected %d", + this, n_datas, i, port->blocks); return -EINVAL; } if (SPA_FLAG_IS_SET(flags, SPA_NODE_BUFFERS_FLAG_ALLOC)) { diff --git a/src/gst/gstpipewirepool.c b/src/gst/gstpipewirepool.c index ec0b9bc59..f2a925c94 100644 --- a/src/gst/gstpipewirepool.c +++ b/src/gst/gstpipewirepool.c @@ -62,7 +62,7 @@ void gst_pipewire_pool_wrap_buffer (GstPipeWirePool *pool, struct pw_buffer *b) uint32_t i; GstPipeWirePoolData *data; - GST_DEBUG_OBJECT (pool, "wrap buffer"); + GST_DEBUG_OBJECT (pool, "wrap buffer, datas:%d", b->buffer->n_datas); data = g_slice_new (GstPipeWirePoolData); @@ -89,6 +89,10 @@ void gst_pipewire_pool_wrap_buffer (GstPipeWirePool *pool, struct pw_buffer *b) gmem = gst_memory_new_wrapped (0, d->data, d->maxsize, 0, d->maxsize, NULL, NULL); } + else { + GST_WARNING_OBJECT (pool, "unknown data type (%s %d)", + spa_debug_type_find_short_name(spa_type_data_type, d->type), d->type); + } if (gmem) gst_buffer_insert_memory (buf, i, gmem); } diff --git a/src/gst/gstpipewiresrc.c b/src/gst/gstpipewiresrc.c index 03c622677..2a8f366ca 100644 --- a/src/gst/gstpipewiresrc.c +++ b/src/gst/gstpipewiresrc.c @@ -684,7 +684,7 @@ static GstBuffer *dequeue_buffer(GstPipeWireSrc *pwsrc) } if (b->buffer->n_datas != gst_buffer_n_memory(data->buf)) { - GST_ERROR_OBJECT(pwsrc, "n_datas != n_memory"); + GST_ERROR_OBJECT(pwsrc, "n_datas != n_memory, (%d != %d)", b->buffer->n_datas, gst_buffer_n_memory(data->buf)); } for (i = 0; i < b->buffer->n_datas; i++) { From 6e4ded46f0dfc89c27d9d35363b3f3692f66e981 Mon Sep 17 00:00:00 2001 From: Arun Raghavan Date: Thu, 3 Apr 2025 06:59:31 -0400 Subject: [PATCH 10/21] gst: sink: Set provide clock flag if not in provide mode Handle a theoretical corner case of an element that is first started with mode=provide, and then restarted without mode=provide. --- src/gst/gstpipewiresink.c | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/gst/gstpipewiresink.c b/src/gst/gstpipewiresink.c index c40fd32ec..ecb2a62ea 100644 --- a/src/gst/gstpipewiresink.c +++ b/src/gst/gstpipewiresink.c @@ -1042,6 +1042,8 @@ gst_pipewire_sink_change_state (GstElement * element, GstStateChange transition) * clock to drive the pipeline (and thus the graph). */ if (this->mode == GST_PIPEWIRE_SINK_MODE_PROVIDE) GST_OBJECT_FLAG_UNSET (this, GST_ELEMENT_FLAG_PROVIDE_CLOCK); + else + GST_OBJECT_FLAG_SET (this, GST_ELEMENT_FLAG_PROVIDE_CLOCK); /* the initial stream state is active, which is needed for linking and * negotiation to happen and the bufferpool to be set up. We don't know From a542aec2df08a7c5dc33dccc1129d9fb1c7b9c1c Mon Sep 17 00:00:00 2001 From: Taruntej Kanakamalla Date: Thu, 20 Mar 2025 21:13:03 -0400 Subject: [PATCH 11/21] gst: sink: Manage buffer pool memory manually Let's make sure we own the memory in buffers, so that we can be resilient to the PW link going away. This currently maintains the status quo of copying data into the pipewirepool for sending to the remote end, but moves the allocation of buffers so that ownership is maintained by the sink in all cases. There are some tricky corners, especially with bufferpool vs. buffers param negotiation -- bufferpool parameters can be negotiated in GStreamer before the link even comes up, so we try to adapt the buffers param to use the negotiated value. For now, that is more brittle than tying those two aspects together. We can revisit this if we can find a way to tie pipeline state and link state more closely. Co-authored-by: Arun Raghavan --- meson.build | 5 ++ src/gst/gstpipewirepool.c | 151 ++++++++++++++++++++++++++++++-------- src/gst/gstpipewirepool.h | 6 ++ src/gst/gstpipewiresink.c | 26 +++++-- 4 files changed, 151 insertions(+), 37 deletions(-) diff --git a/meson.build b/meson.build index bfd47a5bc..271ba391d 100644 --- a/meson.build +++ b/meson.build @@ -410,6 +410,7 @@ gst_deps_def = { gst_dep = [] gst_dma_drm_found = false +gst_shm_allocator_found = false foreach depname, kwargs: gst_deps_def dep = dependency(depname, required: gst_option, kwargs: kwargs) summary({depname: dep.found()}, bool_yn: true, section: 'GStreamer modules') @@ -425,9 +426,13 @@ foreach depname, kwargs: gst_deps_def if depname == 'gstreamer-allocators-1.0' and dep.version().version_compare('>= 1.23.1') gst_dma_drm_found = true + gst_shm_allocator_found = true endif endforeach +summary({'gstreamer SHM allocator': gst_shm_allocator_found}, bool_yn: true, section: 'Backend') +cdata.set('HAVE_GSTREAMER_SHM_ALLOCATOR', gst_shm_allocator_found) + # This code relies on the array being empty if any dependency was not found gst_dp_found = gst_dep.length() > 0 summary({'gstreamer-device-provider': gst_dp_found}, bool_yn: true, section: 'Backend') diff --git a/src/gst/gstpipewirepool.c b/src/gst/gstpipewirepool.c index f2a925c94..3e061a0fd 100644 --- a/src/gst/gstpipewirepool.c +++ b/src/gst/gstpipewirepool.c @@ -10,8 +10,12 @@ #include #include +#ifdef HAVE_GSTREAMER_SHM_ALLOCATOR +#include +#endif #include +#include #include "gstpipewirepool.h" @@ -61,6 +65,8 @@ void gst_pipewire_pool_wrap_buffer (GstPipeWirePool *pool, struct pw_buffer *b) GstBuffer *buf; uint32_t i; GstPipeWirePoolData *data; + /* Default to a large enough value */ + gsize plane_sizes[GST_VIDEO_MAX_PLANES] = { pool->video_info.size, }; GST_DEBUG_OBJECT (pool, "wrap buffer, datas:%d", b->buffer->n_datas); @@ -68,6 +74,30 @@ void gst_pipewire_pool_wrap_buffer (GstPipeWirePool *pool, struct pw_buffer *b) buf = gst_buffer_new (); + if (pool->add_metavideo) { + GstVideoMeta *meta = gst_buffer_add_video_meta_full (buf, + GST_VIDEO_FRAME_FLAG_NONE, + GST_VIDEO_INFO_FORMAT (&pool->video_info), + GST_VIDEO_INFO_WIDTH (&pool->video_info), + GST_VIDEO_INFO_HEIGHT (&pool->video_info), + GST_VIDEO_INFO_N_PLANES (&pool->video_info), + pool->video_info.offset, + pool->video_info.stride); + + gst_video_meta_set_alignment (meta, pool->video_align); + + if (!gst_video_meta_get_plane_size (meta, plane_sizes)) { + GST_ERROR_OBJECT (pool, "could not compute plane sizes"); + } + + /* + * We need to set the video meta as pooled, else gst_buffer_pool_release_buffer + * will call reset_buffer and the default_reset_buffer implementation for + * GstBufferPool removes all metadata without the POOLED flag. + */ + GST_META_FLAG_SET (meta, GST_META_FLAG_POOLED); + } + for (i = 0; i < b->buffer->n_datas; i++) { struct spa_data *d = &b->buffer->datas[i]; GstMemory *gmem = NULL; @@ -75,7 +105,51 @@ void gst_pipewire_pool_wrap_buffer (GstPipeWirePool *pool, struct pw_buffer *b) GST_DEBUG_OBJECT (pool, "wrap data (%s %d) %d %d", spa_debug_type_find_short_name(spa_type_data_type, d->type), d->type, d->mapoffset, d->maxsize); - if (d->type == SPA_DATA_MemFd) { + + if (pool->allocate_memory) { +#ifdef HAVE_GSTREAMER_SHM_ALLOCATOR + gsize block_size = d->maxsize; + + if (pool->has_video) { + /* For video, we know block sizes from the video info already */ + block_size = plane_sizes[i]; + } else { + /* For audio, reserve space based on the quantum limit and channel count */ + g_autoptr (GstPipeWireStream) s = g_weak_ref_get (&pool->stream); + + struct pw_context *context = pw_core_get_context(pw_stream_get_core(s->pwstream)); + const struct pw_properties *props = pw_context_get_properties(context); + uint32_t quantum_limit = 8192; /* "reasonable" default */ + + const char *quantum = spa_dict_lookup(&props->dict, "clock.quantum-limit"); + if (!quantum) { + quantum = spa_dict_lookup(&props->dict, "default.clock.quantum-limit"); + GST_DEBUG_OBJECT (pool, "using default quantum limit %s", quantum); + } + + if (quantum) + spa_atou32(quantum, &quantum_limit, 0); + GST_DEBUG_OBJECT (pool, "quantum limit %s", quantum); + + block_size = quantum_limit * pool->audio_info.bpf; + } + + GST_DEBUG_OBJECT (pool, "setting block size %lu", block_size); + + if (!pool->shm_allocator) + pool->shm_allocator = gst_shm_allocator_get(); + + /* use MemFd only. That is the only supported data type when memory is remote i.e. allocated by the client */ + gmem = gst_allocator_alloc (pool->shm_allocator, block_size, NULL); + d->fd = gst_fd_memory_get_fd (gmem); + d->mapoffset = 0; + d->flags = SPA_DATA_FLAG_READWRITE | SPA_DATA_FLAG_MAPPABLE; + + d->type = SPA_DATA_MemFd; + d->maxsize = block_size; + d->data = NULL; +#endif + } else if (d->type == SPA_DATA_MemFd) { gmem = gst_fd_allocator_alloc (pool->fd_allocator, dup(d->fd), d->mapoffset + d->maxsize, GST_FD_MEMORY_FLAG_NONE); gst_memory_resize (gmem, d->mapoffset, d->maxsize); @@ -97,34 +171,13 @@ void gst_pipewire_pool_wrap_buffer (GstPipeWirePool *pool, struct pw_buffer *b) gst_buffer_insert_memory (buf, i, gmem); } - if (pool->add_metavideo) { - GstVideoMeta *meta = gst_buffer_add_video_meta_full (buf, - GST_VIDEO_FRAME_FLAG_NONE, - GST_VIDEO_INFO_FORMAT (&pool->video_info), - GST_VIDEO_INFO_WIDTH (&pool->video_info), - GST_VIDEO_INFO_HEIGHT (&pool->video_info), - GST_VIDEO_INFO_N_PLANES (&pool->video_info), - pool->video_info.offset, - pool->video_info.stride); - gsize plane_sizes[GST_VIDEO_MAX_PLANES]; - - if (!gst_video_meta_get_plane_size (meta, plane_sizes)) { - GST_ERROR_OBJECT (pool, "could not compute plane sizes"); - } else { + if (pool->add_metavideo && !pool->allocate_memory) { /* Set memory sizes to expected plane sizes, so we know the valid size, * and the offsets in the meta make sense */ for (i = 0; i < gst_buffer_n_memory (buf); i++) { GstMemory *mem = gst_buffer_peek_memory (buf, i); gst_memory_resize (mem, 0, plane_sizes[i]); - } } - - /* - * We need to set the video meta as pooled, else gst_buffer_pool_release_buffer - * will call reset_buffer and the default_reset_buffer implementation for - * GstBufferPool removes all metadata without the POOLED flag. - */ - GST_META_FLAG_SET (meta, GST_META_FLAG_POOLED); } data->pool = gst_object_ref (pool); @@ -157,7 +210,8 @@ void gst_pipewire_pool_remove_buffer (GstPipeWirePool *pool, struct pw_buffer *b data->crop = NULL; data->videotransform = NULL; - gst_buffer_remove_all_memory (data->buf); + if (!pool->allocate_memory) + gst_buffer_remove_all_memory (data->buf); /* this will also destroy the pool data, if this is the last reference */ gst_clear_buffer (&data->buf); @@ -236,7 +290,13 @@ no_more_buffers: static const gchar ** get_options (GstBufferPool * pool) { - static const gchar *options[] = { GST_BUFFER_POOL_OPTION_VIDEO_META, NULL }; + static const gchar *options[] = { + GST_BUFFER_POOL_OPTION_VIDEO_META, +#ifdef HAVE_GSTREAMER_SHM_ALLOCATOR + GST_BUFFER_POOL_OPTION_VIDEO_ALIGNMENT, +#endif + NULL + }; return options; } @@ -247,7 +307,7 @@ set_config (GstBufferPool * pool, GstStructure * config) GstCaps *caps; GstStructure *structure; guint size, min_buffers, max_buffers; - gboolean has_video; + gboolean has_videoalign; if (!gst_buffer_pool_config_get_params (config, &caps, &size, &min_buffers, &max_buffers)) { GST_WARNING_OBJECT (pool, "invalid config"); @@ -272,15 +332,41 @@ set_config (GstBufferPool * pool, GstStructure * config) structure = gst_caps_get_structure (caps, 0); if (g_str_has_prefix (gst_structure_get_name (structure), "video/") || g_str_has_prefix (gst_structure_get_name (structure), "image/")) { - has_video = TRUE; + p->has_video = TRUE; gst_video_info_from_caps (&p->video_info, caps); + +#ifdef HAVE_GSTREAMER_SHM_ALLOCATOR + if (GST_VIDEO_FORMAT_INFO_IS_VALID_RAW (p->video_info.finfo) && + GST_VIDEO_FORMAT_INFO_FORMAT (p->video_info.finfo) != GST_VIDEO_FORMAT_DMA_DRM) { + gst_video_alignment_reset (&p->video_align); + gst_video_info_align (&p->video_info, &p->video_align); + } +#endif + } else if (g_str_has_prefix(gst_structure_get_name(structure), "audio/")) { + p->has_video = FALSE; + gst_audio_info_from_caps(&p->audio_info, caps); } else { - has_video = FALSE; + g_assert_not_reached (); } - p->add_metavideo = has_video && gst_buffer_pool_config_has_option (config, + p->add_metavideo = p->has_video && gst_buffer_pool_config_has_option (config, GST_BUFFER_POOL_OPTION_VIDEO_META); +#ifdef HAVE_GSTREAMER_SHM_ALLOCATOR + has_videoalign = p->has_video && gst_buffer_pool_config_has_option (config, + GST_BUFFER_POOL_OPTION_VIDEO_ALIGNMENT); + + if (has_videoalign) { + gst_buffer_pool_config_get_video_alignment (config, &p->video_align); + gst_video_info_align (&p->video_info, &p->video_align); + gst_buffer_pool_config_set_video_alignment (config, &p->video_align); + + GST_LOG_OBJECT (pool, "Set alignment: %u-%ux%u-%u", + p->video_align.padding_left, p->video_align.padding_right, + p->video_align.padding_top, p->video_align.padding_bottom); + } +#endif + if (p->video_info.size != 0) size = p->video_info.size; @@ -355,6 +441,10 @@ gst_pipewire_pool_finalize (GObject * object) g_weak_ref_set (&pool->stream, NULL); g_object_unref (pool->fd_allocator); g_object_unref (pool->dmabuf_allocator); +#ifdef HAVE_GSTREAMER_SHM_ALLOCATOR + if (pool->shm_allocator) + g_object_unref (pool->shm_allocator); +#endif G_OBJECT_CLASS (gst_pipewire_pool_parent_class)->finalize (object); } @@ -389,5 +479,8 @@ gst_pipewire_pool_init (GstPipeWirePool * pool) { pool->fd_allocator = gst_fd_allocator_new (); pool->dmabuf_allocator = gst_dmabuf_allocator_new (); +#ifdef HAVE_GSTREAMER_SHM_ALLOCATOR + gst_shm_allocator_init_once(); +#endif g_cond_init (&pool->cond); } diff --git a/src/gst/gstpipewirepool.h b/src/gst/gstpipewirepool.h index 15b7f0d59..c62d79e7d 100644 --- a/src/gst/gstpipewirepool.h +++ b/src/gst/gstpipewirepool.h @@ -9,6 +9,7 @@ #include +#include #include #include @@ -40,14 +41,19 @@ struct _GstPipeWirePool { GWeakRef stream; guint n_buffers; + gboolean has_video; gboolean add_metavideo; + GstAudioInfo audio_info; GstVideoInfo video_info; + GstVideoAlignment video_align; GstAllocator *fd_allocator; GstAllocator *dmabuf_allocator; + GstAllocator *shm_allocator; GCond cond; gboolean paused; + gboolean allocate_memory; }; enum GstPipeWirePoolMode { diff --git a/src/gst/gstpipewiresink.c b/src/gst/gstpipewiresink.c index ecb2a62ea..26be00195 100644 --- a/src/gst/gstpipewiresink.c +++ b/src/gst/gstpipewiresink.c @@ -334,9 +334,7 @@ gst_pipewire_sink_update_params (GstPipeWireSink *sink) * the default, since we can't grow the pool once this is set */ SPA_PARAM_BUFFERS_buffers, SPA_POD_CHOICE_RANGE_Int( max_buffers, min_buffers, max_buffers), - SPA_PARAM_BUFFERS_dataType, SPA_POD_CHOICE_FLAGS_Int( - (1<stream->pool, b); if (!gst_pipewire_pool_has_buffers (pwsink->stream->pool) && !GST_BUFFER_POOL_IS_FLUSHING (GST_BUFFER_POOL_CAST (pwsink->stream->pool))) { - GST_ELEMENT_ERROR (pwsink, RESOURCE, NOT_FOUND, - ("all buffers have been removed"), - ("PipeWire link to remote node was destroyed")); + if (pwsink->mode != GST_PIPEWIRE_SINK_MODE_PROVIDE) { + GST_ELEMENT_ERROR (pwsink, RESOURCE, NOT_FOUND, + ("all buffers have been removed"), + ("PipeWire link to remote node was destroyed")); + } } } @@ -807,6 +808,11 @@ gst_pipewire_sink_setcaps (GstBaseSink * bsink, GstCaps * caps) else flags |= PW_STREAM_FLAG_DRIVER; +#ifdef HAVE_GSTREAMER_SHM_ALLOCATOR + flags |= PW_STREAM_FLAG_ALLOC_BUFFERS; + pwsink->stream->pool->allocate_memory = true; +#endif + target_id = pwsink->stream->path ? (uint32_t)atoi(pwsink->stream->path) : PW_ID_ANY; if (pwsink->stream->target_object) { @@ -860,8 +866,12 @@ gst_pipewire_sink_setcaps (GstBaseSink * bsink, GstCaps * caps) config = gst_buffer_pool_get_config (GST_BUFFER_POOL_CAST (pwsink->stream->pool)); gst_buffer_pool_config_get_params (config, NULL, &size, &min_buffers, &max_buffers); gst_buffer_pool_config_set_params (config, caps, size, min_buffers, max_buffers); - if(pwsink->is_video) - gst_buffer_pool_config_add_option(config, GST_BUFFER_POOL_OPTION_VIDEO_META); + if (pwsink->is_video) { + gst_buffer_pool_config_add_option (config, GST_BUFFER_POOL_OPTION_VIDEO_META); +#ifdef HAVE_GSTREAMER_SHM_ALLOCATOR + gst_buffer_pool_config_add_option (config, GST_BUFFER_POOL_OPTION_VIDEO_ALIGNMENT); +#endif + } gst_buffer_pool_set_config (GST_BUFFER_POOL_CAST (pwsink->stream->pool), config); pw_thread_loop_unlock (pwsink->stream->core->loop); From c176d19b3ef023e199306c07a63c99c14a7b15e1 Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Wed, 2 Apr 2025 13:25:31 +0200 Subject: [PATCH 12/21] fix printf modifier for gsize Fixes #4641 --- src/gst/gstpipewirepool.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/gst/gstpipewirepool.c b/src/gst/gstpipewirepool.c index 3e061a0fd..3ac1b42d0 100644 --- a/src/gst/gstpipewirepool.c +++ b/src/gst/gstpipewirepool.c @@ -134,7 +134,7 @@ void gst_pipewire_pool_wrap_buffer (GstPipeWirePool *pool, struct pw_buffer *b) block_size = quantum_limit * pool->audio_info.bpf; } - GST_DEBUG_OBJECT (pool, "setting block size %lu", block_size); + GST_DEBUG_OBJECT (pool, "setting block size %zu", block_size); if (!pool->shm_allocator) pool->shm_allocator = gst_shm_allocator_get(); From 2427cf7d6b8cf4fdcc9952e1e6ef2644272c6514 Mon Sep 17 00:00:00 2001 From: Sanchayan Maity Date: Tue, 1 Apr 2025 17:34:15 +0530 Subject: [PATCH 13/21] gstpipewiresrc: Fix re-linking for audio For a pipeline like below, we might want to dynamically switch the audio source. gst-launch-1.0 -e pipewiresrc autoconnect=false ! queue ! audioconvert ! autoaudiosink On switching to a different audio source, any one of driver, quantum or clock rate might change which changes the return `result` value of gst_pipewire_clock_get_internal_time. This can result in the basesrc create function incorrectly waiting in gst_clock_id_wait. We post clock lost message to fix this. In the case of gst-launch, it will set the pipeline to PAUSED and then PLAYING to to force a new clock and a new base_time distribution. Without the clock lost message, the following can be seen before re-linking to a different source 0:00:30.887602864 79499 0x7fffe8000d40 DEBUG GST_CLOCK gstsystemclock.c:1158:gst_system_clock_id_wait_jitter_unlocked: entry 0x7fffd803fad0 time 0:00:17.024565416 now 0:00:17.024109144 diff (time-now) 456272 after re-linking to a different source 0:00:45.790843245 79499 0x7fffe8000d40 DEBUG GST_CLOCK gstsystemclock.c:1158:gst_system_clock_id_wait_jitter_unlocked: entry 0x7fffd803fad0 time 0:00:31.927694059 now 0:00:17.066883864 diff (time-now) 14860810195 With the clock lost message, the following can be seen before re-linking to a different source 0:01:09.336533552 89461 0x7fffe8000d40 DEBUG GST_CLOCK gstsystemclock.c:1158:gst_system_clock_id_wait_jitter_unlocked: entry 0x7fffd803fad0 time 0:00:58.198536772 now 0:00:58.197444926 diff (time-now) 1091846 after re-linking to a different source 0:01:21.659827958 89461 0x7fffe8000d40 DEBUG GST_CLOCK gstsystemclock.c:1158:gst_system_clock_id_wait_jitter_unlocked: entry 0x7fffd803fad0 time 0:28:24.853517646 now 0:28:24.853527204 diff (time-now) -9558 Note the difference in `time` and `now` fields of the above log message. This is easy to reproduce by using a pipewiresink as the audio source with a pipeline like below, as one of the sources during switching. gst-launch-1.0 -e audiotestsrc wave=ticks ! audioconvert ! audio/x-raw,format=F32LE,rate=48000,channels=1 ! pipewiresink stream-properties="props,media.class=Audio/Source,node.description=pwsink" client-name=pwsink Applications need to handle the GST_MESSAGE_CLOCK_LOST message in their bus handlers. --- src/gst/gstpipewiresrc.c | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/src/gst/gstpipewiresrc.c b/src/gst/gstpipewiresrc.c index 2a8f366ca..3f6ef340a 100644 --- a/src/gst/gstpipewiresrc.c +++ b/src/gst/gstpipewiresrc.c @@ -737,13 +737,29 @@ on_state_changed (void *data, enum pw_stream_state state, const char *error) { GstPipeWireSrc *pwsrc = data; + GstState current_state = GST_ELEMENT_CAST (pwsrc)->current_state; GST_DEBUG ("got stream state %s", pw_stream_state_as_string (state)); switch (state) { case PW_STREAM_STATE_UNCONNECTED: case PW_STREAM_STATE_CONNECTING: + break; case PW_STREAM_STATE_PAUSED: + /* + * We may see a driver/quantum/clock rate change on switching audio + * sources. The same is not applicable for video. + * + * We post the clock lost message here to take care of a possible + * jump or shift in base_time/clock for the pipeline. Application + * must handle the clock lost message in it's bus handler by pausing + * the pipeline and then setting it back to playing. + */ + if (current_state == GST_STATE_PLAYING && !pwsrc->is_video) + gst_element_post_message (GST_ELEMENT_CAST (pwsrc), + gst_message_new_clock_lost (GST_OBJECT_CAST (pwsrc), + GST_CLOCK_CAST (pwsrc->stream->clock))); + break; case PW_STREAM_STATE_STREAMING: break; case PW_STREAM_STATE_ERROR: From 63d18833521c695ec4a9415dad7be15ada30a197 Mon Sep 17 00:00:00 2001 From: Sanchayan Maity Date: Mon, 24 Mar 2025 15:50:22 +0530 Subject: [PATCH 14/21] gstpipewiresrc: Handle stream being disconnected When PW source is used with something like Camera and the camera is disconnected, all buffers are removed and stream will be paused. When using PW sink with source, the sink side pipeline can go to EOS. This again results in all the buffers being removed and stream being paused on the source side. PW source side pipeline can also crash if the sink was in the middle of frame copying a buffer to render which got removed. Handle this scenario by sending a flush-start event at the start of buffer removal and flush-stop at the end followed by an end of stream or pipeline error depending on user selection. --- src/gst/gstpipewiresrc.c | 133 ++++++++++++++++++++++++++++++++++++--- src/gst/gstpipewiresrc.h | 22 +++++++ 2 files changed, 147 insertions(+), 8 deletions(-) diff --git a/src/gst/gstpipewiresrc.c b/src/gst/gstpipewiresrc.c index 3f6ef340a..1cfb04db5 100644 --- a/src/gst/gstpipewiresrc.c +++ b/src/gst/gstpipewiresrc.c @@ -46,7 +46,8 @@ GST_DEBUG_CATEGORY_STATIC (pipewire_src_debug); #define DEFAULT_RESEND_LAST false #define DEFAULT_KEEPALIVE_TIME 0 #define DEFAULT_AUTOCONNECT true -#define DEFAULT_USE_BUFFERPOOL USE_BUFFERPOOL_AUTO +#define DEFAULT_USE_BUFFERPOOL USE_BUFFERPOOL_AUTO +#define DEFAULT_ON_DISCONNECT GST_PIPEWIRE_SRC_ON_DISCONNECT_NONE enum { @@ -64,8 +65,28 @@ enum PROP_KEEPALIVE_TIME, PROP_AUTOCONNECT, PROP_USE_BUFFERPOOL, + PROP_ON_DISCONNECT, }; +GType +gst_pipewire_src_on_disconnect_get_type (void) +{ + static gsize on_disconnect_type = 0; + static const GEnumValue on_disconnect[] = { + {GST_PIPEWIRE_SRC_ON_DISCONNECT_NONE, "GST_PIPEWIRE_SRC_ON_DISCONNECT_NONE", "none"}, + {GST_PIPEWIRE_SRC_ON_DISCONNECT_EOS, "GST_PIPEWIRE_SRC_ON_DISCONNECT_EOS", "eos"}, + {GST_PIPEWIRE_SRC_ON_DISCONNECT_ERROR, "GST_PIPEWIRE_SRC_ON_DISCONNECT_ERROR", "error"}, + {0, NULL, NULL}, + }; + + if (g_once_init_enter (&on_disconnect_type)) { + GType tmp = + g_enum_register_static ("GstPipeWireSrcOnDisconnect", on_disconnect); + g_once_init_leave (&on_disconnect_type, tmp); + } + + return (GType) on_disconnect_type; +} static GstStaticPadTemplate gst_pipewire_src_template = GST_STATIC_PAD_TEMPLATE ("src", @@ -170,6 +191,10 @@ gst_pipewire_src_set_property (GObject * object, guint prop_id, pwsrc->use_bufferpool = USE_BUFFERPOOL_NO; break; + case PROP_ON_DISCONNECT: + pwsrc->on_disconnect = g_value_get_enum (value); + break; + default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; @@ -235,6 +260,10 @@ gst_pipewire_src_get_property (GObject * object, guint prop_id, g_value_set_boolean (value, !!pwsrc->use_bufferpool); break; + case PROP_ON_DISCONNECT: + g_value_set_enum (value, pwsrc->on_disconnect); + break; + default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; @@ -414,6 +443,16 @@ gst_pipewire_src_class_init (GstPipeWireSrcClass * klass) G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + g_object_class_install_property (gobject_class, + PROP_ON_DISCONNECT, + g_param_spec_enum ("on-disconnect", + "On disconnect", + "Action to take on disconnect", + GST_TYPE_PIPEWIRE_SRC_ON_DISCONNECT, + DEFAULT_ON_DISCONNECT, + G_PARAM_READWRITE | + G_PARAM_STATIC_STRINGS)); + gstelement_class->provide_clock = gst_pipewire_src_provide_clock; gstelement_class->change_state = gst_pipewire_src_change_state; gstelement_class->send_event = gst_pipewire_src_send_event; @@ -462,6 +501,9 @@ gst_pipewire_src_init (GstPipeWireSrc * src) src->autoconnect = DEFAULT_AUTOCONNECT; src->min_latency = 0; src->max_latency = GST_CLOCK_TIME_NONE; + src->n_buffers = 0; + src->flushing_on_remove_buffer = FALSE; + src->on_disconnect = DEFAULT_ON_DISCONNECT; src->transform_value = UINT32_MAX; } @@ -469,11 +511,26 @@ gst_pipewire_src_init (GstPipeWireSrc * src) static gboolean buffer_recycle (GstMiniObject *obj) { - GstPipeWireSrc *src; - GstPipeWirePoolData *data; + GstPipeWirePoolData *data = gst_pipewire_pool_get_data (GST_BUFFER_CAST(obj)); + GstPipeWireSrc *src = data->owner; int res; - data = gst_pipewire_pool_get_data (GST_BUFFER_CAST(obj)); + if (src->flushing_on_remove_buffer) { + /* + * If a flush-start was initiated, this might be called by elements like + * queues downstream purging buffers from their internal queues. This can + * deadlock if queues use min-threshold-buffers/bytes/time with src_create + * trying to take the loop lock and buffer_recycle trying to take the loop + * lock down below. We return from here, to prevent deadlock with streaming + * thread in a queue thread. + * + * We will take care of queueing the buffer in on_remove_buffer. + */ + GstBuffer *buffer = GST_BUFFER_CAST(obj); + GST_DEBUG_OBJECT (src, + "flush-start initiated, skipping buffer recycle %p", buffer); + return TRUE; + } GST_OBJECT_LOCK (data->pool); if (!obj->dispose) { @@ -482,7 +539,6 @@ buffer_recycle (GstMiniObject *obj) } GST_BUFFER_FLAGS (obj) = data->flags; - src = data->owner; pw_thread_loop_lock (src->stream->core->loop); if (!obj->dispose) { @@ -519,6 +575,8 @@ on_add_buffer (void *_data, struct pw_buffer *b) data->owner = pwsrc; data->queued = TRUE; GST_MINI_OBJECT_CAST (data->buf)->dispose = buffer_recycle; + + pwsrc->n_buffers++; } static void @@ -527,17 +585,76 @@ on_remove_buffer (void *_data, struct pw_buffer *b) GstPipeWireSrc *pwsrc = _data; GstPipeWirePoolData *data = b->user_data; GstBuffer *buf = data->buf; + gboolean flush_on_remove; int res; - GST_DEBUG_OBJECT (pwsrc, "remove buffer %p", buf); + GST_DEBUG_OBJECT (pwsrc, "remove buffer %p, queued: %d", + buf, data->queued); GST_MINI_OBJECT_CAST (buf)->dispose = NULL; + flush_on_remove = + pwsrc->on_disconnect == GST_PIPEWIRE_SRC_ON_DISCONNECT_ERROR || + pwsrc->on_disconnect == GST_PIPEWIRE_SRC_ON_DISCONNECT_EOS; + + if (flush_on_remove && !pwsrc->flushing_on_remove_buffer) { + pwsrc->flushing_on_remove_buffer = TRUE; + + GST_DEBUG_OBJECT (pwsrc, "flush-start on remove buffer"); + /* + * It is possible that when buffers are being removed, a downstream + * element can be holding on to a buffer or in the middle of rendering + * the same. Former is possible with queues min-threshold-buffers or + * similar. Latter can result in a crash during gst_video_frame_copy. + * + * We send a flush-start event downstream to make elements discard + * any buffers they may be holding on to as well as return from their + * chain function ASAP. + */ + gst_pad_push_event (GST_BASE_SRC_PAD (pwsrc), + gst_event_new_flush_start ()); + } + if (data->queued) { gst_buffer_unref (buf); } else { if ((res = pw_stream_queue_buffer (pwsrc->stream->pwstream, b)) < 0) - GST_WARNING_OBJECT (pwsrc, "can't queue removed buffer %p, %s", buf, spa_strerror(res)); + GST_WARNING_OBJECT (pwsrc, "can't queue removed buffer %p, %s", + buf, spa_strerror(res)); + else + GST_DEBUG_OBJECT (pwsrc, "queued buffer %p", buf); + } + + pwsrc->n_buffers--; + + if (pwsrc->n_buffers == 0) { + GST_DEBUG_OBJECT (pwsrc, "removed all buffers"); + + pwsrc->flushing_on_remove_buffer = FALSE; + + switch (pwsrc->on_disconnect) { + case GST_PIPEWIRE_SRC_ON_DISCONNECT_ERROR: + GST_DEBUG_OBJECT (pwsrc, "flush-stop on removing all buffers"); + gst_pad_push_event (GST_BASE_SRC_PAD (pwsrc), + gst_event_new_flush_stop (FALSE)); + + GST_ELEMENT_ERROR (pwsrc, RESOURCE, NOT_FOUND, + ("all buffers have been removed"), + ("PipeWire link to remote node was destroyed")); + break; + case GST_PIPEWIRE_SRC_ON_DISCONNECT_EOS: + GST_DEBUG_OBJECT (pwsrc, "flush-stop on removing all buffers"); + gst_pad_push_event (GST_BASE_SRC_PAD (pwsrc), + gst_event_new_flush_stop (FALSE)); + + GST_DEBUG_OBJECT (pwsrc, "sending eos downstream"); + gst_pad_push_event (GST_BASE_SRC_PAD (pwsrc), + gst_event_new_eos()); + break; + case GST_PIPEWIRE_SRC_ON_DISCONNECT_NONE: + GST_DEBUG_OBJECT (pwsrc, "stream closed or removed"); + break; + } } } @@ -739,7 +856,7 @@ on_state_changed (void *data, GstPipeWireSrc *pwsrc = data; GstState current_state = GST_ELEMENT_CAST (pwsrc)->current_state; - GST_DEBUG ("got stream state %s", pw_stream_state_as_string (state)); + GST_DEBUG_OBJECT (pwsrc, "got stream state %s", pw_stream_state_as_string (state)); switch (state) { case PW_STREAM_STATE_UNCONNECTED: diff --git a/src/gst/gstpipewiresrc.h b/src/gst/gstpipewiresrc.h index d5728cdc9..704ae682c 100644 --- a/src/gst/gstpipewiresrc.h +++ b/src/gst/gstpipewiresrc.h @@ -24,6 +24,22 @@ G_BEGIN_DECLS #define GST_PIPEWIRE_SRC_CAST(obj) ((GstPipeWireSrc *) (obj)) G_DECLARE_FINAL_TYPE (GstPipeWireSrc, gst_pipewire_src, GST, PIPEWIRE_SRC, GstPushSrc) +/** + * GstPipeWireSrcOnDisconnect: + * @GST_PIPEWIRE_SRC_ON_DISCONNECT_EOS: send EoS downstream + * @GST_PIPEWIRE_SRC_ON_DISCONNECT_ERROR: raise pipeline error + * @GST_PIPEWIRE_SRC_ON_DISCONNECT_NONE: no action + * + * Different actions on disconnect. + */ +typedef enum +{ + GST_PIPEWIRE_SRC_ON_DISCONNECT_NONE, + GST_PIPEWIRE_SRC_ON_DISCONNECT_EOS, + GST_PIPEWIRE_SRC_ON_DISCONNECT_ERROR, +} GstPipeWireSrcOnDisconnect; + +#define GST_TYPE_PIPEWIRE_SRC_ON_DISCONNECT (gst_pipewire_src_on_disconnect_get_type ()) /** * GstPipeWireSrc: @@ -36,6 +52,7 @@ struct _GstPipeWireSrc { GstPipeWireStream *stream; /*< private >*/ + gint n_buffers; gint use_bufferpool; gint min_buffers; gint max_buffers; @@ -56,6 +73,7 @@ struct _GstPipeWireSrc { gboolean flushing; gboolean started; gboolean eos; + gboolean flushing_on_remove_buffer; gboolean is_live; int64_t delay; @@ -65,8 +83,12 @@ struct _GstPipeWireSrc { GstBuffer *last_buffer; enum spa_meta_videotransform_value transform_value; + + GstPipeWireSrcOnDisconnect on_disconnect; }; +GType gst_pipewire_src_on_stream_disconnect_get_type (void); + G_END_DECLS #endif /* __GST_PIPEWIRE_SRC_H__ */ From 31cc4e9838ce8b60a8301624d873dac4e51e4faa Mon Sep 17 00:00:00 2001 From: Arun Raghavan Date: Wed, 16 Apr 2025 19:47:40 -0400 Subject: [PATCH 15/21] gst: handle blocks and size allocation for encoded format In case of encoded video we get n_planes as 0 from the video info so passing that as n_datas is failing during the buffer negotiation. Make sure to use an appropriate value based on whether we have raw video or not. Co-authored-by: Taruntej Kanakamalla --- src/gst/gstpipewirepool.c | 22 +++++++++++++++++----- src/gst/gstpipewirepool.h | 7 +++++++ src/gst/gstpipewiresink.c | 35 +++++++++++++++++++++++++++-------- src/gst/gstpipewiresink.h | 2 +- src/gst/gstpipewiresrc.c | 13 ++++++++++++- src/gst/gstpipewiresrc.h | 1 + 6 files changed, 65 insertions(+), 15 deletions(-) diff --git a/src/gst/gstpipewirepool.c b/src/gst/gstpipewirepool.c index 3ac1b42d0..f82dfbf46 100644 --- a/src/gst/gstpipewirepool.c +++ b/src/gst/gstpipewirepool.c @@ -66,7 +66,10 @@ void gst_pipewire_pool_wrap_buffer (GstPipeWirePool *pool, struct pw_buffer *b) uint32_t i; GstPipeWirePoolData *data; /* Default to a large enough value */ - gsize plane_sizes[GST_VIDEO_MAX_PLANES] = { pool->video_info.size, }; + gsize plane_0_size = pool->has_rawvideo ? + pool->video_info.size : + (gsize) pool->video_info.width * pool->video_info.height; + gsize plane_sizes[GST_VIDEO_MAX_PLANES] = { plane_0_size, }; GST_DEBUG_OBJECT (pool, "wrap buffer, datas:%d", b->buffer->n_datas); @@ -333,11 +336,20 @@ set_config (GstBufferPool * pool, GstStructure * config) if (g_str_has_prefix (gst_structure_get_name (structure), "video/") || g_str_has_prefix (gst_structure_get_name (structure), "image/")) { p->has_video = TRUE; + gst_video_info_from_caps (&p->video_info, caps); + if (GST_VIDEO_FORMAT_INFO_IS_VALID_RAW (p->video_info.finfo) +#ifdef HAVE_GSTREAMER_DMA_DRM + && GST_VIDEO_FORMAT_INFO_FORMAT (p->video_info.finfo) != GST_VIDEO_FORMAT_DMA_DRM +#endif + ) + p->has_rawvideo = TRUE; + else + p->has_rawvideo = FALSE; + #ifdef HAVE_GSTREAMER_SHM_ALLOCATOR - if (GST_VIDEO_FORMAT_INFO_IS_VALID_RAW (p->video_info.finfo) && - GST_VIDEO_FORMAT_INFO_FORMAT (p->video_info.finfo) != GST_VIDEO_FORMAT_DMA_DRM) { + if (p->has_rawvideo) { gst_video_alignment_reset (&p->video_align); gst_video_info_align (&p->video_info, &p->video_align); } @@ -349,11 +361,11 @@ set_config (GstBufferPool * pool, GstStructure * config) g_assert_not_reached (); } - p->add_metavideo = p->has_video && gst_buffer_pool_config_has_option (config, + p->add_metavideo = p->has_rawvideo && gst_buffer_pool_config_has_option (config, GST_BUFFER_POOL_OPTION_VIDEO_META); #ifdef HAVE_GSTREAMER_SHM_ALLOCATOR - has_videoalign = p->has_video && gst_buffer_pool_config_has_option (config, + has_videoalign = p->has_rawvideo && gst_buffer_pool_config_has_option (config, GST_BUFFER_POOL_OPTION_VIDEO_ALIGNMENT); if (has_videoalign) { diff --git a/src/gst/gstpipewirepool.h b/src/gst/gstpipewirepool.h index c62d79e7d..f71344354 100644 --- a/src/gst/gstpipewirepool.h +++ b/src/gst/gstpipewirepool.h @@ -22,6 +22,12 @@ G_DECLARE_FINAL_TYPE (GstPipeWirePool, gst_pipewire_pool, GST, PIPEWIRE_POOL, Gs #define PIPEWIRE_POOL_MIN_BUFFERS 2u #define PIPEWIRE_POOL_MAX_BUFFERS 16u +/* Only available in GStreamer 1.22+ */ +#ifndef GST_VIDEO_FORMAT_INFO_IS_VALID_RAW +#define GST_VIDEO_FORMAT_INFO_IS_VALID_RAW(info) \ + (info != NULL && (info)->format > GST_VIDEO_FORMAT_ENCODED) +#endif + typedef struct _GstPipeWirePoolData GstPipeWirePoolData; struct _GstPipeWirePoolData { GstPipeWirePool *pool; @@ -42,6 +48,7 @@ struct _GstPipeWirePool { guint n_buffers; gboolean has_video; + gboolean has_rawvideo; gboolean add_metavideo; GstAudioInfo audio_info; GstVideoInfo video_info; diff --git a/src/gst/gstpipewiresink.c b/src/gst/gstpipewiresink.c index 26be00195..79b341e2c 100644 --- a/src/gst/gstpipewiresink.c +++ b/src/gst/gstpipewiresink.c @@ -321,11 +321,18 @@ gst_pipewire_sink_update_params (GstPipeWireSink *sink) spa_pod_builder_add (&b, SPA_PARAM_BUFFERS_size, SPA_POD_CHOICE_RANGE_Int(size, size, INT32_MAX), 0); - if (sink->is_video) { + if (sink->is_rawvideo) { /* MUST have n_datas == n_planes */ spa_pod_builder_add (&b, SPA_PARAM_BUFFERS_blocks, - SPA_POD_Int(GST_VIDEO_INFO_N_PLANES (&pool->video_info)), 0); + SPA_POD_Int(GST_VIDEO_INFO_N_PLANES (&pool->video_info)), + 0); + } else { + /* Non-planar data, get a single block */ + spa_pod_builder_add (&b, + SPA_PARAM_BUFFERS_blocks, + SPA_POD_Int(1), + 0); } spa_pod_builder_add (&b, @@ -343,7 +350,7 @@ gst_pipewire_sink_update_params (GstPipeWireSink *sink) SPA_PARAM_META_type, SPA_POD_Id(SPA_META_Header), SPA_PARAM_META_size, SPA_POD_Int(sizeof (struct spa_meta_header))); - if (sink->is_video) { + if (sink->is_rawvideo) { port_params[n_params++] = spa_pod_builder_add_object (&b, SPA_TYPE_OBJECT_ParamMeta, SPA_PARAM_Meta, SPA_PARAM_META_type, SPA_POD_Id(SPA_META_VideoCrop), @@ -369,7 +376,7 @@ gst_pipewire_sink_init (GstPipeWireSink * sink) sink->mode = DEFAULT_PROP_MODE; sink->use_bufferpool = DEFAULT_PROP_USE_BUFFERPOOL; - sink->is_video = false; + sink->is_rawvideo = false; GST_OBJECT_FLAG_SET (sink, GST_ELEMENT_FLAG_PROVIDE_CLOCK); @@ -387,7 +394,7 @@ gst_pipewire_sink_sink_fixate (GstBaseSink * bsink, GstCaps * caps) structure = gst_caps_get_structure (caps, 0); if (gst_structure_has_name (structure, "video/x-raw")) { - pwsink->is_video = true; + pwsink->is_rawvideo = true; gst_structure_fixate_field_nearest_int (structure, "width", 320); gst_structure_fixate_field_nearest_int (structure, "height", 240); gst_structure_fixate_field_nearest_fraction (structure, "framerate", 30, 1); @@ -779,9 +786,21 @@ gst_pipewire_sink_setcaps (GstBaseSink * bsink, GstCaps * caps) if (pwsink->use_bufferpool != USE_BUFFERPOOL_YES) pwsink->use_bufferpool = USE_BUFFERPOOL_NO; } else { + GstVideoInfo video_info; + pwsink->rate = rate = 0; pwsink->rate_match = false; - pwsink->is_video = true; + + gst_video_info_from_caps (&video_info, caps); + + if (GST_VIDEO_FORMAT_INFO_IS_VALID_RAW (video_info.finfo) +#ifdef HAVE_GSTREAMER_DMA_DRM + && GST_VIDEO_FORMAT_INFO_FORMAT (video_info.finfo) != GST_VIDEO_FORMAT_DMA_DRM +#endif + ) + pwsink->is_rawvideo = TRUE; + else + pwsink->is_rawvideo = FALSE; } spa_dll_set_bw(&pwsink->stream->dll, SPA_DLL_BW_MIN, 4096, rate); @@ -866,7 +885,7 @@ gst_pipewire_sink_setcaps (GstBaseSink * bsink, GstCaps * caps) config = gst_buffer_pool_get_config (GST_BUFFER_POOL_CAST (pwsink->stream->pool)); gst_buffer_pool_config_get_params (config, NULL, &size, &min_buffers, &max_buffers); gst_buffer_pool_config_set_params (config, caps, size, min_buffers, max_buffers); - if (pwsink->is_video) { + if (pwsink->is_rawvideo) { gst_buffer_pool_config_add_option (config, GST_BUFFER_POOL_OPTION_VIDEO_META); #ifdef HAVE_GSTREAMER_SHM_ALLOCATOR gst_buffer_pool_config_add_option (config, GST_BUFFER_POOL_OPTION_VIDEO_ALIGNMENT); @@ -953,7 +972,7 @@ gst_pipewire_sink_render (GstBaseSink * bsink, GstBuffer * buffer) if (res != GST_FLOW_OK) goto done; - if (pwsink->is_video) { + if (pwsink->is_rawvideo) { GstVideoFrame src, dst; gboolean copied = FALSE; buf_size = 0; // to break from the loop diff --git a/src/gst/gstpipewiresink.h b/src/gst/gstpipewiresink.h index 60eb3b79f..f4a961f9a 100644 --- a/src/gst/gstpipewiresink.h +++ b/src/gst/gstpipewiresink.h @@ -69,7 +69,7 @@ struct _GstPipeWireSink { gboolean negotiated; gboolean rate_match; gint rate; - gboolean is_video; + gboolean is_rawvideo; GstPipeWireSinkMode mode; GstPipeWireSinkSlaveMethod slave_method; diff --git a/src/gst/gstpipewiresrc.c b/src/gst/gstpipewiresrc.c index 1cfb04db5..945ee1b7f 100644 --- a/src/gst/gstpipewiresrc.c +++ b/src/gst/gstpipewiresrc.c @@ -777,7 +777,7 @@ static GstBuffer *dequeue_buffer(GstPipeWireSrc *pwsrc) pwsrc->transform_value = transform_value; } - if (pwsrc->is_video) { + if (pwsrc->is_rawvideo) { GstVideoInfo *info = &pwsrc->video_info; uint32_t n_datas = b->buffer->n_datas; uint32_t n_planes = GST_VIDEO_INFO_N_PLANES (info); @@ -1283,6 +1283,16 @@ handle_format_change (GstPipeWireSrc *pwsrc, gst_video_info_dma_drm_init (&pwsrc->drm_info); #endif gst_video_info_from_caps (&pwsrc->video_info, pwsrc->caps); + + if (GST_VIDEO_FORMAT_INFO_IS_VALID_RAW (pwsrc->video_info.finfo) +#ifdef HAVE_GSTREAMER_DMA_DRM + && GST_VIDEO_FORMAT_INFO_FORMAT (pwsrc->video_info.finfo) != GST_VIDEO_FORMAT_DMA_DRM +#endif + ) + pwsrc->is_rawvideo = TRUE; + else + pwsrc->is_rawvideo = FALSE; + #ifdef HAVE_GSTREAMER_DMA_DRM } #endif @@ -1295,6 +1305,7 @@ handle_format_change (GstPipeWireSrc *pwsrc, } else { pwsrc->negotiated = FALSE; pwsrc->is_video = FALSE; + pwsrc->is_rawvideo = FALSE; } if (pwsrc->caps) { diff --git a/src/gst/gstpipewiresrc.h b/src/gst/gstpipewiresrc.h index 704ae682c..3ac88b10b 100644 --- a/src/gst/gstpipewiresrc.h +++ b/src/gst/gstpipewiresrc.h @@ -64,6 +64,7 @@ struct _GstPipeWireSrc { GstCaps *possible_caps; gboolean is_video; + gboolean is_rawvideo; GstVideoInfo video_info; #ifdef HAVE_GSTREAMER_DMA_DRM GstVideoInfoDmaDrm drm_info; From 04f7279996cfba5d4ce25c83890eb34d5e511fe2 Mon Sep 17 00:00:00 2001 From: Philippe Normand Date: Tue, 22 Apr 2025 10:48:00 +0100 Subject: [PATCH 16/21] gst: src: Attach video meta when receiving DMABufs from PipeWire Fixes getDisplayMedia() in WebKitGTK after regression introduced by: https://gitlab.freedesktop.org/pipewire/pipewire/-/merge_requests/2330 --- src/gst/gstpipewiresrc.c | 1 + 1 file changed, 1 insertion(+) diff --git a/src/gst/gstpipewiresrc.c b/src/gst/gstpipewiresrc.c index 945ee1b7f..81942d332 100644 --- a/src/gst/gstpipewiresrc.c +++ b/src/gst/gstpipewiresrc.c @@ -1279,6 +1279,7 @@ handle_format_change (GstPipeWireSrc *pwsrc, pw_stream_set_error (pwsrc->stream->pwstream, -EINVAL, "internal error"); return; } + pwsrc->is_rawvideo = TRUE; } else { gst_video_info_dma_drm_init (&pwsrc->drm_info); #endif From cbaef456d8e0e72b25d30b43a0e75d94dca25686 Mon Sep 17 00:00:00 2001 From: Taruntej Kanakamalla Date: Fri, 18 Apr 2025 23:01:56 +0530 Subject: [PATCH 17/21] gst: sink: update clock before every trigger process Get the clock pointer using the io_changed stream event. and update the clock before triggering the process The clock needs to be updated in the data loop thread and before triggering the process so move the calls to `pw_stream_trigger_process` from gstreamer thread context into the data loop thread context by invoking a callback and update the clock inside the data loop callback before the trigger --- src/gst/gstpipewiresink.c | 98 +++++++++++++++++++++++++++++++++++-- src/gst/gstpipewiresink.h | 2 + src/gst/gstpipewirestream.h | 3 ++ 3 files changed, 99 insertions(+), 4 deletions(-) diff --git a/src/gst/gstpipewiresink.c b/src/gst/gstpipewiresink.c index 79b341e2c..ac8985c29 100644 --- a/src/gst/gstpipewiresink.c +++ b/src/gst/gstpipewiresink.c @@ -40,6 +40,9 @@ GST_DEBUG_CATEGORY_STATIC (pipewire_sink_debug); #define DEFAULT_PROP_SLAVE_METHOD GST_PIPEWIRE_SINK_SLAVE_METHOD_NONE #define DEFAULT_PROP_USE_BUFFERPOOL USE_BUFFERPOOL_AUTO +#define MAX_ERROR_MS 1 +#define RESYNC_TIMEOUT_MS 10 + enum { PROP_0, @@ -377,6 +380,7 @@ gst_pipewire_sink_init (GstPipeWireSink * sink) sink->mode = DEFAULT_PROP_MODE; sink->use_bufferpool = DEFAULT_PROP_USE_BUFFERPOOL; sink->is_rawvideo = false; + sink->first_buffer = true; GST_OBJECT_FLAG_SET (sink, GST_ELEMENT_FLAG_PROVIDE_CLOCK); @@ -690,7 +694,15 @@ do_send_buffer (GstPipeWireSink *pwsink, GstBuffer *buffer) GST_WARNING_OBJECT (pwsink, "can't send buffer %s", spa_strerror(res)); } else { data->queued = TRUE; - GST_LOG_OBJECT(pwsink, "queued pwbuffer: %p; gstbuffer %p ",data->b, buffer); + GST_LOG_OBJECT(pwsink, "queued pwbuffer: %p size: %"PRIu64"; gstbuffer %p ",data->b, data->b->size, buffer); + if (pwsink->first_buffer) { + pwsink->first_buffer = false; + pwsink->first_buffer_pts = GST_BUFFER_PTS(buffer); + } + stream->position = gst_util_uint64_scale_int(GST_BUFFER_PTS(buffer) - pwsink->first_buffer_pts, pwsink->rate, 1 * GST_SECOND); + + // have the buffer duration value minimum as 1, in case of video where rate is 0 (not applicable) + stream->buf_duration = SPA_MAX((uint64_t)1, gst_util_uint64_scale_int(GST_BUFFER_DURATION(buffer), pwsink->rate, 1 * GST_SECOND)); } switch (pwsink->slave_method) { @@ -702,6 +714,52 @@ do_send_buffer (GstPipeWireSink *pwsink, GstBuffer *buffer) } } +static void update_time (GstPipeWireSink *pwsink) +{ + struct spa_io_position *p = pwsink->stream->io_position; + double err = 0.0, corr = 1.0; + guint64 now; + double_t max_err = pwsink->rate * MAX_ERROR_MS/1000; + double_t resync_timeout = pwsink->rate * RESYNC_TIMEOUT_MS/1000; + + if (pwsink->first_buffer) { + // use the target duration before the first buffer + pwsink->stream->buf_duration = p->clock.target_duration; + spa_dll_set_bw(&pwsink->stream->dll, SPA_DLL_BW_MIN, pwsink->stream->buf_duration, pwsink->rate); + } + + now = pw_stream_get_nsec(pwsink->stream->pwstream); + err = (double)gst_util_uint64_scale(now, pwsink->rate, 1 *GST_SECOND) - + (double)gst_util_uint64_scale(p->clock.next_nsec, pwsink->rate, 1 *GST_SECOND); + + GST_LOG_OBJECT(pwsink, "err is %f max err is %f now %lu next is %lu", err, max_err, now, p->clock.next_nsec); + + if (fabs(err) > max_err) { + if (fabs(err) > resync_timeout) { + GST_WARNING_OBJECT(pwsink, "err %f exceeds resync timeout, resetting", err); + spa_dll_set_bw(&pwsink->stream->dll, SPA_DLL_BW_MIN, pwsink->stream->buf_duration, pwsink->rate); + err = 0.0; + } else { + err = SPA_CLAMPD(err, -max_err, max_err); + } + } + corr = spa_dll_update(&pwsink->stream->dll, err); + + p->clock.nsec = now; + p->clock.position = pwsink->stream->position; + p->clock.duration = pwsink->stream->buf_duration; + /* we don't have a way to estimate the target (next cycle) buffer duration + * so use the current buffer duration + */ + p->clock.target_duration = pwsink->stream->buf_duration; + p->clock.rate = SPA_FRACTION(1, pwsink->rate); + // current time plus duration scaled with correlation + p->clock.next_nsec = now + (uint64_t)(p->clock.duration / corr * GST_SECOND / pwsink->rate); + p->clock.rate_diff = corr; + + GST_DEBUG_OBJECT(pwsink, "now %lu, position %lu, duration %lu, rate :%d, next : %lu, delay is %ld, rate_diff is %f", + p->clock.nsec, p->clock.position, p->clock.duration, pwsink->rate, p->clock.next_nsec, p->clock.delay,p->clock.rate_diff); +} static void on_process (void *data) @@ -711,6 +769,24 @@ on_process (void *data) g_cond_signal (&pwsink->stream->pool->cond); } +static int invoke_trigger_process(struct spa_loop *loop, + bool async, uint32_t seq, const void *data, size_t size, void *user_data) +{ + + GstPipeWireSink *pwsink = user_data; + + + /* Note: We cannot use the rate for computation of other clock params + * in case of video because the rate for video is set as 0 in the _setcaps. + * So skip update time for video (i.e. when rate is 0). The video buffers + * get timestamp from the SPA_META_Header anyway + */ + + if (pwsink->rate) + update_time(pwsink); + return pw_stream_trigger_process(pwsink->stream->pwstream); +} + static void on_state_changed (void *data, enum pw_stream_state old, enum pw_stream_state state, const char *error) { @@ -726,7 +802,7 @@ on_state_changed (void *data, enum pw_stream_state old, enum pw_stream_state sta break; case PW_STREAM_STATE_STREAMING: if (pw_stream_is_driving (pwsink->stream->pwstream)) - pw_stream_trigger_process (pwsink->stream->pwstream); + pw_loop_invoke(pw_stream_get_data_loop(pwsink->stream->pwstream), invoke_trigger_process, 1, NULL, 0 , false, pwsink); break; case PW_STREAM_STATE_ERROR: /* make the error permanent, if it is not already; @@ -1023,7 +1099,7 @@ gst_pipewire_sink_render (GstBaseSink * bsink, GstBuffer * buffer) gst_buffer_unref (b); if (pw_stream_is_driving (pwsink->stream->pwstream)) - pw_stream_trigger_process (pwsink->stream->pwstream); + pw_loop_invoke(pw_stream_get_data_loop(pwsink->stream->pwstream), invoke_trigger_process, 1, NULL, 0 , false, pwsink); } } else { GST_TRACE_OBJECT(pwsink, "Buffer is from pipewirepool"); @@ -1031,7 +1107,7 @@ gst_pipewire_sink_render (GstBaseSink * bsink, GstBuffer * buffer) do_send_buffer (pwsink, buffer); if (pw_stream_is_driving (pwsink->stream->pwstream)) - pw_stream_trigger_process (pwsink->stream->pwstream); + pw_loop_invoke(pw_stream_get_data_loop(pwsink->stream->pwstream), invoke_trigger_process, 1, NULL, 0 , false, pwsink); } done_unlock: @@ -1045,6 +1121,19 @@ not_negotiated: } } +static void +on_io_changed(void *data, uint32_t id, void *area, uint32_t size) +{ + GstPipeWireSink *pwsink = data; + + switch (id) { + case SPA_IO_Position: + GST_DEBUG_OBJECT(pwsink, "got io position %p", area); + pwsink->stream->io_position = area; + break; + } +} + static const struct pw_stream_events stream_events = { PW_VERSION_STREAM_EVENTS, .state_changed = on_state_changed, @@ -1052,6 +1141,7 @@ static const struct pw_stream_events stream_events = { .add_buffer = on_add_buffer, .remove_buffer = on_remove_buffer, .process = on_process, + .io_changed = on_io_changed, }; static GstStateChangeReturn diff --git a/src/gst/gstpipewiresink.h b/src/gst/gstpipewiresink.h index f4a961f9a..5816f7a15 100644 --- a/src/gst/gstpipewiresink.h +++ b/src/gst/gstpipewiresink.h @@ -70,6 +70,8 @@ struct _GstPipeWireSink { gboolean rate_match; gint rate; gboolean is_rawvideo; + gboolean first_buffer; + GstClockTime first_buffer_pts; GstPipeWireSinkMode mode; GstPipeWireSinkSlaveMethod slave_method; diff --git a/src/gst/gstpipewirestream.h b/src/gst/gstpipewirestream.h index a301375c7..23dc996a9 100644 --- a/src/gst/gstpipewirestream.h +++ b/src/gst/gstpipewirestream.h @@ -31,6 +31,7 @@ struct _GstPipeWireStream { GstClock *clock; guint64 position; + guint64 buf_duration; struct spa_dll dll; double err_avg, err_var, err_wdw; guint64 last_ts; @@ -41,6 +42,8 @@ struct _GstPipeWireStream { struct pw_stream *pwstream; struct spa_hook pwstream_listener; + struct spa_io_position *io_position; + /* common properties */ int fd; gchar *path; From d7421ecb75ccdfbe8a2353937a30a6de2c856fbd Mon Sep 17 00:00:00 2001 From: Taruntej Kanakamalla Date: Thu, 8 May 2025 21:16:48 +0530 Subject: [PATCH 18/21] gst: sink: minor formatting fixes follow up of !2337 --- src/gst/gstpipewiresink.c | 42 +++++++++++++++++++++++---------------- 1 file changed, 25 insertions(+), 17 deletions(-) diff --git a/src/gst/gstpipewiresink.c b/src/gst/gstpipewiresink.c index ac8985c29..9a9efde8c 100644 --- a/src/gst/gstpipewiresink.c +++ b/src/gst/gstpipewiresink.c @@ -40,8 +40,8 @@ GST_DEBUG_CATEGORY_STATIC (pipewire_sink_debug); #define DEFAULT_PROP_SLAVE_METHOD GST_PIPEWIRE_SINK_SLAVE_METHOD_NONE #define DEFAULT_PROP_USE_BUFFERPOOL USE_BUFFERPOOL_AUTO -#define MAX_ERROR_MS 1 -#define RESYNC_TIMEOUT_MS 10 +#define MAX_ERROR_MS 1 +#define RESYNC_TIMEOUT_MS 10 enum { @@ -694,15 +694,18 @@ do_send_buffer (GstPipeWireSink *pwsink, GstBuffer *buffer) GST_WARNING_OBJECT (pwsink, "can't send buffer %s", spa_strerror(res)); } else { data->queued = TRUE; - GST_LOG_OBJECT(pwsink, "queued pwbuffer: %p size: %"PRIu64"; gstbuffer %p ",data->b, data->b->size, buffer); + GST_LOG_OBJECT(pwsink, "queued pwbuffer: %p size: %"PRIu64"; gstbuffer %p", + data->b, data->b->size, buffer); if (pwsink->first_buffer) { pwsink->first_buffer = false; pwsink->first_buffer_pts = GST_BUFFER_PTS(buffer); } - stream->position = gst_util_uint64_scale_int(GST_BUFFER_PTS(buffer) - pwsink->first_buffer_pts, pwsink->rate, 1 * GST_SECOND); + stream->position = gst_util_uint64_scale_int(GST_BUFFER_PTS(buffer) - pwsink->first_buffer_pts, + pwsink->rate, 1 * GST_SECOND); // have the buffer duration value minimum as 1, in case of video where rate is 0 (not applicable) - stream->buf_duration = SPA_MAX((uint64_t)1, gst_util_uint64_scale_int(GST_BUFFER_DURATION(buffer), pwsink->rate, 1 * GST_SECOND)); + stream->buf_duration = SPA_MAX((uint64_t)1, gst_util_uint64_scale_int(GST_BUFFER_DURATION(buffer), + pwsink->rate, 1 * GST_SECOND)); } switch (pwsink->slave_method) { @@ -725,19 +728,22 @@ static void update_time (GstPipeWireSink *pwsink) if (pwsink->first_buffer) { // use the target duration before the first buffer pwsink->stream->buf_duration = p->clock.target_duration; - spa_dll_set_bw(&pwsink->stream->dll, SPA_DLL_BW_MIN, pwsink->stream->buf_duration, pwsink->rate); + spa_dll_set_bw(&pwsink->stream->dll, SPA_DLL_BW_MIN, pwsink->stream->buf_duration, + pwsink->rate); } now = pw_stream_get_nsec(pwsink->stream->pwstream); - err = (double)gst_util_uint64_scale(now, pwsink->rate, 1 *GST_SECOND) - - (double)gst_util_uint64_scale(p->clock.next_nsec, pwsink->rate, 1 *GST_SECOND); + err = (double)gst_util_uint64_scale(now, pwsink->rate, 1 * GST_SECOND) - + (double)gst_util_uint64_scale(p->clock.next_nsec, pwsink->rate, 1 * GST_SECOND); - GST_LOG_OBJECT(pwsink, "err is %f max err is %f now %lu next is %lu", err, max_err, now, p->clock.next_nsec); + GST_LOG_OBJECT(pwsink, "err is %f max err is %f now %"PRIu64" next is %"PRIu64"", err, max_err, now, + p->clock.next_nsec); if (fabs(err) > max_err) { if (fabs(err) > resync_timeout) { GST_WARNING_OBJECT(pwsink, "err %f exceeds resync timeout, resetting", err); - spa_dll_set_bw(&pwsink->stream->dll, SPA_DLL_BW_MIN, pwsink->stream->buf_duration, pwsink->rate); + spa_dll_set_bw(&pwsink->stream->dll, SPA_DLL_BW_MIN, pwsink->stream->buf_duration, + pwsink->rate); err = 0.0; } else { err = SPA_CLAMPD(err, -max_err, max_err); @@ -757,8 +763,9 @@ static void update_time (GstPipeWireSink *pwsink) p->clock.next_nsec = now + (uint64_t)(p->clock.duration / corr * GST_SECOND / pwsink->rate); p->clock.rate_diff = corr; - GST_DEBUG_OBJECT(pwsink, "now %lu, position %lu, duration %lu, rate :%d, next : %lu, delay is %ld, rate_diff is %f", - p->clock.nsec, p->clock.position, p->clock.duration, pwsink->rate, p->clock.next_nsec, p->clock.delay,p->clock.rate_diff); + GST_DEBUG_OBJECT(pwsink, "now %"PRIu64", position %"PRIu64", duration %"PRIu64", rate :%d," + "next : %"PRIu64", delay is %"PRIi64", rate_diff is %f", p->clock.nsec, p->clock.position, + p->clock.duration, pwsink->rate, p->clock.next_nsec, p->clock.delay,p->clock.rate_diff); } static void @@ -775,7 +782,6 @@ static int invoke_trigger_process(struct spa_loop *loop, GstPipeWireSink *pwsink = user_data; - /* Note: We cannot use the rate for computation of other clock params * in case of video because the rate for video is set as 0 in the _setcaps. * So skip update time for video (i.e. when rate is 0). The video buffers @@ -802,7 +808,8 @@ on_state_changed (void *data, enum pw_stream_state old, enum pw_stream_state sta break; case PW_STREAM_STATE_STREAMING: if (pw_stream_is_driving (pwsink->stream->pwstream)) - pw_loop_invoke(pw_stream_get_data_loop(pwsink->stream->pwstream), invoke_trigger_process, 1, NULL, 0 , false, pwsink); + pw_loop_invoke(pw_stream_get_data_loop(pwsink->stream->pwstream), + invoke_trigger_process, 1, NULL, 0 , false, pwsink); break; case PW_STREAM_STATE_ERROR: /* make the error permanent, if it is not already; @@ -1099,7 +1106,8 @@ gst_pipewire_sink_render (GstBaseSink * bsink, GstBuffer * buffer) gst_buffer_unref (b); if (pw_stream_is_driving (pwsink->stream->pwstream)) - pw_loop_invoke(pw_stream_get_data_loop(pwsink->stream->pwstream), invoke_trigger_process, 1, NULL, 0 , false, pwsink); + pw_loop_invoke(pw_stream_get_data_loop(pwsink->stream->pwstream), + invoke_trigger_process, 1, NULL, 0 , false, pwsink); } } else { GST_TRACE_OBJECT(pwsink, "Buffer is from pipewirepool"); @@ -1107,7 +1115,8 @@ gst_pipewire_sink_render (GstBaseSink * bsink, GstBuffer * buffer) do_send_buffer (pwsink, buffer); if (pw_stream_is_driving (pwsink->stream->pwstream)) - pw_loop_invoke(pw_stream_get_data_loop(pwsink->stream->pwstream), invoke_trigger_process, 1, NULL, 0 , false, pwsink); + pw_loop_invoke(pw_stream_get_data_loop(pwsink->stream->pwstream), + invoke_trigger_process, 1, NULL, 0 , false, pwsink); } done_unlock: @@ -1128,7 +1137,6 @@ on_io_changed(void *data, uint32_t id, void *area, uint32_t size) switch (id) { case SPA_IO_Position: - GST_DEBUG_OBJECT(pwsink, "got io position %p", area); pwsink->stream->io_position = area; break; } From 8784c5e877913b9ff501aee4003367212f63cf69 Mon Sep 17 00:00:00 2001 From: Taruntej Kanakamalla Date: Mon, 12 May 2025 17:06:01 +0530 Subject: [PATCH 19/21] gst: sink: minor type fix --- src/gst/gstpipewiresink.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/gst/gstpipewiresink.c b/src/gst/gstpipewiresink.c index 9a9efde8c..c7ac851ae 100644 --- a/src/gst/gstpipewiresink.c +++ b/src/gst/gstpipewiresink.c @@ -722,8 +722,8 @@ static void update_time (GstPipeWireSink *pwsink) struct spa_io_position *p = pwsink->stream->io_position; double err = 0.0, corr = 1.0; guint64 now; - double_t max_err = pwsink->rate * MAX_ERROR_MS/1000; - double_t resync_timeout = pwsink->rate * RESYNC_TIMEOUT_MS/1000; + double max_err = pwsink->rate * MAX_ERROR_MS/1000.0; + double resync_timeout = pwsink->rate * RESYNC_TIMEOUT_MS/1000.0; if (pwsink->first_buffer) { // use the target duration before the first buffer From f12d02bccf0a018b31d72a9dd77fc5244ae2cdc2 Mon Sep 17 00:00:00 2001 From: Elliot Chen Date: Fri, 13 Jun 2025 16:52:40 +0900 Subject: [PATCH 20/21] pipewiresrc: add provide clock property --- src/gst/gstpipewiresrc.c | 29 +++++++++++++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/src/gst/gstpipewiresrc.c b/src/gst/gstpipewiresrc.c index 81942d332..25b0df0cb 100644 --- a/src/gst/gstpipewiresrc.c +++ b/src/gst/gstpipewiresrc.c @@ -48,6 +48,7 @@ GST_DEBUG_CATEGORY_STATIC (pipewire_src_debug); #define DEFAULT_AUTOCONNECT true #define DEFAULT_USE_BUFFERPOOL USE_BUFFERPOOL_AUTO #define DEFAULT_ON_DISCONNECT GST_PIPEWIRE_SRC_ON_DISCONNECT_NONE +#define DEFAULT_PROVIDE_CLOCK TRUE enum { @@ -66,6 +67,7 @@ enum PROP_AUTOCONNECT, PROP_USE_BUFFERPOOL, PROP_ON_DISCONNECT, + PROP_PROVIDE_CLOCK, }; GType @@ -195,6 +197,16 @@ gst_pipewire_src_set_property (GObject * object, guint prop_id, pwsrc->on_disconnect = g_value_get_enum (value); break; + case PROP_PROVIDE_CLOCK: + gboolean provide = g_value_get_boolean (value); + GST_OBJECT_LOCK (pwsrc); + if (provide) + GST_OBJECT_FLAG_SET (pwsrc, GST_ELEMENT_FLAG_PROVIDE_CLOCK); + else + GST_OBJECT_FLAG_UNSET (pwsrc, GST_ELEMENT_FLAG_PROVIDE_CLOCK); + GST_OBJECT_UNLOCK (pwsrc); + break; + default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; @@ -264,6 +276,14 @@ gst_pipewire_src_get_property (GObject * object, guint prop_id, g_value_set_enum (value, pwsrc->on_disconnect); break; + case PROP_PROVIDE_CLOCK: + gboolean result; + GST_OBJECT_LOCK (pwsrc); + result = GST_OBJECT_FLAG_IS_SET (pwsrc, GST_ELEMENT_FLAG_PROVIDE_CLOCK); + GST_OBJECT_UNLOCK (pwsrc); + g_value_set_boolean (value, result); + break; + default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; @@ -453,6 +473,15 @@ gst_pipewire_src_class_init (GstPipeWireSrcClass * klass) G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + g_object_class_install_property (gobject_class, + PROP_PROVIDE_CLOCK, + g_param_spec_boolean ("provide-clock", + "Provide Clock", + "Provide a clock to be used as the global pipeline clock", + DEFAULT_PROVIDE_CLOCK, + G_PARAM_READWRITE | + G_PARAM_STATIC_STRINGS)); + gstelement_class->provide_clock = gst_pipewire_src_provide_clock; gstelement_class->change_state = gst_pipewire_src_change_state; gstelement_class->send_event = gst_pipewire_src_send_event; From 672f3f77e2ac566991386e9ef2df13fc96319a81 Mon Sep 17 00:00:00 2001 From: Elliot Chen Date: Sat, 21 Jun 2025 17:35:48 +0900 Subject: [PATCH 21/21] pipewiresrc: fix sending last buffer failure if waiting operation exits in advance --- src/gst/gstpipewiresrc.c | 24 ++++++++++++++---------- 1 file changed, 14 insertions(+), 10 deletions(-) diff --git a/src/gst/gstpipewiresrc.c b/src/gst/gstpipewiresrc.c index 25b0df0cb..ca55baa46 100644 --- a/src/gst/gstpipewiresrc.c +++ b/src/gst/gstpipewiresrc.c @@ -1510,6 +1510,8 @@ gst_pipewire_src_create (GstPushSrc * psrc, GstBuffer ** buffer) GstBuffer *buf; gboolean update_time = FALSE, timeout = FALSE; GstCaps *caps = NULL; + struct timespec abstime = { 0, }; + bool have_abstime = false; pwsrc = GST_PIPEWIRE_SRC (psrc); @@ -1553,13 +1555,11 @@ gst_pipewire_src_create (GstPushSrc * psrc, GstBuffer ** buffer) update_time = TRUE; GST_LOG_OBJECT (pwsrc, "EOS, send last buffer"); break; - } else if (timeout) { - if (pwsrc->last_buffer != NULL) { - update_time = TRUE; - buf = gst_buffer_ref(pwsrc->last_buffer); - GST_LOG_OBJECT (pwsrc, "timeout, send keepalive buffer"); - break; - } + } else if (timeout && pwsrc->last_buffer != NULL) { + update_time = TRUE; + buf = gst_buffer_ref(pwsrc->last_buffer); + GST_LOG_OBJECT (pwsrc, "timeout, send keepalive buffer"); + break; } else { buf = dequeue_buffer (pwsrc); GST_LOG_OBJECT (pwsrc, "popped buffer %p", buf); @@ -1571,9 +1571,13 @@ gst_pipewire_src_create (GstPushSrc * psrc, GstBuffer ** buffer) } timeout = FALSE; if (pwsrc->keepalive_time > 0) { - struct timespec abstime; - pw_thread_loop_get_time(pwsrc->stream->core->loop, &abstime, - pwsrc->keepalive_time * SPA_NSEC_PER_MSEC); + if (!have_abstime) { + /* Record the time we want to timeout at once, for this loop -- the loop might get unrelated signal()s, + * and we don't want the keepalive time to get reset by that */ + pw_thread_loop_get_time(pwsrc->stream->core->loop, &abstime, + pwsrc->keepalive_time * SPA_NSEC_PER_MSEC); + have_abstime = TRUE; + } if (pw_thread_loop_timed_wait_full (pwsrc->stream->core->loop, &abstime) == -ETIMEDOUT) timeout = TRUE; } else {