From 1e18cded7580e3c51cfd7aef8a2221f2aa6ea234 Mon Sep 17 00:00:00 2001 From: Pauli Virtanen Date: Fri, 31 May 2024 20:31:59 +0300 Subject: [PATCH] bluez5: iso-io: improve latency logic If kernel socket queues for different streams get out of sync, it will mess up time alignment of different streams. If that happens, flush to resync. If total latency becomes too large, flush queue. Get accurate queue sizes from tx timestamping. --- spa/plugins/bluez5/iso-io.c | 104 ++++++++++++++++++++++++------------ 1 file changed, 70 insertions(+), 34 deletions(-) diff --git a/spa/plugins/bluez5/iso-io.c b/spa/plugins/bluez5/iso-io.c index 120f02680..bb52575bd 100644 --- a/spa/plugins/bluez5/iso-io.c +++ b/spa/plugins/bluez5/iso-io.c @@ -30,8 +30,8 @@ SPA_LOG_TOPIC_DEFINE_STATIC(log_topic, "spa.bluez5.iso"); #define IDLE_TIME (500 * SPA_NSEC_PER_MSEC) #define EMPTY_BUF_SIZE 65536 -#define LATENCY_PERIOD (200 * SPA_NSEC_PER_MSEC) -#define MAX_PACKET_QUEUE 3 +#define LATENCY_PERIOD (1000 * SPA_NSEC_PER_MSEC) +#define MAX_LATENCY (50 * SPA_NSEC_PER_MSEC) struct group { struct spa_log *log; @@ -43,6 +43,7 @@ struct group { uint8_t id; uint64_t next; uint64_t duration; + bool flush; bool started; }; @@ -164,6 +165,57 @@ static void drop_rx(int fd) } while (res >= 0); } +static bool group_latency_check(struct group *group) +{ + struct stream *stream; + int32_t min_latency = INT32_MAX, max_latency = INT32_MIN; + unsigned int kernel_queue = UINT_MAX; + + spa_list_for_each(stream, &group->streams, link) { + if (!stream->sink) + continue; + if (!stream->tx_latency.enabled) + return false; + + if (kernel_queue == UINT_MAX) + kernel_queue = stream->tx_latency.kernel_queue; + + if (group->flush && stream->tx_latency.queue) { + spa_log_debug(group->log, "%p: ISO group:%d latency skip: flushing", + group, group->id); + return true; + } + if (stream->tx_latency.kernel_queue != kernel_queue) { + /* Streams out of sync, try to correct if it persists */ + spa_log_debug(group->log, "%p: ISO group:%d latency skip: imbalance", + group, group->id); + group->flush = true; + return true; + } + } + + group->flush = false; + + spa_list_for_each(stream, &group->streams, link) { + if (!stream->sink) + continue; + if (!stream->tx_latency.valid) + return false; + + min_latency = SPA_MIN(min_latency, stream->tx_latency.ptp.min); + max_latency = SPA_MAX(max_latency, stream->tx_latency.ptp.max); + } + + if (max_latency > MAX_LATENCY) { + spa_log_debug(group->log, "%p: ISO group:%d latency skip: latency %d ms", + group, group->id, (int)(max_latency / SPA_NSEC_PER_MSEC)); + group->flush = true; + return true; + } + + return false; +} + static void group_on_timeout(struct spa_source *source) { struct group *group = source->data; @@ -179,6 +231,8 @@ static void group_on_timeout(struct spa_source *source) group, group->id, spa_strerror(res)); return; } + if (!exp) + return; spa_list_for_each(stream, &group->streams, link) { if (!stream->sink) { @@ -200,12 +254,16 @@ static void group_on_timeout(struct spa_source *source) group->started = true; } + if (group_latency_check(group)) { + spa_list_for_each(stream, &group->streams, link) + spa_bt_latency_reset(&stream->tx_latency); + goto done; + } + /* Produce output */ spa_list_for_each(stream, &group->streams, link) { int res = 0; uint64_t now; - int32_t min_latency = INT32_MAX, max_latency = INT32_MIN; - struct stream *other; if (!stream->sink) continue; @@ -223,44 +281,21 @@ static void group_on_timeout(struct spa_source *source) } } - spa_list_for_each(other, &group->streams, link) { - if (!other->sink || stream == other || !other->tx_latency.valid) - continue; - min_latency = SPA_MIN(min_latency, other->tx_latency.ptp.min); - max_latency = SPA_MAX(max_latency, other->tx_latency.ptp.max); - } - - if (stream->tx_latency.valid && min_latency <= max_latency && - stream->tx_latency.ptp.min > min_latency + (int64_t)group->duration/2 && - stream->tx_latency.ptp.max > max_latency + (int64_t)group->duration/2) { - spa_log_debug(group->log, "%p: ISO group:%u latency skip align fd:%d", group, group->id, stream->fd); - spa_bt_latency_reset(&stream->tx_latency); - goto stream_done; - } - - /* TODO: this should use rate match */ - if (stream->tx_latency.valid && - stream->tx_latency.ptp.min > MAX_PACKET_QUEUE * (int64_t)group->duration) { - spa_log_debug(group->log, "%p: ISO group:%u latency skip fd:%d", group, group->id, stream->fd); - spa_bt_latency_reset(&stream->tx_latency); - goto stream_done; - } - now = get_time_ns(group->data_system, CLOCK_REALTIME); - res = send(stream->fd, stream->this.buf, stream->this.size, MSG_DONTWAIT | MSG_NOSIGNAL); + res = spa_bt_send(stream->fd, stream->this.buf, stream->this.size, + &stream->tx_latency, now); if (res < 0) { res = -errno; fail = true; - } else { - spa_bt_latency_sent(&stream->tx_latency, now); + group->flush = true; } - stream_done: - spa_log_trace(group->log, "%p: ISO group:%u sent fd:%d size:%u ts:%u idle:%d res:%d latency:%d..%d us", + spa_log_trace(group->log, "%p: ISO group:%u sent fd:%d size:%u ts:%u idle:%d res:%d latency:%d..%d%sus queue:%u", group, group->id, stream->fd, (unsigned)stream->this.size, (unsigned)stream->this.timestamp, stream->idle, res, - stream->tx_latency.valid ? stream->tx_latency.ptp.min/1000 : -1, - stream->tx_latency.valid ? stream->tx_latency.ptp.max/1000 : -1); + stream->tx_latency.ptp.min/1000, stream->tx_latency.ptp.max/1000, + stream->tx_latency.valid ? " " : "* ", + stream->tx_latency.queue); stream->this.size = 0; } @@ -268,6 +303,7 @@ static void group_on_timeout(struct spa_source *source) if (fail) spa_log_debug(group->log, "%p: ISO group:%d send failure", group, group->id); +done: /* Pull data for the next interval */ group->next += exp * group->duration;