diff --git a/spa/plugins/bluez5/iso-io.c b/spa/plugins/bluez5/iso-io.c index 120f02680..bb52575bd 100644 --- a/spa/plugins/bluez5/iso-io.c +++ b/spa/plugins/bluez5/iso-io.c @@ -30,8 +30,8 @@ SPA_LOG_TOPIC_DEFINE_STATIC(log_topic, "spa.bluez5.iso"); #define IDLE_TIME (500 * SPA_NSEC_PER_MSEC) #define EMPTY_BUF_SIZE 65536 -#define LATENCY_PERIOD (200 * SPA_NSEC_PER_MSEC) -#define MAX_PACKET_QUEUE 3 +#define LATENCY_PERIOD (1000 * SPA_NSEC_PER_MSEC) +#define MAX_LATENCY (50 * SPA_NSEC_PER_MSEC) struct group { struct spa_log *log; @@ -43,6 +43,7 @@ struct group { uint8_t id; uint64_t next; uint64_t duration; + bool flush; bool started; }; @@ -164,6 +165,57 @@ static void drop_rx(int fd) } while (res >= 0); } +static bool group_latency_check(struct group *group) +{ + struct stream *stream; + int32_t min_latency = INT32_MAX, max_latency = INT32_MIN; + unsigned int kernel_queue = UINT_MAX; + + spa_list_for_each(stream, &group->streams, link) { + if (!stream->sink) + continue; + if (!stream->tx_latency.enabled) + return false; + + if (kernel_queue == UINT_MAX) + kernel_queue = stream->tx_latency.kernel_queue; + + if (group->flush && stream->tx_latency.queue) { + spa_log_debug(group->log, "%p: ISO group:%d latency skip: flushing", + group, group->id); + return true; + } + if (stream->tx_latency.kernel_queue != kernel_queue) { + /* Streams out of sync, try to correct if it persists */ + spa_log_debug(group->log, "%p: ISO group:%d latency skip: imbalance", + group, group->id); + group->flush = true; + return true; + } + } + + group->flush = false; + + spa_list_for_each(stream, &group->streams, link) { + if (!stream->sink) + continue; + if (!stream->tx_latency.valid) + return false; + + min_latency = SPA_MIN(min_latency, stream->tx_latency.ptp.min); + max_latency = SPA_MAX(max_latency, stream->tx_latency.ptp.max); + } + + if (max_latency > MAX_LATENCY) { + spa_log_debug(group->log, "%p: ISO group:%d latency skip: latency %d ms", + group, group->id, (int)(max_latency / SPA_NSEC_PER_MSEC)); + group->flush = true; + return true; + } + + return false; +} + static void group_on_timeout(struct spa_source *source) { struct group *group = source->data; @@ -179,6 +231,8 @@ static void group_on_timeout(struct spa_source *source) group, group->id, spa_strerror(res)); return; } + if (!exp) + return; spa_list_for_each(stream, &group->streams, link) { if (!stream->sink) { @@ -200,12 +254,16 @@ static void group_on_timeout(struct spa_source *source) group->started = true; } + if (group_latency_check(group)) { + spa_list_for_each(stream, &group->streams, link) + spa_bt_latency_reset(&stream->tx_latency); + goto done; + } + /* Produce output */ spa_list_for_each(stream, &group->streams, link) { int res = 0; uint64_t now; - int32_t min_latency = INT32_MAX, max_latency = INT32_MIN; - struct stream *other; if (!stream->sink) continue; @@ -223,44 +281,21 @@ static void group_on_timeout(struct spa_source *source) } } - spa_list_for_each(other, &group->streams, link) { - if (!other->sink || stream == other || !other->tx_latency.valid) - continue; - min_latency = SPA_MIN(min_latency, other->tx_latency.ptp.min); - max_latency = SPA_MAX(max_latency, other->tx_latency.ptp.max); - } - - if (stream->tx_latency.valid && min_latency <= max_latency && - stream->tx_latency.ptp.min > min_latency + (int64_t)group->duration/2 && - stream->tx_latency.ptp.max > max_latency + (int64_t)group->duration/2) { - spa_log_debug(group->log, "%p: ISO group:%u latency skip align fd:%d", group, group->id, stream->fd); - spa_bt_latency_reset(&stream->tx_latency); - goto stream_done; - } - - /* TODO: this should use rate match */ - if (stream->tx_latency.valid && - stream->tx_latency.ptp.min > MAX_PACKET_QUEUE * (int64_t)group->duration) { - spa_log_debug(group->log, "%p: ISO group:%u latency skip fd:%d", group, group->id, stream->fd); - spa_bt_latency_reset(&stream->tx_latency); - goto stream_done; - } - now = get_time_ns(group->data_system, CLOCK_REALTIME); - res = send(stream->fd, stream->this.buf, stream->this.size, MSG_DONTWAIT | MSG_NOSIGNAL); + res = spa_bt_send(stream->fd, stream->this.buf, stream->this.size, + &stream->tx_latency, now); if (res < 0) { res = -errno; fail = true; - } else { - spa_bt_latency_sent(&stream->tx_latency, now); + group->flush = true; } - stream_done: - spa_log_trace(group->log, "%p: ISO group:%u sent fd:%d size:%u ts:%u idle:%d res:%d latency:%d..%d us", + spa_log_trace(group->log, "%p: ISO group:%u sent fd:%d size:%u ts:%u idle:%d res:%d latency:%d..%d%sus queue:%u", group, group->id, stream->fd, (unsigned)stream->this.size, (unsigned)stream->this.timestamp, stream->idle, res, - stream->tx_latency.valid ? stream->tx_latency.ptp.min/1000 : -1, - stream->tx_latency.valid ? stream->tx_latency.ptp.max/1000 : -1); + stream->tx_latency.ptp.min/1000, stream->tx_latency.ptp.max/1000, + stream->tx_latency.valid ? " " : "* ", + stream->tx_latency.queue); stream->this.size = 0; } @@ -268,6 +303,7 @@ static void group_on_timeout(struct spa_source *source) if (fail) spa_log_debug(group->log, "%p: ISO group:%d send failure", group, group->id); +done: /* Pull data for the next interval */ group->next += exp * group->duration;