From f2c1fe106dcc428cf8b52c5c922f1e097a06c46d Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Mon, 16 May 2022 16:11:57 +0200 Subject: [PATCH] audioconvert2: more fixes --- spa/plugins/audioconvert/audioconvert2.c | 171 ++++++++++++----------- 1 file changed, 90 insertions(+), 81 deletions(-) diff --git a/spa/plugins/audioconvert/audioconvert2.c b/spa/plugins/audioconvert/audioconvert2.c index a89fbb60f..7fe630e2c 100644 --- a/spa/plugins/audioconvert/audioconvert2.c +++ b/spa/plugins/audioconvert/audioconvert2.c @@ -1788,7 +1788,7 @@ static void queue_buffer(struct impl *this, struct port *port, uint32_t id) SPA_FLAG_SET(b->flags, BUFFER_FLAG_QUEUED); } -static struct buffer *dequeue_buffer(struct impl *this, struct port *port) +static struct buffer *peek_buffer(struct impl *this, struct port *port) { struct buffer *b; @@ -1796,12 +1796,17 @@ static struct buffer *dequeue_buffer(struct impl *this, struct port *port) return NULL; b = spa_list_first(&port->queue, struct buffer, link); + spa_log_trace_fp(this->log, "%p: peek buffer %d on port %d %u", + this, b->id, port->id, b->flags); + return b; +} + +static void dequeue_buffer(struct impl *this, struct port *port, struct buffer *b) +{ spa_list_remove(&b->link); SPA_FLAG_CLEAR(b->flags, BUFFER_FLAG_QUEUED); spa_log_trace_fp(this->log, "%p: dequeue buffer %d on port %d %u", this, b->id, port->id, b->flags); - - return b; } static int @@ -1927,49 +1932,6 @@ static int impl_node_port_reuse_buffer(void *object, uint32_t port_id, uint32_t return 0; } -static inline int get_in_buffer(struct impl *this, struct port *port, struct buffer **buf) -{ - struct spa_io_buffers *io; - - if ((io = port->io) == NULL) { - spa_log_debug(this->log, "%p: no io on port %d", - this, port->id); - return -EIO; - } - if (io->status != SPA_STATUS_HAVE_DATA || - io->buffer_id >= port->n_buffers) { - spa_log_debug(this->log, "%p: empty port %d %p %d %d %d", - this, port->id, io, io->status, io->buffer_id, - port->n_buffers); - return -EPIPE; - } - - *buf = &port->buffers[io->buffer_id]; - io->status = SPA_STATUS_NEED_DATA; - - return 0; -} - -static inline int get_out_buffer(struct impl *this, struct port *port, struct buffer **buf) -{ - struct spa_io_buffers *io; - - if (SPA_UNLIKELY((io = port->io) == NULL || - io->status == SPA_STATUS_HAVE_DATA)) - return SPA_STATUS_HAVE_DATA; - - if (SPA_LIKELY(io->buffer_id < port->n_buffers)) - queue_buffer(this, port, io->buffer_id); - - if (SPA_UNLIKELY((*buf = dequeue_buffer(this, port)) == NULL)) - return -EPIPE; - - io->status = SPA_STATUS_HAVE_DATA; - io->buffer_id = (*buf)->id; - - return 0; -} - static void resample_update_rate_match(struct impl *this, bool passthrough, uint32_t out_size, uint32_t in_queued) { double rate = this->rate_scale / this->props.rate; @@ -2011,26 +1973,41 @@ static int impl_node_process(void *object) const void *src_datas[MAX_PORTS], **in_datas; void *dst_datas[MAX_PORTS], *mon_datas[MAX_PORTS], **out_datas; uint32_t i, j, n_src_datas = 0, n_dst_datas = 0, n_mon_datas = 0; - uint32_t in_samples, max_mon, max_out, out_samples, quant_samples; + uint32_t n_samples, max_mon, max_out, quant_samples; struct port *port; - struct buffer *buf; - struct spa_data *bd, *dst_bufs[MAX_PORTS]; + struct buffer *buf, *out_bufs[MAX_PORTS]; + struct spa_data *bd; struct dir *dir; int tmp = 0; bool in_passthrough, mix_passthrough, resample_passthrough, out_passthrough, end_passthrough; bool flush_out; uint32_t in_len, out_len, remap; + struct spa_io_buffers *io; dir = &this->dir[SPA_DIRECTION_INPUT]; in_passthrough = dir->conv.is_passthrough; - in_samples = UINT32_MAX; + n_samples = UINT32_MAX; /* collect input port data */ for (i = 0; i < dir->n_ports; i++) { port = GET_IN_PORT(this, i); - if (SPA_UNLIKELY(get_in_buffer(this, port, &buf) != 0)) { + if ((io = port->io) == NULL) { + spa_log_debug(this->log, "%p: no io on port %d", + this, port->id); + buf = NULL; + } else if (io->status != SPA_STATUS_HAVE_DATA || + io->buffer_id >= port->n_buffers) { + spa_log_debug(this->log, "%p: empty port %d %p %d %d %d", + this, port->id, io, io->status, io->buffer_id, + port->n_buffers); + buf = NULL; + } else { + buf = &port->buffers[io->buffer_id]; + } + + if (buf == NULL) { for (j = 0; j < port->blocks; j++) { remap = dir->src_remap[n_src_datas++]; src_datas[remap] = SPA_PTR_ALIGN(this->empty, MAX_ALIGN, void); @@ -2039,16 +2016,14 @@ static int impl_node_process(void *object) } } else { for (j = 0; j < port->blocks; j++) { - remap = dir->src_remap[n_src_datas++]; - bd = &buf->buf->datas[j]; - + remap = dir->src_remap[n_src_datas++]; src_datas[remap] = SPA_PTROFF(bd->data, bd->chunk->offset, void); - in_samples = SPA_MIN(in_samples, bd->chunk->size / port->stride); + n_samples = SPA_MIN(n_samples, bd->chunk->size / port->stride); spa_log_trace_fp(this->log, "%p: %d %d %d->%d", this, - bd->chunk->size, in_samples, + bd->chunk->size, n_samples, i * port->blocks + j, remap); } } @@ -2079,7 +2054,7 @@ static int impl_node_process(void *object) quant_samples = this->quantum_limit; resample_passthrough = resample_is_passthrough(this); - if (in_samples == UINT32_MAX) { + if (n_samples == UINT32_MAX) { /* no input, ask for more */ resample_update_rate_match(this, resample_passthrough, quant_samples, 0); return SPA_STATUS_NEED_DATA; @@ -2094,13 +2069,24 @@ static int impl_node_process(void *object) for (i = 0; i < dir->n_ports; i++) { port = GET_OUT_PORT(this, i); - if (SPA_UNLIKELY(get_out_buffer(this, port, &buf) != 0)) { + if (SPA_UNLIKELY((io = port->io) == NULL || + io->status == SPA_STATUS_HAVE_DATA)) { + buf = NULL; + } + else { + if (SPA_LIKELY(io->buffer_id < port->n_buffers)) + queue_buffer(this, port, io->buffer_id); + + buf = peek_buffer(this, port); + } + out_bufs[i] = buf; + + if (SPA_UNLIKELY(buf == NULL)) { for (j = 0; j < port->blocks; j++) { if (port->is_monitor) { - mon_datas[n_mon_datas++] = SPA_PTR_ALIGN(this->scratch, MAX_ALIGN, void); + mon_datas[n_mon_datas++] = NULL; } else { remap = dir->dst_remap[n_dst_datas++]; - dst_bufs[remap] = NULL; dst_datas[remap] = SPA_PTR_ALIGN(this->scratch, MAX_ALIGN, void); } spa_log_trace_fp(this->log, "%p: %d->%d %d", this, @@ -2111,16 +2097,13 @@ static int impl_node_process(void *object) bd = &buf->buf->datas[j]; bd->chunk->offset = 0; + bd->chunk->size = 0; if (port->is_monitor) { mon_datas[n_mon_datas++] = bd->data; - max_mon = SPA_MIN(max_mon, in_samples); max_mon = SPA_MIN(max_mon, bd->maxsize / port->stride); - bd->chunk->size = max_mon * port->stride; } else { remap = dir->dst_remap[n_dst_datas++]; - dst_bufs[remap] = bd; dst_datas[remap] = bd->data; - bd->chunk->size = 0; max_out = SPA_MIN(max_out, bd->maxsize / port->stride); } spa_log_trace_fp(this->log, "%p: %d %d %d->%d", this, @@ -2153,15 +2136,13 @@ static int impl_node_process(void *object) if (this->direction == SPA_DIRECTION_INPUT) { /* in split mode we need to output exactly the size of the * duration so we don't try to flush early */ - out_samples = SPA_MIN(max_out, quant_samples); + max_out = SPA_MIN(max_out, quant_samples); flush_out = false; } else { /* in merge mode we consume one duration of samples and * always output the resulting data */ + max_out = SPA_MIN(max_out, this->quantum_limit); flush_out = true; - if (this->io_rate_match) - in_samples = SPA_MIN(in_samples, this->io_rate_match->size); - out_samples = SPA_MIN(max_out, this->quantum_limit); } in_datas = (const void**)src_datas; @@ -2170,7 +2151,7 @@ static int impl_node_process(void *object) out_datas = (void **)dst_datas; else out_datas = (void **)this->tmp_datas[(tmp++) & 1]; - convert_process(&this->dir[SPA_DIRECTION_INPUT].conv, out_datas, in_datas, in_samples); + convert_process(&this->dir[SPA_DIRECTION_INPUT].conv, out_datas, in_datas, n_samples); } else { out_datas = (void **)in_datas; } @@ -2181,7 +2162,7 @@ static int impl_node_process(void *object) out_datas = (void **)dst_datas; else out_datas = (void **)this->tmp_datas[(tmp++) & 1]; - channelmix_process(&this->mix, out_datas, in_datas, in_samples); + channelmix_process(&this->mix, out_datas, in_datas, n_samples); } else { out_datas = (void **)in_datas; } @@ -2193,25 +2174,53 @@ static int impl_node_process(void *object) else out_datas = (void **)this->tmp_datas[(tmp++) & 1]; - in_len = in_samples; - out_len = out_samples; + in_len = n_samples; + out_len = max_out; resample_process(&this->resample, in_datas, &in_len, out_datas, &out_len); } else { out_datas = (void **)in_datas; - out_len = in_samples; + out_len = n_samples; } - resample_update_rate_match(this, resample_passthrough, out_samples, 0); - in_samples = out_len; + resample_update_rate_match(this, resample_passthrough, max_out, 0); + n_samples = out_len; in_datas = (const void **)out_datas; if (!out_passthrough) - convert_process(&this->dir[SPA_DIRECTION_OUTPUT].conv, dst_datas, in_datas, in_samples); + convert_process(&this->dir[SPA_DIRECTION_OUTPUT].conv, dst_datas, in_datas, n_samples); - for (i = 0; i < n_dst_datas; i++) { - port = GET_OUT_PORT(this, 0); - if (dst_bufs[i]) { - dst_bufs[i]->chunk->size = in_samples * port->stride; - spa_log_debug(this->log, "%d %d", in_samples, dst_bufs[i]->chunk->size); + /* return input buffers */ + dir = &this->dir[SPA_DIRECTION_INPUT]; + for (i = 0; i < dir->n_ports; i++) { + port = GET_IN_PORT(this, i); + if ((io = port->io) != NULL) + io->status = SPA_STATUS_NEED_DATA; + } + + /* queue output buffers */ + dir = &this->dir[SPA_DIRECTION_OUTPUT]; + for (i = 0; i < dir->n_ports; i++) { + port = GET_OUT_PORT(this, i); + buf = out_bufs[i]; + + if (SPA_UNLIKELY((io = port->io) == NULL)) + continue; + + io->status = SPA_STATUS_HAVE_DATA; + io->buffer_id = buf ? buf->id : SPA_ID_INVALID; + + if (buf == NULL) + continue; + + dequeue_buffer(this, port, buf); + + for (j = 0; j < port->blocks; j++) { + bd = &buf->buf->datas[j]; + if (port->is_monitor) + bd->chunk->size = max_mon * port->stride; + else + bd->chunk->size = n_samples * port->stride; + + spa_log_debug(this->log, "%d %d %d", max_mon, n_samples, bd->chunk->size); } } return SPA_STATUS_NEED_DATA | SPA_STATUS_HAVE_DATA;