Skip to content

Commit

Permalink
MovStats: refactor, move out queue
Browse files Browse the repository at this point in the history
  • Loading branch information
baranovmv committed May 29, 2024
1 parent a0e1696 commit a22c2eb
Show file tree
Hide file tree
Showing 3 changed files with 138 additions and 81 deletions.
107 changes: 27 additions & 80 deletions src/internal_modules/roc_core/mov_stats.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,19 @@
#include "roc_core/array.h"
#include "roc_core/iarena.h"
#include "roc_core/panic.h"
#include "roc_core/queue.h"

namespace roc {
namespace core {

//! Rolling window moving average and variance.
//!
//! @remarks
//! Efficiently implements moving average and variance based on approach
//! described in https://www.dsprelated.com/showthread/comp.dsp/97276-1.php
//!
//! @tparam T defines a sample type.
//!
//! @note T should be trivially copyable.
template <typename T> class MovStats {
public:
//! Initialize.
Expand All @@ -43,11 +46,16 @@ template <typename T> class MovStats {
, curr_max_(T(0))
, queue_min_(arena, win_len + 1)
, curr_min_(T(0)) {

if (win_len == 0) {
roc_panic("mov stats: window length must be greater than 0");
}

if (!buffer_.resize(win_len)) {
roc_panic("MovStats: can't allocate storage for the ring buffer");
return;
}
if (!buffer2_.resize(win_len)) {
roc_panic("MovStats: can't allocate storage for the ring buffer");
return;
}
memset(buffer_.data(), 0, sizeof(T) * buffer_.size());
memset(buffer2_.data(), 0, sizeof(T) * buffer2_.size());
Expand All @@ -70,8 +78,8 @@ template <typename T> class MovStats {
full_ = true;
}

slide_max(x, x_old);
slide_min(x, x_old);
slide_max_(x, x_old);
slide_min_(x, x_old);
}

//! Get moving average value.
Expand Down Expand Up @@ -120,15 +128,15 @@ template <typename T> class MovStats {
//! [■■■■■■■■■■□□□□□□□□□□□□□□□□□□□□□--------------------]
//! ↑ ↑ ↑
//! Dropped samples.
void extend_win(const size_t new_win) {
ROC_ATTR_NODISCARD bool extend_win(const size_t new_win) {
if (new_win <= win_len_) {
roc_panic("MovStats: the window length can only grow");
roc_panic("mov stats: the window length can only grow");
}
if (!buffer_.resize(new_win)) {
roc_panic("MovStats: can not increase storage");
return false;
}
if (!buffer2_.resize(new_win)) {
roc_panic("MovStats: can not increase storage");
return false;
}

movsum_ = 0;
Expand All @@ -138,6 +146,12 @@ template <typename T> class MovStats {
movsum2_ += buffer2_[i];
}
full_ = false;
return true;
}

//! Check that initial allocation succeeded.
bool is_valid() const {
return buffer_.data() && buffer2_.data();
}

private:
Expand All @@ -146,7 +160,7 @@ template <typename T> class MovStats {
//! The wedge is always sorted in descending order.
//! The current max is always at the front of the wedge.
//! https://www.geeksforgeeks.org/sliding-window-maximum-maximum-of-all-subarrays-of-size-k/
void slide_max(const T& x, const T x_old) {
void slide_max_(const T& x, const T x_old) {
if (queue_max_.is_empty()) {
queue_max_.push_back(x);
curr_max_ = x;
Expand All @@ -170,7 +184,7 @@ template <typename T> class MovStats {
//! The wedge is always sorted in ascending order.
//! The current min is always at the front of the wedge.
//! https://www.geeksforgeeks.org/sliding-window-maximum-maximum-of-all-subarrays-of-size-k/
void slide_min(const T& x, const T x_old) {
void slide_min_(const T& x, const T x_old) {
if (queue_min_.is_empty()) {
queue_min_.push_back(x);
curr_min_ = x;
Expand Down Expand Up @@ -203,76 +217,9 @@ template <typename T> class MovStats {
bool full_;
bool first_;

class Queue {
public:
Queue(core::IArena& arena, size_t len)
: buff_(arena)
, buff_len_(len)
, begin_(0)
, end_(0) {
if (!buff_.resize(len)) {
roc_panic("Queue: can't allocate storage for the buffer");
}
}

T& front() {
if (is_empty()) {
roc_panic("Queue: front() called on empty buffer");
}
return buff_[begin_];
}

T& back() {
if (is_empty()) {
roc_panic("Queue: back() called on empty buffer");
}
return buff_[(end_ - 1 + buff_len_) % buff_len_];
}

size_t len() const {
return (end_ - begin_ + buff_len_) % buff_len_;
}

void push_front(const T& x) {
begin_ = (begin_ - 1 + buff_len_) % buff_len_;
buff_[begin_] = x;
roc_panic_if_msg(end_ == begin_, "Queue: buffer overflow");
}

void pop_front() {
if (is_empty()) {
roc_panic("Queue: pop_front() called on empty buffer");
}
begin_ = (begin_ + 1) % buff_len_;
}

void push_back(const T& x) {
buff_[end_] = x;
end_ = (end_ + 1) % buff_len_;
roc_panic_if_msg(end_ == begin_, "Queue: buffer overflow");
}

void pop_back() {
if (is_empty()) {
roc_panic("Queue: pop_back() called on empty buffer");
}
end_ = (end_ - 1 + buff_len_) % buff_len_;
}

bool is_empty() {
return begin_ == end_;
}

private:
Array<T> buff_;
size_t buff_len_;
size_t begin_;
size_t end_;
};

Queue queue_max_;
Queue<T> queue_max_;
T curr_max_;
Queue queue_min_;
Queue<T> queue_min_;
T curr_min_;
};

Expand Down
110 changes: 110 additions & 0 deletions src/internal_modules/roc_core/queue.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
/*
* Copyright (c) 2024 Roc Streaming authors
*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
*/

//! @file roc_core/queue.h
//! @brief Queue for trivially typed elements with continuous memory buffer.

#ifndef ROC_CORE_QUEUE_H_
#define ROC_CORE_QUEUE_H_

#include "roc_core/array.h"
#include "roc_core/iarena.h"
#include "roc_core/panic.h"

namespace roc {
namespace core {

//! Queue on dynamic array.
//! @tparam T defines type of a single element.
//!
template <class T> class Queue {
public:
//! Initialize.
//! @remarks
//! Preallocate buffer in @p arena with @p len number of elements.
Queue(core::IArena& arena, size_t len)
: buff_(arena)
, buff_len_(len)
, begin_(0)
, end_(0) {
if (len == 0) {
roc_panic("queue: the length must be greater than 0");
}

if (!buff_.resize(len)) {
roc_panic("queue: can't allocate storage for the buffer");
}
}

//! Get reference of the front element.
T& front() {
if (is_empty()) {
roc_panic("queue: front() called on empty buffer");
}
return buff_[begin_];
}

//! Get reference of the back element.
T& back() {
if (is_empty()) {
roc_panic("queue: back() called on empty buffer");
}
return buff_[(end_ - 1 + buff_len_) % buff_len_];
}

//! Get number of elements in the queue.
size_t len() const {
return (end_ - begin_ + buff_len_) % buff_len_;
}

//! Push an element to the front of the queue.
void push_front(const T& x) {
begin_ = (begin_ - 1 + buff_len_) % buff_len_;
buff_[begin_] = x;
roc_panic_if_msg(end_ == begin_, "queue: buffer overflow");
}

//! Remove the first element from the front.
void pop_front() {
if (is_empty()) {
roc_panic("queue: pop_front() called on empty buffer");
}
begin_ = (begin_ + 1) % buff_len_;
}

//! Push an element to the backside of the queue.
void push_back(const T& x) {
buff_[end_] = x;
end_ = (end_ + 1) % buff_len_;
roc_panic_if_msg(end_ == begin_, "queue: buffer overflow");
}

//! Remove the first element from the back.
void pop_back() {
if (is_empty()) {
roc_panic("queue: pop_back() called on empty buffer");
}
end_ = (end_ - 1 + buff_len_) % buff_len_;
}

//! Is the queue empty.
bool is_empty() {
return begin_ == end_;
}

private:
Array<T> buff_;
size_t buff_len_;
size_t begin_;
size_t end_;
};

} // namespace core
} // namespace roc

#endif // ROC_CORE_QUEUE_H_
2 changes: 1 addition & 1 deletion src/tests/roc_core/test_mov_stats.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ TEST(movstats, one_n_half_extend) {
LONGS_EQUAL(target_avg, stats.mov_avg());
LONGS_EQUAL(target_var, stats.mov_var());

stats.extend_win(n * 10);
CHECK(stats.extend_win(n * 10));

LONGS_EQUAL((int64_t)ceil(n * 1.25), stats.mov_avg()); // [n; n + n/2]
LONGS_EQUAL(target_var / 2, stats.mov_var());
Expand Down

0 comments on commit a22c2eb

Please sign in to comment.