From 17235e190b7e645fd1fe5c87a467c9a44c3e581c Mon Sep 17 00:00:00 2001 From: Mikhail Baranov Date: Sun, 28 Apr 2024 19:18:46 +0200 Subject: [PATCH] gh-688: Dynamic latency adjustment #688 --- docs/sphinx/manuals/roc_recv.rst | 5 +- .../roc_audio/feedback_monitor.cpp | 9 +- .../roc_audio/feedback_monitor.h | 3 +- .../roc_audio/freq_estimator.cpp | 92 +++- .../roc_audio/freq_estimator.h | 42 +- .../roc_audio/latency_monitor.cpp | 7 +- .../roc_audio/latency_monitor.h | 4 +- .../roc_audio/latency_tuner.cpp | 458 +++++++++++++++--- .../roc_audio/latency_tuner.h | 121 ++++- src/internal_modules/roc_core/csv_dumper.cpp | 8 +- src/internal_modules/roc_core/csv_dumper.h | 9 +- src/internal_modules/roc_packet/ilink_meter.h | 31 +- src/internal_modules/roc_pipeline/config.cpp | 3 +- src/internal_modules/roc_pipeline/config.h | 9 +- .../roc_pipeline/receiver_session.cpp | 31 +- .../roc_pipeline/receiver_session.h | 6 +- .../roc_pipeline/receiver_session_group.cpp | 7 +- .../roc_pipeline/receiver_session_group.h | 6 +- .../roc_pipeline/receiver_slot.cpp | 7 +- .../roc_pipeline/receiver_slot.h | 4 +- .../roc_pipeline/receiver_source.cpp | 21 +- .../roc_pipeline/receiver_source.h | 5 + .../roc_pipeline/sender_session.cpp | 8 +- .../roc_pipeline/sender_session.h | 5 +- .../roc_pipeline/sender_sink.cpp | 14 +- .../roc_pipeline/sender_sink.h | 3 + .../roc_pipeline/sender_slot.cpp | 12 +- .../roc_pipeline/sender_slot.h | 3 +- .../roc_rtcp/loss_estimator.h | 2 +- src/internal_modules/roc_rtp/link_meter.cpp | 103 +++- src/internal_modules/roc_rtp/link_meter.h | 39 +- src/public_api/include/roc/config.h | 91 ++++ src/public_api/include/roc/metrics.h | 19 + src/public_api/src/adapters.cpp | 48 ++ .../test_loopback_encoder_2_decoder.cpp | 28 +- .../test_loopback_sender_2_receiver.cpp | 31 +- src/tests/roc_audio/test_freq_estimator.cpp | 16 +- .../test_loopback_sink_2_source.cpp | 44 +- .../roc_pipeline/test_receiver_endpoint.cpp | 6 +- src/tests/roc_pipeline/test_receiver_loop.cpp | 1 + .../roc_pipeline/test_receiver_source.cpp | 99 +++- .../roc_pipeline/test_sender_endpoint.cpp | 6 +- src/tests/roc_pipeline/test_sender_sink.cpp | 6 +- .../roc_pipeline/test_session_router.cpp | 4 +- src/tests/roc_rtp/test_link_meter.cpp | 319 +++++++++++- src/tools/roc_recv/cmdline.ggo | 16 +- src/tools/roc_recv/main.cpp | 81 +++- src/tools/roc_send/cmdline.ggo | 7 +- src/tools/roc_send/main.cpp | 4 + 49 files changed, 1666 insertions(+), 237 deletions(-) diff --git a/docs/sphinx/manuals/roc_recv.rst b/docs/sphinx/manuals/roc_recv.rst index 66a3454ea..bce74b698 100644 --- a/docs/sphinx/manuals/roc_recv.rst +++ b/docs/sphinx/manuals/roc_recv.rst @@ -28,8 +28,11 @@ Options --miface=MIFACE IPv4 or IPv6 address of the network interface on which to join the multicast group --reuseaddr enable SO_REUSEADDR when binding sockets --target-latency=STRING Target latency, TIME units ---io-latency=STRING Playback target latency, TIME units --latency-tolerance=STRING Maximum deviation from target latency, TIME units +--start-latency=STRING Target latency, TIME units +--min-latency=STRING Minimum allowed latency, TIME units +--max-latency=STRING Maximum allowed latency, TIME units +--io-latency=STRING Playback target latency, TIME units --no-play-timeout=STRING No playback timeout, TIME units --choppy-play-timeout=STRING Choppy playback timeout, TIME units --frame-len=TIME Duration of the internal frames, TIME units diff --git a/src/internal_modules/roc_audio/feedback_monitor.cpp b/src/internal_modules/roc_audio/feedback_monitor.cpp index 2d3556fe6..acfa810bd 100644 --- a/src/internal_modules/roc_audio/feedback_monitor.cpp +++ b/src/internal_modules/roc_audio/feedback_monitor.cpp @@ -20,8 +20,9 @@ FeedbackMonitor::FeedbackMonitor(IFrameWriter& writer, ResamplerWriter* resampler, const FeedbackConfig& feedback_config, const LatencyConfig& latency_config, - const SampleSpec& sample_spec) - : tuner_(latency_config, sample_spec) + const SampleSpec& sample_spec, + core::CsvDumper* dumper) + : tuner_(latency_config, sample_spec, dumper) , use_packetizer_(false) , has_feedback_(false) , last_feedback_ts_(0) @@ -104,10 +105,10 @@ void FeedbackMonitor::process_feedback(packet::stream_source_t source_id, latency_metrics_ = latency_metrics; link_metrics_ = link_metrics; - if (link_metrics_.total_packets == 0 || use_packetizer_) { + if (link_metrics_.expected_packets == 0 || use_packetizer_) { // If packet counter is not reported from receiver, fallback to // counter from sender. - link_metrics_.total_packets = packetizer_.metrics().encoded_packet_count; + link_metrics_.expected_packets = packetizer_.metrics().encoded_packet_count; use_packetizer_ = true; } diff --git a/src/internal_modules/roc_audio/feedback_monitor.h b/src/internal_modules/roc_audio/feedback_monitor.h index 902fc4aaf..2c7c6bd95 100644 --- a/src/internal_modules/roc_audio/feedback_monitor.h +++ b/src/internal_modules/roc_audio/feedback_monitor.h @@ -68,7 +68,8 @@ class FeedbackMonitor : public IFrameWriter, public core::NonCopyable<> { ResamplerWriter* resampler, const FeedbackConfig& feedback_config, const LatencyConfig& latency_config, - const SampleSpec& sample_spec); + const SampleSpec& sample_spec, + core::CsvDumper* dumper); //! Check if the object was successfully constructed. status::StatusCode init_status() const; diff --git a/src/internal_modules/roc_audio/freq_estimator.cpp b/src/internal_modules/roc_audio/freq_estimator.cpp index 82ebb97e3..4c1b105a0 100644 --- a/src/internal_modules/roc_audio/freq_estimator.cpp +++ b/src/internal_modules/roc_audio/freq_estimator.cpp @@ -9,6 +9,7 @@ #include "roc_audio/freq_estimator.h" #include "roc_core/log.h" #include "roc_core/panic.h" +#include "roc_core/time.h" namespace roc { namespace audio { @@ -25,6 +26,7 @@ FreqEstimatorConfig make_config(FreqEstimatorProfile profile) { config.I = 1e-10; config.decimation_factor1 = fe_decim_factor_max; config.decimation_factor2 = 0; + config.stable_criteria = 0.1; break; case FreqEstimatorProfile_Gradual: @@ -32,8 +34,11 @@ FreqEstimatorConfig make_config(FreqEstimatorProfile profile) { config.I = 5e-9; config.decimation_factor1 = fe_decim_factor_max; config.decimation_factor2 = fe_decim_factor_max; + config.stable_criteria = 0.05; break; } + config.stability_duration_criteria = 15 * core::Second; + config.control_action_saturation_cap = 1e-2; return config; } @@ -62,14 +67,18 @@ double dot_prod(const double* coeff, } // namespace FreqEstimator::FreqEstimator(FreqEstimatorProfile profile, - packet::stream_timestamp_t target_latency) + packet::stream_timestamp_t target_latency, + core::CsvDumper* dumper) : config_(make_config(profile)) , target_(target_latency) , dec1_ind_(0) , dec2_ind_(0) , samples_counter_(0) , accum_(0) - , coeff_(1) { + , coeff_(1) + , stable_(false) + , last_unstable_time_(core::timestamp(core::ClockMonotonic)) + , dumper_(dumper) { roc_log(LogDebug, "freq estimator: initializing: P=%e I=%e dc1=%lu dc2=%lu", config_.P, config_.I, (unsigned long)config_.decimation_factor1, (unsigned long)config_.decimation_factor2); @@ -101,10 +110,13 @@ float FreqEstimator::freq_coeff() const { return (float)coeff_; } -void FreqEstimator::update(packet::stream_timestamp_t current) { +void FreqEstimator::update_current_latency(packet::stream_timestamp_t current_latency) { double filtered; - if (run_decimators_(current, filtered)) { + if (run_decimators_(current_latency, filtered)) { + if (dumper_) { + dump_(filtered); + } coeff_ = run_controller_(filtered); } } @@ -149,8 +161,76 @@ bool FreqEstimator::run_decimators_(packet::stream_timestamp_t current, double FreqEstimator::run_controller_(double current) { const double error = (current - target_); - accum_ = accum_ + error; - return 1 + config_.P * error + config_.I * accum_; + roc_log(LogTrace, + "freq estimator:" + " current latency error: %.0f", + error); + + const core::nanoseconds_t now = core::timestamp(core::ClockMonotonic); + + if (std::abs(error) > target_ * config_.stable_criteria && stable_) { + stable_ = false; + accum_ = 0; + last_unstable_time_ = now; + roc_log(LogDebug, + "freq estimator: " + " unstable, %0.f > %.0f / %0.f", + config_.stable_criteria, error, target_); + } else if (std::abs(error) < target_ * config_.stable_criteria && !stable_ + && now - last_unstable_time_ > config_.stability_duration_criteria) { + stable_ = true; + roc_log(LogDebug, + "freq estimator: " + " stabilized"); + } + + double res = 0.; + // In stable state we are not using P term in order to avoid permanent variation + // of resampler control input. + if (stable_) { + accum_ = accum_ + error; + res += config_.I * accum_; + } else { + res += config_.P * error; + } + if (std::abs(res) > config_.control_action_saturation_cap) { + res = res / std::abs(res) * config_.control_action_saturation_cap; + } + res += 1.; + + return res; +} + +void FreqEstimator::dump_(double filtered) { + core::CsvEntry e; + e.type = 'f'; + e.n_fields = 5; + e.fields[0] = core::timestamp(core::ClockUnix); + e.fields[1] = filtered; + e.fields[2] = target_; + e.fields[3] = (filtered - target_) * config_.P; + e.fields[4] = accum_ * config_.I; + dumper_->write(e); +} + +void FreqEstimator::update_target_latency(packet::stream_timestamp_t target_latency) { + target_ = (double)target_latency; +} + +bool FreqEstimator::is_stable() const { + return stable_; +} + +static const char* fe_profile_to_str(FreqEstimatorProfile profile) { + switch (profile) { + case FreqEstimatorProfile_Responsive: + return "responsive"; + + case FreqEstimatorProfile_Gradual: + return "gradual"; + } + + return ""; } } // namespace audio diff --git a/src/internal_modules/roc_audio/freq_estimator.h b/src/internal_modules/roc_audio/freq_estimator.h index 68407721b..01399eaf2 100644 --- a/src/internal_modules/roc_audio/freq_estimator.h +++ b/src/internal_modules/roc_audio/freq_estimator.h @@ -14,6 +14,7 @@ #include "roc_audio/freq_estimator_decim.h" #include "roc_audio/sample.h" +#include "roc_core/csv_dumper.h" #include "roc_core/noncopyable.h" #include "roc_packet/units.h" @@ -44,11 +45,26 @@ struct FreqEstimatorConfig { //! to fe_decim_factor_max. Could be zero to disable the second decimation stage. size_t decimation_factor2; + //! Within this range we consider the FreqEstimator is stable. + //! stable_criteria > error / target; + double stable_criteria; + + //! How much time current latency readings must be within stable_criteria range + //! to let FreqEstimator switch into stable state. + core::nanoseconds_t stability_duration_criteria; + + //! FreqEstimator limits its output control action value with this value so as to + //! keep sensible pace of latency adjustment if there is a long way to go. + double control_action_saturation_cap; + FreqEstimatorConfig() : P(0) , I(0) , decimation_factor1(0) - , decimation_factor2(0) { + , decimation_factor2(0) + , stable_criteria(0.1) + , stability_duration_criteria(0) + , control_action_saturation_cap(0) { } }; @@ -67,20 +83,32 @@ class FreqEstimator : public core::NonCopyable<> { //! - @p profile defines configuration preset. //! - @p target_latency defines latency we want to archive. FreqEstimator(FreqEstimatorProfile profile, - packet::stream_timestamp_t target_latency); + packet::stream_timestamp_t target_latency, + roc::core::CsvDumper* dumper); //! Get current frequecy coefficient. float freq_coeff() const; //! Compute new value of frequency coefficient. - void update(packet::stream_timestamp_t current_latency); + void update_current_latency(packet::stream_timestamp_t current_latency); + + //! Update target latency. + void update_target_latency(packet::stream_timestamp_t target_latency); + + //! Is FreqEstimator in stable state. + //! @remarks + //! If current_latency is in kept within certain limits around target_latency + //! FreqEstimator is in 'stable' state, otherwise it is 'not-stable' state. + //! The state affects internal regulator strategy and it effectiveness. + bool is_stable() const; private: bool run_decimators_(packet::stream_timestamp_t current, double& filtered); double run_controller_(double current); + void dump_(double filtered); const FreqEstimatorConfig config_; - const double target_; // Target latency. + double target_; // Target latency. double dec1_casc_buff_[fe_decim_len]; size_t dec1_ind_; @@ -92,6 +120,12 @@ class FreqEstimator : public core::NonCopyable<> { double accum_; // Integrator value. double coeff_; // Current frequency coefficient value. + + bool stable_; // True if FreqEstimator has stabilized. + // Last time when FreqEstimator was out of range. + core::nanoseconds_t last_unstable_time_; + + core::CsvDumper* dumper_; }; } // namespace audio diff --git a/src/internal_modules/roc_audio/latency_monitor.cpp b/src/internal_modules/roc_audio/latency_monitor.cpp index 870806620..368778465 100644 --- a/src/internal_modules/roc_audio/latency_monitor.cpp +++ b/src/internal_modules/roc_audio/latency_monitor.cpp @@ -10,9 +10,7 @@ #include "roc_audio/freq_estimator.h" #include "roc_core/log.h" #include "roc_core/panic.h" -#include "roc_core/stddefs.h" #include "roc_core/time.h" -#include "roc_rtp/link_meter.h" namespace roc { namespace audio { @@ -25,8 +23,9 @@ LatencyMonitor::LatencyMonitor(IFrameReader& frame_reader, ResamplerReader* resampler, const LatencyConfig& config, const SampleSpec& packet_sample_spec, - const SampleSpec& frame_sample_spec) - : tuner_(config, frame_sample_spec) + const SampleSpec& frame_sample_spec, + core::CsvDumper* dumper) + : tuner_(config, frame_sample_spec, dumper) , frame_reader_(frame_reader) , incoming_queue_(incoming_queue) , depacketizer_(depacketizer) diff --git a/src/internal_modules/roc_audio/latency_monitor.h b/src/internal_modules/roc_audio/latency_monitor.h index 5584b75b3..517ad208e 100644 --- a/src/internal_modules/roc_audio/latency_monitor.h +++ b/src/internal_modules/roc_audio/latency_monitor.h @@ -19,6 +19,7 @@ #include "roc_audio/resampler_reader.h" #include "roc_audio/sample_spec.h" #include "roc_core/attributes.h" +#include "roc_core/csv_dumper.h" #include "roc_core/noncopyable.h" #include "roc_core/optional.h" #include "roc_core/time.h" @@ -68,7 +69,8 @@ class LatencyMonitor : public IFrameReader, public core::NonCopyable<> { ResamplerReader* resampler, const LatencyConfig& config, const SampleSpec& packet_sample_spec, - const SampleSpec& frame_sample_spec); + const SampleSpec& frame_sample_spec, + core::CsvDumper* dumper); //! Check if the object was successfully constructed. status::StatusCode init_status() const; diff --git a/src/internal_modules/roc_audio/latency_tuner.cpp b/src/internal_modules/roc_audio/latency_tuner.cpp index fccd420a6..e6f493983 100644 --- a/src/internal_modules/roc_audio/latency_tuner.cpp +++ b/src/internal_modules/roc_audio/latency_tuner.cpp @@ -18,6 +18,20 @@ namespace { const core::nanoseconds_t LogInterval = 5 * core::Second; +// Calculates latency decreasment step value such that +// if current latency equals exactly upper threshold value, +// after the decreasment it will get in the middle between threshold and estimated +// value. +float upper_coef_to_step_lat_update_(const float x) { + return ((x + 1.f) / (x * 2.f)); +} + +// Calculates latency increasment step value based on +// latency_decrease_relative_threshold_. +float lower_thrs_to_step_lat_update_(const float x) { + return (x + 1.f) / 2.f; +} + } // namespace void LatencyConfig::deduce_defaults(core::nanoseconds_t default_target_latency, @@ -27,6 +41,9 @@ void LatencyConfig::deduce_defaults(core::nanoseconds_t default_target_latency, tuner_backend = LatencyTunerBackend_Niq; } + bool auto_tune_latency = target_latency == 0; + core::nanoseconds_t latency = std::max(target_latency, start_latency); + if (tuner_profile == LatencyTunerProfile_Default) { if (is_receiver) { if (tuner_backend == LatencyTunerBackend_Niq) { @@ -37,8 +54,7 @@ void LatencyConfig::deduce_defaults(core::nanoseconds_t default_target_latency, // If latency is high, we assume the jitter may be also high. In // this case use gradual profile because it can handle high jitter // much better. - tuner_profile = - target_latency > 0 && target_latency < 30 * core::Millisecond + tuner_profile = latency > 0 && latency < 30 * core::Millisecond ? LatencyTunerProfile_Responsive : LatencyTunerProfile_Gradual; } else { @@ -53,13 +69,23 @@ void LatencyConfig::deduce_defaults(core::nanoseconds_t default_target_latency, } } + if (sliding_stat_window_length == 0) { + if (tuner_profile == audio::LatencyTunerProfile_Responsive) { + sliding_stat_window_length = 10000; + } else { + sliding_stat_window_length = 30000; + } + } + // Deduce default target latency. - if (target_latency == 0) { + if (latency == 0) { if (is_receiver) { if (tuner_profile != LatencyTunerProfile_Intact) { // Latency tuning is enabled on receiver. // Use default if target latency is not specified. - target_latency = default_target_latency; + start_latency = default_target_latency; + auto_tune_latency = true; + latency = std::max(target_latency, start_latency); } else { // Latency tuning is disabled on receiver. // Most likely, it is enabled on sender. To make tuning work on sender, @@ -79,65 +105,93 @@ void LatencyConfig::deduce_defaults(core::nanoseconds_t default_target_latency, } } } - // If latency tuning is enabled. if (tuner_profile != LatencyTunerProfile_Intact) { - // Deduce defaults for min_latency & max_latency if both are zero. - if (latency_tolerance == 0) { - if (target_latency > 0) { + if (auto_tune_latency) { + // Deduce defaults for min_latency & max_latency if both are zero. + if (min_latency == 0 && max_latency == 0) { // Out formula doesn't work well on latencies close to zero. const core::nanoseconds_t floored_target_latency = - std::max(target_latency, core::Millisecond); + std::max(latency, core::Millisecond); // On sender, apply multiplier to make default tolerance a bit higher than // on receiver. This way, if bounding is enabled on both sides, receiver // will always trigger first. const int multiplier = is_receiver ? 1 : 4; - // This formula returns target_latency * N, where N starts with larger - // number and approaches 0.5 as target_latency grows. - // By default we're very tolerant and allow rather big oscillations. - // Examples (for multiplier = 1): - // target=1ms -> tolerance=8ms (x8) - // target=10ms -> tolerance=20ms (x2) - // target=200ms -> tolerance=200ms (x1) - // target=2000ms -> tolerance=1444ms (x0.722) - latency_tolerance = core::nanoseconds_t( - floored_target_latency - * (std::log((200 * core::Millisecond) * 2 * multiplier) - / std::log(floored_target_latency * 2))); - } else { - // Can't deduce latency_tolerance without target_latency. - latency_tolerance = -1; + const core::nanoseconds_t latency_tolerance = + calc_latency_tolerance(floored_target_latency, multiplier); + + min_latency = floored_target_latency - latency_tolerance; + max_latency = floored_target_latency + latency_tolerance; + + if (latency_decrease_relative_threshold_ == 0.f) { + latency_decrease_relative_threshold_ = + (float)max_latency / (float)floored_target_latency; + } } - } + } else { + // Fixed latency + // Deduce default tolerance. + if (latency_tolerance == 0) { + // Out formula doesn't work well on latencies close to zero. + const core::nanoseconds_t floored_target_latency = + std::max(latency, core::Millisecond); - // Deduce defaults for scaling_interval & scaling_tolerance. - if (scaling_interval == 0) { - scaling_interval = 5 * core::Millisecond; - } - if (scaling_tolerance == 0) { - scaling_tolerance = 0.005f; + // On sender, apply multiplier to make default tolerance a bit higher than + // on receiver. This way, if bounding is enabled on both sides, receiver + // will always trigger first. + const int multiplier = is_receiver ? 1 : 4; + + latency_tolerance = + calc_latency_tolerance(floored_target_latency, multiplier); + } } } + // Deduce defaults for scaling_interval & scaling_tolerance. + if (scaling_interval == 0) { + scaling_interval = 5 * core::Millisecond; + } + if (scaling_tolerance == 0) { + scaling_tolerance = 0.005f; + } + // If latency bounding is enabled. - if (latency_tolerance != 0) { + if (min_latency != 0 || max_latency != 0 || latency_tolerance != 0) { // Deduce default for stale_tolerance. if (stale_tolerance == 0) { - if (target_latency > 0) { + if (latency > 0) { // Consider queue "stalling" if at least 1/4 of the missing latency // is caused by lack of new packets. - stale_tolerance = latency_tolerance / 4; + stale_tolerance = (latency - min_latency) / 4; } else { - // Can't deduce stale_tolerance without target_latency. + // Can't deduce stale_tolerance without latency. stale_tolerance = -1; } } } } -LatencyTuner::LatencyTuner(const LatencyConfig& config, const SampleSpec& sample_spec) +core::nanoseconds_t +LatencyConfig::calc_latency_tolerance(const core::nanoseconds_t latency, + const int multiplier) const { + // This formula returns target_latency * N, where N starts with larger + // number and approaches 0.5 as target_latency grows. + // By default we're very tolerant and allow rather big oscillations. + // Examples (for multiplier = 1): + // target=1ms -> tolerance=8ms (x8) + // target=10ms -> tolerance=20ms (x2) + // target=200ms -> tolerance=200ms (x1) + // target=2000ms -> tolerance=1444ms (x0.722) + return core::nanoseconds_t( + latency + * (std::log((200 * core::Millisecond) * 2 * multiplier) / std::log(latency * 2))); +} + +LatencyTuner::LatencyTuner(const LatencyConfig& config, + const SampleSpec& sample_spec, + core::CsvDumper* dumper) : stream_pos_(0) , scale_interval_(0) , scale_pos_(0) @@ -148,51 +202,97 @@ LatencyTuner::LatencyTuner(const LatencyConfig& config, const SampleSpec& sample , freq_coeff_max_delta_(config.scaling_tolerance) , backend_(config.tuner_backend) , profile_(config.tuner_profile) - , enable_tuning_(config.tuner_profile != audio::LatencyTunerProfile_Intact) - , enable_bounds_(config.tuner_profile != audio::LatencyTunerProfile_Intact - || config.latency_tolerance != 0) + , enable_tuning_(config.tuner_profile != audio::LatencyTunerProfile_Intact + && (config.start_latency != 0 || config.target_latency)) + , enable_bounds_( + (config.tuner_profile != audio::LatencyTunerProfile_Intact && !enable_tuning_) + || config.min_latency != 0 || config.max_latency != 0 + || config.latency_tolerance != 0) , has_niq_latency_(false) , niq_latency_(0) , niq_stalling_(0) , has_e2e_latency_(false) , e2e_latency_(0) - , has_jitter_(false) - , jitter_(0) + , has_metrics_(false) + , auto_tune_(config.start_latency > 0 && config.target_latency == 0) , target_latency_(0) , min_latency_(0) , max_latency_(0) , max_stalling_(0) , sample_spec_(sample_spec) + , target_latency_state_(TL_STARTING) + , starting_timeout_(config.starting_timeout) + , cooldown_dec_timeout_(config.cooldown_dec_timeout) + , cooldown_inc_timeout_(config.cooldown_inc_timeout) + , max_jitter_overhead_(config.max_jitter_overhead) + , mean_jitter_overhead_(config.mean_jitter_overhead) + , last_target_latency_update_(0) + , lat_update_upper_thrsh_(config.latency_decrease_relative_threshold_) + , lat_update_dec_step_( + upper_coef_to_step_lat_update_(config.latency_decrease_relative_threshold_)) + , lat_update_inc_step_( + lower_thrs_to_step_lat_update_(config.latency_decrease_relative_threshold_)) + , last_lat_limiter_(LogInterval) + , dumper_(dumper) , init_status_(status::NoStatus) { roc_log(LogDebug, "latency tuner: initializing:" - " target_latency=%ld(%.3fms) latency_tolerance=%ld(%.3fms)" - " stale_tolerance=%ld(%.3fms)" - " scaling_interval=%ld(%.3fms) scaling_tolerance=%f" - " backend=%s profile=%s", + " target_latency=%ld(%.3fms) start_latency=%ld(%.3fms)" + " min_latency=%ld(%.3fms) max_latency=%ld(%.3fms)" + " latency_tolerance=%ld(%.3fms)" + " latency_upper_limit_coef=%f", (long)sample_spec_.ns_2_stream_timestamp_delta(config.target_latency), (double)config.target_latency / core::Millisecond, + (long)sample_spec_.ns_2_stream_timestamp_delta(config.start_latency), + (double)config.start_latency / core::Millisecond, + (long)sample_spec_.ns_2_stream_timestamp_delta(config.min_latency), + (double)config.min_latency / core::Millisecond, + (long)sample_spec_.ns_2_stream_timestamp_delta(config.max_latency), + (double)config.max_latency / core::Millisecond, (long)sample_spec_.ns_2_stream_timestamp_delta(config.latency_tolerance), (double)config.latency_tolerance / core::Millisecond, + (double)config.latency_decrease_relative_threshold_); + + roc_log(LogDebug, + "latency tuner: initializing:" + " stale_tolerance=%ld(%.3fms)" + " scaling_interval=%ld(%.3fms) scaling_tolerance=%f" + " backend=%s profile=%s tuning=%s", (long)sample_spec_.ns_2_stream_timestamp_delta(config.stale_tolerance), (double)config.stale_tolerance / core::Millisecond, (long)sample_spec_.ns_2_stream_timestamp_delta(config.scaling_interval), (double)config.scaling_interval / core::Millisecond, (double)config.scaling_tolerance, latency_tuner_backend_to_str(backend_), - latency_tuner_profile_to_str(profile_)); + latency_tuner_profile_to_str(profile_), + enable_tuning_ ? "enabled" : "disabled"); if (config.target_latency < 0) { roc_log(LogError, "latency tuner: invalid config:" - " target_latency should be set to non-zero value"); + " target_latency should not be negative"); init_status_ = status::StatusBadConfig; return; } + if (config.start_latency < 0) { + roc_log(LogError, + "latency tuner: invalid config:" + " start_latency should not be negative"); + return; + } + + if (config.start_latency > 0 && config.target_latency > 0) { + roc_log(LogError, + "latency tuner: invalid config:" + " start_latency and target_latency must not be positive altogether"); + return; + } + if (enable_bounds_ || enable_tuning_) { - target_latency_ = sample_spec_.ns_2_stream_timestamp_delta(config.target_latency); + target_latency_ = sample_spec_.ns_2_stream_timestamp_delta( + auto_tune_ ? config.start_latency : config.target_latency); - if (config.target_latency <= 0 || target_latency_ <= 0) { + if (target_latency_ <= 0) { roc_log(LogError, "latency tuner: invalid config: target_latency is invalid:" " target_latency=%ld(%.3fms)", @@ -202,7 +302,47 @@ LatencyTuner::LatencyTuner(const LatencyConfig& config, const SampleSpec& sample return; } - if (enable_bounds_) { + if (enable_bounds_ && auto_tune_) { + min_latency_ = sample_spec_.ns_2_stream_timestamp_delta(config.min_latency); + max_latency_ = sample_spec_.ns_2_stream_timestamp_delta(config.max_latency); + max_stalling_ = + sample_spec_.ns_2_stream_timestamp_delta(config.stale_tolerance); + + if ((float)target_latency_ * lat_update_upper_thrsh_ > (float)max_latency_ + && enable_tuning_) { + roc_log( + LogError, + "latency tuner: invalid config: upper threshold coefficient is" + " out of bounds: " + " target_latency * %f = %ld(%.3fms)" + " min_latency=%ld(%.3fms) max_latency=%ld(%.3fms)", + (double)lat_update_upper_thrsh_, + (long)sample_spec_.ns_2_stream_timestamp_delta( + (packet::stream_source_t)(target_latency_ + * lat_update_upper_thrsh_)), + (double)(target_latency_ * lat_update_upper_thrsh_) + / core::Millisecond, + (long)sample_spec_.ns_2_stream_timestamp_delta(config.min_latency), + (double)config.min_latency / core::Millisecond, + (long)sample_spec_.ns_2_stream_timestamp_delta(config.max_latency), + (double)config.max_latency / core::Millisecond); + } + + if (target_latency_ < min_latency_ || target_latency_ > max_latency_) { + roc_log( + LogError, + "latency tuner: invalid config: target_latency is out of bounds:" + " target_latency=%ld(%.3fms)" + " min_latency=%ld(%.3fms) max_latency=%ld(%.3fms)", + (long)sample_spec_.ns_2_stream_timestamp_delta(target_latency_), + (double)config.target_latency / core::Millisecond, + (long)sample_spec_.ns_2_stream_timestamp_delta(config.min_latency), + (double)config.min_latency / core::Millisecond, + (long)sample_spec_.ns_2_stream_timestamp_delta(config.max_latency), + (double)config.max_latency / core::Millisecond); + return; + } + } else if (enable_bounds_ && !auto_tune_) { min_latency_ = sample_spec_.ns_2_stream_timestamp_delta( config.target_latency - config.latency_tolerance); max_latency_ = sample_spec_.ns_2_stream_timestamp_delta( @@ -248,17 +388,24 @@ LatencyTuner::LatencyTuner(const LatencyConfig& config, const SampleSpec& sample return; } - fe_.reset(new (fe_) - FreqEstimator(profile_ == LatencyTunerProfile_Responsive - ? FreqEstimatorProfile_Responsive - : FreqEstimatorProfile_Gradual, - (packet::stream_timestamp_t)target_latency_)); + if (auto_tune_ && config.latency_decrease_relative_threshold_ < 0) { + roc_log(LogError, + "latency tuner: invalid config: upper threshold coef is negative:" + " latency_decrease_relative_threshold_=%f", + (double)config.latency_decrease_relative_threshold_); + } + + fe_.reset(new (fe_) FreqEstimator(profile_ == LatencyTunerProfile_Responsive + ? FreqEstimatorProfile_Responsive + : FreqEstimatorProfile_Gradual, + (packet::stream_timestamp_t)target_latency_, + dumper_)); if (!fe_) { init_status_ = status::StatusNoMem; return; } } - } + } // enable_bounds_ || enable_tuning_ init_status_ = status::StatusOK; } @@ -286,10 +433,24 @@ void LatencyTuner::write_metrics(const LatencyMetrics& latency_metrics, has_e2e_latency_ = true; } - if (link_metrics.jitter > 0 || has_jitter_) { - jitter_ = sample_spec_.ns_2_stream_timestamp_delta(link_metrics.jitter); - has_jitter_ = true; + if (enable_tuning_) { + update_target_latency_(link_metrics.max_jitter, link_metrics.jitter, + latency_metrics.fec_block_duration); + } + + if (dumper_) { + core::CsvEntry e; + e.type = 't'; + e.n_fields = 3; + e.fields[0] = core::timestamp(core::ClockUnix); + e.fields[1] = niq_latency_; + e.fields[2] = target_latency_; + dumper_->write(e); } + + latency_metrics_ = latency_metrics; + link_metrics_ = link_metrics; + has_metrics_ = true; } bool LatencyTuner::update_stream() { @@ -332,6 +493,10 @@ bool LatencyTuner::update_stream() { void LatencyTuner::advance_stream(packet::stream_timestamp_t duration) { roc_panic_if(init_status_ != status::StatusOK); + if (last_target_latency_update_ == 0) { + last_target_latency_update_ = core::timestamp(core::ClockMonotonic); + } + stream_pos_ += duration; report_(); @@ -402,7 +567,7 @@ void LatencyTuner::compute_scaling_(packet::stream_timestamp_diff_t latency) { } while (stream_pos_ >= scale_pos_) { - fe_->update((packet::stream_timestamp_t)latency); + fe_->update_current_latency((packet::stream_timestamp_t)latency); scale_pos_ += (packet::stream_timestamp_t)scale_interval_; } @@ -422,18 +587,167 @@ void LatencyTuner::report_() { report_pos_ += (packet::stream_timestamp_t)report_interval_; } - roc_log( - LogDebug, - "latency tuner:" - " e2e_latency=%ld(%.3fms) niq_latency=%ld(%.3fms) target_latency=%ld(%.3fms)" - " jitter=%ld(%.3fms) stale=%ld(%.3fms)" - " fe=%.6f eff_fe=%.6f", - (long)e2e_latency_, sample_spec_.stream_timestamp_delta_2_ms(e2e_latency_), - (long)niq_latency_, sample_spec_.stream_timestamp_delta_2_ms(niq_latency_), - (long)target_latency_, sample_spec_.stream_timestamp_delta_2_ms(target_latency_), - (long)jitter_, sample_spec_.stream_timestamp_delta_2_ms(jitter_), - (long)niq_stalling_, sample_spec_.stream_timestamp_delta_2_ms(niq_stalling_), - (double)(fe_ && freq_coeff_ > 0 ? fe_->freq_coeff() : 0), (double)freq_coeff_); + roc_log(LogInfo, + "latency tuner:" + " e2e_latency=%ld(%.3fms) niq_latency=%ld(%.3fms) target_latency=%ld(%.3fms)" + " jitter=%.3fms stale=%ld(%.3fms)" + " fe=%.6f eff_fe=%.6f fe_stable=%s", + (long)e2e_latency_, sample_spec_.stream_timestamp_delta_2_ms(e2e_latency_), + (long)niq_latency_, sample_spec_.stream_timestamp_delta_2_ms(niq_latency_), + (long)target_latency_, + sample_spec_.stream_timestamp_delta_2_ms(target_latency_), + (double)link_metrics_.jitter / core::Millisecond, (long)niq_stalling_, + sample_spec_.stream_timestamp_delta_2_ms(niq_stalling_), + (double)(fe_ && freq_coeff_ > 0 ? fe_->freq_coeff() : 0), (double)freq_coeff_, + fe_ && fe_->is_stable() ? "true" : "false"); + + if (has_metrics_) { + roc_log(LogDebug, + "latency tuner:" + " cum_loss=%ld jitter=%.1fms" + " running_jitter(Max/Min)=%.1f/%.1fms" + " expected_packets=%ld", + (long)link_metrics_.lost_packets, + (double)link_metrics_.jitter / core::Millisecond, + (double)link_metrics_.max_jitter / core::Millisecond, + (double)link_metrics_.min_jitter / core::Millisecond, + (long)link_metrics_.expected_packets); + roc_log(LogDebug, "latency tuner: fec block duration=%.1fms", + (double)latency_metrics_.fec_block_duration / core::Millisecond); + } + + if (sample_spec_.ns_2_stream_timestamp_delta(latency_metrics_.fec_block_duration) + >= max_latency_) { + roc_log(LogInfo, + "latency tuner: FEC block %.1fms is longer than the max " + "limit for latency %d(%.1fms)", + (double)latency_metrics_.fec_block_duration / core::Millisecond, + max_latency_, (double)max_latency_ / core::Millisecond); + } +} + +// Decides if the latency should be adjusted and orders fe_ to do so if needed. +// +// 1. Decides to decrease latency if current value is greater than upper threshold, +// The target latency is supposed to change smoothely, so we just cut the current +// latency value by some percentage. +// +// 2. Decides to increase latency if it is lesser than lower threshold (which +// could be close or equal to target latency itself). +// This could/should be done effectively as it could possibly mean that the user +// is already perceives some losses. +// +// NB: After the increasement the new latency target value must not be greater than +// upper threshold in any circumstances. +// +// +void LatencyTuner::update_target_latency_(const core::nanoseconds_t max_jitter_ns, + const core::nanoseconds_t mean_jitter_ns, + const core::nanoseconds_t fec_block_ns) { + const core::nanoseconds_t now = core::timestamp(core::ClockMonotonic); + + // If there is no active timeout, check if evaluated target latency is + // significantly smaller than the latency in action so that we could decrease it. + if (target_latency_state_ == TL_NONE) { + // Here we estimate what would be the perfect latency for this moment based on + // jitter statistics. Later we'll use this value only for decision making if it + // worth changing or we rather keep the current latency target untouched. + const core::nanoseconds_t estimate = std::max( + std::max((core::nanoseconds_t)(max_jitter_ns * max_jitter_overhead_), + (core::nanoseconds_t)(mean_jitter_ns * mean_jitter_overhead_)), + fec_block_ns); + const core::nanoseconds_t cur_tl_ns = + sample_spec_.stream_timestamp_delta_2_ns(target_latency_); + if (estimate < cur_tl_ns && estimate * lat_update_upper_thrsh_ < cur_tl_ns + && fe_->is_stable()) { + try_decrease_latency_(estimate, now, cur_tl_ns); + } else if (estimate > cur_tl_ns) { + // If evaluated target latency is greater, than we must increase it. + try_increase_latency_(estimate, now, cur_tl_ns); + } + } else if (target_latency_state_ == TL_COOLDOWN_AFTER_DEC + && now - last_target_latency_update_ > cooldown_dec_timeout_) { + // Waiting the timeout since the last decreasement. + target_latency_state_ = TL_NONE; + // Waiting the timeout since the startup. + } else if (target_latency_state_ == TL_STARTING + && (last_target_latency_update_ == 0 + || (now - last_target_latency_update_ > starting_timeout_))) { + target_latency_state_ = TL_NONE; + + // Waiting the timeout since the last increasement. + } else if (target_latency_state_ == TL_COOLDOWN_AFTER_INC + && now - last_target_latency_update_ > cooldown_inc_timeout_) { + target_latency_state_ = TL_NONE; + } +} +void LatencyTuner::try_increase_latency_(const core::nanoseconds_t estimate, + const core::nanoseconds_t now, + const core::nanoseconds_t cur_tl_ns) { + const core::nanoseconds_t new_tl_ns = + (core::nanoseconds_t)(estimate * lat_update_inc_step_); + packet::stream_timestamp_diff_t new_tl_ts = + sample_spec_.ns_2_stream_timestamp_delta(new_tl_ns); + + if (new_tl_ts > max_latency_) { + if (last_lat_limiter_.allow()) { + roc_log(LogDebug, + "latency tuner:" + " capping target latency %ld(%.3fms)" + " as max limit is lower %ld(%.3fms)", + (long)new_tl_ts, (double)new_tl_ns / core::Millisecond, + (long)max_latency_, + sample_spec_.stream_timestamp_delta_2_ms(max_latency_)); + } + new_tl_ts = max_latency_; + } + + roc_log(LogNote, + "latency tuner:" + " increasing target latency %ld(%.3fms) → %ld(%.3fms)", + (long)target_latency_, (double)cur_tl_ns / core::Millisecond, (long)new_tl_ts, + (double)new_tl_ns / core::Millisecond); + + target_latency_ = new_tl_ts; + last_target_latency_update_ = now; + target_latency_state_ = TL_COOLDOWN_AFTER_INC; + fe_->update_target_latency((packet::stream_timestamp_t)target_latency_); +} + +void LatencyTuner::try_decrease_latency_(const core::nanoseconds_t estimate, + const core::nanoseconds_t now, + const core::nanoseconds_t cur_tl_ns) { + const core::nanoseconds_t new_tl_ns = + (core::nanoseconds_t)(cur_tl_ns * lat_update_dec_step_); + const packet::stream_timestamp_diff_t new_tl_ts = + sample_spec_.ns_2_stream_timestamp_delta(new_tl_ns); + if (new_tl_ts < min_latency_) { + if (last_lat_limiter_.allow()) { + roc_log(LogDebug, + "latency tuner:" + " not decreasing target latency lower than limit %ld(%.3fms)", + (long)min_latency_, + sample_spec_.stream_timestamp_delta_2_ms(min_latency_)); + } + } else { + roc_log(LogNote, + "latency tuner:" + " decreasing target latency %ld(%.3fms) → %ld(%.3fms)", + (long)target_latency_, (double)cur_tl_ns / core::Millisecond, + (long)new_tl_ts, (double)new_tl_ns / core::Millisecond); + roc_log(LogDebug, + "latency tuner:" + "\testimate %.3fms * %.3f = %.3fms,\tnew tl %.3fms * %f = %.3fms", + (double)estimate / core::Millisecond, (double)lat_update_upper_thrsh_, + (double)estimate * (double)lat_update_upper_thrsh_ / core::Millisecond, + (double)cur_tl_ns / core::Millisecond, (double)lat_update_dec_step_, + (double)(new_tl_ns / core::Millisecond)); + + target_latency_ = new_tl_ts; + last_target_latency_update_ = now; + target_latency_state_ = TL_COOLDOWN_AFTER_DEC; + fe_->update_target_latency((packet::stream_timestamp_t)target_latency_); + } } const char* latency_tuner_backend_to_str(LatencyTunerBackend backend) { diff --git a/src/internal_modules/roc_audio/latency_tuner.h b/src/internal_modules/roc_audio/latency_tuner.h index d0b1c1267..44e0eb663 100644 --- a/src/internal_modules/roc_audio/latency_tuner.h +++ b/src/internal_modules/roc_audio/latency_tuner.h @@ -14,6 +14,7 @@ #include "roc_audio/freq_estimator.h" #include "roc_audio/sample_spec.h" +#include "roc_core/csv_dumper.h" #include "roc_core/noncopyable.h" #include "roc_core/optional.h" #include "roc_core/time.h" @@ -72,6 +73,11 @@ struct LatencyConfig { //! Defines how smooth is the tuning. LatencyTunerProfile tuner_profile; + //! Number of packets we use to calculate sliding statistics. + //! @remarks + //! We calculate jitter statistics based on this last delivered packets. + size_t sliding_stat_window_length; + //! Target latency. //! @remarks //! Latency tuner will try to keep latency close to this value. @@ -88,6 +94,27 @@ struct LatencyConfig { //! Negative value is an error. core::nanoseconds_t latency_tolerance; + //! Start latency. + //! @remarks + //! In case of dynamic latency the tuner will start from this value. + //! @note + //! This value makes sense only when target_latency is set to 0. + core::nanoseconds_t start_latency; + + //! Minimum allowed latency. + //! @remarks + //! If the latency goes out of bounds, the session is terminated. + //! @note + //! If both min_latency and max_latency are zero, defaults are used. + core::nanoseconds_t min_latency; + + //! Maximum allowed latency. + //! @remarks + //! If the latency goes out of bounds, the session is terminated. + //! @note + //! If both min_latency and max_latency are zero, defaults are used. + core::nanoseconds_t max_latency; + //! Maximum delay since last packet before queue is considered stalling. //! @remarks //! If niq_stalling becomes larger than stalling_tolerance, latency @@ -114,19 +141,70 @@ struct LatencyConfig { //! Negative value is an error. float scaling_tolerance; + //! Latency tuner decides to adjust target latency if + //! the current value >= estimated optimal latency * + //! latency_decrease_relative_threshold_. + float latency_decrease_relative_threshold_; + + //! Latency tuner does not adjusts latency for this amount of time from + //! the very beginning. + core::nanoseconds_t starting_timeout; + + //! Latency tuner does not adjusts latency for this amount of time from + //! the last decreasment. + core::nanoseconds_t cooldown_dec_timeout; + + //! Latency tuner does not adjusts latency for this amount of time from + //! the last increasement. + core::nanoseconds_t cooldown_inc_timeout; + + //! Latency tuner estimates an expected latency for the current jitter statistics + //! which is then used for decision if it should engage a regulator to adjust it. + //! estimation = MAX(max_jitter * max_jitter_overhead, + //! mean_jitter * mean_jitter_overhead); + float max_jitter_overhead; + + //! Latency tuner estimates an expected latency for the current jitter statistics + //! which is then used for decision if it should engage a regulator to adjust it. + //! estimation = MAX(max_jitter * max_jitter_overhead, + //! mean_jitter * mean_jitter_overhead); + float mean_jitter_overhead; + //! Initialize. LatencyConfig() : tuner_backend(LatencyTunerBackend_Default) , tuner_profile(LatencyTunerProfile_Default) + , sliding_stat_window_length(0) , target_latency(0) , latency_tolerance(0) + , start_latency(0) + , min_latency(0) + , max_latency(0) , stale_tolerance(0) , scaling_interval(0) - , scaling_tolerance(0) { + , scaling_tolerance(0) + , latency_decrease_relative_threshold_(1.7f) + , starting_timeout(5 * core::Second) + , cooldown_dec_timeout(5 * core::Second) + , cooldown_inc_timeout(15 * core::Second) + , max_jitter_overhead(5 * core::Second) + , mean_jitter_overhead(1.15f) { } //! Automatically fill missing settings. void deduce_defaults(core::nanoseconds_t default_target_latency, bool is_receiver); + //! Computes latency tolerance based on requested latency value. + //! @remarks + //! This formula returns target_latency * N, where N starts with larger + //! number and approaches 0.5 as target_latency grows. + //! By default we're very tolerant and allow rather big oscillations. + //! Examples (for multiplier = 1): + //! target=1ms -> tolerance=8ms (x8) + //! target=10ms -> tolerance=20ms (x2) + //! target=200ms -> tolerance=200ms (x1) + //! target=2000ms -> tolerance=1444ms (x0.722) + core::nanoseconds_t calc_latency_tolerance(const core::nanoseconds_t latency, + const int multiplier) const; }; //! Latency-related metrics. @@ -171,7 +249,9 @@ struct LatencyMetrics { class LatencyTuner : public core::NonCopyable<> { public: //! Initialize. - LatencyTuner(const LatencyConfig& config, const SampleSpec& sample_spec); + LatencyTuner(const LatencyConfig& config, + const SampleSpec& sample_spec, + core::CsvDumper* dumper); //! Check if the object was successfully constructed. status::StatusCode init_status() const; @@ -207,6 +287,10 @@ class LatencyTuner : public core::NonCopyable<> { bool check_bounds_(packet::stream_timestamp_diff_t latency); void compute_scaling_(packet::stream_timestamp_diff_t latency); void report_(); + // Decides if the latency should be adjusted and orders fe_ to do so if needed. + void update_target_latency_(core::nanoseconds_t max_jitter_ns, + core::nanoseconds_t mean_jitter_ns, + core::nanoseconds_t fec_block_ns); core::Optional fe_; @@ -235,9 +319,11 @@ class LatencyTuner : public core::NonCopyable<> { bool has_e2e_latency_; packet::stream_timestamp_diff_t e2e_latency_; - bool has_jitter_; - packet::stream_timestamp_diff_t jitter_; + bool has_metrics_; + LatencyMetrics latency_metrics_; + packet::LinkMetrics link_metrics_; + const bool auto_tune_; packet::stream_timestamp_diff_t target_latency_; packet::stream_timestamp_diff_t min_latency_; packet::stream_timestamp_diff_t max_latency_; @@ -245,7 +331,34 @@ class LatencyTuner : public core::NonCopyable<> { const SampleSpec sample_spec_; + enum TargetLatencyState { + TL_NONE, + TL_STARTING, + TL_COOLDOWN_AFTER_INC, + TL_COOLDOWN_AFTER_DEC + } target_latency_state_; + const core::nanoseconds_t starting_timeout_; + const core::nanoseconds_t cooldown_dec_timeout_; + const core::nanoseconds_t cooldown_inc_timeout_; + const float max_jitter_overhead_; + const float mean_jitter_overhead_; + + core::nanoseconds_t last_target_latency_update_; + const float lat_update_upper_thrsh_; + const float lat_update_dec_step_; + const float lat_update_inc_step_; + + core::RateLimiter last_lat_limiter_; + + core::CsvDumper* dumper_; + status::StatusCode init_status_; + void try_decrease_latency_(const core::nanoseconds_t estimate, + const core::nanoseconds_t now, + const core::nanoseconds_t cur_tl_ns); + void try_increase_latency_(const core::nanoseconds_t estimate, + const core::nanoseconds_t now, + const core::nanoseconds_t cur_tl_ns); }; //! Get string name of latency backend. diff --git a/src/internal_modules/roc_core/csv_dumper.cpp b/src/internal_modules/roc_core/csv_dumper.cpp index 4aeccc63f..45cda6e8e 100644 --- a/src/internal_modules/roc_core/csv_dumper.cpp +++ b/src/internal_modules/roc_core/csv_dumper.cpp @@ -14,11 +14,11 @@ namespace roc { namespace core { -CsvDumper::CsvDumper(const char* path, const CsvConfig& config, IArena& arena) +CsvDumper::CsvDumper(const CsvConfig& config, IArena& arena) : config_(config) , ringbuf_(arena, config.max_queued) , valid_(false) { - if (!open_(path)) { + if (!config.dump_file || !open_(config.dump_file)) { return; } valid_ = true; @@ -167,5 +167,9 @@ bool CsvDumper::dump_(const CsvEntry& entry) { return true; } +bool CsvDumper::is_valid() const { + return valid_; +} + } // namespace core } // namespace roc diff --git a/src/internal_modules/roc_core/csv_dumper.h b/src/internal_modules/roc_core/csv_dumper.h index ea5ca8f59..bdc794794 100644 --- a/src/internal_modules/roc_core/csv_dumper.h +++ b/src/internal_modules/roc_core/csv_dumper.h @@ -43,6 +43,10 @@ struct CsvEntry { //! CSV write configuration. struct CsvConfig { + //! Path to the output CSV file. + //! Can't be null. + const char* dump_file; + //! Maximum number of queued entries. //! If queue becomes larger, entries are dropped. size_t max_queued; @@ -53,7 +57,8 @@ struct CsvConfig { nanoseconds_t max_interval; CsvConfig() - : max_queued(1000) + : dump_file(NULL) + , max_queued(1000) , max_interval(Millisecond) { } }; @@ -66,7 +71,7 @@ class CsvDumper : public Thread { //! Open file. //! @p path - output file. //! @p max_interval - maximum number of writes per second for each entry type. - CsvDumper(const char* path, const CsvConfig& config, IArena& arena); + CsvDumper(const CsvConfig& config, IArena& arena); //! Close file. ~CsvDumper(); diff --git a/src/internal_modules/roc_packet/ilink_meter.h b/src/internal_modules/roc_packet/ilink_meter.h index 0198f88f3..55635b16d 100644 --- a/src/internal_modules/roc_packet/ilink_meter.h +++ b/src/internal_modules/roc_packet/ilink_meter.h @@ -33,10 +33,8 @@ struct LinkMetrics { //! count of seqnum cycles. packet::ext_seqnum_t ext_last_seqnum; - //! Total amount of packets sent or expected to be received. - //! On sender, this counter is just incremented every packet. - //! On receiver, it is derived from seqnums. - uint64_t total_packets; + //! Total amount of packets that receiver expects to be delivered. + uint64_t expected_packets; //! Cumulative count of lost packets. //! The total number of RTP data packets that have been lost since the beginning @@ -46,11 +44,31 @@ struct LinkMetrics { //! and the loss may be negative if there are duplicates. int64_t lost_packets; + //! Cumulate count of recovered packets. + //! How many packets lost packets receiver was able to recover + //! by FEC. The sender is not getting this metric so far. + uint64_t recovered_packets; + //! Estimated interarrival jitter. //! An estimate of the statistical variance of the RTP data packet //! interarrival time. + //! @note + //! This value is calculated on sliding window on a receiver side and sender + //! side gets this value via RTCP. core::nanoseconds_t jitter; + //! Running max of jitter. + //! @note + //! This value is calculated on sliding window on a receiver side and it is not + //! available on sender. + core::nanoseconds_t max_jitter; + + //! Running min of jitter. + //! @note + //! This value is calculated on sliding window on a receiver side and it is not + //! available on sender. + core::nanoseconds_t min_jitter; + //! Estimated round-trip time between sender and receiver. //! Computed based on NTP-like timestamp exchange implemennted by RTCP protocol. //! Read-only field. You can read it on sender, but you should not set @@ -60,9 +78,12 @@ struct LinkMetrics { LinkMetrics() : ext_first_seqnum(0) , ext_last_seqnum(0) - , total_packets(0) + , expected_packets(0) , lost_packets(0) + , recovered_packets(0) , jitter(0) + , max_jitter(0) + , min_jitter(0) , rtt(0) { } }; diff --git a/src/internal_modules/roc_pipeline/config.cpp b/src/internal_modules/roc_pipeline/config.cpp index 6f2246a0a..71de60b79 100644 --- a/src/internal_modules/roc_pipeline/config.cpp +++ b/src/internal_modules/roc_pipeline/config.cpp @@ -19,7 +19,8 @@ SenderSinkConfig::SenderSinkConfig() , enable_cpu_clock(false) , enable_auto_cts(false) , enable_profiling(false) - , enable_interleaving(false) { + , enable_interleaving(false) + , dump_file(NULL) { } void SenderSinkConfig::deduce_defaults(audio::ProcessorMap& processor_map) { diff --git a/src/internal_modules/roc_pipeline/config.h b/src/internal_modules/roc_pipeline/config.h index 9340aca9c..4d58ee48b 100644 --- a/src/internal_modules/roc_pipeline/config.h +++ b/src/internal_modules/roc_pipeline/config.h @@ -102,7 +102,10 @@ struct SenderSinkConfig { //! Interleave packets. bool enable_interleaving; - //! Initialize config. + //! File to a dump file in csv format with some run-time metrics. + const char* dump_file; + + // Initialize config. SenderSinkConfig(); //! Fill unset values with defaults. @@ -141,6 +144,9 @@ struct ReceiverCommonConfig { //! Profile moving average of frames being written. bool enable_profiling; + //! Parameters for a logger in csv format with some run-time metrics. + core::CsvConfig dumper; + //! Initialize config. ReceiverCommonConfig(); @@ -179,6 +185,7 @@ struct ReceiverSessionConfig { }; //! Parameters of receiver session. +//! Top-level config, actual settings are stored in sub-configs. struct ReceiverSourceConfig { //! Task processing parameters. PipelineLoopConfig pipeline_loop; diff --git a/src/internal_modules/roc_pipeline/receiver_session.cpp b/src/internal_modules/roc_pipeline/receiver_session.cpp index 72aa3493c..6d073895d 100644 --- a/src/internal_modules/roc_pipeline/receiver_session.cpp +++ b/src/internal_modules/roc_pipeline/receiver_session.cpp @@ -20,9 +20,11 @@ ReceiverSession::ReceiverSession(const ReceiverSessionConfig& session_config, rtp::EncodingMap& encoding_map, packet::PacketFactory& packet_factory, audio::FrameFactory& frame_factory, - core::IArena& arena) + core::IArena& arena, + core::CsvDumper* dumper) : core::RefCounted(arena) , frame_reader_(NULL) + , dumper_(dumper) , init_status_(status::NoStatus) , fail_status_(status::NoStatus) { const rtp::Encoding* pkt_encoding = @@ -51,7 +53,8 @@ ReceiverSession::ReceiverSession(const ReceiverSessionConfig& session_config, } pkt_writer = source_queue_.get(); - source_meter_.reset(new (source_meter_) rtp::LinkMeter(encoding_map)); + source_meter_.reset(new (source_meter_) rtp::LinkMeter( + arena, encoding_map, pkt_encoding->sample_spec, session_config.latency, dumper_)); if ((init_status_ = source_meter_->init_status()) != status::StatusOK) { return; } @@ -87,22 +90,24 @@ ReceiverSession::ReceiverSession(const ReceiverSessionConfig& session_config, pkt_reader = filter_.get(); delayed_reader_.reset(new (delayed_reader_) packet::DelayedReader( - *pkt_reader, session_config.latency.target_latency, pkt_encoding->sample_spec)); + *pkt_reader, + session_config.latency.target_latency != 0 ? session_config.latency.target_latency + : session_config.latency.start_latency, + pkt_encoding->sample_spec)); if ((init_status_ = delayed_reader_->init_status()) != status::StatusOK) { return; } pkt_reader = delayed_reader_.get(); - source_meter_->set_reader(*pkt_reader); - pkt_reader = source_meter_.get(); - if (session_config.fec_decoder.scheme != packet::FEC_None) { repair_queue_.reset(new (repair_queue_) packet::SortedQueue(0)); if ((init_status_ = repair_queue_->init_status()) != status::StatusOK) { return; } - repair_meter_.reset(new (repair_meter_) rtp::LinkMeter(encoding_map)); + repair_meter_.reset(new (repair_meter_) rtp::LinkMeter( + arena, encoding_map, pkt_encoding->sample_spec, session_config.latency, + dumper_)); if ((init_status_ = repair_meter_->init_status()) != status::StatusOK) { return; } @@ -145,9 +150,6 @@ ReceiverSession::ReceiverSession(const ReceiverSessionConfig& session_config, return; } pkt_reader = fec_filter_.get(); - - repair_meter_->set_reader(*pkt_reader); - pkt_reader = repair_meter_.get(); } timestamp_injector_.reset(new (timestamp_injector_) rtp::TimestampInjector( @@ -157,6 +159,9 @@ ReceiverSession::ReceiverSession(const ReceiverSessionConfig& session_config, } pkt_reader = timestamp_injector_.get(); + source_meter_->set_reader(*pkt_reader); + pkt_reader = source_meter_.get(); + // Third part of pipeline: chained frame readers from depacketizer to mixer. // Mixed reads frames from this pipeline, and in the end it requests packets // from packet readers pipeline. @@ -261,7 +266,7 @@ ReceiverSession::ReceiverSession(const ReceiverSessionConfig& session_config, latency_monitor_.reset(new (latency_monitor_) audio::LatencyMonitor( *frm_reader, *source_queue_, *depacketizer_, *source_meter_, fec_reader_.get(), resampler_reader_.get(), session_config.latency, - pkt_encoding->sample_spec, inout_spec)); + pkt_encoding->sample_spec, inout_spec, dumper_)); if ((init_status_ = latency_monitor_->init_status()) != status::StatusOK) { return; } @@ -346,7 +351,7 @@ void ReceiverSession::generate_reports(const char* report_cname, report.sample_rate = source_meter_->encoding().sample_spec.sample_rate(); report.ext_first_seqnum = link_metrics.ext_first_seqnum; report.ext_last_seqnum = link_metrics.ext_last_seqnum; - report.packet_count = link_metrics.total_packets; + report.packet_count = link_metrics.expected_packets; report.cum_loss = link_metrics.lost_packets; report.jitter = link_metrics.jitter; report.niq_latency = latency_metrics.niq_latency; @@ -371,7 +376,7 @@ void ReceiverSession::generate_reports(const char* report_cname, report.sample_rate = repair_meter_->encoding().sample_spec.sample_rate(); report.ext_first_seqnum = link_metrics.ext_first_seqnum; report.ext_last_seqnum = link_metrics.ext_last_seqnum; - report.packet_count = link_metrics.total_packets; + report.packet_count = link_metrics.expected_packets; report.cum_loss = link_metrics.lost_packets; report.jitter = link_metrics.jitter; diff --git a/src/internal_modules/roc_pipeline/receiver_session.h b/src/internal_modules/roc_pipeline/receiver_session.h index 3d5f534ed..22d34ca42 100644 --- a/src/internal_modules/roc_pipeline/receiver_session.h +++ b/src/internal_modules/roc_pipeline/receiver_session.h @@ -25,6 +25,7 @@ #include "roc_audio/processor_map.h" #include "roc_audio/resampler_reader.h" #include "roc_audio/watchdog.h" +#include "roc_core/csv_dumper.h" #include "roc_core/iarena.h" #include "roc_core/list_node.h" #include "roc_core/optional.h" @@ -69,7 +70,8 @@ class ReceiverSession : public core::RefCounted latency_monitor_; + core::CsvDumper* dumper_; + status::StatusCode init_status_; status::StatusCode fail_status_; }; diff --git a/src/internal_modules/roc_pipeline/receiver_session_group.cpp b/src/internal_modules/roc_pipeline/receiver_session_group.cpp index 3b07eed0d..6c9e0e82a 100644 --- a/src/internal_modules/roc_pipeline/receiver_session_group.cpp +++ b/src/internal_modules/roc_pipeline/receiver_session_group.cpp @@ -8,6 +8,7 @@ #include "roc_pipeline/receiver_session_group.h" #include "roc_address/socket_addr_to_str.h" +#include "roc_core/csv_dumper.h" #include "roc_core/log.h" #include "roc_core/panic.h" #include "roc_rtcp/participant_info.h" @@ -24,7 +25,8 @@ ReceiverSessionGroup::ReceiverSessionGroup(const ReceiverSourceConfig& source_co rtp::EncodingMap& encoding_map, packet::PacketFactory& packet_factory, audio::FrameFactory& frame_factory, - core::IArena& arena) + core::IArena& arena, + core::CsvDumper* dumper) : source_config_(source_config) , slot_config_(slot_config) , state_tracker_(state_tracker) @@ -35,6 +37,7 @@ ReceiverSessionGroup::ReceiverSessionGroup(const ReceiverSourceConfig& source_co , packet_factory_(packet_factory) , frame_factory_(frame_factory) , session_router_(arena) + , dumper_(dumper) , init_status_(status::NoStatus) { identity_.reset(new (identity_) rtp::Identity()); if ((init_status_ = identity_->init_status()) != status::StatusOK) { @@ -390,7 +393,7 @@ ReceiverSessionGroup::create_session_(const packet::PacketPtr& packet) { core::SharedPtr sess = new (arena_) ReceiverSession(sess_config, source_config_.common, processor_map_, encoding_map_, - packet_factory_, frame_factory_, arena_); + packet_factory_, frame_factory_, arena_, dumper_); if (!sess) { roc_log(LogError, "session group: can't create session, allocation failed"); diff --git a/src/internal_modules/roc_pipeline/receiver_session_group.h b/src/internal_modules/roc_pipeline/receiver_session_group.h index 58bd6c775..fa33c3ab5 100644 --- a/src/internal_modules/roc_pipeline/receiver_session_group.h +++ b/src/internal_modules/roc_pipeline/receiver_session_group.h @@ -15,6 +15,7 @@ #include "roc_audio/frame_factory.h" #include "roc_audio/mixer.h" #include "roc_audio/processor_map.h" +#include "roc_core/csv_dumper.h" #include "roc_core/iarena.h" #include "roc_core/list.h" #include "roc_core/noncopyable.h" @@ -57,7 +58,8 @@ class ReceiverSessionGroup : public core::NonCopyable<>, private rtcp::IParticip rtp::EncodingMap& encoding_map, packet::PacketFactory& packet_factory, audio::FrameFactory& frame_factory, - core::IArena& arena); + core::IArena& arena, + core::CsvDumper* dumper); ~ReceiverSessionGroup(); @@ -162,6 +164,8 @@ class ReceiverSessionGroup : public core::NonCopyable<>, private rtcp::IParticip sessions_; ReceiverSessionRouter session_router_; + core::CsvDumper* dumper_; + status::StatusCode init_status_; }; diff --git a/src/internal_modules/roc_pipeline/receiver_slot.cpp b/src/internal_modules/roc_pipeline/receiver_slot.cpp index 2a7530ff9..abbd8be5b 100644 --- a/src/internal_modules/roc_pipeline/receiver_slot.cpp +++ b/src/internal_modules/roc_pipeline/receiver_slot.cpp @@ -7,6 +7,7 @@ */ #include "roc_pipeline/receiver_slot.h" +#include "roc_core/csv_dumper.h" #include "roc_core/log.h" #include "roc_pipeline/endpoint_helpers.h" @@ -21,7 +22,8 @@ ReceiverSlot::ReceiverSlot(const ReceiverSourceConfig& source_config, rtp::EncodingMap& encoding_map, packet::PacketFactory& packet_factory, audio::FrameFactory& frame_factory, - core::IArena& arena) + core::IArena& arena, + core::CsvDumper* dumper) : core::RefCounted(arena) , encoding_map_(encoding_map) , state_tracker_(state_tracker) @@ -33,7 +35,8 @@ ReceiverSlot::ReceiverSlot(const ReceiverSourceConfig& source_config, encoding_map, packet_factory, frame_factory, - arena) + arena, + dumper) , init_status_(status::NoStatus) { roc_log(LogDebug, "receiver slot: initializing"); diff --git a/src/internal_modules/roc_pipeline/receiver_slot.h b/src/internal_modules/roc_pipeline/receiver_slot.h index 357eb2ead..dbcf10067 100644 --- a/src/internal_modules/roc_pipeline/receiver_slot.h +++ b/src/internal_modules/roc_pipeline/receiver_slot.h @@ -17,6 +17,7 @@ #include "roc_audio/frame_factory.h" #include "roc_audio/mixer.h" #include "roc_audio/processor_map.h" +#include "roc_core/csv_dumper.h" #include "roc_core/iarena.h" #include "roc_core/list_node.h" #include "roc_core/ref_counted.h" @@ -47,7 +48,8 @@ class ReceiverSlot : public core::RefCountedstart()) { + init_status_ = status::StatusErrFile; + return; + } + } + audio::IFrameReader* frm_reader = NULL; { @@ -76,6 +84,13 @@ ReceiverSource::ReceiverSource(const ReceiverSourceConfig& source_config, init_status_ = status::StatusOK; } +ReceiverSource::~ReceiverSource() { + if (dumper_ && dumper_->is_valid() && dumper_->is_joinable()) { + dumper_->stop(); + dumper_->join(); + } +} + status::StatusCode ReceiverSource::init_status() const { return init_status_; } @@ -85,9 +100,9 @@ ReceiverSlot* ReceiverSource::create_slot(const ReceiverSlotConfig& slot_config) roc_log(LogInfo, "receiver source: adding slot"); - core::SharedPtr slot = new (arena_) - ReceiverSlot(source_config_, slot_config, state_tracker_, *mixer_, processor_map_, - encoding_map_, packet_factory_, frame_factory_, arena_); + core::SharedPtr slot = new (arena_) ReceiverSlot( + source_config_, slot_config, state_tracker_, *mixer_, processor_map_, + encoding_map_, packet_factory_, frame_factory_, arena_, dumper_.get()); if (!slot) { roc_log(LogError, "receiver source: can't create slot, allocation failed"); diff --git a/src/internal_modules/roc_pipeline/receiver_source.h b/src/internal_modules/roc_pipeline/receiver_source.h index b63887178..00fa0a3ae 100644 --- a/src/internal_modules/roc_pipeline/receiver_source.h +++ b/src/internal_modules/roc_pipeline/receiver_source.h @@ -18,6 +18,7 @@ #include "roc_audio/pcm_mapper_reader.h" #include "roc_audio/processor_map.h" #include "roc_audio/profiling_reader.h" +#include "roc_core/csv_dumper.h" #include "roc_core/iarena.h" #include "roc_core/optional.h" #include "roc_core/stddefs.h" @@ -53,6 +54,8 @@ class ReceiverSource : public sndio::ISource, public core::NonCopyable<> { core::IPool& frame_buffer_pool, core::IArena& arena); + ~ReceiverSource(); + //! Check if the pipeline was successfully constructed. status::StatusCode init_status() const; @@ -132,6 +135,8 @@ class ReceiverSource : public sndio::ISource, public core::NonCopyable<> { StateTracker state_tracker_; + core::Optional dumper_; + core::Optional mixer_; core::Optional profiler_; core::Optional pcm_mapper_; diff --git a/src/internal_modules/roc_pipeline/sender_session.cpp b/src/internal_modules/roc_pipeline/sender_session.cpp index 65d015df6..f9c2c5d9a 100644 --- a/src/internal_modules/roc_pipeline/sender_session.cpp +++ b/src/internal_modules/roc_pipeline/sender_session.cpp @@ -19,7 +19,8 @@ SenderSession::SenderSession(const SenderSinkConfig& sink_config, rtp::EncodingMap& encoding_map, packet::PacketFactory& packet_factory, audio::FrameFactory& frame_factory, - core::IArena& arena) + core::IArena& arena, + core::CsvDumper* dumper) : arena_(arena) , sink_config_(sink_config) , processor_map_(processor_map) @@ -27,6 +28,7 @@ SenderSession::SenderSession(const SenderSinkConfig& sink_config, , packet_factory_(packet_factory) , frame_factory_(frame_factory) , frame_writer_(NULL) + , dumper_(dumper) , init_status_(status::NoStatus) , fail_status_(status::NoStatus) { identity_.reset(new (identity_) rtp::Identity()); @@ -211,7 +213,7 @@ SenderSession::create_transport_pipeline(SenderEndpoint* source_endpoint, feedback_monitor_.reset(new (feedback_monitor_) audio::FeedbackMonitor( *frm_writer, *packetizer_, resampler_writer_.get(), sink_config_.feedback, - sink_config_.latency, inout_spec)); + sink_config_.latency, inout_spec, dumper_)); if ((status = feedback_monitor_->init_status()) != status::StatusOK) { return status; } @@ -372,7 +374,7 @@ SenderSession::notify_send_stream(packet::stream_source_t recv_source_id, packet::LinkMetrics link_metrics; link_metrics.ext_first_seqnum = recv_report.ext_first_seqnum; link_metrics.ext_last_seqnum = recv_report.ext_last_seqnum; - link_metrics.total_packets = recv_report.packet_count; + link_metrics.expected_packets = recv_report.packet_count; link_metrics.lost_packets = recv_report.cum_loss; link_metrics.jitter = recv_report.jitter; link_metrics.rtt = recv_report.rtt; diff --git a/src/internal_modules/roc_pipeline/sender_session.h b/src/internal_modules/roc_pipeline/sender_session.h index a196113ea..8de48888b 100644 --- a/src/internal_modules/roc_pipeline/sender_session.h +++ b/src/internal_modules/roc_pipeline/sender_session.h @@ -60,7 +60,8 @@ class SenderSession : public core::NonCopyable<>, rtp::EncodingMap& encoding_map, packet::PacketFactory& packet_factory, audio::FrameFactory& frame_factory, - core::IArena& arena); + core::IArena& arena, + core::CsvDumper* dumper); //! Check if the pipeline was successfully constructed. status::StatusCode init_status() const; @@ -169,6 +170,8 @@ class SenderSession : public core::NonCopyable<>, audio::IFrameWriter* frame_writer_; + core::CsvDumper* dumper_; + status::StatusCode init_status_; status::StatusCode fail_status_; }; diff --git a/src/internal_modules/roc_pipeline/sender_sink.cpp b/src/internal_modules/roc_pipeline/sender_sink.cpp index cd4704e6f..101def8ce 100644 --- a/src/internal_modules/roc_pipeline/sender_sink.cpp +++ b/src/internal_modules/roc_pipeline/sender_sink.cpp @@ -29,6 +29,7 @@ SenderSink::SenderSink(const SenderSinkConfig& sink_config, , frame_factory_(frame_pool, frame_buffer_pool) , arena_(arena) , frame_writer_(NULL) + , dumper_config_() , init_status_(status::NoStatus) { sink_config_.deduce_defaults(processor_map); @@ -68,6 +69,13 @@ SenderSink::SenderSink(const SenderSinkConfig& sink_config, frm_writer = profiler_.get(); } + if (sink_config_.dump_file) { + dumper_.reset(new (dumper_) core::CsvDumper(dumper_config_, arena)); + if (!dumper_->start()) { + return; + } + } + frame_writer_ = frm_writer; init_status_ = status::StatusOK; } @@ -81,9 +89,9 @@ SenderSlot* SenderSink::create_slot(const SenderSlotConfig& slot_config) { roc_log(LogInfo, "sender sink: adding slot"); - core::SharedPtr slot = new (arena_) - SenderSlot(sink_config_, slot_config, state_tracker_, processor_map_, - encoding_map_, *fanout_, packet_factory_, frame_factory_, arena_); + core::SharedPtr slot = new (arena_) SenderSlot( + sink_config_, slot_config, state_tracker_, processor_map_, encoding_map_, + *fanout_, packet_factory_, frame_factory_, arena_, dumper_.get()); if (!slot) { roc_log(LogError, "sender sink: can't create slot, allocation failed"); diff --git a/src/internal_modules/roc_pipeline/sender_sink.h b/src/internal_modules/roc_pipeline/sender_sink.h index 1b1b96847..5904df876 100644 --- a/src/internal_modules/roc_pipeline/sender_sink.h +++ b/src/internal_modules/roc_pipeline/sender_sink.h @@ -132,6 +132,9 @@ class SenderSink : public sndio::ISink, public core::NonCopyable<> { audio::IFrameWriter* frame_writer_; + const core::CsvConfig dumper_config_; + core::Optional dumper_; + status::StatusCode init_status_; }; diff --git a/src/internal_modules/roc_pipeline/sender_slot.cpp b/src/internal_modules/roc_pipeline/sender_slot.cpp index a7d9b3703..9ac3e0e6b 100644 --- a/src/internal_modules/roc_pipeline/sender_slot.cpp +++ b/src/internal_modules/roc_pipeline/sender_slot.cpp @@ -22,13 +22,19 @@ SenderSlot::SenderSlot(const SenderSinkConfig& sink_config, audio::Fanout& fanout, packet::PacketFactory& packet_factory, audio::FrameFactory& frame_factory, - core::IArena& arena) + core::IArena& arena, + core::CsvDumper* dumper) : core::RefCounted(arena) , sink_config_(sink_config) , fanout_(fanout) , state_tracker_(state_tracker) - , session_( - sink_config, processor_map, encoding_map, packet_factory, frame_factory, arena) + , session_(sink_config, + processor_map, + encoding_map, + packet_factory, + frame_factory, + arena, + dumper) , init_status_(status::NoStatus) { roc_log(LogDebug, "sender slot: initializing"); diff --git a/src/internal_modules/roc_pipeline/sender_slot.h b/src/internal_modules/roc_pipeline/sender_slot.h index d2c34f836..db811b5f4 100644 --- a/src/internal_modules/roc_pipeline/sender_slot.h +++ b/src/internal_modules/roc_pipeline/sender_slot.h @@ -48,7 +48,8 @@ class SenderSlot : public core::RefCounted, audio::Fanout& fanout, packet::PacketFactory& packet_factory, audio::FrameFactory& frame_factory, - core::IArena& arena); + core::IArena& arena, + core::CsvDumper* dumper); ~SenderSlot(); diff --git a/src/internal_modules/roc_rtcp/loss_estimator.h b/src/internal_modules/roc_rtcp/loss_estimator.h index c1fe0e997..c0d0ba486 100644 --- a/src/internal_modules/roc_rtcp/loss_estimator.h +++ b/src/internal_modules/roc_rtcp/loss_estimator.h @@ -24,7 +24,7 @@ class LossEstimator { LossEstimator(); //! Update and return fractional loss ration since previous update. - //! @p total_packets defines total count of packets expected. + //! @p expected_packets defines total count of packets expected. //! @p lost_packets defines count of packets not received, //! probably negative dues to duplicates. float update(uint64_t total_packets, int64_t lost_packets); diff --git a/src/internal_modules/roc_rtp/link_meter.cpp b/src/internal_modules/roc_rtp/link_meter.cpp index d3e34a735..683cc3720 100644 --- a/src/internal_modules/roc_rtp/link_meter.cpp +++ b/src/internal_modules/roc_rtp/link_meter.cpp @@ -13,16 +13,27 @@ namespace roc { namespace rtp { -LinkMeter::LinkMeter(const EncodingMap& encoding_map) +LinkMeter::LinkMeter(core::IArena& arena, + const EncodingMap& encoding_map, + const audio::SampleSpec& sample_spec, + const audio::LatencyConfig& latency_config, + core::CsvDumper* dumper) : encoding_map_(encoding_map) , encoding_(NULL) , writer_(NULL) , reader_(NULL) + , sample_spec_(sample_spec) , first_packet_(true) + , win_len_(latency_config.sliding_stat_window_length) , has_metrics_(false) , first_seqnum_(0) , last_seqnum_hi_(0) - , last_seqnum_lo_(0) { + , last_seqnum_lo_(0) + , processed_packets_(0) + , prev_queue_timestamp_(-1) + , prev_stream_timestamp_(0) + , packet_jitter_stats_(arena, win_len_) + , dumper_(dumper) { } status::StatusCode LinkMeter::init_status() const { @@ -66,7 +77,10 @@ status::StatusCode LinkMeter::write(const packet::PacketPtr& packet) { // When we create LinkMeter, we don't know yet if RTP is used (e.g. // for repair packets), so we should be ready for non-rtp packets. - if (packet->rtp()) { + if (packet->has_flags(packet::Packet::FlagRTP)) { + if (!packet->has_flags(packet::Packet::FlagUDP)) { + roc_panic("Non-udp rtp packet"); + } // Since we don't know packet type in-before, we also determine // encoding dynamically. if (!encoding_ || encoding_->payload_type != packet->rtp()->payload_type) { @@ -86,12 +100,13 @@ status::StatusCode LinkMeter::read(packet::PacketPtr& packet, roc_panic("link meter: forgot to call set_reader()"); } - const status::StatusCode code = reader_->read(packet, mode); - if (code != status::StatusOK) { - return code; + status::StatusCode result = reader_->read(packet, mode); + if (packet && packet->has_flags(packet::Packet::FlagRestored) + && mode == packet::ModeFetch) { + metrics_.recovered_packets++; } - return status::StatusOK; + return result; } void LinkMeter::set_writer(packet::IWriter& writer) { @@ -103,6 +118,7 @@ void LinkMeter::set_reader(packet::IReader& reader) { } void LinkMeter::update_metrics_(const packet::Packet& packet) { + const bool recovered = packet.has_flags(packet::Packet::FlagRestored); const packet::seqnum_t pkt_seqnum = packet.rtp()->seqnum; // If packet seqnum is before first seqnum, and there was no wrap yet, @@ -112,26 +128,79 @@ void LinkMeter::update_metrics_(const packet::Packet& packet) { first_seqnum_ = pkt_seqnum; } - // If packet seqnum is after last seqnum, update last seqnum, and - // also counts possible wraps. - if (first_packet_ || packet::seqnum_diff(pkt_seqnum, last_seqnum_lo_) > 0) { + if (first_packet_) { + last_seqnum_hi_ = 0; + last_seqnum_lo_ = pkt_seqnum; + + } else if (packet::seqnum_diff(pkt_seqnum, last_seqnum_lo_) > 0) { + // If packet seqnum is after last seqnum, update last seqnum, and + // also counts possible wraps. if (pkt_seqnum < last_seqnum_lo_) { - last_seqnum_hi_ += (uint16_t)-1; + last_seqnum_hi_ += (uint32_t)1 << 16; } last_seqnum_lo_ = pkt_seqnum; } + if (!first_packet_) { + if (!recovered) { + update_jitter_(packet); + } + } else { + first_packet_ = false; + } + + if (!recovered) { + prev_queue_timestamp_ = packet.udp()->queue_timestamp; + prev_stream_timestamp_ = packet.rtp()->stream_timestamp; + } + processed_packets_++; + metrics_.ext_first_seqnum = first_seqnum_; metrics_.ext_last_seqnum = last_seqnum_hi_ + last_seqnum_lo_; + metrics_.expected_packets = metrics_.ext_last_seqnum - first_seqnum_ + 1; + metrics_.lost_packets = (int64_t)metrics_.expected_packets - processed_packets_; - // TODO(gh-688): - // - fill total_packets - // - fill lost_packets - // - fill jitter (use encoding_->sample_spec to convert to nanoseconds) - - first_packet_ = false; has_metrics_ = true; } +void LinkMeter::update_jitter_(const packet::Packet& packet) { + const core::nanoseconds_t d_enq_ns = + packet.udp()->queue_timestamp - prev_queue_timestamp_; + const packet::stream_timestamp_diff_t d_s_ts = packet::stream_timestamp_diff( + packet.rtp()->stream_timestamp, prev_stream_timestamp_); + const core::nanoseconds_t d_s_ns = sample_spec_.stream_timestamp_delta_2_ns(d_s_ts); + + packet_jitter_stats_.add(std::abs(d_enq_ns - d_s_ns)); + metrics_.max_jitter = (core::nanoseconds_t)packet_jitter_stats_.mov_max(); + metrics_.min_jitter = (core::nanoseconds_t)packet_jitter_stats_.mov_min(); + metrics_.jitter = mean_jitter(); + + if (dumper_) { + dump_(packet, d_enq_ns, d_s_ns); + } +} + +void LinkMeter::dump_(const packet::Packet& packet, + const long d_enq_ns, + const long d_s_ns) { + core::CsvEntry e; + e.type = 'm'; + e.n_fields = 5; + e.fields[0] = packet.udp()->queue_timestamp; + e.fields[1] = packet.rtp()->stream_timestamp; + e.fields[2] = (double)std::abs(d_enq_ns - d_s_ns) / core::Millisecond; + e.fields[3] = packet_jitter_stats_.mov_max(); + e.fields[4] = packet_jitter_stats_.mov_min(); + dumper_->write(e); +} + +core::nanoseconds_t rtp::LinkMeter::mean_jitter() const { + return (core::nanoseconds_t)packet_jitter_stats_.mov_avg(); +} + +size_t LinkMeter::running_window_len() const { + return win_len_; +} + } // namespace rtp } // namespace roc diff --git a/src/internal_modules/roc_rtp/link_meter.h b/src/internal_modules/roc_rtp/link_meter.h index 5d7bddcfc..8bf225c93 100644 --- a/src/internal_modules/roc_rtp/link_meter.h +++ b/src/internal_modules/roc_rtp/link_meter.h @@ -12,7 +12,11 @@ #ifndef ROC_RTP_LINK_METER_H_ #define ROC_RTP_LINK_METER_H_ +#include "roc_audio/latency_tuner.h" #include "roc_audio/sample_spec.h" +#include "roc_core/csv_dumper.h" +#include "roc_core/iarena.h" +#include "roc_core/mov_stats.h" #include "roc_core/noncopyable.h" #include "roc_core/time.h" #include "roc_packet/ilink_meter.h" @@ -25,7 +29,7 @@ namespace roc { namespace rtp { -//! RTP link meter. +//! Link meter. //! //! Computes various link metrics based on sequence of RTP packets. //! Inserted into pipeline in two points: @@ -46,7 +50,11 @@ class LinkMeter : public packet::ILinkMeter, public core::NonCopyable<> { public: //! Initialize. - explicit LinkMeter(const EncodingMap& encoding_map); + explicit LinkMeter(core::IArena& arena, + const EncodingMap& encoding_map, + const audio::SampleSpec& sample_spec, + const audio::LatencyConfig& latency_config, + core::CsvDumper* dumper); //! Check if the object was successfully constructed. status::StatusCode init_status() const; @@ -75,9 +83,9 @@ class LinkMeter : public packet::ILinkMeter, //! Invoked early in pipeline right after the packet is received. virtual ROC_ATTR_NODISCARD status::StatusCode write(const packet::PacketPtr& packet); - //! Read packet and update metrics. + //! Read packet and update restored packet counter. //! @remarks - //! Invoked late in pipeline right before the packet is decoded. + //! Invoked near the end of pipeline so as to be aware of recovered packets. virtual ROC_ATTR_NODISCARD status::StatusCode read(packet::PacketPtr& packet, packet::PacketReadMode mode); @@ -86,13 +94,20 @@ class LinkMeter : public packet::ILinkMeter, //! Should be called before first write() call. void set_writer(packet::IWriter& writer); - //! Set nested packet reader. + //! Set packet reader. //! @remarks //! Should be called before first read() call. void set_reader(packet::IReader& reader); + //! Get recent average jitter over a running window. + core::nanoseconds_t mean_jitter() const; + + //! Window length to which metrics relate. + size_t running_window_len() const; + private: void update_metrics_(const packet::Packet& packet); + void update_jitter_(const packet::Packet& packet); const EncodingMap& encoding_map_; const Encoding* encoding_; @@ -100,14 +115,26 @@ class LinkMeter : public packet::ILinkMeter, packet::IWriter* writer_; packet::IReader* reader_; + const audio::SampleSpec sample_spec_; + bool first_packet_; - bool has_metrics_; + //! Number of packets we use to calculate sliding statistics. + const size_t win_len_; + bool has_metrics_; packet::LinkMetrics metrics_; uint16_t first_seqnum_; uint32_t last_seqnum_hi_; uint16_t last_seqnum_lo_; + + int64_t processed_packets_; + core::nanoseconds_t prev_queue_timestamp_; + packet::stream_timestamp_t prev_stream_timestamp_; + core::MovStats packet_jitter_stats_; + + core::CsvDumper* dumper_; + void dump_(const packet::Packet& packet, const long d_enq_ns, const long d_s_ns); }; } // namespace rtp diff --git a/src/public_api/include/roc/config.h b/src/public_api/include/roc/config.h index 1af0eec2d..fbe488b49 100644 --- a/src/public_api/include/roc/config.h +++ b/src/public_api/include/roc/config.h @@ -789,6 +789,48 @@ typedef struct roc_sender_config { */ unsigned long long target_latency; + /** Minimum allowed latency, in nanoseconds. + * + * How latency is calculated depends on \c latency_tuner_backend field. + * + * If latency bounding is enabled on sender (if \c latency_tuner_profile is not + * \ref ROC_LATENCY_TUNER_PROFILE_INTACT, or if any of \c min_latency and + * \c max_latency fields is non-zero), then if latency goes below \c min_latency or + * above \c max_latency, sender restarts connection to receiver. + * + * By default, latency bounding is **disabled** on sender. If you enable it on sender, + * you likely want to disable it on receiver. + * + * You should either set both \c min_latency and \c max_latency to meaningful values, + * or keep both zero. If both fields are zero, and if latency bounding is enabled, + * then default values are used. + * + * Negative value is allowed. For \ref ROC_LATENCY_TUNER_BACKEND_NIQ, latency + * can temporary become negative during burst packet losses, and negative + * \c min_latency may be used to tolerate this to some extent. + */ + long long min_latency; + + /** Maximum allowed latency, in nanoseconds. + * + * How latency is calculated depends on \c latency_tuner_backend field. + * + * If latency bounding is enabled on sender (if \c latency_tuner_profile is not + * \ref ROC_LATENCY_TUNER_PROFILE_INTACT, or if any of \c min_latency and + * \c max_latency fields is non-zero), then if latency goes below \c min_latency or + * above \c max_latency, sender restarts connection to receiver. + * + * By default, latency bounding is **disabled** on sender. If you enable it on sender, + * you likely want to disable it on receiver. + * + * You should either set both \c min_latency and \c max_latency to meaningful values, + * or keep both zero. If both fields are zero, and if latency bounding is enabled, + * then default values are used. + * + * Negative value doesn't make practical sense. + */ + long long max_latency; + /** Maximum allowed delta between current and target latency, in nanoseconds. * * How latency is calculated depends on \c latency_tuner_backend field. @@ -891,6 +933,13 @@ typedef struct roc_receiver_config { */ unsigned long long target_latency; + /** Start latency, in nanoseconds. + * + * If target latency is set to zero, and latency tuning is enabled, this value + * sets initial value of latency. + */ + unsigned long long start_latency; + /** Maximum allowed delta between current and target latency, in nanoseconds. * * How latency is calculated depends on \c latency_tuner_backend field. @@ -908,6 +957,48 @@ typedef struct roc_receiver_config { */ unsigned long long latency_tolerance; + /** Minimum allowed latency, in nanoseconds. + * + * How latency is calculated depends on \c latency_tuner_backend field. + * + * If latency bounding is enabled on receiver (if \c latency_tuner_profile is not + * \ref ROC_LATENCY_TUNER_PROFILE_INTACT, or if any of \c min_latency and + * \c max_latency fields is non-zero), then if latency goes below \c min_latency + * or above \c max_latency, receiver terminates connection to sender (but it then + * restarts if sender continues streaming). + * + * By default, latency bounding is **enabled** on receiver. If you disable it on + * receiver, you likely want to enable it on sender. + * + * You should either set both \c min_latency and \c max_latency to meaningful values, + * or keep both zero. If both fields are zero, and if latency bounding is enabled, + * then default values are used. + * + * Negative value is allowed. For \ref ROC_LATENCY_TUNER_BACKEND_NIQ, latency + * can temporary become negative during burst packet losses, and negative + * \c min_latency may be used to tolerate this to some extent. + */ + long long min_latency; + + /** Maximum allowed latency, in nanoseconds. + * + * If latency bounding is enabled on receiver (if \c latency_tuner_profile is not + * \ref ROC_LATENCY_TUNER_PROFILE_INTACT, or if any of \c min_latency and + * \c max_latency fields is non-zero), then if latency goes below \c min_latency + * or above \c max_latency, receiver terminates connection to sender (but it then + * restarts if sender continues streaming). + * + * By default, latency bounding is **enabled** on receiver. If you disable it on + * receiver, you likely want to enable it on sender. + * + * You should either set both \c min_latency and \c max_latency to meaningful values, + * or keep both zero. If both fields are zero, and if latency bounding is enabled, + * then default values are used. + * + * Negative value doesn't make practical sense. + */ + long long max_latency; + /** Timeout for the lack of playback, in nanoseconds. * * If there is no playback during this period, receiver terminates connection to diff --git a/src/public_api/include/roc/metrics.h b/src/public_api/include/roc/metrics.h index 493703e54..da4bc69b4 100644 --- a/src/public_api/include/roc/metrics.h +++ b/src/public_api/include/roc/metrics.h @@ -43,6 +43,25 @@ typedef struct roc_connection_metrics { * May be zero initially, until enough statistics is accumulated. */ unsigned long long e2e_latency; + + /** Estimated interarrival jitter, in nanoseconds. + * + * Determines expected variance of inter-packet arrival period. + * + * Estimated on receiver. + */ + unsigned long long mean_jitter; + + /** Total amount of packets that receiver expects to be delivered. + */ + unsigned long long expected_packets; + + /** Cumulative count of lost packets. + * + * The total number of RTP data packets that have been lost since the beginning + * of reception. + */ + unsigned long long lost_packets; } roc_connection_metrics; /** Receiver metrics. diff --git a/src/public_api/src/adapters.cpp b/src/public_api/src/adapters.cpp index d56464f00..2b572bcf9 100644 --- a/src/public_api/src/adapters.cpp +++ b/src/public_api/src/adapters.cpp @@ -105,6 +105,14 @@ bool sender_config_from_user(node::Context& context, out.latency.latency_tolerance = (core::nanoseconds_t)in.latency_tolerance; } + if (in.min_latency != 0) { + out.latency.min_latency = (core::nanoseconds_t)in.min_latency; + } + + if (in.max_latency != 0) { + out.latency.max_latency = (core::nanoseconds_t)in.max_latency; + } + out.enable_cpu_clock = false; out.enable_auto_cts = true; @@ -174,6 +182,26 @@ bool receiver_config_from_user(node::Context&, (core::nanoseconds_t)in.latency_tolerance; } + if (in.start_latency != 0) { + if (in.target_latency != 0) { + roc_log(LogError, + "bad configuration:" + " start latency must be 0 if latency tuning is disabled" + " (target_latency != 0)"); + return false; + } + out.session_defaults.latency.start_latency = + (core::nanoseconds_t)in.start_latency; + } + + if (in.min_latency != 0) { + out.session_defaults.latency.min_latency = (core::nanoseconds_t)in.min_latency; + } + + if (in.max_latency != 0) { + out.session_defaults.latency.max_latency = (core::nanoseconds_t)in.max_latency; + } + if (in.no_playback_timeout != 0) { out.session_defaults.watchdog.no_playback_timeout = in.no_playback_timeout; } @@ -731,6 +759,16 @@ void receiver_participant_metrics_to_user( if (party_metrics.latency.e2e_latency > 0) { out.e2e_latency = (unsigned long long)party_metrics.latency.e2e_latency; } + + if (party_metrics.link.jitter > 0) { + out.mean_jitter = (unsigned long long)party_metrics.link.jitter; + } + + if (party_metrics.link.expected_packets > 0) { + out.expected_packets = (unsigned long long)party_metrics.link.expected_packets; + out.lost_packets = + (unsigned long long)std::max(party_metrics.link.lost_packets, (int64_t)0); + } } ROC_ATTR_NO_SANITIZE_UB @@ -755,6 +793,16 @@ void sender_participant_metrics_to_user( if (party_metrics.latency.e2e_latency > 0) { out.e2e_latency = (unsigned long long)party_metrics.latency.e2e_latency; } + + if (party_metrics.link.jitter > 0) { + out.mean_jitter = (unsigned long long)party_metrics.link.jitter; + } + + if (party_metrics.link.expected_packets > 0) { + out.expected_packets = (unsigned long long)party_metrics.link.expected_packets; + out.lost_packets = + (unsigned long long)std::max(party_metrics.link.lost_packets, (int64_t)0); + } } ROC_ATTR_NO_SANITIZE_UB diff --git a/src/tests/public_api/test_loopback_encoder_2_decoder.cpp b/src/tests/public_api/test_loopback_encoder_2_decoder.cpp index 8dc5e66fd..ceedf691b 100644 --- a/src/tests/public_api/test_loopback_encoder_2_decoder.cpp +++ b/src/tests/public_api/test_loopback_encoder_2_decoder.cpp @@ -91,6 +91,9 @@ TEST_GROUP(loopback_encoder_2_decoder) { bool leading_zeros = true; size_t iface_packets[10] = {}; + size_t recv_expected_pkts = 0; + size_t sent_expected_pkts = 0; + int64_t recv_lost_pkts = 0; size_t feedback_packets = 0; size_t zero_samples = 0, total_samples = 0; size_t n_pkt = 0; @@ -108,7 +111,8 @@ TEST_GROUP(loopback_encoder_2_decoder) { } } - for (size_t nf = 0; nf < NumFrames || !got_all_metrics; nf++) { + const size_t last_frame = NumFrames - 1; + for (size_t nf = 0; nf <= last_frame || !got_all_metrics; nf++) { { // write frame to encoder float samples[test::FrameSamples] = {}; @@ -139,7 +143,7 @@ TEST_GROUP(loopback_encoder_2_decoder) { const bool loss = (flags & FlagLosses) && (ifaces[n_if] == ROC_INTERFACE_AUDIO_SOURCE) - && ((n_pkt + 3) % LossRatio == 0); + && ((n_pkt + 3) % LossRatio == 0) && nf < last_frame; if (!loss) { CHECK(roc_receiver_decoder_push_packet(decoder, ifaces[n_if], @@ -227,6 +231,10 @@ TEST_GROUP(loopback_encoder_2_decoder) { max_recv_e2e_latency = std::max(max_recv_e2e_latency, conn_metrics.e2e_latency); + + recv_expected_pkts = + std::max(recv_expected_pkts, (size_t)conn_metrics.expected_packets); + recv_lost_pkts = (int64_t)conn_metrics.lost_packets; } { // check sender metrics roc_sender_metrics send_metrics; @@ -242,6 +250,9 @@ TEST_GROUP(loopback_encoder_2_decoder) { max_send_e2e_latency = std::max(max_send_e2e_latency, conn_metrics.e2e_latency); + + sent_expected_pkts = std::max(sent_expected_pkts, + (size_t)conn_metrics.expected_packets); } } @@ -255,6 +266,19 @@ TEST_GROUP(loopback_encoder_2_decoder) { // check we have received enough good samples CHECK(zero_samples < MaxLeadingZeros); + for (size_t n_if = 0; n_if < num_ifaces; n_if++) { + if (ifaces[n_if] == ROC_INTERFACE_AUDIO_SOURCE) { + UNSIGNED_LONGS_EQUAL(iface_packets[n_if], recv_expected_pkts); + if (has_control) { + const size_t nlag = test::FrameSamples / test::PacketSamples; + CHECK(recv_expected_pkts >= sent_expected_pkts + && recv_expected_pkts <= sent_expected_pkts + nlag); + } + } + } + // check lost packets metrics + UNSIGNED_LONGS_EQUAL(n_lost, recv_lost_pkts); + // check that there were packets on all active interfaces for (size_t n_if = 0; n_if < num_ifaces; n_if++) { CHECK(iface_packets[n_if] > 0); diff --git a/src/tests/public_api/test_loopback_sender_2_receiver.cpp b/src/tests/public_api/test_loopback_sender_2_receiver.cpp index c21210e5d..6d514780a 100644 --- a/src/tests/public_api/test_loopback_sender_2_receiver.cpp +++ b/src/tests/public_api/test_loopback_sender_2_receiver.cpp @@ -590,26 +590,33 @@ TEST(loopback_sender_2_receiver, metrics_measurements) { continue; } - UNSIGNED_LONGS_EQUAL(1, receiver.recv_metrics().connection_count); + const roc_receiver_metrics& recv_metrics = receiver.recv_metrics(); + UNSIGNED_LONGS_EQUAL(1, recv_metrics.connection_count); UNSIGNED_LONGS_EQUAL(1, receiver.conn_metrics_count()); - if (receiver.conn_metrics(0).e2e_latency == 0) { + const roc_connection_metrics& recv_conn_metric = receiver.conn_metrics(0); + if (recv_conn_metric.e2e_latency == 0) { continue; } sender.query_metrics(MaxSess); - if (sender.send_metrics().connection_count == 0) { + const roc_sender_metrics& send_metrics = sender.send_metrics(); + if (send_metrics.connection_count == 0) { continue; } - UNSIGNED_LONGS_EQUAL(1, sender.send_metrics().connection_count); + UNSIGNED_LONGS_EQUAL(1, send_metrics.connection_count); UNSIGNED_LONGS_EQUAL(1, sender.conn_metrics_count()); + const roc_connection_metrics& send_conn_metrics = sender.conn_metrics(0); - if (sender.conn_metrics(0).e2e_latency == 0) { + if (send_conn_metrics.e2e_latency == 0) { continue; } + UNSIGNED_LONGS_EQUAL(0, recv_conn_metric.lost_packets); + CHECK(send_conn_metrics.expected_packets > 0); + CHECK(recv_conn_metric.expected_packets >= send_conn_metrics.expected_packets); break; } @@ -795,6 +802,10 @@ TEST(loopback_sender_2_receiver, metrics_slots) { continue; } + if (receiver.conn_metrics(0).mean_jitter == 0) { + continue; + } + break; } @@ -808,6 +819,9 @@ TEST(loopback_sender_2_receiver, metrics_slots) { UNSIGNED_LONGS_EQUAL(1, receiver.recv_metrics().connection_count); UNSIGNED_LONGS_EQUAL(1, receiver.conn_metrics_count()); + + CHECK(receiver.conn_metrics(0).expected_packets > 0); + CHECK(receiver.conn_metrics(0).mean_jitter > 0); } for (;;) { @@ -825,6 +839,10 @@ TEST(loopback_sender_2_receiver, metrics_slots) { continue; } + if (sender.conn_metrics(0).mean_jitter == 0) { + continue; + } + break; } @@ -838,6 +856,9 @@ TEST(loopback_sender_2_receiver, metrics_slots) { UNSIGNED_LONGS_EQUAL(1, sender.send_metrics().connection_count); UNSIGNED_LONGS_EQUAL(1, sender.conn_metrics_count()); + + CHECK(sender.conn_metrics(0).expected_packets > 0); + CHECK(sender.conn_metrics(0).mean_jitter > 0); } receiver.stop(); diff --git a/src/tests/roc_audio/test_freq_estimator.cpp b/src/tests/roc_audio/test_freq_estimator.cpp index 39e76dd99..25176fe84 100644 --- a/src/tests/roc_audio/test_freq_estimator.cpp +++ b/src/tests/roc_audio/test_freq_estimator.cpp @@ -31,7 +31,7 @@ TEST_GROUP(freq_estimator) { TEST(freq_estimator, initial) { for (size_t p = 0; p < ROC_ARRAY_SIZE(Profiles); p++) { - FreqEstimator fe(Profiles[p], Target); + FreqEstimator fe(Profiles[p], Target, NULL); DOUBLES_EQUAL(1.0, (double)fe.freq_coeff(), Epsilon); } @@ -39,10 +39,10 @@ TEST(freq_estimator, initial) { TEST(freq_estimator, aim_queue_size) { for (size_t p = 0; p < ROC_ARRAY_SIZE(Profiles); p++) { - FreqEstimator fe(Profiles[p], Target); + FreqEstimator fe(Profiles[p], Target, NULL); for (size_t n = 0; n < 1000; n++) { - fe.update(Target); + fe.update_current_latency(Target); } DOUBLES_EQUAL(1.0, (double)fe.freq_coeff(), Epsilon); @@ -51,21 +51,21 @@ TEST(freq_estimator, aim_queue_size) { TEST(freq_estimator, large_queue_size) { for (size_t p = 0; p < ROC_ARRAY_SIZE(Profiles); p++) { - FreqEstimator fe(Profiles[p], Target); + FreqEstimator fe(Profiles[p], Target, NULL); do { - fe.update(Target * 2); + fe.update_current_latency(Target * 2); } while (fe.freq_coeff() < 1.01f); } } TEST(freq_estimator, small_queue_size) { for (size_t p = 0; p < ROC_ARRAY_SIZE(Profiles); p++) { - FreqEstimator fe(Profiles[p], Target); + FreqEstimator fe(Profiles[p], Target, NULL); do { - fe.update(Target / 2); - } while (fe.freq_coeff() > 0.99f); + fe.update_current_latency(Target / 2); + } while (fe.freq_coeff() > 0.997f); } } diff --git a/src/tests/roc_pipeline/test_loopback_sink_2_source.cpp b/src/tests/roc_pipeline/test_loopback_sink_2_source.cpp index 3025d9a40..ecd030a3b 100644 --- a/src/tests/roc_pipeline/test_loopback_sink_2_source.cpp +++ b/src/tests/roc_pipeline/test_loopback_sink_2_source.cpp @@ -387,7 +387,10 @@ void read_samples(test::FrameReader& frame_reader, } } -void check_metrics(ReceiverSlot& receiver, SenderSlot& sender, int flags) { +void check_metrics(ReceiverSlot& receiver, + SenderSlot& sender, + int flags, + PacketProxy& packet_proxy) { ReceiverSlotMetrics recv_metrics; ReceiverParticipantMetrics recv_party_metrics; size_t recv_party_count = 1; @@ -401,10 +404,17 @@ void check_metrics(ReceiverSlot& receiver, SenderSlot& sender, int flags) { CHECK(recv_party_metrics.link.ext_first_seqnum > 0); CHECK(recv_party_metrics.link.ext_last_seqnum > 0); - // TODO(gh-688): check that metrics are non-zero: - // - total_packets - // - lost_packets - // - jitter + LONGS_EQUAL((int64_t)recv_party_metrics.link.expected_packets + - recv_party_metrics.link.lost_packets, + packet_proxy.n_source()); + if (flags & FlagLosses) { + CHECK(recv_party_metrics.link.lost_packets > 0); + } else if (flags & FlagInterleaving) { + CHECK(recv_party_metrics.link.lost_packets >= 0); + } else { + CHECK(recv_party_metrics.link.lost_packets == 0); + } + CHECK(recv_party_metrics.link.jitter > 0); CHECK(recv_party_metrics.latency.niq_latency > 0); CHECK(recv_party_metrics.latency.niq_stalling >= 0); @@ -432,10 +442,16 @@ void check_metrics(ReceiverSlot& receiver, SenderSlot& sender, int flags) { send_party_metrics.link.ext_last_seqnum) <= 1); - // TODO(gh-688): check that metrics are equal on sender and receiver: - // - total_packets - // - lost_packets - // - jitter + CHECK((send_party_metrics.link.expected_packets >= packet_proxy.n_source() - 1) + && (send_party_metrics.link.expected_packets <= packet_proxy.n_source())); + + UNSIGNED_LONGS_EQUAL(packet_proxy.n_source(), + recv_party_metrics.link.expected_packets); + + UNSIGNED_LONGS_EQUAL(recv_party_metrics.link.lost_packets, + send_party_metrics.link.lost_packets); + CHECK(std::abs(recv_party_metrics.link.jitter - send_party_metrics.link.jitter) + < 1 * core::Millisecond); DOUBLES_EQUAL(recv_party_metrics.latency.niq_latency, send_party_metrics.latency.niq_latency, core::Millisecond); @@ -597,10 +613,18 @@ void send_receive(int flags, reverse_proxy.deliver_from(receiver_outbound_queue); if (num_sessions == 1 && nf > (Latency + Warmup) / SamplesPerFrame) { - check_metrics(*receiver_slot, *sender_slot, flags); + check_metrics(*receiver_slot, *sender_slot, flags, proxy); } } } + // While receiving interleaved packets losses could be detected incorrectly, + // so we postpone the final check for lost packets metric till the whole bunch + // of packets is sent. + if (flags & FlagInterleaving) { + // Here we exclude FlagInterleaving from flags so that check_metrics could + // undertake the full check. + check_metrics(*receiver_slot, *sender_slot, flags ^ FlagInterleaving, proxy); + } if ((flags & FlagDropSource) == 0) { CHECK(proxy.n_source() > 0); diff --git a/src/tests/roc_pipeline/test_receiver_endpoint.cpp b/src/tests/roc_pipeline/test_receiver_endpoint.cpp index 4d9aa4096..19c836f61 100644 --- a/src/tests/roc_pipeline/test_receiver_endpoint.cpp +++ b/src/tests/roc_pipeline/test_receiver_endpoint.cpp @@ -44,7 +44,7 @@ TEST(receiver_endpoint, valid) { ReceiverSlotConfig slot_config; ReceiverSessionGroup session_group(source_config, slot_config, state_tracker, mixer, processor_map, encoding_map, packet_factory, - frame_factory, arena); + frame_factory, arena, NULL); ReceiverEndpoint endpoint(address::Proto_RTP, state_tracker, session_group, encoding_map, address::SocketAddr(), NULL, arena); @@ -59,7 +59,7 @@ TEST(receiver_endpoint, invalid_proto) { ReceiverSlotConfig slot_config; ReceiverSessionGroup session_group(source_config, slot_config, state_tracker, mixer, processor_map, encoding_map, packet_factory, - frame_factory, arena); + frame_factory, arena, NULL); ReceiverEndpoint endpoint(address::Proto_None, state_tracker, session_group, encoding_map, address::SocketAddr(), NULL, arena); @@ -82,7 +82,7 @@ TEST(receiver_endpoint, no_memory) { ReceiverSlotConfig slot_config; ReceiverSessionGroup session_group( source_config, slot_config, state_tracker, mixer, processor_map, encoding_map, - packet_factory, frame_factory, core::NoopArena); + packet_factory, frame_factory, core::NoopArena, NULL); ReceiverEndpoint endpoint(protos[n], state_tracker, session_group, encoding_map, address::SocketAddr(), NULL, core::NoopArena); diff --git a/src/tests/roc_pipeline/test_receiver_loop.cpp b/src/tests/roc_pipeline/test_receiver_loop.cpp index 4ec44a689..0149b6705 100644 --- a/src/tests/roc_pipeline/test_receiver_loop.cpp +++ b/src/tests/roc_pipeline/test_receiver_loop.cpp @@ -119,6 +119,7 @@ TEST_GROUP(receiver_loop) { void setup() { config.session_defaults.latency.tuner_backend = audio::LatencyTunerBackend_Niq; config.session_defaults.latency.tuner_profile = audio::LatencyTunerProfile_Intact; + config.session_defaults.latency.target_latency = DefaultLatency; } }; diff --git a/src/tests/roc_pipeline/test_receiver_source.cpp b/src/tests/roc_pipeline/test_receiver_source.cpp index 7cffbb28b..1cd99c630 100644 --- a/src/tests/roc_pipeline/test_receiver_source.cpp +++ b/src/tests/roc_pipeline/test_receiver_source.cpp @@ -3559,9 +3559,102 @@ TEST(receiver_source, metrics_truncation) { } // Check how receiver computes packet metrics: -// total_packets, lost_packets, ext_first_seqnum, ext_last_seqnum -IGNORE_TEST(receiver_source, metrics_packet_counters) { - // TODO(gh-688): implement test +// expected_packets, lost_packets, ext_first_seqnum, ext_last_seqnum +TEST(receiver_source, metrics_packet_counters) { + enum { InitSeqnum = 0xFFFC }; + uint32_t seqnum = InitSeqnum; + uint32_t prev_seqnum = InitSeqnum; + size_t pkt_counter = 0; + size_t prev_pkt_counter = 0; + size_t pkt_lost_counter = 0; + size_t prev_pkt_lost_counter = 0; + + init_with_defaults(); + + ReceiverSource receiver(make_default_config(), processor_map, encoding_map, + packet_pool, packet_buffer_pool, frame_pool, + frame_buffer_pool, arena); + LONGS_EQUAL(status::StatusOK, receiver.init_status()); + + ReceiverSlot* slot = create_slot(receiver); + CHECK(slot); + + { + ReceiverSlotMetrics slot_metrics; + ReceiverParticipantMetrics party_metrics; + + slot->get_metrics(slot_metrics, &party_metrics, NULL); + } + + packet::IWriter* endpoint_writer = + create_transport_endpoint(slot, address::Iface_AudioSource, proto1, dst_addr1); + CHECK(endpoint_writer); + + test::FrameReader frame_reader(receiver, frame_factory); + + test::PacketWriter packet_writer(arena, *endpoint_writer, encoding_map, + packet_factory, src_id1, src_addr1, dst_addr1, + PayloadType_Ch2); + packet_writer.set_seqnum(InitSeqnum); + packet_writer.write_packets(Latency / SamplesPerPacket, SamplesPerPacket, + output_sample_spec); + pkt_counter += Latency / SamplesPerPacket; + prev_pkt_counter = pkt_counter; + prev_seqnum = seqnum = InitSeqnum + pkt_counter - 1; + + { + ReceiverSlotMetrics slot_metrics; + ReceiverParticipantMetrics party_metrics; + size_t party_metrics_size = 1; + + slot->get_metrics(slot_metrics, &party_metrics, &party_metrics_size); + + CHECK(slot_metrics.source_id != 0); + } + + for (size_t np = 0; np < ManyPackets; np++) { + const bool lose_pkt = np % 3 == 0 && np; + for (size_t nf = 0; nf < FramesPerPacket; nf++) { + refresh_source(receiver, frame_reader.refresh_ts()); + frame_reader.read_any_samples(SamplesPerFrame, output_sample_spec); + + UNSIGNED_LONGS_EQUAL(1, receiver.num_sessions()); + } + + if (lose_pkt) { + packet_writer.skip_packets(1, SamplesPerPacket, output_sample_spec); + } else { + packet_writer.write_packets(1, SamplesPerPacket, output_sample_spec); + } + + { + ReceiverSlotMetrics slot_metrics; + ReceiverParticipantMetrics party_metrics; + size_t party_metrics_size = 1; + + slot->get_metrics(slot_metrics, &party_metrics, &party_metrics_size); + + if (!lose_pkt) { + UNSIGNED_LONGS_EQUAL(prev_pkt_counter, + party_metrics.link.expected_packets); + UNSIGNED_LONGS_EQUAL(InitSeqnum, party_metrics.link.ext_first_seqnum); + UNSIGNED_LONGS_EQUAL(prev_pkt_lost_counter, + party_metrics.link.lost_packets); + UNSIGNED_LONGS_EQUAL(prev_seqnum, party_metrics.link.ext_last_seqnum); + } + } + + prev_pkt_lost_counter = pkt_lost_counter; + if (lose_pkt) { + pkt_lost_counter++; + } + pkt_counter++; + seqnum++; + if (!lose_pkt) { + prev_pkt_counter = pkt_counter; + prev_seqnum = seqnum; + } + } } // Check how receiver computes jitter metric. diff --git a/src/tests/roc_pipeline/test_sender_endpoint.cpp b/src/tests/roc_pipeline/test_sender_endpoint.cpp index e6752fe41..8191e47c8 100644 --- a/src/tests/roc_pipeline/test_sender_endpoint.cpp +++ b/src/tests/roc_pipeline/test_sender_endpoint.cpp @@ -45,7 +45,7 @@ TEST(sender_endpoint, valid) { SenderSinkConfig sink_config; StateTracker state_tracker; SenderSession session(sink_config, processor_map, encoding_map, packet_factory, - frame_factory, arena); + frame_factory, arena, NULL); SenderEndpoint endpoint(address::Proto_RTP, state_tracker, session, addr, queue, arena); @@ -60,7 +60,7 @@ TEST(sender_endpoint, invalid_proto) { SenderSinkConfig sink_config; StateTracker state_tracker; SenderSession session(sink_config, processor_map, encoding_map, packet_factory, - frame_factory, arena); + frame_factory, arena, NULL); SenderEndpoint endpoint(address::Proto_None, state_tracker, session, addr, queue, arena); @@ -82,7 +82,7 @@ TEST(sender_endpoint, no_memory) { SenderSinkConfig sink_config; StateTracker state_tracker; SenderSession session(sink_config, processor_map, encoding_map, packet_factory, - frame_factory, arena); + frame_factory, arena, NULL); SenderEndpoint endpoint(protos[n], state_tracker, session, addr, queue, core::NoopArena); diff --git a/src/tests/roc_pipeline/test_sender_sink.cpp b/src/tests/roc_pipeline/test_sender_sink.cpp index 31eab522b..a2b0c0fa8 100644 --- a/src/tests/roc_pipeline/test_sender_sink.cpp +++ b/src/tests/roc_pipeline/test_sender_sink.cpp @@ -648,7 +648,7 @@ TEST(sender_sink, metrics_feedback) { packet::LinkMetrics link_metrics; link_metrics.ext_first_seqnum = seed * 100; link_metrics.ext_last_seqnum = seed * 200; - link_metrics.total_packets = (seed * 200) - (seed * 100) + 1; + link_metrics.expected_packets = (seed * 200) - (seed * 100) + 1; link_metrics.lost_packets = (int)seed * 40; link_metrics.jitter = (int)seed * core::Millisecond * 50; @@ -684,8 +684,8 @@ TEST(sender_sink, metrics_feedback) { party_metrics[0].link.ext_first_seqnum); UNSIGNED_LONGS_EQUAL(link_metrics.ext_last_seqnum, party_metrics[0].link.ext_last_seqnum); - UNSIGNED_LONGS_EQUAL(link_metrics.total_packets, - party_metrics[0].link.total_packets); + UNSIGNED_LONGS_EQUAL(link_metrics.expected_packets, + party_metrics[0].link.expected_packets); UNSIGNED_LONGS_EQUAL(link_metrics.lost_packets, party_metrics[0].link.lost_packets); DOUBLES_EQUAL((double)link_metrics.jitter, diff --git a/src/tests/roc_pipeline/test_session_router.cpp b/src/tests/roc_pipeline/test_session_router.cpp index a6a74c371..d1a29ec9d 100644 --- a/src/tests/roc_pipeline/test_session_router.cpp +++ b/src/tests/roc_pipeline/test_session_router.cpp @@ -63,10 +63,10 @@ TEST_GROUP(session_router) { sess1 = new (arena) ReceiverSession(session_config, common_config, processor_map, - encoding_map, packet_factory, frame_factory, arena); + encoding_map, packet_factory, frame_factory, arena, NULL); sess2 = new (arena) ReceiverSession(session_config, common_config, processor_map, - encoding_map, packet_factory, frame_factory, arena); + encoding_map, packet_factory, frame_factory, arena, NULL); } } }; diff --git a/src/tests/roc_rtp/test_link_meter.cpp b/src/tests/roc_rtp/test_link_meter.cpp index b9952396f..724bdb201 100644 --- a/src/tests/roc_rtp/test_link_meter.cpp +++ b/src/tests/roc_rtp/test_link_meter.cpp @@ -10,10 +10,14 @@ #include "test_helpers/status_writer.h" +#include "roc_audio/latency_monitor.h" +#include "roc_audio/sample_spec.h" +#include "roc_core/fast_random.h" #include "roc_core/heap_arena.h" #include "roc_core/macro_helpers.h" #include "roc_packet/fifo_queue.h" #include "roc_packet/packet_factory.h" +#include "roc_pipeline/config.h" #include "roc_rtp/encoding_map.h" #include "roc_rtp/headers.h" #include "roc_rtp/link_meter.h" @@ -23,37 +27,59 @@ namespace rtp { namespace { -enum { PacketSz = 100 }; +enum { ChMask = 3, PacketSz = 100, SampleRate = 10000, Duration = 100 }; core::HeapArena arena; packet::PacketFactory packet_factory(arena, PacketSz); EncodingMap encoding_map(arena); -packet::PacketPtr new_packet(packet::seqnum_t sn) { +audio::SampleSpec sample_spec(SampleRate, + audio::Sample_RawFormat, + audio::ChanLayout_Surround, + audio::ChanOrder_Smpte, + ChMask); +const core::nanoseconds_t start_ts = 1691499037871419405; +const core::nanoseconds_t step_ts = Duration * core::Second / SampleRate; + +const packet::stream_timestamp_t stream_start_ts = 6134803; +const packet::stream_timestamp_t stream_step_ts = Duration; + +packet::PacketPtr new_packet(packet::seqnum_t sn, + const core::nanoseconds_t ts, + const packet::stream_timestamp_t stream_ts) { packet::PacketPtr packet = packet_factory.new_packet(); CHECK(packet); packet->add_flags(packet::Packet::FlagRTP | packet::Packet::FlagUDP); packet->rtp()->payload_type = PayloadType_L16_Stereo; packet->rtp()->seqnum = sn; - packet->udp()->queue_timestamp = 666; + packet->rtp()->duration = Duration; + packet->rtp()->stream_timestamp = stream_ts; + packet->udp()->queue_timestamp = ts; return packet; } +audio::LatencyConfig make_config() { + audio::LatencyConfig latency_config; + latency_config.tuner_profile = audio::LatencyTunerProfile_Responsive; + latency_config.deduce_defaults(pipeline::DefaultLatency, true); + return latency_config; +} } // namespace TEST_GROUP(link_meter) {}; TEST(link_meter, has_metrics) { packet::FifoQueue queue; - LinkMeter meter(encoding_map); + LinkMeter meter(arena, encoding_map, sample_spec, make_config(), NULL); meter.set_writer(queue); CHECK(!meter.has_metrics()); - LONGS_EQUAL(status::StatusOK, meter.write(new_packet(100))); + LONGS_EQUAL(status::StatusOK, + meter.write(new_packet(100, start_ts, stream_start_ts))); UNSIGNED_LONGS_EQUAL(1, queue.size()); CHECK(meter.has_metrics()); @@ -61,59 +87,313 @@ TEST(link_meter, has_metrics) { TEST(link_meter, last_seqnum) { packet::FifoQueue queue; - LinkMeter meter(encoding_map); + LinkMeter meter(arena, encoding_map, sample_spec, make_config(), NULL); meter.set_writer(queue); + core::nanoseconds_t ts = start_ts; + packet::stream_timestamp_t sts = stream_start_ts; UNSIGNED_LONGS_EQUAL(0, meter.metrics().ext_last_seqnum); - LONGS_EQUAL(status::StatusOK, meter.write(new_packet(100))); + LONGS_EQUAL(status::StatusOK, meter.write(new_packet(100, ts, sts))); UNSIGNED_LONGS_EQUAL(100, meter.metrics().ext_last_seqnum); + ts += step_ts; + sts += stream_step_ts; // seqnum increased, metric updated - LONGS_EQUAL(status::StatusOK, meter.write(new_packet(102))); + LONGS_EQUAL(status::StatusOK, + meter.write(new_packet(102, ts + step_ts, sts + stream_step_ts))); UNSIGNED_LONGS_EQUAL(102, meter.metrics().ext_last_seqnum); // seqnum decreased, ignored - LONGS_EQUAL(status::StatusOK, meter.write(new_packet(101))); + LONGS_EQUAL(status::StatusOK, meter.write(new_packet(101, ts, sts))); UNSIGNED_LONGS_EQUAL(102, meter.metrics().ext_last_seqnum); + ts += step_ts * 2; + sts += stream_step_ts * 2; // seqnum increased, metric updated - LONGS_EQUAL(status::StatusOK, meter.write(new_packet(103))); - UNSIGNED_LONGS_EQUAL(103, meter.metrics().ext_last_seqnum); + UNSIGNED_LONGS_EQUAL(status::StatusOK, meter.write(new_packet(103, ts, sts))); + + UNSIGNED_LONGS_EQUAL(0, meter.mean_jitter()); + + const packet::LinkMetrics& metrics = meter.metrics(); + UNSIGNED_LONGS_EQUAL(0, metrics.jitter); + UNSIGNED_LONGS_EQUAL(103, metrics.ext_last_seqnum); UNSIGNED_LONGS_EQUAL(4, queue.size()); } TEST(link_meter, last_seqnum_wrap) { packet::FifoQueue queue; - LinkMeter meter(encoding_map); + LinkMeter meter(arena, encoding_map, sample_spec, make_config(), NULL); meter.set_writer(queue); + core::nanoseconds_t ts = start_ts; + packet::stream_timestamp_t sts = stream_start_ts; UNSIGNED_LONGS_EQUAL(0, meter.metrics().ext_last_seqnum); // no overflow - LONGS_EQUAL(status::StatusOK, meter.write(new_packet(65533))); + LONGS_EQUAL(status::StatusOK, meter.write(new_packet(65533, ts, sts))); UNSIGNED_LONGS_EQUAL(65533, meter.metrics().ext_last_seqnum); + UNSIGNED_LONGS_EQUAL(1, meter.metrics().expected_packets); // no overflow - LONGS_EQUAL(status::StatusOK, meter.write(new_packet(65535))); + LONGS_EQUAL( + status::StatusOK, + meter.write(new_packet(65535, ts + step_ts * 2, sts + stream_step_ts * 2))); UNSIGNED_LONGS_EQUAL(65535, meter.metrics().ext_last_seqnum); + UNSIGNED_LONGS_EQUAL(3, meter.metrics().expected_packets); // overflow - LONGS_EQUAL(status::StatusOK, meter.write(new_packet(2))); + LONGS_EQUAL(status::StatusOK, + meter.write(new_packet(1, ts + step_ts * 3, sts + stream_step_ts * 3))); UNSIGNED_LONGS_EQUAL(65537, meter.metrics().ext_last_seqnum); + UNSIGNED_LONGS_EQUAL(5, meter.metrics().expected_packets); // late packet, ignored - LONGS_EQUAL(status::StatusOK, meter.write(new_packet(65534))); + LONGS_EQUAL(status::StatusOK, + meter.write(new_packet(65534, ts + step_ts, sts + stream_step_ts))); UNSIGNED_LONGS_EQUAL(65537, meter.metrics().ext_last_seqnum); + UNSIGNED_LONGS_EQUAL(5, meter.metrics().expected_packets); // new packet - LONGS_EQUAL(status::StatusOK, meter.write(new_packet(5))); + LONGS_EQUAL(status::StatusOK, + meter.write(new_packet(4, ts + step_ts * 6, sts + stream_step_ts * 6))); UNSIGNED_LONGS_EQUAL(65540, meter.metrics().ext_last_seqnum); + UNSIGNED_LONGS_EQUAL(8, meter.metrics().expected_packets); UNSIGNED_LONGS_EQUAL(5, queue.size()); } +TEST(link_meter, jitter_test) { + packet::FifoQueue queue; + LinkMeter meter(arena, encoding_map, sample_spec, make_config(), NULL); + const ssize_t RunningWinLen = (ssize_t)meter.running_window_len(); + meter.set_writer(queue); + const size_t num_packets = Duration * 100; + core::nanoseconds_t ts_store[num_packets]; + + core::nanoseconds_t ts = start_ts; + packet::stream_timestamp_t sts = stream_start_ts; + for (size_t i = 0; i < num_packets; i++) { + packet::seqnum_t seqnum = 65500 + i; + ts_store[i] = ts; + UNSIGNED_LONGS_EQUAL(status::StatusOK, meter.write(new_packet(seqnum, ts, sts))); + const core::nanoseconds_t jitter_ns = + (core::nanoseconds_t)(core::fast_random_gaussian() * core::Millisecond); + ts += step_ts + jitter_ns; + sts += stream_step_ts; + + if (i > (size_t)RunningWinLen) { + // Check meter metrics running max in min jitter in last Duration number + // of packets in ts_store. + core::nanoseconds_t min_jitter = core::Second; + core::nanoseconds_t max_jitter = 0; + for (size_t j = 0; j < (size_t)RunningWinLen; j++) { + core::nanoseconds_t jitter = + std::abs(ts_store[i - j] - ts_store[i - j - 1] - step_ts); + min_jitter = std::min(min_jitter, jitter); + max_jitter = std::max(max_jitter, jitter); + } + UNSIGNED_LONGS_EQUAL(min_jitter, meter.metrics().min_jitter); + UNSIGNED_LONGS_EQUAL(max_jitter, meter.metrics().max_jitter); + + // Reference average and variance of jitter from ts_store values. + core::nanoseconds_t sum = 0; + for (size_t j = 0; j < (size_t)RunningWinLen; j++) { + sum += std::abs(ts_store[i - j] - ts_store[i - j - 1] - step_ts); + } + const core::nanoseconds_t mean = sum / RunningWinLen; + + sum = 0; + for (size_t j = 0; j < (size_t)RunningWinLen; j++) { + core::nanoseconds_t jitter = + std::abs(ts_store[i - j] - ts_store[i - j - 1] - step_ts); + sum += (jitter - mean) * (jitter - mean); + } + + // Check the jitter value + DOUBLES_EQUAL(mean, meter.mean_jitter(), core::Microsecond * 1); + } + } +} + +TEST(link_meter, ascending_test) { + packet::FifoQueue queue; + LinkMeter meter(arena, encoding_map, sample_spec, make_config(), NULL); + const ssize_t running_win_len = (ssize_t)meter.running_window_len(); + meter.set_writer(queue); + const size_t num_packets = Duration * 100; + core::nanoseconds_t ts_store[num_packets]; + + core::nanoseconds_t ts = start_ts; + packet::stream_timestamp_t sts = stream_start_ts; + for (size_t i = 0; i < num_packets; i++) { + packet::seqnum_t seqnum = 65500 + i; + ts_store[i] = ts; + UNSIGNED_LONGS_EQUAL(status::StatusOK, meter.write(new_packet(seqnum, ts, sts))); + ts += step_ts + (core::nanoseconds_t)i * core::Microsecond; // Removed the random + // component to create + // an increasing + // sequence + sts += stream_step_ts; + + if (i > (size_t)running_win_len) { + // Check meter metrics running max in min jitter in last Duration number + // of packets in ts_store. + core::nanoseconds_t min_jitter = core::Second; + core::nanoseconds_t max_jitter = 0; + for (size_t j = 0; j < (size_t)running_win_len; j++) { + core::nanoseconds_t jitter = + std::abs(ts_store[i - j] - ts_store[i - j - 1] - step_ts); + min_jitter = std::min(min_jitter, jitter); + max_jitter = std::max(max_jitter, jitter); + } + UNSIGNED_LONGS_EQUAL(min_jitter, meter.metrics().min_jitter); + UNSIGNED_LONGS_EQUAL(max_jitter, meter.metrics().max_jitter); + } + } +} + +TEST(link_meter, descending_test) { + packet::FifoQueue queue; + LinkMeter meter(arena, encoding_map, sample_spec, make_config(), NULL); + const ssize_t RunningWinLen = (ssize_t)meter.running_window_len(); + meter.set_writer(queue); + const size_t num_packets = Duration * 100; + core::nanoseconds_t ts_store[num_packets]; + + core::nanoseconds_t ts = start_ts; + packet::stream_timestamp_t sts = stream_start_ts; + for (size_t i = 0; i < num_packets; i++) { + packet::seqnum_t seqnum = 65500 + i; + ts_store[i] = ts; + UNSIGNED_LONGS_EQUAL(status::StatusOK, meter.write(new_packet(seqnum, ts, sts))); + ts += step_ts - (core::nanoseconds_t)i * core::Nanosecond * 10; // Removed the + // random + // component to + // create an + // decreasing + // sequence + sts += stream_step_ts; + + if (i > (size_t)RunningWinLen) { + // Check meter metrics running max in min jitter in last Duration number + // of packets in ts_store. + core::nanoseconds_t min_jitter = core::Second; + core::nanoseconds_t max_jitter = 0; + for (size_t j = 0; j < (size_t)RunningWinLen; j++) { + core::nanoseconds_t jitter = + std::abs(ts_store[i - j] - ts_store[i - j - 1] - step_ts); + min_jitter = std::min(min_jitter, jitter); + max_jitter = std::max(max_jitter, jitter); + } + UNSIGNED_LONGS_EQUAL(min_jitter, meter.metrics().min_jitter); + UNSIGNED_LONGS_EQUAL(max_jitter, meter.metrics().max_jitter); + } + } +} + +TEST(link_meter, saw_test) { + packet::FifoQueue queue; + LinkMeter meter(arena, encoding_map, sample_spec, make_config(), NULL); + const ssize_t RunningWinLen = (ssize_t)meter.running_window_len(); + meter.set_writer(queue); + const size_t num_packets = Duration * 100; + core::nanoseconds_t ts_store[num_packets]; + core::nanoseconds_t step_ts_inc = core::Nanosecond * 10; + core::nanoseconds_t step_ts_ = step_ts; + + core::nanoseconds_t ts = start_ts; + packet::stream_timestamp_t sts = stream_start_ts; + for (size_t i = 0; i < num_packets; i++) { + packet::seqnum_t seqnum = 65500 + i; + ts_store[i] = ts; + UNSIGNED_LONGS_EQUAL(status::StatusOK, meter.write(new_packet(seqnum, ts, sts))); + ts += step_ts_; + sts += stream_step_ts; + step_ts_ += step_ts_inc; + if (i > 0 && i % (size_t)RunningWinLen == 0) { + step_ts_inc = -step_ts_inc; + } + + if (i > (size_t)RunningWinLen) { + // Check meter metrics running max in min jitter in last Duration number + // of packets in ts_store. + core::nanoseconds_t min_jitter = core::Second; + core::nanoseconds_t max_jitter = 0; + for (size_t j = 0; j < (size_t)RunningWinLen; j++) { + core::nanoseconds_t jitter = + std::abs(ts_store[i - j] - ts_store[i - j - 1] - step_ts); + min_jitter = std::min(min_jitter, jitter); + max_jitter = std::max(max_jitter, jitter); + } + UNSIGNED_LONGS_EQUAL(min_jitter, meter.metrics().min_jitter); + UNSIGNED_LONGS_EQUAL(max_jitter, meter.metrics().max_jitter); + } + } +} + +TEST(link_meter, losses_test) { + packet::FifoQueue queue; + LinkMeter meter(arena, encoding_map, sample_spec, make_config(), NULL); + meter.set_writer(queue); + const size_t num_packets = Duration * 2 * (1 << 16); + int64_t total_losses = 0; + + core::nanoseconds_t ts = start_ts; + packet::stream_timestamp_t sts = stream_start_ts; + for (size_t i = 0; i < num_packets; i++) { + packet::seqnum_t seqnum = 65500 + i; + packet::PacketPtr p = new_packet(seqnum, ts, sts); + ts += step_ts; + sts += stream_step_ts; + + if (i > 0 && core::fast_random_range(0, 100) < 30) { + i += 99; + total_losses += 100; + continue; + } else { + UNSIGNED_LONGS_EQUAL(status::StatusOK, meter.write(p)); + } + + packet::PacketPtr pr; + UNSIGNED_LONGS_EQUAL(status::StatusOK, queue.read(pr, packet::ModeFetch)); + UNSIGNED_LONGS_EQUAL(pr->rtp()->seqnum, p->rtp()->seqnum); + + if (i > 0) { + const packet::LinkMetrics& metrics = meter.metrics(); + UNSIGNED_LONGS_EQUAL(total_losses, metrics.lost_packets); + UNSIGNED_LONGS_EQUAL(i + 1, metrics.expected_packets); + } + } +} + +TEST(link_meter, total_counter) { + packet::FifoQueue queue; + LinkMeter meter(arena, encoding_map, sample_spec, make_config(), NULL); + meter.set_writer(queue); + core::nanoseconds_t ts = start_ts; + packet::stream_timestamp_t sts = stream_start_ts; + uint16_t seqnum = 65500; + uint32_t total_counter = 0; + + UNSIGNED_LONGS_EQUAL(0, meter.metrics().ext_last_seqnum); + + for (size_t i = 0; i < 66000; i++) { + LONGS_EQUAL(status::StatusOK, + meter.write(new_packet(uint16_t((seqnum + total_counter) & 0xFFFF), + ts + step_ts * total_counter, + sts + stream_step_ts * total_counter))); + UNSIGNED_LONGS_EQUAL(uint32_t(seqnum) + total_counter, + meter.metrics().ext_last_seqnum); + UNSIGNED_LONGS_EQUAL(total_counter + 1, meter.metrics().expected_packets); + + UNSIGNED_LONGS_EQUAL(i + 1, queue.size()); + + total_counter += 1; + } +} + TEST(link_meter, forward_error) { const status::StatusCode status_list[] = { status::StatusErrDevice, @@ -122,10 +402,11 @@ TEST(link_meter, forward_error) { for (size_t st_n = 0; st_n < ROC_ARRAY_SIZE(status_list); st_n++) { test::StatusWriter writer(status_list[st_n]); - LinkMeter meter(encoding_map); + LinkMeter meter(arena, encoding_map, sample_spec, make_config(), NULL); meter.set_writer(writer); - LONGS_EQUAL(status_list[st_n], meter.write(new_packet(100))); + LONGS_EQUAL(status_list[st_n], + meter.write(new_packet(100, start_ts, stream_start_ts))); } } diff --git a/src/tools/roc_recv/cmdline.ggo b/src/tools/roc_recv/cmdline.ggo index a3925d1fd..1056a2f8b 100644 --- a/src/tools/roc_recv/cmdline.ggo +++ b/src/tools/roc_recv/cmdline.ggo @@ -29,15 +29,24 @@ section "Options" option "reuseaddr" - "enable SO_REUSEADDR when binding sockets" optional - option "target-latency" - "Target latency, TIME units" + option "io-latency" - "Playback target latency, TIME units" string optional - option "io-latency" - "Playback target latency, TIME units" + option "target-latency" - "Target latency, TIME units" string optional option "latency-tolerance" - "Maximum deviation from target latency, TIME units" string optional + option "start-latency" - "Start latency, target-latency must be 'auto' or unset, TIME units" + string optional + + option "min-latency" - "Minimum allowed latency, target-latency and latency-tolerance must be 0 or unset, TIME units" + string optional + + option "max-latency" - "Maximum allowed latency, target-latency and latency-tolerance must be 0 or unset, TIME units" + string optional + option "no-play-timeout" - "No playback timeout, TIME units" string optional @@ -76,6 +85,9 @@ section "Options" option "profiling" - "Enable self-profiling" flag off + option "dump" - "Path for a CSV file where to dump some useful run-time metrics" + string optional + option "color" - "Set colored logging mode for stderr output" values="auto","always","never" default="auto" enum optional diff --git a/src/tools/roc_recv/main.cpp b/src/tools/roc_recv/main.cpp index b674ba4aa..9dfe9e043 100644 --- a/src/tools/roc_recv/main.cpp +++ b/src/tools/roc_recv/main.cpp @@ -105,15 +105,19 @@ int main(int argc, char** argv) { io_config.frame_length, receiver_config.common.output_sample_spec); if (args.target_latency_given) { - if (!core::parse_duration( - args.target_latency_arg, - receiver_config.session_defaults.latency.target_latency)) { - roc_log(LogError, "invalid --target-latency: bad format"); - return 1; - } - if (receiver_config.session_defaults.latency.target_latency <= 0) { - roc_log(LogError, "invalid --target-latency: should be > 0"); - return 1; + if (strcmp(args.target_latency_arg, "auto") == 0) { + receiver_config.session_defaults.latency.target_latency = 0; + } else { + if (!core::parse_duration( + args.target_latency_arg, + receiver_config.session_defaults.latency.target_latency)) { + roc_log(LogError, "invalid --target-latency: bad format"); + return 1; + } + if (receiver_config.session_defaults.latency.target_latency <= 0) { + roc_log(LogError, "invalid --target-latency: should be 'auto' or > 0"); + return 1; + } } } @@ -130,6 +134,61 @@ int main(int argc, char** argv) { } } + if (args.start_latency_given && args.target_latency_given + && receiver_config.session_defaults.latency.target_latency != 0) { + roc_log(LogError, + "--start-latency must be > 0 if --target-latency='auto'" + " or unset"); + return 1; + } else if (args.start_latency_given) { + if (!core::parse_duration( + args.start_latency_arg, + receiver_config.session_defaults.latency.start_latency)) { + roc_log(LogError, "invalid --start-latency: bad format"); + return 1; + } + if (receiver_config.session_defaults.latency.start_latency <= 0) { + roc_log(LogError, "invalid --start-latency: should be > 0"); + return 1; + } + } + + if (args.min_latency_given || args.max_latency_given) { + if (!args.min_latency_given || !args.max_latency_given) { + roc_log(LogError, + "--min-latency and --max-latency should be specified together"); + return 1; + } + + if (!core::parse_duration(args.min_latency_arg, + receiver_config.session_defaults.latency.min_latency)) { + roc_log(LogError, "invalid --min-latency: bad format"); + return 1; + } + + if (!core::parse_duration(args.max_latency_arg, + receiver_config.session_defaults.latency.max_latency)) { + roc_log(LogError, "invalid --max-latency: bad format"); + return 1; + } + if (receiver_config.session_defaults.latency.min_latency <= 0) { + roc_log(LogError, "invalid --min-latency: should be > 0"); + return 1; + } else if (receiver_config.session_defaults.latency.min_latency + > receiver_config.session_defaults.latency.max_latency) { + roc_log(LogError, + "incorrect --max-latency: must be greate or equal to" + " --min-latency"); + return 1; + } + } else if ((!args.min_latency_given || !args.max_latency_given) + && receiver_config.session_defaults.latency.target_latency == 0) { + roc_log(LogError, + "--min-latency and --max-latency must be > 0 " + " if --target-latency=auto"); + return 1; + } + if (args.no_play_timeout_given) { if (!core::parse_duration( args.no_play_timeout_arg, @@ -405,6 +464,10 @@ int main(int argc, char** argv) { } } + if (args.dump_given) { + receiver_config.common.dumper.dump_file = args.dump_arg; + } + node::Receiver receiver(context, receiver_config); if (receiver.init_status() != status::StatusOK) { roc_log(LogError, "can't create receiver node: status=%s", diff --git a/src/tools/roc_send/cmdline.ggo b/src/tools/roc_send/cmdline.ggo index 90e48d170..924aa8fa8 100644 --- a/src/tools/roc_send/cmdline.ggo +++ b/src/tools/roc_send/cmdline.ggo @@ -20,10 +20,10 @@ section "Options" option "reuseaddr" - "enable SO_REUSEADDR when binding sockets" optional - option "target-latency" - "Target latency, TIME units" + option "io-latency" - "Recording target latency, TIME units" string optional - option "io-latency" - "Recording target latency, TIME units" + option "target-latency" - "Target latency, TIME units" string optional option "latency-tolerance" - "Maximum deviation from target latency, TIME units" @@ -66,6 +66,9 @@ section "Options" option "profiling" - "Enable self profiling" flag off + option "dump" - "Path for a CSV file where to dump some useful run-time metrics" + string optional + option "color" - "Set colored logging mode for stderr output" values="auto","always","never" default="auto" enum optional diff --git a/src/tools/roc_send/main.cpp b/src/tools/roc_send/main.cpp index 83d16f8b7..428c5254a 100644 --- a/src/tools/roc_send/main.cpp +++ b/src/tools/roc_send/main.cpp @@ -443,6 +443,10 @@ int main(int argc, char** argv) { return 1; } + if (args.dump_given) { + sender_config.dump_file = args.dump_arg; + } + sndio::Config pump_config; pump_config.sample_spec = input_source->sample_spec(); pump_config.frame_length = io_config.frame_length;