mirror of
https://gitlab.freedesktop.org/pipewire/pipewire.git
synced 2026-05-24 22:38:11 +02:00
raop: implement retransmission
Keep the last relation between the sequence number and the timestamp (ringbuffer position). When a retransmission is requested for a given sequence number use the relation to calculate the corresponding timestamp and retransmit the packet from the ringbuffer again. See #5276
This commit is contained in:
parent
a6fe6196d5
commit
d56d5fa87a
4 changed files with 96 additions and 43 deletions
|
|
@ -689,6 +689,7 @@ on_control_source_io(void *data, int fd, uint32_t mask)
|
|||
case 0xd5:
|
||||
pw_log_debug("retransmit request seq:%u num:%u", seq, num);
|
||||
/* retransmit request */
|
||||
rtp_stream_resend_packets(impl->stream, seq, num);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -501,12 +501,83 @@ static void set_timer(struct impl *impl, uint64_t time, uint64_t itime)
|
|||
set_timer_running(impl, time != 0 && itime != 0);
|
||||
}
|
||||
|
||||
static void rtp_audio_send_packets(struct impl *impl, uint32_t timestamp, uint32_t tosend,
|
||||
uint16_t seq, uint16_t num, uint64_t set_timestamp, bool first)
|
||||
{
|
||||
struct iovec iov[3];
|
||||
struct rtp_header header;
|
||||
uint32_t stride;
|
||||
|
||||
stride = impl->stride;
|
||||
|
||||
spa_zero(header);
|
||||
header.v = 2;
|
||||
header.pt = impl->payload;
|
||||
header.ssrc = htonl(impl->ssrc);
|
||||
|
||||
iov[0].iov_base = &header;
|
||||
iov[0].iov_len = sizeof(header);
|
||||
|
||||
while (num > 0) {
|
||||
uint32_t rtp_timestamp;
|
||||
|
||||
if (impl->marker_on_first && first)
|
||||
header.m = 1;
|
||||
else
|
||||
header.m = 0;
|
||||
|
||||
rtp_timestamp = impl->ts_offset + impl->ts_align + (set_timestamp ? set_timestamp : timestamp);
|
||||
|
||||
header.sequence_number = htons(seq);
|
||||
header.timestamp = htonl(rtp_timestamp);
|
||||
|
||||
set_iovec(&impl->ring,
|
||||
impl->buffer, impl->actual_max_buffer_size,
|
||||
((uint64_t)timestamp * stride) % impl->actual_max_buffer_size,
|
||||
&iov[1], tosend * stride);
|
||||
|
||||
pw_log_trace_fp("sending %d packet:%d ts_offset:%d timestamp:%u (%f s)",
|
||||
tosend, num_packets, impl->ts_offset, timestamp,
|
||||
(double)timestamp * impl->io_position->clock.rate.num /
|
||||
impl->io_position->clock.rate.denom);
|
||||
|
||||
rtp_stream_call_send_packet(impl, iov, 3);
|
||||
|
||||
seq++;
|
||||
first = false;
|
||||
timestamp += tosend;
|
||||
num--;
|
||||
}
|
||||
}
|
||||
|
||||
static int rtp_audio_resend_packets(struct impl *impl, uint16_t seq, uint16_t num)
|
||||
{
|
||||
uint32_t tosend, last_seq, last_ts, timestamp;
|
||||
int32_t diff;
|
||||
uint64_t ts_seq;
|
||||
|
||||
tosend = impl->psamples;
|
||||
|
||||
ts_seq = SPA_ATOMIC_LOAD(impl->last_ts_seq);
|
||||
last_seq = ts_seq >> 32;
|
||||
last_ts = ts_seq & 0xffffffff;
|
||||
|
||||
diff = (int32_t)(last_seq - seq);
|
||||
if (diff < 0)
|
||||
return -EIO;
|
||||
|
||||
timestamp = last_ts - (diff * tosend);
|
||||
|
||||
pw_log_info("resend %d -> %d %d %d", seq, timestamp, last_ts - timestamp, impl->actual_max_buffer_size);
|
||||
|
||||
rtp_audio_send_packets(impl, timestamp, tosend, seq, num, 0, false);
|
||||
return 0;
|
||||
}
|
||||
|
||||
static void rtp_audio_flush_packets(struct impl *impl, uint32_t num_packets, uint64_t set_timestamp)
|
||||
{
|
||||
int32_t avail, tosend;
|
||||
uint32_t stride, timestamp;
|
||||
struct iovec iov[3];
|
||||
struct rtp_header header;
|
||||
uint32_t timestamp;
|
||||
bool insufficient_data;
|
||||
|
||||
avail = spa_ringbuffer_get_read_index(&impl->ring, ×tamp);
|
||||
|
|
@ -533,48 +604,15 @@ static void rtp_audio_flush_packets(struct impl *impl, uint32_t num_packets, uin
|
|||
num_packets = SPA_MIN(num_packets, (uint32_t)(avail / tosend));
|
||||
}
|
||||
|
||||
stride = impl->stride;
|
||||
rtp_audio_send_packets(impl, timestamp, tosend, impl->seq,
|
||||
num_packets, set_timestamp, impl->first);
|
||||
|
||||
spa_zero(header);
|
||||
header.v = 2;
|
||||
header.pt = impl->payload;
|
||||
header.ssrc = htonl(impl->ssrc);
|
||||
|
||||
iov[0].iov_base = &header;
|
||||
iov[0].iov_len = sizeof(header);
|
||||
|
||||
while (num_packets > 0) {
|
||||
uint32_t rtp_timestamp;
|
||||
|
||||
if (impl->marker_on_first && impl->first)
|
||||
header.m = 1;
|
||||
else
|
||||
header.m = 0;
|
||||
|
||||
rtp_timestamp = impl->ts_offset + impl->ts_align + (set_timestamp ? set_timestamp : timestamp);
|
||||
|
||||
header.sequence_number = htons(impl->seq);
|
||||
header.timestamp = htonl(rtp_timestamp);
|
||||
|
||||
set_iovec(&impl->ring,
|
||||
impl->buffer, impl->actual_max_buffer_size,
|
||||
((uint64_t)timestamp * stride) % impl->actual_max_buffer_size,
|
||||
&iov[1], tosend * stride);
|
||||
|
||||
pw_log_trace_fp("sending %d packet:%d ts_offset:%d timestamp:%u (%f s)",
|
||||
tosend, num_packets, impl->ts_offset, timestamp,
|
||||
(double)timestamp * impl->io_position->clock.rate.num /
|
||||
impl->io_position->clock.rate.denom);
|
||||
|
||||
rtp_stream_call_send_packet(impl, iov, 3);
|
||||
|
||||
impl->seq++;
|
||||
impl->first = false;
|
||||
timestamp += tosend;
|
||||
avail -= tosend;
|
||||
num_packets--;
|
||||
}
|
||||
impl->first = false;
|
||||
impl->seq += num_packets;
|
||||
avail -= tosend * num_packets;
|
||||
timestamp += tosend * num_packets;
|
||||
spa_ringbuffer_read_update(&impl->ring, timestamp);
|
||||
SPA_ATOMIC_STORE(impl->last_ts_seq, ((uint64_t)impl->seq << 32) | timestamp);
|
||||
|
||||
done:
|
||||
if (is_timer_running(impl)) {
|
||||
|
|
@ -966,6 +1004,7 @@ static int rtp_audio_init(struct impl *impl, struct pw_core *core, enum spa_dire
|
|||
impl->receive_rtp = rtp_audio_receive;
|
||||
impl->stop_timer = rtp_audio_stop_timer;
|
||||
impl->flush_timeout = rtp_audio_flush_timeout;
|
||||
impl->resend_packets = rtp_audio_resend_packets;
|
||||
|
||||
setup_ptp_sender(impl, core, direction, ptp_driver);
|
||||
|
||||
|
|
|
|||
|
|
@ -172,6 +172,7 @@ struct impl {
|
|||
void (*stop_timer)(struct impl *impl);
|
||||
void (*flush_timeout)(struct impl *impl, uint64_t expirations);
|
||||
void (*deinit)(struct impl *impl, enum spa_direction direction);
|
||||
int (*resend_packets)(struct impl *impl, uint16_t seq, uint16_t num);
|
||||
|
||||
/*
|
||||
* pw_filter where the filter would be driven at the PTP clock
|
||||
|
|
@ -194,6 +195,8 @@ struct impl {
|
|||
uint64_t rtp_base_ts;
|
||||
uint32_t rtp_last_ts;
|
||||
|
||||
uint64_t last_ts_seq;
|
||||
|
||||
/* The process latency, set by on_stream_param_changed(). */
|
||||
struct spa_process_latency_info process_latency;
|
||||
};
|
||||
|
|
@ -1059,6 +1062,14 @@ int rtp_stream_receive_packet(struct rtp_stream *s, uint8_t *buffer, size_t len,
|
|||
struct impl *impl = (struct impl*)s;
|
||||
return impl->receive_rtp(impl, buffer, len, current_time);
|
||||
}
|
||||
int rtp_stream_resend_packets(struct rtp_stream *s, uint16_t seq, uint16_t num)
|
||||
{
|
||||
struct impl *impl = (struct impl*)s;
|
||||
if (impl->resend_packets)
|
||||
return impl->resend_packets(impl, seq, num);
|
||||
else
|
||||
return -ENOTSUP;
|
||||
}
|
||||
|
||||
uint64_t rtp_stream_get_nsec(struct rtp_stream *s)
|
||||
{
|
||||
|
|
|
|||
|
|
@ -66,6 +66,8 @@ int rtp_stream_update_properties(struct rtp_stream *s, const struct spa_dict *di
|
|||
int rtp_stream_receive_packet(struct rtp_stream *s, uint8_t *buffer, size_t len,
|
||||
uint64_t current_time);
|
||||
|
||||
int rtp_stream_resend_packets(struct rtp_stream *s, uint16_t seq, uint16_t num);
|
||||
|
||||
uint64_t rtp_stream_get_nsec(struct rtp_stream *s);
|
||||
|
||||
uint64_t rtp_stream_get_time(struct rtp_stream *s, uint32_t *rate);
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue