Merge branch 'ev_hook_topo_sort' into 'master'

Draft: event-dispatcher: sort hooks based on dependencies

See merge request pipewire/wireplumber!750
This commit is contained in:
Barnabás Pőcze 2025-11-21 15:25:05 +01:00
commit 30b57c3207
3 changed files with 267 additions and 262 deletions

View file

@ -15,6 +15,17 @@
WP_DEFINE_LOCAL_LOG_TOPIC ("wp-event-dispatcher")
typedef struct _HookData HookData;
struct _HookData
{
WpEventHook *hook;
struct spa_list dispatcher_link; /* WpEventDispatcher::hooks_by_order */
/* for topo sort */
size_t n_dependencies;
struct spa_list queue_link;
};
typedef struct _EventData EventData;
struct _EventData
{
@ -49,7 +60,8 @@ struct _WpEventDispatcher
GObject parent;
GWeakRef core;
GPtrArray *hooks; /* registered hooks */
GHashTable *hooks_by_ptr; /* WpEventHook * -> HookData */
struct spa_list hooks_by_order; /* HookData::dispatcher_link */
GSource *source; /* the event loop source */
GList *events; /* the events stack */
struct spa_system *system;
@ -156,11 +168,35 @@ static GSourceFuncs source_funcs = {
NULL
};
#define for_each_hook_in_order(self, iter) \
spa_list_for_each (iter, &(self)->hooks_by_order, dispatcher_link)
static inline HookData *
hook_data_new (WpEventDispatcher *dispatcher, WpEventHook *hook)
{
HookData *self = g_new0 (HookData, 1);
self->hook = g_object_ref (hook);
spa_list_append (&dispatcher->hooks_by_order, &self->dispatcher_link);
return self;
}
static void
hook_data_free (HookData *self)
{
spa_list_remove (&self->dispatcher_link);
g_clear_object (&self->hook);
g_free (self);
}
G_DEFINE_AUTOPTR_CLEANUP_FUNC(HookData, hook_data_free)
static void
wp_event_dispatcher_init (WpEventDispatcher * self)
{
g_weak_ref_init (&self->core, NULL);
self->hooks = g_ptr_array_new_with_free_func (g_object_unref);
self->hooks_by_ptr = g_hash_table_new_full (g_direct_hash, g_direct_equal, NULL, (GDestroyNotify) hook_data_free);
spa_list_init (&self->hooks_by_order);
self->source = g_source_new (&source_funcs, sizeof (WpEventSource));
((WpEventSource *) self->source)->dispatcher = self;
@ -184,7 +220,7 @@ wp_event_dispatcher_finalize (GObject * object)
close (self->eventfd);
g_clear_pointer (&self->hooks, g_ptr_array_unref);
g_hash_table_destroy (self->hooks_by_ptr);
g_weak_ref_clear (&self->core);
G_OBJECT_CLASS (wp_event_dispatcher_parent_class)->finalize (object);
@ -259,6 +295,8 @@ wp_event_dispatcher_push_event (WpEventDispatcher * self, WpEvent * event)
g_return_if_fail (WP_IS_EVENT_DISPATCHER (self));
g_return_if_fail (event != NULL);
wp_debug_object (self, "pushing event (%s)", wp_event_get_name (event));
if (wp_event_collect_hooks (event, self)) {
EventData *event_data = event_data_new (event);
@ -273,6 +311,165 @@ wp_event_dispatcher_push_event (WpEventDispatcher * self, WpEvent * event)
wp_event_unref (event);
}
typedef struct _HookDependency HookDependency;
struct _HookDependency {
HookData *a;
HookData *b;
};
static int
cmp_hook_dependency (gconstpointer a, gconstpointer b)
{
const HookDependency *x = a, *y = b;
if ((uintptr_t) x->a < (uintptr_t) y->a)
return -1;
if ((uintptr_t) x->a > (uintptr_t) y->a)
return +1;
return 0;
}
static gboolean
sort_hooks (WpEventDispatcher *self)
{
const size_t n_total = g_hash_table_size (self->hooks_by_ptr);
wp_debug_object (self, "sorting %zu hooks", n_total);
HookData *hook_data;
for_each_hook_in_order (self, hook_data) {
hook_data->n_dependencies = 0;
wp_debug_object (self, "consider hook %p:%s", hook_data->hook, wp_event_hook_get_name (hook_data->hook));
}
g_autoptr (GArray) all_deps = g_array_new (FALSE, FALSE, sizeof (HookDependency));
for_each_hook_in_order (self, hook_data) {
WpEventHook *hook = hook_data->hook;
const char * const * deps;
deps = wp_event_hook_get_runs_before_hooks (hook);
if (deps) {
for (size_t i = 0; deps[i]; i++) {
g_autoptr (GPatternSpec) pat = g_pattern_spec_new (deps[i]);
HookData *dep_hook_data;
for_each_hook_in_order (self, dep_hook_data) {
if (hook_data == dep_hook_data)
continue;
WpEventHook *dep_hook = dep_hook_data->hook;
const char *dep_name = wp_event_hook_get_name (dep_hook);
if (!g_pattern_match_string (pat, dep_name))
continue;
g_array_append_vals (all_deps, &(HookDependency) { hook_data, dep_hook_data }, 1);
dep_hook_data->n_dependencies += 1;
wp_debug_object (self, "adding '%s' -> '%s' dependency: %zu",
wp_event_hook_get_name (hook), wp_event_hook_get_name (dep_hook), dep_hook_data->n_dependencies);
}
}
}
deps = wp_event_hook_get_runs_after_hooks (hook);
if (deps) {
for (size_t i = 0; deps[i]; i++) {
g_autoptr (GPatternSpec) pat = g_pattern_spec_new (deps[i]);
HookData *dep_hook_data;
for_each_hook_in_order (self, dep_hook_data) {
if (hook_data == dep_hook_data)
continue;
WpEventHook *dep_hook = dep_hook_data->hook;
const char *dep_name = wp_event_hook_get_name (dep_hook);
if (!g_pattern_match_string (pat, dep_name))
continue;
g_array_append_vals (all_deps, &(HookDependency) { dep_hook_data, hook_data }, 1);
hook_data->n_dependencies += 1;
wp_debug_object (self, "adding '%s' <- '%s' dependency: %zu",
wp_event_hook_get_name (hook), wp_event_hook_get_name (dep_hook), hook_data->n_dependencies);
}
}
}
}
g_array_sort (all_deps, cmp_hook_dependency);
struct spa_list queue = SPA_LIST_INIT(&queue); /* HookData::queue_link */
for_each_hook_in_order (self, hook_data) {
if (hook_data->n_dependencies == 0) {
spa_list_append (&queue, &hook_data->queue_link);
wp_debug_object (self, "adding independent hook %p:%s", hook_data->hook, wp_event_hook_get_name (hook_data->hook));
}
}
struct spa_list final = SPA_LIST_INIT(&final); /* HookData::queue_link */
size_t n_processed = 0;
spa_list_consume (hook_data, &queue, queue_link) {
WpEventHook *hook = hook_data->hook;
spa_list_remove (&hook_data->queue_link);
spa_list_append (&final, &hook_data->queue_link);
wp_debug_object (self, "#%zu: %p:%s", n_processed, hook, wp_event_hook_get_name (hook));
n_processed += 1;
// FIXME: poor man's std::lower_bound...
guint start;
if (!g_array_binary_search (all_deps, &(HookDependency) { hook_data }, cmp_hook_dependency, &start))
continue;
// FIXME: not too optimal
while (start > 0 && g_array_index (all_deps, HookDependency, start - 1).a == hook_data)
start -= 1;
guint end = start;
for (; end < all_deps->len; end++) {
const HookDependency *d = &g_array_index (all_deps, HookDependency, end);
if (d->a != hook_data)
break;
HookData *dep_hook_data = d->b;
WpEventHook *dep_hook = dep_hook_data->hook;
wp_debug_object (self, "removing '%s <- '%s' dependency: %zu",
wp_event_hook_get_name (dep_hook), wp_event_hook_get_name (hook), dep_hook_data->n_dependencies);
g_assert (dep_hook_data->n_dependencies > 0);
if (--dep_hook_data->n_dependencies == 0) {
spa_list_append (&queue, &dep_hook_data->queue_link);
wp_debug_object (self, "adding node: %s", wp_event_hook_get_name (dep_hook));
}
}
// FIXME: is this worth it?
g_array_remove_range (all_deps, start, end - start);
}
g_assert (n_processed <= n_total);
if (n_processed != n_total)
return FALSE; /* circular dependency somewhere */
g_assert (all_deps->len == 0);
spa_list_init (&self->hooks_by_order);
spa_list_for_each (hook_data, &final, queue_link) {
g_assert (hook_data->n_dependencies == 0);
spa_list_append (&self->hooks_by_order, &hook_data->dispatcher_link);
}
return TRUE;
}
/*!
* \brief Registers an event hook
* \ingroup wpeventdispatcher
@ -287,12 +484,24 @@ wp_event_dispatcher_register_hook (WpEventDispatcher * self,
g_return_if_fail (WP_IS_EVENT_DISPATCHER (self));
g_return_if_fail (WP_IS_EVENT_HOOK (hook));
wp_debug_object (self, "register %p:%s", hook, wp_event_hook_get_name (hook));
g_autoptr (WpEventDispatcher) already_registered_dispatcher =
wp_event_hook_get_dispatcher (hook);
g_return_if_fail (already_registered_dispatcher == NULL);
g_return_if_fail (!g_hash_table_contains(self->hooks_by_ptr, hook));
g_hash_table_insert (self->hooks_by_ptr, hook, hook_data_new (self, hook));
if (!sort_hooks (self)) {
g_hash_table_remove (self->hooks_by_ptr, hook);
wp_critical_object (self,
"cannot register hook %p:%s with circular dependencies",
hook, wp_event_hook_get_name (hook));
return /* FALSE */;
}
wp_event_hook_set_dispatcher (hook, self);
g_ptr_array_add (self->hooks, g_object_ref (hook));
}
/*!
@ -309,12 +518,21 @@ wp_event_dispatcher_unregister_hook (WpEventDispatcher * self,
g_return_if_fail (WP_IS_EVENT_DISPATCHER (self));
g_return_if_fail (WP_IS_EVENT_HOOK (hook));
wp_debug_object (self, "unregister %p:%s", hook, wp_event_hook_get_name (hook));
g_autoptr (WpEventDispatcher) already_registered_dispatcher =
wp_event_hook_get_dispatcher (hook);
g_return_if_fail (already_registered_dispatcher == self);
gpointer value;
if (!g_hash_table_steal_extended (self->hooks_by_ptr, hook, NULL, &value))
return /* FALSE */;
g_autoptr (HookData) hook_data = value;
g_assert (hook_data->hook == hook);
wp_event_hook_set_dispatcher (hook, NULL);
g_ptr_array_remove_fast (self->hooks, hook);
spa_list_remove (&hook_data->dispatcher_link);
}
/*!
@ -327,7 +545,39 @@ wp_event_dispatcher_unregister_hook (WpEventDispatcher * self,
WpIterator *
wp_event_dispatcher_new_hooks_iterator (WpEventDispatcher * self)
{
GPtrArray *items =
g_ptr_array_copy (self->hooks, (GCopyFunc) g_object_ref, NULL);
return wp_iterator_new_ptr_array (items, WP_TYPE_EVENT_HOOK);
g_autoptr (GPtrArray) res = g_ptr_array_new_with_free_func (g_object_unref);
HookData *hook_data;
for_each_hook_in_order (self, hook_data)
g_ptr_array_add (res, g_object_ref (hook_data->hook));
return wp_iterator_new_ptr_array (g_steal_pointer (&res), WP_TYPE_EVENT_HOOK);
}
/*!
* \brief Collect the matching hooks
* \ingroup wpeventdispatcher
*
* \param self the event dispatcher
* \param event the event
* \param res the array to save the matching hooks
* \return number of matching hooks
*/
size_t
wp_event_dispatcher_get_hooks_for_event (WpEventDispatcher * self, WpEvent * event, GPtrArray * res)
{
HookData *hook_data;
size_t cnt = 0;
for_each_hook_in_order (self, hook_data) {
WpEventHook *hook = hook_data->hook;
if (wp_event_hook_runs_for_event (hook, event)) {
g_ptr_array_add (res, g_object_ref (hook));
wp_debug_object (self, "adding hook '%s' for event %p", wp_event_hook_get_name (hook), event);
cnt += 1;
}
}
return cnt;
}

View file

@ -43,6 +43,9 @@ void wp_event_dispatcher_unregister_hook (WpEventDispatcher * self,
WP_API
WpIterator * wp_event_dispatcher_new_hooks_iterator (WpEventDispatcher * self);
WP_API
size_t wp_event_dispatcher_get_hooks_for_event (WpEventDispatcher * self, WpEvent * event, GPtrArray * res);
G_END_DECLS
#endif

View file

@ -17,37 +17,11 @@
WP_DEFINE_LOCAL_LOG_TOPIC ("wp-event")
typedef struct _HookData HookData;
struct _HookData
{
struct spa_list link;
WpEventHook *hook;
GPtrArray *dependencies;
};
static inline HookData *
hook_data_new (WpEventHook * hook)
{
HookData *hook_data = g_new0 (HookData, 1);
spa_list_init (&hook_data->link);
hook_data->hook = g_object_ref (hook);
hook_data->dependencies = g_ptr_array_new ();
return hook_data;
}
static void
hook_data_free (HookData *self)
{
g_clear_object (&self->hook);
g_clear_pointer (&self->dependencies, g_ptr_array_unref);
g_free (self);
}
struct _WpEvent
{
grefcount ref;
GData *datalist;
struct spa_list hooks;
GPtrArray *hooks;
/* immutable fields */
gint priority;
@ -96,7 +70,7 @@ wp_event_new (const gchar * type, gint priority, WpProperties * properties,
WpEvent * self = g_new0 (WpEvent, 1);
g_ref_count_init (&self->ref);
g_datalist_init (&self->datalist);
spa_list_init (&self->hooks);
self->hooks = g_ptr_array_new_with_free_func (g_object_unref);
self->priority = priority;
self->properties = properties ?
@ -155,11 +129,7 @@ wp_event_get_name(WpEvent *self)
static void
wp_event_free (WpEvent * self)
{
HookData *hook_data;
spa_list_consume (hook_data, &self->hooks, link) {
spa_list_remove (&hook_data->link);
hook_data_free (hook_data);
}
g_ptr_array_free (self->hooks, TRUE);
g_datalist_clear (&self->datalist);
g_clear_pointer (&self->properties, wp_properties_unref);
g_clear_object (&self->source);
@ -316,33 +286,6 @@ wp_event_get_data (WpEvent * self, const gchar * key)
return g_datalist_get_data (&self->datalist, key);
}
static inline void
record_dependency (struct spa_list *list, const gchar *target,
const gchar *dependency)
{
HookData *hook_data;
spa_list_for_each (hook_data, list, link) {
if (g_pattern_match_simple (target, wp_event_hook_get_name (hook_data->hook))) {
g_ptr_array_insert (hook_data->dependencies, -1, (gchar *) dependency);
break;
}
}
}
static inline gboolean
hook_exists_in (const gchar *hook_name, struct spa_list *list)
{
HookData *hook_data;
if (!spa_list_is_empty (list)) {
spa_list_for_each (hook_data, list, link) {
if (g_pattern_match_simple (hook_name, wp_event_hook_get_name (hook_data->hook))) {
return TRUE;
}
}
}
return FALSE;
}
/*!
* \brief Collects all the hooks registered in the \a dispatcher that run for
* this \a event
@ -355,199 +298,16 @@ hook_exists_in (const gchar *hook_name, struct spa_list *list)
gboolean
wp_event_collect_hooks (WpEvent * event, WpEventDispatcher * dispatcher)
{
struct spa_list collected, result, remaining;
g_autoptr (WpIterator) all_hooks = NULL;
g_auto (GValue) value = G_VALUE_INIT;
g_return_val_if_fail (event != NULL, FALSE);
g_return_val_if_fail (WP_IS_EVENT_DISPATCHER (dispatcher), FALSE);
/* hooks already collected */
if (!spa_list_is_empty (&event->hooks))
if (event->hooks->len > 0)
return TRUE;
spa_list_init (&collected);
spa_list_init (&result);
spa_list_init (&remaining);
/* collect hooks that run for this event */
all_hooks = wp_event_dispatcher_new_hooks_iterator (dispatcher);
while (wp_iterator_next (all_hooks, &value)) {
WpEventHook *hook = g_value_get_object (&value);
if (wp_event_hook_runs_for_event (hook, event)) {
HookData *hook_data = hook_data_new (hook);
/* record "after" dependencies directly */
const gchar * const * strv =
wp_event_hook_get_runs_after_hooks (hook_data->hook);
while (strv && *strv) {
g_ptr_array_insert (hook_data->dependencies, -1, (gchar *) *strv);
strv++;
}
spa_list_append (&collected, &hook_data->link);
wp_debug_boxed (WP_TYPE_EVENT, event, "added "WP_OBJECT_FORMAT"(%s)",
WP_OBJECT_ARGS (hook), wp_event_hook_get_name (hook));
}
g_value_unset (&value);
}
if (!spa_list_is_empty (&collected)) {
HookData *hook_data;
/* convert "before" dependencies into "after" dependencies */
spa_list_for_each (hook_data, &collected, link) {
const gchar * const * strv =
wp_event_hook_get_runs_before_hooks (hook_data->hook);
while (strv && *strv) {
/* record hook_data->hook as a dependency of the *strv hook */
record_dependency (&collected, *strv,
wp_event_hook_get_name (hook_data->hook));
strv++;
}
}
/* sort */
while (!spa_list_is_empty (&collected)) {
gboolean made_progress = FALSE;
/* examine each hook to see if its dependencies are satisfied in the
result list; if yes, then append it to the result too */
spa_list_consume (hook_data, &collected, link) {
guint deps_satisfied = 0;
spa_list_remove (&hook_data->link);
wp_trace_boxed (WP_TYPE_EVENT, event,
"examining: %s", wp_event_hook_get_name (hook_data->hook));
for (guint i = 0; i < hook_data->dependencies->len; i++) {
const gchar *dep = g_ptr_array_index (hook_data->dependencies, i);
/* if the dependency is already in the sorted result list or if
it doesn't exist at all, we consider it satisfied */
if (hook_exists_in (dep, &result) ||
!(hook_exists_in (dep, &collected) ||
hook_exists_in (dep, &remaining))) {
deps_satisfied++;
}
wp_trace_boxed (WP_TYPE_EVENT, event, "depends: %s, satisfied: %u/%u",
dep, deps_satisfied, hook_data->dependencies->len);
}
if (deps_satisfied == hook_data->dependencies->len) {
wp_trace_boxed (WP_TYPE_EVENT, event,
"sorted: "WP_OBJECT_FORMAT"(%s)",
WP_OBJECT_ARGS (hook_data->hook),
wp_event_hook_get_name (hook_data->hook));
spa_list_append (&result, &hook_data->link);
made_progress = TRUE;
} else {
spa_list_append (&remaining, &hook_data->link);
}
}
if (made_progress) {
/* run again with the remaining hooks */
spa_list_insert_list (&collected, &remaining);
spa_list_init (&remaining);
}
else if (!spa_list_is_empty (&remaining)) {
/* if we did not make any progress towards growing the result list,
it means the dependencies cannot be satisfied because of circles */
wp_critical_boxed (WP_TYPE_EVENT, event, "detected circular "
"dependencies in the collected hooks!");
/* clean up */
spa_list_consume (hook_data, &result, link) {
spa_list_remove (&hook_data->link);
hook_data_free (hook_data);
}
spa_list_consume (hook_data, &remaining, link) {
spa_list_remove (&hook_data->link);
hook_data_free (hook_data);
}
return FALSE;
}
}
}
spa_list_insert_list (&event->hooks, &result);
return !spa_list_is_empty (&event->hooks);
return wp_event_dispatcher_get_hooks_for_event (dispatcher, event, event->hooks) > 0;
}
struct event_hooks_iterator_data
{
WpEvent *event;
HookData *cur;
};
static void
event_hooks_iterator_reset (WpIterator *it)
{
struct event_hooks_iterator_data *it_data = wp_iterator_get_user_data (it);
struct spa_list *list = &it_data->event->hooks;
if (!spa_list_is_empty (list))
it_data->cur = spa_list_first (&it_data->event->hooks, HookData, link);
}
static gboolean
event_hooks_iterator_next (WpIterator *it, GValue *item)
{
struct event_hooks_iterator_data *it_data = wp_iterator_get_user_data (it);
struct spa_list *list = &it_data->event->hooks;
if (!spa_list_is_empty (list) &&
!spa_list_is_end (it_data->cur, list, link)) {
g_value_init (item, WP_TYPE_EVENT_HOOK);
g_value_set_object (item, it_data->cur->hook);
it_data->cur = spa_list_next (it_data->cur, link);
return TRUE;
}
return FALSE;
}
static gboolean
event_hooks_iterator_fold (WpIterator *it, WpIteratorFoldFunc func, GValue *ret,
gpointer data)
{
struct event_hooks_iterator_data *it_data = wp_iterator_get_user_data (it);
struct spa_list *list = &it_data->event->hooks;
HookData *hook_data;
if (!spa_list_is_empty (list)) {
spa_list_for_each (hook_data, list, link) {
g_auto (GValue) item = G_VALUE_INIT;
g_value_init (&item, WP_TYPE_EVENT_HOOK);
g_value_set_object (&item, hook_data->hook);
if (!func (&item, ret, data))
return FALSE;
}
}
return TRUE;
}
static void
event_hooks_iterator_finalize (WpIterator *it)
{
struct event_hooks_iterator_data *it_data = wp_iterator_get_user_data (it);
wp_event_unref (it_data->event);
}
static const WpIteratorMethods event_hooks_iterator_methods = {
.version = WP_ITERATOR_METHODS_VERSION,
.reset = event_hooks_iterator_reset,
.next = event_hooks_iterator_next,
.fold = event_hooks_iterator_fold,
.finalize = event_hooks_iterator_finalize,
};
/*!
* \brief Returns an iterator that iterates over all the hooks that were
* collected by wp_event_collect_hooks()
@ -558,15 +318,7 @@ static const WpIteratorMethods event_hooks_iterator_methods = {
WpIterator *
wp_event_new_hooks_iterator (WpEvent * event)
{
WpIterator *it = NULL;
struct event_hooks_iterator_data *it_data;
g_return_val_if_fail (event != NULL, NULL);
it = wp_iterator_new (&event_hooks_iterator_methods,
sizeof (struct event_hooks_iterator_data));
it_data = wp_iterator_get_user_data (it);
it_data->event = wp_event_ref (event);
event_hooks_iterator_reset (it);
return it;
return wp_iterator_new_ptr_array (g_ptr_array_ref (event->hooks), WP_TYPE_EVENT_HOOK);
}