diff --git a/src/modules/module-protocol-pulse/modules/module-combine-sink.c b/src/modules/module-protocol-pulse/modules/module-combine-sink.c index b7e034698..495f76af3 100644 --- a/src/modules/module-protocol-pulse/modules/module-combine-sink.c +++ b/src/modules/module-protocol-pulse/modules/module-combine-sink.c @@ -39,6 +39,8 @@ PW_LOG_TOPIC_STATIC(mod_topic, "mod." NAME); #define MAX_SINKS 64 /* ... good enough for anyone */ +#define TIMEOUT_SINKS_MSEC 2000 + static const struct spa_dict_item module_combine_sink_info[] = { { PW_KEY_MODULE_AUTHOR, "Arun Raghavan " }, { PW_KEY_MODULE_DESCRIPTION, "Combine multiple sinks into a single sink" }, @@ -61,6 +63,7 @@ struct combine_stream { struct spa_hook stream_listener; struct module_combine_sink_data *data; bool cleanup; + bool started; }; struct module_combine_sink_data { @@ -80,8 +83,14 @@ struct module_combine_sink_data { struct combine_stream streams[MAX_SINKS]; struct spa_source *cleanup; + struct spa_source *sinks_timeout; struct spa_audio_info_raw info; + + unsigned int sinks_pending; + unsigned int source_started:1; + unsigned int load_emitted:1; + unsigned int start_error:1; }; /* Core connection: mainly to unload the module if the connection errors out */ @@ -161,6 +170,24 @@ static void capture_process(void *d) pw_stream_queue_buffer(data->sink, in); } +static void check_initialized(struct module_combine_sink_data *data) +{ + struct module *module = data->module; + + if (data->load_emitted) + return; + + if (data->start_error) { + pw_log_debug("module load error"); + data->load_emitted = true; + module_emit_loaded(module, -EIO); + } else if (data->sinks_pending == 0 && data->source_started) { + pw_log_debug("module loaded"); + data->load_emitted = true; + module_emit_loaded(module, 0); + } +} + static void on_in_stream_state_changed(void *d, enum pw_stream_state old, enum pw_stream_state state, const char *error) { @@ -168,6 +195,14 @@ static void on_in_stream_state_changed(void *d, enum pw_stream_state old, struct module *module = data->module; uint32_t i; + if (!data->source_started && state != PW_STREAM_STATE_CONNECTING) { + /* Input stream appears on server */ + data->source_started = true; + if (state < PW_STREAM_STATE_PAUSED) + data->start_error = true; + check_initialized(data); + } + switch (state) { case PW_STREAM_STATE_PAUSED: pw_stream_flush(data->sink, false); @@ -199,6 +234,16 @@ static void on_out_stream_state_changed(void *data, enum pw_stream_state old, { struct combine_stream *s = data; + if (!s->started && state != PW_STREAM_STATE_CONNECTING) { + /* Output stream appears on server */ + s->started = true; + if (s->data->sinks_pending > 0) + --s->data->sinks_pending; + if (state < PW_STREAM_STATE_PAUSED) + s->data->start_error = true; + check_initialized(s->data); + } + if (state == PW_STREAM_STATE_UNCONNECTED) { s->cleanup = true; pw_loop_signal_event(s->data->module->impl->loop, s->data->cleanup); @@ -276,7 +321,7 @@ static void manager_added(void *d, struct pw_manager_object *o) cstream->stream = pw_stream_new(data->core, NULL, props); if (cstream->stream == NULL) { pw_log_error("Could not create stream"); - return; + goto error; } pw_stream_add_listener(cstream->stream, @@ -296,8 +341,14 @@ static void manager_added(void *d, struct pw_manager_object *o) PW_STREAM_FLAG_RT_PROCESS, params, n_params)) < 0) { pw_log_error("Could not connect to sink '%s'", sink_name); - return; + goto error; } + + return; + +error: + data->start_error = true; + check_initialized(data); } static const struct pw_manager_events manager_events = { @@ -326,6 +377,17 @@ static void on_cleanup(void *d, uint64_t count) } } +static void on_sinks_timeout(void *d, uint64_t count) +{ + struct module_combine_sink_data *data = d; + + if (data->load_emitted) + return; + + data->start_error = true; + check_initialized(data); +} + static int module_combine_sink_load(struct client *client, struct module *module) { struct module_combine_sink_data *data = module->user_data; @@ -389,7 +451,15 @@ static int module_combine_sink_load(struct client *client, struct module *module data->cleanup = pw_loop_add_event(module->impl->loop, on_cleanup, data); - return 0; + data->sinks_timeout = pw_loop_add_timer(module->impl->loop, on_sinks_timeout, data); + if (data->sinks_timeout) { + struct timespec timeout = {0}, interval = {0}; + timeout.tv_sec = TIMEOUT_SINKS_MSEC / 1000; + timeout.tv_nsec = (TIMEOUT_SINKS_MSEC % 1000) * SPA_NSEC_PER_MSEC; + pw_loop_update_timer(module->impl->loop, data->sinks_timeout, &timeout, &interval, false); + } + + return data->load_emitted ? 0 : SPA_RESULT_RETURN_ASYNC(0); } static int module_combine_sink_unload(struct module *module) @@ -400,6 +470,9 @@ static int module_combine_sink_unload(struct module *module) if (d->cleanup != NULL) pw_loop_destroy_source(module->impl->loop, d->cleanup); + if (d->sinks_timeout != NULL) + pw_loop_destroy_source(module->impl->loop, d->sinks_timeout); + /* Note that we explicitly disconnect the hooks to avoid having the * cleanup triggered again in those callbacks */ if (d->sink != NULL) { @@ -442,7 +515,8 @@ struct module *create_module_combine_sink(struct impl *impl, const char *argumen const char *str; char *sink_name = NULL, **sink_names = NULL; struct spa_audio_info_raw info = { 0 }; - int i, n, res; + int i, res; + int num_sinks = 0; PW_LOG_TOPIC_INIT(mod_topic); @@ -462,7 +536,7 @@ struct module *create_module_combine_sink(struct impl *impl, const char *argumen } if ((str = pw_properties_get(props, "slaves")) != NULL) { - sink_names = pw_split_strv(str, ",", MAX_SINKS, &n); + sink_names = pw_split_strv(str, ",", MAX_SINKS, &num_sinks); pw_properties_set(props, "slaves", NULL); } @@ -493,6 +567,7 @@ struct module *create_module_combine_sink(struct impl *impl, const char *argumen d->info = info; d->sink_name = sink_name; d->sink_names = sink_names; + d->sinks_pending = (sink_names == NULL) ? 0 : num_sinks; for (i = 0; i < MAX_SINKS; i++) { d->streams[i].stream = NULL; d->streams[i].cleanup = false;