diff --git a/pipewire-jack/src/pipewire-jack.c b/pipewire-jack/src/pipewire-jack.c index a57d01082..641b10591 100644 --- a/pipewire-jack/src/pipewire-jack.c +++ b/pipewire-jack/src/pipewire-jack.c @@ -228,7 +228,8 @@ struct mix { struct port *port; struct port *peer_port; - struct spa_io_buffers *io; + struct spa_io_buffers *io[2]; + struct spa_io_buffers *io_data; struct buffer buffers[MAX_BUFFERS]; uint32_t n_buffers; @@ -254,7 +255,7 @@ struct port { #define N_PORT_PARAMS 5 struct spa_param_info params[N_PORT_PARAMS]; - struct spa_io_buffers io; + struct spa_io_buffers io[2]; struct spa_list mix; uint32_t n_mix; struct mix *global_mix; @@ -455,6 +456,7 @@ struct client { uint32_t max_ports; unsigned int fill_aliases:1; unsigned int writable_input:1; + unsigned int async:1; uint32_t max_frames; @@ -564,6 +566,7 @@ static inline jack_port_t *object_to_port(struct object *o) struct io_info { struct mix *mix; void *data; + size_t size; }; static int @@ -572,20 +575,36 @@ do_mix_set_io(struct spa_loop *loop, bool async, uint32_t seq, { const struct io_info *info = data; struct port *port = info->mix->port; - info->mix->io = info->data; - if (info->mix->io) { - if (port->n_mix++ == 0 && port->global_mix != NULL) - port->global_mix->io = &port->io; + info->mix->io_data = info->data; + if (info->mix->io_data) { + if (info->size >= sizeof(struct spa_io_async_buffers)) { + info->mix->io[0] = &info->mix->io_data[0]; + info->mix->io[1] = &info->mix->io_data[1]; + } else if (info->size >= sizeof(struct spa_io_buffers)) { + info->mix->io[0] = &info->mix->io_data[0]; + info->mix->io[1] = &info->mix->io_data[0]; + } else { + info->mix->io[0] = NULL; + info->mix->io[1] = NULL; + } + if (port->n_mix++ == 0 && port->global_mix != NULL) { + port->global_mix->io_data = port->io; + port->global_mix->io[0] = &port->io[0]; + port->global_mix->io[1] = &port->io[1]; + } } else { - if (--port->n_mix == 0 && port->global_mix != NULL) - port->global_mix->io = NULL; + if (--port->n_mix == 0 && port->global_mix != NULL) { + port->global_mix->io_data = NULL; + port->global_mix->io[0] = NULL; + port->global_mix->io[1] = NULL; + } } return 0; } -static inline void mix_set_io(struct mix *mix, void *data) +static inline void mix_set_io(struct mix *mix, void *data, size_t size) { - struct io_info info = { .mix = mix, .data = data }; + struct io_info info = { .mix = mix, .data = data, .size = size }; pw_data_loop_invoke(mix->port->client->loop, do_mix_set_io, SPA_ID_INVALID, &info, sizeof(info), false, NULL); } @@ -597,13 +616,14 @@ static void init_mix(struct mix *mix, uint32_t mix_id, struct port *port, uint32 mix->peer_id = peer_id; mix->port = port; mix->peer_port = NULL; - mix->io = NULL; + mix->io_data = NULL; + mix->io[0] = mix->io[1] = NULL; mix->n_buffers = 0; spa_list_init(&mix->queue); if (mix_id == SPA_ID_INVALID) { port->global_mix = mix; if (port->n_mix > 0) - mix_set_io(port->global_mix, &port->io); + mix_set_io(port->global_mix, &port->io, sizeof(port->io)); } } static struct mix *find_mix_peer(struct client *c, uint32_t peer_id) @@ -1492,6 +1512,7 @@ static inline void *get_buffer_output(struct port *p, uint32_t frames, uint32_t struct buffer *b; struct spa_data *d; struct spa_io_buffers *io; + uint32_t cycle = (p->client->rt.position->clock.cycle + 1) & 1; if (frames == 0 || !p->valid) return NULL; @@ -1503,7 +1524,7 @@ static inline void *get_buffer_output(struct port *p, uint32_t frames, uint32_t c, p->object->port.name, p->port_id, frames, mix->n_buffers, mix->io); - if (SPA_UNLIKELY((io = mix->io) == NULL || mix->n_buffers == 0)) + if (SPA_UNLIKELY((io = mix->io[cycle]) == NULL || mix->n_buffers == 0)) return NULL; if (io->status == SPA_STATUS_HAVE_DATA && @@ -1579,16 +1600,17 @@ static void prepare_output(struct port *p, uint32_t frames) { struct mix *mix; struct spa_io_buffers *io; + uint32_t cycle = (p->client->rt.position->clock.cycle + 1) & 1; if (SPA_UNLIKELY(p->empty_out || p->tied)) process_empty(p, frames); - if (p->global_mix == NULL || (io = p->global_mix->io) == NULL) + if (p->global_mix == NULL || (io = p->global_mix->io[cycle]) == NULL) return; spa_list_for_each(mix, &p->mix, port_link) { if (SPA_LIKELY(mix->io != NULL)) - *mix->io = *io; + *mix->io[cycle] = *io; } } @@ -1597,6 +1619,7 @@ static void complete_process(struct client *c, uint32_t frames) struct port *p; struct mix *mix; union pw_map_item *item; + uint32_t cycle = (c->rt.position->clock.cycle + 1) & 1; pw_array_for_each(item, &c->ports[SPA_DIRECTION_OUTPUT].items) { if (pw_map_item_is_free(item)) @@ -1605,7 +1628,7 @@ static void complete_process(struct client *c, uint32_t frames) if (!p->valid) continue; prepare_output(p, frames); - p->io.status = SPA_STATUS_NEED_DATA; + p->io[cycle].status = SPA_STATUS_NEED_DATA; } pw_array_for_each(item, &c->ports[SPA_DIRECTION_INPUT].items) { if (pw_map_item_is_free(item)) @@ -1614,8 +1637,8 @@ static void complete_process(struct client *c, uint32_t frames) if (!p->valid) continue; spa_list_for_each(mix, &p->mix, port_link) { - if (SPA_LIKELY(mix->io != NULL)) - mix->io->status = SPA_STATUS_NEED_DATA; + if (SPA_LIKELY(mix->io[cycle] != NULL)) + mix->io[cycle]->status = SPA_STATUS_NEED_DATA; } } } @@ -1880,6 +1903,9 @@ static inline void signal_sync(struct client *c) activation->status = PW_NODE_ACTIVATION_FINISHED; activation->finish_time = nsec; + if (c->async) + return; + cmd = 1; spa_list_for_each(l, &c->rt.target_links, target_link) { struct pw_node_activation_state *state; @@ -2329,6 +2355,15 @@ static int param_io(struct client *c, struct port *p, return 1; } +static int param_io_async(struct client *c, struct port *p, + struct spa_pod **param, struct spa_pod_builder *b) +{ + *param = spa_pod_builder_add_object(b, + SPA_TYPE_OBJECT_ParamIO, SPA_PARAM_IO, + SPA_PARAM_IO_id, SPA_POD_Id(SPA_IO_AsyncBuffers), + SPA_PARAM_IO_size, SPA_POD_Int(sizeof(struct spa_io_async_buffers))); + return 1; +} static int param_latency(struct client *c, struct port *p, struct spa_pod **param, struct spa_pod_builder *b) { @@ -2349,7 +2384,7 @@ static int param_latency_other(struct client *c, struct port *p, static int port_set_format(struct client *c, struct port *p, uint32_t flags, const struct spa_pod *param) { - struct spa_pod *params[6]; + struct spa_pod *params[7]; uint8_t buffer[4096]; struct spa_pod_builder b = SPA_POD_BUILDER_INIT(buffer, sizeof(buffer)); @@ -2410,8 +2445,9 @@ static int port_set_format(struct client *c, struct port *p, param_format(c, p, ¶ms[1], &b); param_buffers(c, p, ¶ms[2], &b); param_io(c, p, ¶ms[3], &b); - param_latency(c, p, ¶ms[4], &b); - param_latency_other(c, p, ¶ms[5], &b); + param_io_async(c, p, ¶ms[4], &b); + param_latency(c, p, ¶ms[5], &b); + param_latency_other(c, p, ¶ms[6], &b); pw_client_node_port_update(c->node, p->direction, @@ -2429,7 +2465,7 @@ static int port_set_format(struct client *c, struct port *p, static void port_update_latency(struct port *p) { struct client *c = p->client; - struct spa_pod *params[6]; + struct spa_pod *params[7]; uint8_t buffer[4096]; struct spa_pod_builder b = SPA_POD_BUILDER_INIT(buffer, sizeof(buffer)); @@ -2437,8 +2473,9 @@ static void port_update_latency(struct port *p) param_format(c, p, ¶ms[1], &b); param_buffers(c, p, ¶ms[2], &b); param_io(c, p, ¶ms[3], &b); - param_latency(c, p, ¶ms[4], &b); - param_latency_other(c, p, ¶ms[5], &b); + param_io_async(c, p, ¶ms[4], &b); + param_latency(c, p, ¶ms[5], &b); + param_latency_other(c, p, ¶ms[6], &b); pw_log_info("port %s: update", p->object->port.name); @@ -2825,8 +2862,8 @@ static int client_node_port_set_io(void *data, if (mem_id == SPA_ID_INVALID) { mm = ptr = NULL; - } - else { + size = 0; + } else { mm = pw_mempool_map_id(c->pool, mem_id, PW_MEMMAP_FLAG_READWRITE, offset, size, tag); if (mm == NULL) { @@ -2842,7 +2879,8 @@ static int client_node_port_set_io(void *data, switch (id) { case SPA_IO_Buffers: - mix_set_io(mix, ptr); + case SPA_IO_AsyncBuffers: + mix_set_io(mix, ptr, size); if (old != NULL) { old->tag[0] = SPA_ID_INVALID; pw_core_set_paused(c->core, true); @@ -4051,6 +4089,7 @@ jack_client_t * jack_client_open (const char *client_name, client->max_ports = pw_properties_get_uint32(client->props, "jack.max-client-ports", MAX_CLIENT_PORTS); client->fill_aliases = pw_properties_get_bool(client->props, "jack.fill-aliases", false); client->writable_input = pw_properties_get_bool(client->props, "jack.writable-input", true); + client->async = pw_properties_get_bool(client->props, PW_KEY_NODE_ASYNC, false); client->self_connect_mode = SELF_CONNECT_ALLOW; if ((str = pw_properties_get(client->props, "jack.self-connect-mode")) != NULL) { @@ -4986,7 +5025,7 @@ jack_port_t * jack_port_register (jack_client_t *client, jack_port_type_id_t type_id; uint8_t buffer[1024]; struct spa_pod_builder b = SPA_POD_BUILDER_INIT(buffer, sizeof(buffer)); - struct spa_pod *params[6]; + struct spa_pod *params[7]; uint32_t n_params = 0; struct port *p; int res, len; @@ -5098,6 +5137,7 @@ jack_port_t * jack_port_register (jack_client_t *client, param_enum_format(c, p, ¶ms[n_params++], &b); param_buffers(c, p, ¶ms[n_params++], &b); param_io(c, p, ¶ms[n_params++], &b); + param_io_async(c, p, ¶ms[n_params++], &b); param_latency(c, p, ¶ms[n_params++], &b); param_latency_other(c, p, ¶ms[n_params++], &b); @@ -5204,14 +5244,15 @@ done: return res; } -static struct buffer *get_mix_buffer(struct mix *mix, jack_nframes_t frames) +static struct buffer *get_mix_buffer(struct port *p, struct mix *mix, jack_nframes_t frames) { struct spa_io_buffers *io; + uint32_t cycle = p->client->rt.position->clock.cycle & 1; if (mix->peer_port != NULL) prepare_output(mix->peer_port, frames); - io = mix->io; + io = mix->io[cycle]; if (io == NULL || io->status != SPA_STATUS_HAVE_DATA || io->buffer_id >= mix->n_buffers) @@ -5249,7 +5290,7 @@ static void *get_buffer_input_float(struct port *p, jack_nframes_t frames) pw_log_trace_fp("%p: port %s mix %d.%d get buffer %d", p->client, p->object->port.name, p->port_id, mix->id, frames); - if ((b = get_mix_buffer(mix, frames)) == NULL) + if ((b = get_mix_buffer(p, mix, frames)) == NULL) continue; if ((np = get_buffer_data(b, frames)) == NULL) @@ -5293,7 +5334,7 @@ static void *get_buffer_input_midi(struct port *p, jack_nframes_t frames) pw_log_trace_fp("%p: port %p mix %d.%d get buffer %d", p->client, p, p->port_id, mix->id, frames); - if ((b = get_mix_buffer(mix, frames)) == NULL) + if ((b = get_mix_buffer(p, mix, frames)) == NULL) continue; d = &b->datas[0]; @@ -5374,7 +5415,7 @@ void * jack_port_get_buffer (jack_port_t *port, jack_nframes_t frames) pw_log_trace("peer mix: %p %d", mix, mix->peer_id); - if ((b = get_mix_buffer(mix, frames)) == NULL) + if ((b = get_mix_buffer(p, mix, frames)) == NULL) goto done; if (o->port.type_id == TYPE_ID_MIDI) { diff --git a/spa/plugins/audiomixer/audiomixer.c b/spa/plugins/audiomixer/audiomixer.c index 99d197f79..9f2427a84 100644 --- a/spa/plugins/audiomixer/audiomixer.c +++ b/spa/plugins/audiomixer/audiomixer.c @@ -65,7 +65,7 @@ struct port { struct port_props props; - struct spa_io_buffers *io; + struct spa_io_buffers *io[2]; uint64_t info_all; struct spa_port_info info; @@ -100,6 +100,8 @@ struct impl { struct spa_node_info info; struct spa_param_info params[8]; + struct spa_io_position *position; + struct spa_hook_list hooks; uint32_t port_count; @@ -146,7 +148,16 @@ static int impl_node_set_param(void *object, uint32_t id, uint32_t flags, static int impl_node_set_io(void *object, uint32_t id, void *data, size_t size) { - return -ENOTSUP; + struct impl *this = object; + + switch (id) { + case SPA_IO_Position: + this->position = data; + break; + default: + return -ENOTSUP; + } + return 0; } static int impl_node_send_command(void *object, const struct spa_command *command) @@ -436,6 +447,12 @@ impl_node_port_enum_params(void *object, int seq, SPA_PARAM_IO_id, SPA_POD_Id(SPA_IO_Buffers), SPA_PARAM_IO_size, SPA_POD_Int(sizeof(struct spa_io_buffers))); break; + case 1: + param = spa_pod_builder_add_object(&b, + SPA_TYPE_OBJECT_ParamIO, id, + SPA_PARAM_IO_id, SPA_POD_Id(SPA_IO_AsyncBuffers), + SPA_PARAM_IO_size, SPA_POD_Int(sizeof(struct spa_io_async_buffers))); + break; default: return 0; } @@ -683,13 +700,24 @@ impl_node_port_use_buffers(void *object, struct io_info { struct port *port; void *data; + size_t size; }; static int do_port_set_io(struct spa_loop *loop, bool async, uint32_t seq, const void *data, size_t size, void *user_data) { struct io_info *info = user_data; - info->port->io = info->data; + if (info->size >= sizeof(struct spa_io_async_buffers)) { + struct spa_io_async_buffers *ab = info->data; + info->port->io[0] = &ab->buffers[0]; + info->port->io[1] = &ab->buffers[1]; + } else if (info->size >= sizeof(struct spa_io_buffers)) { + info->port->io[0] = info->data; + info->port->io[1] = info->data; + } else { + info->port->io[0] = NULL; + info->port->io[1] = NULL; + } return 0; } @@ -712,9 +740,11 @@ impl_node_port_set_io(void *object, port = GET_PORT(this, direction, port_id); info.port = port; info.data = data; + info.size = size; switch (id) { case SPA_IO_Buffers: + case SPA_IO_AsyncBuffers: spa_loop_invoke(this->data_loop, do_port_set_io, SPA_ID_INVALID, NULL, 0, true, &info); break; @@ -748,11 +778,12 @@ static int impl_node_process(void *object) struct buffer **buffers; struct buffer *outb; const void **datas; + uint32_t cycle = this->position->clock.cycle & 1; spa_return_val_if_fail(this != NULL, -EINVAL); outport = GET_OUT_PORT(this, 0); - if ((outio = outport->io) == NULL) + if ((outio = outport->io[cycle]) == NULL) return -EIO; spa_log_trace_fp(this->log, "%p: status %p %d %d", @@ -780,16 +811,17 @@ static int impl_node_process(void *object) struct spa_data *bd; uint32_t size, offs; - if (SPA_UNLIKELY(!PORT_VALID(inport) || - (inio = inport->io) == NULL || - inio->buffer_id >= inport->n_buffers || - inio->status != SPA_STATUS_HAVE_DATA)) { - spa_log_trace_fp(this->log, "%p: skip input idx:%d valid:%d " + if (SPA_UNLIKELY(!PORT_VALID(inport) || (inio = inport->io[cycle]) == NULL)) { + spa_log_trace_fp(this->log, "%p: skip input idx:%d valid:%d io:%p/%p/%d", + this, i, PORT_VALID(inport), + inport->io[0], inport->io[1], cycle); + continue; + } + if (inio->buffer_id >= inport->n_buffers || + inio->status != SPA_STATUS_HAVE_DATA) { + spa_log_trace_fp(this->log, "%p: skip input idx:%d " "io:%p status:%d buf_id:%d n_buffers:%d", this, - i, PORT_VALID(inport), inio, - inio ? inio->status : -1, - inio ? inio->buffer_id : SPA_ID_INVALID, - inport->n_buffers); + i, inio, inio->status, inio->buffer_id, inport->n_buffers); continue; } diff --git a/spa/plugins/audiomixer/mixer-dsp.c b/spa/plugins/audiomixer/mixer-dsp.c index cb80379bc..61a47414c 100644 --- a/spa/plugins/audiomixer/mixer-dsp.c +++ b/spa/plugins/audiomixer/mixer-dsp.c @@ -61,7 +61,7 @@ struct port { struct port_props props; - struct spa_io_buffers *io; + struct spa_io_buffers *io[2]; uint64_t info_all; struct spa_port_info info; @@ -96,6 +96,8 @@ struct impl { struct spa_node_info info; struct spa_param_info params[8]; + struct spa_io_position *position; + struct spa_hook_list hooks; uint32_t port_count; @@ -141,7 +143,16 @@ static int impl_node_set_param(void *object, uint32_t id, uint32_t flags, static int impl_node_set_io(void *object, uint32_t id, void *data, size_t size) { - return -ENOTSUP; + struct impl *this = object; + + switch (id) { + case SPA_IO_Position: + this->position = data; + break; + default: + return -ENOTSUP; + } + return 0; } static int impl_node_send_command(void *object, const struct spa_command *command) @@ -413,6 +424,12 @@ next: SPA_PARAM_IO_id, SPA_POD_Id(SPA_IO_Buffers), SPA_PARAM_IO_size, SPA_POD_Int(sizeof(struct spa_io_buffers))); break; + case 1: + param = spa_pod_builder_add_object(&b, + SPA_TYPE_OBJECT_ParamIO, id, + SPA_PARAM_IO_id, SPA_POD_Id(SPA_IO_AsyncBuffers), + SPA_PARAM_IO_size, SPA_POD_Int(sizeof(struct spa_io_async_buffers))); + break; default: return 0; } @@ -618,13 +635,24 @@ impl_node_port_use_buffers(void *object, struct io_info { struct port *port; void *data; + size_t size; }; static int do_port_set_io(struct spa_loop *loop, bool async, uint32_t seq, const void *data, size_t size, void *user_data) { struct io_info *info = user_data; - info->port->io = info->data; + if (info->size >= sizeof(struct spa_io_async_buffers)) { + struct spa_io_async_buffers *ab = info->data; + info->port->io[0] = &ab->buffers[0]; + info->port->io[1] = &ab->buffers[1]; + } else if (info->size >= sizeof(struct spa_io_buffers)) { + info->port->io[0] = info->data; + info->port->io[1] = info->data; + } else { + info->port->io[0] = NULL; + info->port->io[1] = NULL; + } return 0; } @@ -647,9 +675,11 @@ impl_node_port_set_io(void *object, port = GET_PORT(this, direction, port_id); info.port = port; info.data = data; + info.size = size; switch (id) { case SPA_IO_Buffers: + case SPA_IO_AsyncBuffers: spa_loop_invoke(this->data_loop, do_port_set_io, SPA_ID_INVALID, NULL, 0, true, &info); break; @@ -683,11 +713,12 @@ static int impl_node_process(void *object) struct buffer **buffers; struct buffer *outb; const void **datas; + uint32_t cycle = this->position->clock.cycle & 1; spa_return_val_if_fail(this != NULL, -EINVAL); outport = GET_OUT_PORT(this, 0); - if ((outio = outport->io) == NULL) + if ((outio = outport->io[cycle]) == NULL) return -EIO; spa_log_trace_fp(this->log, "%p: status %p %d %d", @@ -715,16 +746,17 @@ static int impl_node_process(void *object) struct spa_data *bd; uint32_t size, offs; - if (SPA_UNLIKELY(!PORT_VALID(inport) || - (inio = inport->io) == NULL || - inio->buffer_id >= inport->n_buffers || - inio->status != SPA_STATUS_HAVE_DATA)) { - spa_log_trace_fp(this->log, "%p: skip input idx:%d valid:%d " + if (SPA_UNLIKELY(!PORT_VALID(inport) || (inio = inport->io[cycle]) == NULL)) { + spa_log_trace_fp(this->log, "%p: skip input idx:%d valid:%d io:%p/%p/%d", + this, i, PORT_VALID(inport), + inport->io[0], inport->io[1], cycle); + continue; + } + if (inio->buffer_id >= inport->n_buffers || + inio->status != SPA_STATUS_HAVE_DATA) { + spa_log_trace_fp(this->log, "%p: skip input idx:%d " "io:%p status:%d buf_id:%d n_buffers:%d", this, - i, PORT_VALID(inport), inio, - inio ? inio->status : -1, - inio ? inio->buffer_id : SPA_ID_INVALID, - inport->n_buffers); + i, inio, inio->status, inio->buffer_id, inport->n_buffers); continue; } @@ -735,8 +767,8 @@ static int impl_node_process(void *object) size = SPA_MIN(bd->maxsize - offs, bd->chunk->size); maxsize = SPA_MIN(maxsize, size); - spa_log_trace_fp(this->log, "%p: mix input %d %p->%p %d %d %d:%d/%d %u", this, - i, inio, outio, inio->status, inio->buffer_id, + spa_log_trace_fp(this->log, "%p: mix input %d %p->%p %d %d/%d %d:%d/%d %u", this, + i, inio, outio, inio->status, inio->buffer_id, inport->n_buffers, offs, size, (int)sizeof(float), bd->chunk->flags); diff --git a/spa/plugins/control/mixer.c b/spa/plugins/control/mixer.c index d744ca714..7cc8ee9a6 100644 --- a/spa/plugins/control/mixer.c +++ b/spa/plugins/control/mixer.c @@ -41,7 +41,7 @@ struct port { uint32_t direction; uint32_t id; - struct spa_io_buffers *io; + struct spa_io_buffers *io[2]; uint64_t info_all; struct spa_port_info info; @@ -68,6 +68,8 @@ struct impl { struct spa_node_info info; struct spa_param_info params[8]; + struct spa_io_position *position; + struct spa_hook_list hooks; uint32_t port_count; @@ -111,7 +113,16 @@ static int impl_node_set_param(void *object, uint32_t id, uint32_t flags, static int impl_node_set_io(void *object, uint32_t id, void *data, size_t size) { - return -ENOTSUP; + struct impl *this = object; + + switch (id) { + case SPA_IO_Position: + this->position = data; + break; + default: + return -ENOTSUP; + } + return 0; } static int impl_node_send_command(void *object, const struct spa_command *command) @@ -353,6 +364,12 @@ next: SPA_PARAM_IO_id, SPA_POD_Id(SPA_IO_Buffers), SPA_PARAM_IO_size, SPA_POD_Int(sizeof(struct spa_io_buffers))); break; + case 1: + param = spa_pod_builder_add_object(&b, + SPA_TYPE_OBJECT_ParamIO, id, + SPA_PARAM_IO_id, SPA_POD_Id(SPA_IO_AsyncBuffers), + SPA_PARAM_IO_size, SPA_POD_Int(sizeof(struct spa_io_async_buffers))); + break; default: return 0; } @@ -531,13 +548,24 @@ impl_node_port_use_buffers(void *object, struct io_info { struct port *port; void *data; + size_t size; }; static int do_port_set_io(struct spa_loop *loop, bool async, uint32_t seq, const void *data, size_t size, void *user_data) { struct io_info *info = user_data; - info->port->io = info->data; + if (info->size >= sizeof(struct spa_io_async_buffers)) { + struct spa_io_async_buffers *ab = info->data; + info->port->io[0] = &ab->buffers[0]; + info->port->io[1] = &ab->buffers[1]; + } else if (info->size >= sizeof(struct spa_io_buffers)) { + info->port->io[0] = info->data; + info->port->io[1] = info->data; + } else { + info->port->io[0] = NULL; + info->port->io[1] = NULL; + } return 0; } @@ -560,9 +588,11 @@ impl_node_port_set_io(void *object, port = GET_PORT(this, direction, port_id); info.port = port; info.data = data; + info.size = size; switch (id) { case SPA_IO_Buffers: + case SPA_IO_AsyncBuffers: spa_loop_invoke(this->data_loop, do_port_set_io, SPA_ID_INVALID, NULL, 0, true, &info); break; @@ -631,11 +661,12 @@ static int impl_node_process(void *object) struct spa_pod_frame f; struct buffer *outb; struct spa_data *d; + uint32_t cycle = this->position->clock.cycle & 1; spa_return_val_if_fail(this != NULL, -EINVAL); outport = GET_OUT_PORT(this, 0); - if ((outio = outport->io) == NULL) + if ((outio = outport->io[cycle]) == NULL) return -EIO; spa_log_trace_fp(this->log, "%p: status %p %d %d", @@ -668,16 +699,17 @@ static int impl_node_process(void *object) struct spa_io_buffers *inio = NULL; void *pod; - if (!inport->valid || - (inio = inport->io) == NULL || - inio->buffer_id >= inport->n_buffers || + if (SPA_UNLIKELY(!PORT_VALID(inport) || (inio = inport->io[cycle]) == NULL)) { + spa_log_trace_fp(this->log, "%p: skip input idx:%d valid:%d io:%p/%p/%d", + this, i, PORT_VALID(inport), + inport->io[0], inport->io[1], cycle); + continue; + } + if (inio->buffer_id >= inport->n_buffers || inio->status != SPA_STATUS_HAVE_DATA) { - spa_log_trace_fp(this->log, "%p: skip input idx:%d valid:%d " + spa_log_trace_fp(this->log, "%p: skip input idx:%d " "io:%p status:%d buf_id:%d n_buffers:%d", this, - i, inport->valid, inio, - inio ? inio->status : -1, - inio ? inio->buffer_id : SPA_ID_INVALID, - inport->n_buffers); + i, inio, inio->status, inio->buffer_id, inport->n_buffers); continue; } diff --git a/src/modules/module-client-node/client-node.c b/src/modules/module-client-node/client-node.c index 0882b6a67..b45deb4a3 100644 --- a/src/modules/module-client-node/client-node.c +++ b/src/modules/module-client-node/client-node.c @@ -31,7 +31,8 @@ PW_LOG_TOPIC_EXTERN(mod_topic); #define MAX_BUFFERS 64 #define MAX_METAS 16u #define MAX_DATAS 64u -#define AREA_SIZE (4096u / sizeof(struct spa_io_buffers)) +#define AREA_SLOT (sizeof(struct spa_io_async_buffers)) +#define AREA_SIZE (4096u / AREA_SLOT) #define MAX_AREAS 32 #define CHECK_FREE_PORT(impl,d,p) (p <= pw_map_get_size(&impl->ports[d]) && !CHECK_PORT(impl,d,p)) @@ -1363,7 +1364,7 @@ static int add_area(struct impl *impl) size_t size; struct pw_memblock *area; - size = sizeof(struct spa_io_buffers) * AREA_SIZE; + size = AREA_SLOT * AREA_SIZE; area = pw_mempool_alloc(impl->context_pool, PW_MEMBLOCK_FLAG_READWRITE | @@ -1449,6 +1450,7 @@ static int port_init_mix(void *data, struct pw_impl_port_mix *mix) struct mix *m; uint32_t idx, pos, len; struct pw_memblock *area; + struct spa_io_async_buffers *ab; if ((m = create_mix(port, mix->port.port_id)) == NULL) return -ENOMEM; @@ -1472,9 +1474,12 @@ static int port_init_mix(void *data, struct pw_impl_port_mix *mix) } area = *pw_array_get_unchecked(&impl->io_areas, idx, struct pw_memblock*); - mix->io = SPA_PTROFF(area->map->ptr, - pos * sizeof(struct spa_io_buffers), void); - *mix->io = SPA_IO_BUFFERS_INIT; + ab = SPA_PTROFF(area->map->ptr, pos * AREA_SLOT, void); + mix->io_data = ab; + mix->io[0] = &ab->buffers[0]; + mix->io[1] = &ab->buffers[1]; + *mix->io[0] = SPA_IO_BUFFERS_INIT; + *mix->io[1] = SPA_IO_BUFFERS_INIT; m->peer_id = mix->peer_id; m->impl_mix_id = mix->id; @@ -1484,8 +1489,8 @@ static int port_init_mix(void *data, struct pw_impl_port_mix *mix) mix->port.direction, mix->p->port_id, mix->port.port_id, mix->peer_id, NULL); - pw_log_debug("%p: init mix id:%d io:%p base:%p", impl, - mix->id, mix->io, area->map->ptr); + pw_log_debug("%p: init mix id:%d io:%p/%p base:%p", impl, + mix->id, mix->io[0], mix->io[1], area->map->ptr); return 0; no_mem: @@ -1606,11 +1611,24 @@ static int impl_mix_port_set_io(void *object, if (mix == NULL) return -EINVAL; - if (id == SPA_IO_Buffers) { + switch (id) { + case SPA_IO_Buffers: if (data && size >= sizeof(struct spa_io_buffers)) - mix->io = data; + mix->io[0] = mix->io[1] = data; else - mix->io = NULL; + mix->io[0] = mix->io[1] = NULL; + break; + case SPA_IO_AsyncBuffers: + if (data && size >= sizeof(struct spa_io_async_buffers)) { + struct spa_io_async_buffers *ab = data; + mix->io[0] = &ab->buffers[0]; + mix->io[1] = &ab->buffers[1]; + } + else + mix->io[0] = mix->io[1] = NULL; + break; + default: + break; } return do_port_set_io(impl, direction, port->port_id, mix->port.port_id, diff --git a/src/pipewire/impl-link.c b/src/pipewire/impl-link.c index dab97df47..d0cedf4de 100644 --- a/src/pipewire/impl-link.c +++ b/src/pipewire/impl-link.c @@ -44,9 +44,10 @@ struct impl { struct spa_hook output_node_listener; struct spa_hook output_global_listener; - struct spa_io_buffers io; + struct spa_io_buffers io[2]; struct pw_impl_node *inode, *onode; + bool async; }; /** \endcond */ @@ -98,7 +99,8 @@ static void pw_node_peer_activate(struct pw_node_peer *peer) if (peer->active_count++ == 0) { spa_list_append(&peer->output->rt.target_list, &peer->target.link); if (!peer->target.active && peer->output->rt.driver_target.node != NULL) { - state->required++; + if (!peer->output->async) + state->required++; peer->target.active = true; } } @@ -115,7 +117,8 @@ static void pw_node_peer_deactivate(struct pw_node_peer *peer) spa_list_remove(&peer->target.link); if (peer->target.active) { - state->required--; + if (!peer->output->async) + state->required--; peer->target.active = false; } } @@ -534,14 +537,15 @@ static void select_io(struct pw_impl_link *this) struct impl *impl = SPA_CONTAINER_OF(this, struct impl, this); struct spa_io_buffers *io; - io = this->rt.in_mix.io; + io = this->rt.in_mix.io_data; if (io == NULL) - io = this->rt.out_mix.io; + io = this->rt.out_mix.io_data; if (io == NULL) - io = &impl->io; + io = impl->io; this->io = io; - *this->io = SPA_IO_BUFFERS_INIT; + this->io[0] = SPA_IO_BUFFERS_INIT; + this->io[1] = SPA_IO_BUFFERS_INIT; } static int do_allocation(struct pw_impl_link *this) @@ -678,6 +682,7 @@ int pw_impl_link_activate(struct pw_impl_link *this) { struct impl *impl = SPA_CONTAINER_OF(this, struct impl, this); int res; + uint32_t io_type, io_size; pw_log_debug("%p: activate activated:%d state:%s", this, impl->activated, pw_link_state_as_string(this->info.state)); @@ -686,11 +691,19 @@ int pw_impl_link_activate(struct pw_impl_link *this) !impl->inode->runnable || !impl->onode->runnable) return 0; - if ((res = port_set_io(this, this->input, SPA_IO_Buffers, this->io, - sizeof(struct spa_io_buffers), &this->rt.in_mix)) < 0) + if (impl->async) { + io_type = SPA_IO_AsyncBuffers; + io_size = sizeof(struct spa_io_async_buffers); + } else { + io_type = SPA_IO_Buffers; + io_size = sizeof(struct spa_io_buffers); + } + + if ((res = port_set_io(this, this->input, io_type, this->io, + io_size, &this->rt.in_mix)) < 0) goto error; - if ((res = port_set_io(this, this->output, SPA_IO_Buffers, this->io, - sizeof(struct spa_io_buffers), &this->rt.out_mix)) < 0) + if ((res = port_set_io(this, this->output, io_type, this->io, + io_size, &this->rt.out_mix)) < 0) goto error_clean; pw_loop_invoke(this->output->node->data_loop, @@ -703,7 +716,7 @@ int pw_impl_link_activate(struct pw_impl_link *this) return 0; error_clean: - port_set_io(this, this->input, SPA_IO_Buffers, NULL, 0, &this->rt.in_mix); + port_set_io(this, this->input, io_type, NULL, 0, &this->rt.in_mix); error: pw_log_error("%p: can't activate link: %s", this, spa_strerror(res)); return res; @@ -892,7 +905,7 @@ int pw_impl_link_deactivate(struct pw_impl_link *this) impl->activated = false; pw_log_info("(%s) deactivated", this->name); - + if (this->info.state < PW_LINK_STATE_PAUSED || this->destroyed) link_update_state(this, PW_LINK_STATE_INIT, 0, NULL); else @@ -1385,6 +1398,13 @@ struct pw_impl_link *pw_context_create_link(struct pw_context *context, if (this->passive && str == NULL) pw_properties_set(properties, PW_KEY_LINK_PASSIVE, "true"); + impl->async = (output_node->async || input_node->async) && + SPA_FLAG_IS_SET(output->flags, PW_IMPL_PORT_FLAG_ASYNC) && + SPA_FLAG_IS_SET(input->flags, PW_IMPL_PORT_FLAG_ASYNC); + + if (impl->async) + pw_properties_set(properties, PW_KEY_LINK_ASYNC, "true"); + spa_hook_list_init(&this->listener_list); impl->format_filter = format_filter; @@ -1414,8 +1434,6 @@ struct pw_impl_link *pw_context_create_link(struct pw_context *context, spa_list_append(&output->links, &this->output_link); spa_list_append(&input->links, &this->input_link); - impl->io = SPA_IO_BUFFERS_INIT; - select_io(this); if (this->feedback) { @@ -1434,7 +1452,8 @@ struct pw_impl_link *pw_context_create_link(struct pw_context *context, this->name = spa_aprintf("%d.%d.%d -> %d.%d.%d", output_node->info.id, output->port_id, this->rt.out_mix.port.port_id, input_node->info.id, input->port_id, this->rt.in_mix.port.port_id); - pw_log_info("(%s) (%s) -> (%s)", this->name, output_node->name, input_node->name); + pw_log_info("(%s) (%s) -> (%s) async:%04x:%04x:%d", this->name, output_node->name, + input_node->name, output->flags, input->flags, impl->async); pw_impl_port_emit_link_added(output, this); pw_impl_port_emit_link_added(input, this); diff --git a/src/pipewire/impl-node.c b/src/pipewire/impl-node.c index 59fdeff10..06b8d3657 100644 --- a/src/pipewire/impl-node.c +++ b/src/pipewire/impl-node.c @@ -113,7 +113,8 @@ static void add_node(struct pw_impl_node *this, struct pw_impl_node *driver) spa_list_for_each(t, &this->rt.target_list, link) { dstate = &t->activation->state[0]; if (!t->active) { - dstate->required++; + if (!this->async) + dstate->required++; t->active = true; } pw_log_trace("%p: driver state:%p pending:%d/%d, node state:%p pending:%d/%d", @@ -146,7 +147,8 @@ static void remove_node(struct pw_impl_node *this) spa_list_for_each(t, &this->rt.target_list, link) { dstate = &t->activation->state[0]; if (t->active) { - dstate->required--; + if (!this->async) + dstate->required--; t->active = false; } pw_log_trace("%p: driver state:%p pending:%d/%d, node state:%p pending:%d/%d", @@ -973,7 +975,7 @@ static void check_properties(struct pw_impl_node *node) const char *str, *recalc_reason = NULL; struct spa_fraction frac; uint32_t value; - bool driver, trigger, transport, sync; + bool driver, trigger, transport, sync, async; struct match match; match = MATCH_INIT(node); @@ -1082,6 +1084,11 @@ static void check_properties(struct pw_impl_node *node) node->transport = transport; recalc_reason = "transport changed"; } + async = pw_properties_get_bool(node->properties, PW_KEY_NODE_ASYNC, false); + if (async != node->async) { + pw_log_info("%p: async %d -> %d", node, node->async, async); + node->async = async; + } if ((str = pw_properties_get(node->properties, PW_KEY_MEDIA_CLASS)) != NULL && (strstr(str, "/Sink") != NULL || strstr(str, "/Source") != NULL)) { @@ -1360,7 +1367,8 @@ static inline int process_node(void *data) /* we don't need to trigger targets when the node was driving the * graph because that means we finished the graph. */ if (SPA_LIKELY(!this->driving)) { - trigger_targets(&this->rt.target, status, nsec); + if (!this->async) + trigger_targets(&this->rt.target, status, nsec); } else { /* calculate CPU time when finished */ a->signal_time = this->driver_start; diff --git a/src/pipewire/impl-port.c b/src/pipewire/impl-port.c index 14f2217e0..f59387c32 100644 --- a/src/pipewire/impl-port.c +++ b/src/pipewire/impl-port.c @@ -168,6 +168,12 @@ next: SPA_PARAM_IO_id, SPA_POD_Id(SPA_IO_Buffers), SPA_PARAM_IO_size, SPA_POD_Int(sizeof(struct spa_io_buffers))); break; + case 1: + param = spa_pod_builder_add_object(&b, + SPA_TYPE_OBJECT_ParamIO, id, + SPA_PARAM_IO_id, SPA_POD_Id(SPA_IO_AsyncBuffers), + SPA_PARAM_IO_size, SPA_POD_Int(sizeof(struct spa_io_buffers))); + break; default: return 0; } @@ -242,13 +248,22 @@ static int port_set_io(void *object, if (mix == NULL) return -ENOENT; - if (id == SPA_IO_Buffers) { + switch (id) { + case SPA_IO_Buffers: + case SPA_IO_AsyncBuffers: if (data == NULL || size == 0) { pw_loop_invoke(this->node->data_loop, do_remove_mix, SPA_ID_INVALID, NULL, 0, true, mix); - mix->io = NULL; + mix->io_data = mix->io[0] = mix->io[1] = NULL; } else if (data != NULL && size >= sizeof(struct spa_io_buffers)) { - mix->io = data; + if (size >= sizeof(struct spa_io_async_buffers)) { + struct spa_io_async_buffers *ab = data; + mix->io_data = data; + mix->io[0] = &ab->buffers[0]; + mix->io[1] = &ab->buffers[1]; + } else { + mix->io_data = mix->io[0] = mix->io[1] = data; + } pw_loop_invoke(this->node->data_loop, do_add_mix, SPA_ID_INVALID, NULL, 0, false, mix); } @@ -262,12 +277,13 @@ static int tee_process(void *object) struct pw_impl_port *this = &impl->this; struct pw_impl_port_mix *mix; struct spa_io_buffers *io = &this->rt.io; + uint32_t cycle = (this->node->rt.position->clock.cycle + 1) & 1; pw_log_trace_fp("%p: tee input %d %d", this, io->status, io->buffer_id); spa_list_for_each(mix, &impl->mix_list, rt_link) { pw_log_trace_fp("%p: port %d %p->%p %d", this, - mix->port.port_id, io, mix->io, mix->io->buffer_id); - *mix->io = *io; + mix->port.port_id, io, mix->io[cycle], mix->io[cycle]->buffer_id); + *mix->io[cycle] = *io; } io->status = SPA_STATUS_NEED_DATA; @@ -299,15 +315,17 @@ static int schedule_mix_input(void *object) struct pw_impl_port *this = &impl->this; struct spa_io_buffers *io = &this->rt.io; struct pw_impl_port_mix *mix; + uint32_t cycle = (this->node->rt.position->clock.cycle + 1) & 1; if (SPA_UNLIKELY(PW_IMPL_PORT_IS_CONTROL(this))) return SPA_STATUS_HAVE_DATA | SPA_STATUS_NEED_DATA; spa_list_for_each(mix, &impl->mix_list, rt_link) { pw_log_trace_fp("%p: mix input %d %p->%p %d %d", this, - mix->port.port_id, mix->io, io, mix->io->status, mix->io->buffer_id); - *io = *mix->io; - mix->io->status = SPA_STATUS_NEED_DATA; + mix->port.port_id, mix->io[cycle], io, + mix->io[cycle]->status, mix->io[cycle]->buffer_id); + *io = *mix->io[cycle]; + mix->io[cycle]->status = SPA_STATUS_NEED_DATA; break; } return SPA_STATUS_HAVE_DATA | SPA_STATUS_NEED_DATA; @@ -541,6 +559,9 @@ static int check_param_io(void *data, int seq, uint32_t id, pw_control_new(node->context, port, pid, psize, 0); SPA_FLAG_SET(port->flags, PW_IMPL_PORT_FLAG_CONTROL); break; + case SPA_IO_AsyncBuffers: + SPA_FLAG_SET(port->flags, PW_IMPL_PORT_FLAG_ASYNC); + SPA_FALLTHROUGH; case SPA_IO_Buffers: SPA_FLAG_SET(port->flags, PW_IMPL_PORT_FLAG_BUFFERS); break; @@ -615,6 +636,7 @@ static void check_params(struct pw_impl_port *port) port->info.params[i].user = 0; port->flags &= ~(PW_IMPL_PORT_FLAG_CONTROL | + PW_IMPL_PORT_FLAG_ASYNC | PW_IMPL_PORT_FLAG_BUFFERS); pw_impl_port_for_each_param(port, 0, SPA_PARAM_IO, 0, 0, NULL, check_param_io, port); diff --git a/src/pipewire/keys.h b/src/pipewire/keys.h index 93f45ea99..4b36cd522 100644 --- a/src/pipewire/keys.h +++ b/src/pipewire/keys.h @@ -182,6 +182,7 @@ extern "C" { #define PW_KEY_NODE_CACHE_PARAMS "node.cache-params" /**< cache the node params */ #define PW_KEY_NODE_TRANSPORT_SYNC "node.transport.sync" /**< the node handles transport sync */ #define PW_KEY_NODE_DRIVER "node.driver" /**< node can drive the graph */ +#define PW_KEY_NODE_ASYNC "node.async" /**< the node wants async scheduling */ #define PW_KEY_NODE_STREAM "node.stream" /**< node is a stream, the server side should * add a converter */ #define PW_KEY_NODE_VIRTUAL "node.virtual" /**< the node is some sort of virtual @@ -230,6 +231,7 @@ extern "C" { #define PW_KEY_LINK_FEEDBACK "link.feedback" /**< indicate that a link is a feedback * link and the target will receive data * in the next cycle */ +#define PW_KEY_LINK_ASYNC "link.async" /**< the link is using async io */ /** device properties */ #define PW_KEY_DEVICE_ID "device.id" /**< device id */ diff --git a/src/pipewire/private.h b/src/pipewire/private.h index d3de73c73..5a5b205ba 100644 --- a/src/pipewire/private.h +++ b/src/pipewire/private.h @@ -688,6 +688,7 @@ struct pw_impl_node { unsigned int checked; /**< for sorting */ unsigned int sync:1; /**< the sync-groups are active */ unsigned int transport:1; /**< the transport is active */ + unsigned int async:1; /**< async processing, one cycle latency */ uint32_t port_user_data_size; /**< extra size for port user data */ @@ -755,7 +756,8 @@ struct pw_impl_port_mix { enum spa_direction direction; uint32_t port_id; } port; - struct spa_io_buffers *io; + struct spa_io_buffers *io[2]; + void *io_data; uint32_t id; uint32_t peer_id; unsigned int have_buffers:1; @@ -811,6 +813,7 @@ struct pw_impl_port { #define PW_IMPL_PORT_FLAG_BUFFERS (1<<1) /**< port has data */ #define PW_IMPL_PORT_FLAG_CONTROL (1<<2) /**< port has control */ #define PW_IMPL_PORT_FLAG_NO_MIXER (1<<3) /**< don't try to add mixer to port */ +#define PW_IMPL_PORT_FLAG_ASYNC (1<<4) /**< port support async io */ uint32_t flags; uint64_t spa_flags; diff --git a/src/pipewire/stream.c b/src/pipewire/stream.c index 62e39cd60..6a60b2253 100644 --- a/src/pipewire/stream.c +++ b/src/pipewire/stream.c @@ -2033,6 +2033,9 @@ pw_stream_connect(struct pw_stream *stream, impl->media_subtype == SPA_MEDIA_SUBTYPE_control) pw_properties_set(impl->port_props, PW_KEY_FORMAT_DSP, "8 bit raw midi"); + if ((str = pw_properties_get(stream->properties, PW_KEY_NODE_ASYNC)) != NULL && spa_atob(str)) + SPA_FLAG_SET(impl->info.flags, SPA_NODE_FLAG_ASYNC); + match = MATCH_INIT(stream); pw_context_conf_section_match_rules(impl->context, "stream.rules", &stream->properties->dict, execute_match, &match);