From 4e0102e4243febf4876ef1e3859e5f22a8934742 Mon Sep 17 00:00:00 2001 From: Mikhail Baranov Date: Sun, 11 Feb 2024 22:03:45 +0100 Subject: [PATCH] Latency Monitor controlls target latency --- .../roc_audio/freq_estimator.cpp | 37 +++++- .../roc_audio/freq_estimator.h | 19 ++- .../roc_audio/latency_monitor.cpp | 120 ++++++++++++++++-- .../roc_audio/latency_monitor.h | 36 +++++- .../roc_pipeline/receiver_session.cpp | 7 +- src/internal_modules/roc_rtp/link_meter.cpp | 1 + src/internal_modules/roc_rtp/link_meter.h | 7 +- 7 files changed, 203 insertions(+), 24 deletions(-) diff --git a/src/internal_modules/roc_audio/freq_estimator.cpp b/src/internal_modules/roc_audio/freq_estimator.cpp index 8b71573a1..c720db78b 100644 --- a/src/internal_modules/roc_audio/freq_estimator.cpp +++ b/src/internal_modules/roc_audio/freq_estimator.cpp @@ -9,6 +9,7 @@ #include "roc_audio/freq_estimator.h" #include "roc_core/log.h" #include "roc_core/panic.h" +#include "roc_core/time.h" namespace roc { namespace audio { @@ -25,6 +26,7 @@ FreqEstimatorConfig make_config(FreqEstimatorProfile profile) { config.I = 1e-10; config.decimation_factor1 = fe_decim_factor_max; config.decimation_factor2 = 0; + config.stable_criteria = 0.1; break; case FreqEstimatorProfile_Gradual: @@ -32,6 +34,7 @@ FreqEstimatorConfig make_config(FreqEstimatorProfile profile) { config.I = 5e-9; config.decimation_factor1 = fe_decim_factor_max; config.decimation_factor2 = fe_decim_factor_max; + config.stable_criteria = 0.05; break; } @@ -69,7 +72,9 @@ FreqEstimator::FreqEstimator(FreqEstimatorProfile profile, , dec2_ind_(0) , samples_counter_(0) , accum_(0) - , coeff_(1) { + , coeff_(1) + , stable_(false) + , last_unstable_time_(core::timestamp(core::ClockMonotonic)) { roc_log(LogDebug, "freq estimator: initializing: P=%e I=%e dc1=%lu dc2=%lu", config_.P, config_.I, (unsigned long)config_.decimation_factor1, (unsigned long)config_.decimation_factor2); @@ -149,10 +154,40 @@ bool FreqEstimator::run_decimators_(packet::stream_timestamp_t current, double FreqEstimator::run_controller_(double current) { const double error = (current - target_); + roc_log(LogTrace, + "Freq Estimator:" + " current latency error: %.0f", + error); + + if (abs(error) > target_ * config_.stable_criteria && stable_) { + stable_ = false; + last_unstable_time_ = core::timestamp(core::ClockMonotonic); + roc_log(LogDebug, + "Freq Estimator: " + " unstable, %0.f > %.0f / %0.f", + config_.stable_criteria, error, target_); + } else if (abs(error) < target_ * config_.stable_criteria && !stable_ && + core::timestamp(core::ClockMonotonic) - last_unstable_time_ + > 15 * core::Second) { + stable_ = true; + roc_log(LogDebug, + "Freq Estimator: " + " stabilized"); + } + accum_ = accum_ + error; return 1 + config_.P * error + config_.I * accum_; } +void FreqEstimator::update_target_latency(packet::stream_timestamp_t target_latency) { + target_ = (double)target_latency; +} + +bool FreqEstimator::stable() const { + + return stable_; +} + const char* fe_profile_to_str(FreqEstimatorProfile profile) { switch (profile) { case FreqEstimatorProfile_Responsive: diff --git a/src/internal_modules/roc_audio/freq_estimator.h b/src/internal_modules/roc_audio/freq_estimator.h index fbfe4a764..9fe9d7e33 100644 --- a/src/internal_modules/roc_audio/freq_estimator.h +++ b/src/internal_modules/roc_audio/freq_estimator.h @@ -44,11 +44,16 @@ struct FreqEstimatorConfig { //! to fe_decim_factor_max. Could be zero to disable the second decimation stage. size_t decimation_factor2; + //! Within this range we consider the FreqEstimator is stable. + //! stable_criteria > error / target; + double stable_criteria; + FreqEstimatorConfig() : P(0) , I(0) , decimation_factor1(0) - , decimation_factor2(0) { + , decimation_factor2(0) + , stable_criteria(0.1) { } }; @@ -75,12 +80,18 @@ class FreqEstimator : public core::NonCopyable<> { //! Compute new value of frequency coefficient. void update(packet::stream_timestamp_t current_latency); + //! Update target latency. + void update_target_latency(packet::stream_timestamp_t target_latency); + + //! If FreqEstimator has stabilized. + bool stable() const; + private: bool run_decimators_(packet::stream_timestamp_t current, double& filtered); double run_controller_(double current); const FreqEstimatorConfig config_; - const double target_; // Target latency. + double target_; // Target latency. double dec1_casc_buff_[fe_decim_len]; size_t dec1_ind_; @@ -92,6 +103,10 @@ class FreqEstimator : public core::NonCopyable<> { double accum_; // Integrator value. double coeff_; // Current frequency coefficient value. + + bool stable_; // True if FreqEstimator has stabilized. + // Last time when FreqEstimator was out of range. + core::nanoseconds_t last_unstable_time_; }; //! Get string name of FreqEstimator profile. diff --git a/src/internal_modules/roc_audio/latency_monitor.cpp b/src/internal_modules/roc_audio/latency_monitor.cpp index b9f5c4d3f..332c54cf2 100644 --- a/src/internal_modules/roc_audio/latency_monitor.cpp +++ b/src/internal_modules/roc_audio/latency_monitor.cpp @@ -29,6 +29,8 @@ double timestamp_to_ms(const SampleSpec& sample_spec, LatencyMonitor::LatencyMonitor(IFrameReader& frame_reader, const packet::SortedQueue& incoming_queue, const Depacketizer& depacketizer, + const fec::Reader* fec_reader, + const rtp::LinkMeter& link_meter, ResamplerReader* resampler, const LatencyMonitorConfig& config, core::nanoseconds_t target_latency, @@ -37,6 +39,8 @@ LatencyMonitor::LatencyMonitor(IFrameReader& frame_reader, : frame_reader_(frame_reader) , incoming_queue_(incoming_queue) , depacketizer_(depacketizer) + , fec_reader_(fec_reader) + , link_meter_(link_meter) , resampler_(resampler) , stream_pos_(0) , stream_cts_(0) @@ -52,24 +56,27 @@ LatencyMonitor::LatencyMonitor(IFrameReader& frame_reader, , e2e_latency_(0) , has_niq_latency_(false) , has_e2e_latency_(false) - , target_latency_(input_sample_spec.ns_2_stream_timestamp_delta(target_latency)) - , min_latency_(input_sample_spec.ns_2_stream_timestamp_delta( - target_latency - config.latency_tolerance)) - , max_latency_(input_sample_spec.ns_2_stream_timestamp_delta( - target_latency + config.latency_tolerance)) , max_scaling_delta_(config.scaling_tolerance) , input_sample_spec_(input_sample_spec) , output_sample_spec_(output_sample_spec) , alive_(true) - , valid_(false) { + , valid_(false) + , target_latency_auto_tune_(config.auto_tune_target_latency) + , target_latency_state_(TL_START) + , target_latency_ts_(input_sample_spec.ns_2_stream_timestamp_delta(target_latency)) + , max_latency_(input_sample_spec.ns_2_stream_timestamp_delta( + target_latency + config.latency_tolerance)) + , min_latency_(input_sample_spec.ns_2_stream_timestamp_delta( + target_latency - config.latency_tolerance)) + , last_target_latency_update_(core::timestamp(core::ClockMonotonic)) { roc_log( LogDebug, "latency monitor: initializing:" " target=%lu(%.3fms) min=%lu(%.3fms) max=%lu(%.3fms)" " in_rate=%lu out_rate=%lu" " fe_enable=%d fe_profile=%s fe_interval=%.3fms", - (unsigned long)target_latency_, - timestamp_to_ms(input_sample_spec_, target_latency_), (unsigned long)min_latency_, + (unsigned long)target_latency_ts_, + timestamp_to_ms(input_sample_spec_, target_latency_ts_), (unsigned long)min_latency_, timestamp_to_ms(input_sample_spec_, min_latency_), (unsigned long)max_latency_, timestamp_to_ms(input_sample_spec_, max_latency_), (unsigned long)input_sample_spec_.sample_rate(), @@ -78,7 +85,7 @@ LatencyMonitor::LatencyMonitor(IFrameReader& frame_reader, timestamp_to_ms(input_sample_spec_, (packet::stream_timestamp_diff_t)update_interval_)); - if (target_latency_ < min_latency_ || target_latency_ > max_latency_ + if (target_latency_ts_ < min_latency_ || target_latency_ts_ > max_latency_ || target_latency <= 0) { roc_log(LogError, "latency monitor: invalid config:" @@ -100,7 +107,7 @@ LatencyMonitor::LatencyMonitor(IFrameReader& frame_reader, } fe_.reset(new (fe_) FreqEstimator(config.fe_profile, - (packet::stream_timestamp_t)target_latency_)); + (packet::stream_timestamp_t)target_latency_ts_)); if (!fe_) { return; } @@ -213,6 +220,10 @@ bool LatencyMonitor::update_() { return false; } + if (target_latency_auto_tune_) { + update_target_latency_(); + } + // currently scaling is always updated based on niq latency if (has_niq_latency_) { if (!check_bounds_(niq_latency_)) { @@ -248,8 +259,8 @@ bool LatencyMonitor::check_bounds_(const packet::stream_timestamp_diff_t latency " latency=%ld(%.3fms) target=%ld(%.3fms)" " min=%ld(%.3fms) max=%ld(%.3fms) q_size=%lu", (long)latency, timestamp_to_ms(input_sample_spec_, latency), - (long)target_latency_, - timestamp_to_ms(input_sample_spec_, target_latency_), (long)min_latency_, + (long)target_latency_ts_, + timestamp_to_ms(input_sample_spec_, target_latency_ts_), (long)min_latency_, timestamp_to_ms(input_sample_spec_, min_latency_), (long)max_latency_, timestamp_to_ms(input_sample_spec_, max_latency_), (unsigned long)incoming_queue_.size()); @@ -329,9 +340,92 @@ void LatencyMonitor::report_() { " fe=%.6f trim_fe=%.6f", (long)e2e_latency_, timestamp_to_ms(input_sample_spec_, e2e_latency_), (long)niq_latency_, timestamp_to_ms(input_sample_spec_, niq_latency_), - (long)target_latency_, timestamp_to_ms(input_sample_spec_, target_latency_), + (long)target_latency_ts_, timestamp_to_ms(input_sample_spec_, target_latency_ts_), (double)(fe_ ? fe_->freq_coeff() : 0), (double)freq_coeff_); } +bool LatencyMonitor::update_target_latency_() { + core::nanoseconds_t fec_block_duration = 0; + if (fec_reader_) { + fec_block_duration = fec_reader_->max_block_duration(); + } + + const rtp::LinkMetrics metrics = link_meter_.metrics(); + + const core::nanoseconds_t tl = std::max(std::max(metrics.max_jitter, metrics.mean_jitter * 3), fec_block_duration); + const core::nanoseconds_t now = core::timestamp(core::ClockMonotonic); + core::nanoseconds_t cur_tl_ns = input_sample_spec_.stream_timestamp_delta_2_ns(target_latency_ts_); + + if (target_latency_state_ == TL_NONE) { + // If there is no active timeout, check if evaluated target latency is significantly smaller, + // than the latency in action so that we could decrease it. + if (tl < cur_tl_ns && + (cur_tl_ns - tl) / (double )(cur_tl_ns) > 0.1 && + fe_->stable()) { + const core::nanoseconds_t new_tl_ns = (core::nanoseconds_t)(cur_tl_ns * 0.9); + const packet::stream_timestamp_diff_t new_tl_ts = + input_sample_spec_.ns_2_stream_timestamp_delta(cur_tl_ns); + if (new_tl_ts < min_latency_) { + roc_log(LogDebug, "Latency monitor:" + " not decreasing target latency lower than limit %ld(%.3fms)", + (long)min_latency_, timestamp_to_ms(input_sample_spec_, min_latency_)); + return false; + } + + roc_log(LogInfo, + "Latency monitor:" + " decreasing target latency %ld(%.3fms)→%ld(%.3fms)", + (long)target_latency_ts_, (double)cur_tl_ns / core::Millisecond, + (long)new_tl_ts, (double)new_tl_ns / core::Millisecond); + + cur_tl_ns = new_tl_ns; + target_latency_ts_ = new_tl_ts; + last_target_latency_update_ = now; + target_latency_state_ = TL_DEC_TIMEOUT; + fe_->update_target_latency(target_latency_ts_); + + return true; + // If evaluated target latency is greater, than we must increase it. + } else if (tl > target_latency_ts_ + && (tl - cur_tl_ns) / (float)(cur_tl_ns) > 0.01) { + const core::nanoseconds_t new_tl_ns = (core::nanoseconds_t)(tl * 1.1); + const packet::stream_timestamp_diff_t new_tl_ts = + input_sample_spec_.ns_2_stream_timestamp_delta(cur_tl_ns); + + if (new_tl_ts > max_latency_) { + roc_log(LogDebug, "Latency monitor:" + " not increasing target latency more than limit %ld(%.3fms)", + (long)max_latency_, timestamp_to_ms(input_sample_spec_, max_latency_)); + return false; + } + + roc_log(LogInfo, + "Latency monitor:" + " increasing target latency %ld(%.3fms)→%ld(%.3fms)", + (long)target_latency_ts_, (double)cur_tl_ns / core::Millisecond, + (long)new_tl_ts, (double)new_tl_ns / core::Millisecond); + + + cur_tl_ns = (core::nanoseconds_t)(tl * 1.1); + target_latency_ts_ = input_sample_spec_.ns_2_stream_timestamp_delta(cur_tl_ns); + last_target_latency_update_ = now; + target_latency_state_ = TL_INC_TIMEOUT; + fe_->update_target_latency(target_latency_ts_); + return true; + } + } else if (target_latency_state_ == TL_DEC_TIMEOUT && + now - last_target_latency_update_ > 15 * core::Second) { + target_latency_state_ = TL_NONE; + } else if (target_latency_state_ == TL_START && + now - last_target_latency_update_ > 5 * core::Second) { + target_latency_state_ = TL_NONE; + } else if (target_latency_state_ == TL_INC_TIMEOUT && + now - last_target_latency_update_ > 5 * core::Second) { + target_latency_state_ = TL_NONE; + } + + return false; +} + } // namespace audio } // namespace roc diff --git a/src/internal_modules/roc_audio/latency_monitor.h b/src/internal_modules/roc_audio/latency_monitor.h index b600c5b88..4e2180a19 100644 --- a/src/internal_modules/roc_audio/latency_monitor.h +++ b/src/internal_modules/roc_audio/latency_monitor.h @@ -21,6 +21,8 @@ #include "roc_core/noncopyable.h" #include "roc_core/optional.h" #include "roc_core/time.h" +#include "roc_fec/reader.h" +#include "roc_rtp/link_meter.h" #include "roc_packet/sorted_queue.h" #include "roc_packet/units.h" @@ -48,12 +50,22 @@ struct LatencyMonitorConfig { //! For example, 0.01 allows freq_coeff values in range [0.99; 1.01]. float scaling_tolerance; + //! Automatically tune target latency within tolarance range so as to + //! + //! increase it when: + //! * jitter grows + //! * FEC start being sent or its block length grows + //! + //! or decrease it when jitter and FEC block length allows to. + bool auto_tune_target_latency; + LatencyMonitorConfig() : fe_enable(true) , fe_profile(FreqEstimatorProfile_Responsive) , fe_update_interval(5 * core::Millisecond) , latency_tolerance(0) - , scaling_tolerance(0.005f) { + , scaling_tolerance(0.005f) + , auto_tune_target_latency(false) { } //! Automatically deduce FreqEstimator profile from target latency. @@ -144,6 +156,7 @@ class LatencyMonitor : public IFrameReader, public core::NonCopyable<> { //! @b Parameters //! - @p frame_reader is inner frame reader for E2E latency calculation //! - @p incoming_queue and @p depacketizer are used to NIQ latency calculation + //! - @p fec_reader and @p link_meter_ are used to calculate target latency //! - @p resampler is used to set the scaling factor to compensate clock //! drift according to calculated latency //! - @p config defines calculation parameters @@ -154,6 +167,8 @@ class LatencyMonitor : public IFrameReader, public core::NonCopyable<> { LatencyMonitor(IFrameReader& frame_reader, const packet::SortedQueue& incoming_queue, const Depacketizer& depacketizer, + const fec::Reader* fec_reader, + const rtp::LinkMeter& link_meter, ResamplerReader* resampler, const LatencyMonitorConfig& config, core::nanoseconds_t target_latency, @@ -190,6 +205,8 @@ class LatencyMonitor : public IFrameReader, public core::NonCopyable<> { bool check_bounds_(packet::stream_timestamp_diff_t latency) const; + bool update_target_latency_(); + bool init_scaling_(size_t input_sample_rate, size_t output_sample_rate); bool update_scaling_(packet::stream_timestamp_diff_t latency); @@ -199,6 +216,8 @@ class LatencyMonitor : public IFrameReader, public core::NonCopyable<> { const packet::SortedQueue& incoming_queue_; const Depacketizer& depacketizer_; + const fec::Reader* fec_reader_; + const rtp::LinkMeter& link_meter_; ResamplerReader* resampler_; core::Optional fe_; @@ -219,9 +238,6 @@ class LatencyMonitor : public IFrameReader, public core::NonCopyable<> { bool has_niq_latency_; bool has_e2e_latency_; - const packet::stream_timestamp_diff_t target_latency_; - const packet::stream_timestamp_diff_t min_latency_; - const packet::stream_timestamp_diff_t max_latency_; const float max_scaling_delta_; @@ -230,6 +246,18 @@ class LatencyMonitor : public IFrameReader, public core::NonCopyable<> { bool alive_; bool valid_; + + const bool target_latency_auto_tune_; + enum TargetLatencyState { + TL_NONE, + TL_START, + TL_INC_TIMEOUT, + TL_DEC_TIMEOUT + } target_latency_state_; + packet::stream_timestamp_diff_t target_latency_ts_; + const packet::stream_timestamp_diff_t max_latency_; + const packet::stream_timestamp_diff_t min_latency_; + core::nanoseconds_t last_target_latency_update_; }; } // namespace audio diff --git a/src/internal_modules/roc_pipeline/receiver_session.cpp b/src/internal_modules/roc_pipeline/receiver_session.cpp index 9c1794bb7..ffc34b4b8 100644 --- a/src/internal_modules/roc_pipeline/receiver_session.cpp +++ b/src/internal_modules/roc_pipeline/receiver_session.cpp @@ -212,9 +212,10 @@ ReceiverSession::ReceiverSession( areader = session_poisoner_.get(); latency_monitor_.reset(new (latency_monitor_) audio::LatencyMonitor( - *areader, *source_queue_, *depacketizer_, resampler_reader_.get(), - session_config.latency_monitor, session_config.target_latency, - encoding->sample_spec, common_config.output_sample_spec)); + *areader, *source_queue_, *depacketizer_, fec_reader_.get(), *source_meter_, + resampler_reader_.get(), session_config.latency_monitor, + session_config.target_latency, encoding->sample_spec, + common_config.output_sample_spec)); if (!latency_monitor_ || !latency_monitor_->is_valid()) { return; } diff --git a/src/internal_modules/roc_rtp/link_meter.cpp b/src/internal_modules/roc_rtp/link_meter.cpp index 0e38f94f6..37f7e2244 100644 --- a/src/internal_modules/roc_rtp/link_meter.cpp +++ b/src/internal_modules/roc_rtp/link_meter.cpp @@ -106,6 +106,7 @@ void LinkMeter::update_jitter_(const packet::Packet& packet) { packet_jitter_stats_.add(std::abs(d_enq_ts - d_capt_ts)); metrics_.max_jitter = packet_jitter_stats_.mov_max(); metrics_.min_jitter = packet_jitter_stats_.mov_min(); + metrics_.mean_jitter = mean_jitter(); jitter_processed_++; metrics_.jitter = sample_spec_.ns_2_samples_per_chan(mean_jitter()); } diff --git a/src/internal_modules/roc_rtp/link_meter.h b/src/internal_modules/roc_rtp/link_meter.h index bd6013e8f..0126fb110 100644 --- a/src/internal_modules/roc_rtp/link_meter.h +++ b/src/internal_modules/roc_rtp/link_meter.h @@ -56,6 +56,9 @@ struct LinkMetrics { //! received and lost packets. size_t num_packets_covered; + //! Running mean of Jitter. + core::nanoseconds_t mean_jitter; + //! Running max of Jitter. core::nanoseconds_t max_jitter; @@ -67,7 +70,9 @@ struct LinkMetrics { , fract_loss(0) , cum_loss(0) , jitter(0) - , num_packets_covered(0) { + , num_packets_covered(0) + , max_jitter(0) + , min_jitter(0) { } };