Skip to content

Commit

Permalink
roc-streaminggh-751: feature checkpoint, rolling minimum
Browse files Browse the repository at this point in the history
  • Loading branch information
novertia committed Jul 15, 2024
1 parent 118a090 commit 0edbfdb
Show file tree
Hide file tree
Showing 2 changed files with 179 additions and 0 deletions.
139 changes: 139 additions & 0 deletions src/internal_modules/roc_core/mov_quantile.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
/*
* Copyright (c) 2023 Roc Streaming authors
*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
*/

//! @file roc_core/mov_quantile.h
//! @brief Rolling window moving quantile.

#ifndef ROC_CORE_MOV_QUANTILE_H_
#define ROC_CORE_MOV_QUANTILE_H_

#include "roc_core/array.h"
#include "roc_core/iarena.h"
#include "roc_core/panic.h"

namespace roc {
namespace core {

template <typename T> class MovQuantile {
public:
const size_t win_len_;
const size_t k_;
size_t elem_index_;
bool win_filled_;
Array<T> heap_;
Array<size_t> elem_index_heap_index_;
Array<size_t> heap_index_elem_index_;

MovQuantile(IArena& arena, const size_t win_len, const size_t k)
: win_len_(win_len)
, k_(k)
, elem_index_(0)
, win_filled_ (false)
, heap_ (arena)
, elem_index_heap_index_(arena)
, heap_index_elem_index_(arena) {
if (win_len == 0) {
roc_panic("mov quantile: window length must be greater than 0");
}
if (k_ > win_len_){
roc_panic("smallest element index should be less than window size");
}
if (!heap_.resize(win_len)) {
return;
}
if (!elem_index_heap_index_.resize(win_len)) {
return;
}
if (!heap_index_elem_index_.resize(win_len)) {
return;
}
for(size_t i = 0 ; i < win_len; i++){
elem_index_heap_index_[i] = i;
heap_index_elem_index_[i] = i;
}
}
bool is_valid(){
return (win_len_ == 5);
}

void swap(size_t index_1, size_t index_2){
T temp = heap_[index_1];
size_t temp_index = heap_index_elem_index_[index_1];
heap_[index_1] = heap_[index_2];
heap_index_elem_index_[index_1] = heap_index_elem_index_[index_2];
heap_[index_2] = temp;
heap_index_elem_index_[index_2] = temp_index;

elem_index_heap_index_[heap_index_elem_index_[index_1]] = index_2;
elem_index_heap_index_[heap_index_elem_index_[index_2]] = index_1;
}

void heapify_up(size_t heap_index){
//sift up
if (heap_index == 0){
return;
}
size_t parent = (heap_index - 1)/2;
if(heap_[parent] > heap_[heap_index]){
swap(heap_index, parent);
heapify_up(parent);
}
}

void heapify_down(size_t heap_index, size_t heap_size){
size_t largest = heap_index;

size_t left = 2*heap_index + 1;
if(left < heap_size && heap_[left] < heap_[largest])
largest = left;
size_t right = 2*heap_index + 2;
if(right < heap_size && heap_[right] < heap_[largest])
largest = right;

if(largest != heap_index){
swap(heap_index,largest);
heapify_down(largest,heap_size);
}
}

void heapify(size_t heap_index, size_t heap_size){
size_t parent = (heap_index-1)/2;
if(heap_index!=0 && heap_[parent] > heap_[heap_index]){
heapify_up(heap_index);
return;
}
else{
heapify_down(heap_index,heap_size);
return;
}
}

void add(const T& x){
if ((elem_index_ + 1) == win_len_)
win_filled_ = true;
size_t heap_size = elem_index_ + 1;
if(win_filled_){
heap_size = win_len_;
}
size_t heap_index = elem_index_heap_index_[elem_index_];
heap_[heap_index] = x;
heapify(heap_index,heap_size);
elem_index_ = (elem_index_ + 1) % win_len_;

}

T sliding_minimum(){
return heap_[0];
}
};


} //namespace core
} //namespace roc

#endif
40 changes: 40 additions & 0 deletions src/tests/roc_core/test_mov_quantile.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright (c) 2023 Roc Streaming authors
*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
*/

#include <CppUTest/TestHarness.h>

#include "roc_core/heap_arena.h"
#include "roc_core/mov_quantile.h"

namespace roc{
namespace core{

TEST_GROUP(movquantile){
HeapArena arena;
};

TEST(movquantile, testing_code){
const size_t n = 7;
MovQuantile<int64_t> quant(arena,n,2);
quant.add(5);
quant.add(4);
quant.add(2);
quant.add(5);
quant.add(6);

int64_t x = quant.sliding_minimum();
LONGS_EQUAL((int64_t)2,x);

quant.add(10);
quant.add(12);
quant.add(4);
int64_t x1 = quant.sliding_minimum();
LONGS_EQUAL((int64_t)4,x1);
}
}
}

0 comments on commit 0edbfdb

Please sign in to comment.