Skip to content

Commit

Permalink
FEC estimates block duration
Browse files Browse the repository at this point in the history
Reader and writer esimate duration of
FEC block in ns as this value could vary
even though number of packets is known.
For now it is decided to keep the greates
value seen in the current session, not
average.
  • Loading branch information
baranovmv committed May 26, 2024
1 parent 78d85df commit 3f2a94f
Show file tree
Hide file tree
Showing 9 changed files with 143 additions and 32 deletions.
8 changes: 8 additions & 0 deletions src/internal_modules/roc_audio/latency_monitor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ LatencyMonitor::LatencyMonitor(IFrameReader& frame_reader,
const packet::SortedQueue& incoming_queue,
const Depacketizer& depacketizer,
const packet::ILinkMeter& link_meter,
const fec::Reader* fec_reader,
ResamplerReader* resampler,
const LatencyConfig& config,
const SampleSpec& packet_sample_spec,
Expand All @@ -30,6 +31,7 @@ LatencyMonitor::LatencyMonitor(IFrameReader& frame_reader,
, incoming_queue_(incoming_queue)
, depacketizer_(depacketizer)
, link_meter_(link_meter)
, fec_reader_(fec_reader)
, resampler_(resampler)
, enable_scaling_(config.tuner_profile != audio::LatencyTunerProfile_Intact)
, capture_ts_(0)
Expand Down Expand Up @@ -102,6 +104,12 @@ bool LatencyMonitor::reclock(const core::nanoseconds_t playback_timestamp) {
}

bool LatencyMonitor::pre_process_(const Frame& frame) {
if (fec_reader_) {
latency_metrics_.fec_block_duration =
packet_sample_spec_.stream_timestamp_2_ns(fec_reader_->max_block_duration());
} else {
latency_metrics_.fec_block_duration = 0;
}
tuner_.write_metrics(latency_metrics_, link_metrics_);

if (!tuner_.update_stream()) {
Expand Down
4 changes: 3 additions & 1 deletion src/internal_modules/roc_audio/latency_monitor.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
#include "roc_core/noncopyable.h"
#include "roc_core/optional.h"
#include "roc_core/time.h"
#include "roc_packet/ilink_meter.h"
#include "roc_fec/reader.h"
#include "roc_packet/sorted_queue.h"
#include "roc_packet/units.h"

Expand Down Expand Up @@ -64,6 +64,7 @@ class LatencyMonitor : public IFrameReader, public core::NonCopyable<> {
const packet::SortedQueue& incoming_queue,
const Depacketizer& depacketizer,
const packet::ILinkMeter& link_meter,
const fec::Reader* fec_reader,
ResamplerReader* resampler,
const LatencyConfig& config,
const SampleSpec& packet_sample_spec,
Expand Down Expand Up @@ -112,6 +113,7 @@ class LatencyMonitor : public IFrameReader, public core::NonCopyable<> {
const packet::SortedQueue& incoming_queue_;
const Depacketizer& depacketizer_;
const packet::ILinkMeter& link_meter_;
const fec::Reader* fec_reader_;

ResamplerReader* resampler_;
const bool enable_scaling_;
Expand Down
6 changes: 5 additions & 1 deletion src/internal_modules/roc_audio/latency_tuner.h
Original file line number Diff line number Diff line change
Expand Up @@ -151,10 +151,14 @@ struct LatencyMetrics {
//! on receiver.
core::nanoseconds_t e2e_latency;

//! Estimated FEC block duration.
core::nanoseconds_t fec_block_duration;

LatencyMetrics()
: niq_latency(0)
, niq_stalling(0)
, e2e_latency(0) {
, e2e_latency(0)
, fec_block_duration(0) {
}
};

Expand Down
59 changes: 46 additions & 13 deletions src/internal_modules/roc_fec/reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,13 @@ namespace fec {

Reader::Reader(const ReaderConfig& config,
packet::FecScheme fec_scheme,
IBlockDecoder& decoder,
IBlockDecoder& block_decoder,
packet::IReader& source_reader,
packet::IReader& repair_reader,
packet::IParser& parser,
packet::PacketFactory& packet_factory,
core::IArena& arena)
: decoder_(decoder)
: block_decoder_(block_decoder)
, source_reader_(source_reader)
, repair_reader_(repair_reader)
, parser_(parser)
Expand All @@ -43,6 +43,8 @@ Reader::Reader(const ReaderConfig& config,
, repair_block_resized_(false)
, payload_resized_(false)
, n_packets_(0)
, prev_block_timestamp_valid_(false)
, block_max_duration_(0)
, max_sbn_jump_(config.max_sbn_jump)
, fec_scheme_(fec_scheme) {
valid_ = true;
Expand Down Expand Up @@ -139,6 +141,13 @@ status::StatusCode Reader::get_next_packet_(packet::PacketPtr& ptr) {
fill_block_();

packet::PacketPtr pp = source_block_[next_packet_];
if (next_packet_ == 0) {
if (pp) {
update_block_duration_(pp);
} else {
prev_block_timestamp_valid_ = false;
}
}

do {
if (!alive_) {
Expand All @@ -162,7 +171,6 @@ status::StatusCode Reader::get_next_packet_(packet::PacketPtr& ptr) {
} else {
pp = source_block_[pos++];
}

next_packet_ = pos;
} else {
next_packet_++;
Expand Down Expand Up @@ -210,7 +218,8 @@ void Reader::try_repair_() {
return;
}

if (!decoder_.begin(source_block_.size(), repair_block_.size(), payload_size_)) {
if (!block_decoder_.begin(source_block_.size(), repair_block_.size(),
payload_size_)) {
roc_log(LogDebug,
"fec reader: can't begin decoder block, shutting down:"
" sbl=%lu rbl=%lu payload_size=%lu",
Expand All @@ -224,22 +233,22 @@ void Reader::try_repair_() {
if (!source_block_[n]) {
continue;
}
decoder_.set(n, source_block_[n]->fec()->payload);
block_decoder_.set(n, source_block_[n]->fec()->payload);
}

for (size_t n = 0; n < repair_block_.size(); n++) {
if (!repair_block_[n]) {
continue;
}
decoder_.set(source_block_.size() + n, repair_block_[n]->fec()->payload);
block_decoder_.set(source_block_.size() + n, repair_block_[n]->fec()->payload);
}

for (size_t n = 0; n < source_block_.size(); n++) {
if (source_block_[n]) {
continue;
}

core::Slice<uint8_t> buffer = decoder_.repair(n);
core::Slice<uint8_t> buffer = block_decoder_.repair(n);
if (!buffer) {
continue;
}
Expand All @@ -252,7 +261,7 @@ void Reader::try_repair_() {
source_block_[n] = pp;
}

decoder_.end();
block_decoder_.end();
can_repair_ = false;
}

Expand Down Expand Up @@ -655,12 +664,12 @@ bool Reader::can_update_source_block_size_(size_t new_sblen) {
return false;
}

if (new_sblen > decoder_.max_block_length()) {
if (new_sblen > block_decoder_.max_block_length()) {
roc_log(LogDebug,
"fec reader: can't change source block size above maximum, shutting down:"
" cur_sblen=%lu new_sblen=%lu max_blen=%lu",
(unsigned long)cur_sblen, (unsigned long)new_sblen,
(unsigned long)decoder_.max_block_length());
(unsigned long)block_decoder_.max_block_length());
return (alive_ = false);
}

Expand All @@ -675,6 +684,9 @@ bool Reader::update_source_block_size_(size_t new_sblen) {
return true;
}

prev_block_timestamp_valid_ = false;
block_max_duration_ = 0;

if (!source_block_.resize(new_sblen)) {
roc_log(LogDebug,
"fec reader: can't allocate source block memory, shutting down:"
Expand Down Expand Up @@ -710,12 +722,12 @@ bool Reader::can_update_repair_block_size_(size_t new_blen) {
return false;
}

if (new_blen > decoder_.max_block_length()) {
if (new_blen > block_decoder_.max_block_length()) {
roc_log(LogDebug,
"fec reader: can't change repair block size above maximum, shutting down:"
" cur_blen=%lu new_blen=%lu max_blen=%lu",
(unsigned long)cur_blen, (unsigned long)new_blen,
(unsigned long)decoder_.max_block_length());
(unsigned long)block_decoder_.max_block_length());
return (alive_ = false);
}

Expand All @@ -733,7 +745,9 @@ bool Reader::update_repair_block_size_(size_t new_blen) {
return true;
}

// should not happen: sblen should be validated and updated already
prev_block_timestamp_valid_ = false;
block_max_duration_ = 0;

roc_panic_if_not(new_blen > cur_sblen);

const size_t new_rblen = new_blen - cur_sblen;
Expand Down Expand Up @@ -789,5 +803,24 @@ void Reader::drop_repair_packets_from_prev_blocks_() {
}
}

void Reader::update_block_duration_(const packet::PacketPtr& ptr) {
if (!ptr->rtp()) {
return;
}
packet::stream_timestamp_diff_t block_dur = 0;
if (prev_block_timestamp_valid_) {
block_dur = packet::stream_timestamp_diff(ptr->rtp()->stream_timestamp,
prev_block_timestamp_);
}
roc_panic_if_msg(block_dur < 0, "fec reader: negative block duration");
block_max_duration_ = std::max(block_max_duration_, block_dur);
prev_block_timestamp_ = ptr->rtp()->stream_timestamp;
prev_block_timestamp_valid_ = true;
}

packet::stream_timestamp_t Reader::max_block_duration() const {
return (packet::stream_timestamp_t)block_max_duration_;
}

} // namespace fec
} // namespace roc
13 changes: 11 additions & 2 deletions src/internal_modules/roc_fec/reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ class Reader : public packet::IReader, public core::NonCopyable<> {
//! - @p arena is used to initialize a packet array
Reader(const ReaderConfig& config,
packet::FecScheme fec_scheme,
IBlockDecoder& decoder,
IBlockDecoder& block_decoder,
packet::IReader& source_reader,
packet::IReader& repair_reader,
packet::IParser& parser,
Expand All @@ -71,6 +71,9 @@ class Reader : public packet::IReader, public core::NonCopyable<> {
//! When a packet loss is detected, try to restore it from repair packets.
virtual ROC_ATTR_NODISCARD status::StatusCode read(packet::PacketPtr&);

//! Get maximal FEC block duratoin seen since last block resize.
packet::stream_timestamp_t max_block_duration() const;

private:
status::StatusCode read_(packet::PacketPtr&);

Expand Down Expand Up @@ -108,7 +111,9 @@ class Reader : public packet::IReader, public core::NonCopyable<> {

void drop_repair_packets_from_prev_blocks_();

IBlockDecoder& decoder_;
void update_block_duration_(const packet::PacketPtr& ptr);

IBlockDecoder& block_decoder_;

packet::IReader& source_reader_;
packet::IReader& repair_reader_;
Expand Down Expand Up @@ -138,6 +143,10 @@ class Reader : public packet::IReader, public core::NonCopyable<> {

unsigned n_packets_;

bool prev_block_timestamp_valid_;
packet::stream_timestamp_t prev_block_timestamp_;
packet::stream_timestamp_diff_t block_max_duration_;

const size_t max_sbn_jump_;
const packet::FecScheme fec_scheme_;
};
Expand Down
30 changes: 29 additions & 1 deletion src/internal_modules/roc_fec/writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,10 @@ Writer::Writer(const WriterConfig& config,
, cur_packet_(0)
, fec_scheme_(fec_scheme)
, valid_(false)
, alive_(true) {
, alive_(true)
, prev_block_timestamp_valid_(false)
, prev_block_timestamp_(0)
, block_max_duration_(0) {
cur_sbn_ = (packet::blknum_t)core::fast_random_range(0, packet::blknum_t(-1));
cur_block_repair_sn_ =
(packet::seqnum_t)core::fast_random_range(0, packet::seqnum_t(-1));
Expand Down Expand Up @@ -88,6 +91,8 @@ bool Writer::resize(size_t sblen, size_t rblen) {
next_sblen_ = sblen;
next_rblen_ = rblen;

prev_block_timestamp_valid_ = false;

return true;
}

Expand All @@ -107,6 +112,7 @@ status::StatusCode Writer::write(const packet::PacketPtr& pp) {
}

if (cur_packet_ == 0) {
update_block_duration_(pp);
if (!begin_block_(pp)) {
// TODO(gh-183): return status
return status::StatusOK;
Expand Down Expand Up @@ -340,5 +346,27 @@ bool Writer::validate_source_packet_(const packet::PacketPtr& pp) {
return true;
}

void Writer::update_block_duration_(const packet::PacketPtr& ptr) {
if (!ptr->rtp()) {
return;
}
packet::stream_timestamp_diff_t block_dur = 0;
if (prev_block_timestamp_valid_) {
block_dur = packet::stream_timestamp_diff(ptr->rtp()->stream_timestamp,
prev_block_timestamp_);
}
if (block_dur < 0) {
prev_block_timestamp_valid_ = false;
} else {
block_max_duration_ = std::max(block_max_duration_, block_dur);
prev_block_timestamp_ = ptr->rtp()->stream_timestamp;
prev_block_timestamp_valid_ = true;
}
}

packet::stream_timestamp_t Writer::max_block_duration() const {
return (packet::stream_timestamp_t)block_max_duration_;
}

} // namespace fec
} // namespace roc
9 changes: 9 additions & 0 deletions src/internal_modules/roc_fec/writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,9 @@ class Writer : public packet::IWriter, public core::NonCopyable<> {
//! - generates repair packets and also writes them to the output writer
virtual ROC_ATTR_NODISCARD status::StatusCode write(const packet::PacketPtr&);

//! Get maximal FEC block duratoin seen since last block resize.
packet::stream_timestamp_t max_block_duration() const;

private:
bool begin_block_(const packet::PacketPtr& pp);
void end_block_();
Expand All @@ -95,6 +98,8 @@ class Writer : public packet::IWriter, public core::NonCopyable<> {
void validate_fec_packet_(const packet::PacketPtr&);
bool validate_source_packet_(const packet::PacketPtr&);

void update_block_duration_(const packet::PacketPtr& ptr);

size_t cur_sblen_;
size_t next_sblen_;

Expand Down Expand Up @@ -124,6 +129,10 @@ class Writer : public packet::IWriter, public core::NonCopyable<> {

bool valid_;
bool alive_;

bool prev_block_timestamp_valid_;
packet::stream_timestamp_t prev_block_timestamp_;
packet::stream_timestamp_diff_t block_max_duration_;
};

} // namespace fec
Expand Down
2 changes: 1 addition & 1 deletion src/internal_modules/roc_pipeline/receiver_session.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ ReceiverSession::ReceiverSession(const ReceiverSessionConfig& session_config,
}

latency_monitor_.reset(new (latency_monitor_) audio::LatencyMonitor(
*frm_reader, *source_queue_, *depacketizer_, *source_meter_,
*frm_reader, *source_queue_, *depacketizer_, *source_meter_, fec_reader_.get(),
resampler_reader_.get(), session_config.latency, pkt_encoding->sample_spec,
common_config.output_sample_spec));
if (!latency_monitor_ || !latency_monitor_->is_valid()) {
Expand Down
Loading

0 comments on commit 3f2a94f

Please sign in to comment.