Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue 688: FEC estimates block duration #717

Merged
merged 1 commit into from
Jun 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 11 additions & 3 deletions src/internal_modules/roc_audio/latency_monitor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ LatencyMonitor::LatencyMonitor(IFrameReader& frame_reader,
const packet::SortedQueue& incoming_queue,
const Depacketizer& depacketizer,
const packet::ILinkMeter& link_meter,
const fec::Reader* fec_reader,
ResamplerReader* resampler,
const LatencyConfig& config,
const SampleSpec& packet_sample_spec,
Expand All @@ -30,6 +31,7 @@ LatencyMonitor::LatencyMonitor(IFrameReader& frame_reader,
, incoming_queue_(incoming_queue)
, depacketizer_(depacketizer)
, link_meter_(link_meter)
, fec_reader_(fec_reader)
, resampler_(resampler)
, enable_scaling_(config.tuner_profile != audio::LatencyTunerProfile_Intact)
, capture_ts_(0)
Expand Down Expand Up @@ -71,7 +73,7 @@ bool LatencyMonitor::read(Frame& frame) {

if (alive_) {
compute_niq_latency_();
query_link_meter_();
query_metrics_();

if (!pre_process_(frame)) {
alive_ = false;
Expand Down Expand Up @@ -103,7 +105,6 @@ bool LatencyMonitor::reclock(const core::nanoseconds_t playback_timestamp) {

bool LatencyMonitor::pre_process_(const Frame& frame) {
tuner_.write_metrics(latency_metrics_, link_metrics_);

if (!tuner_.update_stream()) {
// TODO(gh-183): forward status code
return false;
Expand Down Expand Up @@ -177,12 +178,19 @@ void LatencyMonitor::compute_e2e_latency_(const core::nanoseconds_t playback_tim
latency_metrics_.e2e_latency = playback_timestamp - capture_ts_;
}

void LatencyMonitor::query_link_meter_() {
void LatencyMonitor::query_metrics_() {
if (!link_meter_.has_metrics()) {
return;
}

link_metrics_ = link_meter_.metrics();

if (fec_reader_) {
latency_metrics_.fec_block_duration =
packet_sample_spec_.stream_timestamp_2_ns(fec_reader_->max_block_duration());
} else {
latency_metrics_.fec_block_duration = 0;
}
}

bool LatencyMonitor::init_scaling_() {
Expand Down
6 changes: 4 additions & 2 deletions src/internal_modules/roc_audio/latency_monitor.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
#include "roc_core/noncopyable.h"
#include "roc_core/optional.h"
#include "roc_core/time.h"
#include "roc_packet/ilink_meter.h"
#include "roc_fec/reader.h"
#include "roc_packet/sorted_queue.h"
#include "roc_packet/units.h"

Expand Down Expand Up @@ -64,6 +64,7 @@ class LatencyMonitor : public IFrameReader, public core::NonCopyable<> {
const packet::SortedQueue& incoming_queue,
const Depacketizer& depacketizer,
const packet::ILinkMeter& link_meter,
const fec::Reader* fec_reader,
ResamplerReader* resampler,
const LatencyConfig& config,
const SampleSpec& packet_sample_spec,
Expand Down Expand Up @@ -94,7 +95,7 @@ class LatencyMonitor : public IFrameReader, public core::NonCopyable<> {
private:
void compute_niq_latency_();
void compute_e2e_latency_(core::nanoseconds_t playback_timestamp);
void query_link_meter_();
void query_metrics_();

bool pre_process_(const Frame& frame);
void post_process_(const Frame& frame);
Expand All @@ -112,6 +113,7 @@ class LatencyMonitor : public IFrameReader, public core::NonCopyable<> {
const packet::SortedQueue& incoming_queue_;
const Depacketizer& depacketizer_;
const packet::ILinkMeter& link_meter_;
const fec::Reader* fec_reader_;

ResamplerReader* resampler_;
const bool enable_scaling_;
Expand Down
6 changes: 5 additions & 1 deletion src/internal_modules/roc_audio/latency_tuner.h
Original file line number Diff line number Diff line change
Expand Up @@ -144,10 +144,14 @@ struct LatencyMetrics {
//! on receiver.
core::nanoseconds_t e2e_latency;

//! Estimated FEC block duration.
core::nanoseconds_t fec_block_duration;

LatencyMetrics()
: niq_latency(0)
, niq_stalling(0)
, e2e_latency(0) {
, e2e_latency(0)
, fec_block_duration(0) {
}
};

Expand Down
59 changes: 46 additions & 13 deletions src/internal_modules/roc_fec/reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,13 @@ namespace fec {

Reader::Reader(const ReaderConfig& config,
packet::FecScheme fec_scheme,
IBlockDecoder& decoder,
IBlockDecoder& block_decoder,
packet::IReader& source_reader,
packet::IReader& repair_reader,
packet::IParser& parser,
packet::PacketFactory& packet_factory,
core::IArena& arena)
: decoder_(decoder)
: block_decoder_(block_decoder)
, source_reader_(source_reader)
, repair_reader_(repair_reader)
, parser_(parser)
Expand All @@ -43,6 +43,8 @@ Reader::Reader(const ReaderConfig& config,
, repair_block_resized_(false)
, payload_resized_(false)
, n_packets_(0)
, prev_block_timestamp_valid_(false)
, block_max_duration_(0)
, max_sbn_jump_(config.max_sbn_jump)
, fec_scheme_(fec_scheme) {
valid_ = true;
Expand Down Expand Up @@ -162,7 +164,6 @@ status::StatusCode Reader::get_next_packet_(packet::PacketPtr& ptr) {
} else {
pp = source_block_[pos++];
}

next_packet_ = pos;
} else {
next_packet_++;
Expand All @@ -181,6 +182,12 @@ status::StatusCode Reader::get_next_packet_(packet::PacketPtr& ptr) {
void Reader::next_block_() {
roc_log(LogTrace, "fec reader: next block: sbn=%lu", (unsigned long)cur_sbn_);

if (source_block_[0]) {
update_block_duration_(source_block_[0]);
} else {
prev_block_timestamp_valid_ = false;
}

for (size_t n = 0; n < source_block_.size(); n++) {
source_block_[n] = NULL;
}
Expand Down Expand Up @@ -210,7 +217,8 @@ void Reader::try_repair_() {
return;
}

if (!decoder_.begin(source_block_.size(), repair_block_.size(), payload_size_)) {
if (!block_decoder_.begin(source_block_.size(), repair_block_.size(),
payload_size_)) {
roc_log(LogDebug,
"fec reader: can't begin decoder block, shutting down:"
" sbl=%lu rbl=%lu payload_size=%lu",
Expand All @@ -224,22 +232,22 @@ void Reader::try_repair_() {
if (!source_block_[n]) {
continue;
}
decoder_.set(n, source_block_[n]->fec()->payload);
block_decoder_.set(n, source_block_[n]->fec()->payload);
}

for (size_t n = 0; n < repair_block_.size(); n++) {
if (!repair_block_[n]) {
continue;
}
decoder_.set(source_block_.size() + n, repair_block_[n]->fec()->payload);
block_decoder_.set(source_block_.size() + n, repair_block_[n]->fec()->payload);
}

for (size_t n = 0; n < source_block_.size(); n++) {
if (source_block_[n]) {
continue;
}

core::Slice<uint8_t> buffer = decoder_.repair(n);
core::Slice<uint8_t> buffer = block_decoder_.repair(n);
if (!buffer) {
continue;
}
Expand All @@ -252,7 +260,7 @@ void Reader::try_repair_() {
source_block_[n] = pp;
}

decoder_.end();
block_decoder_.end();
can_repair_ = false;
}

Expand Down Expand Up @@ -655,12 +663,12 @@ bool Reader::can_update_source_block_size_(size_t new_sblen) {
return false;
}

if (new_sblen > decoder_.max_block_length()) {
if (new_sblen > block_decoder_.max_block_length()) {
roc_log(LogDebug,
"fec reader: can't change source block size above maximum, shutting down:"
" cur_sblen=%lu new_sblen=%lu max_blen=%lu",
(unsigned long)cur_sblen, (unsigned long)new_sblen,
(unsigned long)decoder_.max_block_length());
(unsigned long)block_decoder_.max_block_length());
return (alive_ = false);
}

Expand All @@ -675,6 +683,9 @@ bool Reader::update_source_block_size_(size_t new_sblen) {
return true;
}

prev_block_timestamp_valid_ = false;
block_max_duration_ = 0;

if (!source_block_.resize(new_sblen)) {
roc_log(LogDebug,
"fec reader: can't allocate source block memory, shutting down:"
Expand Down Expand Up @@ -710,12 +721,12 @@ bool Reader::can_update_repair_block_size_(size_t new_blen) {
return false;
}

if (new_blen > decoder_.max_block_length()) {
if (new_blen > block_decoder_.max_block_length()) {
roc_log(LogDebug,
"fec reader: can't change repair block size above maximum, shutting down:"
" cur_blen=%lu new_blen=%lu max_blen=%lu",
(unsigned long)cur_blen, (unsigned long)new_blen,
(unsigned long)decoder_.max_block_length());
(unsigned long)block_decoder_.max_block_length());
return (alive_ = false);
}

Expand All @@ -733,7 +744,9 @@ bool Reader::update_repair_block_size_(size_t new_blen) {
return true;
}

// should not happen: sblen should be validated and updated already
prev_block_timestamp_valid_ = false;
block_max_duration_ = 0;

roc_panic_if_not(new_blen > cur_sblen);

const size_t new_rblen = new_blen - cur_sblen;
Expand Down Expand Up @@ -789,5 +802,25 @@ void Reader::drop_repair_packets_from_prev_blocks_() {
}
}

void Reader::update_block_duration_(const packet::PacketPtr& ptr) {
packet::stream_timestamp_diff_t block_dur = 0;
if (prev_block_timestamp_valid_) {
block_dur =
packet::stream_timestamp_diff(ptr->stream_timestamp(), prev_block_timestamp_);
}
if (block_dur < 0) {
roc_log(LogTrace, "fec reader: negative block duration");
prev_block_timestamp_valid_ = false;
} else {
block_max_duration_ = std::max(block_max_duration_, block_dur);
prev_block_timestamp_ = ptr->stream_timestamp();
prev_block_timestamp_valid_ = true;
}
}

packet::stream_timestamp_t Reader::max_block_duration() const {
return (packet::stream_timestamp_t)block_max_duration_;
}

} // namespace fec
} // namespace roc
13 changes: 11 additions & 2 deletions src/internal_modules/roc_fec/reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ class Reader : public packet::IReader, public core::NonCopyable<> {
//! - @p arena is used to initialize a packet array
Reader(const ReaderConfig& config,
packet::FecScheme fec_scheme,
IBlockDecoder& decoder,
IBlockDecoder& block_decoder,
packet::IReader& source_reader,
packet::IReader& repair_reader,
packet::IParser& parser,
Expand All @@ -71,6 +71,9 @@ class Reader : public packet::IReader, public core::NonCopyable<> {
//! When a packet loss is detected, try to restore it from repair packets.
virtual ROC_ATTR_NODISCARD status::StatusCode read(packet::PacketPtr&);

//! Get maximal FEC block duratoin seen since last block resize.
packet::stream_timestamp_t max_block_duration() const;

private:
status::StatusCode read_(packet::PacketPtr&);

Expand Down Expand Up @@ -108,7 +111,9 @@ class Reader : public packet::IReader, public core::NonCopyable<> {

void drop_repair_packets_from_prev_blocks_();

IBlockDecoder& decoder_;
void update_block_duration_(const packet::PacketPtr& ptr);

IBlockDecoder& block_decoder_;

packet::IReader& source_reader_;
packet::IReader& repair_reader_;
Expand Down Expand Up @@ -138,6 +143,10 @@ class Reader : public packet::IReader, public core::NonCopyable<> {

unsigned n_packets_;

bool prev_block_timestamp_valid_;
packet::stream_timestamp_t prev_block_timestamp_;
packet::stream_timestamp_diff_t block_max_duration_;

const size_t max_sbn_jump_;
const packet::FecScheme fec_scheme_;
};
Expand Down
28 changes: 27 additions & 1 deletion src/internal_modules/roc_fec/writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,10 @@ Writer::Writer(const WriterConfig& config,
, cur_packet_(0)
, fec_scheme_(fec_scheme)
, valid_(false)
, alive_(true) {
, alive_(true)
, prev_block_timestamp_valid_(false)
, prev_block_timestamp_(0)
, block_max_duration_(0) {
cur_sbn_ = (packet::blknum_t)core::fast_random_range(0, packet::blknum_t(-1));
cur_block_repair_sn_ =
(packet::seqnum_t)core::fast_random_range(0, packet::seqnum_t(-1));
Expand Down Expand Up @@ -88,6 +91,8 @@ bool Writer::resize(size_t sblen, size_t rblen) {
next_sblen_ = sblen;
next_rblen_ = rblen;

prev_block_timestamp_valid_ = false;

return true;
}

Expand Down Expand Up @@ -133,6 +138,8 @@ status::StatusCode Writer::write(const packet::PacketPtr& pp) {
}

bool Writer::begin_block_(const packet::PacketPtr& pp) {
update_block_duration_(pp);

if (!apply_sizes_(next_sblen_, next_rblen_, pp->fec()->payload.size())) {
return false;
}
Expand Down Expand Up @@ -340,5 +347,24 @@ bool Writer::validate_source_packet_(const packet::PacketPtr& pp) {
return true;
}

void Writer::update_block_duration_(const packet::PacketPtr& ptr) {
packet::stream_timestamp_diff_t block_dur = 0;
if (prev_block_timestamp_valid_) {
block_dur =
packet::stream_timestamp_diff(ptr->stream_timestamp(), prev_block_timestamp_);
}
if (block_dur < 0) {
prev_block_timestamp_valid_ = false;
} else {
block_max_duration_ = std::max(block_max_duration_, block_dur);
prev_block_timestamp_ = ptr->stream_timestamp();
prev_block_timestamp_valid_ = true;
}
}

packet::stream_timestamp_t Writer::max_block_duration() const {
return (packet::stream_timestamp_t)block_max_duration_;
}

} // namespace fec
} // namespace roc
9 changes: 9 additions & 0 deletions src/internal_modules/roc_fec/writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,9 @@ class Writer : public packet::IWriter, public core::NonCopyable<> {
//! - generates repair packets and also writes them to the output writer
virtual ROC_ATTR_NODISCARD status::StatusCode write(const packet::PacketPtr&);

//! Get maximal FEC block duratoin seen since last block resize.
packet::stream_timestamp_t max_block_duration() const;

private:
bool begin_block_(const packet::PacketPtr& pp);
void end_block_();
Expand All @@ -95,6 +98,8 @@ class Writer : public packet::IWriter, public core::NonCopyable<> {
void validate_fec_packet_(const packet::PacketPtr&);
bool validate_source_packet_(const packet::PacketPtr&);

void update_block_duration_(const packet::PacketPtr& ptr);

size_t cur_sblen_;
size_t next_sblen_;

Expand Down Expand Up @@ -124,6 +129,10 @@ class Writer : public packet::IWriter, public core::NonCopyable<> {

bool valid_;
bool alive_;

bool prev_block_timestamp_valid_;
packet::stream_timestamp_t prev_block_timestamp_;
packet::stream_timestamp_diff_t block_max_duration_;
};

} // namespace fec
Expand Down
Loading
Loading