Merge branch 'gst-fixes-backport-1.4' into '1.4'

[1.4] Backport all the GStreamer fixes

See merge request pipewire/pipewire!2416
This commit is contained in:
Arun Raghavan 2025-12-19 07:51:21 +00:00
commit ec2147928d
9 changed files with 600 additions and 78 deletions

View file

@ -410,6 +410,7 @@ gst_deps_def = {
gst_dep = [] gst_dep = []
gst_dma_drm_found = false gst_dma_drm_found = false
gst_shm_allocator_found = false
foreach depname, kwargs: gst_deps_def foreach depname, kwargs: gst_deps_def
dep = dependency(depname, required: gst_option, kwargs: kwargs) dep = dependency(depname, required: gst_option, kwargs: kwargs)
summary({depname: dep.found()}, bool_yn: true, section: 'GStreamer modules') summary({depname: dep.found()}, bool_yn: true, section: 'GStreamer modules')
@ -425,9 +426,13 @@ foreach depname, kwargs: gst_deps_def
if depname == 'gstreamer-allocators-1.0' and dep.version().version_compare('>= 1.23.1') if depname == 'gstreamer-allocators-1.0' and dep.version().version_compare('>= 1.23.1')
gst_dma_drm_found = true gst_dma_drm_found = true
gst_shm_allocator_found = true
endif endif
endforeach endforeach
summary({'gstreamer SHM allocator': gst_shm_allocator_found}, bool_yn: true, section: 'Backend')
cdata.set('HAVE_GSTREAMER_SHM_ALLOCATOR', gst_shm_allocator_found)
# This code relies on the array being empty if any dependency was not found # This code relies on the array being empty if any dependency was not found
gst_dp_found = gst_dep.length() > 0 gst_dp_found = gst_dep.length() > 0
summary({'gstreamer-device-provider': gst_dp_found}, bool_yn: true, section: 'Backend') summary({'gstreamer-device-provider': gst_dp_found}, bool_yn: true, section: 'Backend')

View file

@ -1665,8 +1665,8 @@ impl_node_port_use_buffers(void *object,
b->buf = buffers[i]; b->buf = buffers[i];
if (n_datas != port->blocks) { if (n_datas != port->blocks) {
spa_log_error(this->log, "%p: invalid blocks %d on buffer %d", spa_log_error(this->log, "%p: invalid blocks %d on buffer %d, expected %d",
this, n_datas, i); this, n_datas, i, port->blocks);
return -EINVAL; return -EINVAL;
} }
if (SPA_FLAG_IS_SET(flags, SPA_NODE_BUFFERS_FLAG_ALLOC)) { if (SPA_FLAG_IS_SET(flags, SPA_NODE_BUFFERS_FLAG_ALLOC)) {

View file

@ -10,8 +10,12 @@
#include <gst/allocators/gstfdmemory.h> #include <gst/allocators/gstfdmemory.h>
#include <gst/allocators/gstdmabuf.h> #include <gst/allocators/gstdmabuf.h>
#ifdef HAVE_GSTREAMER_SHM_ALLOCATOR
#include <gst/allocators/gstshmallocator.h>
#endif
#include <gst/video/gstvideometa.h> #include <gst/video/gstvideometa.h>
#include <gst/video/gstvideopool.h>
#include "gstpipewirepool.h" #include "gstpipewirepool.h"
@ -61,13 +65,42 @@ void gst_pipewire_pool_wrap_buffer (GstPipeWirePool *pool, struct pw_buffer *b)
GstBuffer *buf; GstBuffer *buf;
uint32_t i; uint32_t i;
GstPipeWirePoolData *data; GstPipeWirePoolData *data;
/* Default to a large enough value */
gsize plane_0_size = pool->has_rawvideo ?
pool->video_info.size :
(gsize) pool->video_info.width * pool->video_info.height;
gsize plane_sizes[GST_VIDEO_MAX_PLANES] = { plane_0_size, };
GST_DEBUG_OBJECT (pool, "wrap buffer"); GST_DEBUG_OBJECT (pool, "wrap buffer, datas:%d", b->buffer->n_datas);
data = g_slice_new (GstPipeWirePoolData); data = g_slice_new (GstPipeWirePoolData);
buf = gst_buffer_new (); buf = gst_buffer_new ();
if (pool->add_metavideo) {
GstVideoMeta *meta = gst_buffer_add_video_meta_full (buf,
GST_VIDEO_FRAME_FLAG_NONE,
GST_VIDEO_INFO_FORMAT (&pool->video_info),
GST_VIDEO_INFO_WIDTH (&pool->video_info),
GST_VIDEO_INFO_HEIGHT (&pool->video_info),
GST_VIDEO_INFO_N_PLANES (&pool->video_info),
pool->video_info.offset,
pool->video_info.stride);
gst_video_meta_set_alignment (meta, pool->video_align);
if (!gst_video_meta_get_plane_size (meta, plane_sizes)) {
GST_ERROR_OBJECT (pool, "could not compute plane sizes");
}
/*
* We need to set the video meta as pooled, else gst_buffer_pool_release_buffer
* will call reset_buffer and the default_reset_buffer implementation for
* GstBufferPool removes all metadata without the POOLED flag.
*/
GST_META_FLAG_SET (meta, GST_META_FLAG_POOLED);
}
for (i = 0; i < b->buffer->n_datas; i++) { for (i = 0; i < b->buffer->n_datas; i++) {
struct spa_data *d = &b->buffer->datas[i]; struct spa_data *d = &b->buffer->datas[i];
GstMemory *gmem = NULL; GstMemory *gmem = NULL;
@ -75,7 +108,51 @@ void gst_pipewire_pool_wrap_buffer (GstPipeWirePool *pool, struct pw_buffer *b)
GST_DEBUG_OBJECT (pool, "wrap data (%s %d) %d %d", GST_DEBUG_OBJECT (pool, "wrap data (%s %d) %d %d",
spa_debug_type_find_short_name(spa_type_data_type, d->type), d->type, spa_debug_type_find_short_name(spa_type_data_type, d->type), d->type,
d->mapoffset, d->maxsize); d->mapoffset, d->maxsize);
if (d->type == SPA_DATA_MemFd) {
if (pool->allocate_memory) {
#ifdef HAVE_GSTREAMER_SHM_ALLOCATOR
gsize block_size = d->maxsize;
if (pool->has_video) {
/* For video, we know block sizes from the video info already */
block_size = plane_sizes[i];
} else {
/* For audio, reserve space based on the quantum limit and channel count */
g_autoptr (GstPipeWireStream) s = g_weak_ref_get (&pool->stream);
struct pw_context *context = pw_core_get_context(pw_stream_get_core(s->pwstream));
const struct pw_properties *props = pw_context_get_properties(context);
uint32_t quantum_limit = 8192; /* "reasonable" default */
const char *quantum = spa_dict_lookup(&props->dict, "clock.quantum-limit");
if (!quantum) {
quantum = spa_dict_lookup(&props->dict, "default.clock.quantum-limit");
GST_DEBUG_OBJECT (pool, "using default quantum limit %s", quantum);
}
if (quantum)
spa_atou32(quantum, &quantum_limit, 0);
GST_DEBUG_OBJECT (pool, "quantum limit %s", quantum);
block_size = quantum_limit * pool->audio_info.bpf;
}
GST_DEBUG_OBJECT (pool, "setting block size %zu", block_size);
if (!pool->shm_allocator)
pool->shm_allocator = gst_shm_allocator_get();
/* use MemFd only. That is the only supported data type when memory is remote i.e. allocated by the client */
gmem = gst_allocator_alloc (pool->shm_allocator, block_size, NULL);
d->fd = gst_fd_memory_get_fd (gmem);
d->mapoffset = 0;
d->flags = SPA_DATA_FLAG_READWRITE | SPA_DATA_FLAG_MAPPABLE;
d->type = SPA_DATA_MemFd;
d->maxsize = block_size;
d->data = NULL;
#endif
} else if (d->type == SPA_DATA_MemFd) {
gmem = gst_fd_allocator_alloc (pool->fd_allocator, dup(d->fd), gmem = gst_fd_allocator_alloc (pool->fd_allocator, dup(d->fd),
d->mapoffset + d->maxsize, GST_FD_MEMORY_FLAG_NONE); d->mapoffset + d->maxsize, GST_FD_MEMORY_FLAG_NONE);
gst_memory_resize (gmem, d->mapoffset, d->maxsize); gst_memory_resize (gmem, d->mapoffset, d->maxsize);
@ -89,18 +166,21 @@ void gst_pipewire_pool_wrap_buffer (GstPipeWirePool *pool, struct pw_buffer *b)
gmem = gst_memory_new_wrapped (0, d->data, d->maxsize, 0, gmem = gst_memory_new_wrapped (0, d->data, d->maxsize, 0,
d->maxsize, NULL, NULL); d->maxsize, NULL, NULL);
} }
else {
GST_WARNING_OBJECT (pool, "unknown data type (%s %d)",
spa_debug_type_find_short_name(spa_type_data_type, d->type), d->type);
}
if (gmem) if (gmem)
gst_buffer_insert_memory (buf, i, gmem); gst_buffer_insert_memory (buf, i, gmem);
} }
if (pool->add_metavideo) { if (pool->add_metavideo && !pool->allocate_memory) {
gst_buffer_add_video_meta_full (buf, GST_VIDEO_FRAME_FLAG_NONE, /* Set memory sizes to expected plane sizes, so we know the valid size,
GST_VIDEO_INFO_FORMAT (&pool->video_info), * and the offsets in the meta make sense */
GST_VIDEO_INFO_WIDTH (&pool->video_info), for (i = 0; i < gst_buffer_n_memory (buf); i++) {
GST_VIDEO_INFO_HEIGHT (&pool->video_info), GstMemory *mem = gst_buffer_peek_memory (buf, i);
GST_VIDEO_INFO_N_PLANES (&pool->video_info), gst_memory_resize (mem, 0, plane_sizes[i]);
pool->video_info.offset, }
pool->video_info.stride);
} }
data->pool = gst_object_ref (pool); data->pool = gst_object_ref (pool);
@ -133,7 +213,8 @@ void gst_pipewire_pool_remove_buffer (GstPipeWirePool *pool, struct pw_buffer *b
data->crop = NULL; data->crop = NULL;
data->videotransform = NULL; data->videotransform = NULL;
gst_buffer_remove_all_memory (data->buf); if (!pool->allocate_memory)
gst_buffer_remove_all_memory (data->buf);
/* this will also destroy the pool data, if this is the last reference */ /* this will also destroy the pool data, if this is the last reference */
gst_clear_buffer (&data->buf); gst_clear_buffer (&data->buf);
@ -212,7 +293,13 @@ no_more_buffers:
static const gchar ** static const gchar **
get_options (GstBufferPool * pool) get_options (GstBufferPool * pool)
{ {
static const gchar *options[] = { GST_BUFFER_POOL_OPTION_VIDEO_META, NULL }; static const gchar *options[] = {
GST_BUFFER_POOL_OPTION_VIDEO_META,
#ifdef HAVE_GSTREAMER_SHM_ALLOCATOR
GST_BUFFER_POOL_OPTION_VIDEO_ALIGNMENT,
#endif
NULL
};
return options; return options;
} }
@ -223,7 +310,7 @@ set_config (GstBufferPool * pool, GstStructure * config)
GstCaps *caps; GstCaps *caps;
GstStructure *structure; GstStructure *structure;
guint size, min_buffers, max_buffers; guint size, min_buffers, max_buffers;
gboolean has_video; gboolean has_videoalign;
if (!gst_buffer_pool_config_get_params (config, &caps, &size, &min_buffers, &max_buffers)) { if (!gst_buffer_pool_config_get_params (config, &caps, &size, &min_buffers, &max_buffers)) {
GST_WARNING_OBJECT (pool, "invalid config"); GST_WARNING_OBJECT (pool, "invalid config");
@ -235,18 +322,63 @@ set_config (GstBufferPool * pool, GstStructure * config)
return FALSE; return FALSE;
} }
/* We don't support unlimited buffers */
if (max_buffers == 0)
max_buffers = PIPEWIRE_POOL_MAX_BUFFERS;
/* Pick a sensible min to avoid starvation */
if (min_buffers == 0)
min_buffers = PIPEWIRE_POOL_MIN_BUFFERS;
if (min_buffers < PIPEWIRE_POOL_MIN_BUFFERS || max_buffers > PIPEWIRE_POOL_MAX_BUFFERS)
return FALSE;
structure = gst_caps_get_structure (caps, 0); structure = gst_caps_get_structure (caps, 0);
if (g_str_has_prefix (gst_structure_get_name (structure), "video/") || if (g_str_has_prefix (gst_structure_get_name (structure), "video/") ||
g_str_has_prefix (gst_structure_get_name (structure), "image/")) { g_str_has_prefix (gst_structure_get_name (structure), "image/")) {
has_video = TRUE; p->has_video = TRUE;
gst_video_info_from_caps (&p->video_info, caps); gst_video_info_from_caps (&p->video_info, caps);
if (GST_VIDEO_FORMAT_INFO_IS_VALID_RAW (p->video_info.finfo)
#ifdef HAVE_GSTREAMER_DMA_DRM
&& GST_VIDEO_FORMAT_INFO_FORMAT (p->video_info.finfo) != GST_VIDEO_FORMAT_DMA_DRM
#endif
)
p->has_rawvideo = TRUE;
else
p->has_rawvideo = FALSE;
#ifdef HAVE_GSTREAMER_SHM_ALLOCATOR
if (p->has_rawvideo) {
gst_video_alignment_reset (&p->video_align);
gst_video_info_align (&p->video_info, &p->video_align);
}
#endif
} else if (g_str_has_prefix(gst_structure_get_name(structure), "audio/")) {
p->has_video = FALSE;
gst_audio_info_from_caps(&p->audio_info, caps);
} else { } else {
has_video = FALSE; g_assert_not_reached ();
} }
p->add_metavideo = has_video && gst_buffer_pool_config_has_option (config, p->add_metavideo = p->has_rawvideo && gst_buffer_pool_config_has_option (config,
GST_BUFFER_POOL_OPTION_VIDEO_META); GST_BUFFER_POOL_OPTION_VIDEO_META);
#ifdef HAVE_GSTREAMER_SHM_ALLOCATOR
has_videoalign = p->has_rawvideo && gst_buffer_pool_config_has_option (config,
GST_BUFFER_POOL_OPTION_VIDEO_ALIGNMENT);
if (has_videoalign) {
gst_buffer_pool_config_get_video_alignment (config, &p->video_align);
gst_video_info_align (&p->video_info, &p->video_align);
gst_buffer_pool_config_set_video_alignment (config, &p->video_align);
GST_LOG_OBJECT (pool, "Set alignment: %u-%ux%u-%u",
p->video_align.padding_left, p->video_align.padding_right,
p->video_align.padding_top, p->video_align.padding_bottom);
}
#endif
if (p->video_info.size != 0) if (p->video_info.size != 0)
size = p->video_info.size; size = p->video_info.size;
@ -321,6 +453,10 @@ gst_pipewire_pool_finalize (GObject * object)
g_weak_ref_set (&pool->stream, NULL); g_weak_ref_set (&pool->stream, NULL);
g_object_unref (pool->fd_allocator); g_object_unref (pool->fd_allocator);
g_object_unref (pool->dmabuf_allocator); g_object_unref (pool->dmabuf_allocator);
#ifdef HAVE_GSTREAMER_SHM_ALLOCATOR
if (pool->shm_allocator)
g_object_unref (pool->shm_allocator);
#endif
G_OBJECT_CLASS (gst_pipewire_pool_parent_class)->finalize (object); G_OBJECT_CLASS (gst_pipewire_pool_parent_class)->finalize (object);
} }
@ -355,5 +491,8 @@ gst_pipewire_pool_init (GstPipeWirePool * pool)
{ {
pool->fd_allocator = gst_fd_allocator_new (); pool->fd_allocator = gst_fd_allocator_new ();
pool->dmabuf_allocator = gst_dmabuf_allocator_new (); pool->dmabuf_allocator = gst_dmabuf_allocator_new ();
#ifdef HAVE_GSTREAMER_SHM_ALLOCATOR
gst_shm_allocator_init_once();
#endif
g_cond_init (&pool->cond); g_cond_init (&pool->cond);
} }

View file

@ -9,6 +9,7 @@
#include <gst/gst.h> #include <gst/gst.h>
#include <gst/audio/audio.h>
#include <gst/video/video.h> #include <gst/video/video.h>
#include <pipewire/pipewire.h> #include <pipewire/pipewire.h>
@ -18,6 +19,15 @@ G_BEGIN_DECLS
#define GST_TYPE_PIPEWIRE_POOL (gst_pipewire_pool_get_type()) #define GST_TYPE_PIPEWIRE_POOL (gst_pipewire_pool_get_type())
G_DECLARE_FINAL_TYPE (GstPipeWirePool, gst_pipewire_pool, GST, PIPEWIRE_POOL, GstBufferPool) G_DECLARE_FINAL_TYPE (GstPipeWirePool, gst_pipewire_pool, GST, PIPEWIRE_POOL, GstBufferPool)
#define PIPEWIRE_POOL_MIN_BUFFERS 2u
#define PIPEWIRE_POOL_MAX_BUFFERS 16u
/* Only available in GStreamer 1.22+ */
#ifndef GST_VIDEO_FORMAT_INFO_IS_VALID_RAW
#define GST_VIDEO_FORMAT_INFO_IS_VALID_RAW(info) \
(info != NULL && (info)->format > GST_VIDEO_FORMAT_ENCODED)
#endif
typedef struct _GstPipeWirePoolData GstPipeWirePoolData; typedef struct _GstPipeWirePoolData GstPipeWirePoolData;
struct _GstPipeWirePoolData { struct _GstPipeWirePoolData {
GstPipeWirePool *pool; GstPipeWirePool *pool;
@ -37,14 +47,20 @@ struct _GstPipeWirePool {
GWeakRef stream; GWeakRef stream;
guint n_buffers; guint n_buffers;
gboolean has_video;
gboolean has_rawvideo;
gboolean add_metavideo; gboolean add_metavideo;
GstAudioInfo audio_info;
GstVideoInfo video_info; GstVideoInfo video_info;
GstVideoAlignment video_align;
GstAllocator *fd_allocator; GstAllocator *fd_allocator;
GstAllocator *dmabuf_allocator; GstAllocator *dmabuf_allocator;
GstAllocator *shm_allocator;
GCond cond; GCond cond;
gboolean paused; gboolean paused;
gboolean allocate_memory;
}; };
enum GstPipeWirePoolMode { enum GstPipeWirePoolMode {

View file

@ -40,7 +40,8 @@ GST_DEBUG_CATEGORY_STATIC (pipewire_sink_debug);
#define DEFAULT_PROP_SLAVE_METHOD GST_PIPEWIRE_SINK_SLAVE_METHOD_NONE #define DEFAULT_PROP_SLAVE_METHOD GST_PIPEWIRE_SINK_SLAVE_METHOD_NONE
#define DEFAULT_PROP_USE_BUFFERPOOL USE_BUFFERPOOL_AUTO #define DEFAULT_PROP_USE_BUFFERPOOL USE_BUFFERPOOL_AUTO
#define MIN_BUFFERS 8u #define MAX_ERROR_MS 1
#define RESYNC_TIMEOUT_MS 10
enum enum
{ {
@ -167,7 +168,8 @@ gst_pipewire_sink_propose_allocation (GstBaseSink * bsink, GstQuery * query)
GstPipeWireSink *pwsink = GST_PIPEWIRE_SINK (bsink); GstPipeWireSink *pwsink = GST_PIPEWIRE_SINK (bsink);
if (pwsink->use_bufferpool != USE_BUFFERPOOL_NO) if (pwsink->use_bufferpool != USE_BUFFERPOOL_NO)
gst_query_add_allocation_pool (query, GST_BUFFER_POOL_CAST (pwsink->stream->pool), 0, 0, 0); gst_query_add_allocation_pool (query, GST_BUFFER_POOL_CAST (pwsink->stream->pool), 0,
PIPEWIRE_POOL_MIN_BUFFERS, PIPEWIRE_POOL_MAX_BUFFERS);
gst_query_add_allocation_meta (query, GST_VIDEO_META_API_TYPE, NULL); gst_query_add_allocation_meta (query, GST_VIDEO_META_API_TYPE, NULL);
return TRUE; return TRUE;
@ -242,7 +244,8 @@ gst_pipewire_sink_class_init (GstPipeWireSinkClass * klass)
GST_TYPE_PIPEWIRE_SINK_MODE, GST_TYPE_PIPEWIRE_SINK_MODE,
DEFAULT_PROP_MODE, DEFAULT_PROP_MODE,
G_PARAM_READWRITE | G_PARAM_READWRITE |
G_PARAM_STATIC_STRINGS)); G_PARAM_STATIC_STRINGS |
GST_PARAM_MUTABLE_READY));
g_object_class_install_property (gobject_class, g_object_class_install_property (gobject_class,
PROP_FD, PROP_FD,
@ -310,21 +313,38 @@ gst_pipewire_sink_update_params (GstPipeWireSink *sink)
config = gst_buffer_pool_get_config (GST_BUFFER_POOL (pool)); config = gst_buffer_pool_get_config (GST_BUFFER_POOL (pool));
gst_buffer_pool_config_get_params (config, &caps, &size, &min_buffers, &max_buffers); gst_buffer_pool_config_get_params (config, &caps, &size, &min_buffers, &max_buffers);
/* We cannot dynamically grow the pool */
if (max_buffers == 0) {
GST_WARNING_OBJECT (sink, "cannot support unlimited buffers in pool");
max_buffers = PIPEWIRE_POOL_MAX_BUFFERS;
}
spa_pod_builder_init (&b, buffer, sizeof (buffer)); spa_pod_builder_init (&b, buffer, sizeof (buffer));
spa_pod_builder_push_object (&b, &f, SPA_TYPE_OBJECT_ParamBuffers, SPA_PARAM_Buffers); spa_pod_builder_push_object (&b, &f, SPA_TYPE_OBJECT_ParamBuffers, SPA_PARAM_Buffers);
spa_pod_builder_add (&b, spa_pod_builder_add (&b,
SPA_PARAM_BUFFERS_size, SPA_POD_CHOICE_RANGE_Int(size, size, INT32_MAX), SPA_PARAM_BUFFERS_size, SPA_POD_CHOICE_RANGE_Int(size, size, INT32_MAX),
0); 0);
if (sink->is_rawvideo) {
/* MUST have n_datas == n_planes */
spa_pod_builder_add (&b,
SPA_PARAM_BUFFERS_blocks,
SPA_POD_Int(GST_VIDEO_INFO_N_PLANES (&pool->video_info)),
0);
} else {
/* Non-planar data, get a single block */
spa_pod_builder_add (&b,
SPA_PARAM_BUFFERS_blocks,
SPA_POD_Int(1),
0);
}
spa_pod_builder_add (&b, spa_pod_builder_add (&b,
SPA_PARAM_BUFFERS_stride, SPA_POD_CHOICE_RANGE_Int(0, 0, INT32_MAX), SPA_PARAM_BUFFERS_stride, SPA_POD_CHOICE_RANGE_Int(0, 0, INT32_MAX),
/* At this stage, we will request as many buffers as we _might_ need as
* the default, since we can't grow the pool once this is set */
SPA_PARAM_BUFFERS_buffers, SPA_POD_CHOICE_RANGE_Int( SPA_PARAM_BUFFERS_buffers, SPA_POD_CHOICE_RANGE_Int(
SPA_MAX(MIN_BUFFERS, min_buffers), max_buffers, min_buffers, max_buffers),
SPA_MAX(MIN_BUFFERS, min_buffers), SPA_PARAM_BUFFERS_dataType, SPA_POD_CHOICE_FLAGS_Int(1<<SPA_DATA_MemFd),
max_buffers ? max_buffers : INT32_MAX),
SPA_PARAM_BUFFERS_dataType, SPA_POD_CHOICE_FLAGS_Int(
(1<<SPA_DATA_MemFd) |
(1<<SPA_DATA_MemPtr)),
0); 0);
port_params[n_params++] = spa_pod_builder_pop (&b, &f); port_params[n_params++] = spa_pod_builder_pop (&b, &f);
@ -333,7 +353,7 @@ gst_pipewire_sink_update_params (GstPipeWireSink *sink)
SPA_PARAM_META_type, SPA_POD_Id(SPA_META_Header), SPA_PARAM_META_type, SPA_POD_Id(SPA_META_Header),
SPA_PARAM_META_size, SPA_POD_Int(sizeof (struct spa_meta_header))); SPA_PARAM_META_size, SPA_POD_Int(sizeof (struct spa_meta_header)));
if (sink->is_video) { if (sink->is_rawvideo) {
port_params[n_params++] = spa_pod_builder_add_object (&b, port_params[n_params++] = spa_pod_builder_add_object (&b,
SPA_TYPE_OBJECT_ParamMeta, SPA_PARAM_Meta, SPA_TYPE_OBJECT_ParamMeta, SPA_PARAM_Meta,
SPA_PARAM_META_type, SPA_POD_Id(SPA_META_VideoCrop), SPA_PARAM_META_type, SPA_POD_Id(SPA_META_VideoCrop),
@ -359,7 +379,8 @@ gst_pipewire_sink_init (GstPipeWireSink * sink)
sink->mode = DEFAULT_PROP_MODE; sink->mode = DEFAULT_PROP_MODE;
sink->use_bufferpool = DEFAULT_PROP_USE_BUFFERPOOL; sink->use_bufferpool = DEFAULT_PROP_USE_BUFFERPOOL;
sink->is_video = false; sink->is_rawvideo = false;
sink->first_buffer = true;
GST_OBJECT_FLAG_SET (sink, GST_ELEMENT_FLAG_PROVIDE_CLOCK); GST_OBJECT_FLAG_SET (sink, GST_ELEMENT_FLAG_PROVIDE_CLOCK);
@ -377,7 +398,7 @@ gst_pipewire_sink_sink_fixate (GstBaseSink * bsink, GstCaps * caps)
structure = gst_caps_get_structure (caps, 0); structure = gst_caps_get_structure (caps, 0);
if (gst_structure_has_name (structure, "video/x-raw")) { if (gst_structure_has_name (structure, "video/x-raw")) {
pwsink->is_video = true; pwsink->is_rawvideo = true;
gst_structure_fixate_field_nearest_int (structure, "width", 320); gst_structure_fixate_field_nearest_int (structure, "width", 320);
gst_structure_fixate_field_nearest_int (structure, "height", 240); gst_structure_fixate_field_nearest_int (structure, "height", 240);
gst_structure_fixate_field_nearest_fraction (structure, "framerate", 30, 1); gst_structure_fixate_field_nearest_fraction (structure, "framerate", 30, 1);
@ -591,14 +612,17 @@ static void
on_remove_buffer (void *_data, struct pw_buffer *b) on_remove_buffer (void *_data, struct pw_buffer *b)
{ {
GstPipeWireSink *pwsink = _data; GstPipeWireSink *pwsink = _data;
GST_DEBUG_OBJECT (pwsink, "remove pw_buffer %p", b); GST_DEBUG_OBJECT (pwsink, "remove pw_buffer %p", b);
gst_pipewire_pool_remove_buffer (pwsink->stream->pool, b); gst_pipewire_pool_remove_buffer (pwsink->stream->pool, b);
if (!gst_pipewire_pool_has_buffers (pwsink->stream->pool) && if (!gst_pipewire_pool_has_buffers (pwsink->stream->pool) &&
!GST_BUFFER_POOL_IS_FLUSHING (GST_BUFFER_POOL_CAST (pwsink->stream->pool))) { !GST_BUFFER_POOL_IS_FLUSHING (GST_BUFFER_POOL_CAST (pwsink->stream->pool))) {
GST_ELEMENT_ERROR (pwsink, RESOURCE, NOT_FOUND, if (pwsink->mode != GST_PIPEWIRE_SINK_MODE_PROVIDE) {
("all buffers have been removed"), GST_ELEMENT_ERROR (pwsink, RESOURCE, NOT_FOUND,
("PipeWire link to remote node was destroyed")); ("all buffers have been removed"),
("PipeWire link to remote node was destroyed"));
}
} }
} }
@ -633,6 +657,9 @@ do_send_buffer (GstPipeWireSink *pwsink, GstBuffer *buffer)
} }
} }
data->b->size = 0; data->b->size = 0;
spa_assert(b->n_datas == gst_buffer_n_memory(buffer));
for (i = 0; i < b->n_datas; i++) { for (i = 0; i < b->n_datas; i++) {
struct spa_data *d = &b->datas[i]; struct spa_data *d = &b->datas[i];
GstMemory *mem = gst_buffer_peek_memory (buffer, i); GstMemory *mem = gst_buffer_peek_memory (buffer, i);
@ -646,16 +673,20 @@ do_send_buffer (GstPipeWireSink *pwsink, GstBuffer *buffer)
GstVideoMeta *meta = gst_buffer_get_video_meta (buffer); GstVideoMeta *meta = gst_buffer_get_video_meta (buffer);
if (meta) { if (meta) {
if (meta->n_planes == b->n_datas) { if (meta->n_planes == b->n_datas) {
uint32_t n_planes = GST_VIDEO_INFO_N_PLANES (&data->pool->video_info);
gsize video_size = 0; gsize video_size = 0;
for (i = 0; i < meta->n_planes; i++) {
for (i = 0; i < n_planes; i++) {
struct spa_data *d = &b->datas[i]; struct spa_data *d = &b->datas[i];
d->chunk->offset += meta->offset[i] - video_size;
d->chunk->stride = meta->stride[i]; d->chunk->stride = meta->stride[i];
d->chunk->offset = meta->offset[i] - video_size;
video_size += d->chunk->size; video_size += d->chunk->size;
} }
} else { } else {
GST_ERROR_OBJECT (pwsink, "plane num not matching, meta:%u buffer:%u", meta->n_planes, b->n_datas); GST_ERROR_OBJECT (pwsink, "plane num not matching, meta:%u buffer:%u",
meta->n_planes, b->n_datas);
} }
} }
@ -663,7 +694,18 @@ do_send_buffer (GstPipeWireSink *pwsink, GstBuffer *buffer)
GST_WARNING_OBJECT (pwsink, "can't send buffer %s", spa_strerror(res)); GST_WARNING_OBJECT (pwsink, "can't send buffer %s", spa_strerror(res));
} else { } else {
data->queued = TRUE; data->queued = TRUE;
GST_LOG_OBJECT(pwsink, "queued pwbuffer: %p; gstbuffer %p ",data->b, buffer); GST_LOG_OBJECT(pwsink, "queued pwbuffer: %p size: %"PRIu64"; gstbuffer %p",
data->b, data->b->size, buffer);
if (pwsink->first_buffer) {
pwsink->first_buffer = false;
pwsink->first_buffer_pts = GST_BUFFER_PTS(buffer);
}
stream->position = gst_util_uint64_scale_int(GST_BUFFER_PTS(buffer) - pwsink->first_buffer_pts,
pwsink->rate, 1 * GST_SECOND);
// have the buffer duration value minimum as 1, in case of video where rate is 0 (not applicable)
stream->buf_duration = SPA_MAX((uint64_t)1, gst_util_uint64_scale_int(GST_BUFFER_DURATION(buffer),
pwsink->rate, 1 * GST_SECOND));
} }
switch (pwsink->slave_method) { switch (pwsink->slave_method) {
@ -675,6 +717,56 @@ do_send_buffer (GstPipeWireSink *pwsink, GstBuffer *buffer)
} }
} }
static void update_time (GstPipeWireSink *pwsink)
{
struct spa_io_position *p = pwsink->stream->io_position;
double err = 0.0, corr = 1.0;
guint64 now;
double max_err = pwsink->rate * MAX_ERROR_MS/1000.0;
double resync_timeout = pwsink->rate * RESYNC_TIMEOUT_MS/1000.0;
if (pwsink->first_buffer) {
// use the target duration before the first buffer
pwsink->stream->buf_duration = p->clock.target_duration;
spa_dll_set_bw(&pwsink->stream->dll, SPA_DLL_BW_MIN, pwsink->stream->buf_duration,
pwsink->rate);
}
now = pw_stream_get_nsec(pwsink->stream->pwstream);
err = (double)gst_util_uint64_scale(now, pwsink->rate, 1 * GST_SECOND) -
(double)gst_util_uint64_scale(p->clock.next_nsec, pwsink->rate, 1 * GST_SECOND);
GST_LOG_OBJECT(pwsink, "err is %f max err is %f now %"PRIu64" next is %"PRIu64"", err, max_err, now,
p->clock.next_nsec);
if (fabs(err) > max_err) {
if (fabs(err) > resync_timeout) {
GST_WARNING_OBJECT(pwsink, "err %f exceeds resync timeout, resetting", err);
spa_dll_set_bw(&pwsink->stream->dll, SPA_DLL_BW_MIN, pwsink->stream->buf_duration,
pwsink->rate);
err = 0.0;
} else {
err = SPA_CLAMPD(err, -max_err, max_err);
}
}
corr = spa_dll_update(&pwsink->stream->dll, err);
p->clock.nsec = now;
p->clock.position = pwsink->stream->position;
p->clock.duration = pwsink->stream->buf_duration;
/* we don't have a way to estimate the target (next cycle) buffer duration
* so use the current buffer duration
*/
p->clock.target_duration = pwsink->stream->buf_duration;
p->clock.rate = SPA_FRACTION(1, pwsink->rate);
// current time plus duration scaled with correlation
p->clock.next_nsec = now + (uint64_t)(p->clock.duration / corr * GST_SECOND / pwsink->rate);
p->clock.rate_diff = corr;
GST_DEBUG_OBJECT(pwsink, "now %"PRIu64", position %"PRIu64", duration %"PRIu64", rate :%d,"
"next : %"PRIu64", delay is %"PRIi64", rate_diff is %f", p->clock.nsec, p->clock.position,
p->clock.duration, pwsink->rate, p->clock.next_nsec, p->clock.delay,p->clock.rate_diff);
}
static void static void
on_process (void *data) on_process (void *data)
@ -684,6 +776,23 @@ on_process (void *data)
g_cond_signal (&pwsink->stream->pool->cond); g_cond_signal (&pwsink->stream->pool->cond);
} }
static int invoke_trigger_process(struct spa_loop *loop,
bool async, uint32_t seq, const void *data, size_t size, void *user_data)
{
GstPipeWireSink *pwsink = user_data;
/* Note: We cannot use the rate for computation of other clock params
* in case of video because the rate for video is set as 0 in the _setcaps.
* So skip update time for video (i.e. when rate is 0). The video buffers
* get timestamp from the SPA_META_Header anyway
*/
if (pwsink->rate)
update_time(pwsink);
return pw_stream_trigger_process(pwsink->stream->pwstream);
}
static void static void
on_state_changed (void *data, enum pw_stream_state old, enum pw_stream_state state, const char *error) on_state_changed (void *data, enum pw_stream_state old, enum pw_stream_state state, const char *error)
{ {
@ -699,7 +808,8 @@ on_state_changed (void *data, enum pw_stream_state old, enum pw_stream_state sta
break; break;
case PW_STREAM_STATE_STREAMING: case PW_STREAM_STATE_STREAMING:
if (pw_stream_is_driving (pwsink->stream->pwstream)) if (pw_stream_is_driving (pwsink->stream->pwstream))
pw_stream_trigger_process (pwsink->stream->pwstream); pw_loop_invoke(pw_stream_get_data_loop(pwsink->stream->pwstream),
invoke_trigger_process, 1, NULL, 0 , false, pwsink);
break; break;
case PW_STREAM_STATE_ERROR: case PW_STREAM_STATE_ERROR:
/* make the error permanent, if it is not already; /* make the error permanent, if it is not already;
@ -759,9 +869,21 @@ gst_pipewire_sink_setcaps (GstBaseSink * bsink, GstCaps * caps)
if (pwsink->use_bufferpool != USE_BUFFERPOOL_YES) if (pwsink->use_bufferpool != USE_BUFFERPOOL_YES)
pwsink->use_bufferpool = USE_BUFFERPOOL_NO; pwsink->use_bufferpool = USE_BUFFERPOOL_NO;
} else { } else {
GstVideoInfo video_info;
pwsink->rate = rate = 0; pwsink->rate = rate = 0;
pwsink->rate_match = false; pwsink->rate_match = false;
pwsink->is_video = true;
gst_video_info_from_caps (&video_info, caps);
if (GST_VIDEO_FORMAT_INFO_IS_VALID_RAW (video_info.finfo)
#ifdef HAVE_GSTREAMER_DMA_DRM
&& GST_VIDEO_FORMAT_INFO_FORMAT (video_info.finfo) != GST_VIDEO_FORMAT_DMA_DRM
#endif
)
pwsink->is_rawvideo = TRUE;
else
pwsink->is_rawvideo = FALSE;
} }
spa_dll_set_bw(&pwsink->stream->dll, SPA_DLL_BW_MIN, 4096, rate); spa_dll_set_bw(&pwsink->stream->dll, SPA_DLL_BW_MIN, 4096, rate);
@ -788,6 +910,11 @@ gst_pipewire_sink_setcaps (GstBaseSink * bsink, GstCaps * caps)
else else
flags |= PW_STREAM_FLAG_DRIVER; flags |= PW_STREAM_FLAG_DRIVER;
#ifdef HAVE_GSTREAMER_SHM_ALLOCATOR
flags |= PW_STREAM_FLAG_ALLOC_BUFFERS;
pwsink->stream->pool->allocate_memory = true;
#endif
target_id = pwsink->stream->path ? (uint32_t)atoi(pwsink->stream->path) : PW_ID_ANY; target_id = pwsink->stream->path ? (uint32_t)atoi(pwsink->stream->path) : PW_ID_ANY;
if (pwsink->stream->target_object) { if (pwsink->stream->target_object) {
@ -841,8 +968,12 @@ gst_pipewire_sink_setcaps (GstBaseSink * bsink, GstCaps * caps)
config = gst_buffer_pool_get_config (GST_BUFFER_POOL_CAST (pwsink->stream->pool)); config = gst_buffer_pool_get_config (GST_BUFFER_POOL_CAST (pwsink->stream->pool));
gst_buffer_pool_config_get_params (config, NULL, &size, &min_buffers, &max_buffers); gst_buffer_pool_config_get_params (config, NULL, &size, &min_buffers, &max_buffers);
gst_buffer_pool_config_set_params (config, caps, size, min_buffers, max_buffers); gst_buffer_pool_config_set_params (config, caps, size, min_buffers, max_buffers);
if(pwsink->is_video) if (pwsink->is_rawvideo) {
gst_buffer_pool_config_add_option(config, GST_BUFFER_POOL_OPTION_VIDEO_META); gst_buffer_pool_config_add_option (config, GST_BUFFER_POOL_OPTION_VIDEO_META);
#ifdef HAVE_GSTREAMER_SHM_ALLOCATOR
gst_buffer_pool_config_add_option (config, GST_BUFFER_POOL_OPTION_VIDEO_ALIGNMENT);
#endif
}
gst_buffer_pool_set_config (GST_BUFFER_POOL_CAST (pwsink->stream->pool), config); gst_buffer_pool_set_config (GST_BUFFER_POOL_CAST (pwsink->stream->pool), config);
pw_thread_loop_unlock (pwsink->stream->core->loop); pw_thread_loop_unlock (pwsink->stream->core->loop);
@ -924,20 +1055,17 @@ gst_pipewire_sink_render (GstBaseSink * bsink, GstBuffer * buffer)
if (res != GST_FLOW_OK) if (res != GST_FLOW_OK)
goto done; goto done;
if (pwsink->is_video) { if (pwsink->is_rawvideo) {
GstVideoFrame src, dst; GstVideoFrame src, dst;
gboolean copied = FALSE; gboolean copied = FALSE;
buf_size = 0; // to break from the loop buf_size = 0; // to break from the loop
/* /* splitting of buffers in the case of video might break the frame layout
splitting of buffers in the case of video might break the frame layout * and that seems to be causing issues while retrieving the buffers on the receiver
and that seems to be causing issues while retrieving the buffers on the receiver * side. Hence use the video_frame_map to copy the buffer of bigger size into the
side. Hence use the video_frame_map to copy the buffer of bigger size into the * pipewirepool's buffer */
pipewirepool's buffer
*/
if (!gst_video_frame_map (&dst, &pwsink->stream->pool->video_info, b, if (!gst_video_frame_map (&dst, &pwsink->stream->pool->video_info, b, GST_MAP_WRITE)) {
GST_MAP_WRITE)) {
GST_ERROR_OBJECT(pwsink, "Failed to map dest buffer"); GST_ERROR_OBJECT(pwsink, "Failed to map dest buffer");
return GST_FLOW_ERROR; return GST_FLOW_ERROR;
} }
@ -957,8 +1085,6 @@ gst_pipewire_sink_render (GstBaseSink * bsink, GstBuffer * buffer)
GST_ERROR_OBJECT(pwsink, "Failed to copy the frame"); GST_ERROR_OBJECT(pwsink, "Failed to copy the frame");
return GST_FLOW_ERROR; return GST_FLOW_ERROR;
} }
gst_buffer_copy_into(b, buffer, GST_BUFFER_COPY_METADATA, 0, -1);
} else { } else {
gst_buffer_map (b, &info, GST_MAP_WRITE); gst_buffer_map (b, &info, GST_MAP_WRITE);
gsize extract_size = (buf_size <= info.maxsize) ? buf_size: info.maxsize; gsize extract_size = (buf_size <= info.maxsize) ? buf_size: info.maxsize;
@ -980,7 +1106,8 @@ gst_pipewire_sink_render (GstBaseSink * bsink, GstBuffer * buffer)
gst_buffer_unref (b); gst_buffer_unref (b);
if (pw_stream_is_driving (pwsink->stream->pwstream)) if (pw_stream_is_driving (pwsink->stream->pwstream))
pw_stream_trigger_process (pwsink->stream->pwstream); pw_loop_invoke(pw_stream_get_data_loop(pwsink->stream->pwstream),
invoke_trigger_process, 1, NULL, 0 , false, pwsink);
} }
} else { } else {
GST_TRACE_OBJECT(pwsink, "Buffer is from pipewirepool"); GST_TRACE_OBJECT(pwsink, "Buffer is from pipewirepool");
@ -988,7 +1115,8 @@ gst_pipewire_sink_render (GstBaseSink * bsink, GstBuffer * buffer)
do_send_buffer (pwsink, buffer); do_send_buffer (pwsink, buffer);
if (pw_stream_is_driving (pwsink->stream->pwstream)) if (pw_stream_is_driving (pwsink->stream->pwstream))
pw_stream_trigger_process (pwsink->stream->pwstream); pw_loop_invoke(pw_stream_get_data_loop(pwsink->stream->pwstream),
invoke_trigger_process, 1, NULL, 0 , false, pwsink);
} }
done_unlock: done_unlock:
@ -1002,6 +1130,18 @@ not_negotiated:
} }
} }
static void
on_io_changed(void *data, uint32_t id, void *area, uint32_t size)
{
GstPipeWireSink *pwsink = data;
switch (id) {
case SPA_IO_Position:
pwsink->stream->io_position = area;
break;
}
}
static const struct pw_stream_events stream_events = { static const struct pw_stream_events stream_events = {
PW_VERSION_STREAM_EVENTS, PW_VERSION_STREAM_EVENTS,
.state_changed = on_state_changed, .state_changed = on_state_changed,
@ -1009,6 +1149,7 @@ static const struct pw_stream_events stream_events = {
.add_buffer = on_add_buffer, .add_buffer = on_add_buffer,
.remove_buffer = on_remove_buffer, .remove_buffer = on_remove_buffer,
.process = on_process, .process = on_process,
.io_changed = on_io_changed,
}; };
static GstStateChangeReturn static GstStateChangeReturn
@ -1023,6 +1164,14 @@ gst_pipewire_sink_change_state (GstElement * element, GstStateChange transition)
goto open_failed; goto open_failed;
break; break;
case GST_STATE_CHANGE_READY_TO_PAUSED: case GST_STATE_CHANGE_READY_TO_PAUSED:
/* If we are a driver, we shouldn't try to also provide the clock, as we
* _are_ the clock for the graph. For that case, we rely on the pipeline
* clock to drive the pipeline (and thus the graph). */
if (this->mode == GST_PIPEWIRE_SINK_MODE_PROVIDE)
GST_OBJECT_FLAG_UNSET (this, GST_ELEMENT_FLAG_PROVIDE_CLOCK);
else
GST_OBJECT_FLAG_SET (this, GST_ELEMENT_FLAG_PROVIDE_CLOCK);
/* the initial stream state is active, which is needed for linking and /* the initial stream state is active, which is needed for linking and
* negotiation to happen and the bufferpool to be set up. We don't know * negotiation to happen and the bufferpool to be set up. We don't know
* if we'll go to plaing, so we deactivate the stream until that * if we'll go to plaing, so we deactivate the stream until that

View file

@ -69,7 +69,9 @@ struct _GstPipeWireSink {
gboolean negotiated; gboolean negotiated;
gboolean rate_match; gboolean rate_match;
gint rate; gint rate;
gboolean is_video; gboolean is_rawvideo;
gboolean first_buffer;
GstClockTime first_buffer_pts;
GstPipeWireSinkMode mode; GstPipeWireSinkMode mode;
GstPipeWireSinkSlaveMethod slave_method; GstPipeWireSinkSlaveMethod slave_method;

View file

@ -46,7 +46,9 @@ GST_DEBUG_CATEGORY_STATIC (pipewire_src_debug);
#define DEFAULT_RESEND_LAST false #define DEFAULT_RESEND_LAST false
#define DEFAULT_KEEPALIVE_TIME 0 #define DEFAULT_KEEPALIVE_TIME 0
#define DEFAULT_AUTOCONNECT true #define DEFAULT_AUTOCONNECT true
#define DEFAULT_USE_BUFFERPOOL USE_BUFFERPOOL_AUTO #define DEFAULT_USE_BUFFERPOOL USE_BUFFERPOOL_AUTO
#define DEFAULT_ON_DISCONNECT GST_PIPEWIRE_SRC_ON_DISCONNECT_NONE
#define DEFAULT_PROVIDE_CLOCK TRUE
enum enum
{ {
@ -64,8 +66,29 @@ enum
PROP_KEEPALIVE_TIME, PROP_KEEPALIVE_TIME,
PROP_AUTOCONNECT, PROP_AUTOCONNECT,
PROP_USE_BUFFERPOOL, PROP_USE_BUFFERPOOL,
PROP_ON_DISCONNECT,
PROP_PROVIDE_CLOCK,
}; };
GType
gst_pipewire_src_on_disconnect_get_type (void)
{
static gsize on_disconnect_type = 0;
static const GEnumValue on_disconnect[] = {
{GST_PIPEWIRE_SRC_ON_DISCONNECT_NONE, "GST_PIPEWIRE_SRC_ON_DISCONNECT_NONE", "none"},
{GST_PIPEWIRE_SRC_ON_DISCONNECT_EOS, "GST_PIPEWIRE_SRC_ON_DISCONNECT_EOS", "eos"},
{GST_PIPEWIRE_SRC_ON_DISCONNECT_ERROR, "GST_PIPEWIRE_SRC_ON_DISCONNECT_ERROR", "error"},
{0, NULL, NULL},
};
if (g_once_init_enter (&on_disconnect_type)) {
GType tmp =
g_enum_register_static ("GstPipeWireSrcOnDisconnect", on_disconnect);
g_once_init_leave (&on_disconnect_type, tmp);
}
return (GType) on_disconnect_type;
}
static GstStaticPadTemplate gst_pipewire_src_template = static GstStaticPadTemplate gst_pipewire_src_template =
GST_STATIC_PAD_TEMPLATE ("src", GST_STATIC_PAD_TEMPLATE ("src",
@ -170,6 +193,20 @@ gst_pipewire_src_set_property (GObject * object, guint prop_id,
pwsrc->use_bufferpool = USE_BUFFERPOOL_NO; pwsrc->use_bufferpool = USE_BUFFERPOOL_NO;
break; break;
case PROP_ON_DISCONNECT:
pwsrc->on_disconnect = g_value_get_enum (value);
break;
case PROP_PROVIDE_CLOCK:
gboolean provide = g_value_get_boolean (value);
GST_OBJECT_LOCK (pwsrc);
if (provide)
GST_OBJECT_FLAG_SET (pwsrc, GST_ELEMENT_FLAG_PROVIDE_CLOCK);
else
GST_OBJECT_FLAG_UNSET (pwsrc, GST_ELEMENT_FLAG_PROVIDE_CLOCK);
GST_OBJECT_UNLOCK (pwsrc);
break;
default: default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break; break;
@ -235,6 +272,18 @@ gst_pipewire_src_get_property (GObject * object, guint prop_id,
g_value_set_boolean (value, !!pwsrc->use_bufferpool); g_value_set_boolean (value, !!pwsrc->use_bufferpool);
break; break;
case PROP_ON_DISCONNECT:
g_value_set_enum (value, pwsrc->on_disconnect);
break;
case PROP_PROVIDE_CLOCK:
gboolean result;
GST_OBJECT_LOCK (pwsrc);
result = GST_OBJECT_FLAG_IS_SET (pwsrc, GST_ELEMENT_FLAG_PROVIDE_CLOCK);
GST_OBJECT_UNLOCK (pwsrc);
g_value_set_boolean (value, result);
break;
default: default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break; break;
@ -414,6 +463,25 @@ gst_pipewire_src_class_init (GstPipeWireSrcClass * klass)
G_PARAM_READWRITE | G_PARAM_READWRITE |
G_PARAM_STATIC_STRINGS)); G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class,
PROP_ON_DISCONNECT,
g_param_spec_enum ("on-disconnect",
"On disconnect",
"Action to take on disconnect",
GST_TYPE_PIPEWIRE_SRC_ON_DISCONNECT,
DEFAULT_ON_DISCONNECT,
G_PARAM_READWRITE |
G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class,
PROP_PROVIDE_CLOCK,
g_param_spec_boolean ("provide-clock",
"Provide Clock",
"Provide a clock to be used as the global pipeline clock",
DEFAULT_PROVIDE_CLOCK,
G_PARAM_READWRITE |
G_PARAM_STATIC_STRINGS));
gstelement_class->provide_clock = gst_pipewire_src_provide_clock; gstelement_class->provide_clock = gst_pipewire_src_provide_clock;
gstelement_class->change_state = gst_pipewire_src_change_state; gstelement_class->change_state = gst_pipewire_src_change_state;
gstelement_class->send_event = gst_pipewire_src_send_event; gstelement_class->send_event = gst_pipewire_src_send_event;
@ -462,6 +530,9 @@ gst_pipewire_src_init (GstPipeWireSrc * src)
src->autoconnect = DEFAULT_AUTOCONNECT; src->autoconnect = DEFAULT_AUTOCONNECT;
src->min_latency = 0; src->min_latency = 0;
src->max_latency = GST_CLOCK_TIME_NONE; src->max_latency = GST_CLOCK_TIME_NONE;
src->n_buffers = 0;
src->flushing_on_remove_buffer = FALSE;
src->on_disconnect = DEFAULT_ON_DISCONNECT;
src->transform_value = UINT32_MAX; src->transform_value = UINT32_MAX;
} }
@ -469,11 +540,26 @@ gst_pipewire_src_init (GstPipeWireSrc * src)
static gboolean static gboolean
buffer_recycle (GstMiniObject *obj) buffer_recycle (GstMiniObject *obj)
{ {
GstPipeWireSrc *src; GstPipeWirePoolData *data = gst_pipewire_pool_get_data (GST_BUFFER_CAST(obj));
GstPipeWirePoolData *data; GstPipeWireSrc *src = data->owner;
int res; int res;
data = gst_pipewire_pool_get_data (GST_BUFFER_CAST(obj)); if (src->flushing_on_remove_buffer) {
/*
* If a flush-start was initiated, this might be called by elements like
* queues downstream purging buffers from their internal queues. This can
* deadlock if queues use min-threshold-buffers/bytes/time with src_create
* trying to take the loop lock and buffer_recycle trying to take the loop
* lock down below. We return from here, to prevent deadlock with streaming
* thread in a queue thread.
*
* We will take care of queueing the buffer in on_remove_buffer.
*/
GstBuffer *buffer = GST_BUFFER_CAST(obj);
GST_DEBUG_OBJECT (src,
"flush-start initiated, skipping buffer recycle %p", buffer);
return TRUE;
}
GST_OBJECT_LOCK (data->pool); GST_OBJECT_LOCK (data->pool);
if (!obj->dispose) { if (!obj->dispose) {
@ -482,7 +568,6 @@ buffer_recycle (GstMiniObject *obj)
} }
GST_BUFFER_FLAGS (obj) = data->flags; GST_BUFFER_FLAGS (obj) = data->flags;
src = data->owner;
pw_thread_loop_lock (src->stream->core->loop); pw_thread_loop_lock (src->stream->core->loop);
if (!obj->dispose) { if (!obj->dispose) {
@ -519,6 +604,8 @@ on_add_buffer (void *_data, struct pw_buffer *b)
data->owner = pwsrc; data->owner = pwsrc;
data->queued = TRUE; data->queued = TRUE;
GST_MINI_OBJECT_CAST (data->buf)->dispose = buffer_recycle; GST_MINI_OBJECT_CAST (data->buf)->dispose = buffer_recycle;
pwsrc->n_buffers++;
} }
static void static void
@ -527,17 +614,76 @@ on_remove_buffer (void *_data, struct pw_buffer *b)
GstPipeWireSrc *pwsrc = _data; GstPipeWireSrc *pwsrc = _data;
GstPipeWirePoolData *data = b->user_data; GstPipeWirePoolData *data = b->user_data;
GstBuffer *buf = data->buf; GstBuffer *buf = data->buf;
gboolean flush_on_remove;
int res; int res;
GST_DEBUG_OBJECT (pwsrc, "remove buffer %p", buf); GST_DEBUG_OBJECT (pwsrc, "remove buffer %p, queued: %d",
buf, data->queued);
GST_MINI_OBJECT_CAST (buf)->dispose = NULL; GST_MINI_OBJECT_CAST (buf)->dispose = NULL;
flush_on_remove =
pwsrc->on_disconnect == GST_PIPEWIRE_SRC_ON_DISCONNECT_ERROR ||
pwsrc->on_disconnect == GST_PIPEWIRE_SRC_ON_DISCONNECT_EOS;
if (flush_on_remove && !pwsrc->flushing_on_remove_buffer) {
pwsrc->flushing_on_remove_buffer = TRUE;
GST_DEBUG_OBJECT (pwsrc, "flush-start on remove buffer");
/*
* It is possible that when buffers are being removed, a downstream
* element can be holding on to a buffer or in the middle of rendering
* the same. Former is possible with queues min-threshold-buffers or
* similar. Latter can result in a crash during gst_video_frame_copy.
*
* We send a flush-start event downstream to make elements discard
* any buffers they may be holding on to as well as return from their
* chain function ASAP.
*/
gst_pad_push_event (GST_BASE_SRC_PAD (pwsrc),
gst_event_new_flush_start ());
}
if (data->queued) { if (data->queued) {
gst_buffer_unref (buf); gst_buffer_unref (buf);
} else { } else {
if ((res = pw_stream_queue_buffer (pwsrc->stream->pwstream, b)) < 0) if ((res = pw_stream_queue_buffer (pwsrc->stream->pwstream, b)) < 0)
GST_WARNING_OBJECT (pwsrc, "can't queue removed buffer %p, %s", buf, spa_strerror(res)); GST_WARNING_OBJECT (pwsrc, "can't queue removed buffer %p, %s",
buf, spa_strerror(res));
else
GST_DEBUG_OBJECT (pwsrc, "queued buffer %p", buf);
}
pwsrc->n_buffers--;
if (pwsrc->n_buffers == 0) {
GST_DEBUG_OBJECT (pwsrc, "removed all buffers");
pwsrc->flushing_on_remove_buffer = FALSE;
switch (pwsrc->on_disconnect) {
case GST_PIPEWIRE_SRC_ON_DISCONNECT_ERROR:
GST_DEBUG_OBJECT (pwsrc, "flush-stop on removing all buffers");
gst_pad_push_event (GST_BASE_SRC_PAD (pwsrc),
gst_event_new_flush_stop (FALSE));
GST_ELEMENT_ERROR (pwsrc, RESOURCE, NOT_FOUND,
("all buffers have been removed"),
("PipeWire link to remote node was destroyed"));
break;
case GST_PIPEWIRE_SRC_ON_DISCONNECT_EOS:
GST_DEBUG_OBJECT (pwsrc, "flush-stop on removing all buffers");
gst_pad_push_event (GST_BASE_SRC_PAD (pwsrc),
gst_event_new_flush_stop (FALSE));
GST_DEBUG_OBJECT (pwsrc, "sending eos downstream");
gst_pad_push_event (GST_BASE_SRC_PAD (pwsrc),
gst_event_new_eos());
break;
case GST_PIPEWIRE_SRC_ON_DISCONNECT_NONE:
GST_DEBUG_OBJECT (pwsrc, "stream closed or removed");
break;
}
} }
} }
@ -660,9 +806,12 @@ static GstBuffer *dequeue_buffer(GstPipeWireSrc *pwsrc)
pwsrc->transform_value = transform_value; pwsrc->transform_value = transform_value;
} }
if (pwsrc->is_video) { if (pwsrc->is_rawvideo) {
gsize video_size = 0;
GstVideoInfo *info = &pwsrc->video_info; GstVideoInfo *info = &pwsrc->video_info;
uint32_t n_datas = b->buffer->n_datas;
uint32_t n_planes = GST_VIDEO_INFO_N_PLANES (info);
gsize video_size = 0;
GstVideoMeta *meta = gst_buffer_add_video_meta_full (buf, GST_VIDEO_FRAME_FLAG_NONE, GstVideoMeta *meta = gst_buffer_add_video_meta_full (buf, GST_VIDEO_FRAME_FLAG_NONE,
GST_VIDEO_INFO_FORMAT (info), GST_VIDEO_INFO_FORMAT (info),
GST_VIDEO_INFO_WIDTH (info), GST_VIDEO_INFO_WIDTH (info),
@ -671,7 +820,7 @@ static GstBuffer *dequeue_buffer(GstPipeWireSrc *pwsrc)
info->offset, info->offset,
info->stride); info->stride);
for (i = 0; i < MIN (b->buffer->n_datas, GST_VIDEO_MAX_PLANES); i++) { for (i = 0; i < MIN (n_datas, n_planes); i++) {
struct spa_data *d = &b->buffer->datas[i]; struct spa_data *d = &b->buffer->datas[i];
meta->offset[i] = video_size; meta->offset[i] = video_size;
meta->stride[i] = d->chunk->stride; meta->stride[i] = d->chunk->stride;
@ -680,6 +829,10 @@ static GstBuffer *dequeue_buffer(GstPipeWireSrc *pwsrc)
} }
} }
if (b->buffer->n_datas != gst_buffer_n_memory(data->buf)) {
GST_ERROR_OBJECT(pwsrc, "n_datas != n_memory, (%d != %d)", b->buffer->n_datas, gst_buffer_n_memory(data->buf));
}
for (i = 0; i < b->buffer->n_datas; i++) { for (i = 0; i < b->buffer->n_datas; i++) {
struct spa_data *d = &b->buffer->datas[i]; struct spa_data *d = &b->buffer->datas[i];
@ -730,13 +883,29 @@ on_state_changed (void *data,
enum pw_stream_state state, const char *error) enum pw_stream_state state, const char *error)
{ {
GstPipeWireSrc *pwsrc = data; GstPipeWireSrc *pwsrc = data;
GstState current_state = GST_ELEMENT_CAST (pwsrc)->current_state;
GST_DEBUG ("got stream state %s", pw_stream_state_as_string (state)); GST_DEBUG_OBJECT (pwsrc, "got stream state %s", pw_stream_state_as_string (state));
switch (state) { switch (state) {
case PW_STREAM_STATE_UNCONNECTED: case PW_STREAM_STATE_UNCONNECTED:
case PW_STREAM_STATE_CONNECTING: case PW_STREAM_STATE_CONNECTING:
break;
case PW_STREAM_STATE_PAUSED: case PW_STREAM_STATE_PAUSED:
/*
* We may see a driver/quantum/clock rate change on switching audio
* sources. The same is not applicable for video.
*
* We post the clock lost message here to take care of a possible
* jump or shift in base_time/clock for the pipeline. Application
* must handle the clock lost message in it's bus handler by pausing
* the pipeline and then setting it back to playing.
*/
if (current_state == GST_STATE_PLAYING && !pwsrc->is_video)
gst_element_post_message (GST_ELEMENT_CAST (pwsrc),
gst_message_new_clock_lost (GST_OBJECT_CAST (pwsrc),
GST_CLOCK_CAST (pwsrc->stream->clock)));
break;
case PW_STREAM_STATE_STREAMING: case PW_STREAM_STATE_STREAMING:
break; break;
case PW_STREAM_STATE_ERROR: case PW_STREAM_STATE_ERROR:
@ -982,7 +1151,7 @@ gst_pipewire_src_negotiate (GstBaseSrc * basesrc)
GST_DEBUG_OBJECT (basesrc, "connect capture with path %s, target-object %s", GST_DEBUG_OBJECT (basesrc, "connect capture with path %s, target-object %s",
pwsrc->stream->path, pwsrc->stream->target_object); pwsrc->stream->path, pwsrc->stream->target_object);
pwsrc->possible_caps = possible_caps; pwsrc->possible_caps = gst_caps_ref (possible_caps);
pwsrc->negotiated = FALSE; pwsrc->negotiated = FALSE;
enum pw_stream_flags flags; enum pw_stream_flags flags;
@ -1019,7 +1188,6 @@ gst_pipewire_src_negotiate (GstBaseSrc * basesrc)
} }
negotiated_caps = g_steal_pointer (&pwsrc->caps); negotiated_caps = g_steal_pointer (&pwsrc->caps);
pwsrc->possible_caps = NULL;
pw_thread_loop_unlock (pwsrc->stream->core->loop); pw_thread_loop_unlock (pwsrc->stream->core->loop);
if (negotiated_caps == NULL) if (negotiated_caps == NULL)
@ -1140,10 +1308,21 @@ handle_format_change (GstPipeWireSrc *pwsrc,
pw_stream_set_error (pwsrc->stream->pwstream, -EINVAL, "internal error"); pw_stream_set_error (pwsrc->stream->pwstream, -EINVAL, "internal error");
return; return;
} }
pwsrc->is_rawvideo = TRUE;
} else { } else {
gst_video_info_dma_drm_init (&pwsrc->drm_info); gst_video_info_dma_drm_init (&pwsrc->drm_info);
#endif #endif
gst_video_info_from_caps (&pwsrc->video_info, pwsrc->caps); gst_video_info_from_caps (&pwsrc->video_info, pwsrc->caps);
if (GST_VIDEO_FORMAT_INFO_IS_VALID_RAW (pwsrc->video_info.finfo)
#ifdef HAVE_GSTREAMER_DMA_DRM
&& GST_VIDEO_FORMAT_INFO_FORMAT (pwsrc->video_info.finfo) != GST_VIDEO_FORMAT_DMA_DRM
#endif
)
pwsrc->is_rawvideo = TRUE;
else
pwsrc->is_rawvideo = FALSE;
#ifdef HAVE_GSTREAMER_DMA_DRM #ifdef HAVE_GSTREAMER_DMA_DRM
} }
#endif #endif
@ -1156,6 +1335,7 @@ handle_format_change (GstPipeWireSrc *pwsrc,
} else { } else {
pwsrc->negotiated = FALSE; pwsrc->negotiated = FALSE;
pwsrc->is_video = FALSE; pwsrc->is_video = FALSE;
pwsrc->is_rawvideo = FALSE;
} }
if (pwsrc->caps) { if (pwsrc->caps) {
@ -1330,6 +1510,8 @@ gst_pipewire_src_create (GstPushSrc * psrc, GstBuffer ** buffer)
GstBuffer *buf; GstBuffer *buf;
gboolean update_time = FALSE, timeout = FALSE; gboolean update_time = FALSE, timeout = FALSE;
GstCaps *caps = NULL; GstCaps *caps = NULL;
struct timespec abstime = { 0, };
bool have_abstime = false;
pwsrc = GST_PIPEWIRE_SRC (psrc); pwsrc = GST_PIPEWIRE_SRC (psrc);
@ -1373,13 +1555,11 @@ gst_pipewire_src_create (GstPushSrc * psrc, GstBuffer ** buffer)
update_time = TRUE; update_time = TRUE;
GST_LOG_OBJECT (pwsrc, "EOS, send last buffer"); GST_LOG_OBJECT (pwsrc, "EOS, send last buffer");
break; break;
} else if (timeout) { } else if (timeout && pwsrc->last_buffer != NULL) {
if (pwsrc->last_buffer != NULL) { update_time = TRUE;
update_time = TRUE; buf = gst_buffer_ref(pwsrc->last_buffer);
buf = gst_buffer_ref(pwsrc->last_buffer); GST_LOG_OBJECT (pwsrc, "timeout, send keepalive buffer");
GST_LOG_OBJECT (pwsrc, "timeout, send keepalive buffer"); break;
break;
}
} else { } else {
buf = dequeue_buffer (pwsrc); buf = dequeue_buffer (pwsrc);
GST_LOG_OBJECT (pwsrc, "popped buffer %p", buf); GST_LOG_OBJECT (pwsrc, "popped buffer %p", buf);
@ -1391,9 +1571,13 @@ gst_pipewire_src_create (GstPushSrc * psrc, GstBuffer ** buffer)
} }
timeout = FALSE; timeout = FALSE;
if (pwsrc->keepalive_time > 0) { if (pwsrc->keepalive_time > 0) {
struct timespec abstime; if (!have_abstime) {
pw_thread_loop_get_time(pwsrc->stream->core->loop, &abstime, /* Record the time we want to timeout at once, for this loop -- the loop might get unrelated signal()s,
pwsrc->keepalive_time * SPA_NSEC_PER_MSEC); * and we don't want the keepalive time to get reset by that */
pw_thread_loop_get_time(pwsrc->stream->core->loop, &abstime,
pwsrc->keepalive_time * SPA_NSEC_PER_MSEC);
have_abstime = TRUE;
}
if (pw_thread_loop_timed_wait_full (pwsrc->stream->core->loop, &abstime) == -ETIMEDOUT) if (pw_thread_loop_timed_wait_full (pwsrc->stream->core->loop, &abstime) == -ETIMEDOUT)
timeout = TRUE; timeout = TRUE;
} else { } else {
@ -1464,6 +1648,7 @@ gst_pipewire_src_stop (GstBaseSrc * basesrc)
pwsrc->eos = false; pwsrc->eos = false;
gst_buffer_replace (&pwsrc->last_buffer, NULL); gst_buffer_replace (&pwsrc->last_buffer, NULL);
gst_caps_replace(&pwsrc->caps, NULL); gst_caps_replace(&pwsrc->caps, NULL);
gst_caps_replace(&pwsrc->possible_caps, NULL);
pwsrc->transform_value = UINT32_MAX; pwsrc->transform_value = UINT32_MAX;
pw_thread_loop_unlock (pwsrc->stream->core->loop); pw_thread_loop_unlock (pwsrc->stream->core->loop);

View file

@ -24,6 +24,22 @@ G_BEGIN_DECLS
#define GST_PIPEWIRE_SRC_CAST(obj) ((GstPipeWireSrc *) (obj)) #define GST_PIPEWIRE_SRC_CAST(obj) ((GstPipeWireSrc *) (obj))
G_DECLARE_FINAL_TYPE (GstPipeWireSrc, gst_pipewire_src, GST, PIPEWIRE_SRC, GstPushSrc) G_DECLARE_FINAL_TYPE (GstPipeWireSrc, gst_pipewire_src, GST, PIPEWIRE_SRC, GstPushSrc)
/**
* GstPipeWireSrcOnDisconnect:
* @GST_PIPEWIRE_SRC_ON_DISCONNECT_EOS: send EoS downstream
* @GST_PIPEWIRE_SRC_ON_DISCONNECT_ERROR: raise pipeline error
* @GST_PIPEWIRE_SRC_ON_DISCONNECT_NONE: no action
*
* Different actions on disconnect.
*/
typedef enum
{
GST_PIPEWIRE_SRC_ON_DISCONNECT_NONE,
GST_PIPEWIRE_SRC_ON_DISCONNECT_EOS,
GST_PIPEWIRE_SRC_ON_DISCONNECT_ERROR,
} GstPipeWireSrcOnDisconnect;
#define GST_TYPE_PIPEWIRE_SRC_ON_DISCONNECT (gst_pipewire_src_on_disconnect_get_type ())
/** /**
* GstPipeWireSrc: * GstPipeWireSrc:
@ -36,6 +52,7 @@ struct _GstPipeWireSrc {
GstPipeWireStream *stream; GstPipeWireStream *stream;
/*< private >*/ /*< private >*/
gint n_buffers;
gint use_bufferpool; gint use_bufferpool;
gint min_buffers; gint min_buffers;
gint max_buffers; gint max_buffers;
@ -47,6 +64,7 @@ struct _GstPipeWireSrc {
GstCaps *possible_caps; GstCaps *possible_caps;
gboolean is_video; gboolean is_video;
gboolean is_rawvideo;
GstVideoInfo video_info; GstVideoInfo video_info;
#ifdef HAVE_GSTREAMER_DMA_DRM #ifdef HAVE_GSTREAMER_DMA_DRM
GstVideoInfoDmaDrm drm_info; GstVideoInfoDmaDrm drm_info;
@ -56,6 +74,7 @@ struct _GstPipeWireSrc {
gboolean flushing; gboolean flushing;
gboolean started; gboolean started;
gboolean eos; gboolean eos;
gboolean flushing_on_remove_buffer;
gboolean is_live; gboolean is_live;
int64_t delay; int64_t delay;
@ -65,8 +84,12 @@ struct _GstPipeWireSrc {
GstBuffer *last_buffer; GstBuffer *last_buffer;
enum spa_meta_videotransform_value transform_value; enum spa_meta_videotransform_value transform_value;
GstPipeWireSrcOnDisconnect on_disconnect;
}; };
GType gst_pipewire_src_on_stream_disconnect_get_type (void);
G_END_DECLS G_END_DECLS
#endif /* __GST_PIPEWIRE_SRC_H__ */ #endif /* __GST_PIPEWIRE_SRC_H__ */

View file

@ -31,6 +31,7 @@ struct _GstPipeWireStream {
GstClock *clock; GstClock *clock;
guint64 position; guint64 position;
guint64 buf_duration;
struct spa_dll dll; struct spa_dll dll;
double err_avg, err_var, err_wdw; double err_avg, err_var, err_wdw;
guint64 last_ts; guint64 last_ts;
@ -41,6 +42,8 @@ struct _GstPipeWireStream {
struct pw_stream *pwstream; struct pw_stream *pwstream;
struct spa_hook pwstream_listener; struct spa_hook pwstream_listener;
struct spa_io_position *io_position;
/* common properties */ /* common properties */
int fd; int fd;
gchar *path; gchar *path;