From 0f9fd45a588c909492a6ae40e66a6beea3713604 Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Thu, 17 Jun 2021 10:46:04 +0200 Subject: [PATCH] jack: rework locking Ensure all callbacks are called from the thread_loop and release the thread lock before calling the callback. Introduce a new rt_lock that is taken while the callbacks are called. Only call the process_callback when we can acquire the rt_lock in the data thread. This ensures the process callback is never called concurrently with any other callback. Give a warning when we try to call do_sync from the thread_loop itself. We would deadlock because the thread that is supposed to do the sync operation would be blocked in wait(). Fixes #1313 --- pipewire-jack/src/pipewire-jack.c | 166 +++++++++++++++++------------- 1 file changed, 96 insertions(+), 70 deletions(-) diff --git a/pipewire-jack/src/pipewire-jack.c b/pipewire-jack/src/pipewire-jack.c index 9337bc310..3f54397b7 100644 --- a/pipewire-jack/src/pipewire-jack.c +++ b/pipewire-jack/src/pipewire-jack.c @@ -359,7 +359,7 @@ struct client { struct spa_list target_links; } rt; - int pending; + pthread_mutex_t rt_lock; unsigned int started:1; unsigned int active:1; @@ -680,6 +680,29 @@ void jack_get_version(int *major_ptr, int *minor_ptr, int *micro_ptr, int *proto *proto_ptr = 0; } +#define do_callback(c,callback,...) \ +({ \ + if (c->callback) { \ + pw_thread_loop_unlock(c->context.loop); \ + pthread_mutex_lock(&c->rt_lock); \ + c->callback(__VA_ARGS__); \ + pthread_mutex_unlock(&c->rt_lock); \ + pw_thread_loop_lock(c->context.loop); \ + } \ +}) + +#define do_rt_callback_res(c,callback,...) \ +({ \ + int res = 0; \ + if (c->callback) { \ + if (pthread_mutex_trylock(&c->rt_lock) == 0) { \ + res = c->callback(__VA_ARGS__); \ + pthread_mutex_unlock(&c->rt_lock); \ + } \ + } \ + res; \ +}) + SPA_EXPORT const char * jack_get_version_string(void) @@ -709,8 +732,8 @@ static void on_error(void *data, uint32_t id, int seq, int res, const char *mess if (id == PW_ID_CORE) { client->error = true; client->last_res = res; - if (client->shutdown_callback && !client->destroyed) - client->shutdown_callback(client->shutdown_arg); + if (!client->destroyed) + do_callback(client, shutdown_callback, client->shutdown_arg); } pw_thread_loop_signal(client->context.loop, false); } @@ -725,6 +748,11 @@ static int do_sync(struct client *client) { int seq; + if (pw_thread_loop_in_thread(client->context.loop)) { + pw_log_warn("sync requested from callback"); + return 0; + } + seq = pw_proxy_sync((struct pw_proxy*)client->core, client->last_sync); while (true) { @@ -1073,25 +1101,18 @@ do_buffer_frames(struct spa_loop *loop, { uint32_t buffer_frames = *((uint32_t*)data); struct client *c = user_data; - if (c->bufsize_callback) - c->bufsize_callback(buffer_frames, c->bufsize_arg); - ATOMIC_DEC(c->pending); + do_callback(c, bufsize_callback, buffer_frames, c->bufsize_arg); return 0; } -static inline void check_buffer_frames(struct client *c, struct spa_io_position *pos, bool rt) +static inline void check_buffer_frames(struct client *c, struct spa_io_position *pos) { uint32_t buffer_frames = pos->clock.duration; if (SPA_UNLIKELY(buffer_frames != c->buffer_frames)) { pw_log_info(NAME" %p: bufferframes %d", c, buffer_frames); - ATOMIC_INC(c->pending); c->buffer_frames = buffer_frames; - if (rt) - pw_loop_invoke(c->context.l, do_buffer_frames, 0, - &buffer_frames, sizeof(buffer_frames), false, c); - else - do_buffer_frames(c->context.l->loop, false, 0, - &buffer_frames, sizeof(buffer_frames), c); + pw_loop_invoke(c->context.l, do_buffer_frames, 0, + &buffer_frames, sizeof(buffer_frames), false, c); } } @@ -1101,25 +1122,18 @@ do_sample_rate(struct spa_loop *loop, { struct client *c = user_data; uint32_t sample_rate = *((uint32_t*)data); - if (c->srate_callback) - c->srate_callback(sample_rate, c->srate_arg); - ATOMIC_DEC(c->pending); + do_callback(c, srate_callback, sample_rate, c->srate_arg); return 0; } -static inline void check_sample_rate(struct client *c, struct spa_io_position *pos, bool rt) +static inline void check_sample_rate(struct client *c, struct spa_io_position *pos) { uint32_t sample_rate = pos->clock.rate.denom; if (SPA_UNLIKELY(sample_rate != c->sample_rate)) { pw_log_info(NAME" %p: sample_rate %d", c, sample_rate); - ATOMIC_INC(c->pending); c->sample_rate = sample_rate; - if (rt) - pw_loop_invoke(c->context.l, do_sample_rate, 0, - &sample_rate, sizeof(sample_rate), false, c); - else - do_sample_rate(c->context.l->loop, false, 0, - &sample_rate, sizeof(sample_rate), c); + pw_loop_invoke(c->context.l, do_sample_rate, 0, + &sample_rate, sizeof(sample_rate), false, c); } } @@ -1161,8 +1175,8 @@ static inline uint32_t cycle_run(struct client *c) return 0; } - check_buffer_frames(c, pos, true); - check_sample_rate(c, pos, true); + check_buffer_frames(c, pos); + check_sample_rate(c, pos); if (SPA_LIKELY(driver)) { c->jack_state = position_to_jack(driver, &c->jack_position); @@ -1281,8 +1295,7 @@ on_rtsocket_condition(void *data, int fd, uint32_t mask) buffer_frames = cycle_run(c); - if (!ATOMIC_LOAD(c->pending) && c->process_callback) - status = c->process_callback(buffer_frames, c->process_arg); + status = do_rt_callback_res(c, process_callback, buffer_frames, c->process_arg); cycle_signal(c, status); } @@ -1417,8 +1430,7 @@ static int update_driver_activation(struct client *c) freewheeling = SPA_FLAG_IS_SET(c->position->clock.flags, SPA_IO_CLOCK_FLAG_FREEWHEEL); if (c->freewheeling != freewheeling) { c->freewheeling = freewheeling; - if (c->freewheel_callback) - c->freewheel_callback(freewheeling, c->freewheel_arg); + do_callback(c, freewheel_callback, freewheeling, c->freewheel_arg); } link = find_activation(&c->links, c->driver_id); @@ -1463,7 +1475,7 @@ static int client_node_set_io(void *object, c->driver_id = ptr ? c->position->clock.id : SPA_ID_INVALID; update_driver_activation(c); if (ptr) - check_sample_rate(c, c->position, false); + check_sample_rate(c, c->position); break; default: break; @@ -1813,7 +1825,7 @@ static int port_set_latency(struct client *c, struct port *p, mode = JackCaptureLatency; if (c->latency_callback) - c->latency_callback(mode, c->latency_arg); + do_callback(c, latency_callback, mode, c->latency_arg); else default_latency_callback(mode, c); @@ -1906,7 +1918,6 @@ static int client_node_port_use_buffers(void *object, res = -EINVAL; goto done; } - if ((mix = ensure_mix(c, p, mix_id)) == NULL) { res = -ENOMEM; goto done; @@ -2625,18 +2636,19 @@ static void registry_event_global(void *data, uint32_t id, switch (o->type) { case INTERFACE_Node: - if (c->registration_callback && is_first) - c->registration_callback(o->node.name, 1, c->registration_arg); + if (is_first) + do_callback(c, registration_callback, + o->node.name, 1, c->registration_arg); break; case INTERFACE_Port: - if (c->portregistration_callback) - c->portregistration_callback(o->id, 1, c->portregistration_arg); + do_callback(c, portregistration_callback, + o->id, 1, c->portregistration_arg); break; case INTERFACE_Link: - if (c->connect_callback) - c->connect_callback(o->port_link.src, o->port_link.dst, 1, c->connect_arg); + do_callback(c, connect_callback, + o->port_link.src, o->port_link.dst, 1, c->connect_arg); break; } @@ -2683,16 +2695,17 @@ static void registry_event_global_remove(void *object, uint32_t id) if (spa_streq(o->node.node_name, c->metadata->default_audio_source)) c->metadata->default_audio_source[0] = '\0'; } - if (c->registration_callback && is_last) - c->registration_callback(o->node.name, 0, c->registration_arg); + if (is_last) + do_callback(c, registration_callback, + o->node.name, 0, c->registration_arg); break; case INTERFACE_Port: - if (c->portregistration_callback) - c->portregistration_callback(o->id, 0, c->portregistration_arg); + do_callback(c, portregistration_callback, + o->id, 0, c->portregistration_arg); break; case INTERFACE_Link: - if (c->connect_callback) - c->connect_callback(o->port_link.src, o->port_link.dst, 0, c->connect_arg); + do_callback(c, connect_callback, + o->port_link.src, o->port_link.dst, 0, c->connect_arg); break; } @@ -2814,6 +2827,7 @@ jack_client_t * jack_client_open (const char *client_name, spa_list_init(&client->context.free_objects); pthread_mutex_init(&client->context.lock, NULL); + pthread_mutex_init(&client->rt_lock, NULL); spa_list_init(&client->context.nodes); spa_list_init(&client->context.ports); spa_list_init(&client->context.links); @@ -3000,6 +3014,7 @@ int jack_client_close (jack_client_t *client) pw_log_debug(NAME" %p: free", client); pw_map_clear(&c->context.globals); pthread_mutex_destroy(&c->context.lock); + pthread_mutex_destroy(&c->rt_lock); pw_properties_free(c->props); free(c); @@ -3093,14 +3108,11 @@ static int do_activate(struct client *c) { int res; - pw_thread_loop_lock(c->context.loop); - pw_log_info(NAME" %p: activate", c); pw_client_node_set_active(c->node, true); res = do_sync(c); - pw_thread_loop_unlock(c->context.loop); return res; } @@ -3108,22 +3120,27 @@ SPA_EXPORT int jack_activate (jack_client_t *client) { struct client *c = (struct client *) client; - int res; + int res = 0; spa_return_val_if_fail(c != NULL, -EINVAL); if (c->active) return 0; + pw_thread_loop_lock(c->context.loop); + if ((res = do_activate(c)) < 0) - return res; + goto done; c->activation->pending_new_pos = true; c->activation->pending_sync = true; c->active = true; if (c->position) - check_buffer_frames(c, c->position, false); + check_buffer_frames(c, c->position); + +done: + pw_thread_loop_unlock(c->context.loop); return 0; } @@ -4159,6 +4176,12 @@ int jack_port_set_alias (jack_port_t *port, const char *alias) pw_thread_loop_lock(c->context.loop); + p = GET_PORT(c, GET_DIRECTION(o->port.flags), o->port.port_id); + if (p == NULL || !p->valid) { + res = -EINVAL; + goto done; + } + if (o->port.alias1[0] == '\0') { key = PW_KEY_OBJECT_PATH; snprintf(o->port.alias1, sizeof(o->port.alias1), "%s", alias); @@ -4172,12 +4195,6 @@ int jack_port_set_alias (jack_port_t *port, const char *alias) goto done; } - p = GET_PORT(c, GET_DIRECTION(o->port.flags), o->port.port_id); - if (p == NULL || !p->valid) { - res = -EINVAL; - goto done; - } - pw_properties_set(p->props, key, alias); p->info.change_mask |= SPA_PORT_CHANGE_MASK_PROPS; @@ -4213,6 +4230,12 @@ int jack_port_unset_alias (jack_port_t *port, const char *alias) pw_thread_loop_lock(c->context.loop); + p = GET_PORT(c, GET_DIRECTION(o->port.flags), o->port.port_id); + if (p == NULL || !p->valid) { + res = -EINVAL; + goto done; + } + if (spa_streq(o->port.alias1, alias)) key = PW_KEY_OBJECT_PATH; else if (spa_streq(o->port.alias2, alias)) @@ -4222,12 +4245,6 @@ int jack_port_unset_alias (jack_port_t *port, const char *alias) goto done; } - p = GET_PORT(c, GET_DIRECTION(o->port.flags), o->port.port_id); - if (p == NULL || !p->valid) { - res = -EINVAL; - goto done; - } - pw_properties_set(p->props, key, NULL); p->info.change_mask |= SPA_PORT_CHANGE_MASK_PROPS; @@ -5052,19 +5069,24 @@ int jack_set_sync_callback (jack_client_t *client, JackSyncCallback sync_callback, void *arg) { - int res; + int res = 0; struct client *c = (struct client *) client; spa_return_val_if_fail(c != NULL, -EINVAL); + pw_thread_loop_lock(c->context.loop); + c->sync_callback = sync_callback; c->sync_arg = arg; if ((res = do_activate(c)) < 0) - return res; + goto done; c->activation->pending_sync = true; - return 0; +done: + pw_thread_loop_unlock(c->context.loop); + + return res; } SPA_EXPORT @@ -5090,12 +5112,14 @@ int jack_set_timebase_callback (jack_client_t *client, JackTimebaseCallback timebase_callback, void *arg) { - int res; + int res = 0; struct client *c = (struct client *) client; spa_return_val_if_fail(c != NULL, -EINVAL); spa_return_val_if_fail(timebase_callback != NULL, -EINVAL); + pw_thread_loop_lock(c->context.loop); + c->timebase_callback = timebase_callback; c->timebase_arg = arg; c->timeowner_pending = true; @@ -5105,11 +5129,13 @@ int jack_set_timebase_callback (jack_client_t *client, pw_log_debug(NAME" %p: timebase set id:%u", c, c->node_id); if ((res = do_activate(c)) < 0) - return res; + goto done; c->activation->pending_new_pos = true; +done: + pw_thread_loop_unlock(c->context.loop); - return 0; + return res; } SPA_EXPORT