From 94d3b146408893002227f0a788789ca972eb2982 Mon Sep 17 00:00:00 2001 From: Arseniy136 Date: Wed, 31 Jan 2024 00:41:51 +0100 Subject: [PATCH] gh-657 Implement warmup phase --- src/internal_modules/roc_audio/watchdog.cpp | 39 ++++++++++++----- src/internal_modules/roc_audio/watchdog.h | 14 +++++++ src/tests/roc_audio/test_watchdog.cpp | 28 ++++++++++++- .../roc_pipeline/test_receiver_source.cpp | 42 +++++++++++++++++-- 4 files changed, 108 insertions(+), 15 deletions(-) diff --git a/src/internal_modules/roc_audio/watchdog.cpp b/src/internal_modules/roc_audio/watchdog.cpp index 5267eddd8..2f58c4d49 100644 --- a/src/internal_modules/roc_audio/watchdog.cpp +++ b/src/internal_modules/roc_audio/watchdog.cpp @@ -29,6 +29,10 @@ void WatchdogConfig::deduce_defaults(core::nanoseconds_t target_latency) { choppy_playback_window = std::min(300 * core::Millisecond, choppy_playback_timeout / 4); } + + if(warmup_duration < 0){ + warmup_duration = target_latency; + } } Watchdog::Watchdog(IFrameReader& reader, @@ -46,22 +50,26 @@ Watchdog::Watchdog(IFrameReader& reader, , drop_detection_window_( (packet::stream_timestamp_t)sample_spec.ns_2_stream_timestamp_delta( config.choppy_playback_window)) + , warmup_ending_pos_( + (packet::stream_timestamp_t)sample_spec.ns_2_stream_timestamp_delta( + config.warmup_duration)) , curr_read_pos_(0) - , last_pos_before_blank_(0) + , last_pos_before_blank_(warmup_ending_pos_) , last_pos_before_drops_(0) , curr_window_flags_(0) , status_(arena) , status_pos_(0) , status_show_(false) + , warmup_status_(bool(warmup_ending_pos_)) , alive_(true) , valid_(false) { if (config.no_playback_timeout < 0 || config.choppy_playback_timeout < 0 - || config.choppy_playback_window < 0) { + || config.choppy_playback_window < 0 || config.warmup_duration < 0) { roc_log(LogError, "watchdog: invalid config:" - " no_packets_timeout=%ld drops_timeout=%ld drop_detection_window=%ld", + " no_packets_timeout=%ld drops_timeout=%ld drop_detection_window=%ld warmup_ending_pos=%ld", (long)config.no_playback_timeout, (long)config.choppy_playback_timeout, - (long)config.choppy_playback_window); + (long)config.choppy_playback_window, (long)config.warmup_duration); return; } @@ -70,9 +78,10 @@ Watchdog::Watchdog(IFrameReader& reader, roc_log(LogError, "watchdog: invalid config:" " drop_detection_window should be in range (0; max_drops_duration]:" - " max_drops_duration=%lu drop_detection_window=%lu", + " max_drops_duration=%lu drop_detection_window=%lu warmup_ending_pos=%lu", (unsigned long)max_drops_duration_, - (unsigned long)drop_detection_window_); + (unsigned long)drop_detection_window_, + (unsigned long)warmup_ending_pos_); return; } } @@ -85,9 +94,9 @@ Watchdog::Watchdog(IFrameReader& reader, roc_log(LogDebug, "watchdog: initializing:" - " max_blank_duration=%lu max_drops_duration=%lu drop_detection_window=%lu", + " max_blank_duration=%lu max_drops_duration=%lu drop_detection_window=%lu warmup_ending_pos=%lu", (unsigned long)max_blank_duration_, (unsigned long)max_drops_duration_, - (unsigned long)drop_detection_window_); + (unsigned long)drop_detection_window_, (unsigned long)warmup_ending_pos_); valid_ = true; } @@ -135,6 +144,8 @@ bool Watchdog::read(Frame& frame) { alive_ = false; } + update_warmup_status_(); + return true; } @@ -146,11 +157,12 @@ void Watchdog::update_blank_timeout_(const Frame& frame, if (frame.flags() & Frame::FlagNonblank) { last_pos_before_blank_ = next_read_pos; + warmup_status_ = false; } } bool Watchdog::check_blank_timeout_() const { - if (max_blank_duration_ == 0) { + if (max_blank_duration_ == 0 || warmup_status_) { return true; } @@ -160,9 +172,9 @@ bool Watchdog::check_blank_timeout_() const { roc_log(LogDebug, "watchdog: blank timeout reached: every frame was blank during timeout:" - " curr_read_pos=%lu last_pos_before_blank=%lu max_blank_duration=%lu", + " curr_read_pos=%lu last_pos_before_blank=%lu max_blank_duration=%lu warmup_ending_pos=%lu", (unsigned long)curr_read_pos_, (unsigned long)last_pos_before_blank_, - (unsigned long)max_blank_duration_); + (unsigned long)max_blank_duration_, (unsigned long)warmup_ending_pos_); return false; } @@ -195,6 +207,11 @@ void Watchdog::update_drops_timeout_(const Frame& frame, } } +void Watchdog::update_warmup_status_(){ + warmup_status_ = warmup_status_ && (curr_read_pos_ < warmup_ending_pos_); +} + + bool Watchdog::check_drops_timeout_() { if (max_drops_duration_ == 0) { return true; diff --git a/src/internal_modules/roc_audio/watchdog.h b/src/internal_modules/roc_audio/watchdog.h index 6098fab64..d0326a17c 100644 --- a/src/internal_modules/roc_audio/watchdog.h +++ b/src/internal_modules/roc_audio/watchdog.h @@ -48,6 +48,15 @@ struct WatchdogConfig { //! choppy_playback_timeout core::nanoseconds_t choppy_playback_window; + //! Duration of the warmup phase in the beginning, nanoseconds + //! @remarks + //! During the warmup phase blank_timeout is not triggered. After this period last + //! position before blank frames is set to the current position. Warmup can also + //! be terminated in case a non-blank frame occurs during it. This mechanism allows + //! watchdog to work with latency longer than no_playback_timeout. Usually is equal + //! to target_latency. + core::nanoseconds_t warmup_duration; + //! Frame status window size for logging, number of frames. //! @remarks //! Used for debug logging. Set to zero to disable. @@ -58,6 +67,7 @@ struct WatchdogConfig { : no_playback_timeout(-1) , choppy_playback_timeout(-1) , choppy_playback_window(-1) + , warmup_duration(-1) , frame_status_window(20) { } @@ -98,6 +108,8 @@ class Watchdog : public IFrameReader, public core::NonCopyable<> { void update_drops_timeout_(const Frame& frame, packet::stream_timestamp_t next_read_pos); + void update_warmup_status_(); + bool check_drops_timeout_(); void update_status_(const Frame& frame); @@ -110,6 +122,7 @@ class Watchdog : public IFrameReader, public core::NonCopyable<> { const packet::stream_timestamp_t max_blank_duration_; const packet::stream_timestamp_t max_drops_duration_; const packet::stream_timestamp_t drop_detection_window_; + const packet::stream_timestamp_t warmup_ending_pos_; packet::stream_timestamp_t curr_read_pos_; packet::stream_timestamp_t last_pos_before_blank_; @@ -120,6 +133,7 @@ class Watchdog : public IFrameReader, public core::NonCopyable<> { core::Array status_; size_t status_pos_; bool status_show_; + bool warmup_status_; bool alive_; bool valid_; diff --git a/src/tests/roc_audio/test_watchdog.cpp b/src/tests/roc_audio/test_watchdog.cpp index 3637da69b..c21367e11 100644 --- a/src/tests/roc_audio/test_watchdog.cpp +++ b/src/tests/roc_audio/test_watchdog.cpp @@ -74,12 +74,14 @@ TEST_GROUP(watchdog) { } WatchdogConfig make_config(packet::stream_timestamp_t no_playback_timeout, - packet::stream_timestamp_t broken_playback_timeout) { + packet::stream_timestamp_t broken_playback_timeout, + packet::stream_timestamp_t Latency = 0) { WatchdogConfig config; config.no_playback_timeout = no_playback_timeout * core::Second / SampleRate; config.choppy_playback_timeout = broken_playback_timeout * core::Second / SampleRate; config.choppy_playback_window = BreakageWindow * core::Second / SampleRate; + config.warmup_duration = Latency * core::Second / SampleRate; return config; } @@ -435,5 +437,29 @@ TEST(watchdog, broken_playback_timeout_disabled) { } } +// Checks that watchdog works correctly with latency longer than no_playback_timeout +TEST(watchdog, latency_longer_then_no_playback_timeout) { + enum {Latency = NoPlaybackTimeout * 10}; + WatchdogConfig config = make_config(NoPlaybackTimeout, BrokenPlaybackTimeout, Latency); + Watchdog watchdog(test_reader, SampleSpecs, config, arena); + CHECK(watchdog.is_valid()); + + for (packet::stream_timestamp_t n = 0; n < Latency / SamplesPerFrame; n++) { + CHECK(watchdog.is_alive()); + check_read(watchdog, true, SamplesPerFrame, 0); + } + + CHECK(watchdog.is_alive()); + + for (packet::stream_timestamp_t n = 0; n < NoPlaybackTimeout / SamplesPerFrame - 1; n++) { + check_read(watchdog, true, SamplesPerFrame, 0); + CHECK(watchdog.is_alive()); + } + + check_read(watchdog, true, SamplesPerFrame, 0); + CHECK(!watchdog.is_alive()); + +} + } // namespace audio } // namespace roc diff --git a/src/tests/roc_pipeline/test_receiver_source.cpp b/src/tests/roc_pipeline/test_receiver_source.cpp index 6706bec04..0b401f1dd 100644 --- a/src/tests/roc_pipeline/test_receiver_source.cpp +++ b/src/tests/roc_pipeline/test_receiver_source.cpp @@ -119,7 +119,7 @@ TEST_GROUP(receiver_source) { address::Protocol proto1; address::Protocol proto2; - ReceiverConfig make_config() { + ReceiverConfig make_config(int latency = Latency) { ReceiverConfig config; config.common.output_sample_spec = output_sample_spec; @@ -130,7 +130,7 @@ TEST_GROUP(receiver_source) { config.default_session.latency.tuner_backend = audio::LatencyTunerBackend_Niq; config.default_session.latency.tuner_profile = audio::LatencyTunerProfile_Intact; config.default_session.latency.target_latency = - Latency * core::Second / (int)output_sample_spec.sample_rate(); + latency * core::Second / (int)output_sample_spec.sample_rate(); config.default_session.latency.latency_tolerance = Timeout * 10 * core::Second / (int)output_sample_spec.sample_rate(); @@ -341,7 +341,7 @@ TEST(receiver_source, initial_latency_timeout) { packet_writer.write_packets(1, SamplesPerPacket, packet_sample_spec); - for (size_t np = 0; np < Timeout / SamplesPerPacket; np++) { + for (size_t np = 0; np < Timeout / SamplesPerPacket + Latency / SamplesPerPacket; np++) { for (size_t nf = 0; nf < FramesPerPacket; nf++) { receiver.refresh(frame_reader.refresh_ts()); frame_reader.read_zero_samples(SamplesPerFrame, output_sample_spec); @@ -2265,5 +2265,41 @@ TEST(receiver_source, state) { } } +// Checks that receiver can work with latency longer than timeout +TEST(receiver_source, watchdog_timeout_smaller_than_latency) { + enum { Rate = SampleRate, Chans = Chans_Stereo, Local_latency = Timeout * 10 }; + + init(Rate, Chans, Rate, Chans); + + ReceiverSource receiver(make_config(Local_latency), encoding_map, packet_factory, + byte_buffer_factory, sample_buffer_factory, arena); + CHECK(receiver.is_valid()); + + ReceiverSlot* slot = create_slot(receiver); + CHECK(slot); + + packet::IWriter* endpoint1_writer = + create_transport_endpoint(slot, address::Iface_AudioSource, proto1); + CHECK(endpoint1_writer); + + test::FrameReader frame_reader(receiver, sample_buffer_factory); + + test::PacketWriter packet_writer(arena, *endpoint1_writer, encoding_map, + packet_factory, byte_buffer_factory, src_id1, + src_addr1, dst_addr1, PayloadType_Ch2); + + packet_writer.write_packets(1, SamplesPerPacket, packet_sample_spec); + + for (size_t np = 0; np < Local_latency / SamplesPerPacket; np++) { + for (size_t nf = 0; nf < FramesPerPacket; nf++) { + receiver.refresh(frame_reader.refresh_ts()); + frame_reader.read_zero_samples(SamplesPerFrame, output_sample_spec); + } + + UNSIGNED_LONGS_EQUAL(1, receiver.num_sessions()); + } + +} + } // namespace pipeline } // namespace roc