From 0c40c014775e3e155ba1381308d8361dd660e1a6 Mon Sep 17 00:00:00 2001 From: George Kiagiadakis Date: Wed, 29 May 2024 17:56:38 +0300 Subject: [PATCH] gst: factor out the stream management and some common variables in a new class Construct this new class from both the src and sink to be able to share code Consolidate the previous mess of open/close/start/stop into a single pair of open/close functions in the new stream class --- src/gst/gstpipewiresink.c | 329 +++++++++------------------------- src/gst/gstpipewiresink.h | 20 +-- src/gst/gstpipewiresrc.c | 342 ++++++++++++------------------------ src/gst/gstpipewiresrc.h | 22 +-- src/gst/gstpipewirestream.c | 168 ++++++++++++++++++ src/gst/gstpipewirestream.h | 53 ++++++ src/gst/meson.build | 2 + 7 files changed, 428 insertions(+), 508 deletions(-) create mode 100644 src/gst/gstpipewirestream.c create mode 100644 src/gst/gstpipewirestream.h diff --git a/src/gst/gstpipewiresink.c b/src/gst/gstpipewiresink.c index d570bd4d3..47c7b447a 100644 --- a/src/gst/gstpipewiresink.c +++ b/src/gst/gstpipewiresink.c @@ -96,8 +96,6 @@ static GstCaps *gst_pipewire_sink_sink_fixate (GstBaseSink * bsink, static GstFlowReturn gst_pipewire_sink_render (GstBaseSink * psink, GstBuffer * buffer); -static gboolean gst_pipewire_sink_start (GstBaseSink * basesink); -static gboolean gst_pipewire_sink_stop (GstBaseSink * basesink); static GstClock * gst_pipewire_sink_provide_clock (GstElement * elem) @@ -109,8 +107,8 @@ gst_pipewire_sink_provide_clock (GstElement * elem) if (!GST_OBJECT_FLAG_IS_SET (pwsink, GST_ELEMENT_FLAG_PROVIDE_CLOCK)) goto clock_disabled; - if (pwsink->clock) - clock = GST_CLOCK_CAST (gst_object_ref (pwsink->clock)); + if (pwsink->stream->clock) + clock = GST_CLOCK_CAST (gst_object_ref (pwsink->stream->clock)); else clock = NULL; GST_OBJECT_UNLOCK (pwsink); @@ -131,17 +129,7 @@ gst_pipewire_sink_finalize (GObject * object) { GstPipeWireSink *pwsink = GST_PIPEWIRE_SINK (object); - g_object_unref (pwsink->pool); - - if (pwsink->stream_properties) - gst_structure_free (pwsink->stream_properties); - if (pwsink->client_properties) - gst_structure_free (pwsink->client_properties); - if (pwsink->clock) - gst_object_unref (pwsink->clock); - g_free (pwsink->path); - g_free (pwsink->target_object); - g_free (pwsink->client_name); + gst_clear_object (&pwsink->stream); G_OBJECT_CLASS (parent_class)->finalize (object); } @@ -151,7 +139,7 @@ gst_pipewire_sink_propose_allocation (GstBaseSink * bsink, GstQuery * query) { GstPipeWireSink *pwsink = GST_PIPEWIRE_SINK (bsink); - gst_query_add_allocation_pool (query, GST_BUFFER_POOL_CAST (pwsink->pool), 0, 0, 0); + gst_query_add_allocation_pool (query, GST_BUFFER_POOL_CAST (pwsink->stream->pool), 0, 0, 0); gst_query_add_allocation_meta (query, GST_VIDEO_META_API_TYPE, NULL); return TRUE; } @@ -249,8 +237,6 @@ gst_pipewire_sink_class_init (GstPipeWireSinkClass * klass) gstbasesink_class->set_caps = gst_pipewire_sink_setcaps; gstbasesink_class->fixate = gst_pipewire_sink_sink_fixate; gstbasesink_class->propose_allocation = gst_pipewire_sink_propose_allocation; - gstbasesink_class->start = gst_pipewire_sink_start; - gstbasesink_class->stop = gst_pipewire_sink_stop; gstbasesink_class->render = gst_pipewire_sink_render; GST_DEBUG_CATEGORY_INIT (pipewire_sink_debug, "pipewiresink", 0, @@ -301,22 +287,21 @@ pool_activated (GstPipeWirePool *pool, GstPipeWireSink *sink) SPA_PARAM_META_type, SPA_POD_Id(SPA_META_VideoCrop), SPA_PARAM_META_size, SPA_POD_Int(sizeof (struct spa_meta_region))); - pw_thread_loop_lock (sink->core->loop); - pw_stream_update_params (sink->stream, port_params, 3); - pw_thread_loop_unlock (sink->core->loop); + pw_thread_loop_lock (sink->stream->core->loop); + pw_stream_update_params (sink->stream->pwstream, port_params, 3); + pw_thread_loop_unlock (sink->stream->core->loop); } static void gst_pipewire_sink_init (GstPipeWireSink * sink) { - sink->pool = gst_pipewire_pool_new (); - sink->client_name = g_strdup(pw_get_client_name()); + sink->stream = gst_pipewire_stream_new (GST_ELEMENT (sink)); + sink->mode = DEFAULT_PROP_MODE; - sink->fd = -1; GST_OBJECT_FLAG_SET (sink, GST_ELEMENT_FLAG_PROVIDE_CLOCK); - g_signal_connect (sink->pool, "activated", G_CALLBACK (pool_activated), sink); + g_signal_connect (sink->stream->pool, "activated", G_CALLBACK (pool_activated), sink); } static GstCaps * @@ -378,31 +363,31 @@ gst_pipewire_sink_set_property (GObject * object, guint prop_id, switch (prop_id) { case PROP_PATH: - g_free (pwsink->path); - pwsink->path = g_value_dup_string (value); + g_free (pwsink->stream->path); + pwsink->stream->path = g_value_dup_string (value); break; case PROP_TARGET_OBJECT: - g_free (pwsink->target_object); - pwsink->target_object = g_value_dup_string (value); + g_free (pwsink->stream->target_object); + pwsink->stream->target_object = g_value_dup_string (value); break; case PROP_CLIENT_NAME: - g_free (pwsink->client_name); - pwsink->client_name = g_value_dup_string (value); + g_free (pwsink->stream->client_name); + pwsink->stream->client_name = g_value_dup_string (value); break; case PROP_CLIENT_PROPERTIES: - if (pwsink->client_properties) - gst_structure_free (pwsink->client_properties); - pwsink->client_properties = + if (pwsink->stream->client_properties) + gst_structure_free (pwsink->stream->client_properties); + pwsink->stream->client_properties = gst_structure_copy (gst_value_get_structure (value)); break; case PROP_STREAM_PROPERTIES: - if (pwsink->stream_properties) - gst_structure_free (pwsink->stream_properties); - pwsink->stream_properties = + if (pwsink->stream->stream_properties) + gst_structure_free (pwsink->stream->stream_properties); + pwsink->stream->stream_properties = gst_structure_copy (gst_value_get_structure (value)); break; @@ -411,7 +396,7 @@ gst_pipewire_sink_set_property (GObject * object, guint prop_id, break; case PROP_FD: - pwsink->fd = g_value_get_int (value); + pwsink->stream->fd = g_value_get_int (value); break; default: @@ -428,23 +413,23 @@ gst_pipewire_sink_get_property (GObject * object, guint prop_id, switch (prop_id) { case PROP_PATH: - g_value_set_string (value, pwsink->path); + g_value_set_string (value, pwsink->stream->path); break; case PROP_TARGET_OBJECT: - g_value_set_string (value, pwsink->target_object); + g_value_set_string (value, pwsink->stream->target_object); break; case PROP_CLIENT_NAME: - g_value_set_string (value, pwsink->client_name); + g_value_set_string (value, pwsink->stream->client_name); break; case PROP_CLIENT_PROPERTIES: - gst_value_set_structure (value, pwsink->client_properties); + gst_value_set_structure (value, pwsink->stream->client_properties); break; case PROP_STREAM_PROPERTIES: - gst_value_set_structure (value, pwsink->stream_properties); + gst_value_set_structure (value, pwsink->stream->stream_properties); break; case PROP_MODE: @@ -452,7 +437,7 @@ gst_pipewire_sink_get_property (GObject * object, guint prop_id, break; case PROP_FD: - g_value_set_int (value, pwsink->fd); + g_value_set_int (value, pwsink->stream->fd); break; default: @@ -466,7 +451,7 @@ on_add_buffer (void *_data, struct pw_buffer *b) { GstPipeWireSink *pwsink = _data; GST_DEBUG_OBJECT (pwsink, "add pw_buffer %p", b); - gst_pipewire_pool_wrap_buffer (pwsink->pool, b); + gst_pipewire_pool_wrap_buffer (pwsink->stream->pool, b); } static void @@ -474,10 +459,10 @@ on_remove_buffer (void *_data, struct pw_buffer *b) { GstPipeWireSink *pwsink = _data; GST_DEBUG_OBJECT (pwsink, "remove pw_buffer %p", b); - gst_pipewire_pool_remove_buffer (pwsink->pool, b); + gst_pipewire_pool_remove_buffer (pwsink->stream->pool, b); - if (!gst_pipewire_pool_has_buffers (pwsink->pool) && - !GST_BUFFER_POOL_IS_FLUSHING (GST_BUFFER_POOL_CAST (pwsink->pool))) { + if (!gst_pipewire_pool_has_buffers (pwsink->stream->pool) && + !GST_BUFFER_POOL_IS_FLUSHING (GST_BUFFER_POOL_CAST (pwsink->stream->pool))) { GST_ELEMENT_ERROR (pwsink, RESOURCE, NOT_FOUND, ("all buffers have been removed"), ("PipeWire link to remote node was destroyed")); @@ -520,7 +505,7 @@ do_send_buffer (GstPipeWireSink *pwsink, GstBuffer *buffer) GstMemory *mem = gst_buffer_peek_memory (buffer, i); d->chunk->offset = mem->offset; d->chunk->size = mem->size; - d->chunk->stride = pwsink->pool->video_info.stride[i]; + d->chunk->stride = pwsink->stream->pool->video_info.stride[i]; } GstVideoMeta *meta = gst_buffer_get_video_meta (buffer); @@ -539,7 +524,7 @@ do_send_buffer (GstPipeWireSink *pwsink, GstBuffer *buffer) } } - if ((res = pw_stream_queue_buffer (pwsink->stream, data->b)) < 0) { + if ((res = pw_stream_queue_buffer (pwsink->stream->pwstream, data->b)) < 0) { g_warning ("can't send buffer %s", spa_strerror(res)); } } @@ -550,7 +535,7 @@ on_process (void *data) { GstPipeWireSink *pwsink = data; GST_DEBUG_OBJECT (pwsink, "signal"); - g_cond_signal (&pwsink->pool->cond); + g_cond_signal (&pwsink->stream->pool->cond); } static void @@ -566,20 +551,20 @@ on_state_changed (void *data, enum pw_stream_state old, enum pw_stream_state sta case PW_STREAM_STATE_PAUSED: break; case PW_STREAM_STATE_STREAMING: - if (pw_stream_is_driving (pwsink->stream)) - pw_stream_trigger_process (pwsink->stream); + if (pw_stream_is_driving (pwsink->stream->pwstream)) + pw_stream_trigger_process (pwsink->stream->pwstream); break; case PW_STREAM_STATE_ERROR: /* make the error permanent, if it is not already; pw_stream_set_error() will recursively call us again */ - if (pw_stream_get_state (pwsink->stream, NULL) != PW_STREAM_STATE_ERROR) - pw_stream_set_error (pwsink->stream, -EPIPE, "%s", error); + if (pw_stream_get_state (pwsink->stream->pwstream, NULL) != PW_STREAM_STATE_ERROR) + pw_stream_set_error (pwsink->stream->pwstream, -EPIPE, "%s", error); else GST_ELEMENT_ERROR (pwsink, RESOURCE, FAILED, ("stream error: %s", error), (NULL)); break; } - pw_thread_loop_signal (pwsink->core->loop, FALSE); + pw_thread_loop_signal (pwsink->stream->core->loop, FALSE); } static void @@ -590,8 +575,8 @@ on_param_changed (void *data, uint32_t id, const struct spa_pod *param) if (param == NULL || id != SPA_PARAM_Format) return; - if (gst_buffer_pool_is_active (GST_BUFFER_POOL_CAST (pwsink->pool))) - pool_activated (pwsink->pool, pwsink); + if (gst_buffer_pool_is_active (GST_BUFFER_POOL_CAST (pwsink->stream->pool))) + pool_activated (pwsink->stream->pool, pwsink); } static gboolean @@ -618,8 +603,8 @@ gst_pipewire_sink_setcaps (GstBaseSink * bsink, GstCaps * caps) possible = gst_caps_to_format_all (caps); - pw_thread_loop_lock (pwsink->core->loop); - state = pw_stream_get_state (pwsink->stream, &error); + pw_thread_loop_lock (pwsink->stream->core->loop); + state = pw_stream_get_state (pwsink->stream->pwstream, &error); if (state == PW_STREAM_STATE_ERROR) goto start_error; @@ -637,19 +622,19 @@ gst_pipewire_sink_setcaps (GstBaseSink * bsink, GstCaps * caps) else flags |= PW_STREAM_FLAG_DRIVER; - target_id = pwsink->path ? (uint32_t)atoi(pwsink->path) : PW_ID_ANY; + target_id = pwsink->stream->path ? (uint32_t)atoi(pwsink->stream->path) : PW_ID_ANY; - if (pwsink->target_object) { + if (pwsink->stream->target_object) { uint64_t serial; - items[n_items++] = SPA_DICT_ITEM_INIT(PW_KEY_TARGET_OBJECT, pwsink->target_object); + items[n_items++] = SPA_DICT_ITEM_INIT(PW_KEY_TARGET_OBJECT, pwsink->stream->target_object); /* If target.object is a name, set it also to node.target */ - if (!spa_atou64(pwsink->target_object, &serial, 0)) { + if (!spa_atou64(pwsink->stream->target_object, &serial, 0)) { target_id = PW_ID_ANY; /* XXX deprecated but the portal and some example apps only * provide the object id */ - items[n_items++] = SPA_DICT_ITEM_INIT(PW_KEY_NODE_TARGET, pwsink->target_object); + items[n_items++] = SPA_DICT_ITEM_INIT(PW_KEY_NODE_TARGET, pwsink->stream->target_object); } } if (rate != 0) { @@ -657,20 +642,20 @@ gst_pipewire_sink_setcaps (GstBaseSink * bsink, GstCaps * caps) items[n_items++] = SPA_DICT_ITEM_INIT(PW_KEY_NODE_RATE, buf); } if (n_items > 0) - pw_stream_update_properties (pwsink->stream, &SPA_DICT_INIT(items, n_items)); + pw_stream_update_properties (pwsink->stream->pwstream, &SPA_DICT_INIT(items, n_items)); - pw_stream_connect (pwsink->stream, + pw_stream_connect (pwsink->stream->pwstream, PW_DIRECTION_OUTPUT, target_id, flags, (const struct spa_pod **) possible->pdata, possible->len); - pw_thread_loop_get_time (pwsink->core->loop, &abstime, + pw_thread_loop_get_time (pwsink->stream->core->loop, &abstime, GST_PIPEWIRE_DEFAULT_TIMEOUT * SPA_NSEC_PER_SEC); while (TRUE) { - state = pw_stream_get_state (pwsink->stream, &error); + state = pw_stream_get_state (pwsink->stream->pwstream, &error); if (state >= PW_STREAM_STATE_PAUSED) break; @@ -678,7 +663,7 @@ gst_pipewire_sink_setcaps (GstBaseSink * bsink, GstCaps * caps) if (state == PW_STREAM_STATE_ERROR) goto start_error; - if (pw_thread_loop_timed_wait_full (pwsink->core->loop, &abstime) < 0) { + if (pw_thread_loop_timed_wait_full (pwsink->stream->core->loop, &abstime) < 0) { error = "timeout"; goto start_error; } @@ -686,14 +671,14 @@ gst_pipewire_sink_setcaps (GstBaseSink * bsink, GstCaps * caps) } res = TRUE; - gst_pipewire_clock_reset (GST_PIPEWIRE_CLOCK (pwsink->clock), 0); + gst_pipewire_clock_reset (GST_PIPEWIRE_CLOCK (pwsink->stream->clock), 0); - config = gst_buffer_pool_get_config (GST_BUFFER_POOL_CAST (pwsink->pool)); + config = gst_buffer_pool_get_config (GST_BUFFER_POOL_CAST (pwsink->stream->pool)); gst_buffer_pool_config_get_params (config, NULL, &size, &min_buffers, &max_buffers); gst_buffer_pool_config_set_params (config, caps, size, min_buffers, max_buffers); - gst_buffer_pool_set_config (GST_BUFFER_POOL_CAST (pwsink->pool), config); + gst_buffer_pool_set_config (GST_BUFFER_POOL_CAST (pwsink->stream->pool), config); - pw_thread_loop_unlock (pwsink->core->loop); + pw_thread_loop_unlock (pwsink->stream->core->loop); pwsink->negotiated = res; @@ -702,7 +687,7 @@ gst_pipewire_sink_setcaps (GstBaseSink * bsink, GstCaps * caps) start_error: { GST_ERROR_OBJECT (pwsink, "could not start stream: %s", error); - pw_thread_loop_unlock (pwsink->core->loop); + pw_thread_loop_unlock (pwsink->stream->core->loop); return FALSE; } } @@ -720,13 +705,13 @@ gst_pipewire_sink_render (GstBaseSink * bsink, GstBuffer * buffer) if (!pwsink->negotiated) goto not_negotiated; - if (buffer->pool != GST_BUFFER_POOL_CAST (pwsink->pool) && - !gst_buffer_pool_is_active (GST_BUFFER_POOL_CAST (pwsink->pool))) { + if (buffer->pool != GST_BUFFER_POOL_CAST (pwsink->stream->pool) && + !gst_buffer_pool_is_active (GST_BUFFER_POOL_CAST (pwsink->stream->pool))) { GstStructure *config; GstCaps *caps; guint size, min_buffers, max_buffers; - config = gst_buffer_pool_get_config (GST_BUFFER_POOL_CAST (pwsink->pool)); + config = gst_buffer_pool_get_config (GST_BUFFER_POOL_CAST (pwsink->stream->pool)); gst_buffer_pool_config_get_params (config, &caps, &size, &min_buffers, &max_buffers); if (size == 0) { @@ -736,23 +721,23 @@ gst_pipewire_sink_render (GstBaseSink * bsink, GstBuffer * buffer) } gst_buffer_pool_config_set_params (config, caps, size, min_buffers, max_buffers); - gst_buffer_pool_set_config (GST_BUFFER_POOL_CAST (pwsink->pool), config); + gst_buffer_pool_set_config (GST_BUFFER_POOL_CAST (pwsink->stream->pool), config); - gst_buffer_pool_set_active (GST_BUFFER_POOL_CAST (pwsink->pool), TRUE); + gst_buffer_pool_set_active (GST_BUFFER_POOL_CAST (pwsink->stream->pool), TRUE); } - pw_thread_loop_lock (pwsink->core->loop); - if (pw_stream_get_state (pwsink->stream, &error) != PW_STREAM_STATE_STREAMING) + pw_thread_loop_lock (pwsink->stream->core->loop); + if (pw_stream_get_state (pwsink->stream->pwstream, &error) != PW_STREAM_STATE_STREAMING) goto done_unlock; - if (buffer->pool != GST_BUFFER_POOL_CAST (pwsink->pool)) { + if (buffer->pool != GST_BUFFER_POOL_CAST (pwsink->stream->pool)) { GstBuffer *b = NULL; GstMapInfo info = { 0, }; GstBufferPoolAcquireParams params = { 0, }; - pw_thread_loop_unlock (pwsink->core->loop); + pw_thread_loop_unlock (pwsink->stream->core->loop); - if ((res = gst_buffer_pool_acquire_buffer (GST_BUFFER_POOL_CAST (pwsink->pool), &b, ¶ms)) != GST_FLOW_OK) + if ((res = gst_buffer_pool_acquire_buffer (GST_BUFFER_POOL_CAST (pwsink->stream->pool), &b, ¶ms)) != GST_FLOW_OK) goto done; gst_buffer_map (b, &info, GST_MAP_WRITE); @@ -763,8 +748,8 @@ gst_pipewire_sink_render (GstBaseSink * bsink, GstBuffer * buffer) buffer = b; unref_buffer = TRUE; - pw_thread_loop_lock (pwsink->core->loop); - if (pw_stream_get_state (pwsink->stream, &error) != PW_STREAM_STATE_STREAMING) + pw_thread_loop_lock (pwsink->stream->core->loop); + if (pw_stream_get_state (pwsink->stream->pwstream, &error) != PW_STREAM_STATE_STREAMING) goto done_unlock; } @@ -772,11 +757,11 @@ gst_pipewire_sink_render (GstBaseSink * bsink, GstBuffer * buffer) if (unref_buffer) gst_buffer_unref (buffer); - if (pw_stream_is_driving (pwsink->stream)) - pw_stream_trigger_process (pwsink->stream); + if (pw_stream_is_driving (pwsink->stream->pwstream)) + pw_stream_trigger_process (pwsink->stream->pwstream); done_unlock: - pw_thread_loop_unlock (pwsink->core->loop); + pw_thread_loop_unlock (pwsink->stream->core->loop); done: return res; @@ -786,26 +771,6 @@ not_negotiated: } } -static gboolean -copy_properties (GQuark field_id, - const GValue *value, - gpointer user_data) -{ - struct pw_properties *properties = user_data; - GValue dst = { 0 }; - - if (g_value_type_transformable (G_VALUE_TYPE(value), G_TYPE_STRING)) { - g_value_init(&dst, G_TYPE_STRING); - if (g_value_transform(value, &dst)) { - pw_properties_set (properties, - g_quark_to_string (field_id), - g_value_get_string (&dst)); - } - g_value_unset(&dst); - } - return TRUE; -} - static const struct pw_stream_events stream_events = { PW_VERSION_STREAM_EVENTS, .state_changed = on_state_changed, @@ -815,127 +780,6 @@ static const struct pw_stream_events stream_events = { .process = on_process, }; -static gboolean -gst_pipewire_sink_start (GstBaseSink * basesink) -{ - GstPipeWireSink *pwsink = GST_PIPEWIRE_SINK (basesink); - struct pw_properties *props; - - pwsink->negotiated = FALSE; - - pw_thread_loop_lock (pwsink->core->loop); - - props = pw_properties_new (NULL, NULL); - if (pwsink->client_name) { - pw_properties_set (props, PW_KEY_NODE_NAME, pwsink->client_name); - pw_properties_set (props, PW_KEY_NODE_DESCRIPTION, pwsink->client_name); - } - if (pwsink->stream_properties) { - gst_structure_foreach (pwsink->stream_properties, copy_properties, props); - } - - if ((pwsink->stream = pw_stream_new (pwsink->core->core, pwsink->client_name, props)) == NULL) - goto no_stream; - - pwsink->pool->stream = pwsink->stream; - - pw_stream_add_listener(pwsink->stream, - &pwsink->stream_listener, - &stream_events, - pwsink); - - pwsink->clock = gst_pipewire_clock_new (pwsink->stream, pwsink->last_time); - pw_thread_loop_unlock (pwsink->core->loop); - - return TRUE; - -no_stream: - { - GST_ELEMENT_ERROR (pwsink, RESOURCE, FAILED, ("can't create stream"), (NULL)); - pw_thread_loop_unlock (pwsink->core->loop); - return FALSE; - } -} - -static gboolean -gst_pipewire_sink_stop (GstBaseSink * basesink) -{ - GstPipeWireSink *pwsink = GST_PIPEWIRE_SINK (basesink); - - pw_thread_loop_lock (pwsink->core->loop); - if (pwsink->stream) { - pw_stream_destroy (pwsink->stream); - pwsink->stream = NULL; - pwsink->pool->stream = NULL; - } - pw_thread_loop_unlock (pwsink->core->loop); - - pwsink->negotiated = FALSE; - - return TRUE; -} - -static gboolean -gst_pipewire_sink_open (GstPipeWireSink * pwsink) -{ - struct pw_properties *props; - - GST_DEBUG_OBJECT (pwsink, "open"); - - pwsink->core = gst_pipewire_core_get(pwsink->fd); - if (pwsink->core == NULL) - goto connect_error; - - pw_thread_loop_lock (pwsink->core->loop); - - props = pw_properties_new (NULL, NULL); - if (pwsink->client_properties) { - gst_structure_foreach (pwsink->client_properties, copy_properties, props); - pw_core_update_properties (pwsink->core->core, &props->dict); - } - pw_properties_free(props); - pw_thread_loop_unlock (pwsink->core->loop); - - return TRUE; - - /* ERRORS */ -connect_error: - { - GST_ELEMENT_ERROR (pwsink, RESOURCE, FAILED, - ("Failed to connect"), (NULL)); - return FALSE; - } -} - -static gboolean -gst_pipewire_sink_close (GstPipeWireSink * pwsink) -{ - pwsink->last_time = gst_clock_get_time (pwsink->clock); - - GST_DEBUG_OBJECT (pwsink, "close"); - - gst_element_post_message (GST_ELEMENT (pwsink), - gst_message_new_clock_lost (GST_OBJECT_CAST (pwsink), pwsink->clock)); - - GST_OBJECT_LOCK (pwsink); - GST_PIPEWIRE_CLOCK (pwsink->clock)->stream = NULL; - g_clear_object (&pwsink->clock); - GST_OBJECT_UNLOCK (pwsink); - - pw_thread_loop_lock (pwsink->core->loop); - if (pwsink->stream) { - pw_stream_destroy (pwsink->stream); - pwsink->stream = NULL; - } - pw_thread_loop_unlock (pwsink->core->loop); - - if (pwsink->core) { - gst_pipewire_core_release (pwsink->core); - pwsink->core = NULL; - } - return TRUE; -} - static GstStateChangeReturn gst_pipewire_sink_change_state (GstElement * element, GstStateChange transition) { @@ -944,24 +788,24 @@ gst_pipewire_sink_change_state (GstElement * element, GstStateChange transition) switch (transition) { case GST_STATE_CHANGE_NULL_TO_READY: - if (!gst_pipewire_sink_open (this)) + if (!gst_pipewire_stream_open (this->stream, &stream_events)) goto open_failed; break; case GST_STATE_CHANGE_READY_TO_PAUSED: break; case GST_STATE_CHANGE_PAUSED_TO_PLAYING: /* uncork and start play */ - pw_thread_loop_lock (this->core->loop); - pw_stream_set_active(this->stream, true); - pw_thread_loop_unlock (this->core->loop); - gst_buffer_pool_set_flushing(GST_BUFFER_POOL_CAST(this->pool), FALSE); + pw_thread_loop_lock (this->stream->core->loop); + pw_stream_set_active(this->stream->pwstream, true); + pw_thread_loop_unlock (this->stream->core->loop); + gst_buffer_pool_set_flushing(GST_BUFFER_POOL_CAST(this->stream->pool), FALSE); break; case GST_STATE_CHANGE_PLAYING_TO_PAUSED: /* stop play ASAP by corking */ - pw_thread_loop_lock (this->core->loop); - pw_stream_set_active(this->stream, false); - pw_thread_loop_unlock (this->core->loop); - gst_buffer_pool_set_flushing(GST_BUFFER_POOL_CAST(this->pool), TRUE); + pw_thread_loop_lock (this->stream->core->loop); + pw_stream_set_active(this->stream->pwstream, false); + pw_thread_loop_unlock (this->stream->core->loop); + gst_buffer_pool_set_flushing(GST_BUFFER_POOL_CAST(this->stream->pool), TRUE); break; default: break; @@ -973,10 +817,11 @@ gst_pipewire_sink_change_state (GstElement * element, GstStateChange transition) case GST_STATE_CHANGE_PLAYING_TO_PAUSED: break; case GST_STATE_CHANGE_PAUSED_TO_READY: - gst_buffer_pool_set_active(GST_BUFFER_POOL_CAST(this->pool), FALSE); + gst_buffer_pool_set_active(GST_BUFFER_POOL_CAST(this->stream->pool), FALSE); + this->negotiated = FALSE; break; case GST_STATE_CHANGE_READY_TO_NULL: - gst_pipewire_sink_close (this); + gst_pipewire_stream_close (this->stream); break; default: break; diff --git a/src/gst/gstpipewiresink.h b/src/gst/gstpipewiresink.h index 50329947e..b9d128b35 100644 --- a/src/gst/gstpipewiresink.h +++ b/src/gst/gstpipewiresink.h @@ -5,6 +5,8 @@ #ifndef __GST_PIPEWIRE_SINK_H__ #define __GST_PIPEWIRE_SINK_H__ +#include "gstpipewirestream.h" + #include #include @@ -57,28 +59,12 @@ struct _GstPipeWireSink { GstBaseSink element; /*< private >*/ - gchar *path; - gchar *target_object; - gchar *client_name; - int fd; + GstPipeWireStream *stream; /* video state */ gboolean negotiated; - GstPipeWireCore *core; - struct spa_hook core_listener; - GstStructure *client_properties; - - struct pw_stream *stream; - struct spa_hook stream_listener; - - GstStructure *stream_properties; GstPipeWireSinkMode mode; - - GstPipeWirePool *pool; - - GstClock *clock; - GstClockTime last_time; }; struct _GstPipeWireSinkClass { diff --git a/src/gst/gstpipewiresrc.c b/src/gst/gstpipewiresrc.c index 5336fbc80..a90fe2043 100644 --- a/src/gst/gstpipewiresrc.c +++ b/src/gst/gstpipewiresrc.c @@ -101,31 +101,31 @@ gst_pipewire_src_set_property (GObject * object, guint prop_id, switch (prop_id) { case PROP_PATH: - g_free (pwsrc->path); - pwsrc->path = g_value_dup_string (value); + g_free (pwsrc->stream->path); + pwsrc->stream->path = g_value_dup_string (value); break; case PROP_TARGET_OBJECT: - g_free (pwsrc->target_object); - pwsrc->target_object = g_value_dup_string (value); + g_free (pwsrc->stream->target_object); + pwsrc->stream->target_object = g_value_dup_string (value); break; case PROP_CLIENT_NAME: - g_free (pwsrc->client_name); - pwsrc->client_name = g_value_dup_string (value); + g_free (pwsrc->stream->client_name); + pwsrc->stream->client_name = g_value_dup_string (value); break; case PROP_CLIENT_PROPERTIES: - if (pwsrc->client_properties) - gst_structure_free (pwsrc->client_properties); - pwsrc->client_properties = + if (pwsrc->stream->client_properties) + gst_structure_free (pwsrc->stream->client_properties); + pwsrc->stream->client_properties = gst_structure_copy (gst_value_get_structure (value)); break; case PROP_STREAM_PROPERTIES: - if (pwsrc->stream_properties) - gst_structure_free (pwsrc->stream_properties); - pwsrc->stream_properties = + if (pwsrc->stream->stream_properties) + gst_structure_free (pwsrc->stream->stream_properties); + pwsrc->stream->stream_properties = gst_structure_copy (gst_value_get_structure (value)); break; @@ -142,7 +142,7 @@ gst_pipewire_src_set_property (GObject * object, guint prop_id, break; case PROP_FD: - pwsrc->fd = g_value_get_int (value); + pwsrc->stream->fd = g_value_get_int (value); break; case PROP_RESEND_LAST: @@ -171,23 +171,23 @@ gst_pipewire_src_get_property (GObject * object, guint prop_id, switch (prop_id) { case PROP_PATH: - g_value_set_string (value, pwsrc->path); + g_value_set_string (value, pwsrc->stream->path); break; case PROP_TARGET_OBJECT: - g_value_set_string (value, pwsrc->target_object); + g_value_set_string (value, pwsrc->stream->target_object); break; case PROP_CLIENT_NAME: - g_value_set_string (value, pwsrc->client_name); + g_value_set_string (value, pwsrc->stream->client_name); break; case PROP_CLIENT_PROPERTIES: - gst_value_set_structure (value, pwsrc->client_properties); + gst_value_set_structure (value, pwsrc->stream->client_properties); break; case PROP_STREAM_PROPERTIES: - gst_value_set_structure (value, pwsrc->stream_properties); + gst_value_set_structure (value, pwsrc->stream->stream_properties); break; case PROP_ALWAYS_COPY: @@ -203,7 +203,7 @@ gst_pipewire_src_get_property (GObject * object, guint prop_id, break; case PROP_FD: - g_value_set_int (value, pwsrc->fd); + g_value_set_int (value, pwsrc->stream->fd); break; case PROP_RESEND_LAST: @@ -234,8 +234,8 @@ gst_pipewire_src_provide_clock (GstElement * elem) if (!GST_OBJECT_FLAG_IS_SET (pwsrc, GST_ELEMENT_FLAG_PROVIDE_CLOCK)) goto clock_disabled; - if (pwsrc->clock && pwsrc->is_live) - clock = GST_CLOCK_CAST (gst_object_ref (pwsrc->clock)); + if (pwsrc->stream->clock && pwsrc->is_live) + clock = GST_CLOCK_CAST (gst_object_ref (pwsrc->stream->clock)); else clock = NULL; GST_OBJECT_UNLOCK (pwsrc); @@ -256,16 +256,7 @@ gst_pipewire_src_finalize (GObject * object) { GstPipeWireSrc *pwsrc = GST_PIPEWIRE_SRC (object); - if (pwsrc->stream_properties) - gst_structure_free (pwsrc->stream_properties); - if (pwsrc->client_properties) - gst_structure_free (pwsrc->client_properties); - if (pwsrc->clock) - gst_object_unref (pwsrc->clock); - g_free (pwsrc->path); - g_free (pwsrc->target_object); - g_free (pwsrc->client_name); - g_object_unref(pwsrc->pool); + gst_clear_object (&pwsrc->stream); G_OBJECT_CLASS (parent_class)->finalize (object); } @@ -434,19 +425,16 @@ gst_pipewire_src_init (GstPipeWireSrc * src) GST_OBJECT_FLAG_SET (src, GST_ELEMENT_FLAG_PROVIDE_CLOCK); + src->stream = gst_pipewire_stream_new (GST_ELEMENT (src)); + src->always_copy = DEFAULT_ALWAYS_COPY; src->min_buffers = DEFAULT_MIN_BUFFERS; src->max_buffers = DEFAULT_MAX_BUFFERS; - src->fd = -1; src->resend_last = DEFAULT_RESEND_LAST; src->keepalive_time = DEFAULT_KEEPALIVE_TIME; src->autoconnect = DEFAULT_AUTOCONNECT; src->min_latency = 0; src->max_latency = GST_CLOCK_TIME_NONE; - - src->client_name = g_strdup(pw_get_client_name ()); - - src->pool = gst_pipewire_pool_new (); } static gboolean @@ -467,9 +455,9 @@ buffer_recycle (GstMiniObject *obj) GST_BUFFER_FLAGS (obj) = data->flags; src = data->owner; - pw_thread_loop_lock (src->core->loop); + pw_thread_loop_lock (src->stream->core->loop); if (!obj->dispose) { - pw_thread_loop_unlock (src->core->loop); + pw_thread_loop_unlock (src->stream->core->loop); GST_OBJECT_UNLOCK (data->pool); return TRUE; } @@ -478,12 +466,12 @@ buffer_recycle (GstMiniObject *obj) data->queued = TRUE; - if ((res = pw_stream_queue_buffer (src->stream, data->b)) < 0) + if ((res = pw_stream_queue_buffer (src->stream->pwstream, data->b)) < 0) GST_WARNING_OBJECT (src, "can't queue recycled buffer %p, %s", obj, spa_strerror(res)); else GST_LOG_OBJECT (src, "recycle buffer %p", obj); - pw_thread_loop_unlock (src->core->loop); + pw_thread_loop_unlock (src->stream->core->loop); GST_OBJECT_UNLOCK (data->pool); @@ -496,7 +484,7 @@ on_add_buffer (void *_data, struct pw_buffer *b) GstPipeWireSrc *pwsrc = _data; GstPipeWirePoolData *data; - gst_pipewire_pool_wrap_buffer (pwsrc->pool, b); + gst_pipewire_pool_wrap_buffer (pwsrc->stream->pool, b); data = b->user_data; GST_DEBUG_OBJECT (pwsrc, "add buffer %p", data->buf); data->owner = pwsrc; @@ -519,7 +507,7 @@ on_remove_buffer (void *_data, struct pw_buffer *b) if (data->queued) { gst_buffer_unref (buf); } else { - if ((res = pw_stream_queue_buffer (pwsrc->stream, b)) < 0) + if ((res = pw_stream_queue_buffer (pwsrc->stream->pwstream, b)) < 0) GST_WARNING_OBJECT (pwsrc, "can't queue removed buffer %p, %s", buf, spa_strerror(res)); } } @@ -554,7 +542,7 @@ static GstBuffer *dequeue_buffer(GstPipeWireSrc *pwsrc) struct pw_time time; guint i; - b = pw_stream_dequeue_buffer (pwsrc->stream); + b = pw_stream_dequeue_buffer (pwsrc->stream->pwstream); if (b == NULL) return NULL; @@ -570,7 +558,7 @@ static GstBuffer *dequeue_buffer(GstPipeWireSrc *pwsrc) return NULL; } - pw_stream_get_time_n(pwsrc->stream, &time, sizeof(time)); + pw_stream_get_time_n(pwsrc->stream->pwstream, &time, sizeof(time)); if (pwsrc->delay != time.delay && time.rate.denom != 0) { pwsrc->min_latency = time.delay * GST_SECOND * time.rate.num / time.rate.denom; @@ -689,7 +677,7 @@ static void on_process (void *_data) { GstPipeWireSrc *pwsrc = _data; - pw_thread_loop_signal (pwsrc->core->loop, FALSE); + pw_thread_loop_signal (pwsrc->stream->core->loop, FALSE); } static void @@ -710,14 +698,14 @@ on_state_changed (void *data, case PW_STREAM_STATE_ERROR: /* make the error permanent, if it is not already; pw_stream_set_error() will recursively call us again */ - if (pw_stream_get_state (pwsrc->stream, NULL) != PW_STREAM_STATE_ERROR) - pw_stream_set_error (pwsrc->stream, -EPIPE, "%s", error); + if (pw_stream_get_state (pwsrc->stream->pwstream, NULL) != PW_STREAM_STATE_ERROR) + pw_stream_set_error (pwsrc->stream->pwstream, -EPIPE, "%s", error); else GST_ELEMENT_ERROR (pwsrc, RESOURCE, FAILED, ("stream error: %s", error), (NULL)); break; } - pw_thread_loop_signal (pwsrc->core->loop, FALSE); + pw_thread_loop_signal (pwsrc->stream->core->loop, FALSE); } static void @@ -742,14 +730,14 @@ gst_pipewire_src_stream_start (GstPipeWireSrc *pwsrc) const char *error = NULL; struct timespec abstime; - pw_thread_loop_lock (pwsrc->core->loop); + pw_thread_loop_lock (pwsrc->stream->core->loop); GST_DEBUG_OBJECT (pwsrc, "doing stream start"); - pw_thread_loop_get_time (pwsrc->core->loop, &abstime, + pw_thread_loop_get_time (pwsrc->stream->core->loop, &abstime, GST_PIPEWIRE_DEFAULT_TIMEOUT * SPA_NSEC_PER_SEC); while (TRUE) { - enum pw_stream_state state = pw_stream_get_state (pwsrc->stream, &error); + enum pw_stream_state state = pw_stream_get_state (pwsrc->stream->pwstream, &error); GST_DEBUG_OBJECT (pwsrc, "waiting for STREAMING, now %s", pw_stream_state_as_string (state)); if (state == PW_STREAM_STATE_STREAMING) @@ -763,25 +751,25 @@ gst_pipewire_src_stream_start (GstPipeWireSrc *pwsrc) goto start_error; } - if (pw_thread_loop_timed_wait_full (pwsrc->core->loop, &abstime) < 0) { + if (pw_thread_loop_timed_wait_full (pwsrc->stream->core->loop, &abstime) < 0) { error = "timeout"; goto start_error; } } - parse_stream_properties (pwsrc, pw_stream_get_properties (pwsrc->stream)); + parse_stream_properties (pwsrc, pw_stream_get_properties (pwsrc->stream->pwstream)); GST_DEBUG_OBJECT (pwsrc, "signal started"); pwsrc->started = TRUE; - pw_thread_loop_signal (pwsrc->core->loop, FALSE); - pw_thread_loop_unlock (pwsrc->core->loop); + pw_thread_loop_signal (pwsrc->stream->core->loop, FALSE); + pw_thread_loop_unlock (pwsrc->stream->core->loop); return TRUE; start_error: { GST_DEBUG_OBJECT (pwsrc, "error starting stream: %s", error); - pw_thread_loop_signal (pwsrc->core->loop, FALSE); - pw_thread_loop_unlock (pwsrc->core->loop); + pw_thread_loop_signal (pwsrc->stream->core->loop, FALSE); + pw_thread_loop_unlock (pwsrc->stream->core->loop); return FALSE; } } @@ -793,13 +781,13 @@ wait_started (GstPipeWireSrc *this) const char *error = NULL; struct timespec abstime; - pw_thread_loop_lock (this->core->loop); + pw_thread_loop_lock (this->stream->core->loop); - pw_thread_loop_get_time (this->core->loop, &abstime, + pw_thread_loop_get_time (this->stream->core->loop, &abstime, GST_PIPEWIRE_DEFAULT_TIMEOUT * SPA_NSEC_PER_SEC); while (TRUE) { - state = pw_stream_get_state (this->stream, &error); + state = pw_stream_get_state (this->stream->pwstream, &error); GST_DEBUG_OBJECT (this, "waiting for started signal, state now %s", pw_stream_state_as_string (state)); @@ -815,19 +803,19 @@ wait_started (GstPipeWireSrc *this) break; if (this->autoconnect) { - if (pw_thread_loop_timed_wait_full (this->core->loop, &abstime) < 0) { + if (pw_thread_loop_timed_wait_full (this->stream->core->loop, &abstime) < 0) { state = PW_STREAM_STATE_ERROR; break; } } else { - pw_thread_loop_wait (this->core->loop); + pw_thread_loop_wait (this->stream->core->loop); } prev_state = state; } GST_DEBUG_OBJECT (this, "got started signal: %s", pw_stream_state_as_string (state)); - pw_thread_loop_unlock (this->core->loop); + pw_thread_loop_unlock (this->stream->core->loop); return state; } @@ -875,7 +863,7 @@ gst_pipewire_src_negotiate (GstBaseSrc * basesrc) GST_DEBUG_OBJECT (basesrc, "have common caps (sanitized): %" GST_PTR_FORMAT, possible_caps); - if (pw_stream_get_state(pwsrc->stream, NULL) == PW_STREAM_STATE_STREAMING) { + if (pw_stream_get_state(pwsrc->stream->pwstream, NULL) == PW_STREAM_STATE_STREAMING) { g_autoptr (GstCaps) current_caps = NULL; g_autoptr (GstCaps) preferred_new_caps = NULL; @@ -894,12 +882,12 @@ gst_pipewire_src_negotiate (GstBaseSrc * basesrc) possible = gst_caps_to_format_all (possible_caps); /* first disconnect */ - pw_thread_loop_lock (pwsrc->core->loop); - if (pw_stream_get_state(pwsrc->stream, &error) != PW_STREAM_STATE_UNCONNECTED) { + pw_thread_loop_lock (pwsrc->stream->core->loop); + if (pw_stream_get_state(pwsrc->stream->pwstream, &error) != PW_STREAM_STATE_UNCONNECTED) { GST_DEBUG_OBJECT (basesrc, "disconnect capture"); - pw_stream_disconnect (pwsrc->stream); + pw_stream_disconnect (pwsrc->stream->pwstream); while (TRUE) { - enum pw_stream_state state = pw_stream_get_state (pwsrc->stream, &error); + enum pw_stream_state state = pw_stream_get_state (pwsrc->stream->pwstream, &error); GST_DEBUG_OBJECT (basesrc, "waiting for UNCONNECTED, now %s", pw_stream_state_as_string (state)); if (state == PW_STREAM_STATE_UNCONNECTED) @@ -908,15 +896,15 @@ gst_pipewire_src_negotiate (GstBaseSrc * basesrc) if (state == PW_STREAM_STATE_ERROR || pwsrc->flushing) goto connect_error; - pw_thread_loop_wait (pwsrc->core->loop); + pw_thread_loop_wait (pwsrc->stream->core->loop); } } - target_id = pwsrc->path ? (uint32_t)atoi(pwsrc->path) : PW_ID_ANY; + target_id = pwsrc->stream->path ? (uint32_t)atoi(pwsrc->stream->path) : PW_ID_ANY; - if (pwsrc->target_object) { + if (pwsrc->stream->target_object) { struct spa_dict_item items[2] = { - SPA_DICT_ITEM_INIT(PW_KEY_TARGET_OBJECT, pwsrc->target_object), + SPA_DICT_ITEM_INIT(PW_KEY_TARGET_OBJECT, pwsrc->stream->target_object), /* XXX deprecated but the portal and some example apps only * provide the object id */ SPA_DICT_ITEM_INIT(PW_KEY_NODE_TARGET, NULL), @@ -925,39 +913,39 @@ gst_pipewire_src_negotiate (GstBaseSrc * basesrc) uint64_t serial; /* If target.object is a name, set it also to node.target */ - if (spa_atou64(pwsrc->target_object, &serial, 0)) { + if (spa_atou64(pwsrc->stream->target_object, &serial, 0)) { dict.n_items = 1; } else { target_id = PW_ID_ANY; - items[1].value = pwsrc->target_object; + items[1].value = pwsrc->stream->target_object; } - pw_stream_update_properties (pwsrc->stream, &dict); + pw_stream_update_properties (pwsrc->stream->pwstream, &dict); } GST_DEBUG_OBJECT (basesrc, "connect capture with path %s, target-object %s", - pwsrc->path, pwsrc->target_object); + pwsrc->stream->path, pwsrc->stream->target_object); enum pw_stream_flags flags; flags = PW_STREAM_FLAG_DONT_RECONNECT | PW_STREAM_FLAG_ASYNC; if (pwsrc->autoconnect) flags |= PW_STREAM_FLAG_AUTOCONNECT; - pw_stream_connect (pwsrc->stream, + pw_stream_connect (pwsrc->stream->pwstream, PW_DIRECTION_INPUT, target_id, flags, (const struct spa_pod **)possible->pdata, possible->len); - pw_thread_loop_get_time (pwsrc->core->loop, &abstime, + pw_thread_loop_get_time (pwsrc->stream->core->loop, &abstime, GST_PIPEWIRE_DEFAULT_TIMEOUT * SPA_NSEC_PER_SEC); pwsrc->possible_caps = possible_caps; pwsrc->negotiated = FALSE; while (TRUE) { - enum pw_stream_state state = pw_stream_get_state (pwsrc->stream, &error); + enum pw_stream_state state = pw_stream_get_state (pwsrc->stream->pwstream, &error); GST_DEBUG_OBJECT (basesrc, "waiting for NEGOTIATED, now %s", pw_stream_state_as_string (state)); if (state == PW_STREAM_STATE_ERROR || pwsrc->flushing) @@ -967,21 +955,21 @@ gst_pipewire_src_negotiate (GstBaseSrc * basesrc) break; if (pwsrc->autoconnect) { - if (pw_thread_loop_timed_wait_full (pwsrc->core->loop, &abstime) < 0) + if (pw_thread_loop_timed_wait_full (pwsrc->stream->core->loop, &abstime) < 0) goto connect_error; } else { - pw_thread_loop_wait (pwsrc->core->loop); + pw_thread_loop_wait (pwsrc->stream->core->loop); } } negotiated_caps = g_steal_pointer (&pwsrc->caps); pwsrc->possible_caps = NULL; - pw_thread_loop_unlock (pwsrc->core->loop); + pw_thread_loop_unlock (pwsrc->stream->core->loop); if (negotiated_caps == NULL) goto no_caps; - gst_pipewire_clock_reset (GST_PIPEWIRE_CLOCK (pwsrc->clock), 0); + gst_pipewire_clock_reset (GST_PIPEWIRE_CLOCK (pwsrc->stream->clock), 0); GST_DEBUG_OBJECT (pwsrc, "set format %" GST_PTR_FORMAT, negotiated_caps); result = gst_base_src_set_caps (GST_BASE_SRC (pwsrc), negotiated_caps); @@ -1004,7 +992,7 @@ no_caps: GST_ELEMENT_ERROR (basesrc, STREAM, FORMAT, ("%s", error_string), ("This element did not produce valid caps")); - pw_stream_set_error (pwsrc->stream, -EINVAL, "%s", error_string); + pw_stream_set_error (pwsrc->stream->pwstream, -EINVAL, "%s", error_string); return FALSE; } no_common_caps: @@ -1014,7 +1002,7 @@ no_common_caps: GST_ELEMENT_ERROR (basesrc, STREAM, FORMAT, ("%s", error_string), ("This element does not have formats in common with the peer")); - pw_stream_set_error (pwsrc->stream, -EPIPE, "%s", error_string); + pw_stream_set_error (pwsrc->stream->pwstream, -EPIPE, "%s", error_string); return FALSE; } connect_error: @@ -1022,7 +1010,7 @@ connect_error: g_clear_pointer (&pwsrc->caps, gst_caps_unref); pwsrc->possible_caps = NULL; GST_DEBUG_OBJECT (basesrc, "connect error"); - pw_thread_loop_unlock (pwsrc->core->loop); + pw_thread_loop_unlock (pwsrc->stream->core->loop); return FALSE; } } @@ -1061,14 +1049,14 @@ handle_format_change (GstPipeWireSrc *pwsrc, if (gst_video_is_dma_drm_caps (pwsrc->caps)) { if (!gst_video_info_dma_drm_from_caps (&pwsrc->drm_info, pwsrc->caps)) { GST_WARNING_OBJECT (pwsrc, "Can't create drm video info from caps"); - pw_stream_set_error (pwsrc->stream, -EINVAL, "internal error"); + pw_stream_set_error (pwsrc->stream->pwstream, -EINVAL, "internal error"); return; } if (!gst_video_info_dma_drm_to_video_info (&pwsrc->drm_info, &pwsrc->video_info)) { GST_WARNING_OBJECT (pwsrc, "Can't create video info from drm video info"); - pw_stream_set_error (pwsrc->stream, -EINVAL, "internal error"); + pw_stream_set_error (pwsrc->stream->pwstream, -EINVAL, "internal error"); return; } } else { @@ -1124,12 +1112,12 @@ handle_format_change (GstPipeWireSrc *pwsrc, SPA_PARAM_META_size, SPA_POD_Int(sizeof (struct spa_meta_videotransform))); GST_DEBUG_OBJECT (pwsrc, "doing finish format"); - pw_stream_update_params (pwsrc->stream, params, SPA_N_ELEMENTS(params)); + pw_stream_update_params (pwsrc->stream->pwstream, params, SPA_N_ELEMENTS(params)); } else { GST_WARNING_OBJECT (pwsrc, "finish format with error"); - pw_stream_set_error (pwsrc->stream, -EINVAL, "unhandled format"); + pw_stream_set_error (pwsrc->stream->pwstream, -EINVAL, "unhandled format"); } - pw_thread_loop_signal (pwsrc->core->loop, FALSE); + pw_thread_loop_signal (pwsrc->stream->core->loop, FALSE); } static void @@ -1149,11 +1137,11 @@ gst_pipewire_src_unlock (GstBaseSrc * basesrc) { GstPipeWireSrc *pwsrc = GST_PIPEWIRE_SRC (basesrc); - pw_thread_loop_lock (pwsrc->core->loop); + pw_thread_loop_lock (pwsrc->stream->core->loop); GST_DEBUG_OBJECT (pwsrc, "setting flushing"); pwsrc->flushing = TRUE; - pw_thread_loop_signal (pwsrc->core->loop, FALSE); - pw_thread_loop_unlock (pwsrc->core->loop); + pw_thread_loop_signal (pwsrc->stream->core->loop, FALSE); + pw_thread_loop_unlock (pwsrc->stream->core->loop); return TRUE; } @@ -1163,10 +1151,10 @@ gst_pipewire_src_unlock_stop (GstBaseSrc * basesrc) { GstPipeWireSrc *pwsrc = GST_PIPEWIRE_SRC (basesrc); - pw_thread_loop_lock (pwsrc->core->loop); + pw_thread_loop_lock (pwsrc->stream->core->loop); GST_DEBUG_OBJECT (pwsrc, "unsetting flushing"); pwsrc->flushing = FALSE; - pw_thread_loop_unlock (pwsrc->core->loop); + pw_thread_loop_unlock (pwsrc->stream->core->loop); return TRUE; } @@ -1260,7 +1248,7 @@ gst_pipewire_src_create (GstPushSrc * psrc, GstBuffer ** buffer) pwsrc = GST_PIPEWIRE_SRC (psrc); - pw_thread_loop_lock (pwsrc->core->loop); + pw_thread_loop_lock (pwsrc->stream->core->loop); if (!pwsrc->negotiated) goto not_negotiated; @@ -1273,7 +1261,7 @@ gst_pipewire_src_create (GstPushSrc * psrc, GstBuffer ** buffer) if (pwsrc->stream == NULL) goto streaming_error; - state = pw_stream_get_state (pwsrc->stream, &error); + state = pw_stream_get_state (pwsrc->stream->pwstream, &error); if (state == PW_STREAM_STATE_ERROR) goto streaming_error; @@ -1282,13 +1270,13 @@ gst_pipewire_src_create (GstPushSrc * psrc, GstBuffer ** buffer) if ((caps = pwsrc->caps) != NULL) { pwsrc->caps = NULL; - pw_thread_loop_unlock (pwsrc->core->loop); + pw_thread_loop_unlock (pwsrc->stream->core->loop); GST_DEBUG_OBJECT (pwsrc, "set format %" GST_PTR_FORMAT, caps); gst_base_src_set_caps (GST_BASE_SRC (pwsrc), caps); gst_caps_unref (caps); - pw_thread_loop_lock (pwsrc->core->loop); + pw_thread_loop_lock (pwsrc->stream->core->loop); continue; } @@ -1319,15 +1307,15 @@ gst_pipewire_src_create (GstPushSrc * psrc, GstBuffer ** buffer) timeout = FALSE; if (pwsrc->keepalive_time > 0) { struct timespec abstime; - pw_thread_loop_get_time(pwsrc->core->loop, &abstime, + pw_thread_loop_get_time(pwsrc->stream->core->loop, &abstime, pwsrc->keepalive_time * SPA_NSEC_PER_MSEC); - if (pw_thread_loop_timed_wait_full (pwsrc->core->loop, &abstime) == -ETIMEDOUT) + if (pw_thread_loop_timed_wait_full (pwsrc->stream->core->loop, &abstime) == -ETIMEDOUT) timeout = TRUE; } else { - pw_thread_loop_wait (pwsrc->core->loop); + pw_thread_loop_wait (pwsrc->stream->core->loop); } } - pw_thread_loop_unlock (pwsrc->core->loop); + pw_thread_loop_unlock (pwsrc->stream->core->loop); *buffer = buf; @@ -1354,22 +1342,22 @@ gst_pipewire_src_create (GstPushSrc * psrc, GstBuffer ** buffer) not_negotiated: { - pw_thread_loop_unlock (pwsrc->core->loop); + pw_thread_loop_unlock (pwsrc->stream->core->loop); return GST_FLOW_NOT_NEGOTIATED; } streaming_eos: { - pw_thread_loop_unlock (pwsrc->core->loop); + pw_thread_loop_unlock (pwsrc->stream->core->loop); return GST_FLOW_EOS; } streaming_error: { - pw_thread_loop_unlock (pwsrc->core->loop); + pw_thread_loop_unlock (pwsrc->stream->core->loop); return GST_FLOW_ERROR; } streaming_stopped: { - pw_thread_loop_unlock (pwsrc->core->loop); + pw_thread_loop_unlock (pwsrc->stream->core->loop); return GST_FLOW_FLUSHING; } } @@ -1387,35 +1375,15 @@ gst_pipewire_src_stop (GstBaseSrc * basesrc) pwsrc = GST_PIPEWIRE_SRC (basesrc); - pw_thread_loop_lock (pwsrc->core->loop); + pw_thread_loop_lock (pwsrc->stream->core->loop); pwsrc->eos = false; gst_buffer_replace (&pwsrc->last_buffer, NULL); gst_caps_replace(&pwsrc->caps, NULL); - pw_thread_loop_unlock (pwsrc->core->loop); + pw_thread_loop_unlock (pwsrc->stream->core->loop); return TRUE; } -static gboolean -copy_properties (GQuark field_id, - const GValue *value, - gpointer user_data) -{ - struct pw_properties *properties = user_data; - GValue dst = { 0 }; - - if (g_value_type_transformable (G_VALUE_TYPE(value), G_TYPE_STRING)) { - g_value_init(&dst, G_TYPE_STRING); - if (g_value_transform(value, &dst)) { - pw_properties_set (properties, - g_quark_to_string (field_id), - g_value_get_string (&dst)); - } - g_value_unset(&dst); - } - return TRUE; -} - static const struct pw_stream_events stream_events = { PW_VERSION_STREAM_EVENTS, .state_changed = on_state_changed, @@ -1425,94 +1393,6 @@ static const struct pw_stream_events stream_events = { .process = on_process, }; -static gboolean -gst_pipewire_src_open (GstPipeWireSrc * pwsrc) -{ - struct pw_properties *props; - - GST_DEBUG_OBJECT (pwsrc, "open"); - - pwsrc->core = gst_pipewire_core_get(pwsrc->fd); - if (pwsrc->core == NULL) - goto connect_error; - - pw_thread_loop_lock (pwsrc->core->loop); - - props = pw_properties_new (NULL, NULL); - if (pwsrc->client_properties) { - gst_structure_foreach (pwsrc->client_properties, copy_properties, props); - pw_core_update_properties (pwsrc->core->core, &props->dict); - pw_properties_clear(props); - } - if (pwsrc->client_name) { - pw_properties_set (props, PW_KEY_NODE_NAME, pwsrc->client_name); - pw_properties_set (props, PW_KEY_NODE_DESCRIPTION, pwsrc->client_name); - } - if (pwsrc->stream_properties) { - gst_structure_foreach (pwsrc->stream_properties, copy_properties, props); - } - - if ((pwsrc->stream = pw_stream_new (pwsrc->core->core, - pwsrc->client_name, props)) == NULL) - goto no_stream; - - - pw_stream_add_listener(pwsrc->stream, - &pwsrc->stream_listener, - &stream_events, - pwsrc); - - pwsrc->clock = gst_pipewire_clock_new (pwsrc->stream, pwsrc->last_time); - pw_thread_loop_unlock (pwsrc->core->loop); - - return TRUE; - - /* ERRORS */ -connect_error: - { - GST_ELEMENT_ERROR (pwsrc, RESOURCE, FAILED, ("can't connect"), (NULL)); - return FALSE; - } -no_stream: - { - GST_ELEMENT_ERROR (pwsrc, RESOURCE, FAILED, ("can't create stream"), (NULL)); - pw_thread_loop_unlock (pwsrc->core->loop); - gst_pipewire_core_release (pwsrc->core); - pwsrc->core = NULL; - return FALSE; - } -} - -static void -gst_pipewire_src_close (GstPipeWireSrc * pwsrc) -{ - pwsrc->last_time = gst_clock_get_time (pwsrc->clock); - - GST_DEBUG_OBJECT (pwsrc, "close"); - - gst_element_post_message (GST_ELEMENT (pwsrc), - gst_message_new_clock_lost (GST_OBJECT_CAST (pwsrc), pwsrc->clock)); - - GST_OBJECT_LOCK (pwsrc); - GST_PIPEWIRE_CLOCK (pwsrc->clock)->stream = NULL; - g_clear_object (&pwsrc->clock); - GST_OBJECT_UNLOCK (pwsrc); - - GST_OBJECT_LOCK (pwsrc->pool); - pw_thread_loop_lock (pwsrc->core->loop); - if (pwsrc->stream) { - pw_stream_destroy (pwsrc->stream); - pwsrc->stream = NULL; - } - pw_thread_loop_unlock (pwsrc->core->loop); - GST_OBJECT_UNLOCK (pwsrc->pool); - - if (pwsrc->core) { - gst_pipewire_core_release (pwsrc->core); - pwsrc->core = NULL; - } -} - static gboolean gst_pipewire_src_send_event (GstElement * elem, GstEvent * event) { @@ -1522,10 +1402,10 @@ gst_pipewire_src_send_event (GstElement * elem, GstEvent * event) switch (GST_EVENT_TYPE (event)) { case GST_EVENT_EOS: GST_DEBUG_OBJECT (this, "got EOS"); - pw_thread_loop_lock (this->core->loop); + pw_thread_loop_lock (this->stream->core->loop); this->eos = true; - pw_thread_loop_signal (this->core->loop, FALSE); - pw_thread_loop_unlock (this->core->loop); + pw_thread_loop_signal (this->stream->core->loop, FALSE); + pw_thread_loop_unlock (this->stream->core->loop); ret = TRUE; break; default: @@ -1543,22 +1423,22 @@ gst_pipewire_src_change_state (GstElement * element, GstStateChange transition) switch (transition) { case GST_STATE_CHANGE_NULL_TO_READY: - if (!gst_pipewire_src_open (this)) + if (!gst_pipewire_stream_open (this->stream, &stream_events)) goto open_failed; break; case GST_STATE_CHANGE_READY_TO_PAUSED: break; case GST_STATE_CHANGE_PAUSED_TO_PLAYING: /* uncork and start recording */ - pw_thread_loop_lock (this->core->loop); - pw_stream_set_active(this->stream, true); - pw_thread_loop_unlock (this->core->loop); + pw_thread_loop_lock (this->stream->core->loop); + pw_stream_set_active(this->stream->pwstream, true); + pw_thread_loop_unlock (this->stream->core->loop); break; case GST_STATE_CHANGE_PLAYING_TO_PAUSED: /* stop recording ASAP by corking */ - pw_thread_loop_lock (this->core->loop); - pw_stream_set_active(this->stream, false); - pw_thread_loop_unlock (this->core->loop); + pw_thread_loop_lock (this->stream->core->loop); + pw_stream_set_active(this->stream->pwstream, false); + pw_thread_loop_unlock (this->stream->core->loop); break; default: break; @@ -1577,12 +1457,12 @@ gst_pipewire_src_change_state (GstElement * element, GstStateChange transition) case GST_STATE_CHANGE_PLAYING_TO_PAUSED: break; case GST_STATE_CHANGE_PAUSED_TO_READY: - pw_thread_loop_lock (this->core->loop); + pw_thread_loop_lock (this->stream->core->loop); this->negotiated = FALSE; - pw_thread_loop_unlock (this->core->loop); + pw_thread_loop_unlock (this->stream->core->loop); break; case GST_STATE_CHANGE_READY_TO_NULL: - gst_pipewire_src_close (this); + gst_pipewire_stream_close (this->stream); break; default: break; diff --git a/src/gst/gstpipewiresrc.h b/src/gst/gstpipewiresrc.h index bc7b65017..11fe7bf1c 100644 --- a/src/gst/gstpipewiresrc.h +++ b/src/gst/gstpipewiresrc.h @@ -7,6 +7,8 @@ #include "config.h" +#include "gstpipewirestream.h" + #include #include @@ -42,14 +44,12 @@ typedef struct _GstPipeWireSrcClass GstPipeWireSrcClass; struct _GstPipeWireSrc { GstPushSrc element; + GstPipeWireStream *stream; + /*< private >*/ - gchar *path; - gchar *target_object; - gchar *client_name; gboolean always_copy; gint min_buffers; gint max_buffers; - int fd; gboolean resend_last; gint keepalive_time; gboolean autoconnect; @@ -73,21 +73,7 @@ struct _GstPipeWireSrc { GstClockTime min_latency; GstClockTime max_latency; - GstStructure *client_properties; - GstPipeWireCore *core; - struct spa_hook core_listener; - int last_seq; - int pending_seq; - - struct pw_stream *stream; - struct spa_hook stream_listener; - GstBuffer *last_buffer; - GstStructure *stream_properties; - - GstPipeWirePool *pool; - GstClock *clock; - GstClockTime last_time; enum spa_meta_videotransform_value transform_value; }; diff --git a/src/gst/gstpipewirestream.c b/src/gst/gstpipewirestream.c new file mode 100644 index 000000000..7591d46c2 --- /dev/null +++ b/src/gst/gstpipewirestream.c @@ -0,0 +1,168 @@ +/* GStreamer */ +/* SPDX-FileCopyrightText: Copyright © 2018 Wim Taymans */ +/* SPDX-FileCopyrightText: Copyright © 2024 Collabora Ltd. */ +/* SPDX-License-Identifier: MIT */ + +#include "gstpipewirestream.h" + +GST_DEBUG_CATEGORY_STATIC (pipewire_stream_debug); +#define GST_CAT_DEFAULT pipewire_stream_debug + +G_DEFINE_TYPE (GstPipeWireStream, gst_pipewire_stream, GST_TYPE_OBJECT) + +static void +gst_pipewire_stream_init (GstPipeWireStream * self) +{ + self->fd = -1; + self->client_name = g_strdup (pw_get_client_name()); + self->pool = gst_pipewire_pool_new (); +} + +static void +gst_pipewire_stream_finalize (GObject * object) +{ + GstPipeWireStream * self = GST_PIPEWIRE_STREAM (object); + + g_clear_object (&self->pool); + g_free (self->path); + g_free (self->target_object); + g_free (self->client_name); + gst_clear_structure (&self->client_properties); + gst_clear_structure (&self->stream_properties); + + G_OBJECT_CLASS(gst_pipewire_stream_parent_class)->finalize (object); +} + +void +gst_pipewire_stream_class_init (GstPipeWireStreamClass * klass) +{ + GObjectClass *gobject_class = G_OBJECT_CLASS (klass); + + gobject_class->finalize = gst_pipewire_stream_finalize; + + GST_DEBUG_CATEGORY_INIT (pipewire_stream_debug, "pipewirestream", 0, + "PipeWire Stream"); +} + +GstPipeWireStream * +gst_pipewire_stream_new (GstElement * element) +{ + GstPipeWireStream *stream; + + stream = g_object_new (GST_TYPE_PIPEWIRE_STREAM, NULL); + stream->element = element; + + return stream; +} + +static gboolean +copy_properties (GQuark field_id, + const GValue *value, + gpointer user_data) +{ + struct pw_properties *properties = user_data; + GValue dst = { 0 }; + + if (g_value_type_transformable (G_VALUE_TYPE(value), G_TYPE_STRING)) { + g_value_init (&dst, G_TYPE_STRING); + if (g_value_transform (value, &dst)) { + pw_properties_set (properties, + g_quark_to_string (field_id), + g_value_get_string (&dst)); + } + g_value_unset (&dst); + } + return TRUE; +} + +gboolean +gst_pipewire_stream_open (GstPipeWireStream * self, + const struct pw_stream_events * pwstream_events) +{ + struct pw_properties *props; + + g_return_val_if_fail (self->core == NULL, FALSE); + + GST_DEBUG_OBJECT (self, "open"); + + /* acquire the core */ + self->core = gst_pipewire_core_get (self->fd); + if (self->core == NULL) + goto connect_error; + + pw_thread_loop_lock (self->core->loop); + + /* update the client properties */ + if (self->client_properties) { + props = pw_properties_new (NULL, NULL); + gst_structure_foreach (self->client_properties, copy_properties, props); + pw_core_update_properties (self->core->core, &props->dict); + pw_properties_free (props); + } + + /* create stream */ + props = pw_properties_new (NULL, NULL); + if (self->client_name) { + pw_properties_set (props, PW_KEY_NODE_NAME, self->client_name); + pw_properties_set (props, PW_KEY_NODE_DESCRIPTION, self->client_name); + } + if (self->stream_properties) { + gst_structure_foreach (self->stream_properties, copy_properties, props); + } + + if ((self->pwstream = pw_stream_new (self->core->core, + self->client_name, props)) == NULL) + goto no_stream; + + pw_stream_add_listener(self->pwstream, + &self->pwstream_listener, + pwstream_events, + self->element); + + /* create clock */ + self->clock = gst_pipewire_clock_new (self->pwstream, 0); + + self->pool->stream = self->pwstream; + + pw_thread_loop_unlock (self->core->loop); + + return TRUE; + + /* ERRORS */ +connect_error: + { + GST_ELEMENT_ERROR (self->element, RESOURCE, FAILED, + ("Failed to connect"), (NULL)); + return FALSE; + } +no_stream: + { + GST_ELEMENT_ERROR (self->element, RESOURCE, FAILED, + ("can't create stream"), (NULL)); + pw_thread_loop_unlock (self->core->loop); + return FALSE; + } +} + +void +gst_pipewire_stream_close (GstPipeWireStream * self) +{ + GST_DEBUG_OBJECT (self, "close"); + + self->pool->stream = NULL; + + /* destroy the clock */ + gst_element_post_message (GST_ELEMENT (self->element), + gst_message_new_clock_lost (GST_OBJECT_CAST (self->element), self->clock)); + + GST_PIPEWIRE_CLOCK (self->clock)->stream = NULL; + g_clear_object (&self->clock); + + /* destroy the pw stream */ + pw_thread_loop_lock (self->core->loop); + g_clear_pointer (&self->pwstream, pw_stream_destroy); + pw_thread_loop_unlock (self->core->loop); + + /* release the core */ + g_clear_pointer (&self->core, gst_pipewire_core_release); +} diff --git a/src/gst/gstpipewirestream.h b/src/gst/gstpipewirestream.h new file mode 100644 index 000000000..a9626c0e7 --- /dev/null +++ b/src/gst/gstpipewirestream.h @@ -0,0 +1,53 @@ +/* GStreamer */ +/* SPDX-FileCopyrightText: Copyright © 2018 Wim Taymans */ +/* SPDX-FileCopyrightText: Copyright © 2024 Collabora Ltd. */ +/* SPDX-License-Identifier: MIT */ + +#ifndef __GST_PIPEWIRE_STREAM_H__ +#define __GST_PIPEWIRE_STREAM_H__ + +#include "config.h" + +#include "gstpipewirecore.h" +#include "gstpipewirepool.h" +#include "gstpipewireclock.h" + +#include +#include + +G_BEGIN_DECLS + +#define GST_TYPE_PIPEWIRE_STREAM (gst_pipewire_stream_get_type()) +G_DECLARE_FINAL_TYPE(GstPipeWireStream, gst_pipewire_stream, GST, PIPEWIRE_STREAM, GstObject) + +struct _GstPipeWireStream { + GstObject parent; + + /* relatives */ + GstElement *element; + GstPipeWireCore *core; + GstPipeWirePool *pool; + GstClock *clock; + + /* the actual pw stream */ + struct pw_stream *pwstream; + struct spa_hook pwstream_listener; + + /* common properties */ + int fd; + gchar *path; + gchar *target_object; + gchar *client_name; + GstStructure *client_properties; + GstStructure *stream_properties; +}; + +GstPipeWireStream * gst_pipewire_stream_new (GstElement * element); + +gboolean gst_pipewire_stream_open (GstPipeWireStream * self, + const struct pw_stream_events * pwstream_events); +void gst_pipewire_stream_close (GstPipeWireStream * self); + +G_END_DECLS + +#endif /* __GST_PIPEWIRE_STREAM_H__ */ diff --git a/src/gst/meson.build b/src/gst/meson.build index fd552f6cb..ba1f6d558 100644 --- a/src/gst/meson.build +++ b/src/gst/meson.build @@ -6,6 +6,7 @@ pipewire_gst_sources = [ 'gstpipewirepool.c', 'gstpipewiresink.c', 'gstpipewiresrc.c', + 'gstpipewirestream.c', ] if get_option('gstreamer-device-provider').allowed() @@ -20,6 +21,7 @@ pipewire_gst_headers = [ 'gstpipewirepool.h', 'gstpipewiresink.h', 'gstpipewiresrc.h', + 'gstpipewirestream.h', ] pipewire_gst = shared_library('gstpipewire',