diff --git a/spa/plugins/bluez5/bt-latency.h b/spa/plugins/bluez5/bt-latency.h new file mode 100644 index 000000000..6fba869aa --- /dev/null +++ b/spa/plugins/bluez5/bt-latency.h @@ -0,0 +1,175 @@ +/* Spa Bluez5 ISO I/O */ +/* SPDX-FileCopyrightText: Copyright © 2024 Pauli Virtanen */ +/* SPDX-License-Identifier: MIT */ + +#ifndef SPA_BLUEZ5_BT_LATENCY_H +#define SPA_BLUEZ5_BT_LATENCY_H + +#include +#include +#include +#include + +#include +#include + +#include "rate-control.h" + +/* New kernel API */ +#ifndef BT_SCM_ERROR +#define BT_SCM_ERROR 0x04 +#endif +#ifndef BT_POLL_ERRQUEUE +#define BT_POLL_ERRQUEUE 21 +#endif + +/** + * Bluetooth latency tracking. + */ +struct spa_bt_latency +{ + uint64_t value; + struct spa_bt_ptp ptp; + bool valid; + bool disabled; + + struct { + int64_t send[64]; + uint32_t pos; + int64_t prev_tx; + } impl; +}; + +static inline void spa_bt_latency_init(struct spa_bt_latency *lat, int fd, + uint32_t period, struct spa_log *log) +{ + int so_timestamping = (SOF_TIMESTAMPING_TX_SOFTWARE | SOF_TIMESTAMPING_SOFTWARE | + SOF_TIMESTAMPING_OPT_ID | SOF_TIMESTAMPING_OPT_TSONLY); + uint32_t flag; + int res; + + spa_zero(*lat); + + flag = 0; + res = setsockopt(fd, SOL_BLUETOOTH, BT_POLL_ERRQUEUE, &flag, sizeof(flag)); + if (res < 0) { + spa_log_warn(log, "setsockopt(BT_POLL_ERRQUEUE) failed (kernel feature not enabled?): %d (%m)", errno); + lat->disabled = true; + return; + } + + res = setsockopt(fd, SOL_SOCKET, SO_TIMESTAMPING, &so_timestamping, sizeof(so_timestamping)); + if (res < 0) { + spa_log_warn(log, "setsockopt(SO_TIMESTAMPING) failed (kernel feature not enabled?): %d (%m)", errno); + lat->disabled = true; + return; + } + + /* Flush errqueue on start */ + do { + res = recv(fd, NULL, 0, MSG_ERRQUEUE | MSG_DONTWAIT | MSG_TRUNC); + } while (res == 0); + + spa_bt_ptp_init(&lat->ptp, period, period / 2); +} + +static inline void spa_bt_latency_reset(struct spa_bt_latency *lat) +{ + lat->value = 0; + lat->valid = false; + spa_bt_ptp_init(&lat->ptp, lat->ptp.period, lat->ptp.period / 2); +} + +static inline void spa_bt_latency_sent(struct spa_bt_latency *lat, uint64_t now) +{ + const unsigned int n = SPA_N_ELEMENTS(lat->impl.send); + + if (lat->disabled) + return; + + lat->impl.send[lat->impl.pos++] = now; + if (lat->impl.pos >= n) + lat->impl.pos = 0; +} + +static inline int spa_bt_latency_recv_errqueue(struct spa_bt_latency *lat, int fd, struct spa_log *log) +{ + struct { + struct cmsghdr cm; + char control[512]; + } control; + + if (lat->disabled) + return -EOPNOTSUPP; + + do { + struct iovec data = { + .iov_base = NULL, + .iov_len = 0 + }; + struct msghdr msg = { + .msg_iov = &data, + .msg_iovlen = 1, + .msg_control = &control, + .msg_controllen = sizeof(control), + }; + struct cmsghdr *cmsg; + struct scm_timestamping *tss = NULL; + struct sock_extended_err *serr = NULL; + int res; + + res = recvmsg(fd, &msg, MSG_ERRQUEUE | MSG_DONTWAIT); + if (res < 0) { + if (errno == EAGAIN || errno == EWOULDBLOCK) + break; + return -errno; + } + + for (cmsg = CMSG_FIRSTHDR(&msg); cmsg; cmsg = CMSG_NXTHDR(&msg, cmsg)) { + if (cmsg->cmsg_level == SOL_SOCKET && cmsg->cmsg_type == SCM_TIMESTAMPING) + tss = (void *)CMSG_DATA(cmsg); + else if (cmsg->cmsg_level == SOL_BLUETOOTH && cmsg->cmsg_type == BT_SCM_ERROR) + serr = (void *)CMSG_DATA(cmsg); + else + continue; + } + + if (!tss || !serr || serr->ee_errno != ENOMSG || serr->ee_origin != SO_EE_ORIGIN_TIMESTAMPING) + return -EINVAL; + if (serr->ee_info != SCM_TSTAMP_SND) + continue; + + struct timespec *ts = &tss->ts[0]; + int64_t tx_time = SPA_TIMESPEC_TO_NSEC(ts); + uint32_t tx_pos = serr->ee_data % SPA_N_ELEMENTS(lat->impl.send); + + lat->value = tx_time - lat->impl.send[tx_pos]; + + if (lat->impl.prev_tx && tx_time > lat->impl.prev_tx) + spa_bt_ptp_update(&lat->ptp, lat->value, tx_time - lat->impl.prev_tx); + + lat->impl.prev_tx = tx_time; + + spa_log_trace(log, "fd:%d latency[%d] nsec:%"PRIu64" range:%d..%d ms", + fd, tx_pos, lat->value, + (int)(spa_bt_ptp_valid(&lat->ptp) ? lat->ptp.min / SPA_NSEC_PER_MSEC : -1), + (int)(spa_bt_ptp_valid(&lat->ptp) ? lat->ptp.max / SPA_NSEC_PER_MSEC : -1)); + } while (true); + + lat->valid = spa_bt_ptp_valid(&lat->ptp); + + return 0; +} + +static inline void spa_bt_latency_flush(struct spa_bt_latency *lat, int fd, struct spa_log *log) +{ + int so_timestamping = 0; + + /* Disable timestamping and flush errqueue */ + setsockopt(fd, SOL_SOCKET, SO_TIMESTAMPING, &so_timestamping, sizeof(so_timestamping)); + spa_bt_latency_recv_errqueue(lat, fd, log); + + lat->disabled = true; +} + +#endif diff --git a/spa/plugins/bluez5/decode-buffer.h b/spa/plugins/bluez5/decode-buffer.h index 3ca080da3..3b73994e2 100644 --- a/spa/plugins/bluez5/decode-buffer.h +++ b/spa/plugins/bluez5/decode-buffer.h @@ -104,8 +104,8 @@ static int spa_bt_decode_buffer_init(struct spa_bt_decode_buffer *this, struct s spa_bt_rate_control_init(&this->ctl, 0); - spa_bt_ptp_init(&this->spike, (uint64_t)this->rate * BUFFERING_LONG_MSEC / 1000); - spa_bt_ptp_init(&this->packet_size, (uint64_t)this->rate * BUFFERING_SHORT_MSEC / 1000); + spa_bt_ptp_init(&this->spike, (uint64_t)this->rate * BUFFERING_LONG_MSEC / 1000, 0); + spa_bt_ptp_init(&this->packet_size, (uint64_t)this->rate * BUFFERING_SHORT_MSEC / 1000, 0); if ((this->buffer_decoded = malloc(this->buffer_size)) == NULL) { this->buffer_size = 0; diff --git a/spa/plugins/bluez5/iso-io.c b/spa/plugins/bluez5/iso-io.c index de7a8565a..8aff446d0 100644 --- a/spa/plugins/bluez5/iso-io.c +++ b/spa/plugins/bluez5/iso-io.c @@ -7,7 +7,6 @@ #include #include #include -#include #include #include @@ -26,9 +25,14 @@ SPA_LOG_TOPIC_DEFINE_STATIC(log_topic, "spa.bluez5.iso"); #undef SPA_LOG_TOPIC_DEFAULT #define SPA_LOG_TOPIC_DEFAULT &log_topic +#include "bt-latency.h" + #define IDLE_TIME (500 * SPA_NSEC_PER_MSEC) #define EMPTY_BUF_SIZE 65536 +#define LATENCY_PERIOD (200 * SPA_NSEC_PER_MSEC) +#define MAX_PACKET_QUEUE 3 + struct group { struct spa_log *log; struct spa_loop *data_loop; @@ -39,7 +43,6 @@ struct group { uint8_t id; uint64_t next; uint64_t duration; - uint32_t paused; bool started; }; @@ -55,6 +58,8 @@ struct stream { const struct media_codec *codec; uint32_t block_size; + + struct spa_bt_latency tx_latency; }; struct modify_info @@ -131,15 +136,20 @@ static int set_timeout(struct group *group, uint64_t time) group->timerfd, SPA_FD_TIMER_ABSTIME, &ts, NULL); } -static int set_timers(struct group *group) +static uint64_t get_time_ns(struct spa_system *system, clockid_t clockid) { struct timespec now; + spa_system_clock_gettime(system, clockid, &now); + return SPA_TIMESPEC_TO_NSEC(&now); +} + +static int set_timers(struct group *group) +{ if (group->duration == 0) return -EINVAL; - spa_system_clock_gettime(group->data_system, CLOCK_MONOTONIC, &now); - group->next = SPA_ROUND_UP(SPA_TIMESPEC_TO_NSEC(&now) + group->duration, + group->next = SPA_ROUND_UP(get_time_ns(group->data_system, CLOCK_MONOTONIC) + group->duration, group->duration); return set_timeout(group, group->next); @@ -170,10 +180,6 @@ static void group_on_timeout(struct spa_source *source) return; } - /* - * If a stream failed, pause output of all streams for a while to avoid - * desynchronization. - */ spa_list_for_each(stream, &group->streams, link) { if (!stream->sink) { if (!stream->pull) { @@ -183,6 +189,8 @@ static void group_on_timeout(struct spa_source *source) continue; } + spa_bt_latency_recv_errqueue(&stream->tx_latency, stream->fd, group->log); + if (stream->this.need_resync) { resync = true; stream->this.need_resync = false; @@ -192,18 +200,16 @@ static void group_on_timeout(struct spa_source *source) group->started = true; } - if (group->paused) { - --group->paused; - spa_log_debug(group->log, "%p: ISO group:%u paused:%u", group, group->id, group->paused); - } - /* Produce output */ spa_list_for_each(stream, &group->streams, link) { int res; + uint64_t now; + int32_t min_latency = INT32_MAX, max_latency = INT32_MIN; + struct stream *other; if (!stream->sink) continue; - if (group->paused || !group->started) { + if (!group->started) { stream->this.resync = true; stream->this.size = 0; continue; @@ -217,21 +223,50 @@ static void group_on_timeout(struct spa_source *source) } } + spa_list_for_each(other, &group->streams, link) { + if (!other->sink || stream == other || !other->tx_latency.valid) + continue; + min_latency = SPA_MIN(min_latency, other->tx_latency.ptp.min); + max_latency = SPA_MAX(max_latency, other->tx_latency.ptp.max); + } + + if (stream->tx_latency.valid && min_latency <= max_latency && + stream->tx_latency.ptp.min > min_latency + (int64_t)group->duration/2 && + stream->tx_latency.ptp.max > max_latency + (int64_t)group->duration/2) { + spa_log_debug(group->log, "%p: ISO group:%u latency skip align fd:%d", group, group->id, stream->fd); + spa_bt_latency_reset(&stream->tx_latency); + goto stream_done; + } + + /* TODO: this should use rate match */ + if (stream->tx_latency.valid && + stream->tx_latency.ptp.min > MAX_PACKET_QUEUE * (int64_t)group->duration) { + spa_log_debug(group->log, "%p: ISO group:%u latency skip fd:%d", group, group->id, stream->fd); + spa_bt_latency_reset(&stream->tx_latency); + goto stream_done; + } + + now = get_time_ns(group->data_system, CLOCK_REALTIME); res = send(stream->fd, stream->this.buf, stream->this.size, MSG_DONTWAIT | MSG_NOSIGNAL); if (res < 0) { res = -errno; fail = true; + } else { + spa_bt_latency_sent(&stream->tx_latency, now); } - spa_log_trace(group->log, "%p: ISO group:%u sent fd:%d size:%u ts:%u idle:%d res:%d", + stream_done: + spa_log_trace(group->log, "%p: ISO group:%u sent fd:%d size:%u ts:%u idle:%d res:%d latency:%d..%d us", group, group->id, stream->fd, (unsigned)stream->this.size, - (unsigned)stream->this.timestamp, stream->idle, res); + (unsigned)stream->this.timestamp, stream->idle, res, + stream->tx_latency.valid ? stream->tx_latency.ptp.min/1000 : -1, + stream->tx_latency.valid ? stream->tx_latency.ptp.max/1000 : -1); stream->this.size = 0; } if (fail) - group->paused = 1u + IDLE_TIME / group->duration; + spa_log_debug(group->log, "%p: ISO group:%d send failure", group, group->id); /* Pull data for the next interval */ group->next += exp * group->duration; @@ -399,6 +434,8 @@ static struct stream *stream_create(struct spa_bt_transport *t, struct group *gr stream->this.format = format; stream->block_size = block_size; + spa_bt_latency_init(&stream->tx_latency, stream->fd, LATENCY_PERIOD, group->log); + if (sink) stream_silence(stream); @@ -453,6 +490,8 @@ void spa_bt_iso_io_destroy(struct spa_bt_iso_io *this) stream_unlink(stream); + spa_bt_latency_flush(&stream->tx_latency, stream->fd, stream->group->log); + if (spa_list_is_empty(&stream->group->streams)) group_destroy(stream->group); @@ -508,3 +547,12 @@ void spa_bt_iso_io_set_cb(struct spa_bt_iso_io *this, spa_bt_iso_io_pull_t pull, return; } } + +/** Must be called from data thread */ +int spa_bt_iso_io_recv_errqueue(struct spa_bt_iso_io *this) +{ + struct stream *stream = SPA_CONTAINER_OF(this, struct stream, this); + struct group *group = stream->group; + + return spa_bt_latency_recv_errqueue(&stream->tx_latency, stream->fd, group->log); +} diff --git a/spa/plugins/bluez5/iso-io.h b/spa/plugins/bluez5/iso-io.h index 2200302cd..ed49c77c1 100644 --- a/spa/plugins/bluez5/iso-io.h +++ b/spa/plugins/bluez5/iso-io.h @@ -44,5 +44,6 @@ struct spa_bt_iso_io *spa_bt_iso_io_create(struct spa_bt_transport *t, struct spa_bt_iso_io *spa_bt_iso_io_attach(struct spa_bt_iso_io *io, struct spa_bt_transport *t); void spa_bt_iso_io_destroy(struct spa_bt_iso_io *io); void spa_bt_iso_io_set_cb(struct spa_bt_iso_io *io, spa_bt_iso_io_pull_t pull, void *user_data); +int spa_bt_iso_io_recv_errqueue(struct spa_bt_iso_io *io); #endif diff --git a/spa/plugins/bluez5/media-sink.c b/spa/plugins/bluez5/media-sink.c index 546bc7f01..80a4f515f 100644 --- a/spa/plugins/bluez5/media-sink.c +++ b/spa/plugins/bluez5/media-sink.c @@ -45,6 +45,8 @@ SPA_LOG_TOPIC_DEFINE_STATIC(log_topic, "spa.bluez5.sink.media"); #undef SPA_LOG_TOPIC_DEFAULT #define SPA_LOG_TOPIC_DEFAULT &log_topic +#include "bt-latency.h" + #define DEFAULT_CLOCK_NAME "clock.system.monotonic" struct props { @@ -57,6 +59,7 @@ struct props { #define MAX_BUFFERS 32 #define BUFFER_SIZE (8192*8) #define RATE_CTL_DIFF_MAX 0.005 +#define LATENCY_PERIOD (200 * SPA_NSEC_PER_MSEC) /* Wait for two cycles before trying to sync ISO. On start/driver reassign, * first cycle may have strange number of samples. */ @@ -1086,9 +1089,18 @@ static void media_on_flush_error(struct spa_source *source) { struct impl *this = source->data; + if (source->rmask & SPA_IO_ERR) { + /* TX timestamp info? */ + if (this->transport && this->transport->iso_io) + if (spa_bt_iso_io_recv_errqueue(this->transport->iso_io) == 0) + return; + + /* Otherwise: actual error */ + } + spa_log_trace(this->log, "%p: flush event", this); - if (source->rmask & (SPA_IO_ERR | SPA_IO_HUP)) { + if (source->rmask & (SPA_IO_HUP | SPA_IO_ERR)) { spa_log_warn(this->log, "%p: error %d", this, source->rmask); if (this->flush_source.loop) spa_loop_remove_source(this->data_loop, &this->flush_source); @@ -1372,7 +1384,6 @@ static int do_remove_transport_source(struct spa_loop *loop, if (this->flush_source.loop) spa_loop_remove_source(this->data_loop, &this->flush_source); - if (this->flush_timer_source.loop) spa_loop_remove_source(this->data_loop, &this->flush_timer_source); enable_flush_timer(this, false); diff --git a/spa/plugins/bluez5/media-source.c b/spa/plugins/bluez5/media-source.c index c5e6753fc..12bcf5b8f 100644 --- a/spa/plugins/bluez5/media-source.c +++ b/spa/plugins/bluez5/media-source.c @@ -148,6 +148,8 @@ struct impl { uint8_t buffer_read[4096]; struct timespec now; uint64_t sample_count; + + uint32_t errqueue_count; }; #define CHECK_PORT(this,d,p) ((d) == SPA_DIRECTION_OUTPUT && (p) == 0) @@ -455,6 +457,24 @@ static int32_t decode_data(struct impl *this, uint8_t *src, uint32_t src_size, return dst_size - avail; } +static void handle_errqueue(struct impl *this) +{ + int res; + + /* iso-io/media-sink use these for TX latency. + * Someone else should be reading them, so drop + * only after yielding. + */ + if (this->errqueue_count < 4) { + this->errqueue_count++; + return; + } + + this->errqueue_count = 0; + res = recv(this->fd, NULL, 0, MSG_ERRQUEUE | MSG_TRUNC); + spa_log_trace(this->log, "%p: ignoring errqueue data (%d)", this, res); +} + static void media_on_ready_read(struct spa_source *source) { struct impl *this = source->data; @@ -467,6 +487,11 @@ static void media_on_ready_read(struct spa_source *source) /* make sure the source is an input */ if ((source->rmask & SPA_IO_IN) == 0) { + if (source->rmask & SPA_IO_ERR) { + handle_errqueue(this); + return; + } + spa_log_error(this->log, "source is not an input, rmask=%d", source->rmask); goto stop; } @@ -475,6 +500,8 @@ static void media_on_ready_read(struct spa_source *source) goto stop; } + this->errqueue_count = 0; + spa_log_trace(this->log, "socket poll"); /* read */ @@ -688,6 +715,7 @@ static int transport_start(struct impl *this) } this->sample_count = 0; + this->errqueue_count = 0; this->source.data = this; diff --git a/spa/plugins/bluez5/rate-control.h b/spa/plugins/bluez5/rate-control.h index e7cca7528..7ea882221 100644 --- a/spa/plugins/bluez5/rate-control.h +++ b/spa/plugins/bluez5/rate-control.h @@ -19,10 +19,11 @@ struct spa_bt_ptp int32_t maxs[4]; }; uint32_t pos; + uint32_t left; uint32_t period; }; -static inline void spa_bt_ptp_init(struct spa_bt_ptp *p, int32_t period) +static inline void spa_bt_ptp_init(struct spa_bt_ptp *p, int32_t period, uint32_t min_duration) { size_t i; @@ -31,6 +32,7 @@ static inline void spa_bt_ptp_init(struct spa_bt_ptp *p, int32_t period) p->mins[i] = INT32_MAX; p->maxs[i] = INT32_MIN; } + p->left = min_duration; p->period = period; } @@ -54,6 +56,16 @@ static inline void spa_bt_ptp_update(struct spa_bt_ptp *p, int32_t value, uint32 p->mins[n-1] = INT32_MAX; p->maxs[n-1] = INT32_MIN; } + + if (p->left < duration) + p->left = 0; + else + p->left -= duration; +} + +static inline bool spa_bt_ptp_valid(struct spa_bt_ptp *p) +{ + return p->left == 0; } /**