wireplumber/lib/wp/core.c

570 lines
14 KiB
C
Raw Normal View History

/* WirePlumber
*
* Copyright © 2019 Collabora Ltd.
* @author George Kiagiadakis <george.kiagiadakis@collabora.com>
*
2019-05-31 12:13:01 +03:00
* SPDX-License-Identifier: MIT
*/
2020-02-17 15:39:19 +02:00
/**
* SECTION: WpCore
*
* The core is the central object around which everything operates. It is
* essential to create a #WpCore before using any other WirePlumber API.
*
* The core object has the following responsibilities:
* * it initializes the PipeWire library
* * it creates a `pw_context` and allows connecting to the PipeWire server,
* creating a local `pw_core`
* * it glues the PipeWire library's event loop system with GMainLoop
* * it maintains a list of registered objects, which other classes use
* to keep objects loaded permanently into memory
* * it watches the PipeWire registry and keeps track of remote and local
* objects that appear in the registry, making them accessible through
* the #WpObjectManager API.
*/
#include "core.h"
#include "wp.h"
#include "private.h"
#include <pipewire/pipewire.h>
2020-01-09 12:39:45 -05:00
#include <spa/utils/result.h>
lib: introduce WpObjectManager * rework how global objects are stored in the core * rework how users get notified about global objects and proxies of remote global objects The purpose of this change is to have a class that can manage objects that are registered in the core or signalled through the registry. This object can declare interest on certain types of global objects and only keep & signal those objects that it is interested in. Additionally, it can prepare proxy features and asynchronously deliver an 'objects-changed' signal, which is basically telling us that the list of objects has changed. This is useful to simplify port proxies management in WpAudioStream. Now the stream object can declare that it is interested in ports that have "node.id" == X and the object manager will only maintain a list of those. Additionally, it will emit the 'objects-changed' signal when the list of ports is complete, so there is no reason to do complex operations and core syncs in the WpAudioStream class in order to figure out when the list of ports is ready. As a side effect, this also reduces resource management. Now we don't construct a WpProxy for every global that pipewire reports; we only construct proxies when there is interest in them! Another interesting side effect is that we can now register an object manager at any point in time and get immediately notified about remote globals that already exist. i.e. when you register an object manager that is interested in nodes, it will be immediately notified about all the existing nodes in the graph. This is useful to avoid race conditions between connecting the signal and objects beting created in pipewire
2019-11-13 15:44:23 +02:00
#include <spa/debug/types.h>
/*
* Integration between the PipeWire main loop and GMainLoop
*/
/* Casts a GSource pointer to the WpLoopSource that embeds it.
 * The argument is parenthesized so that any expression can be passed. */
#define WP_LOOP_SOURCE(x) ((WpLoopSource *) (x))

typedef struct _WpLoopSource WpLoopSource;

/* A GSource that carries the PipeWire loop it iterates when dispatched */
struct _WpLoopSource
{
  /* must stay the first member: instances are allocated by g_source_new()
   * and accessed through plain pointer casts */
  GSource parent;
  /* created in wp_loop_source_new(), destroyed in wp_loop_source_finalize() */
  struct pw_loop *loop;
};
/*
 * GSource dispatch handler: runs one iteration of the wrapped PipeWire loop
 * with a timeout of 0. A failed iteration is only logged; the source always
 * stays installed.
 */
static gboolean
wp_loop_source_dispatch (GSource * s, GSourceFunc callback, gpointer user_data)
{
  struct pw_loop *loop = WP_LOOP_SOURCE (s)->loop;
  int res;

  pw_loop_enter (loop);
  res = pw_loop_iterate (loop, 0);
  pw_loop_leave (loop);

  if (G_UNLIKELY (res < 0))
    g_warning ("pw_loop_iterate failed: %s", spa_strerror (res));

  return G_SOURCE_CONTINUE;
}
/* GSource finalize handler: destroys the PipeWire loop owned by the source */
static void
wp_loop_source_finalize (GSource * s)
{
  WpLoopSource *self = WP_LOOP_SOURCE (s);

  pw_loop_destroy (self->loop);
}
static GSourceFuncs source_funcs = {
NULL,
NULL,
wp_loop_source_dispatch,
wp_loop_source_finalize
};
/*
 * Creates a GSource that owns a freshly created PipeWire loop and watches
 * the loop's file descriptor for input, error and hang-up conditions.
 * The returned source is not attached to any GMainContext yet.
 */
static GSource *
wp_loop_source_new (void)
{
  GSource *source = g_source_new (&source_funcs, sizeof (WpLoopSource));
  struct pw_loop *loop = pw_loop_new (NULL);

  WP_LOOP_SOURCE (source)->loop = loop;
  g_source_add_unix_fd (source, pw_loop_get_fd (loop),
      G_IO_IN | G_IO_ERR | G_IO_HUP);

  return source;
}
/**
* WpCore
*/
/* Instance structure of WpCore */
struct _WpCore
{
GObject parent;
/* main loop integration: the GMainContext given through the "context"
 * construct property; the PipeWire loop source is attached to it in
 * wp_core_constructed() and it is unreffed in wp_core_finalize() */
GMainContext *context;
/* extra properties given through the "properties" construct property;
 * converted and passed to pw_context_new() and pw_context_connect() */
WpProperties *properties;
/* pipewire main objects */
/* created in wp_core_constructed(), destroyed in wp_core_finalize() */
struct pw_context *pw_context;
/* set by wp_core_connect(); NULL while the core is not connected */
struct pw_core *pw_core;
/* pipewire main listeners */
/* hook registered with core_events in wp_core_connect() */
struct spa_hook core_listener;
/* hook registered with proxy_core_events in wp_core_connect() */
struct spa_hook proxy_core_listener;
/* embedded registry; wp_registry_get_core() recovers the WpCore from a
 * pointer to this member */
WpRegistry registry;
/* pending wp_core_sync() operations, completed in core_done() */
GHashTable *async_tasks; // <int seq, GTask*>
};
/* Property IDs of the GObject properties installed in wp_core_class_init() */
enum {
  PROP_0,           /* placeholder; no property is installed with this ID */
  PROP_CONTEXT,     /* "context" */
  PROP_PROPERTIES,  /* "properties" */
  PROP_PW_CONTEXT,  /* "pw-context" */
  PROP_PW_CORE,     /* "pw-core" */
};
/* Indices into the signals[] table below */
enum {
  SIGNAL_CONNECTED,     /* "connected" */
  SIGNAL_DISCONNECTED,  /* "disconnected" */
  NUM_SIGNALS
};

/* Signal IDs returned by g_signal_new() in wp_core_class_init() */
static guint32 signals[NUM_SIGNALS];
2020-02-17 15:39:19 +02:00
/**
 * WP_TYPE_CORE:
 *
 * The #WpCore #GType
 */
/* Registers WpCore as a subclass of GObject; wp_core_init() and
 * wp_core_class_init() below are its instance and class initializers, and
 * wp_core_parent_class is used below to chain up to GObject */
G_DEFINE_TYPE (WpCore, wp_core, G_TYPE_OBJECT)
2019-12-04 15:20:42 -05:00
static void
core_done (void *data, uint32_t id, int seq)
{
WpCore *self = WP_CORE (data);
g_autoptr (GTask) task = NULL;
g_hash_table_steal_extended (self->async_tasks, GINT_TO_POINTER (seq), NULL,
(gpointer *) &task);
if (task)
g_task_return_boolean (task, TRUE);
}
2020-01-09 12:39:45 -05:00
static const struct pw_core_events core_events = {
2019-12-04 15:20:42 -05:00
PW_VERSION_CORE_EVENTS,
.done = core_done,
};
static void
2020-01-09 12:39:45 -05:00
proxy_core_destroy (void *data)
{
2020-01-09 12:39:45 -05:00
WpCore *self = WP_CORE (data);
self->pw_core = NULL;
2020-01-09 12:39:45 -05:00
/* Emit the disconnected signal */
g_signal_emit (self, signals[SIGNAL_DISCONNECTED], 0);
}
2020-01-09 12:39:45 -05:00
static const struct pw_proxy_events proxy_core_events = {
PW_VERSION_PROXY_EVENTS,
.destroy = proxy_core_destroy,
};
/*
 * Instance initializer: sets up the embedded registry and the table of
 * pending sync tasks.
 */
static void
wp_core_init (WpCore * self)
{
  GHashTable *tasks;

  wp_registry_init (&self->registry);

  /* keys are plain integers (sequence numbers); values are GTask
   * references that the table releases when an entry is removed */
  tasks = g_hash_table_new_full (g_direct_hash, g_direct_equal,
      NULL, g_object_unref);
  self->async_tasks = tasks;
}
/*
 * GObject::constructed implementation: creates the PipeWire loop source,
 * attaches it to the core's GMainContext and creates the pw_context on top
 * of that loop.
 */
static void
wp_core_constructed (GObject *object)
{
  WpCore *self = WP_CORE (object);
  struct pw_properties *props = NULL;
  g_autoptr (GSource) loop_source = NULL;

  /* loop: only the local reference is dropped when this function returns */
  loop_source = wp_loop_source_new ();
  g_source_attach (loop_source, self->context);

  /* context */
  if (self->properties)
    props = wp_properties_to_pw_properties (self->properties);
  self->pw_context = pw_context_new (WP_LOOP_SOURCE (loop_source)->loop,
      props, 0);

  G_OBJECT_CLASS (wp_core_parent_class)->constructed (object);
}
/* GObject::dispose implementation: clears the registry, then chains up */
static void
wp_core_dispose (GObject * obj)
{
  WpCore *core = WP_CORE (obj);

  wp_registry_clear (&core->registry);

  G_OBJECT_CLASS (wp_core_parent_class)->dispose (obj);
}
/*
 * GObject::finalize implementation: disconnects from PipeWire and releases
 * everything the core owns, then chains up.
 */
static void
wp_core_finalize (GObject * obj)
{
  WpCore *core = WP_CORE (obj);

  /* disconnect first, before the pw_context is destroyed */
  wp_core_disconnect (core);

  g_clear_pointer (&core->pw_context, pw_context_destroy);
  g_clear_pointer (&core->properties, wp_properties_unref);
  g_clear_pointer (&core->context, g_main_context_unref);
  g_clear_pointer (&core->async_tasks, g_hash_table_unref);

  g_debug ("WpCore destroyed");

  G_OBJECT_CLASS (wp_core_parent_class)->finalize (obj);
}
/*
 * GObject::get_property implementation. "context" and "properties" are
 * returned as boxed values; "pw-context" and "pw-core" as raw pointers.
 */
static void
wp_core_get_property (GObject * object, guint property_id,
    GValue * value, GParamSpec * pspec)
{
  WpCore *core = WP_CORE (object);

  switch (property_id) {
    case PROP_CONTEXT:
      g_value_set_boxed (value, core->context);
      break;

    case PROP_PROPERTIES:
      g_value_set_boxed (value, core->properties);
      break;

    case PROP_PW_CONTEXT:
      g_value_set_pointer (value, core->pw_context);
      break;

    case PROP_PW_CORE:
      g_value_set_pointer (value, core->pw_core);
      break;

    default:
      G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec);
      break;
  }
}
/*
 * GObject::set_property implementation. Only "context" and "properties"
 * are writable; the core stores its own copy/reference of the boxed value.
 */
static void
wp_core_set_property (GObject * object, guint property_id,
    const GValue * value, GParamSpec * pspec)
{
  WpCore *core = WP_CORE (object);

  switch (property_id) {
    case PROP_CONTEXT:
      core->context = g_value_dup_boxed (value);
      break;

    case PROP_PROPERTIES:
      core->properties = g_value_dup_boxed (value);
      break;

    default:
      G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec);
      break;
  }
}
/*
 * Class initializer: initializes the PipeWire library, hooks up the GObject
 * virtual methods, installs the properties and signals of WpCore and makes
 * sure the WpProxy subclasses are registered.
 */
static void
wp_core_class_init (WpCoreClass * klass)
{
  GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
  GType core_type = G_TYPE_FROM_CLASS (klass);
  const GParamFlags construct_only_flags =
      G_PARAM_READWRITE | G_PARAM_CONSTRUCT_ONLY | G_PARAM_STATIC_STRINGS;
  const GParamFlags read_only_flags =
      G_PARAM_READABLE | G_PARAM_STATIC_STRINGS;

  pw_init (NULL, NULL);

  gobject_class->constructed = wp_core_constructed;
  gobject_class->dispose = wp_core_dispose;
  gobject_class->finalize = wp_core_finalize;
  gobject_class->get_property = wp_core_get_property;
  gobject_class->set_property = wp_core_set_property;

  /* construct-only properties */

  g_object_class_install_property (gobject_class, PROP_CONTEXT,
      g_param_spec_boxed ("context", "context", "A GMainContext to attach to",
          G_TYPE_MAIN_CONTEXT, construct_only_flags));

  g_object_class_install_property (gobject_class, PROP_PROPERTIES,
      g_param_spec_boxed ("properties", "properties", "Extra properties",
          WP_TYPE_PROPERTIES, construct_only_flags));

  /* read-only properties */

  g_object_class_install_property (gobject_class, PROP_PW_CONTEXT,
      g_param_spec_pointer ("pw-context", "pw-context", "The pipewire context",
          read_only_flags));

  g_object_class_install_property (gobject_class, PROP_PW_CORE,
      g_param_spec_pointer ("pw-core", "pw-core", "The pipewire core",
          read_only_flags));

  /**
   * WpCore::connected:
   * @self: the core
   *
   * Emitted when the core is successfully connected to the PipeWire server
   */
  signals[SIGNAL_CONNECTED] = g_signal_new ("connected",
      core_type, G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL,
      G_TYPE_NONE, 0);

  /**
   * WpCore::disconnected:
   * @self: the core
   *
   * Emitted when the core is disconnected from the PipeWire server
   */
  signals[SIGNAL_DISCONNECTED] = g_signal_new ("disconnected",
      core_type, G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL,
      G_TYPE_NONE, 0);

  /* ensure WpProxy subclasses are loaded, which is needed to be able
     to autodetect the GType of proxies created through wp_proxy_new_global() */
  g_type_ensure (WP_TYPE_CLIENT);
  g_type_ensure (WP_TYPE_DEVICE);
  g_type_ensure (WP_TYPE_ENDPOINT);
  g_type_ensure (WP_TYPE_LINK);
  g_type_ensure (WP_TYPE_NODE);
  g_type_ensure (WP_TYPE_PORT);
  g_type_ensure (WP_TYPE_SESSION);
}
2020-02-17 15:39:19 +02:00
/**
 * wp_core_new:
 * @context: (transfer none) (nullable): the #GMainContext to use for events
 * @properties: (transfer none) (nullable): additional properties, which are
 *   passed to `pw_context_new` and `pw_context_connect`
 *
 * Creates a new core object. The core is not connected to PipeWire until
 * wp_core_connect() is called.
 *
 * Returns: (transfer full): a new #WpCore
 */
WpCore *
wp_core_new (GMainContext *context, WpProperties * properties)
{
  WpCore *core = g_object_new (WP_TYPE_CORE,
      "context", context,
      "properties", properties,
      NULL);

  return core;
}
2020-02-17 15:39:19 +02:00
/**
 * wp_core_get_context:
 * @self: the core
 *
 * Returns: (transfer none) (nullable): the #GMainContext that is in use by
 *   this core for events
 */
GMainContext *
wp_core_get_context (WpCore * self)
{
  g_return_val_if_fail (WP_IS_CORE (self), NULL);

  return self->context;
}
2020-02-17 15:39:19 +02:00
/**
* wp_core_get_pw_context:
* @self: the core
*
* Returns: (transfer none): the internal `pw_context` object
*/
2020-01-09 12:39:45 -05:00
struct pw_context *
wp_core_get_pw_context (WpCore * self)
{
g_return_val_if_fail (WP_IS_CORE (self), NULL);
return self->pw_context;
}
2020-02-17 15:39:19 +02:00
/**
 * wp_core_get_pw_core:
 * @self: the core
 *
 * Returns: (transfer none) (nullable): the internal `pw_core` object,
 *   or %NULL if the core is not connected to PipeWire
 */
struct pw_core *
wp_core_get_pw_core (WpCore * self)
{
  g_return_val_if_fail (WP_IS_CORE (self), NULL);

  return self->pw_core;
}
2020-02-17 15:39:19 +02:00
/**
* wp_core_connect:
* @self: the core
*
* Connects this core to the PipeWire server. When connection succeeds,
* the #WpCore::connected signal is emitted
*
* Returns: %TRUE if the core is effectively connected or %FALSE if
* connection failed
*/
2020-01-09 12:39:45 -05:00
gboolean
wp_core_connect (WpCore *self)
{
struct pw_properties *p = NULL;
g_return_val_if_fail (WP_IS_CORE (self), FALSE);
/* Don't do anything if core is already connected */
if (self->pw_core)
return TRUE;
/* Connect */
p = self->properties ? wp_properties_to_pw_properties (self->properties) : NULL;
self->pw_core = pw_context_connect (self->pw_context, p, 0);
if (!self->pw_core)
return FALSE;
/* Add the core listeners */
pw_core_add_listener (self->pw_core, &self->core_listener, &core_events, self);
pw_proxy_add_listener((struct pw_proxy*)self->pw_core,
&self->proxy_core_listener, &proxy_core_events, self);
/* Add the registry listener */
wp_registry_attach (&self->registry, self->pw_core);
2020-01-09 12:39:45 -05:00
/* Emit the connected signal */
g_signal_emit (self, signals[SIGNAL_CONNECTED], 0);
return TRUE;
}
2020-02-17 15:39:19 +02:00
/**
* wp_core_disconnect:
* @self: the core
*
* Disconnects this core from the PipeWire server. This also effectively
* destroys all #WpProxy objects that were created through the registry,
* destroys the `pw_core` and finally emits the #WpCore::disconnected signal.
*/
2020-01-09 12:39:45 -05:00
void
wp_core_disconnect (WpCore *self)
{
wp_registry_detach (&self->registry);
2020-01-09 12:39:45 -05:00
g_clear_pointer (&self->pw_core, pw_core_disconnect);
/* Emit the disconnected signal */
g_signal_emit (self, signals[SIGNAL_DISCONNECTED], 0);
}
2020-02-17 15:39:19 +02:00
/**
* wp_core_is_connected:
* @self: the core
*
* Returns: %TRUE if the core is connected to PipeWire, %FALSE otherwise
*/
2020-01-09 12:39:45 -05:00
gboolean
wp_core_is_connected (WpCore * self)
{
g_return_val_if_fail (WP_IS_CORE (self), FALSE);
return self->pw_core != NULL;
}
2020-02-17 15:39:19 +02:00
/**
 * wp_core_idle_add:
 * @self: the core
 * @function: (scope notified): the function to call
 * @data: (closure): data to pass to @function
 * @destroy: (nullable): a function to destroy @data
 *
 * Adds an idle callback to be called in the same #GMainContext as the
 * one used by this core. This is essentially the same as g_idle_add_full(),
 * but it adds the created #GSource on the #GMainContext used by this core
 * instead of the default context.
 *
 * Returns: the ID (greater than 0) of the event source
 */
guint
wp_core_idle_add (WpCore * self, GSourceFunc function, gpointer data,
    GDestroyNotify destroy)
{
  g_autoptr (GSource) idle = NULL;

  g_return_val_if_fail (WP_IS_CORE (self), 0);

  idle = g_idle_source_new ();
  g_source_set_callback (idle, function, data, destroy);

  /* g_source_attach() returns the ID of the source within the context;
   * only the local reference is dropped when this function returns */
  return g_source_attach (idle, self->context);
}
2020-02-17 15:39:19 +02:00
/**
* wp_core_sync:
* @self: the core
* @cancellable: (nullable): a #GCancellable to cancel the operation
* @callback: (scope async): a function to call when the operation is done
* @user_data: (closure): data to pass to @callback
*
* Asks the PipeWire server to call the @callback via an event.
*
* Since methods are handled in-order and events are delivered
* in-order, this can be used as a barrier to ensure all previous
* methods and the resulting events have been handled.
*
* In both success and error cases, @callback is always called. Use
* wp_core_sync_finish() from within the @callback to determine whether
* the operation completed successfully or if an error occurred.
*
* Returns: %TRUE if the sync operation was started, %FALSE if an error
* occurred before returning from this function
*/
2019-12-06 08:43:40 -05:00
gboolean
2019-12-04 15:20:42 -05:00
wp_core_sync (WpCore * self, GCancellable * cancellable,
GAsyncReadyCallback callback, gpointer user_data)
{
g_autoptr (GTask) task = NULL;
int seq;
2019-12-06 08:43:40 -05:00
g_return_val_if_fail (WP_IS_CORE (self), FALSE);
2019-12-04 15:20:42 -05:00
task = g_task_new (self, cancellable, callback, user_data);
2020-01-09 12:39:45 -05:00
if (G_UNLIKELY (!self->pw_core)) {
2019-12-04 15:20:42 -05:00
g_warn_if_reached ();
g_task_return_new_error (task, WP_DOMAIN_LIBRARY,
2020-01-09 12:39:45 -05:00
WP_LIBRARY_ERROR_INVARIANT, "No pipewire core");
2019-12-06 08:43:40 -05:00
return FALSE;
2019-12-04 15:20:42 -05:00
}
2020-01-09 12:39:45 -05:00
seq = pw_core_sync (self->pw_core, 0, 0);
2019-12-04 15:20:42 -05:00
if (G_UNLIKELY (seq < 0)) {
g_task_return_new_error (task, WP_DOMAIN_LIBRARY,
2020-01-09 12:39:45 -05:00
WP_LIBRARY_ERROR_OPERATION_FAILED, "pw_core_sync failed: %s",
2019-12-04 15:20:42 -05:00
g_strerror (-seq));
2019-12-06 08:43:40 -05:00
return FALSE;
2019-12-04 15:20:42 -05:00
}
g_hash_table_insert (self->async_tasks, GINT_TO_POINTER (seq),
g_steal_pointer (&task));
2019-12-06 08:43:40 -05:00
return TRUE;
2019-12-04 15:20:42 -05:00
}
2020-02-17 15:39:19 +02:00
/**
 * wp_core_sync_finish:
 * @self: the core
 * @res: a #GAsyncResult
 * @error: (out) (optional): the error that occurred, if any
 *
 * This function is meant to be called from within the callback of
 * wp_core_sync() in order to determine the success or failure of the
 * operation.
 *
 * Returns: %TRUE if the operation succeeded, %FALSE otherwise
 */
gboolean
wp_core_sync_finish (WpCore * self, GAsyncResult * res, GError ** error)
{
  GTask *sync_task;

  g_return_val_if_fail (WP_IS_CORE (self), FALSE);
  g_return_val_if_fail (g_task_is_valid (res, self), FALSE);

  sync_task = G_TASK (res);
  return g_task_propagate_boolean (sync_task, error);
}
/* Returns a pointer to the registry that is embedded in the core @self */
WpRegistry *
wp_core_get_registry (WpCore * self)
{
  WpRegistry *registry = &self->registry;

  return registry;
}
/* Returns the core that embeds the registry @self; this is only valid for
 * a registry that is the 'registry' member of a WpCore */
WpCore *
wp_registry_get_core (WpRegistry * self)
{
  WpCore *core = SPA_CONTAINER_OF (self, WpCore, registry);

  return core;
}