Skip to content

Commit

Permalink
LinkMeter mov stats test
Browse files Browse the repository at this point in the history
  • Loading branch information
baranovmv committed Jan 28, 2024
1 parent cf033d0 commit 0e5ebe0
Show file tree
Hide file tree
Showing 5 changed files with 335 additions and 45 deletions.
161 changes: 143 additions & 18 deletions src/internal_modules/roc_core/mov_stats.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,13 @@ class MovStats {
, movsum_(T(0))
, movsum2_(T(0))
, mov_var_(T(0))
, full_(false)
, mov_max_cntr_(0)
, full_(false)
, first_(true)
, queue_max_(arena, win_len)
, curr_max_(T(0))
, queue_min_(arena, win_len)
, curr_min_(T(0))
{
if(!buffer_.resize(win_len)){
roc_panic("MovStats: can't allocate storage for the ring buffer");
Expand All @@ -63,29 +67,60 @@ class MovStats {
movsum_ += x - x_old;
movsum2_ += x2 - x2_old;

if (first_) {
first_ = false;
mov_max_ = x;
mov_max_cntr_++;
} else {
if (x > mov_max_) {
mov_max_ = x;
mov_max_cntr_ = 1;
} else if (x == mov_max_) {
mov_max_cntr_++;
}

if (mov_max_ == x_old) {
mov_
}
}

buffer_i_++;
if (buffer_i_ == win_len_) {
buffer_i_ = 0;
full_ = true;
}

slide_max(x, x_old);
slide_min(x, x_old);
}

// Keeping a sliding max by using a sorted deque.
// The wedge is always sorted in descending order.
// The current max is always at the front of the wedge.
// https://www.geeksforgeeks.org/sliding-window-maximum-maximum-of-all-subarrays-of-size-k/
void slide_max(const T& x, const T x_old) {
if (queue_max_.is_empty()) {
queue_max_.push_back(x);
curr_max_ = x;
} else {
if (queue_max_.front() == x_old) {
queue_max_.pop_front();
curr_max_ = queue_max_.front();
}
while (!queue_max_.is_empty() && queue_max_.back() < x) {
queue_max_.pop_back();
}
if (queue_max_.is_empty()) {
curr_max_ = x;
}
queue_max_.push_back(x);
}
}

// Keeping a sliding min by using a sorted deque.
// The wedge is always sorted in ascending order.
// The current min is always at the front of the wedge.
// https://www.geeksforgeeks.org/sliding-window-maximum-maximum-of-all-subarrays-of-size-k/
void slide_min(const T& x, const T x_old) {
if (queue_min_.is_empty()) {
queue_min_.push_back(x);
curr_min_ = x;
} else {
if (queue_min_.front() == x_old) {
queue_min_.pop_front();
curr_min_ = queue_min_.front();
}
while (!queue_min_.is_empty() && queue_min_.back() > x) {
queue_min_.pop_back();
}
if (queue_min_.is_empty()) {
curr_min_ = x;
}
queue_min_.push_back(x);
}
}

//! Get moving average value.
Expand All @@ -106,6 +141,16 @@ class MovStats {
}
}

T mov_max() const
{
return curr_max_;
}

T mov_min() const
{
return curr_min_;
}

//! Extend rolling window length.
//! @remarks
//! Potentially could cause a gap in the estimated values as
Expand Down Expand Up @@ -137,6 +182,7 @@ class MovStats {
private:
Array<T> buffer_;
Array<T> buffer2_;

const size_t win_len_;
size_t buffer_i_;
T movsum_;
Expand All @@ -147,6 +193,85 @@ class MovStats {

bool full_;
bool first_;

class Queue {
public:
Queue(core::IArena& arena, size_t len)
: buff_(arena)
, buff_len_(len)
, begin_(0)
, end_(0)
{
if (!buff_.resize(len)) {
roc_panic("Queue: can't allocate storage for the buffer");
}
}

T& front()
{
if (is_empty()) {
roc_panic("Queue: front() called on empty buffer");
}
return buff_[begin_];
}

T& back()
{
if (is_empty()) {
roc_panic("Queue: back() called on empty buffer");
}
return buff_[(end_ - 1 + buff_len_) % buff_len_];
}

size_t len() const
{
return (end_ - begin_ + buff_len_) % buff_len_;
}

void push_front(const T& x)
{
begin_ = (begin_ - 1 + buff_len_) % buff_len_;
buff_[begin_] = x;
}

void pop_front()
{
if (is_empty()) {
roc_panic("Queue: pop_front() called on empty buffer");
}
begin_ = (begin_ + 1) % buff_len_;
}

void push_back(const T& x)
{
buff_[end_] = x;
end_ = (end_ + 1) % buff_len_;
}

void pop_back()
{
if (is_empty()) {
roc_panic("Queue: pop_back() called on empty buffer");
}
end_ = (end_ - 1 + buff_len_) % buff_len_;
}

bool is_empty()
{
return begin_ == end_;
}

private:
Array<T> buff_;
size_t buff_len_;
size_t begin_;
size_t end_;
};

Queue queue_max_;
T curr_max_;
Queue queue_min_;
T curr_min_;
};

} // namespace core
Expand Down
6 changes: 4 additions & 2 deletions src/internal_modules/roc_pipeline/receiver_session.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ ReceiverSession::ReceiverSession(

packet::IWriter* pwriter = source_queue_.get();

source_meter_.reset(new (source_meter_) rtp::LinkMeter());
source_meter_.reset(new (source_meter_)
rtp::LinkMeter(arena, encoding->sample_spec, 100));
if (!source_meter_) {
return;
}
Expand Down Expand Up @@ -85,7 +86,8 @@ ReceiverSession::ReceiverSession(
return;
}

repair_meter_.reset(new (repair_meter_) rtp::LinkMeter());
repair_meter_.reset(new (repair_meter_)
rtp::LinkMeter(arena, encoding->sample_spec, 100));
if (!repair_meter_) {
return;
}
Expand Down
63 changes: 60 additions & 3 deletions src/internal_modules/roc_rtp/link_meter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,28 @@
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
*/

#include "roc_rtp/link_meter.h"
#include "roc_core/panic.h"
#include "link_meter.h"

namespace roc {
namespace rtp {

LinkMeter::LinkMeter()
LinkMeter::LinkMeter(core::IArena& arena,
const audio::SampleSpec& sample_spec,
size_t run_win_len)
: writer_(NULL)
, reader_(NULL)
, sample_spec_(sample_spec)
, first_packet_(true)
, has_metrics_(false)
, seqnum_hi_(0)
, seqnum_lo_(0) {
, seqnum_lo_(0)
, lost_(0)
, fract_lost_counter_(0)
, jitter_processed_(0)
, period_n_packets_(0)
, prev_packet_enq_ts_(-1)
, packet_jitter_stats_(arena, run_win_len) {
}

status::StatusCode LinkMeter::write(const packet::PacketPtr& packet) {
Expand Down Expand Up @@ -76,13 +85,61 @@ void LinkMeter::update_metrics_(const packet::Packet& packet) {
// Detect wrap.
seqnum_hi_ += (uint16_t)-1;
}

if (!first_packet_) {
const size_t gap = gap_(packet);
// Compute jitter only on consequential packets.
if (gap == 0 && prev_pack_duration_ > 0){
const core::nanoseconds_t d_enq_ts = packet.udp()->enqueue_ts -
prev_packet_enq_ts_;
const core::nanoseconds_t d_capt_ts = sample_spec_.samples_per_chan_2_ns(prev_pack_duration_);
packet_jitter_stats_.add(std::abs(d_enq_ts - d_capt_ts));
metrics_.max_jitter = packet_jitter_stats_.mov_max();
metrics_.min_jitter = packet_jitter_stats_.mov_min();
jitter_processed_++;
metrics_.jitter = sample_spec_.ns_2_samples_per_chan(mean_jitter());
} else {
lost_ += gap;
fract_lost_counter_ += gap;
metrics_.num_packets_covered += gap;
}
}

prev_packet_enq_ts_ = packet.udp()->enqueue_ts;
prev_pack_duration_ = packet.rtp()->duration;
seqnum_lo_ = packet.rtp()->seqnum;
metrics_.ext_last_seqnum = seqnum_hi_ + seqnum_lo_;
prev_stream_timestamp = packet.rtp()->stream_timestamp;
period_n_packets_++;
metrics_.num_packets_covered++;
metrics_.cum_loss = lost_;
metrics_.fract_loss = (float)fract_lost_counter_ / (float)(fract_lost_counter_ + period_n_packets_);
}

first_packet_ = false;
has_metrics_ = true;
}

size_t rtp::LinkMeter::gap_(const packet::Packet& packet) const {
if (first_packet_) {
roc_panic("RTPStats: attempt to detect gap on the first received packet");
}

return (size_t)abs(packet::seqnum_diff( packet.rtp()->seqnum, seqnum_lo_ + 1));
}

core::nanoseconds_t rtp::LinkMeter::mean_jitter() const {
return packet_jitter_stats_.mov_avg();
}

core::nanoseconds_t rtp::LinkMeter::var_jitter() const {
return packet_jitter_stats_.mov_var();
}

void LinkMeter::reset_metrics() {
period_n_packets_ = 0;
fract_lost_counter_ = 0;
metrics_.num_packets_covered = 0;
}
} // namespace rtp
} // namespace roc
Loading

0 comments on commit 0e5ebe0

Please sign in to comment.