diff --git a/spa/plugins/avb/avb-pcm-source.c b/spa/plugins/avb/avb-pcm-source.c index a680e9730..619dd53f7 100644 --- a/spa/plugins/avb/avb-pcm-source.c +++ b/spa/plugins/avb/avb-pcm-source.c @@ -703,6 +703,7 @@ static int impl_node_process(void *object) struct state *this = object; struct port *port; struct spa_io_buffers *io; + struct buffer *b; spa_return_val_if_fail(this != NULL, -EINVAL); @@ -712,7 +713,29 @@ static int impl_node_process(void *object) spa_return_val_if_fail(io != NULL, -EIO); spa_log_trace_fp(this->log, "%p: process %d %d/%d", this, io->status, - io->buffer_id, this->n_buffers); + io->buffer_id, port->n_buffers); + + if (io->status == SPA_STATUS_HAVE_DATA) + return SPA_STATUS_HAVE_DATA; + + if (io->buffer_id < port->n_buffers) { + spa_avb_recycle_buffer(this, port, io->buffer_id); + io->buffer_id = SPA_ID_INVALID; + } + + if (spa_list_is_empty(&port->ready) && this->following) { + } + if (spa_list_is_empty(&port->ready) || !this->following) + return SPA_STATUS_OK; + + b = spa_list_first(&port->ready, struct buffer, link); + spa_list_remove(&b->link); + SPA_FLAG_SET(b->flags, BUFFER_FLAG_OUT); + + spa_log_trace_fp(this->log, "%p: dequeue buffer %d", this, b->id); + + io->buffer_id = b->id; + io->status = SPA_STATUS_HAVE_DATA; return SPA_STATUS_HAVE_DATA; } diff --git a/spa/plugins/avb/avb-pcm.c b/spa/plugins/avb/avb-pcm.c index c56d4c608..49e934b1c 100644 --- a/spa/plugins/avb/avb-pcm.c +++ b/spa/plugins/avb/avb-pcm.c @@ -757,6 +757,15 @@ static void reset_buffers(struct state *this, struct port *port) static bool is_pdu_valid(struct state *state) { + uint64_t val64; + if (avtp_aaf_pdu_get(state->pdu, AVTP_AAF_FIELD_SEQ_NUM, &val64) < 0) + return false; + + if (state->prev_seq != 0 && (uint8_t)(state->prev_seq + 1) != val64) { + spa_log_warn(state->log, "dropped packets %d != %d", state->prev_seq + 1, (int)val64); + } + state->prev_seq = val64; + return true; } @@ -853,6 +862,8 @@ int spa_avb_write(struct state *state) avail = size - offs; n_bytes = SPA_MIN(avail, to_write); + if (n_bytes == 0) + break; spa_ringbuffer_write_data(&state->ring, state->ringbuffer_data, @@ -976,6 +987,7 @@ static int handle_capture(struct state *state, uint64_t current_time) spa_list_append(&port->ready, &b->link); index += n_bytes; + avail -= n_bytes; spa_ringbuffer_read_update(&state->ring, index); } diff --git a/spa/plugins/avb/avb-pcm.h b/spa/plugins/avb/avb-pcm.h index 3f69816ad..93af975e1 100644 --- a/spa/plugins/avb/avb-pcm.h +++ b/spa/plugins/avb/avb-pcm.h @@ -240,6 +240,7 @@ struct state { size_t pdu_size; int64_t pdu_period; uint8_t pdu_seq; + uint8_t prev_seq; struct iovec iov; struct msghdr msg; diff --git a/src/daemon/pipewire.conf.in b/src/daemon/pipewire.conf.in index a5ecfc47b..02076b533 100644 --- a/src/daemon/pipewire.conf.in +++ b/src/daemon/pipewire.conf.in @@ -243,6 +243,7 @@ context.objects = [ node.description = "AVB Sink" media.class = "Audio/Sink" audio.channels = 8 + #avb.ifname = "eth0" avb.ifname = "enp3s0" #avb.macaddr = "01:AA:AA:AA:AA:AA" #avb.prio = 0 @@ -250,6 +251,7 @@ context.objects = [ #avb.mtt = 50000000 #avb.time-uncertainty = 1000000 #avb.frames-per-pdu = 8 + avb.frames-per-pdu = 32 #avb.ptime-tolerance = 100000 } } @@ -260,6 +262,7 @@ context.objects = [ node.description = "AVB Source" media.class = "Audio/Source" audio.channels = 8 + #avb.ifname = "eth0" avb.ifname = "enp3s0" #avb.macaddr = "01:AA:AA:AA:AA:AA" #avb.prio = 0 @@ -267,6 +270,7 @@ context.objects = [ #avb.mtt = 50000000 #avb.time-uncertainty = 1000000 #avb.frames-per-pdu = 8 + avb.frames-per-pdu = 32 #avb.ptime-tolerance = 100000 } }