wireplumber/tests/wp/proxy.c
George Kiagiadakis e7e5c66853 lib: introduce WpObjectManager
* rework how global objects are stored in the core
* rework how users get notified about global objects
  and proxies of remote global objects

The purpose of this change is to have a class that can manage
objects that are registered in the core or signalled through the
registry. This object can declare interest on certain types
of global objects and only keep & signal those objects that it is
interested in. Additionally, it can prepare proxy features and
asynchronously deliver an 'objects-changed' signal, which is
basically telling us that the list of objects has changed.

This is useful to simplify port proxies management in WpAudioStream.
Now the stream object can declare that it is interested in ports
that have "node.id" == X and the object manager will only maintain
a list of those. Additionally, it will emit the 'objects-changed'
signal when the list of ports is complete, so there is no reason to
do complex operations and core syncs in the WpAudioStream class
in order to figure out when the list of ports is ready.

As a side effect, this also reduces resource management. Now we
don't construct a WpProxy for every global that pipewire reports;
we only construct proxies when there is interest in them!

Another interesting side effect is that we can now register an
object manager at any point in time and get immediately notified
about remote globals that already exist. i.e. when you register
an object manager that is interested in nodes, it will be immediately
notified about all the existing nodes in the graph. This is useful
to avoid race conditions between connecting the signal and objects
beting created in pipewire
2019-11-13 15:49:39 +02:00

299 lines
8.7 KiB
C

/* WirePlumber
*
* Copyright © 2019 Collabora Ltd.
* @author George Kiagiadakis <george.kiagiadakis@collabora.com>
*
* SPDX-License-Identifier: MIT
*/
#include <wp/wp.h>
#include <pipewire/pipewire.h>
#include <spa/pod/iter.h>
#include "test-server.h"
typedef struct {
/* the local pipewire server */
WpTestServer server;
/* the main loop */
GMainContext *context;
GMainLoop *loop;
GSource *timeout_source;
/* the client wireplumber core */
WpCore *core;
/* the object manager that listens for proxies */
WpObjectManager *om;
} TestProxyFixture;
static gboolean
timeout_callback (TestProxyFixture *fixture)
{
g_message ("test timed out");
g_test_fail ();
g_main_loop_quit (fixture->loop);
return G_SOURCE_REMOVE;
}
static void
test_proxy_remote_state_changed (WpCore *core, WpRemoteState state,
TestProxyFixture *fixture)
{
const gchar * msg = NULL;
switch (state) {
case WP_REMOTE_STATE_ERROR:
wp_core_get_remote_state (core, &msg);
g_message ("remote error: %s", msg);
g_test_fail ();
g_main_loop_quit (fixture->loop);
break;
default:
break;
}
}
static void
test_proxy_setup (TestProxyFixture *self, gconstpointer user_data)
{
g_autoptr (WpProperties) props = NULL;
wp_test_server_setup (&self->server);
props = wp_properties_new (PW_KEY_REMOTE_NAME, self->server.name, NULL);
self->context = g_main_context_new ();
self->loop = g_main_loop_new (self->context, FALSE);
self->core = wp_core_new (self->context, props);
self->om = wp_object_manager_new ();
g_main_context_push_thread_default (self->context);
/* watchdogs */
g_signal_connect (self->core, "remote-state-changed",
(GCallback) test_proxy_remote_state_changed, self);
self->timeout_source = g_timeout_source_new_seconds (3);
g_source_set_callback (self->timeout_source, (GSourceFunc) timeout_callback,
self, NULL);
g_source_attach (self->timeout_source, self->context);
}
static void
test_proxy_teardown (TestProxyFixture *self, gconstpointer user_data)
{
g_main_context_pop_thread_default (self->context);
g_clear_object (&self->om);
g_clear_object (&self->core);
g_clear_pointer (&self->timeout_source, g_source_unref);
g_clear_pointer (&self->loop, g_main_loop_unref);
g_clear_pointer (&self->context, g_main_context_unref);
wp_test_server_teardown (&self->server);
}
static void
test_proxy_basic_done (WpProxy *proxy, GAsyncResult *res,
TestProxyFixture *fixture)
{
g_autoptr (GError) error = NULL;
g_assert_true (wp_proxy_sync_finish (proxy, res, &error));
g_assert_no_error (error);
g_main_loop_quit (fixture->loop);
}
static void
test_proxy_basic_augmented (WpProxy *proxy, GAsyncResult *res,
TestProxyFixture *fixture)
{
g_autoptr (GError) error = NULL;
g_assert_true (wp_proxy_augment_finish (proxy, res, &error));
g_assert_no_error (error);
g_assert_true (wp_proxy_get_features (proxy) & WP_PROXY_FEATURE_PW_PROXY);
g_assert_nonnull (wp_proxy_get_pw_proxy (proxy));
wp_proxy_sync (proxy, NULL, (GAsyncReadyCallback) test_proxy_basic_done,
fixture);
}
static void
test_proxy_basic_object_added (WpObjectManager *om, WpProxy *proxy,
TestProxyFixture *fixture)
{
g_assert_nonnull (proxy);
{
g_autoptr (WpCore) pcore = NULL;
g_autoptr (WpCore) omcore = NULL;
g_object_get (proxy, "core", &pcore, NULL);
g_object_get (om, "core", &omcore, NULL);
g_assert_nonnull (pcore);
g_assert_nonnull (omcore);
g_assert_true (pcore == omcore);
}
g_assert_cmpuint (wp_proxy_get_global_id (proxy), !=, 0);
g_assert_true (wp_proxy_is_global (proxy));
g_assert_cmpuint (wp_proxy_get_interface_quark (proxy), ==,
g_quark_from_string ("client"));
g_assert_cmpuint (wp_proxy_get_interface_type (proxy), ==,
PW_TYPE_INTERFACE_Client);
g_assert_cmpstr (wp_proxy_get_interface_name (proxy), ==,
"PipeWire:Interface:Client");
g_assert_cmphex (wp_proxy_get_global_permissions (proxy), ==, PW_PERM_RWX);
g_assert_true (WP_IS_PROXY_CLIENT (proxy));
g_assert_cmphex (wp_proxy_get_features (proxy), ==, 0);
g_assert_null (wp_proxy_get_pw_proxy (proxy));
{
g_autoptr (WpProperties) props = wp_proxy_get_global_properties (proxy);
g_assert_nonnull (props);
g_assert_cmpstr (wp_properties_get (props, PW_KEY_PROTOCOL), ==,
"protocol-native");
}
wp_proxy_augment (proxy, WP_PROXY_FEATURE_PW_PROXY, NULL,
(GAsyncReadyCallback) test_proxy_basic_augmented, fixture);
}
static void
test_proxy_basic (TestProxyFixture *fixture, gconstpointer data)
{
/* our test server should advertise exactly one
* client: our WpRemote; use this to test WpProxy */
g_signal_connect (fixture->om, "object-added",
(GCallback) test_proxy_basic_object_added, fixture);
wp_object_manager_add_proxy_interest (fixture->om,
PW_TYPE_INTERFACE_Client, NULL, 0);
wp_core_install_object_manager (fixture->core, fixture->om);
g_assert_true (wp_core_connect (fixture->core));
g_main_loop_run (fixture->loop);
}
typedef struct {
TestProxyFixture *fixture;
guint n_params;
} TestProxyNodeParamData;
static void
test_proxy_node_param (WpProxyNode *node, int seq, guint id, guint index,
guint next, struct spa_pod *param, TestProxyNodeParamData *data)
{
data->n_params++;
}
static void
test_proxy_node_enum_params_done (WpProxyNode *node, GAsyncResult *res,
TestProxyNodeParamData *data)
{
g_autoptr (GPtrArray) params = NULL;
g_autoptr (GError) error = NULL;
guint i;
params = wp_proxy_node_enum_params_collect_finish (node, res, &error);
g_assert_no_error (error);
g_assert_nonnull (params);
/* the param signal must have also been fired for all params */
g_assert_cmpint (params->len, ==, data->n_params);
for (i = 0; i < params->len; i++) {
struct spa_pod *pod = g_ptr_array_index(params, i);
g_assert_true (spa_pod_is_object_type (pod, SPA_TYPE_OBJECT_PropInfo));
}
g_main_loop_quit (data->fixture->loop);
g_free (data);
}
static void
test_proxy_node_object_added (WpObjectManager *om, WpProxy *proxy,
TestProxyFixture *fixture)
{
const struct pw_node_info *info;
TestProxyNodeParamData *param_data;
g_assert_nonnull (proxy);
g_assert_true (wp_proxy_is_global (proxy));
g_assert_cmpuint (wp_proxy_get_interface_type (proxy), ==,
PW_TYPE_INTERFACE_Node);
g_assert_cmphex (wp_proxy_get_features (proxy), ==,
WP_PROXY_FEATURE_PW_PROXY | WP_PROXY_FEATURE_INFO);
g_assert_nonnull (wp_proxy_get_pw_proxy (proxy));
g_assert_true (WP_IS_PROXY_NODE (proxy));
info = wp_proxy_node_get_info (WP_PROXY_NODE (proxy));
g_assert_nonnull (info);
g_assert_cmpint (wp_proxy_get_global_id (proxy), ==, info->id);
{
const char *id;
g_autoptr (WpProperties) props =
wp_proxy_node_get_properties (WP_PROXY_NODE (proxy));
g_assert_nonnull (props);
g_assert_true (wp_properties_peek_dict (props) == info->props);
id = wp_properties_get (props, PW_KEY_OBJECT_ID);
g_assert_nonnull (id);
g_assert_cmpint (info->id, ==, atoi(id));
}
param_data = g_new0 (TestProxyNodeParamData, 1);
param_data->fixture = fixture;
g_signal_connect (proxy, "param", (GCallback) test_proxy_node_param,
param_data);
wp_proxy_node_enum_params_collect (WP_PROXY_NODE (proxy), SPA_PARAM_PropInfo,
NULL, NULL, (GAsyncReadyCallback) test_proxy_node_enum_params_done,
param_data);
}
static void
test_proxy_node (TestProxyFixture *fixture, gconstpointer data)
{
/* load audiotestsrc on the server side */
pw_thread_loop_lock (fixture->server.thread_loop);
pw_core_add_spa_lib (fixture->server.core, "audiotestsrc",
"audiotestsrc/libspa-audiotestsrc");
if (!pw_module_load (fixture->server.core, "libpipewire-module-spa-node",
"audiotestsrc", NULL)) {
pw_thread_loop_unlock (fixture->server.thread_loop);
g_test_skip ("audiotestsrc SPA plugin is not installed");
return;
}
pw_thread_loop_unlock (fixture->server.thread_loop);
/* we should be able to see this exported audiotestsrc node on the client */
g_signal_connect (fixture->om, "object-added",
(GCallback) test_proxy_node_object_added, fixture);
/* declare interest and set default features to be ready
when the signal is fired */
wp_object_manager_add_proxy_interest (fixture->om,
PW_TYPE_INTERFACE_Node, NULL,
WP_PROXY_FEATURE_PW_PROXY | WP_PROXY_FEATURE_INFO);
wp_core_install_object_manager (fixture->core, fixture->om);
g_assert_true (wp_core_connect (fixture->core));
g_main_loop_run (fixture->loop);
}
gint
main (gint argc, gchar *argv[])
{
g_test_init (&argc, &argv, NULL);
pw_init (NULL, NULL);
g_test_add ("/wp/proxy/basic", TestProxyFixture, NULL,
test_proxy_setup, test_proxy_basic, test_proxy_teardown);
g_test_add ("/wp/proxy/node", TestProxyFixture, NULL,
test_proxy_setup, test_proxy_node, test_proxy_teardown);
return g_test_run ();
}