From 6584217d5ab55eda2db88ce5755a3e929df9ad1f Mon Sep 17 00:00:00 2001 From: Mikhail Baranov Date: Sun, 28 Apr 2024 19:18:46 +0200 Subject: [PATCH] FEC estimates block duration Reader and writer esimate duration of FEC block in ns as this value could vary even though number of packets is known. For now it is decided to keep the greates value seen in the current session, not average. --- .../roc_audio/latency_monitor.cpp | 8 +++ .../roc_audio/latency_monitor.h | 4 +- .../roc_audio/latency_tuner.h | 6 +- src/internal_modules/roc_fec/reader.cpp | 58 +++++++++++++++---- src/internal_modules/roc_fec/reader.h | 13 ++++- src/internal_modules/roc_fec/writer.cpp | 30 +++++++++- src/internal_modules/roc_fec/writer.h | 9 +++ .../roc_pipeline/receiver_session.cpp | 2 +- src/tests/roc_fec/test_writer_reader.cpp | 44 +++++++++----- 9 files changed, 143 insertions(+), 31 deletions(-) diff --git a/src/internal_modules/roc_audio/latency_monitor.cpp b/src/internal_modules/roc_audio/latency_monitor.cpp index dc90658c2..ec00a6475 100644 --- a/src/internal_modules/roc_audio/latency_monitor.cpp +++ b/src/internal_modules/roc_audio/latency_monitor.cpp @@ -21,6 +21,7 @@ LatencyMonitor::LatencyMonitor(IFrameReader& frame_reader, const packet::SortedQueue& incoming_queue, const Depacketizer& depacketizer, const packet::ILinkMeter& link_meter, + const fec::Reader* fec_reader, ResamplerReader* resampler, const LatencyConfig& config, const SampleSpec& packet_sample_spec, @@ -30,6 +31,7 @@ LatencyMonitor::LatencyMonitor(IFrameReader& frame_reader, , incoming_queue_(incoming_queue) , depacketizer_(depacketizer) , link_meter_(link_meter) + , fec_reader_(fec_reader) , resampler_(resampler) , enable_scaling_(config.tuner_profile != audio::LatencyTunerProfile_Intact) , capture_ts_(0) @@ -102,6 +104,12 @@ bool LatencyMonitor::reclock(const core::nanoseconds_t playback_timestamp) { } bool LatencyMonitor::pre_process_(const Frame& frame) { + if (fec_reader_) { + latency_metrics_.fec_block_duration = + packet_sample_spec_.stream_timestamp_2_ns(fec_reader_->max_block_duration()); + } else { + latency_metrics_.fec_block_duration = 0; + } tuner_.write_metrics(latency_metrics_, link_metrics_); if (!tuner_.update_stream()) { diff --git a/src/internal_modules/roc_audio/latency_monitor.h b/src/internal_modules/roc_audio/latency_monitor.h index 0972b053b..b09949f81 100644 --- a/src/internal_modules/roc_audio/latency_monitor.h +++ b/src/internal_modules/roc_audio/latency_monitor.h @@ -22,7 +22,7 @@ #include "roc_core/noncopyable.h" #include "roc_core/optional.h" #include "roc_core/time.h" -#include "roc_packet/ilink_meter.h" +#include "roc_fec/reader.h" #include "roc_packet/sorted_queue.h" #include "roc_packet/units.h" @@ -64,6 +64,7 @@ class LatencyMonitor : public IFrameReader, public core::NonCopyable<> { const packet::SortedQueue& incoming_queue, const Depacketizer& depacketizer, const packet::ILinkMeter& link_meter, + const fec::Reader* fec_reader, ResamplerReader* resampler, const LatencyConfig& config, const SampleSpec& packet_sample_spec, @@ -112,6 +113,7 @@ class LatencyMonitor : public IFrameReader, public core::NonCopyable<> { const packet::SortedQueue& incoming_queue_; const Depacketizer& depacketizer_; const packet::ILinkMeter& link_meter_; + const fec::Reader* fec_reader_; ResamplerReader* resampler_; const bool enable_scaling_; diff --git a/src/internal_modules/roc_audio/latency_tuner.h b/src/internal_modules/roc_audio/latency_tuner.h index 3b5413858..697e77c0a 100644 --- a/src/internal_modules/roc_audio/latency_tuner.h +++ b/src/internal_modules/roc_audio/latency_tuner.h @@ -151,10 +151,14 @@ struct LatencyMetrics { //! on receiver. core::nanoseconds_t e2e_latency; + //! Estimated FEC block duration. + core::nanoseconds_t fec_block_duration; + LatencyMetrics() : niq_latency(0) , niq_stalling(0) - , e2e_latency(0) { + , e2e_latency(0) + , fec_block_duration(0) { } }; diff --git a/src/internal_modules/roc_fec/reader.cpp b/src/internal_modules/roc_fec/reader.cpp index 0f0251f69..97be82d16 100644 --- a/src/internal_modules/roc_fec/reader.cpp +++ b/src/internal_modules/roc_fec/reader.cpp @@ -17,13 +17,13 @@ namespace fec { Reader::Reader(const ReaderConfig& config, packet::FecScheme fec_scheme, - IBlockDecoder& decoder, + IBlockDecoder& block_decoder, packet::IReader& source_reader, packet::IReader& repair_reader, packet::IParser& parser, packet::PacketFactory& packet_factory, core::IArena& arena) - : decoder_(decoder) + : block_decoder_(block_decoder) , source_reader_(source_reader) , repair_reader_(repair_reader) , parser_(parser) @@ -43,6 +43,8 @@ Reader::Reader(const ReaderConfig& config, , repair_block_resized_(false) , payload_resized_(false) , n_packets_(0) + , prev_block_timestamp_valid_(false) + , block_max_duration_(0) , max_sbn_jump_(config.max_sbn_jump) , fec_scheme_(fec_scheme) { valid_ = true; @@ -139,6 +141,13 @@ status::StatusCode Reader::get_next_packet_(packet::PacketPtr& ptr) { fill_block_(); packet::PacketPtr pp = source_block_[next_packet_]; + if (next_packet_ == 0) { + if (pp) { + update_block_duration_(pp); + } else { + prev_block_timestamp_valid_ = false; + } + } do { if (!alive_) { @@ -162,7 +171,6 @@ status::StatusCode Reader::get_next_packet_(packet::PacketPtr& ptr) { } else { pp = source_block_[pos++]; } - next_packet_ = pos; } else { next_packet_++; @@ -210,7 +218,8 @@ void Reader::try_repair_() { return; } - if (!decoder_.begin(source_block_.size(), repair_block_.size(), payload_size_)) { + if (!block_decoder_.begin(source_block_.size(), repair_block_.size(), + payload_size_)) { roc_log(LogDebug, "fec reader: can't begin decoder block, shutting down:" " sbl=%lu rbl=%lu payload_size=%lu", @@ -224,14 +233,14 @@ void Reader::try_repair_() { if (!source_block_[n]) { continue; } - decoder_.set(n, source_block_[n]->fec()->payload); + block_decoder_.set(n, source_block_[n]->fec()->payload); } for (size_t n = 0; n < repair_block_.size(); n++) { if (!repair_block_[n]) { continue; } - decoder_.set(source_block_.size() + n, repair_block_[n]->fec()->payload); + block_decoder_.set(source_block_.size() + n, repair_block_[n]->fec()->payload); } for (size_t n = 0; n < source_block_.size(); n++) { @@ -239,7 +248,7 @@ void Reader::try_repair_() { continue; } - core::Slice buffer = decoder_.repair(n); + core::Slice buffer = block_decoder_.repair(n); if (!buffer) { continue; } @@ -252,7 +261,7 @@ void Reader::try_repair_() { source_block_[n] = pp; } - decoder_.end(); + block_decoder_.end(); can_repair_ = false; } @@ -655,12 +664,12 @@ bool Reader::can_update_source_block_size_(size_t new_sblen) { return false; } - if (new_sblen > decoder_.max_block_length()) { + if (new_sblen > block_decoder_.max_block_length()) { roc_log(LogDebug, "fec reader: can't change source block size above maximum, shutting down:" " cur_sblen=%lu new_sblen=%lu max_blen=%lu", (unsigned long)cur_sblen, (unsigned long)new_sblen, - (unsigned long)decoder_.max_block_length()); + (unsigned long)block_decoder_.max_block_length()); return (alive_ = false); } @@ -675,6 +684,9 @@ bool Reader::update_source_block_size_(size_t new_sblen) { return true; } + prev_block_timestamp_valid_ = false; + block_max_duration_ = 0; + if (!source_block_.resize(new_sblen)) { roc_log(LogDebug, "fec reader: can't allocate source block memory, shutting down:" @@ -710,12 +722,12 @@ bool Reader::can_update_repair_block_size_(size_t new_blen) { return false; } - if (new_blen > decoder_.max_block_length()) { + if (new_blen > block_decoder_.max_block_length()) { roc_log(LogDebug, "fec reader: can't change repair block size above maximum, shutting down:" " cur_blen=%lu new_blen=%lu max_blen=%lu", (unsigned long)cur_blen, (unsigned long)new_blen, - (unsigned long)decoder_.max_block_length()); + (unsigned long)block_decoder_.max_block_length()); return (alive_ = false); } @@ -733,6 +745,9 @@ bool Reader::update_repair_block_size_(size_t new_blen) { return true; } + prev_block_timestamp_valid_ = false; + block_max_duration_ = 0; + // shoud not happen: sblen should be validated and updated already roc_panic_if_not(new_blen > cur_sblen); @@ -789,5 +804,24 @@ void Reader::drop_repair_packets_from_prev_blocks_() { } } +void Reader::update_block_duration_(const packet::PacketPtr& ptr) { + if (!ptr->rtp()) { + return; + } + packet::stream_timestamp_diff_t block_dur = 0; + if (prev_block_timestamp_valid_) { + block_dur = packet::stream_timestamp_diff(ptr->rtp()->stream_timestamp, + prev_block_timestamp_); + } + roc_panic_if_msg(block_dur < 0, "fec reader: negative block duration"); + block_max_duration_ = std::max(block_max_duration_, block_dur); + prev_block_timestamp_ = ptr->rtp()->stream_timestamp; + prev_block_timestamp_valid_ = true; +} + +packet::stream_timestamp_t Reader::max_block_duration() const { + return (packet::stream_timestamp_t)block_max_duration_; +} + } // namespace fec } // namespace roc diff --git a/src/internal_modules/roc_fec/reader.h b/src/internal_modules/roc_fec/reader.h index bacc64120..3b418344f 100644 --- a/src/internal_modules/roc_fec/reader.h +++ b/src/internal_modules/roc_fec/reader.h @@ -50,7 +50,7 @@ class Reader : public packet::IReader, public core::NonCopyable<> { //! - @p arena is used to initialize a packet array Reader(const ReaderConfig& config, packet::FecScheme fec_scheme, - IBlockDecoder& decoder, + IBlockDecoder& block_decoder, packet::IReader& source_reader, packet::IReader& repair_reader, packet::IParser& parser, @@ -71,6 +71,9 @@ class Reader : public packet::IReader, public core::NonCopyable<> { //! When a packet loss is detected, try to restore it from repair packets. virtual ROC_ATTR_NODISCARD status::StatusCode read(packet::PacketPtr&); + //! Get maximal FEC block duratoin seen since last block resize. + packet::stream_timestamp_t max_block_duration() const; + private: status::StatusCode read_(packet::PacketPtr&); @@ -108,7 +111,9 @@ class Reader : public packet::IReader, public core::NonCopyable<> { void drop_repair_packets_from_prev_blocks_(); - IBlockDecoder& decoder_; + void update_block_duration_(const packet::PacketPtr& ptr); + + IBlockDecoder& block_decoder_; packet::IReader& source_reader_; packet::IReader& repair_reader_; @@ -138,6 +143,10 @@ class Reader : public packet::IReader, public core::NonCopyable<> { unsigned n_packets_; + bool prev_block_timestamp_valid_; + packet::stream_timestamp_t prev_block_timestamp_; + packet::stream_timestamp_diff_t block_max_duration_; + const size_t max_sbn_jump_; const packet::FecScheme fec_scheme_; }; diff --git a/src/internal_modules/roc_fec/writer.cpp b/src/internal_modules/roc_fec/writer.cpp index c52c435ad..62178ed01 100644 --- a/src/internal_modules/roc_fec/writer.cpp +++ b/src/internal_modules/roc_fec/writer.cpp @@ -41,7 +41,10 @@ Writer::Writer(const WriterConfig& config, , cur_packet_(0) , fec_scheme_(fec_scheme) , valid_(false) - , alive_(true) { + , alive_(true) + , prev_block_timestamp_valid_(false) + , prev_block_timestamp_(0) + , block_max_duration_(0) { cur_sbn_ = (packet::blknum_t)core::fast_random_range(0, packet::blknum_t(-1)); cur_block_repair_sn_ = (packet::seqnum_t)core::fast_random_range(0, packet::seqnum_t(-1)); @@ -90,6 +93,8 @@ bool Writer::resize(size_t sblen, size_t rblen) { next_sblen_ = sblen; next_rblen_ = rblen; + prev_block_timestamp_valid_ = false; + return true; } @@ -109,6 +114,7 @@ status::StatusCode Writer::write(const packet::PacketPtr& pp) { } if (cur_packet_ == 0) { + update_block_duration_(pp); if (!begin_block_(pp)) { // TODO(gh-183): return status return status::StatusOK; @@ -342,5 +348,27 @@ bool Writer::validate_source_packet_(const packet::PacketPtr& pp) { return true; } +void Writer::update_block_duration_(const packet::PacketPtr& ptr) { + if (!ptr->rtp()) { + return; + } + packet::stream_timestamp_diff_t block_dur = 0; + if (prev_block_timestamp_valid_) { + block_dur = packet::stream_timestamp_diff(ptr->rtp()->stream_timestamp, + prev_block_timestamp_); + } + if (block_dur < 0) { + prev_block_timestamp_valid_ = false; + } else { + block_max_duration_ = std::max(block_max_duration_, block_dur); + prev_block_timestamp_ = ptr->rtp()->stream_timestamp; + prev_block_timestamp_valid_ = true; + } +} + +packet::stream_timestamp_t Writer::max_block_duration() const { + return (packet::stream_timestamp_t)block_max_duration_; +} + } // namespace fec } // namespace roc diff --git a/src/internal_modules/roc_fec/writer.h b/src/internal_modules/roc_fec/writer.h index 675169e2e..18a8e3e2f 100644 --- a/src/internal_modules/roc_fec/writer.h +++ b/src/internal_modules/roc_fec/writer.h @@ -79,6 +79,9 @@ class Writer : public packet::IWriter, public core::NonCopyable<> { //! - generates repair packets and also writes them to the output writer virtual ROC_ATTR_NODISCARD status::StatusCode write(const packet::PacketPtr&); + //! Get maximal FEC block duratoin seen since last block resize. + packet::stream_timestamp_t max_block_duration() const; + private: bool begin_block_(const packet::PacketPtr& pp); void end_block_(); @@ -97,6 +100,8 @@ class Writer : public packet::IWriter, public core::NonCopyable<> { void validate_fec_packet_(const packet::PacketPtr&); bool validate_source_packet_(const packet::PacketPtr&); + void update_block_duration_(const packet::PacketPtr& ptr); + size_t cur_sblen_; size_t next_sblen_; @@ -127,6 +132,10 @@ class Writer : public packet::IWriter, public core::NonCopyable<> { bool valid_; bool alive_; + + bool prev_block_timestamp_valid_; + packet::stream_timestamp_t prev_block_timestamp_; + packet::stream_timestamp_diff_t block_max_duration_; }; } // namespace fec diff --git a/src/internal_modules/roc_pipeline/receiver_session.cpp b/src/internal_modules/roc_pipeline/receiver_session.cpp index 18c98888e..cdf818090 100644 --- a/src/internal_modules/roc_pipeline/receiver_session.cpp +++ b/src/internal_modules/roc_pipeline/receiver_session.cpp @@ -216,7 +216,7 @@ ReceiverSession::ReceiverSession( } latency_monitor_.reset(new (latency_monitor_) audio::LatencyMonitor( - *frm_reader, *source_queue_, *depacketizer_, *source_meter_, + *frm_reader, *source_queue_, *depacketizer_, *source_meter_, fec_reader_.get(), resampler_reader_.get(), session_config.latency, pkt_encoding->sample_spec, common_config.output_sample_spec)); if (!latency_monitor_ || !latency_monitor_->is_valid()) { diff --git a/src/tests/roc_fec/test_writer_reader.cpp b/src/tests/roc_fec/test_writer_reader.cpp index 6adc05b92..a0341e8c5 100644 --- a/src/tests/roc_fec/test_writer_reader.cpp +++ b/src/tests/roc_fec/test_writer_reader.cpp @@ -225,6 +225,7 @@ TEST_GROUP(writer_reader) { TEST(writer_reader, no_losses) { for (size_t n_scheme = 0; n_scheme < CodecMap::instance().num_schemes(); n_scheme++) { + const size_t n_blocks = 5; codec_config.scheme = CodecMap::instance().nth_scheme(n_scheme); core::ScopedPtr encoder( @@ -251,22 +252,33 @@ TEST(writer_reader, no_losses) { CHECK(writer.is_valid()); CHECK(reader.is_valid()); - fill_all_packets(0); + for (size_t i_block = 0; i_block < n_blocks; ++i_block) { + fill_all_packets(i_block * NumSourcePackets); - for (size_t i = 0; i < NumSourcePackets; ++i) { - UNSIGNED_LONGS_EQUAL(status::StatusOK, writer.write(source_packets[i])); - } - dispatcher.push_stocks(); + for (size_t i = 0; i < NumSourcePackets; ++i) { + UNSIGNED_LONGS_EQUAL(status::StatusOK, writer.write(source_packets[i])); + } + if (i_block > 0) { + CHECK(writer.max_block_duration() == NumSourcePackets * 10); + } + dispatcher.push_stocks(); - UNSIGNED_LONGS_EQUAL(NumSourcePackets, dispatcher.source_size()); - UNSIGNED_LONGS_EQUAL(NumRepairPackets, dispatcher.repair_size()); + UNSIGNED_LONGS_EQUAL(NumSourcePackets, dispatcher.source_size()); + UNSIGNED_LONGS_EQUAL(NumRepairPackets, dispatcher.repair_size()); - for (size_t i = 0; i < NumSourcePackets; ++i) { - packet::PacketPtr p; - UNSIGNED_LONGS_EQUAL(status::StatusOK, reader.read(p)); - CHECK(p); - check_audio_packet(p, i); - check_restored(p, false); + for (size_t i = 0; i < NumSourcePackets; ++i) { + packet::PacketPtr p; + UNSIGNED_LONGS_EQUAL(status::StatusOK, reader.read(p)); + if (i_block == 0) { + CHECK(reader.max_block_duration() == 0); + } else { + CHECK(reader.is_started()); + CHECK(reader.max_block_duration() == NumSourcePackets * 10); + } + CHECK(p); + check_audio_packet(p, i + i_block * NumSourcePackets); + check_restored(p, false); + } } } } @@ -352,6 +364,7 @@ TEST(writer_reader, lost_first_packet_in_first_block) { // Sending first block except first packet. fill_all_packets(0); dispatcher.lose(0); + CHECK(writer.max_block_duration() == 0); for (size_t i = 0; i < NumSourcePackets; ++i) { UNSIGNED_LONGS_EQUAL(status::StatusOK, writer.write(source_packets[i])); } @@ -361,6 +374,7 @@ TEST(writer_reader, lost_first_packet_in_first_block) { fill_all_packets(NumSourcePackets); for (size_t i = 0; i < NumSourcePackets; ++i) { UNSIGNED_LONGS_EQUAL(status::StatusOK, writer.write(source_packets[i])); + CHECK(writer.max_block_duration() == NumSourcePackets * 10); } dispatcher.push_stocks(); @@ -370,8 +384,12 @@ TEST(writer_reader, lost_first_packet_in_first_block) { UNSIGNED_LONGS_EQUAL(status::StatusOK, reader.read(p)); if (i < NumSourcePackets) { CHECK(!reader.is_started()); + CHECK(reader.max_block_duration() == 0); } else { CHECK(reader.is_started()); + // The first packet of the previous block was lost -- still unable to + // get the difference in ts. + CHECK(reader.max_block_duration() == 0); } check_audio_packet(p, i); check_restored(p, false);