diff --git a/src/modules/module-avb/stream.c b/src/modules/module-avb/stream.c index 58d4d496f..2687ef1b6 100644 --- a/src/modules/module-avb/stream.c +++ b/src/modules/module-avb/stream.c @@ -94,9 +94,10 @@ * TIMESTAMP_UNCERTAIN_IN TODO: tick when AVTPDU tu bit is set in the header * UNSUPPORTED_FORMAT live: handle_aaf_packet drops + ticks per PDU any AAF PDU * whose format != the Stream Input current format - * LATE_TIMESTAMP TODO: tick when p->timestamp < CLOCK_TAI now + * LATE_TIMESTAMP TODO: tick when p->timestamp < stream_gptp_now() (the PHC, + * NOT CLOCK_TAI — no phc2sys, so the system clock is not gPTP) * (frame missed its presentation deadline) - * EARLY_TIMESTAMP TODO: tick when p->timestamp > now + max_transit_time + * EARLY_TIMESTAMP TODO: tick when p->timestamp > stream_gptp_now() (PHC) + max_transit_time * (frame arrived too far ahead of its deadline) * Table 5.17 Stream Output: * FRAMES_TX live: per send in flush_write_milan_v12 / _legacy @@ -235,10 +236,7 @@ static void on_flush_tick(void *data, uint64_t expirations) (void)expirations; - /* Pace the send rate off CLOCK_MONOTONIC (a stable local 1x clock); use the gPTP - * clock only for the AVTP presentation timestamp. Pacing must not ride the absolute - * PHC interpolation, whose steps during gPTP re-convergence burst the talker past - * its SRP reservation and get the stream policed away by the bridge. */ + /* Pace the flush drain off CLOCK_MONOTONIC, the SAME clock as the graph-fill driver (the drive_timer runs on a CLOCK_MONOTONIC timerfd, which cannot be _RAW) so producer and consumer of the ring stay rate-matched; gPTP is used only for the AVTP timestamp. Pacing on _RAW here decouples drain from fill -> ring drift -> glitch noise (measured -55dB vs -99dB THD+N). The independent gPTP/PHC reference still uses CLOCK_MONOTONIC_RAW (gptp-clock.h). */ clock_gettime(CLOCK_MONOTONIC, &ts); now_mono = SPA_TIMESPEC_TO_NSEC(&ts); now_gptp = stream_gptp_now(server); @@ -267,9 +265,7 @@ static void on_flush_tick(void *data, uint64_t expirations) } } -/* Talker egress pacing runs on the RT data loop (impl->data_loop). A source - * cannot be added to or removed from a running loop off-thread, so the flush - * timer is created and destroyed ON the RT thread via pw_loop_invoke. */ +/* Talker egress pacing runs on the RT data loop (impl->data_loop); a source cannot be added/removed off-thread, so the flush timer is created and destroyed ON the RT thread via pw_loop_invoke. */ static int do_add_flush_timer(struct spa_loop *loop, bool async, uint32_t seq, const void *data, size_t size, void *user_data) { @@ -343,30 +339,28 @@ static void on_source_stream_process(void *data) } } - /* milan-avb: consume-side actuator. While AAF is the clock source and the - * adapter gave us a resampler, trim its ratio to hold the ring fill and feed - * the recovered media rate forward (play-loop.h). Off otherwise, so the - * default rate=1.0 path is untouched. */ - if (stream->mc_aaf_active && stream->io_rate_match != NULL) { + /* milan-avb: consume-side actuator, FOLLOWER path only; when avb.source DRIVES the graph at the recovered mc.rate there is no resampler on its output, so we deliver the ring samples 1:1 (bit-perfect) and must NOT trim a ratio. */ + if (!stream->driving && stream->mc_aaf_active && stream->io_rate_match != NULL) { uint32_t rate = stream->info.info.raw.rate; int32_t avail_samples = avail / (int32_t)stream->stride; uint32_t quantum = buf->requested ? (uint32_t)buf->requested : (stream->io_position ? stream->io_position->clock.duration : 1024); int32_t ring_samples = (int32_t)(stream->buffer_size / stream->stride); - /* Target ~½ quantum: that is where the ring sits on average, so it is - * reachable. A full quantum never is, so the error stays saturated and - * the DLL winds up (rate ramps without bound). */ + /* Target ~½ quantum: where the ring sits on average so it is reachable; a full quantum never is, so the error saturates and the DLL winds up. */ int32_t target = (int32_t)(quantum / 2); double max_error = 2.0 * rate / 1000.0; /* 2 ms, == module-rtp ERROR_MSEC */ double ff, error, r; const char *env_target = getenv("MILAN_AVB_PLAY_TARGET"); - if (env_target) + if (env_target) { target = atoi(env_target); - if (target < (int32_t)(rate / 1000)) /* >= ~1 ms underrun margin */ + } + if (target < (int32_t)(rate / 1000)) { /* >= ~1 ms underrun margin */ target = (int32_t)(rate / 1000); - if (target > ring_samples / 2) /* keep well inside the ring */ + } + if (target > ring_samples / 2) { /* keep well inside the ring */ target = ring_samples / 2; + } stream->play_target = target; ff = stream->mc.rate > 1.0 ? (double)rate / stream->mc.rate : 1.0; @@ -374,17 +368,16 @@ static void on_source_stream_process(void *data) r = play_loop_update(&stream->play, error, max_error, ff, quantum, rate); pw_stream_set_rate(stream->stream, r); } else if (stream->play.init) { - /* clock source switched away from AAF: release the resampler so the - * graph free-runs at nominal again, and re-prime for next engage. */ + /* clock source switched away from AAF: release the resampler so the graph free-runs at nominal again, and re-prime for next engage. */ pw_stream_set_rate(stream->stream, 1.0); play_loop_reset(&stream->play); } - /* milan-avb: ~1 Hz log of the local consume rate (Δticks/Δtai, mapped to TAI - * via a monotonic/TAI offset) next to mc.rate and the actuator state. */ + /* milan-avb: ~1 Hz log of the local consume rate (Δticks/Δtai, mapped to TAI) next to mc.rate and the actuator state. */ if (stream->mc_aaf_active || getenv("MILAN_AVB_PLAY_LOG")) { struct timespec ts_mono; uint64_t mono_ns; + /* CLOCK_MONOTONIC (NOT _RAW): mono_ns is offset against pwt.now below, which PipeWire reports in the CLOCK_MONOTONIC domain — they must match. */ clock_gettime(CLOCK_MONOTONIC, &ts_mono); mono_ns = SPA_TIMESPEC_TO_NSEC(&ts_mono); if (!stream->play_primed || @@ -392,8 +385,7 @@ static void on_source_stream_process(void *data) struct pw_time pwt; if (pw_stream_get_time_n(stream->stream, &pwt, sizeof(pwt)) == 0) { uint64_t tai_ns, consume_tai; - /* milan-avb: gPTP time from the PHC so the consume clock - * stays in the gPTP domain even with NTP on the system clock. */ + /* milan-avb: gPTP time from the PHC so the consume clock stays in the gPTP domain even with NTP on the system clock. */ tai_ns = stream_gptp_now(stream->server); consume_tai = (uint64_t)pwt.now + (tai_ns - mono_ns); if (stream->play_primed) { @@ -467,15 +459,19 @@ static void on_source_stream_io_changed(void *data, uint32_t id, case SPA_IO_Buffers: name = "Buffers"; break; default: name = "?"; break; } - /* milan-avb: logs whether the adapter gave us SPA_IO_RateMatch (the actuator - * knob) on this source. */ + /* milan-avb: logs whether the adapter gave us SPA_IO_RateMatch (the actuator knob) on this source. */ pw_log_info("milan-avb: io_changed id=%u (%s) area=%p size=%u", id, name, area, (unsigned)size); } +/* generic: arms the self-driving timer on STREAMING (defined below, used by both source and sink stream-event tables). */ +static void on_sink_stream_state_changed(void *data, enum pw_stream_state old, + enum pw_stream_state state, const char *error); + static const struct pw_stream_events source_stream_events = { PW_VERSION_STREAM_EVENTS, .destroy = on_stream_destroy, + .state_changed = on_sink_stream_state_changed, .io_changed = on_source_stream_io_changed, .process = on_source_stream_process }; @@ -494,22 +490,35 @@ static int flush_write_milan_v12(struct stream *stream, uint64_t current_time, i { int32_t avail; uint32_t index; - uint64_t ptime, txtime; + uint64_t ptime; int pdu_count; ssize_t n; struct avb_frame_header *h = (void*)stream->pdu; struct avb_packet_aaf *p = SPA_PTROFF(h, sizeof(*h), void); + uint64_t base; + int64_t err; avail = spa_ringbuffer_get_read_index(&stream->ring, &index); pdu_count = (avail / stream->stride) / stream->frames_per_pdu; - /* Pace to real time: only drain what is due this tick, so the ETF - * launch schedule cannot run ahead and overflow the qdisc backlog. */ - if (pdu_count > max_pdus) + /* Pace to real time: drain only what is due this tick so the ETF launch schedule cannot run ahead and overflow the qdisc backlog. */ + if (pdu_count > max_pdus) { pdu_count = max_pdus; + } - txtime = current_time + stream->t_uncertainty; - ptime = txtime + stream->mtt; + /* M2: monotonic AVTP timestamps anchored to the PHC; advance by pdu_period per PDU and slow-leak (err/1024) toward the live PHC so the rate reflects the real gPTP media clock without per-tick interpolation jitter (audible FM at the listener); re-anchor hard on a >1s gap (gPTP re-converge). */ + base = current_time + stream->t_uncertainty + stream->mtt; + if (stream->tx_pts == 0) { + stream->tx_pts = base; + } else { + err = (int64_t)(base - stream->tx_pts); + if (err > (int64_t)SPA_NSEC_PER_SEC || err < -(int64_t)SPA_NSEC_PER_SEC) { + stream->tx_pts = base; + } else { + stream->tx_pts += err / 1024; + } + } + ptime = stream->tx_pts; while (pdu_count--) { /* CBS-exclusive: no SCM_TXTIME; txtime feeds ptime only */ @@ -526,15 +535,17 @@ static int flush_write_milan_v12(struct stream *stream, uint64_t current_time, i n = avb_server_stream_send(stream->server, stream, &stream->msg, MSG_NOSIGNAL); - if (n < 0 || n != (ssize_t)stream->pdu_size) + if (n < 0 || n != (ssize_t)stream->pdu_size) { pw_log_error("stream send failed %zd != %zd: %m", n, stream->pdu_size); - else + } else { stream_out_counters(stream)->frame_tx++; - txtime += stream->pdu_period; + } ptime += stream->pdu_period; index += stream->payload_size; } + /* M2: keep the accumulator monotonic across ticks (advance by emitted PDUs). */ + stream->tx_pts = ptime; stream_out_mark_counters_dirty(stream); spa_ringbuffer_read_update(&stream->ring, index); @@ -716,17 +727,144 @@ static int setup_msg(struct stream *stream) stream->msg.msg_namelen = sizeof(stream->sock_addr); stream->msg.msg_iov = stream->iov; stream->msg.msg_iovlen = 3; - /* CBS/Qav-exclusive: no SCM_TXTIME control message -- CBS and SO_TXTIME - * cannot coexist; the egress CBS qdisc paces the stream. */ + /* CBS/Qav-exclusive: no SCM_TXTIME control message -- CBS and SO_TXTIME cannot coexist; the egress CBS qdisc paces the stream. */ stream->msg.msg_control = NULL; stream->msg.msg_controllen = 0; stream->cmsg = NULL; return 0; } +/* milan-avb: arm the self-driving one-shot timer at absolute time `when` (ns on CLOCK_MONOTONIC); when==0 disarms; runs on the RT data loop. */ +static void set_drive_timeout(struct stream *stream, uint64_t when) +{ + struct timespec ts; + struct timespec interval = { 0, 0 }; + + if (stream->drive_timer == NULL) { + return; + } + ts.tv_sec = (time_t)(when / SPA_NSEC_PER_SEC); + ts.tv_nsec = (long)(when % SPA_NSEC_PER_SEC); + pw_loop_update_timer(stream->server->impl->data_loop, + stream->drive_timer, &ts, &interval, true); +} + +/* milan-avb: graph driver tick (pipe-tunnel pattern); fires once per quantum, fills io_position->clock so the core schedules followers against our clock, re-arms the next tick, then triggers the cycle exactly once from the data loop (never re-entrantly from process()). */ +static void on_drive_timeout(void *data, uint64_t expirations) +{ + struct stream *stream = data; + struct spa_io_position *pos = stream->io_position; + struct timespec ts; + uint64_t duration = 1024, mono_now, nominal_ns; + uint32_t rate = 48000; + uint64_t phc_now; + uint64_t this_time; + double nom; + + (void)expirations; + if (!stream->driving) { + return; + } + + if (pos != NULL) { + if (pos->clock.target_duration != 0) { + duration = pos->clock.target_duration; + } + if (pos->clock.target_rate.denom != 0) { + rate = pos->clock.target_rate.denom; + } + } + + clock_gettime(CLOCK_MONOTONIC, &ts); + mono_now = SPA_TIMESPEC_TO_NSEC(&ts); + nominal_ns = duration * SPA_NSEC_PER_SEC / rate; + + /* LISTENER (avb.source): pace at the RECOVERED talker rate (mc.rate from mc_recover) so the ring drain rate == the AAF arrival rate and process() delivers samples 1:1 with no resampling (bit-perfect, sample-locked). */ + if (stream->direction == SPA_DIRECTION_INPUT && + stream->mc_aaf_active && stream->mc.rate > 1.0) { + nominal_ns = (uint64_t)((double)duration * (double)SPA_NSEC_PER_SEC + / stream->mc.rate); + } + + /* TALKER (sink): pace at the EXACT nominal rate so the exported clock has rate_diff==1.0 CONSTANT; a varying rate_diff makes pw-cat's adapter resample (FM baked into the wire), 1.0 gives adapter passthrough = bit-perfect, and the listener recovers the rate from timestamp arrival. */ + phc_now = stream_gptp_now(stream->server); + (void)phc_now; + stream->drive_phc_last = phc_now; + stream->drive_mono_last = mono_now; + + /* Export the SMOOTH scheduled time (not the jittery wake-up mono_now) so the follower resampler sees an evenly-paced clock; rate_diff=nom/nominal keeps nsec/next_nsec/duration/rate_diff self-consistent (pipe-tunnel sets corr, not 1.0). */ + this_time = stream->drive_next_time; + nom = (double)duration * (double)SPA_NSEC_PER_SEC / (double)rate; + stream->drive_next_time += nominal_ns; + if (pos != NULL) { + pos->clock.nsec = this_time; + pos->clock.rate = pos->clock.target_rate; + pos->clock.position += pos->clock.duration; + pos->clock.duration = pos->clock.target_duration; + pos->clock.delay = 0; + pos->clock.rate_diff = nominal_ns > 0 ? nom / (double)nominal_ns : 1.0; + pos->clock.next_nsec = stream->drive_next_time; + } + + set_drive_timeout(stream, stream->drive_next_time); + pw_stream_trigger_process(stream->stream); +} + +/* milan-avb: avb.sink/avb.source is created as a DRIVER; when it reaches STREAMING and the core elected it (pw_stream_is_driving), start the self-driving timer. */ +static void on_sink_stream_state_changed(void *data, enum pw_stream_state old, + enum pw_stream_state state, const char *error) +{ + struct stream *stream = data; + struct timespec ts; + + (void)old; (void)error; + switch (state) { + case PW_STREAM_STATE_STREAMING: + stream->driving = pw_stream_is_driving(stream->stream); + pw_log_info("milan-avb: avb.sink STREAMING driving=%d", stream->driving); + if (stream->driving) { + clock_gettime(CLOCK_MONOTONIC, &ts); + stream->drive_next_time = SPA_TIMESPEC_TO_NSEC(&ts); + stream->drive_phc_last = 0; + stream->drive_mono_last = 0; + stream->drive_ratio_ema = 0.0; + set_drive_timeout(stream, stream->drive_next_time); + } + break; + case PW_STREAM_STATE_PAUSED: + case PW_STREAM_STATE_ERROR: + case PW_STREAM_STATE_UNCONNECTED: + stream->driving = false; + set_drive_timeout(stream, 0); + break; + default: + break; + } +} + +/* milan-avb: capture the driver clock/position areas the core hands the driver node. */ +static void on_sink_stream_io_changed(void *data, uint32_t id, void *area, uint32_t size) +{ + struct stream *stream = data; + + (void)size; + switch (id) { + case SPA_IO_Position: + stream->io_position = area; + break; + case SPA_IO_RateMatch: + stream->io_rate_match = area; + break; + default: + break; + } +} + static const struct pw_stream_events sink_stream_events = { PW_VERSION_STREAM_EVENTS, .destroy = on_stream_destroy, + .state_changed = on_sink_stream_state_changed, + .io_changed = on_sink_stream_io_changed, .process = on_sink_stream_process }; @@ -746,9 +884,7 @@ struct stream *server_create_stream(struct server *server, struct stream *stream stream->prio = AVB_MSRP_PRIORITY_DEFAULT; stream->vlan_id = AVB_DEFAULT_VLAN; stream->mtt = 2000000; - /* TX timestamp jitter budget added on top of CLOCK_TAI now. 125 µs is - * the upper bound at 1 GbE class-A traffic per IEEE 802.1Qav; safe - * default until we have a way to measure it from gPTP. */ + /* TX timestamp jitter budget added on top of the gPTP (PHC) time; 125 µs is the upper bound at 1 GbE class-A per IEEE 802.1Qav, safe default until we measure it from gPTP. */ stream->t_uncertainty = 0; stream->id = (uint64_t)server->mac_addr[0] << 56 | @@ -771,7 +907,9 @@ struct stream *server_create_stream(struct server *server, struct stream *stream PW_KEY_MEDIA_CLASS, "Audio/Source", PW_KEY_NODE_NAME, "avb.source", PW_KEY_NODE_DESCRIPTION, "AVB Source", - PW_KEY_NODE_WANT_DRIVER, "true", + /* milan-avb: avb.source IS the listener's media clock; it drives the graph at the recovered talker rate (mc.rate) so consumers run sample-locked (no resampling, bit-perfect); NODE_DRIVER + high priority elects it over the fallback Dummy-Driver. */ + PW_KEY_NODE_DRIVER, "true", + PW_KEY_PRIORITY_DRIVER, "300000", NULL)); } else { stream->stream = pw_stream_new(server->impl->core, "sink", @@ -779,7 +917,9 @@ struct stream *server_create_stream(struct server *server, struct stream *stream PW_KEY_MEDIA_CLASS, "Audio/Sink", PW_KEY_NODE_NAME, "avb.sink", PW_KEY_NODE_DESCRIPTION, "AVB Sink", - PW_KEY_NODE_WANT_DRIVER, "true", + /* milan-avb: avb.sink IS the graph driver (self-clocked off the AVTP/PHC rate), not a follower; NODE_DRIVER + high PRIORITY_DRIVER elect it over the fallback Dummy-Driver (priority 200000) so pw-cat clocks to us. */ + PW_KEY_NODE_DRIVER, "true", + PW_KEY_PRIORITY_DRIVER, "300000", NULL)); } @@ -825,10 +965,21 @@ struct stream *server_create_stream(struct server *server, struct stream *stream PW_ID_ANY, PW_STREAM_FLAG_MAP_BUFFERS | PW_STREAM_FLAG_INACTIVE | - PW_STREAM_FLAG_RT_PROCESS, + PW_STREAM_FLAG_RT_PROCESS | + /* milan-avb: both directions drive the graph themselves (talker off its media clock, listener off the recovered AAF clock), staying INACTIVE until a Milan ACMP/MSRP connection activates them. */ + PW_STREAM_FLAG_DRIVER, params, n_params)) < 0) goto error_free_stream; + /* milan-avb: the self-driving timer lives on the RT data loop and is armed once the stream reaches STREAMING (state_changed); both directions drive (talker off its media clock, listener off the recovered AAF clock). */ + if (!stream->is_crf) { + stream->drive_timer = pw_loop_add_timer(server->impl->data_loop, + on_drive_timeout, stream); + if (stream->drive_timer == NULL) { + pw_log_warn("avb stream: no drive_timer; core will pick a driver"); + } + } + stream->frames_per_pdu = 6; stream->pdu_period = SPA_NSEC_PER_SEC * stream->frames_per_pdu / stream->info.info.raw.rate; @@ -862,15 +1013,7 @@ struct stream *server_create_stream(struct server *server, struct stream *stream goto error_free; } - /* Milan Section 5.3.8.8 / Section 5.4.2.10.1.1: a Listener observes foreign - * Talker Advertise PDUs matching the bound talker's stream_id. - * Create the registrar attribute now (stream_id is set later at - * BIND_RX, cleared at UNBIND_RX) and start its FSM without a - * join — we are an observer, not a declarant. Once a matching TA - * arrives from the wire, msrp.c populates attr.talker - * (accumulated_latency, dest_addr, vlan_id) and moves the - * registrar to IN. The Listener side reads those fields to - * answer GET_STREAM_INFO with real msrp_accumulated_latency. */ + /* Milan Section 5.3.8.8 / 5.4.2.10.1.1: a Listener observes foreign Talker Advertise PDUs matching the bound talker's stream_id; create the registrar attribute now (stream_id set later at BIND_RX, cleared at UNBIND_RX) and start its FSM without a join (observer, not declarant); once a matching TA arrives msrp.c populates attr.talker (accumulated_latency, dest_addr, vlan_id), moves the registrar to IN, and the Listener answers GET_STREAM_INFO with the real msrp_accumulated_latency. */ res = avb_msrp_attribute_new(server->msrp, &common->tastream_attr, AVB_MSRP_ATTRIBUTE_TYPE_TALKER_ADVERTISE); if (res) { @@ -891,25 +1034,19 @@ struct stream *server_create_stream(struct server *server, struct stream *stream goto error_free; } - /* Milan v1.2 Section 4.3.3.1: pre-create lstream_attr with our talker - * stream_id so foreign Listener declarations from peers are - * delivered to it via process_listener and observed through - * notify_listener (sets listener_observed on stream_output_state). */ + /* Milan v1.2 Section 4.3.3.1: pre-create lstream_attr with our talker stream_id so foreign Listener declarations from peers reach it via process_listener and are observed through notify_listener (sets listener_observed on stream_output_state). */ common->lstream_attr.attr.listener.stream_id = htobe64(stream->id); common->tastream_attr.attr.talker.vlan_id = htons(stream->vlan_id); - if (server->avb_mode == AVB_MODE_MILAN_V12) - /* Milan v1.2 Section 4.3.3.2 Table 4.4: MaxFrameSize is the AVTPDU - * (header + payload) ONLY, plus 1 byte to account for the PAAD - * sampling clock possibly running slightly fast. The Ethernet header - * and FCS are added separately by the bandwidth rule (F = MaxFrameSize - * + 22), so exclude our avb_frame_header (the L2 header) from pdu_size. */ + /* Milan v1.2 Section 4.3.3.2 Table 4.4: MaxFrameSize is the AVTPDU (header + payload) ONLY plus 1 byte for PAAD sampling-clock drift; the Ethernet header and FCS are added by the bandwidth rule (F = MaxFrameSize + 22), so exclude our avb_frame_header (the L2 header) from pdu_size. */ + if (server->avb_mode == AVB_MODE_MILAN_V12) { common->tastream_attr.attr.talker.tspec_max_frame_size = htons((uint16_t)(stream->pdu_size - sizeof(struct avb_frame_header) + 1)); - else + } else { common->tastream_attr.attr.talker.tspec_max_frame_size = htons((uint16_t)(32 + stream->frames_per_pdu * stream->stride)); + } common->tastream_attr.attr.talker.tspec_max_interval_frames = htons(AVB_MSRP_TSPEC_MAX_INTERVAL_FRAMES_DEFAULT); common->tastream_attr.attr.talker.priority = stream->prio; @@ -933,8 +1070,7 @@ void stream_destroy(struct stream *stream) struct stream_common *common = SPA_CONTAINER_OF(stream, struct stream_common, stream); uint64_t now; - /* milan-avb: de-register (MRP Leave) before freeing the attributes so a stop/restart - * or replug doesn't strand a stale reservation on the bridge (socket still open here). */ + /* milan-avb: de-register (MRP Leave) before freeing the attributes so a stop/restart or replug doesn't strand a stale reservation on the bridge (socket still open here). */ now = stream_gptp_now(stream->server); stream_deactivate(stream, now); @@ -952,6 +1088,12 @@ void stream_destroy(struct stream *stream) avb_mrp_attribute_destroy(common->tfstream_attr.mrp); } + if (stream->drive_timer != NULL) { + set_drive_timeout(stream, 0); + pw_loop_destroy_source(stream->server->impl->data_loop, stream->drive_timer); + stream->drive_timer = NULL; + } + if (stream->raw_dump_fp) { fclose(stream->raw_dump_fp); stream->raw_dump_fp = NULL; @@ -963,11 +1105,7 @@ static int setup_socket(struct stream *stream) return avb_server_stream_setup_socket(stream->server, stream); } -/* milan-avb: media-clock recovery ------------------------------------------- - * - * Returns the CLOCK_SOURCE descriptor currently selected by CLOCK_DOMAIN 0, - * or NULL. The selection is clock_source_index, set at boot (Internal = 0) - * and updated on the wire by SET_CLOCK_SOURCE (IEEE 1722.1 Section 7.4.23). */ +/* milan-avb: media-clock recovery; returns the CLOCK_SOURCE descriptor selected by CLOCK_DOMAIN 0 (or NULL); selection is clock_source_index, set at boot (Internal=0) and updated on the wire by SET_CLOCK_SOURCE (IEEE 1722.1 Section 7.4.23). */ static struct avb_aem_desc_clock_source *selected_clock_source(struct server *server) { struct descriptor *dom; @@ -986,9 +1124,7 @@ static struct avb_aem_desc_clock_source *selected_clock_source(struct server *se return descriptor_body(src); } -/* True when the CLOCK_DOMAIN selects an AAF (INPUT_STREAM) clock source whose - * location points at this listener stream. CRF (MEDIA_CLOCK_STREAM) is out of - * scope and returns false. */ +/* True when the CLOCK_DOMAIN selects an AAF (INPUT_STREAM) clock source whose location points at this listener stream; CRF (MEDIA_CLOCK_STREAM) is out of scope and returns false. */ static bool stream_mc_aaf_selected(struct stream *stream) { struct avb_aem_desc_clock_source *cs; @@ -1029,43 +1165,36 @@ void avb_stream_update_clock_source(struct server *server) } } -/* Recover the talker media rate from a PDU's avtp_timestamp. The timestamps - * carry the talker media clock in gPTP time; their inter-PDU deltas give its - * rate. A second-order DLL (spa_dll) tracks phase+frequency. Observe-only: - * drives mc_rate; consumption retiming is the next step. */ +/* Recover the talker media rate from a PDU's avtp_timestamp (which carries the talker media clock in gPTP time); inter-PDU deltas give the rate, a second-order DLL (spa_dll) tracks phase+frequency, drives mc_rate. */ static void stream_mc_recover(struct stream *stream, const struct avb_packet_aaf *p) { uint32_t avtp_ts; double rate; - if (!stream->mc_aaf_active || !p->tv) + if (!stream->mc_aaf_active || !p->tv) { return; + } avtp_ts = ntohl(p->timestamp); rate = mc_recover_update(&stream->mc, avtp_ts, stream->frames_per_pdu, stream->info.info.raw.rate, stream->pdu_period); - if (stream->mc.pdus < 40 || (stream->mc.pdus % 8000) == 1) + if (stream->mc.pdus < 40 || (stream->mc.pdus % 8000) == 1) { pw_log_info("milan-avb: mc-recovery stream=%u pdus=%llu avtp_ts=%u model_lo=%u nom=%u pdu_ns=%lld rate=%.4f corr=%.8f err_ns=%d ppm=%.3f", stream->index, (unsigned long long)stream->mc.pdus, avtp_ts, (uint32_t)stream->mc.model_ns, (unsigned)stream->info.info.raw.rate, (long long)stream->pdu_period, rate, stream->mc.corr, stream->mc.last_err_ns, (stream->mc.corr - 1.0) * 1e6); + } } -/* Milan 5.4.5.3 STREAM_INTERRUPTED: playback is interrupted by the loss of - * "several" AVTPDUs (the spec leaves the count implementation-defined). A - * single dropped/reordered PDU is a SEQ_NUM_MISMATCH but not a full - * interruption; a gap of this many or more missing PDUs is. */ +/* Milan 5.4.5.3 STREAM_INTERRUPTED: playback interrupted by loss of "several" AVTPDUs (count implementation-defined); a single dropped/reordered PDU is a SEQ_NUM_MISMATCH, a gap of this many or more is an interruption. */ #define AVB_STREAM_INTERRUPT_MIN_LOST 2 -/* PDUs after a (re)lock during which a sequence step is absorbed (re-seeded) and - * NOT counted as SEQ_NUM_MISMATCH — covers the one-time bind/SRP-path-open gap of - * a Listener that joins mid-stream. Small, so genuine ongoing loss still counts. */ +/* PDUs after a (re)lock during which a sequence step is absorbed (re-seeded) and NOT counted as SEQ_NUM_MISMATCH — covers the one-time bind/SRP-path-open gap of a mid-stream join; small, so genuine ongoing loss still counts. */ #define AVB_STREAM_SEQ_SETTLE 8 -/* Milan v1.2 Section 5.4: a received AAF AVTPDU matches the current format when - * subtype, format, nsr, bit depth, channels and sparse all match. */ +/* Milan v1.2 Section 5.4: a received AAF AVTPDU matches the current format when subtype, format, nsr, bit depth, channels and sparse all match. */ static inline bool aaf_pdu_format_matches(const struct avb_packet_aaf *p, const struct avb_aem_stream_format_info *fi) { @@ -1077,8 +1206,7 @@ static inline bool aaf_pdu_format_matches(const struct avb_packet_aaf *p, p->sp == fi->sparse; } -/* Read the current format from the Stream Input descriptor. SET_STREAM_FORMAT - * updates it there, so this is always the current one. */ +/* Read the current format from the Stream Input descriptor; SET_STREAM_FORMAT updates it there, so this is always the current one. */ static void stream_in_current_format(struct stream *stream, struct avb_aem_stream_format_info *out) { @@ -1104,8 +1232,7 @@ static void handle_aaf_packet(struct stream *stream, filled = spa_ringbuffer_get_write_index(&stream->ring, &index); n_bytes = ntohs(p->data_len); - /* IEEE 1722.1-2021 Table 7-156: per-PDU, bump UNSUPPORTED_FORMAT on any AVTPDU - * whose format != the Stream Input current format (from descriptor), or malformed. */ + /* IEEE 1722.1-2021 Table 7-156: per-PDU, bump UNSUPPORTED_FORMAT on any AVTPDU whose format != the Stream Input current format (from descriptor), or malformed. */ stream_in_current_format(stream, &cur); if (n_bytes > (uint32_t)(len - (int)sizeof(*p)) || !aaf_pdu_format_matches(p, &cur)) { cnt->unsupported_format++; @@ -1113,9 +1240,7 @@ static void handle_aaf_packet(struct stream *stream, return; } - /* IEEE 1722.1 Section 7.4.42 / Milan Section 5.4.5.3: FRAMES_RX counts every - * valid AVTPDU received on the wire — independent of whether the listener - * pipeline could absorb it. */ + /* IEEE 1722.1 Section 7.4.42 / Milan Section 5.4.5.3: FRAMES_RX counts every valid AVTPDU received on the wire, independent of whether the listener pipeline could absorb it. */ cnt->frame_rx++; clock_gettime(CLOCK_MONOTONIC, &now_ts); @@ -1127,27 +1252,24 @@ static void handle_aaf_packet(struct stream *stream, stream->prev_seq = p->seq_num; /* (re)lock: seed seq, no gap */ si->seq_settle = AVB_STREAM_SEQ_SETTLE; /* grace the bind/path-open step */ } else if (si->seq_settle > 0) { - /* settling just after a (re)lock: a Listener that binds mid-stream - * behind an SRP bridge gets a one-time sequence step as the bridge - * opens forwarding — re-seed and don't count it. */ + /* settling just after a (re)lock: a Listener that binds mid-stream behind an SRP bridge gets a one-time sequence step as the bridge opens forwarding — re-seed and don't count it. */ si->seq_settle--; stream->prev_seq = p->seq_num; } else { uint8_t expected = (uint8_t)(stream->prev_seq + 1); if (p->seq_num != expected) { - /* IEEE 1722.1 7.4: SEQ_NUM_MISMATCH on any sequence - * discontinuity (loss, reorder or duplicate). */ + /* IEEE 1722.1 7.4: SEQ_NUM_MISMATCH on any sequence discontinuity (loss, reorder or duplicate). */ uint8_t lost = (uint8_t)(p->seq_num - expected); cnt->seq_mistmatch++; /* STREAM_INTERRUPTED only when several PDUs are missing. */ - if (lost >= AVB_STREAM_INTERRUPT_MIN_LOST) + if (lost >= AVB_STREAM_INTERRUPT_MIN_LOST) { cnt->stream_interrupted++; + } } stream->prev_seq = p->seq_num; } - /* milan-avb: AAF media-clock recovery (active only when selected via the - * CLOCK_DOMAIN). Recovers the talker media rate from avtp_timestamps. */ + /* milan-avb: AAF media-clock recovery (active only when selected via the CLOCK_DOMAIN); recovers the talker media rate from avtp_timestamps. */ stream_mc_recover(stream, p); /* milan-avb: latency observability (throttled to 1 Hz, env-gated). */ @@ -1277,8 +1399,7 @@ static void handle_iec61883_packet(struct stream *stream, } } -/* TODO: RX is on the main loop, not the RT data_loop — preemption can drop PDUs - * (SEQ_NUM_MISMATCH). Move it to data_loop + a big SO_RCVBUF, like the flush_timer. */ +/* TODO: RX is on the main loop, not the RT data_loop — preemption can drop PDUs (SEQ_NUM_MISMATCH); move it to data_loop + a big SO_RCVBUF, like the flush_timer. */ static void on_socket_data(void *data, int fd, uint32_t mask) { struct stream *stream = data; @@ -1316,8 +1437,7 @@ static void on_socket_data(void *data, int fd, uint32_t mask) len - (int)sizeof(*h)); break; case AVB_SUBTYPE_CRF: - /* CRF clock-reference stream: no audio data plane. - * Consume and ignore (clock recovery is future work). */ + /* CRF clock-reference stream: no audio data plane; consume and ignore (clock recovery is future work). */ break; default: pw_log_warn("unsupported subtype 0x%02x", ph->subtype); @@ -1327,12 +1447,7 @@ static void on_socket_data(void *data, int fd, uint32_t mask) } } -/* Milan v1.2 Table 5.6: a Stream Input resets its diagnostic counters on the - * not-bound -> bound transition (and NOT on bound -> not-bound). Also re-arms - * the media-lock / seq-settle state: the unlock edge is detected only inside the - * GET_COUNTERS poll (100 ms silence), so a fast unbind/rebind can leave - * media_locked_state == true and miscount the bridge-open step as - * SEQ_NUM_MISMATCH / STREAM_INTERRUPTED. Called from stream_activate(). */ +/* Milan v1.2 Table 5.6: a Stream Input resets its diagnostic counters on the not-bound -> bound transition (NOT the reverse); also re-arms the media-lock / seq-settle state, since the unlock edge is detected only in the GET_COUNTERS poll (100 ms silence) so a fast unbind/rebind could leave media_locked_state==true and miscount the bridge-open step as SEQ_NUM_MISMATCH / STREAM_INTERRUPTED. Called from stream_activate(). */ static void stream_input_reset_counters(struct aecp_aem_stream_input_state *si) { si->counters.media_locked = 0; @@ -1359,11 +1474,7 @@ int stream_activate(struct stream *stream, uint16_t index, uint64_t now) struct stream_common *common; common = SPA_CONTAINER_OF(stream, struct stream_common, stream); - /* milan-avb: SR-class priority + VLAN id come from the MSRP Domain, not a - * hardcoded default. process_domain() re-adjusts the AVB_INTERFACE domain - * to the network-declared sr_class_priority/sr_class_vid, so this is the - * authoritative source. Read it before setup_socket() — the listener uses - * stream->vlan_id to select its VLAN sub-iface. */ + /* milan-avb: SR-class priority + VLAN id come from the MSRP Domain (the authoritative network-declared values), not a hardcoded default; read before setup_socket() since the listener uses stream->vlan_id to select its VLAN sub-iface. */ { struct descriptor *avbif = server_find_descriptor(server, AVB_AEM_DESC_AVB_INTERFACE, 0); @@ -1395,26 +1506,25 @@ int stream_activate(struct stream *stream, uint16_t index, uint64_t now) struct aecp_aem_stream_input_state *input_stream; input_stream = SPA_CONTAINER_OF(common, struct aecp_aem_stream_input_state, common); - /* Milan v1.2 Table 5.6: reset diagnostic counters + re-arm the - * media-lock / seq-settle state on the not-bound -> bound transition. */ + /* Milan v1.2 Table 5.6: reset diagnostic counters + re-arm the media-lock / seq-settle state on the not-bound -> bound transition. */ stream_input_reset_counters(input_stream); /* Prime ring with one PipeWire quantum of silence (Milan v1.2 Section 5.4.5.3). */ spa_ringbuffer_init(&stream->ring); if (stream->frames_per_pdu > 0) { uint32_t prefill_pdus = 1024u / stream->frames_per_pdu; - if (prefill_pdus > 0) + if (prefill_pdus > 0) { pad_ringbuffer_with_silence(stream, (int)prefill_pdus); + } } - /* milan-avb: pick up the current media-clock selection for this input - * (AAF recovery vs internal/gPTP); re-prime the DLL on a fresh bind. */ + /* milan-avb: pick up the current media-clock selection for this input (AAF recovery vs internal/gPTP); re-prime the DLL on a fresh bind. */ stream->mc_aaf_active = stream_mc_aaf_selected(stream); - if (stream->mc_aaf_active) + if (stream->mc_aaf_active) { stream_mc_reset(stream); + } - /* milan-avb: publish our contribution to graph latency so wpctl/pw-cli - * report it. Latency is the prefill: one PipeWire quantum at 48 kHz. */ + /* milan-avb: publish our contribution to graph latency (the prefill: one PipeWire quantum at 48 kHz) so wpctl/pw-cli report it. */ { struct spa_latency_info latency = SPA_LATENCY_INFO(SPA_DIRECTION_OUTPUT); uint32_t rate = stream->info.info.raw.rate ? stream->info.info.raw.rate : 48000; @@ -1444,9 +1554,7 @@ int stream_activate(struct stream *stream, uint16_t index, uint64_t now) pw_properties_free(props); } - /* Milan v1.2 Section 4.3.3.1: Listener_Ready iff Talker Advertise registrar IN. - * Compute from current state so a reconnect picks up an already-IN TA - * registrar (no NEW/JOIN event fires when the registrar didn't transition). */ + /* Milan v1.2 Section 4.3.3.1: Listener_Ready iff Talker Advertise registrar IN; compute from current state so a reconnect picks up an already-IN TA registrar (no NEW/JOIN event fires when the registrar didn't transition). */ common->lstream_attr.param = (common->tastream_attr.mrp != NULL && avb_mrp_attribute_get_registrar_state(common->tastream_attr.mrp) == AVB_MRP_IN) @@ -1459,8 +1567,12 @@ int stream_activate(struct stream *stream, uint16_t index, uint64_t now) avb_mrp_attribute_begin(input_stream->mvrp_attr.mrp, now); avb_mrp_attribute_join(input_stream->mvrp_attr.mrp, now, true); } else { - if ((res = avb_maap_get_address(server->maap, stream->addr, index)) < 0) + if ((res = avb_maap_get_address(server->maap, stream->addr, index)) < 0) { return res; + } + + /* M2: re-anchor the presentation-timestamp accumulator on connect. */ + stream->tx_pts = 0; common->tastream_attr.attr.talker.stream_id = htobe64(stream->id); memcpy(common->tastream_attr.attr.talker.dest_addr, stream->addr, 6); @@ -1478,8 +1590,7 @@ int stream_activate(struct stream *stream, uint16_t index, uint64_t now) pw_stream_set_active(stream->stream, true); - /* Milan Table 5.17: STREAM_START counter ticks each time the stream - * transitions from stopped → started. */ + /* Milan Table 5.17: STREAM_START counter ticks each time the stream transitions from stopped → started. */ if (stream->direction == SPA_DIRECTION_OUTPUT) { stream_out_counters(stream)->stream_start++; stream_out_mark_counters_dirty(stream); @@ -1509,10 +1620,7 @@ int stream_deactivate(struct stream *stream, uint64_t now) pw_loop_invoke(stream->server->impl->data_loop, do_remove_flush_timer, 0, NULL, 0, true, stream); } - /* milan-avb: withdraw ALL of this stream's declarations so the bridge frees the - * reservation immediately (Leave) instead of holding stale state until its - * LeaveAll timer — otherwise a stop/restart or replug to another port can't - * re-register (the old port's Talker/Listener/VLAN entry still pins the stream). */ + /* milan-avb: withdraw ALL of this stream's declarations so the bridge frees the reservation immediately (Leave) instead of holding stale state until its LeaveAll timer — otherwise a stop/restart or replug to another port can't re-register (the old port's Talker/Listener/VLAN entry still pins the stream). */ if (stream->direction == SPA_DIRECTION_INPUT) { si = SPA_CONTAINER_OF(common, struct aecp_aem_stream_input_state, common); avb_mrp_attribute_leave(common->lstream_attr.mrp, now); @@ -1524,8 +1632,7 @@ int stream_deactivate(struct stream *stream, uint64_t now) avb_mrp_attribute_leave(common->tastream_attr.mrp, now); } - /* Milan Table 5.17: STREAM_STOP counter ticks each transition the - * other way. */ + /* Milan Table 5.17: STREAM_STOP counter ticks each transition the other way. */ if (stream->direction == SPA_DIRECTION_OUTPUT) { stream_out_counters(stream)->stream_stop++; stream_out_mark_counters_dirty(stream); diff --git a/src/modules/module-avb/stream.h b/src/modules/module-avb/stream.h index a8d2bf2eb..ce34b82cc 100644 --- a/src/modules/module-avb/stream.h +++ b/src/modules/module-avb/stream.h @@ -19,7 +19,8 @@ #include -#define BUFFER_SIZE (1u<<16) +/* milan-avb: the talker ring must hold SEVERAL graph quanta so the burst-fill (one quantum/cycle) and the 125us flush-drain decouple; at 1<<16 the ring equalled ONE quantum so each cycle overwrote unread samples (~40 glitches/s). 1<<18 = 4 quanta. */ +#define BUFFER_SIZE (1u<<18) #define BUFFER_MASK (BUFFER_SIZE-1) struct stream { @@ -41,6 +42,17 @@ struct stream { struct spa_source *source; struct spa_source *flush_timer; uint64_t flush_last_ns; + + /* milan-avb: self-driving timer (the node IS the graph DRIVER); a data-loop one-shot fills io_position->clock, re-arms to drive_next_time, and triggers one graph cycle (pipe-tunnel pattern) so the graph fill and AVTP egress share one clock (no follower-resampler FM). */ + struct spa_source *drive_timer; + uint64_t drive_next_time; + bool driving; + uint64_t drive_phc_last; + uint64_t drive_mono_last; + double drive_ratio_ema; /* M2: EMA of PHC/MONOTONIC rate ratio */ + + /* M2: monotonic AVTP presentation-timestamp accumulator (PHC domain); advances by exactly pdu_period per PDU, slow-leaked toward the live PHC, so the listener's recovered media clock stays jitter-free (per-tick PHC reads inject interpolation jitter, audible FM). */ + uint64_t tx_pts; bool is_crf; uint64_t next_txtime; int prio; @@ -72,15 +84,11 @@ struct stream { uint32_t stride; struct spa_audio_info info; - /* milan-avb: AAF media-clock recovery (listener / STREAM_INPUT only). - * Active only while the CLOCK_DOMAIN selects the AAF (INPUT_STREAM) - * clock source whose location points at this stream. Estimator state in - * struct mc_recover (mc-recover.h); recovered from avtp_timestamp deltas. */ + /* milan-avb: AAF media-clock recovery (listener / STREAM_INPUT only); active only while the CLOCK_DOMAIN selects the AAF (INPUT_STREAM) source pointing at this stream; recovered from avtp_timestamp deltas (mc-recover.h). */ bool mc_aaf_active; struct mc_recover mc; - /* milan-avb: actuator I/O areas (set via .io_changed). io_rate_match is the - * resampler knob — NULL unless the adapter inserted a resampler. */ + /* milan-avb: actuator I/O areas (set via .io_changed); io_rate_match is the resampler knob, NULL unless the adapter inserted a resampler. */ struct spa_io_rate_match *io_rate_match; struct spa_io_position *io_position; @@ -112,9 +120,7 @@ int stream_activate(struct stream *stream, uint16_t index, uint64_t now); int stream_deactivate(struct stream *stream, uint64_t now); int stream_activate_virtual(struct stream *stream, uint16_t index); -/* milan-avb: re-evaluate each input stream's media-clock recovery against the - * current CLOCK_DOMAIN selection. Call after SET_CLOCK_SOURCE for on-the-fly - * clock-source switching. */ +/* milan-avb: re-evaluate each input stream's media-clock recovery against the current CLOCK_DOMAIN selection; call after SET_CLOCK_SOURCE for on-the-fly clock-source switching. */ void avb_stream_update_clock_source(struct server *server); #endif /* AVB_STREAM_H */