Skip to content

Commit

Permalink
gh-688 Issue 688: FEC estimates block duration
Browse files Browse the repository at this point in the history
  • Loading branch information
baranovmv authored and gavv committed Jun 17, 2024
1 parent 3a19267 commit de56335
Show file tree
Hide file tree
Showing 10 changed files with 721 additions and 36 deletions.
14 changes: 11 additions & 3 deletions src/internal_modules/roc_audio/latency_monitor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ LatencyMonitor::LatencyMonitor(IFrameReader& frame_reader,
const packet::SortedQueue& incoming_queue,
const Depacketizer& depacketizer,
const packet::ILinkMeter& link_meter,
const fec::Reader* fec_reader,
ResamplerReader* resampler,
const LatencyConfig& config,
const SampleSpec& packet_sample_spec,
Expand All @@ -30,6 +31,7 @@ LatencyMonitor::LatencyMonitor(IFrameReader& frame_reader,
, incoming_queue_(incoming_queue)
, depacketizer_(depacketizer)
, link_meter_(link_meter)
, fec_reader_(fec_reader)
, resampler_(resampler)
, enable_scaling_(config.tuner_profile != audio::LatencyTunerProfile_Intact)
, capture_ts_(0)
Expand Down Expand Up @@ -71,7 +73,7 @@ bool LatencyMonitor::read(Frame& frame) {

if (alive_) {
compute_niq_latency_();
query_link_meter_();
query_metrics_();

if (!pre_process_(frame)) {
alive_ = false;
Expand Down Expand Up @@ -103,7 +105,6 @@ bool LatencyMonitor::reclock(const core::nanoseconds_t playback_timestamp) {

bool LatencyMonitor::pre_process_(const Frame& frame) {
tuner_.write_metrics(latency_metrics_, link_metrics_);

if (!tuner_.update_stream()) {
// TODO(gh-183): forward status code
return false;
Expand Down Expand Up @@ -177,12 +178,19 @@ void LatencyMonitor::compute_e2e_latency_(const core::nanoseconds_t playback_tim
latency_metrics_.e2e_latency = playback_timestamp - capture_ts_;
}

void LatencyMonitor::query_link_meter_() {
void LatencyMonitor::query_metrics_() {
if (!link_meter_.has_metrics()) {
return;
}

link_metrics_ = link_meter_.metrics();

if (fec_reader_) {
latency_metrics_.fec_block_duration =
packet_sample_spec_.stream_timestamp_2_ns(fec_reader_->max_block_duration());
} else {
latency_metrics_.fec_block_duration = 0;
}
}

bool LatencyMonitor::init_scaling_() {
Expand Down
6 changes: 4 additions & 2 deletions src/internal_modules/roc_audio/latency_monitor.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
#include "roc_core/noncopyable.h"
#include "roc_core/optional.h"
#include "roc_core/time.h"
#include "roc_packet/ilink_meter.h"
#include "roc_fec/reader.h"
#include "roc_packet/sorted_queue.h"
#include "roc_packet/units.h"

Expand Down Expand Up @@ -64,6 +64,7 @@ class LatencyMonitor : public IFrameReader, public core::NonCopyable<> {
const packet::SortedQueue& incoming_queue,
const Depacketizer& depacketizer,
const packet::ILinkMeter& link_meter,
const fec::Reader* fec_reader,
ResamplerReader* resampler,
const LatencyConfig& config,
const SampleSpec& packet_sample_spec,
Expand Down Expand Up @@ -94,7 +95,7 @@ class LatencyMonitor : public IFrameReader, public core::NonCopyable<> {
private:
void compute_niq_latency_();
void compute_e2e_latency_(core::nanoseconds_t playback_timestamp);
void query_link_meter_();
void query_metrics_();

bool pre_process_(const Frame& frame);
void post_process_(const Frame& frame);
Expand All @@ -112,6 +113,7 @@ class LatencyMonitor : public IFrameReader, public core::NonCopyable<> {
const packet::SortedQueue& incoming_queue_;
const Depacketizer& depacketizer_;
const packet::ILinkMeter& link_meter_;
const fec::Reader* fec_reader_;

ResamplerReader* resampler_;
const bool enable_scaling_;
Expand Down
6 changes: 5 additions & 1 deletion src/internal_modules/roc_audio/latency_tuner.h
Original file line number Diff line number Diff line change
Expand Up @@ -144,10 +144,14 @@ struct LatencyMetrics {
//! on receiver.
core::nanoseconds_t e2e_latency;

//! Estimated FEC block duration.
core::nanoseconds_t fec_block_duration;

LatencyMetrics()
: niq_latency(0)
, niq_stalling(0)
, e2e_latency(0) {
, e2e_latency(0)
, fec_block_duration(0) {
}
};

Expand Down
59 changes: 46 additions & 13 deletions src/internal_modules/roc_fec/reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,13 @@ namespace fec {

Reader::Reader(const ReaderConfig& config,
packet::FecScheme fec_scheme,
IBlockDecoder& decoder,
IBlockDecoder& block_decoder,
packet::IReader& source_reader,
packet::IReader& repair_reader,
packet::IParser& parser,
packet::PacketFactory& packet_factory,
core::IArena& arena)
: decoder_(decoder)
: block_decoder_(block_decoder)
, source_reader_(source_reader)
, repair_reader_(repair_reader)
, parser_(parser)
Expand All @@ -43,6 +43,8 @@ Reader::Reader(const ReaderConfig& config,
, repair_block_resized_(false)
, payload_resized_(false)
, n_packets_(0)
, prev_block_timestamp_valid_(false)
, block_max_duration_(0)
, max_sbn_jump_(config.max_sbn_jump)
, fec_scheme_(fec_scheme) {
valid_ = true;
Expand Down Expand Up @@ -162,7 +164,6 @@ status::StatusCode Reader::get_next_packet_(packet::PacketPtr& ptr) {
} else {
pp = source_block_[pos++];
}

next_packet_ = pos;
} else {
next_packet_++;
Expand All @@ -181,6 +182,12 @@ status::StatusCode Reader::get_next_packet_(packet::PacketPtr& ptr) {
void Reader::next_block_() {
roc_log(LogTrace, "fec reader: next block: sbn=%lu", (unsigned long)cur_sbn_);

if (source_block_[0]) {
update_block_duration_(source_block_[0]);
} else {
prev_block_timestamp_valid_ = false;
}

for (size_t n = 0; n < source_block_.size(); n++) {
source_block_[n] = NULL;
}
Expand Down Expand Up @@ -210,7 +217,8 @@ void Reader::try_repair_() {
return;
}

if (!decoder_.begin(source_block_.size(), repair_block_.size(), payload_size_)) {
if (!block_decoder_.begin(source_block_.size(), repair_block_.size(),
payload_size_)) {
roc_log(LogDebug,
"fec reader: can't begin decoder block, shutting down:"
" sbl=%lu rbl=%lu payload_size=%lu",
Expand All @@ -224,22 +232,22 @@ void Reader::try_repair_() {
if (!source_block_[n]) {
continue;
}
decoder_.set(n, source_block_[n]->fec()->payload);
block_decoder_.set(n, source_block_[n]->fec()->payload);
}

for (size_t n = 0; n < repair_block_.size(); n++) {
if (!repair_block_[n]) {
continue;
}
decoder_.set(source_block_.size() + n, repair_block_[n]->fec()->payload);
block_decoder_.set(source_block_.size() + n, repair_block_[n]->fec()->payload);
}

for (size_t n = 0; n < source_block_.size(); n++) {
if (source_block_[n]) {
continue;
}

core::Slice<uint8_t> buffer = decoder_.repair(n);
core::Slice<uint8_t> buffer = block_decoder_.repair(n);
if (!buffer) {
continue;
}
Expand All @@ -252,7 +260,7 @@ void Reader::try_repair_() {
source_block_[n] = pp;
}

decoder_.end();
block_decoder_.end();
can_repair_ = false;
}

Expand Down Expand Up @@ -655,12 +663,12 @@ bool Reader::can_update_source_block_size_(size_t new_sblen) {
return false;
}

if (new_sblen > decoder_.max_block_length()) {
if (new_sblen > block_decoder_.max_block_length()) {
roc_log(LogDebug,
"fec reader: can't change source block size above maximum, shutting down:"
" cur_sblen=%lu new_sblen=%lu max_blen=%lu",
(unsigned long)cur_sblen, (unsigned long)new_sblen,
(unsigned long)decoder_.max_block_length());
(unsigned long)block_decoder_.max_block_length());
return (alive_ = false);
}

Expand All @@ -675,6 +683,9 @@ bool Reader::update_source_block_size_(size_t new_sblen) {
return true;
}

prev_block_timestamp_valid_ = false;
block_max_duration_ = 0;

if (!source_block_.resize(new_sblen)) {
roc_log(LogDebug,
"fec reader: can't allocate source block memory, shutting down:"
Expand Down Expand Up @@ -710,12 +721,12 @@ bool Reader::can_update_repair_block_size_(size_t new_blen) {
return false;
}

if (new_blen > decoder_.max_block_length()) {
if (new_blen > block_decoder_.max_block_length()) {
roc_log(LogDebug,
"fec reader: can't change repair block size above maximum, shutting down:"
" cur_blen=%lu new_blen=%lu max_blen=%lu",
(unsigned long)cur_blen, (unsigned long)new_blen,
(unsigned long)decoder_.max_block_length());
(unsigned long)block_decoder_.max_block_length());
return (alive_ = false);
}

Expand All @@ -733,7 +744,9 @@ bool Reader::update_repair_block_size_(size_t new_blen) {
return true;
}

// should not happen: sblen should be validated and updated already
prev_block_timestamp_valid_ = false;
block_max_duration_ = 0;

roc_panic_if_not(new_blen > cur_sblen);

const size_t new_rblen = new_blen - cur_sblen;
Expand Down Expand Up @@ -789,5 +802,25 @@ void Reader::drop_repair_packets_from_prev_blocks_() {
}
}

void Reader::update_block_duration_(const packet::PacketPtr& ptr) {
packet::stream_timestamp_diff_t block_dur = 0;
if (prev_block_timestamp_valid_) {
block_dur =
packet::stream_timestamp_diff(ptr->stream_timestamp(), prev_block_timestamp_);
}
if (block_dur < 0) {
roc_log(LogTrace, "fec reader: negative block duration");
prev_block_timestamp_valid_ = false;
} else {
block_max_duration_ = std::max(block_max_duration_, block_dur);
prev_block_timestamp_ = ptr->stream_timestamp();
prev_block_timestamp_valid_ = true;
}
}

packet::stream_timestamp_t Reader::max_block_duration() const {
return (packet::stream_timestamp_t)block_max_duration_;
}

} // namespace fec
} // namespace roc
13 changes: 11 additions & 2 deletions src/internal_modules/roc_fec/reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ class Reader : public packet::IReader, public core::NonCopyable<> {
//! - @p arena is used to initialize a packet array
Reader(const ReaderConfig& config,
packet::FecScheme fec_scheme,
IBlockDecoder& decoder,
IBlockDecoder& block_decoder,
packet::IReader& source_reader,
packet::IReader& repair_reader,
packet::IParser& parser,
Expand All @@ -71,6 +71,9 @@ class Reader : public packet::IReader, public core::NonCopyable<> {
//! When a packet loss is detected, try to restore it from repair packets.
virtual ROC_ATTR_NODISCARD status::StatusCode read(packet::PacketPtr&);

//! Get maximal FEC block duratoin seen since last block resize.
packet::stream_timestamp_t max_block_duration() const;

private:
status::StatusCode read_(packet::PacketPtr&);

Expand Down Expand Up @@ -108,7 +111,9 @@ class Reader : public packet::IReader, public core::NonCopyable<> {

void drop_repair_packets_from_prev_blocks_();

IBlockDecoder& decoder_;
void update_block_duration_(const packet::PacketPtr& ptr);

IBlockDecoder& block_decoder_;

packet::IReader& source_reader_;
packet::IReader& repair_reader_;
Expand Down Expand Up @@ -138,6 +143,10 @@ class Reader : public packet::IReader, public core::NonCopyable<> {

unsigned n_packets_;

bool prev_block_timestamp_valid_;
packet::stream_timestamp_t prev_block_timestamp_;
packet::stream_timestamp_diff_t block_max_duration_;

const size_t max_sbn_jump_;
const packet::FecScheme fec_scheme_;
};
Expand Down
28 changes: 27 additions & 1 deletion src/internal_modules/roc_fec/writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,10 @@ Writer::Writer(const WriterConfig& config,
, cur_packet_(0)
, fec_scheme_(fec_scheme)
, valid_(false)
, alive_(true) {
, alive_(true)
, prev_block_timestamp_valid_(false)
, prev_block_timestamp_(0)
, block_max_duration_(0) {
cur_sbn_ = (packet::blknum_t)core::fast_random_range(0, packet::blknum_t(-1));
cur_block_repair_sn_ =
(packet::seqnum_t)core::fast_random_range(0, packet::seqnum_t(-1));
Expand Down Expand Up @@ -88,6 +91,8 @@ bool Writer::resize(size_t sblen, size_t rblen) {
next_sblen_ = sblen;
next_rblen_ = rblen;

prev_block_timestamp_valid_ = false;

return true;
}

Expand Down Expand Up @@ -133,6 +138,8 @@ status::StatusCode Writer::write(const packet::PacketPtr& pp) {
}

bool Writer::begin_block_(const packet::PacketPtr& pp) {
update_block_duration_(pp);

if (!apply_sizes_(next_sblen_, next_rblen_, pp->fec()->payload.size())) {
return false;
}
Expand Down Expand Up @@ -340,5 +347,24 @@ bool Writer::validate_source_packet_(const packet::PacketPtr& pp) {
return true;
}

void Writer::update_block_duration_(const packet::PacketPtr& ptr) {
packet::stream_timestamp_diff_t block_dur = 0;
if (prev_block_timestamp_valid_) {
block_dur =
packet::stream_timestamp_diff(ptr->stream_timestamp(), prev_block_timestamp_);
}
if (block_dur < 0) {
prev_block_timestamp_valid_ = false;
} else {
block_max_duration_ = std::max(block_max_duration_, block_dur);
prev_block_timestamp_ = ptr->stream_timestamp();
prev_block_timestamp_valid_ = true;
}
}

packet::stream_timestamp_t Writer::max_block_duration() const {
return (packet::stream_timestamp_t)block_max_duration_;
}

} // namespace fec
} // namespace roc
9 changes: 9 additions & 0 deletions src/internal_modules/roc_fec/writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,9 @@ class Writer : public packet::IWriter, public core::NonCopyable<> {
//! - generates repair packets and also writes them to the output writer
virtual ROC_ATTR_NODISCARD status::StatusCode write(const packet::PacketPtr&);

//! Get maximal FEC block duratoin seen since last block resize.
packet::stream_timestamp_t max_block_duration() const;

private:
bool begin_block_(const packet::PacketPtr& pp);
void end_block_();
Expand All @@ -95,6 +98,8 @@ class Writer : public packet::IWriter, public core::NonCopyable<> {
void validate_fec_packet_(const packet::PacketPtr&);
bool validate_source_packet_(const packet::PacketPtr&);

void update_block_duration_(const packet::PacketPtr& ptr);

size_t cur_sblen_;
size_t next_sblen_;

Expand Down Expand Up @@ -124,6 +129,10 @@ class Writer : public packet::IWriter, public core::NonCopyable<> {

bool valid_;
bool alive_;

bool prev_block_timestamp_valid_;
packet::stream_timestamp_t prev_block_timestamp_;
packet::stream_timestamp_diff_t block_max_duration_;
};

} // namespace fec
Expand Down
Loading

0 comments on commit de56335

Please sign in to comment.