diff --git a/src/modules/module-rtp/audio.c b/src/modules/module-rtp/audio.c index 99be8ee5e..18dd9d0ee 100644 --- a/src/modules/module-rtp/audio.c +++ b/src/modules/module-rtp/audio.c @@ -385,19 +385,31 @@ static void rtp_audio_flush_packets(struct impl *impl, uint32_t num_packets, uin uint32_t stride, timestamp; struct iovec iov[3]; struct rtp_header header; + bool insufficient_data; avail = spa_ringbuffer_get_read_index(&impl->ring, ×tamp); tosend = impl->psamples; - if (avail < tosend) - if (impl->started) + insufficient_data = (avail < tosend); + if (insufficient_data) { + /* There is insufficient data for even a single full packet. + * Handle this depending on the current state. */ + + if (get_internal_stream_state(impl) == RTP_STREAM_INTERNAL_STATE_STARTED) { + /* If the stream is started, just try again later, + * when more data comes in. Enough data for covering + * the psamples amount might be available by then. */ goto done; - else { - /* send last packet before emitting state_changed */ + } else { + /* There is not enough data for a full packet, but the + * stream is no longer in the started state, so the + * remaining data needs to be flushed out now. */ tosend = avail; num_packets = 1; } - else + } else { + /* There is sufficient data for one or more full packets. */ num_packets = SPA_MIN(num_packets, (uint32_t)(avail / tosend)); + } stride = impl->stride; @@ -434,18 +446,30 @@ static void rtp_audio_flush_packets(struct impl *impl, uint32_t num_packets, uin num_packets--; } spa_ringbuffer_read_update(&impl->ring, timestamp); + done: if (impl->timer_running) { - if (impl->started) { - if (avail < tosend) { + if (get_internal_stream_state(impl) != RTP_STREAM_INTERNAL_STATE_STOPPING) { + /* If the stream isn't being stopped, and instead is running, + * keep the timer running if there was sufficient data to + * produce at least one packet. That's because by the time + * the next timer expiration happens, there might be enough + * data available for even more packets. However, if there + * wasn't sufficient data for even one packet, stop the + * timer, since it is likely then that input has ceased + * (at least for now). */ + if (insufficient_data) { set_timer(impl, 0, 0); } } else if (avail <= 0) { - bool started = false; - - /* the stream has been stopped and all packets have been sent */ + /* All packets were sent, and the stream is in the stopping + * state. This means that stream_stop() was called while this + * timer was still sending out remaining packets, and thus, + * stream_stop() could not immediately change the stream to the + * stopping state. Now that all packets have gone out, finish + * the stopping state change. */ set_timer(impl, 0, 0); - pw_loop_invoke(impl->main_loop, do_emit_state_changed, SPA_ID_INVALID, &started, sizeof started, false, impl); + pw_loop_invoke(impl->main_loop, do_finish_stopping_state, SPA_ID_INVALID, NULL, 0, false, impl); } } } diff --git a/src/modules/module-rtp/stream.c b/src/modules/module-rtp/stream.c index 96c06e2ee..f83019a3e 100644 --- a/src/modules/module-rtp/stream.c +++ b/src/modules/module-rtp/stream.c @@ -7,6 +7,7 @@ #include #include +#include #include #include #include @@ -49,6 +50,30 @@ PW_LOG_TOPIC_EXTERN(mod_topic); #define rtp_stream_emit_send_packet(s,i,l) rtp_stream_emit(s, send_packet,0,i,l) #define rtp_stream_emit_send_feedback(s,seq) rtp_stream_emit(s, send_feedback,0,seq) +enum rtp_stream_internal_state { + /* The state when the stream is idle / stopped. The background + * timer that may be used for sending out buffered data + * must not be running in this state. If the separate PTP sender + * is being used, then that one must be inactive in this state. + * Set at the end of stream_stop() and when the stream is created. */ + RTP_STREAM_INTERNAL_STATE_STOPPED, + /* Temporary state that is set at the beginning of stream_stop(). + * If a full stop is possible, stream_stop() immediately moves on + * to the STOPPED state. However, if the timer is running (because it + * is still sending out buffered data), the state remains set to + * STOPPING until the timer has sent out all data, at which point + * the timer finishes the change to the STOPPED state. */ + RTP_STREAM_INTERNAL_STATE_STOPPING, + /* Temporary state that is set at the beginning of stream_start(). + * It is mainly used for preventing do_finish_stopping_state() + * from setting a stopped state. See do_finish_stopping_state() + * for details. */ + RTP_STREAM_INTERNAL_STATE_STARTING, + /* The state when the stream has been started. It is set at the + * end of stream_start(). */ + RTP_STREAM_INTERNAL_STATE_STARTED +}; + struct impl { struct spa_audio_info info; struct spa_audio_info stream_info; @@ -100,11 +125,19 @@ struct impl { unsigned direct_timestamp:1; unsigned always_process:1; - unsigned started:1; unsigned have_sync:1; unsigned receiving:1; unsigned first:1; + /* IMPORTANT: Do NOT access this value directly. Use the atomic + * set_internal_stream_state() / get_internal_stream_state() accessors, + * since the state is accessed by both the dataloop and mainloop. To + * prevent memory visibility issues, atomic accessors need to be used. + * + * Also, its type here is uint32_t. See the explanation about atomic + * access below for the reason why. */ + uint32_t internal_state; + struct pw_loop *main_loop; struct pw_loop *data_loop; struct spa_source *timer; @@ -118,6 +151,9 @@ struct impl { * requires filling the ring buffer with something other than nullbytes * (this can happen with DSD for example). */ void (*reset_ringbuffer)(struct impl *impl); + /* Called by stream_start() to stop any running timer before continuing to + * start the stream. This is necessary, because by that point, any remaining + * buffered data is stale, and the timer would keep sending it out. */ void (*stop_timer)(struct impl *impl); void (*flush_timeout)(struct impl *impl, uint64_t expirations); void (*deinit)(struct impl *impl, enum spa_direction direction); @@ -144,17 +180,83 @@ struct impl { uint32_t rtp_last_ts; }; -static int do_emit_state_changed(struct spa_loop *loop, bool async, uint32_t seq, const void *data, size_t size, void *user_data) -{ - struct impl *impl = user_data; - bool started = *((bool *)data); +/* Atomic internal_state accessors. + * + * These are necessary because internal_state may be accessed by both + * the dataloop (in the flush_timeout and do_finish_stopping_state()) + * and the mainloop (in stream_start() and stream_stop()). Even though + * stream_start() and stream_stop() may not necessarily run at the + * same time when the dataloop is active, there is still a potential + * memory visibility issue if the state is set in one loop but that + * change is not yet propagated to other CPU cores, causing the other + * loop (which runs in a separate thread) to still see the old state. + * + * Also, since GCC __atomic built-ins (which the SPA macros use) are + * designed to work with integral scalar or pointer type that is 1, + * 2, 4, or 8 bytes in length, impl->internal_state is of type uint33_t. + * This guarantee a correct size for the built-ins. The accessors take + * care of casting from/to rtp_stream_internal_state . The relevant + * GCC manual page for this is: + * https://gcc.gnu.org/onlinedocs/gcc/_005f_005fatomic-Builtins.html + */ - if (started) { - rtp_stream_emit_open_connection(impl, NULL); - } else { - rtp_stream_emit_close_connection(impl, NULL); +static inline enum rtp_stream_internal_state get_internal_stream_state(struct impl *impl) { + return (enum rtp_stream_internal_state)SPA_ATOMIC_LOAD(impl->internal_state); +} + +static inline void set_internal_stream_state(struct impl *impl, enum rtp_stream_internal_state state) { + SPA_ATOMIC_STORE(impl->internal_state, (uint32_t)state); +} + +static int do_finish_stopping_state(struct spa_loop *loop, bool async, uint32_t seq, const void *data, size_t size, void *user_data) +{ + int res = 0; + struct impl *impl = user_data; + enum rtp_stream_internal_state cur_state = get_internal_stream_state(impl); + + /* The checks here cover a corner case that can happen when the + * following conditions are met (in order): + * + * 1. Stream is stopped via stream_stop(), but the timer is still + * running, meaning that internal_state stays at STOPPING. + * 2. The timer manages to invoke do_finish_stopping_state() + * asynchronously, meaning that the invocation is queued. + * 3. Immediately afterwards, the state is started again via + * stream_start(). That call stops the timer, but does not + * undo the do_finish_stopping_state() invocation. + * The internal_state is set to STARTED. + * 4. The queued do_finish_stopping_state() invocation takes + * place, and it tries to set the internal_state to STOPPED. + * + * In such a case, the STARTED state would be set again to STOPPED, + * even though the stream has been started and is running. + * + * To fix this, check if the current internal state is STOPPING. + * This is the only case where setting the state to STOPPED makes + * sense, since that is why this do_finish_stopping_state() exists - + * to finish a stopping procedure that could not be finished in + * stream_stop() immediately. If the stream is restarted, then this + * delayed stop is no longer needed. Canceling the queued invocation + * is not possible (PipeWire has no cancellation API for this), + * so this approach needs to be used instead. */ + + switch (cur_state) { + case RTP_STREAM_INTERNAL_STATE_STOPPING: + pw_log_debug("setting \"stopped\" state after timer expired"); + break; + default: + pw_log_debug("\"stopped\" state change event emission was scheduled, " + "but the current state is not \"stopping\"; ignoring " + "scheduled request"); + return 0; } + rtp_stream_emit_close_connection(impl, &res); + if (res > 0) + pw_log_debug("closed connection"); + else if (res < 0) + pw_log_error("error while closing connection: %s", spa_strerror(res)); + return 0; } @@ -204,12 +306,17 @@ static void stream_destroy(void *d) static int stream_start(struct impl *impl) { int res; + enum rtp_stream_internal_state cur_state; - if (impl->started) + cur_state = get_internal_stream_state(impl); + + if (cur_state == RTP_STREAM_INTERNAL_STATE_STARTED) return 0; impl->first = true; + set_internal_stream_state(impl, RTP_STREAM_INTERNAL_STATE_STARTING); + /* Stop the timer now (if the timer is used). Any lingering timer * will try to send data that is stale at this point, especially * after the ring buffer contents get reset. Worse, the timer might @@ -218,6 +325,33 @@ static int stream_start(struct impl *impl) if (impl->stop_timer) impl->stop_timer(impl); + res = 0; + rtp_stream_emit_close_connection(impl, &res); + + /* A leftover connection only makes sense if the stream was in the + * STOPPING state prior to this stream_start() call, because then, + * the previous stream_stop() call could not finish stopping the + * stream, and had to leave the connection open so the timer can + * finish sending out packets. If stream_start() was called before + * the timer finished, then the stream is still in the STOPPING + * state, was thus not properly stopped, and the connection is still + * there. This is not an error, but a consequence of restarting the + * stream early enough. + * If however the state prior to this stream_start() call was + * anything other than STOPPING, then something is wrong. */ + if (res > 0) { + if (cur_state != RTP_STREAM_INTERNAL_STATE_STOPPING) { + pw_log_warn("there was already an open connection, " + "even though none was expected"); + } else { + pw_log_debug("closed leftover connection since a scheduled " + "\"stopped\" state change was cancelled " + "and we are still in the \"stopping\" state"); + } + } else if (res < 0) { + pw_log_error("error while closing leftover connection: %s", spa_strerror(res)); + } + impl->reset_ringbuffer(impl); res = 0; @@ -239,20 +373,42 @@ static int stream_start(struct impl *impl) pw_log_info("activated pw_filter for separate sender"); } - impl->started = true; + set_internal_stream_state(impl, RTP_STREAM_INTERNAL_STATE_STARTED); + pw_log_info("stream started"); return 0; } static int stream_stop(struct impl *impl) { - if (!impl->started) - return 0; + switch (get_internal_stream_state(impl)) { + case RTP_STREAM_INTERNAL_STATE_STOPPING: + case RTP_STREAM_INTERNAL_STATE_STOPPED: + return 0; + default: + break; + } - /* if timer is running, the state changed event must be emitted by the timer after all packets have been sent */ - if (!impl->timer_running) - rtp_stream_emit_close_connection(impl, NULL); + set_internal_stream_state(impl, RTP_STREAM_INTERNAL_STATE_STOPPING); + /* Proper stop is only possible if the timer is currently not running, + * because a stop involves closing the connection. If the timer is still + * running, it needs an open connection for sending out remaining packets. */ + if (!impl->timer_running) { + int res; + pw_log_info("closing connection as part of stopping the stream"); + rtp_stream_emit_close_connection(impl, &res); + if (res > 0) { + pw_log_debug("closed connection"); + } else if (res < 0) { + pw_log_error("error while closing connection: %s", spa_strerror(res)); + } + } else { + pw_log_info("cannot close connection yet - timer is still running"); + } + + /* Stopping the separate sender can be done even if the timer is still + * running because it has no dependency on said timer. */ if (impl->separate_sender) { struct spa_dict_item items[1]; items[0] = SPA_DICT_ITEM_INIT(PW_KEY_NODE_ALWAYS_PROCESS, "false"); @@ -263,7 +419,14 @@ static int stream_stop(struct impl *impl) pw_filter_set_active(impl->ptp_sender, false); } - impl->started = false; + /* Only switch to STOPPED if the stream could _actually_ be stopped, + * meaning that the timer was no longer running, and the connection + * could be closed. */ + if (!impl->timer_running) { + set_internal_stream_state(impl, RTP_STREAM_INTERNAL_STATE_STOPPED); + pw_log_info("stream stopped"); + } + return 0; } @@ -372,6 +535,7 @@ struct rtp_stream *rtp_stream_new(struct pw_core *core, goto out; } impl->first = true; + set_internal_stream_state(impl, RTP_STREAM_INTERNAL_STATE_STOPPED); spa_hook_list_init(&impl->listener_list); impl->direction = direction; impl->stream_events = stream_events;