From de563358c4f6020528ab2e7fe01d27ee37218c81 Mon Sep 17 00:00:00 2001 From: Mikhail Baranov Date: Sun, 28 Apr 2024 19:18:46 +0200 Subject: [PATCH] gh-688 Issue 688: FEC estimates block duration --- .../roc_audio/latency_monitor.cpp | 14 +- .../roc_audio/latency_monitor.h | 6 +- .../roc_audio/latency_tuner.h | 6 +- src/internal_modules/roc_fec/reader.cpp | 59 +- src/internal_modules/roc_fec/reader.h | 13 +- src/internal_modules/roc_fec/writer.cpp | 28 +- src/internal_modules/roc_fec/writer.h | 9 + .../roc_pipeline/receiver_session.cpp | 2 +- src/tests/roc_fec/test_block_duration.cpp | 573 ++++++++++++++++++ src/tests/roc_fec/test_writer_reader.cpp | 47 +- 10 files changed, 721 insertions(+), 36 deletions(-) create mode 100644 src/tests/roc_fec/test_block_duration.cpp diff --git a/src/internal_modules/roc_audio/latency_monitor.cpp b/src/internal_modules/roc_audio/latency_monitor.cpp index 1df7b8400..8921fd65a 100644 --- a/src/internal_modules/roc_audio/latency_monitor.cpp +++ b/src/internal_modules/roc_audio/latency_monitor.cpp @@ -21,6 +21,7 @@ LatencyMonitor::LatencyMonitor(IFrameReader& frame_reader, const packet::SortedQueue& incoming_queue, const Depacketizer& depacketizer, const packet::ILinkMeter& link_meter, + const fec::Reader* fec_reader, ResamplerReader* resampler, const LatencyConfig& config, const SampleSpec& packet_sample_spec, @@ -30,6 +31,7 @@ LatencyMonitor::LatencyMonitor(IFrameReader& frame_reader, , incoming_queue_(incoming_queue) , depacketizer_(depacketizer) , link_meter_(link_meter) + , fec_reader_(fec_reader) , resampler_(resampler) , enable_scaling_(config.tuner_profile != audio::LatencyTunerProfile_Intact) , capture_ts_(0) @@ -71,7 +73,7 @@ bool LatencyMonitor::read(Frame& frame) { if (alive_) { compute_niq_latency_(); - query_link_meter_(); + query_metrics_(); if (!pre_process_(frame)) { alive_ = false; @@ -103,7 +105,6 @@ bool LatencyMonitor::reclock(const core::nanoseconds_t playback_timestamp) { bool LatencyMonitor::pre_process_(const Frame& frame) { tuner_.write_metrics(latency_metrics_, link_metrics_); - if (!tuner_.update_stream()) { // TODO(gh-183): forward status code return false; @@ -177,12 +178,19 @@ void LatencyMonitor::compute_e2e_latency_(const core::nanoseconds_t playback_tim latency_metrics_.e2e_latency = playback_timestamp - capture_ts_; } -void LatencyMonitor::query_link_meter_() { +void LatencyMonitor::query_metrics_() { if (!link_meter_.has_metrics()) { return; } link_metrics_ = link_meter_.metrics(); + + if (fec_reader_) { + latency_metrics_.fec_block_duration = + packet_sample_spec_.stream_timestamp_2_ns(fec_reader_->max_block_duration()); + } else { + latency_metrics_.fec_block_duration = 0; + } } bool LatencyMonitor::init_scaling_() { diff --git a/src/internal_modules/roc_audio/latency_monitor.h b/src/internal_modules/roc_audio/latency_monitor.h index 606ff8aa2..1598f3f43 100644 --- a/src/internal_modules/roc_audio/latency_monitor.h +++ b/src/internal_modules/roc_audio/latency_monitor.h @@ -22,7 +22,7 @@ #include "roc_core/noncopyable.h" #include "roc_core/optional.h" #include "roc_core/time.h" -#include "roc_packet/ilink_meter.h" +#include "roc_fec/reader.h" #include "roc_packet/sorted_queue.h" #include "roc_packet/units.h" @@ -64,6 +64,7 @@ class LatencyMonitor : public IFrameReader, public core::NonCopyable<> { const packet::SortedQueue& incoming_queue, const Depacketizer& depacketizer, const packet::ILinkMeter& link_meter, + const fec::Reader* fec_reader, ResamplerReader* resampler, const LatencyConfig& config, const SampleSpec& packet_sample_spec, @@ -94,7 +95,7 @@ class LatencyMonitor : public IFrameReader, public core::NonCopyable<> { private: void compute_niq_latency_(); void compute_e2e_latency_(core::nanoseconds_t playback_timestamp); - void query_link_meter_(); + void query_metrics_(); bool pre_process_(const Frame& frame); void post_process_(const Frame& frame); @@ -112,6 +113,7 @@ class LatencyMonitor : public IFrameReader, public core::NonCopyable<> { const packet::SortedQueue& incoming_queue_; const Depacketizer& depacketizer_; const packet::ILinkMeter& link_meter_; + const fec::Reader* fec_reader_; ResamplerReader* resampler_; const bool enable_scaling_; diff --git a/src/internal_modules/roc_audio/latency_tuner.h b/src/internal_modules/roc_audio/latency_tuner.h index bcc8688ab..a7e87b7a5 100644 --- a/src/internal_modules/roc_audio/latency_tuner.h +++ b/src/internal_modules/roc_audio/latency_tuner.h @@ -144,10 +144,14 @@ struct LatencyMetrics { //! on receiver. core::nanoseconds_t e2e_latency; + //! Estimated FEC block duration. + core::nanoseconds_t fec_block_duration; + LatencyMetrics() : niq_latency(0) , niq_stalling(0) - , e2e_latency(0) { + , e2e_latency(0) + , fec_block_duration(0) { } }; diff --git a/src/internal_modules/roc_fec/reader.cpp b/src/internal_modules/roc_fec/reader.cpp index 8c2872d78..ebc04d244 100644 --- a/src/internal_modules/roc_fec/reader.cpp +++ b/src/internal_modules/roc_fec/reader.cpp @@ -17,13 +17,13 @@ namespace fec { Reader::Reader(const ReaderConfig& config, packet::FecScheme fec_scheme, - IBlockDecoder& decoder, + IBlockDecoder& block_decoder, packet::IReader& source_reader, packet::IReader& repair_reader, packet::IParser& parser, packet::PacketFactory& packet_factory, core::IArena& arena) - : decoder_(decoder) + : block_decoder_(block_decoder) , source_reader_(source_reader) , repair_reader_(repair_reader) , parser_(parser) @@ -43,6 +43,8 @@ Reader::Reader(const ReaderConfig& config, , repair_block_resized_(false) , payload_resized_(false) , n_packets_(0) + , prev_block_timestamp_valid_(false) + , block_max_duration_(0) , max_sbn_jump_(config.max_sbn_jump) , fec_scheme_(fec_scheme) { valid_ = true; @@ -162,7 +164,6 @@ status::StatusCode Reader::get_next_packet_(packet::PacketPtr& ptr) { } else { pp = source_block_[pos++]; } - next_packet_ = pos; } else { next_packet_++; @@ -181,6 +182,12 @@ status::StatusCode Reader::get_next_packet_(packet::PacketPtr& ptr) { void Reader::next_block_() { roc_log(LogTrace, "fec reader: next block: sbn=%lu", (unsigned long)cur_sbn_); + if (source_block_[0]) { + update_block_duration_(source_block_[0]); + } else { + prev_block_timestamp_valid_ = false; + } + for (size_t n = 0; n < source_block_.size(); n++) { source_block_[n] = NULL; } @@ -210,7 +217,8 @@ void Reader::try_repair_() { return; } - if (!decoder_.begin(source_block_.size(), repair_block_.size(), payload_size_)) { + if (!block_decoder_.begin(source_block_.size(), repair_block_.size(), + payload_size_)) { roc_log(LogDebug, "fec reader: can't begin decoder block, shutting down:" " sbl=%lu rbl=%lu payload_size=%lu", @@ -224,14 +232,14 @@ void Reader::try_repair_() { if (!source_block_[n]) { continue; } - decoder_.set(n, source_block_[n]->fec()->payload); + block_decoder_.set(n, source_block_[n]->fec()->payload); } for (size_t n = 0; n < repair_block_.size(); n++) { if (!repair_block_[n]) { continue; } - decoder_.set(source_block_.size() + n, repair_block_[n]->fec()->payload); + block_decoder_.set(source_block_.size() + n, repair_block_[n]->fec()->payload); } for (size_t n = 0; n < source_block_.size(); n++) { @@ -239,7 +247,7 @@ void Reader::try_repair_() { continue; } - core::Slice buffer = decoder_.repair(n); + core::Slice buffer = block_decoder_.repair(n); if (!buffer) { continue; } @@ -252,7 +260,7 @@ void Reader::try_repair_() { source_block_[n] = pp; } - decoder_.end(); + block_decoder_.end(); can_repair_ = false; } @@ -655,12 +663,12 @@ bool Reader::can_update_source_block_size_(size_t new_sblen) { return false; } - if (new_sblen > decoder_.max_block_length()) { + if (new_sblen > block_decoder_.max_block_length()) { roc_log(LogDebug, "fec reader: can't change source block size above maximum, shutting down:" " cur_sblen=%lu new_sblen=%lu max_blen=%lu", (unsigned long)cur_sblen, (unsigned long)new_sblen, - (unsigned long)decoder_.max_block_length()); + (unsigned long)block_decoder_.max_block_length()); return (alive_ = false); } @@ -675,6 +683,9 @@ bool Reader::update_source_block_size_(size_t new_sblen) { return true; } + prev_block_timestamp_valid_ = false; + block_max_duration_ = 0; + if (!source_block_.resize(new_sblen)) { roc_log(LogDebug, "fec reader: can't allocate source block memory, shutting down:" @@ -710,12 +721,12 @@ bool Reader::can_update_repair_block_size_(size_t new_blen) { return false; } - if (new_blen > decoder_.max_block_length()) { + if (new_blen > block_decoder_.max_block_length()) { roc_log(LogDebug, "fec reader: can't change repair block size above maximum, shutting down:" " cur_blen=%lu new_blen=%lu max_blen=%lu", (unsigned long)cur_blen, (unsigned long)new_blen, - (unsigned long)decoder_.max_block_length()); + (unsigned long)block_decoder_.max_block_length()); return (alive_ = false); } @@ -733,7 +744,9 @@ bool Reader::update_repair_block_size_(size_t new_blen) { return true; } - // should not happen: sblen should be validated and updated already + prev_block_timestamp_valid_ = false; + block_max_duration_ = 0; + roc_panic_if_not(new_blen > cur_sblen); const size_t new_rblen = new_blen - cur_sblen; @@ -789,5 +802,25 @@ void Reader::drop_repair_packets_from_prev_blocks_() { } } +void Reader::update_block_duration_(const packet::PacketPtr& ptr) { + packet::stream_timestamp_diff_t block_dur = 0; + if (prev_block_timestamp_valid_) { + block_dur = + packet::stream_timestamp_diff(ptr->stream_timestamp(), prev_block_timestamp_); + } + if (block_dur < 0) { + roc_log(LogTrace, "fec reader: negative block duration"); + prev_block_timestamp_valid_ = false; + } else { + block_max_duration_ = std::max(block_max_duration_, block_dur); + prev_block_timestamp_ = ptr->stream_timestamp(); + prev_block_timestamp_valid_ = true; + } +} + +packet::stream_timestamp_t Reader::max_block_duration() const { + return (packet::stream_timestamp_t)block_max_duration_; +} + } // namespace fec } // namespace roc diff --git a/src/internal_modules/roc_fec/reader.h b/src/internal_modules/roc_fec/reader.h index bacc64120..3b418344f 100644 --- a/src/internal_modules/roc_fec/reader.h +++ b/src/internal_modules/roc_fec/reader.h @@ -50,7 +50,7 @@ class Reader : public packet::IReader, public core::NonCopyable<> { //! - @p arena is used to initialize a packet array Reader(const ReaderConfig& config, packet::FecScheme fec_scheme, - IBlockDecoder& decoder, + IBlockDecoder& block_decoder, packet::IReader& source_reader, packet::IReader& repair_reader, packet::IParser& parser, @@ -71,6 +71,9 @@ class Reader : public packet::IReader, public core::NonCopyable<> { //! When a packet loss is detected, try to restore it from repair packets. virtual ROC_ATTR_NODISCARD status::StatusCode read(packet::PacketPtr&); + //! Get maximal FEC block duratoin seen since last block resize. + packet::stream_timestamp_t max_block_duration() const; + private: status::StatusCode read_(packet::PacketPtr&); @@ -108,7 +111,9 @@ class Reader : public packet::IReader, public core::NonCopyable<> { void drop_repair_packets_from_prev_blocks_(); - IBlockDecoder& decoder_; + void update_block_duration_(const packet::PacketPtr& ptr); + + IBlockDecoder& block_decoder_; packet::IReader& source_reader_; packet::IReader& repair_reader_; @@ -138,6 +143,10 @@ class Reader : public packet::IReader, public core::NonCopyable<> { unsigned n_packets_; + bool prev_block_timestamp_valid_; + packet::stream_timestamp_t prev_block_timestamp_; + packet::stream_timestamp_diff_t block_max_duration_; + const size_t max_sbn_jump_; const packet::FecScheme fec_scheme_; }; diff --git a/src/internal_modules/roc_fec/writer.cpp b/src/internal_modules/roc_fec/writer.cpp index b86866854..93ec94691 100644 --- a/src/internal_modules/roc_fec/writer.cpp +++ b/src/internal_modules/roc_fec/writer.cpp @@ -39,7 +39,10 @@ Writer::Writer(const WriterConfig& config, , cur_packet_(0) , fec_scheme_(fec_scheme) , valid_(false) - , alive_(true) { + , alive_(true) + , prev_block_timestamp_valid_(false) + , prev_block_timestamp_(0) + , block_max_duration_(0) { cur_sbn_ = (packet::blknum_t)core::fast_random_range(0, packet::blknum_t(-1)); cur_block_repair_sn_ = (packet::seqnum_t)core::fast_random_range(0, packet::seqnum_t(-1)); @@ -88,6 +91,8 @@ bool Writer::resize(size_t sblen, size_t rblen) { next_sblen_ = sblen; next_rblen_ = rblen; + prev_block_timestamp_valid_ = false; + return true; } @@ -133,6 +138,8 @@ status::StatusCode Writer::write(const packet::PacketPtr& pp) { } bool Writer::begin_block_(const packet::PacketPtr& pp) { + update_block_duration_(pp); + if (!apply_sizes_(next_sblen_, next_rblen_, pp->fec()->payload.size())) { return false; } @@ -340,5 +347,24 @@ bool Writer::validate_source_packet_(const packet::PacketPtr& pp) { return true; } +void Writer::update_block_duration_(const packet::PacketPtr& ptr) { + packet::stream_timestamp_diff_t block_dur = 0; + if (prev_block_timestamp_valid_) { + block_dur = + packet::stream_timestamp_diff(ptr->stream_timestamp(), prev_block_timestamp_); + } + if (block_dur < 0) { + prev_block_timestamp_valid_ = false; + } else { + block_max_duration_ = std::max(block_max_duration_, block_dur); + prev_block_timestamp_ = ptr->stream_timestamp(); + prev_block_timestamp_valid_ = true; + } +} + +packet::stream_timestamp_t Writer::max_block_duration() const { + return (packet::stream_timestamp_t)block_max_duration_; +} + } // namespace fec } // namespace roc diff --git a/src/internal_modules/roc_fec/writer.h b/src/internal_modules/roc_fec/writer.h index 804510edf..15327b084 100644 --- a/src/internal_modules/roc_fec/writer.h +++ b/src/internal_modules/roc_fec/writer.h @@ -77,6 +77,9 @@ class Writer : public packet::IWriter, public core::NonCopyable<> { //! - generates repair packets and also writes them to the output writer virtual ROC_ATTR_NODISCARD status::StatusCode write(const packet::PacketPtr&); + //! Get maximal FEC block duratoin seen since last block resize. + packet::stream_timestamp_t max_block_duration() const; + private: bool begin_block_(const packet::PacketPtr& pp); void end_block_(); @@ -95,6 +98,8 @@ class Writer : public packet::IWriter, public core::NonCopyable<> { void validate_fec_packet_(const packet::PacketPtr&); bool validate_source_packet_(const packet::PacketPtr&); + void update_block_duration_(const packet::PacketPtr& ptr); + size_t cur_sblen_; size_t next_sblen_; @@ -124,6 +129,10 @@ class Writer : public packet::IWriter, public core::NonCopyable<> { bool valid_; bool alive_; + + bool prev_block_timestamp_valid_; + packet::stream_timestamp_t prev_block_timestamp_; + packet::stream_timestamp_diff_t block_max_duration_; }; } // namespace fec diff --git a/src/internal_modules/roc_pipeline/receiver_session.cpp b/src/internal_modules/roc_pipeline/receiver_session.cpp index 10c88dbe3..806dc6ca7 100644 --- a/src/internal_modules/roc_pipeline/receiver_session.cpp +++ b/src/internal_modules/roc_pipeline/receiver_session.cpp @@ -214,7 +214,7 @@ ReceiverSession::ReceiverSession(const ReceiverSessionConfig& session_config, } latency_monitor_.reset(new (latency_monitor_) audio::LatencyMonitor( - *frm_reader, *source_queue_, *depacketizer_, *source_meter_, + *frm_reader, *source_queue_, *depacketizer_, *source_meter_, fec_reader_.get(), resampler_reader_.get(), session_config.latency, pkt_encoding->sample_spec, common_config.output_sample_spec)); if (!latency_monitor_ || !latency_monitor_->is_valid()) { diff --git a/src/tests/roc_fec/test_block_duration.cpp b/src/tests/roc_fec/test_block_duration.cpp new file mode 100644 index 000000000..c2e4b99ff --- /dev/null +++ b/src/tests/roc_fec/test_block_duration.cpp @@ -0,0 +1,573 @@ +/* + * Copyright (c) 2015 Roc Streaming authors + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. + */ + +#include + +#include "test_helpers/mock_arena.h" +#include "test_helpers/packet_dispatcher.h" + +#include "roc_core/heap_arena.h" +#include "roc_core/macro_helpers.h" +#include "roc_core/scoped_ptr.h" +#include "roc_fec/codec_map.h" +#include "roc_fec/composer.h" +#include "roc_fec/headers.h" +#include "roc_fec/parser.h" +#include "roc_fec/reader.h" +#include "roc_fec/writer.h" +#include "roc_packet/interleaver.h" +#include "roc_packet/packet_factory.h" +#include "roc_packet/queue.h" +#include "roc_rtp/composer.h" +#include "roc_rtp/encoding_map.h" +#include "roc_rtp/headers.h" +#include "roc_rtp/parser.h" + +namespace roc { +namespace fec { + +namespace { + +const size_t NumSourcePackets = 20; +const size_t NumRepairPackets = 10; + +const unsigned SourceID = 555; +const unsigned PayloadType = rtp::PayloadType_L16_Stereo; + +const size_t FECPayloadSize = 193; + +const size_t MaxBuffSize = 500; + +core::HeapArena arena; +packet::PacketFactory packet_factory(arena, MaxBuffSize); + +rtp::EncodingMap encoding_map(arena); +rtp::Parser rtp_parser(encoding_map, NULL); + +Parser rs8m_source_parser(&rtp_parser); +Parser rs8m_repair_parser(NULL); +Parser ldpc_source_parser(&rtp_parser); +Parser ldpc_repair_parser(NULL); + +rtp::Composer rtp_composer(NULL); +Composer rs8m_source_composer(&rtp_composer); +Composer rs8m_repair_composer(NULL); +Composer ldpc_source_composer(&rtp_composer); +Composer ldpc_repair_composer(NULL); + +class StatusReader : public packet::IReader { +public: + explicit StatusReader(status::StatusCode code) + : code_(code) { + } + + virtual ROC_ATTR_NODISCARD status::StatusCode read(packet::PacketPtr&) { + return code_; + } + +private: + status::StatusCode code_; +}; + +} // namespace + +TEST_GROUP(block_duration) { + packet::PacketPtr source_packets[NumSourcePackets]; + + CodecConfig codec_config; + WriterConfig writer_config; + ReaderConfig reader_config; + + void setup() { + writer_config.n_source_packets = NumSourcePackets; + writer_config.n_repair_packets = NumRepairPackets; + } + + packet::IParser& source_parser() { + switch (codec_config.scheme) { + case packet::FEC_ReedSolomon_M8: + return rs8m_source_parser; + case packet::FEC_LDPC_Staircase: + return ldpc_source_parser; + default: + roc_panic("bad scheme"); + } + } + + packet::IParser& repair_parser() { + switch (codec_config.scheme) { + case packet::FEC_ReedSolomon_M8: + return rs8m_repair_parser; + case packet::FEC_LDPC_Staircase: + return ldpc_repair_parser; + default: + roc_panic("bad scheme"); + } + } + + packet::IComposer& source_composer() { + switch (codec_config.scheme) { + case packet::FEC_ReedSolomon_M8: + return rs8m_source_composer; + case packet::FEC_LDPC_Staircase: + return ldpc_source_composer; + default: + roc_panic("bad scheme"); + } + } + + packet::IComposer& repair_composer() { + switch (codec_config.scheme) { + case packet::FEC_ReedSolomon_M8: + return rs8m_repair_composer; + case packet::FEC_LDPC_Staircase: + return ldpc_repair_composer; + default: + roc_panic("bad scheme"); + } + } + + void recompose_packet(const packet::PacketPtr& p) { + if (p->flags() & packet::Packet::FlagRepair) { + CHECK(repair_composer().compose(*p)); + } else { + CHECK(source_composer().compose(*p)); + } + } + + void fill_all_packets(size_t sn) { + for (size_t i = 0; i < NumSourcePackets; ++i) { + source_packets[i] = fill_one_packet(sn + i); + } + } + + packet::PacketPtr fill_one_packet(size_t sn, size_t fec_payload_size = FECPayloadSize, + packet::IComposer* composer = NULL) { + CHECK(fec_payload_size > sizeof(rtp::Header)); + const size_t rtp_payload_size = fec_payload_size - sizeof(rtp::Header); + + packet::PacketPtr pp = packet_factory.new_packet(); + CHECK(pp); + + core::Slice bp = packet_factory.new_packet_buffer(); + CHECK(bp); + + if (!composer) { + composer = &source_composer(); + } + CHECK(composer->prepare(*pp, bp, rtp_payload_size)); + + pp->set_buffer(bp); + + UNSIGNED_LONGS_EQUAL(rtp_payload_size, pp->rtp()->payload.size()); + UNSIGNED_LONGS_EQUAL(fec_payload_size, pp->fec()->payload.size()); + + pp->add_flags(packet::Packet::FlagAudio | packet::Packet::FlagPrepared); + + pp->rtp()->source_id = SourceID; + pp->rtp()->payload_type = PayloadType; + pp->rtp()->seqnum = packet::seqnum_t(sn); + pp->rtp()->stream_timestamp = packet::stream_timestamp_t(sn * 10); + + for (size_t i = 0; i < rtp_payload_size; i++) { + pp->rtp()->payload.data()[i] = uint8_t(sn + i); + } + + return pp; + } + + void check_audio_packet(packet::PacketPtr pp, size_t sn, + size_t fec_payload_size = FECPayloadSize) { + CHECK(fec_payload_size > sizeof(rtp::Header)); + const size_t rtp_payload_size = fec_payload_size - sizeof(rtp::Header); + + CHECK(pp); + + CHECK(pp->flags() & packet::Packet::FlagRTP); + CHECK(pp->flags() & packet::Packet::FlagAudio); + + CHECK(pp->rtp()); + CHECK(pp->rtp()->header); + CHECK(pp->rtp()->payload); + + UNSIGNED_LONGS_EQUAL(SourceID, pp->rtp()->source_id); + + UNSIGNED_LONGS_EQUAL(sn, pp->rtp()->seqnum); + UNSIGNED_LONGS_EQUAL(packet::stream_timestamp_t(sn * 10), + pp->rtp()->stream_timestamp); + + UNSIGNED_LONGS_EQUAL(PayloadType, pp->rtp()->payload_type); + UNSIGNED_LONGS_EQUAL(rtp_payload_size, pp->rtp()->payload.size()); + + for (size_t i = 0; i < rtp_payload_size; i++) { + UNSIGNED_LONGS_EQUAL(uint8_t(sn + i), pp->rtp()->payload.data()[i]); + } + } + + void check_restored(packet::PacketPtr p, bool restored) { + if (restored) { + CHECK((p->flags() & packet::Packet::FlagRestored) != 0); + CHECK(!p->fec()); + } else { + CHECK((p->flags() & packet::Packet::FlagRestored) == 0); + CHECK(p->fec()); + } + } +}; + +TEST(block_duration, no_losses) { + if (CodecMap::instance().num_schemes() == 0) { + return; + } + + const size_t n_blocks = 5; + codec_config.scheme = CodecMap::instance().nth_scheme(0); + + core::ScopedPtr encoder( + CodecMap::instance().new_encoder(codec_config, packet_factory, arena), arena); + + core::ScopedPtr decoder( + CodecMap::instance().new_decoder(codec_config, packet_factory, arena), arena); + + test::PacketDispatcher dispatcher(source_parser(), repair_parser(), packet_factory, + NumSourcePackets, NumRepairPackets); + + Writer writer(writer_config, codec_config.scheme, *encoder, dispatcher, + source_composer(), repair_composer(), packet_factory, arena); + + Reader reader(reader_config, codec_config.scheme, *decoder, + dispatcher.source_reader(), dispatcher.repair_reader(), rtp_parser, + packet_factory, arena); + + for (size_t i_block = 0; i_block < n_blocks; ++i_block) { + fill_all_packets(i_block * NumSourcePackets); + + for (size_t i = 0; i < NumSourcePackets; ++i) { + UNSIGNED_LONGS_EQUAL(status::StatusOK, writer.write(source_packets[i])); + } + if (i_block > 0) { + UNSIGNED_LONGS_EQUAL(NumSourcePackets * 10, writer.max_block_duration()); + } + dispatcher.push_stocks(); + + for (size_t i = 0; i < NumSourcePackets; ++i) { + packet::PacketPtr p; + UNSIGNED_LONGS_EQUAL(status::StatusOK, reader.read(p)); + if (i_block == 0) { + UNSIGNED_LONGS_EQUAL(0, reader.max_block_duration()); + } else { + CHECK(reader.is_started()); + if (i_block > 1) { + UNSIGNED_LONGS_EQUAL(NumSourcePackets * 10, + reader.max_block_duration()); + } + } + } + } +} + +TEST(block_duration, lost_first_packet_in_first_block) { + if (CodecMap::instance().num_schemes() == 0) { + return; + } + + codec_config.scheme = CodecMap::instance().nth_scheme(0); + + core::ScopedPtr encoder( + CodecMap::instance().new_encoder(codec_config, packet_factory, arena), arena); + + core::ScopedPtr decoder( + CodecMap::instance().new_decoder(codec_config, packet_factory, arena), arena); + + test::PacketDispatcher dispatcher(source_parser(), repair_parser(), packet_factory, + NumSourcePackets, NumRepairPackets); + + Writer writer(writer_config, codec_config.scheme, *encoder, dispatcher, + source_composer(), repair_composer(), packet_factory, arena); + + Reader reader(reader_config, codec_config.scheme, *decoder, + dispatcher.source_reader(), dispatcher.repair_reader(), rtp_parser, + packet_factory, arena); + + // Sending first block except first packet. + fill_all_packets(0); + dispatcher.lose(0); + UNSIGNED_LONGS_EQUAL(0, writer.max_block_duration()); + for (size_t i = 0; i < NumSourcePackets; ++i) { + UNSIGNED_LONGS_EQUAL(status::StatusOK, writer.write(source_packets[i])); + } + + // Sending 2nd, 3rd and 4th blocks lossless. + for (size_t i_block = 1; i_block < 4; i_block++) { + dispatcher.clear_losses(); + fill_all_packets(i_block * NumSourcePackets); + for (size_t i = 0; i < NumSourcePackets; ++i) { + UNSIGNED_LONGS_EQUAL(status::StatusOK, + writer.write(source_packets[i % NumSourcePackets])); + UNSIGNED_LONGS_EQUAL(NumSourcePackets * 10, writer.max_block_duration()); + } + dispatcher.push_stocks(); + } + + // Receive every sent packet except the first one. + for (size_t i = 1; i < NumSourcePackets * 4; ++i) { + packet::PacketPtr p; + UNSIGNED_LONGS_EQUAL(status::StatusOK, reader.read(p)); + if (i < NumSourcePackets * 3 - 1) { + UNSIGNED_LONGS_EQUAL(0, reader.max_block_duration()); + } else { + UNSIGNED_LONGS_EQUAL(NumSourcePackets * 10, reader.max_block_duration()); + } + } +} + +TEST(block_duration, lost_first_packet_in_third_block) { + if (CodecMap::instance().num_schemes() == 0) { + return; + } + + codec_config.scheme = CodecMap::instance().nth_scheme(0); + + core::ScopedPtr encoder( + CodecMap::instance().new_encoder(codec_config, packet_factory, arena), arena); + + core::ScopedPtr decoder( + CodecMap::instance().new_decoder(codec_config, packet_factory, arena), arena); + + test::PacketDispatcher dispatcher(source_parser(), repair_parser(), packet_factory, + NumSourcePackets, NumRepairPackets); + + Writer writer(writer_config, codec_config.scheme, *encoder, dispatcher, + source_composer(), repair_composer(), packet_factory, arena); + + Reader reader(reader_config, codec_config.scheme, *decoder, + dispatcher.source_reader(), dispatcher.repair_reader(), rtp_parser, + packet_factory, arena); + + // Sending first block except first packet. + UNSIGNED_LONGS_EQUAL(0, writer.max_block_duration()); + // Sending 1-4 blocks. + for (size_t i_block = 0; i_block < 4; i_block++) { + if (i_block == 2) { + dispatcher.lose(0); + } else { + dispatcher.clear_losses(); + } + fill_all_packets(i_block * NumSourcePackets); + for (size_t i = 0; i < NumSourcePackets; ++i) { + UNSIGNED_LONGS_EQUAL(status::StatusOK, + writer.write(source_packets[i % NumSourcePackets])); + if (i_block > 0) { + UNSIGNED_LONGS_EQUAL(NumSourcePackets * 10, writer.max_block_duration()); + } + } + dispatcher.push_stocks(); + } + + // Receive every sent packet except the first one. + for (size_t i = 1; i < NumSourcePackets * 4; ++i) { + packet::PacketPtr p; + UNSIGNED_LONGS_EQUAL(status::StatusOK, reader.read(p)); + if (i <= NumSourcePackets * 2 - 1) { + UNSIGNED_LONGS_EQUAL(0, reader.max_block_duration()); + } else { + UNSIGNED_LONGS_EQUAL(NumSourcePackets * 10, reader.max_block_duration()); + } + } +} + +TEST(block_duration, lost_almost_every_packet) { + if (CodecMap::instance().num_schemes() == 0) { + return; + } + + codec_config.scheme = CodecMap::instance().nth_scheme(0); + + core::ScopedPtr encoder( + CodecMap::instance().new_encoder(codec_config, packet_factory, arena), arena); + + core::ScopedPtr decoder( + CodecMap::instance().new_decoder(codec_config, packet_factory, arena), arena); + + test::PacketDispatcher dispatcher(source_parser(), repair_parser(), packet_factory, + NumSourcePackets, NumRepairPackets); + + Writer writer(writer_config, codec_config.scheme, *encoder, dispatcher, + source_composer(), repair_composer(), packet_factory, arena); + + Reader reader(reader_config, codec_config.scheme, *decoder, + dispatcher.source_reader(), dispatcher.repair_reader(), rtp_parser, + packet_factory, arena); + + // Sending first block except first packet. + UNSIGNED_LONGS_EQUAL(0, writer.max_block_duration()); + // Sending 1-4 blocks. + for (size_t i_block = 0; i_block < 4; i_block++) { + dispatcher.clear_losses(); + + fill_all_packets(i_block * NumSourcePackets); + for (size_t i = 0; i < NumSourcePackets; ++i) { + if (i > 0) { + dispatcher.lose(i); + } + UNSIGNED_LONGS_EQUAL(status::StatusOK, + writer.write(source_packets[i % NumSourcePackets])); + if (i_block > 0) { + UNSIGNED_LONGS_EQUAL(NumSourcePackets * 10, writer.max_block_duration()); + } + } + dispatcher.push_stocks(); + } + + // Receive every sent packet except the first one. + for (size_t i = 0; i < 4; ++i) { + packet::PacketPtr p; + UNSIGNED_LONGS_EQUAL(status::StatusOK, reader.read(p)); + UNSIGNED_LONGS_EQUAL(NumSourcePackets * 10 * i, p->stream_timestamp()); + if (i < 2) { + UNSIGNED_LONGS_EQUAL(0, reader.max_block_duration()); + } else { + UNSIGNED_LONGS_EQUAL(NumSourcePackets * 10, reader.max_block_duration()); + } + } +} + +TEST(block_duration, lost_single_block) { + if (CodecMap::instance().num_schemes() == 0) { + return; + } + + codec_config.scheme = CodecMap::instance().nth_scheme(0); + + core::ScopedPtr encoder( + CodecMap::instance().new_encoder(codec_config, packet_factory, arena), arena); + + core::ScopedPtr decoder( + CodecMap::instance().new_decoder(codec_config, packet_factory, arena), arena); + + test::PacketDispatcher dispatcher(source_parser(), repair_parser(), packet_factory, + NumSourcePackets, NumRepairPackets); + + Writer writer(writer_config, codec_config.scheme, *encoder, dispatcher, + source_composer(), repair_composer(), packet_factory, arena); + + Reader reader(reader_config, codec_config.scheme, *decoder, + dispatcher.source_reader(), dispatcher.repair_reader(), rtp_parser, + packet_factory, arena); + + // Sending first block except first packet. + UNSIGNED_LONGS_EQUAL(0, writer.max_block_duration()); + // Sending 1-5 blocks. + for (size_t i_block = 0; i_block < 5; i_block++) { + dispatcher.clear_losses(); + + fill_all_packets(i_block * NumSourcePackets); + for (size_t i = 0; i < NumSourcePackets; ++i) { + if (i_block == 3) { + dispatcher.lose(i); + } + UNSIGNED_LONGS_EQUAL(status::StatusOK, + writer.write(source_packets[i % NumSourcePackets])); + if (i_block > 0) { + UNSIGNED_LONGS_EQUAL(NumSourcePackets * 10, writer.max_block_duration()); + } + } + dispatcher.push_stocks(); + } + + // Receive every sent packet except the first one. + for (size_t i = 0; i < 4 * NumSourcePackets; ++i) { + packet::PacketPtr p; + UNSIGNED_LONGS_EQUAL(status::StatusOK, reader.read(p)); + if (i >= 3 * NumSourcePackets) { + UNSIGNED_LONGS_EQUAL(10 * (i + NumSourcePackets), p->stream_timestamp()); + } else { + UNSIGNED_LONGS_EQUAL(10 * i, p->stream_timestamp()); + } + if (i < 2 * NumSourcePackets - 1) { + UNSIGNED_LONGS_EQUAL(0, reader.max_block_duration()); + } else { + UNSIGNED_LONGS_EQUAL(NumSourcePackets * 10, reader.max_block_duration()); + } + } +} + +TEST(block_duration, resize_block_middle) { + if (CodecMap::instance().num_schemes() == 0) { + return; + } + + codec_config.scheme = CodecMap::instance().nth_scheme(0); + + core::ScopedPtr encoder( + CodecMap::instance().new_encoder(codec_config, packet_factory, arena), arena); + + core::ScopedPtr decoder( + CodecMap::instance().new_decoder(codec_config, packet_factory, arena), arena); + + test::PacketDispatcher dispatcher(source_parser(), repair_parser(), packet_factory, + NumSourcePackets, NumRepairPackets); + + Writer writer(writer_config, codec_config.scheme, *encoder, dispatcher, + source_composer(), repair_composer(), packet_factory, arena); + + Reader reader(reader_config, codec_config.scheme, *decoder, + dispatcher.source_reader(), dispatcher.repair_reader(), rtp_parser, + packet_factory, arena); + + packet::seqnum_t wr_sn = 0; + size_t sb_len[10] = { NumSourcePackets, NumSourcePackets, + NumSourcePackets, // 0-2 + 2 * NumSourcePackets, 2 * NumSourcePackets, + 2 * NumSourcePackets, // 3-5 + NumSourcePackets, NumSourcePackets, // 6-7 + NumSourcePackets, NumSourcePackets }; // 8-9 + + UNSIGNED_LONGS_EQUAL(0, writer.max_block_duration()); + for (size_t i_block = 0; i_block < 10; i_block++) { + core::Array packets(arena); + + dispatcher.clear_losses(); + + if (i_block == 3) { + writer.resize(sb_len[i_block], dispatcher.repair_size()); + } else if (i_block == 6) { + writer.resize(sb_len[i_block], dispatcher.repair_size()); + } + if (!packets.resize(sb_len[i_block])) { + FAIL("resize failed"); + } + for (size_t i = 0; i < sb_len[i_block]; ++i) { + packets[i] = fill_one_packet(wr_sn, FECPayloadSize); + wr_sn++; + + UNSIGNED_LONGS_EQUAL(status::StatusOK, writer.write(packets[i])); + } + dispatcher.push_stocks(); + if (i_block >= 4) { + UNSIGNED_LONGS_EQUAL(NumSourcePackets * 2 * 10, writer.max_block_duration()); + } else if (i_block > 0) { + UNSIGNED_LONGS_EQUAL(NumSourcePackets * 10, writer.max_block_duration()); + } + } + + // Receive every sent packet except the first one. + for (size_t i_block = 0; i_block < 10; ++i_block) { + packet::PacketPtr p; + for (size_t i_packet = 0; i_packet < sb_len[i_block]; i_packet++) { + UNSIGNED_LONGS_EQUAL(status::StatusOK, reader.read(p)); + if ((i_block == 2 || i_block == 5 || i_block > 7) + && i_packet < sb_len[i_block] - 1) { + UNSIGNED_LONGS_EQUAL(sb_len[i_block] * 10, reader.max_block_duration()); + } + } + } +} + +} // namespace fec +} // namespace roc diff --git a/src/tests/roc_fec/test_writer_reader.cpp b/src/tests/roc_fec/test_writer_reader.cpp index 6b85907e7..ead50308a 100644 --- a/src/tests/roc_fec/test_writer_reader.cpp +++ b/src/tests/roc_fec/test_writer_reader.cpp @@ -222,6 +222,7 @@ TEST_GROUP(writer_reader) { TEST(writer_reader, no_losses) { for (size_t n_scheme = 0; n_scheme < CodecMap::instance().num_schemes(); n_scheme++) { + const size_t n_blocks = 5; codec_config.scheme = CodecMap::instance().nth_scheme(n_scheme); core::ScopedPtr encoder( @@ -247,22 +248,36 @@ TEST(writer_reader, no_losses) { CHECK(writer.is_valid()); CHECK(reader.is_valid()); - fill_all_packets(0); + for (size_t i_block = 0; i_block < n_blocks; ++i_block) { + fill_all_packets(i_block * NumSourcePackets); - for (size_t i = 0; i < NumSourcePackets; ++i) { - UNSIGNED_LONGS_EQUAL(status::StatusOK, writer.write(source_packets[i])); - } - dispatcher.push_stocks(); + for (size_t i = 0; i < NumSourcePackets; ++i) { + UNSIGNED_LONGS_EQUAL(status::StatusOK, writer.write(source_packets[i])); + } + if (i_block > 0) { + CHECK(writer.max_block_duration() == NumSourcePackets * 10); + } + dispatcher.push_stocks(); - UNSIGNED_LONGS_EQUAL(NumSourcePackets, dispatcher.source_size()); - UNSIGNED_LONGS_EQUAL(NumRepairPackets, dispatcher.repair_size()); + UNSIGNED_LONGS_EQUAL(NumSourcePackets, dispatcher.source_size()); + UNSIGNED_LONGS_EQUAL(NumRepairPackets, dispatcher.repair_size()); - for (size_t i = 0; i < NumSourcePackets; ++i) { - packet::PacketPtr p; - UNSIGNED_LONGS_EQUAL(status::StatusOK, reader.read(p)); - CHECK(p); - check_audio_packet(p, i); - check_restored(p, false); + for (size_t i = 0; i < NumSourcePackets; ++i) { + packet::PacketPtr p; + UNSIGNED_LONGS_EQUAL(status::StatusOK, reader.read(p)); + if (i_block == 0) { + CHECK(reader.max_block_duration() == 0); + } else { + CHECK(reader.is_started()); + if (i_block > 1) { + // CHECK(reader.max_block_duration() == + // NumSourcePackets * 10); + } + } + CHECK(p); + check_audio_packet(p, i + i_block * NumSourcePackets); + check_restored(p, false); + } } } } @@ -346,6 +361,7 @@ TEST(writer_reader, lost_first_packet_in_first_block) { // Sending first block except first packet. fill_all_packets(0); dispatcher.lose(0); + CHECK(writer.max_block_duration() == 0); for (size_t i = 0; i < NumSourcePackets; ++i) { UNSIGNED_LONGS_EQUAL(status::StatusOK, writer.write(source_packets[i])); } @@ -355,6 +371,7 @@ TEST(writer_reader, lost_first_packet_in_first_block) { fill_all_packets(NumSourcePackets); for (size_t i = 0; i < NumSourcePackets; ++i) { UNSIGNED_LONGS_EQUAL(status::StatusOK, writer.write(source_packets[i])); + CHECK(writer.max_block_duration() == NumSourcePackets * 10); } dispatcher.push_stocks(); @@ -364,8 +381,12 @@ TEST(writer_reader, lost_first_packet_in_first_block) { UNSIGNED_LONGS_EQUAL(status::StatusOK, reader.read(p)); if (i < NumSourcePackets) { CHECK(!reader.is_started()); + CHECK(reader.max_block_duration() == 0); } else { CHECK(reader.is_started()); + // The first packet of the previous block was lost -- still unable to + // get the difference in ts. + CHECK(reader.max_block_duration() == 0); } check_audio_packet(p, i); check_restored(p, false);