diff --git a/src/internal_modules/roc_audio/latency_monitor.cpp b/src/internal_modules/roc_audio/latency_monitor.cpp index dc90658c2..ec00a6475 100644 --- a/src/internal_modules/roc_audio/latency_monitor.cpp +++ b/src/internal_modules/roc_audio/latency_monitor.cpp @@ -21,6 +21,7 @@ LatencyMonitor::LatencyMonitor(IFrameReader& frame_reader, const packet::SortedQueue& incoming_queue, const Depacketizer& depacketizer, const packet::ILinkMeter& link_meter, + const fec::Reader* fec_reader, ResamplerReader* resampler, const LatencyConfig& config, const SampleSpec& packet_sample_spec, @@ -30,6 +31,7 @@ LatencyMonitor::LatencyMonitor(IFrameReader& frame_reader, , incoming_queue_(incoming_queue) , depacketizer_(depacketizer) , link_meter_(link_meter) + , fec_reader_(fec_reader) , resampler_(resampler) , enable_scaling_(config.tuner_profile != audio::LatencyTunerProfile_Intact) , capture_ts_(0) @@ -102,6 +104,12 @@ bool LatencyMonitor::reclock(const core::nanoseconds_t playback_timestamp) { } bool LatencyMonitor::pre_process_(const Frame& frame) { + if (fec_reader_) { + latency_metrics_.fec_block_duration = + packet_sample_spec_.stream_timestamp_2_ns(fec_reader_->max_block_duration()); + } else { + latency_metrics_.fec_block_duration = 0; + } tuner_.write_metrics(latency_metrics_, link_metrics_); if (!tuner_.update_stream()) { diff --git a/src/internal_modules/roc_audio/latency_monitor.h b/src/internal_modules/roc_audio/latency_monitor.h index 0972b053b..b09949f81 100644 --- a/src/internal_modules/roc_audio/latency_monitor.h +++ b/src/internal_modules/roc_audio/latency_monitor.h @@ -22,7 +22,7 @@ #include "roc_core/noncopyable.h" #include "roc_core/optional.h" #include "roc_core/time.h" -#include "roc_packet/ilink_meter.h" +#include "roc_fec/reader.h" #include "roc_packet/sorted_queue.h" #include "roc_packet/units.h" @@ -64,6 +64,7 @@ class LatencyMonitor : public IFrameReader, public core::NonCopyable<> { const packet::SortedQueue& incoming_queue, const Depacketizer& depacketizer, const packet::ILinkMeter& link_meter, + const fec::Reader* fec_reader, ResamplerReader* resampler, const LatencyConfig& config, const SampleSpec& packet_sample_spec, @@ -112,6 +113,7 @@ class LatencyMonitor : public IFrameReader, public core::NonCopyable<> { const packet::SortedQueue& incoming_queue_; const Depacketizer& depacketizer_; const packet::ILinkMeter& link_meter_; + const fec::Reader* fec_reader_; ResamplerReader* resampler_; const bool enable_scaling_; diff --git a/src/internal_modules/roc_audio/latency_tuner.h b/src/internal_modules/roc_audio/latency_tuner.h index 3b5413858..697e77c0a 100644 --- a/src/internal_modules/roc_audio/latency_tuner.h +++ b/src/internal_modules/roc_audio/latency_tuner.h @@ -151,10 +151,14 @@ struct LatencyMetrics { //! on receiver. core::nanoseconds_t e2e_latency; + //! Estimated FEC block duration. + core::nanoseconds_t fec_block_duration; + LatencyMetrics() : niq_latency(0) , niq_stalling(0) - , e2e_latency(0) { + , e2e_latency(0) + , fec_block_duration(0) { } }; diff --git a/src/internal_modules/roc_fec/reader.cpp b/src/internal_modules/roc_fec/reader.cpp index 0f0251f69..97be82d16 100644 --- a/src/internal_modules/roc_fec/reader.cpp +++ b/src/internal_modules/roc_fec/reader.cpp @@ -17,13 +17,13 @@ namespace fec { Reader::Reader(const ReaderConfig& config, packet::FecScheme fec_scheme, - IBlockDecoder& decoder, + IBlockDecoder& block_decoder, packet::IReader& source_reader, packet::IReader& repair_reader, packet::IParser& parser, packet::PacketFactory& packet_factory, core::IArena& arena) - : decoder_(decoder) + : block_decoder_(block_decoder) , source_reader_(source_reader) , repair_reader_(repair_reader) , parser_(parser) @@ -43,6 +43,8 @@ Reader::Reader(const ReaderConfig& config, , repair_block_resized_(false) , payload_resized_(false) , n_packets_(0) + , prev_block_timestamp_valid_(false) + , block_max_duration_(0) , max_sbn_jump_(config.max_sbn_jump) , fec_scheme_(fec_scheme) { valid_ = true; @@ -139,6 +141,13 @@ status::StatusCode Reader::get_next_packet_(packet::PacketPtr& ptr) { fill_block_(); packet::PacketPtr pp = source_block_[next_packet_]; + if (next_packet_ == 0) { + if (pp) { + update_block_duration_(pp); + } else { + prev_block_timestamp_valid_ = false; + } + } do { if (!alive_) { @@ -162,7 +171,6 @@ status::StatusCode Reader::get_next_packet_(packet::PacketPtr& ptr) { } else { pp = source_block_[pos++]; } - next_packet_ = pos; } else { next_packet_++; @@ -210,7 +218,8 @@ void Reader::try_repair_() { return; } - if (!decoder_.begin(source_block_.size(), repair_block_.size(), payload_size_)) { + if (!block_decoder_.begin(source_block_.size(), repair_block_.size(), + payload_size_)) { roc_log(LogDebug, "fec reader: can't begin decoder block, shutting down:" " sbl=%lu rbl=%lu payload_size=%lu", @@ -224,14 +233,14 @@ void Reader::try_repair_() { if (!source_block_[n]) { continue; } - decoder_.set(n, source_block_[n]->fec()->payload); + block_decoder_.set(n, source_block_[n]->fec()->payload); } for (size_t n = 0; n < repair_block_.size(); n++) { if (!repair_block_[n]) { continue; } - decoder_.set(source_block_.size() + n, repair_block_[n]->fec()->payload); + block_decoder_.set(source_block_.size() + n, repair_block_[n]->fec()->payload); } for (size_t n = 0; n < source_block_.size(); n++) { @@ -239,7 +248,7 @@ void Reader::try_repair_() { continue; } - core::Slice buffer = decoder_.repair(n); + core::Slice buffer = block_decoder_.repair(n); if (!buffer) { continue; } @@ -252,7 +261,7 @@ void Reader::try_repair_() { source_block_[n] = pp; } - decoder_.end(); + block_decoder_.end(); can_repair_ = false; } @@ -655,12 +664,12 @@ bool Reader::can_update_source_block_size_(size_t new_sblen) { return false; } - if (new_sblen > decoder_.max_block_length()) { + if (new_sblen > block_decoder_.max_block_length()) { roc_log(LogDebug, "fec reader: can't change source block size above maximum, shutting down:" " cur_sblen=%lu new_sblen=%lu max_blen=%lu", (unsigned long)cur_sblen, (unsigned long)new_sblen, - (unsigned long)decoder_.max_block_length()); + (unsigned long)block_decoder_.max_block_length()); return (alive_ = false); } @@ -675,6 +684,9 @@ bool Reader::update_source_block_size_(size_t new_sblen) { return true; } + prev_block_timestamp_valid_ = false; + block_max_duration_ = 0; + if (!source_block_.resize(new_sblen)) { roc_log(LogDebug, "fec reader: can't allocate source block memory, shutting down:" @@ -710,12 +722,12 @@ bool Reader::can_update_repair_block_size_(size_t new_blen) { return false; } - if (new_blen > decoder_.max_block_length()) { + if (new_blen > block_decoder_.max_block_length()) { roc_log(LogDebug, "fec reader: can't change repair block size above maximum, shutting down:" " cur_blen=%lu new_blen=%lu max_blen=%lu", (unsigned long)cur_blen, (unsigned long)new_blen, - (unsigned long)decoder_.max_block_length()); + (unsigned long)block_decoder_.max_block_length()); return (alive_ = false); } @@ -733,6 +745,9 @@ bool Reader::update_repair_block_size_(size_t new_blen) { return true; } + prev_block_timestamp_valid_ = false; + block_max_duration_ = 0; + // shoud not happen: sblen should be validated and updated already roc_panic_if_not(new_blen > cur_sblen); @@ -789,5 +804,24 @@ void Reader::drop_repair_packets_from_prev_blocks_() { } } +void Reader::update_block_duration_(const packet::PacketPtr& ptr) { + if (!ptr->rtp()) { + return; + } + packet::stream_timestamp_diff_t block_dur = 0; + if (prev_block_timestamp_valid_) { + block_dur = packet::stream_timestamp_diff(ptr->rtp()->stream_timestamp, + prev_block_timestamp_); + } + roc_panic_if_msg(block_dur < 0, "fec reader: negative block duration"); + block_max_duration_ = std::max(block_max_duration_, block_dur); + prev_block_timestamp_ = ptr->rtp()->stream_timestamp; + prev_block_timestamp_valid_ = true; +} + +packet::stream_timestamp_t Reader::max_block_duration() const { + return (packet::stream_timestamp_t)block_max_duration_; +} + } // namespace fec } // namespace roc diff --git a/src/internal_modules/roc_fec/reader.h b/src/internal_modules/roc_fec/reader.h index bacc64120..3b418344f 100644 --- a/src/internal_modules/roc_fec/reader.h +++ b/src/internal_modules/roc_fec/reader.h @@ -50,7 +50,7 @@ class Reader : public packet::IReader, public core::NonCopyable<> { //! - @p arena is used to initialize a packet array Reader(const ReaderConfig& config, packet::FecScheme fec_scheme, - IBlockDecoder& decoder, + IBlockDecoder& block_decoder, packet::IReader& source_reader, packet::IReader& repair_reader, packet::IParser& parser, @@ -71,6 +71,9 @@ class Reader : public packet::IReader, public core::NonCopyable<> { //! When a packet loss is detected, try to restore it from repair packets. virtual ROC_ATTR_NODISCARD status::StatusCode read(packet::PacketPtr&); + //! Get maximal FEC block duratoin seen since last block resize. + packet::stream_timestamp_t max_block_duration() const; + private: status::StatusCode read_(packet::PacketPtr&); @@ -108,7 +111,9 @@ class Reader : public packet::IReader, public core::NonCopyable<> { void drop_repair_packets_from_prev_blocks_(); - IBlockDecoder& decoder_; + void update_block_duration_(const packet::PacketPtr& ptr); + + IBlockDecoder& block_decoder_; packet::IReader& source_reader_; packet::IReader& repair_reader_; @@ -138,6 +143,10 @@ class Reader : public packet::IReader, public core::NonCopyable<> { unsigned n_packets_; + bool prev_block_timestamp_valid_; + packet::stream_timestamp_t prev_block_timestamp_; + packet::stream_timestamp_diff_t block_max_duration_; + const size_t max_sbn_jump_; const packet::FecScheme fec_scheme_; }; diff --git a/src/internal_modules/roc_fec/writer.cpp b/src/internal_modules/roc_fec/writer.cpp index c52c435ad..62178ed01 100644 --- a/src/internal_modules/roc_fec/writer.cpp +++ b/src/internal_modules/roc_fec/writer.cpp @@ -41,7 +41,10 @@ Writer::Writer(const WriterConfig& config, , cur_packet_(0) , fec_scheme_(fec_scheme) , valid_(false) - , alive_(true) { + , alive_(true) + , prev_block_timestamp_valid_(false) + , prev_block_timestamp_(0) + , block_max_duration_(0) { cur_sbn_ = (packet::blknum_t)core::fast_random_range(0, packet::blknum_t(-1)); cur_block_repair_sn_ = (packet::seqnum_t)core::fast_random_range(0, packet::seqnum_t(-1)); @@ -90,6 +93,8 @@ bool Writer::resize(size_t sblen, size_t rblen) { next_sblen_ = sblen; next_rblen_ = rblen; + prev_block_timestamp_valid_ = false; + return true; } @@ -109,6 +114,7 @@ status::StatusCode Writer::write(const packet::PacketPtr& pp) { } if (cur_packet_ == 0) { + update_block_duration_(pp); if (!begin_block_(pp)) { // TODO(gh-183): return status return status::StatusOK; @@ -342,5 +348,27 @@ bool Writer::validate_source_packet_(const packet::PacketPtr& pp) { return true; } +void Writer::update_block_duration_(const packet::PacketPtr& ptr) { + if (!ptr->rtp()) { + return; + } + packet::stream_timestamp_diff_t block_dur = 0; + if (prev_block_timestamp_valid_) { + block_dur = packet::stream_timestamp_diff(ptr->rtp()->stream_timestamp, + prev_block_timestamp_); + } + if (block_dur < 0) { + prev_block_timestamp_valid_ = false; + } else { + block_max_duration_ = std::max(block_max_duration_, block_dur); + prev_block_timestamp_ = ptr->rtp()->stream_timestamp; + prev_block_timestamp_valid_ = true; + } +} + +packet::stream_timestamp_t Writer::max_block_duration() const { + return (packet::stream_timestamp_t)block_max_duration_; +} + } // namespace fec } // namespace roc diff --git a/src/internal_modules/roc_fec/writer.h b/src/internal_modules/roc_fec/writer.h index 675169e2e..18a8e3e2f 100644 --- a/src/internal_modules/roc_fec/writer.h +++ b/src/internal_modules/roc_fec/writer.h @@ -79,6 +79,9 @@ class Writer : public packet::IWriter, public core::NonCopyable<> { //! - generates repair packets and also writes them to the output writer virtual ROC_ATTR_NODISCARD status::StatusCode write(const packet::PacketPtr&); + //! Get maximal FEC block duratoin seen since last block resize. + packet::stream_timestamp_t max_block_duration() const; + private: bool begin_block_(const packet::PacketPtr& pp); void end_block_(); @@ -97,6 +100,8 @@ class Writer : public packet::IWriter, public core::NonCopyable<> { void validate_fec_packet_(const packet::PacketPtr&); bool validate_source_packet_(const packet::PacketPtr&); + void update_block_duration_(const packet::PacketPtr& ptr); + size_t cur_sblen_; size_t next_sblen_; @@ -127,6 +132,10 @@ class Writer : public packet::IWriter, public core::NonCopyable<> { bool valid_; bool alive_; + + bool prev_block_timestamp_valid_; + packet::stream_timestamp_t prev_block_timestamp_; + packet::stream_timestamp_diff_t block_max_duration_; }; } // namespace fec diff --git a/src/internal_modules/roc_pipeline/receiver_session.cpp b/src/internal_modules/roc_pipeline/receiver_session.cpp index 18c98888e..cdf818090 100644 --- a/src/internal_modules/roc_pipeline/receiver_session.cpp +++ b/src/internal_modules/roc_pipeline/receiver_session.cpp @@ -216,7 +216,7 @@ ReceiverSession::ReceiverSession( } latency_monitor_.reset(new (latency_monitor_) audio::LatencyMonitor( - *frm_reader, *source_queue_, *depacketizer_, *source_meter_, + *frm_reader, *source_queue_, *depacketizer_, *source_meter_, fec_reader_.get(), resampler_reader_.get(), session_config.latency, pkt_encoding->sample_spec, common_config.output_sample_spec)); if (!latency_monitor_ || !latency_monitor_->is_valid()) { diff --git a/src/tests/roc_fec/test_writer_reader.cpp b/src/tests/roc_fec/test_writer_reader.cpp index 6adc05b92..a0341e8c5 100644 --- a/src/tests/roc_fec/test_writer_reader.cpp +++ b/src/tests/roc_fec/test_writer_reader.cpp @@ -225,6 +225,7 @@ TEST_GROUP(writer_reader) { TEST(writer_reader, no_losses) { for (size_t n_scheme = 0; n_scheme < CodecMap::instance().num_schemes(); n_scheme++) { + const size_t n_blocks = 5; codec_config.scheme = CodecMap::instance().nth_scheme(n_scheme); core::ScopedPtr encoder( @@ -251,22 +252,33 @@ TEST(writer_reader, no_losses) { CHECK(writer.is_valid()); CHECK(reader.is_valid()); - fill_all_packets(0); + for (size_t i_block = 0; i_block < n_blocks; ++i_block) { + fill_all_packets(i_block * NumSourcePackets); - for (size_t i = 0; i < NumSourcePackets; ++i) { - UNSIGNED_LONGS_EQUAL(status::StatusOK, writer.write(source_packets[i])); - } - dispatcher.push_stocks(); + for (size_t i = 0; i < NumSourcePackets; ++i) { + UNSIGNED_LONGS_EQUAL(status::StatusOK, writer.write(source_packets[i])); + } + if (i_block > 0) { + CHECK(writer.max_block_duration() == NumSourcePackets * 10); + } + dispatcher.push_stocks(); - UNSIGNED_LONGS_EQUAL(NumSourcePackets, dispatcher.source_size()); - UNSIGNED_LONGS_EQUAL(NumRepairPackets, dispatcher.repair_size()); + UNSIGNED_LONGS_EQUAL(NumSourcePackets, dispatcher.source_size()); + UNSIGNED_LONGS_EQUAL(NumRepairPackets, dispatcher.repair_size()); - for (size_t i = 0; i < NumSourcePackets; ++i) { - packet::PacketPtr p; - UNSIGNED_LONGS_EQUAL(status::StatusOK, reader.read(p)); - CHECK(p); - check_audio_packet(p, i); - check_restored(p, false); + for (size_t i = 0; i < NumSourcePackets; ++i) { + packet::PacketPtr p; + UNSIGNED_LONGS_EQUAL(status::StatusOK, reader.read(p)); + if (i_block == 0) { + CHECK(reader.max_block_duration() == 0); + } else { + CHECK(reader.is_started()); + CHECK(reader.max_block_duration() == NumSourcePackets * 10); + } + CHECK(p); + check_audio_packet(p, i + i_block * NumSourcePackets); + check_restored(p, false); + } } } } @@ -352,6 +364,7 @@ TEST(writer_reader, lost_first_packet_in_first_block) { // Sending first block except first packet. fill_all_packets(0); dispatcher.lose(0); + CHECK(writer.max_block_duration() == 0); for (size_t i = 0; i < NumSourcePackets; ++i) { UNSIGNED_LONGS_EQUAL(status::StatusOK, writer.write(source_packets[i])); } @@ -361,6 +374,7 @@ TEST(writer_reader, lost_first_packet_in_first_block) { fill_all_packets(NumSourcePackets); for (size_t i = 0; i < NumSourcePackets; ++i) { UNSIGNED_LONGS_EQUAL(status::StatusOK, writer.write(source_packets[i])); + CHECK(writer.max_block_duration() == NumSourcePackets * 10); } dispatcher.push_stocks(); @@ -370,8 +384,12 @@ TEST(writer_reader, lost_first_packet_in_first_block) { UNSIGNED_LONGS_EQUAL(status::StatusOK, reader.read(p)); if (i < NumSourcePackets) { CHECK(!reader.is_started()); + CHECK(reader.max_block_duration() == 0); } else { CHECK(reader.is_started()); + // The first packet of the previous block was lost -- still unable to + // get the difference in ts. + CHECK(reader.max_block_duration() == 0); } check_audio_packet(p, i); check_restored(p, false);