Skip to content

Commit

Permalink
Latency Monitor controlls target latency
Browse files Browse the repository at this point in the history
  • Loading branch information
baranovmv committed Feb 11, 2024
1 parent 719ad18 commit 66b4cbb
Show file tree
Hide file tree
Showing 7 changed files with 188 additions and 23 deletions.
37 changes: 36 additions & 1 deletion src/internal_modules/roc_audio/freq_estimator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include "roc_audio/freq_estimator.h"
#include "roc_core/log.h"
#include "roc_core/panic.h"
#include "roc_core/time.h"

namespace roc {
namespace audio {
Expand All @@ -25,13 +26,15 @@ FreqEstimatorConfig make_config(FreqEstimatorProfile profile) {
config.I = 1e-10;
config.decimation_factor1 = fe_decim_factor_max;
config.decimation_factor2 = 0;
config.stable_criteria = 0.1;
break;

case FreqEstimatorProfile_Gradual:
config.P = 1e-6;
config.I = 5e-9;
config.decimation_factor1 = fe_decim_factor_max;
config.decimation_factor2 = fe_decim_factor_max;
config.stable_criteria = 0.05;
break;
}

Expand Down Expand Up @@ -69,7 +72,9 @@ FreqEstimator::FreqEstimator(FreqEstimatorProfile profile,
, dec2_ind_(0)
, samples_counter_(0)
, accum_(0)
, coeff_(1) {
, coeff_(1)
, stable_(false)
, last_unstable_time_(core::timestamp(core::ClockMonotonic)) {
roc_log(LogDebug, "freq estimator: initializing: P=%e I=%e dc1=%lu dc2=%lu",
config_.P, config_.I, (unsigned long)config_.decimation_factor1,
(unsigned long)config_.decimation_factor2);
Expand Down Expand Up @@ -149,10 +154,40 @@ bool FreqEstimator::run_decimators_(packet::stream_timestamp_t current,
double FreqEstimator::run_controller_(double current) {
const double error = (current - target_);

roc_log(LogTrace,
"Freq Estimator:"
" current latency error: %.0f",
error);

if (abs(error) > target_ * config_.stable_criteria && stable_) {
stable_ = false;
last_unstable_time_ = core::timestamp(core::ClockMonotonic);
roc_log(LogDebug,
"Freq Estimator: "
" unstable, %0.f > %.0f / %0.f",
config_.stable_criteria, error, target_);
} else if (abs(error) < target_ * config_.stable_criteria && !stable_ &&
core::timestamp(core::ClockMonotonic) - last_unstable_time_
> 15 * core::Second) {
stable_ = true;
roc_log(LogDebug,
"Freq Estimator: "
" stabilized");
}

accum_ = accum_ + error;
return 1 + config_.P * error + config_.I * accum_;
}

void FreqEstimator::update_target_latency(packet::stream_timestamp_t target_latency) {
target_ = (double)target_latency;
}

bool FreqEstimator::stable() const {

return stable_;
}

const char* fe_profile_to_str(FreqEstimatorProfile profile) {
switch (profile) {
case FreqEstimatorProfile_Responsive:
Expand Down
19 changes: 17 additions & 2 deletions src/internal_modules/roc_audio/freq_estimator.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,16 @@ struct FreqEstimatorConfig {
//! to fe_decim_factor_max. Could be zero to disable the second decimation stage.
size_t decimation_factor2;

//! Within this range we consider the FreqEstimator is stable.
//! stable_criteria > error / target;
double stable_criteria;

FreqEstimatorConfig()
: P(0)
, I(0)
, decimation_factor1(0)
, decimation_factor2(0) {
, decimation_factor2(0)
, stable_criteria(0.1) {
}
};

Expand All @@ -75,12 +80,18 @@ class FreqEstimator : public core::NonCopyable<> {
//! Compute new value of frequency coefficient.
void update(packet::stream_timestamp_t current_latency);

//! Update target latency.
void update_target_latency(packet::stream_timestamp_t target_latency);

//! If FreqEstimator has stabilized.
bool stable() const;

private:
bool run_decimators_(packet::stream_timestamp_t current, double& filtered);
double run_controller_(double current);

const FreqEstimatorConfig config_;
const double target_; // Target latency.
double target_; // Target latency.

double dec1_casc_buff_[fe_decim_len];
size_t dec1_ind_;
Expand All @@ -92,6 +103,10 @@ class FreqEstimator : public core::NonCopyable<> {
double accum_; // Integrator value.

double coeff_; // Current frequency coefficient value.

bool stable_; // True if FreqEstimator has stabilized.
// Last time when FreqEstimator was out of range.
core::nanoseconds_t last_unstable_time_;
};

//! Get string name of FreqEstimator profile.
Expand Down
117 changes: 104 additions & 13 deletions src/internal_modules/roc_audio/latency_monitor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ double timestamp_to_ms(const SampleSpec& sample_spec,
LatencyMonitor::LatencyMonitor(IFrameReader& frame_reader,
const packet::SortedQueue& incoming_queue,
const Depacketizer& depacketizer,
const fec::Reader* fec_reader,
const rtp::LinkMeter& link_meter,
ResamplerReader* resampler,
const LatencyMonitorConfig& config,
core::nanoseconds_t target_latency,
Expand All @@ -37,6 +39,8 @@ LatencyMonitor::LatencyMonitor(IFrameReader& frame_reader,
: frame_reader_(frame_reader)
, incoming_queue_(incoming_queue)
, depacketizer_(depacketizer)
, fec_reader_(fec_reader)
, link_meter_(link_meter)
, resampler_(resampler)
, stream_pos_(0)
, stream_cts_(0)
Expand All @@ -52,24 +56,26 @@ LatencyMonitor::LatencyMonitor(IFrameReader& frame_reader,
, e2e_latency_(0)
, has_niq_latency_(false)
, has_e2e_latency_(false)
, target_latency_(input_sample_spec.ns_2_stream_timestamp_delta(target_latency))
, min_latency_(input_sample_spec.ns_2_stream_timestamp_delta(
target_latency - config.latency_tolerance))
, max_latency_(input_sample_spec.ns_2_stream_timestamp_delta(
target_latency + config.latency_tolerance))
, max_scaling_delta_(config.scaling_tolerance)
, input_sample_spec_(input_sample_spec)
, output_sample_spec_(output_sample_spec)
, alive_(true)
, valid_(false) {
, valid_(false)
, target_latency_state_(TL_START)
, target_latency_ts_(input_sample_spec.ns_2_stream_timestamp_delta(target_latency))
, max_latency_(input_sample_spec.ns_2_stream_timestamp_delta(
target_latency + config.latency_tolerance))
, min_latency_(input_sample_spec.ns_2_stream_timestamp_delta(
target_latency - config.latency_tolerance))
, last_target_latency_update_(core::timestamp(core::ClockMonotonic)) {
roc_log(
LogDebug,
"latency monitor: initializing:"
" target=%lu(%.3fms) min=%lu(%.3fms) max=%lu(%.3fms)"
" in_rate=%lu out_rate=%lu"
" fe_enable=%d fe_profile=%s fe_interval=%.3fms",
(unsigned long)target_latency_,
timestamp_to_ms(input_sample_spec_, target_latency_), (unsigned long)min_latency_,
(unsigned long)target_latency_ts_,
timestamp_to_ms(input_sample_spec_, target_latency_ts_), (unsigned long)min_latency_,
timestamp_to_ms(input_sample_spec_, min_latency_), (unsigned long)max_latency_,
timestamp_to_ms(input_sample_spec_, max_latency_),
(unsigned long)input_sample_spec_.sample_rate(),
Expand All @@ -78,7 +84,7 @@ LatencyMonitor::LatencyMonitor(IFrameReader& frame_reader,
timestamp_to_ms(input_sample_spec_,
(packet::stream_timestamp_diff_t)update_interval_));

if (target_latency_ < min_latency_ || target_latency_ > max_latency_
if (target_latency_ts_ < min_latency_ || target_latency_ts_ > max_latency_
|| target_latency <= 0) {
roc_log(LogError,
"latency monitor: invalid config:"
Expand All @@ -100,7 +106,7 @@ LatencyMonitor::LatencyMonitor(IFrameReader& frame_reader,
}

fe_.reset(new (fe_) FreqEstimator(config.fe_profile,
(packet::stream_timestamp_t)target_latency_));
(packet::stream_timestamp_t)target_latency_ts_));
if (!fe_) {
return;
}
Expand Down Expand Up @@ -213,6 +219,8 @@ bool LatencyMonitor::update_() {
return false;
}

update_target_latency_();

// currently scaling is always updated based on niq latency
if (has_niq_latency_) {
if (!check_bounds_(niq_latency_)) {
Expand Down Expand Up @@ -248,8 +256,8 @@ bool LatencyMonitor::check_bounds_(const packet::stream_timestamp_diff_t latency
" latency=%ld(%.3fms) target=%ld(%.3fms)"
" min=%ld(%.3fms) max=%ld(%.3fms) q_size=%lu",
(long)latency, timestamp_to_ms(input_sample_spec_, latency),
(long)target_latency_,
timestamp_to_ms(input_sample_spec_, target_latency_), (long)min_latency_,
(long)target_latency_ts_,
timestamp_to_ms(input_sample_spec_, target_latency_ts_), (long)min_latency_,
timestamp_to_ms(input_sample_spec_, min_latency_), (long)max_latency_,
timestamp_to_ms(input_sample_spec_, max_latency_),
(unsigned long)incoming_queue_.size());
Expand Down Expand Up @@ -329,9 +337,92 @@ void LatencyMonitor::report_() {
" fe=%.6f trim_fe=%.6f",
(long)e2e_latency_, timestamp_to_ms(input_sample_spec_, e2e_latency_),
(long)niq_latency_, timestamp_to_ms(input_sample_spec_, niq_latency_),
(long)target_latency_, timestamp_to_ms(input_sample_spec_, target_latency_),
(long)target_latency_ts_, timestamp_to_ms(input_sample_spec_, target_latency_ts_),
(double)(fe_ ? fe_->freq_coeff() : 0), (double)freq_coeff_);
}

bool LatencyMonitor::update_target_latency_() {
core::nanoseconds_t fec_block_duration = 0;
if (fec_reader_) {
fec_block_duration = fec_reader_->max_block_duration();
}

const rtp::LinkMetrics metrics = link_meter_.metrics();

const core::nanoseconds_t tl = std::max(std::max(metrics.max_jitter, metrics.mean_jitter * 3), fec_block_duration);
const core::nanoseconds_t now = core::timestamp(core::ClockMonotonic);
core::nanoseconds_t cur_tl_ns = input_sample_spec_.stream_timestamp_delta_2_ns(target_latency_ts_);

if (target_latency_state_ == TL_NONE) {
// If there is no active timeout, check if evaluated target latency is significantly smaller,
// than the latency in action so that we could decrease it.
if (tl < cur_tl_ns &&
(cur_tl_ns - tl) / (double )(cur_tl_ns) > 0.1 &&
fe_->stable()) {
const core::nanoseconds_t new_tl_ns = (core::nanoseconds_t)(cur_tl_ns * 0.9);
const packet::stream_timestamp_diff_t new_tl_ts =
input_sample_spec_.ns_2_stream_timestamp_delta(cur_tl_ns);
if (new_tl_ts < min_latency_) {
roc_log(LogDebug, "Latency monitor:"
" not decreasing target latency lower than limit %ld(%.3fms)",
(long)min_latency_, timestamp_to_ms(input_sample_spec_, min_latency_));
return false;
}

roc_log(LogInfo,
"Latency monitor:"
" decreasing target latency %ld(%.3fms)→%ld(%.3fms)",
(long)target_latency_ts_, (double)cur_tl_ns / core::Millisecond,
(long)new_tl_ts, (double)new_tl_ns / core::Millisecond);

cur_tl_ns = new_tl_ns;
target_latency_ts_ = new_tl_ts;
last_target_latency_update_ = now;
target_latency_state_ = TL_DEC_TIMEOUT;
fe_->update_target_latency(target_latency_ts_);

return true;
// If evaluated target latency is greater, than we must increase it.
} else if (tl > target_latency_ts_
&& (tl - cur_tl_ns) / (float)(cur_tl_ns) > 0.01) {
const core::nanoseconds_t new_tl_ns = (core::nanoseconds_t)(tl * 1.1);
const packet::stream_timestamp_diff_t new_tl_ts =
input_sample_spec_.ns_2_stream_timestamp_delta(cur_tl_ns);

if (new_tl_ts > max_latency_) {
roc_log(LogDebug, "Latency monitor:"
" not increasing target latency more than limit %ld(%.3fms)",
(long)max_latency_, timestamp_to_ms(input_sample_spec_, max_latency_));
return false;
}

roc_log(LogInfo,
"Latency monitor:"
" increasing target latency %ld(%.3fms)→%ld(%.3fms)",
(long)target_latency_ts_, (double)cur_tl_ns / core::Millisecond,
(long)new_tl_ts, (double)new_tl_ns / core::Millisecond);


cur_tl_ns = (core::nanoseconds_t)(tl * 1.1);
target_latency_ts_ = input_sample_spec_.ns_2_stream_timestamp_delta(cur_tl_ns);
last_target_latency_update_ = now;
target_latency_state_ = TL_INC_TIMEOUT;
fe_->update_target_latency(target_latency_ts_);
return true;
}
} else if (target_latency_state_ == TL_DEC_TIMEOUT &&
now - last_target_latency_update_ > 15 * core::Second) {
target_latency_state_ = TL_NONE;
} else if (target_latency_state_ == TL_START &&
now - last_target_latency_update_ > 5 * core::Second) {
target_latency_state_ = TL_NONE;
} else if (target_latency_state_ == TL_INC_TIMEOUT &&
now - last_target_latency_update_ > 5 * core::Second) {
target_latency_state_ = TL_NONE;
}

return false;
}

} // namespace audio
} // namespace roc
23 changes: 20 additions & 3 deletions src/internal_modules/roc_audio/latency_monitor.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
#include "roc_core/noncopyable.h"
#include "roc_core/optional.h"
#include "roc_core/time.h"
#include "roc_fec/reader.h"
#include "roc_rtp/link_meter.h"
#include "roc_packet/sorted_queue.h"
#include "roc_packet/units.h"

Expand Down Expand Up @@ -144,6 +146,7 @@ class LatencyMonitor : public IFrameReader, public core::NonCopyable<> {
//! @b Parameters
//! - @p frame_reader is inner frame reader for E2E latency calculation
//! - @p incoming_queue and @p depacketizer are used to NIQ latency calculation
//! - @p fec_reader and @p link_meter_ are used to calculate target latency
//! - @p resampler is used to set the scaling factor to compensate clock
//! drift according to calculated latency
//! - @p config defines calculation parameters
Expand All @@ -154,6 +157,8 @@ class LatencyMonitor : public IFrameReader, public core::NonCopyable<> {
LatencyMonitor(IFrameReader& frame_reader,
const packet::SortedQueue& incoming_queue,
const Depacketizer& depacketizer,
const fec::Reader* fec_reader,
const rtp::LinkMeter& link_meter,
ResamplerReader* resampler,
const LatencyMonitorConfig& config,
core::nanoseconds_t target_latency,
Expand Down Expand Up @@ -190,6 +195,8 @@ class LatencyMonitor : public IFrameReader, public core::NonCopyable<> {

bool check_bounds_(packet::stream_timestamp_diff_t latency) const;

bool update_target_latency_();

bool init_scaling_(size_t input_sample_rate, size_t output_sample_rate);
bool update_scaling_(packet::stream_timestamp_diff_t latency);

Expand All @@ -199,6 +206,8 @@ class LatencyMonitor : public IFrameReader, public core::NonCopyable<> {

const packet::SortedQueue& incoming_queue_;
const Depacketizer& depacketizer_;
const fec::Reader* fec_reader_;
const rtp::LinkMeter& link_meter_;

ResamplerReader* resampler_;
core::Optional<FreqEstimator> fe_;
Expand All @@ -219,9 +228,6 @@ class LatencyMonitor : public IFrameReader, public core::NonCopyable<> {
bool has_niq_latency_;
bool has_e2e_latency_;

const packet::stream_timestamp_diff_t target_latency_;
const packet::stream_timestamp_diff_t min_latency_;
const packet::stream_timestamp_diff_t max_latency_;

const float max_scaling_delta_;

Expand All @@ -230,6 +236,17 @@ class LatencyMonitor : public IFrameReader, public core::NonCopyable<> {

bool alive_;
bool valid_;

enum TargetLatencyState {
TL_NONE,
TL_START,
TL_INC_TIMEOUT,
TL_DEC_TIMEOUT
} target_latency_state_;
packet::stream_timestamp_diff_t target_latency_ts_;
const packet::stream_timestamp_diff_t max_latency_;
const packet::stream_timestamp_diff_t min_latency_;
core::nanoseconds_t last_target_latency_update_;
};

} // namespace audio
Expand Down
7 changes: 4 additions & 3 deletions src/internal_modules/roc_pipeline/receiver_session.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -212,9 +212,10 @@ ReceiverSession::ReceiverSession(
areader = session_poisoner_.get();

latency_monitor_.reset(new (latency_monitor_) audio::LatencyMonitor(
*areader, *source_queue_, *depacketizer_, resampler_reader_.get(),
session_config.latency_monitor, session_config.target_latency,
encoding->sample_spec, common_config.output_sample_spec));
*areader, *source_queue_, *depacketizer_, fec_reader_.get(), *source_meter_,
resampler_reader_.get(), session_config.latency_monitor,
session_config.target_latency, encoding->sample_spec,
common_config.output_sample_spec));
if (!latency_monitor_ || !latency_monitor_->is_valid()) {
return;
}
Expand Down
Loading

0 comments on commit 66b4cbb

Please sign in to comment.