Skip to content

Commit

Permalink
chore: Cosmetic changes in MovStats and MovQuantile
Browse files Browse the repository at this point in the history
  • Loading branch information
gavv authored and jeshwanthreddy13 committed Aug 6, 2024
1 parent b3c9a8a commit 72c9798
Show file tree
Hide file tree
Showing 4 changed files with 162 additions and 153 deletions.
266 changes: 138 additions & 128 deletions src/internal_modules/roc_core/mov_quantile.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2023 Roc Streaming authors
* Copyright (c) 2024 Roc Streaming authors
*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
Expand All @@ -19,16 +19,16 @@
namespace roc {
namespace core {

//! Rolling window moving quantile
//!@remarks
//! Rolling window moving quantile.
//! @tparam T defines a sample type.
//! @remarks
//! Efficiently implements moving quantile using partition heap based on approach
//! described in https://aakinshin.net/posts/partitioning-heaps-quantile-estimator/.
//! It follows the quantile estimator strategy mentioned in the document
//! @tparam T defines a sample type.
//! It follows the quantile estimator strategy mentioned in the document
template <typename T> class MovQuantile {
public:
//! Initialize
MovQuantile(IArena& arena, const size_t win_len, const double quantile)
//! Initialize
: win_len_(win_len)
, quantile_(quantile)
, old_heap_root_index_(0)
Expand All @@ -40,8 +40,8 @@ template <typename T> class MovQuantile {
, win_filled_(false)
, valid_(false)
, heap_(arena)
, elem_index_heap_index_(arena)
, heap_index_elem_index_(arena) {
, elem_index_2_heap_index_(arena)
, heap_index_2_elem_index_(arena) {
if (win_len == 0) {
roc_panic("mov quantile: window length must be greater than 0");
}
Expand All @@ -51,14 +51,14 @@ template <typename T> class MovQuantile {
if (!heap_.resize(win_len)) {
return;
}
if (!elem_index_heap_index_.resize(win_len)) {
if (!elem_index_2_heap_index_.resize(win_len)) {
return;
}
if (!heap_index_elem_index_.resize(win_len)) {
if (!heap_index_2_elem_index_.resize(win_len)) {
return;
}
double index = (quantile * static_cast<double>(win_len - 1));
heap_root_ = static_cast<size_t>(index);
const double index = (quantile * double(win_len - 1));
heap_root_ = size_t(index);
max_heap_index_ = heap_root_;
min_heap_index_ = heap_root_;
valid_ = true;
Expand All @@ -69,163 +69,173 @@ template <typename T> class MovQuantile {
return valid_;
}

//! Swaps 2 heap elements along with their mapping in element to heap index and heap
//! to element index
void swap(size_t index_1, size_t index_2) {
size_t elem_index_1 = heap_index_elem_index_[index_1];
size_t elem_index_2 = heap_index_elem_index_[index_2];
//! Returns the moving quantile
T mov_quantile() const {
return heap_[heap_root_];
}

T temp = heap_[index_1];
heap_[index_1] = heap_[index_2];
heap_[index_2] = temp;
//! Insert or swaps elements in the partition heap
//! @remarks
//! Case 1: The window is filled. The element in heap is changed whose
//! element_index%window_length is equal to arrived element. heapify_() is called post
//! that Case 2: The window in not filled. In this case we insert element in max_heap,
//! min_heap or root based on the current percentile index
void add(const T& x) {
if (elem_index_ == win_len_) {
win_filled_ = true;
}
heap_size_ = elem_index_ + 1;
elem_index_ = (elem_index_) % win_len_;
if (win_filled_) {
heap_size_ = win_len_;
min_heap_index_ = win_len_ - 1;
max_heap_index_ = 0;
const size_t heap_index = elem_index_2_heap_index_[elem_index_];
heap_[heap_index] = x;
heapify_(heap_index);
} else {
const double index = quantile_ * double(heap_size_ - 1);
const size_t k = size_t(index);
size_t heap_index;
if (elem_index_ == 0) {
heap_index = heap_root_;
elem_index_2_heap_index_[elem_index_] = heap_index;
heap_[heap_index] = x;
heap_index_2_elem_index_[heap_index] = elem_index_;
} else {
if (old_heap_root_index_ == k) {
min_heap_index_ += 1;
heap_index = min_heap_index_;
} else {
max_heap_index_ -= 1;
heap_index = max_heap_index_;
}
elem_index_2_heap_index_[elem_index_] = heap_index;
heap_[heap_index] = x;
heap_index_2_elem_index_[heap_index] = elem_index_;
heapify_(heap_index);
old_heap_root_index_ = k;
}
}

heap_index_elem_index_[index_1] = elem_index_2;
heap_index_elem_index_[index_2] = elem_index_1;
elem_index_ = elem_index_ + 1;
}

elem_index_heap_index_[elem_index_1] = index_2;
elem_index_heap_index_[elem_index_2] = index_1;
private:
//! Maintains property of the partition heap when an element in inserted or swapped.
//! @remarks
//! The element could be inserted or changed in min_heap, max_heap or the root.
void heapify_(const size_t heap_index) {
if (heap_index < heap_root_) {
const size_t parent = heap_root_ - ((heap_root_ - heap_index - 1) / 2);
if (heap_[parent] < heap_[heap_index]) {
max_heapify_up_(heap_index);
min_heapify_down_(heap_root_);
} else {
max_heapify_down_(heap_index);
}
} else if (heap_root_ == heap_index) {
max_heapify_down_(heap_index);
min_heapify_down_(heap_root_);
} else {
const size_t parent = (heap_index - heap_root_ - 1) / 2 + heap_root_;
if (heap_[parent] > heap_[heap_index]) {
min_heapify_up_(heap_index);
max_heapify_down_(heap_root_);
} else {
min_heapify_down_(heap_index);
}
}
}

//! Recursively swaps parent and element in min heap partition until the parent is
//! smaller or element reaches root index.
void min_heapify_up(size_t heap_index) {
void min_heapify_up_(const size_t heap_index) {
if (heap_index == heap_root_) {
return;
}
size_t parent = (heap_index - heap_root_ - 1) / 2 + heap_root_;
const size_t parent = (heap_index - heap_root_ - 1) / 2 + heap_root_;
if (heap_[parent] > heap_[heap_index]) {
swap(heap_index, parent);
min_heapify_up(parent);
swap_(heap_index, parent);
min_heapify_up_(parent);
}
}

//! Recursively swaps parent and element in max heap partition until the parent is
//! larger or element reaches root index.
//! @remarks
//! The root index in max heap partition is larger than all its child index so parent
//! index formulae has been adjusted accordingly
void max_heapify_up(size_t heap_index) {
void max_heapify_up_(const size_t heap_index) {
// sift up
if (heap_index == heap_root_) {
return;
}
size_t parent = heap_root_ - ((heap_root_ - heap_index - 1) / 2);
const size_t parent = heap_root_ - ((heap_root_ - heap_index - 1) / 2);
if (heap_[parent] < heap_[heap_index]) {
swap(heap_index, parent);
max_heapify_up(parent);
swap_(heap_index, parent);
max_heapify_up_(parent);
}
}

//! Recursively swaps children and element in min heap partition until the children
//! are smaller or there are no children
void min_heapify_down(size_t heap_index) {
void min_heapify_down_(const size_t heap_index) {
size_t largest = heap_index;

size_t left = 2 * (heap_index - heap_root_) + 1 + heap_root_;
if (left <= min_heap_index_ && heap_[left] < heap_[largest])
const size_t left = 2 * (heap_index - heap_root_) + 1 + heap_root_;
if (left <= min_heap_index_ && heap_[left] < heap_[largest]) {
largest = left;
size_t right = 2 * (heap_index - heap_root_) + 2 + heap_root_;
if (right <= min_heap_index_ && heap_[right] < heap_[largest])
}
const size_t right = 2 * (heap_index - heap_root_) + 2 + heap_root_;
if (right <= min_heap_index_ && heap_[right] < heap_[largest]) {
largest = right;
}

if (largest != heap_index) {
swap(heap_index, largest);
min_heapify_down(largest);
swap_(heap_index, largest);
min_heapify_down_(largest);
}
}

//! Recursively swaps children and element in max heap partition until the children
//! are larger or there are no children.
//! @remarks
//! Similar adjustment to child index calculation like in max_heapify_up
void max_heapify_down(size_t heap_index) {
void max_heapify_down_(const size_t heap_index) {
size_t largest = heap_index;

size_t left = 2 * (heap_root_ - heap_index) + 1;
const size_t left = 2 * (heap_root_ - heap_index) + 1;
if (left <= (heap_root_ - max_heap_index_)
&& heap_[heap_root_ - left] > heap_[largest])
&& heap_[heap_root_ - left] > heap_[largest]) {
largest = heap_root_ - left;
size_t right = 2 * (heap_root_ - heap_index) + 2;
}
const size_t right = 2 * (heap_root_ - heap_index) + 2;
if (right <= (heap_root_ - max_heap_index_)
&& heap_[heap_root_ - right] > heap_[largest])
&& heap_[heap_root_ - right] > heap_[largest]) {
largest = heap_root_ - right;

if (largest != heap_index) {
swap(heap_index, largest);
max_heapify_down(largest);
}
}
//! Maintains property of the partition heap when an element in inserted or swapped.
//! @remarks
//! The element could be inserted or changed in min_heap, max_heap or the root.
void heapify(size_t heap_index) {
if (heap_index < heap_root_) {
size_t parent = heap_root_ - ((heap_root_ - heap_index - 1) / 2);
if (heap_[parent] < heap_[heap_index]) {
max_heapify_up(heap_index);
min_heapify_down(heap_root_);
} else {
max_heapify_down(heap_index);
}
} else if (heap_root_ == heap_index) {
max_heapify_down(heap_index);
min_heapify_down(heap_root_);
} else {
size_t parent = (heap_index - heap_root_ - 1) / 2 + heap_root_;
if (heap_[parent] > heap_[heap_index]) {
min_heapify_up(heap_index);
max_heapify_down(heap_root_);
} else {
min_heapify_down(heap_index);
}
if (largest != heap_index) {
swap_(heap_index, largest);
max_heapify_down_(largest);
}
}
//! Insert or swaps elements in the partition heap
//! @remarks
//! Case 1: The window is filled. The element in heap is changed whose
//! element_index%window_length is equal to arrived element. heapify is called post
//! that Case 2: The window in not filled. In this case we insert element in max_heap,
//! min_heap or root based on the current percentile index
void add(const T& x) {
if (elem_index_ == win_len_)
win_filled_ = true;
heap_size_ = elem_index_ + 1;
elem_index_ = (elem_index_) % win_len_;
if (win_filled_) {
heap_size_ = win_len_;
min_heap_index_ = win_len_ - 1;
max_heap_index_ = 0;
size_t heap_index = elem_index_heap_index_[elem_index_];
heap_[heap_index] = x;
heapify(heap_index);
} else {
double index = quantile_ * static_cast<double>(heap_size_ - 1);
size_t k = static_cast<size_t>(index);
size_t heap_index;
if (elem_index_ == 0) {
heap_index = heap_root_;
elem_index_heap_index_[elem_index_] = heap_index;
heap_[heap_index] = x;
heap_index_elem_index_[heap_index] = elem_index_;
} else {
if (old_heap_root_index_ == k) {
min_heap_index_ += 1;
heap_index = min_heap_index_;

} else {
max_heap_index_ -= 1;
heap_index = max_heap_index_;
}
elem_index_heap_index_[elem_index_] = heap_index;
heap_[heap_index] = x;
heap_index_elem_index_[heap_index] = elem_index_;
heapify(heap_index);
old_heap_root_index_ = k;
}
}
//! Swaps 2 heap elements along with their mapping in element to heap index and heap
//! to element index
void swap_(const size_t index_1, const size_t index_2) {
const size_t elem_index_1 = heap_index_2_elem_index_[index_1];
const size_t elem_index_2 = heap_index_2_elem_index_[index_2];

elem_index_ = elem_index_ + 1;
}
//! Returns the moving quantile
T sliding_quantile() {
return heap_[heap_root_];
const T temp = heap_[index_1];
heap_[index_1] = heap_[index_2];
heap_[index_2] = temp;

heap_index_2_elem_index_[index_1] = elem_index_2;
heap_index_2_elem_index_[index_2] = elem_index_1;

elem_index_2_heap_index_[elem_index_1] = index_2;
elem_index_2_heap_index_[elem_index_2] = index_1;
}

//! Length of the sliding window
Expand All @@ -235,30 +245,30 @@ template <typename T> class MovQuantile {

//! Used to check the window filling logic
size_t old_heap_root_index_;
//! Index which seperates max and min heap and also act as their root
//! Index which separates max and min heap and also act as their root
size_t heap_root_;

//! Maintains current heap size
size_t heap_size_;
//! Maintians the index to which max_heap extends
//! Maintains the index to which max_heap extends
size_t max_heap_index_;
//! Maintians the index to which min_heap extends
//! Maintains the index to which min_heap extends
size_t min_heap_index_;

//! Maintians current element index
//! Maintains current element index
size_t elem_index_;

//! Window filled check
bool win_filled_;
//! Maintians initialization success
//! Maintains initialization success
bool valid_;

//! Maintians the parititon heap
//! Maintains the partition heap
Array<T> heap_;
//! Maintians the element index to heap index mapping
Array<size_t> elem_index_heap_index_;
//! Maintains the element index to heap index mapping
Array<size_t> elem_index_2_heap_index_;
//! Maintains the heap index to element index mapping
Array<size_t> heap_index_elem_index_;
Array<size_t> heap_index_2_elem_index_;
};

} // namespace core
Expand Down
7 changes: 3 additions & 4 deletions src/internal_modules/roc_core/mov_stats.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,10 @@ namespace roc {
namespace core {

//! Rolling window moving average and variance.
//! @remarks
//! Efficiently implements moving average and variance based on approach
//! described in https://www.dsprelated.com/showthread/comp.dsp/97276-1.php
//!
//! @tparam T defines a sample type.
//! @remarks
//! Efficiently implements moving average and variance based on approach
//! described in https://www.dsprelated.com/showthread/comp.dsp/97276-1.php
template <typename T> class MovStats {
public:
//! Initialize.
Expand Down
Loading

0 comments on commit 72c9798

Please sign in to comment.