From 0edbfdb692cc0b5a54729679c7c36910f2874304 Mon Sep 17 00:00:00 2001 From: Souptik Barman Date: Mon, 15 Jul 2024 20:07:40 +0530 Subject: [PATCH] gh-751: feature checkpoint, rolling minimum --- src/internal_modules/roc_core/mov_quantile.h | 139 +++++++++++++++++++ src/tests/roc_core/test_mov_quantile.cpp | 40 ++++++ 2 files changed, 179 insertions(+) create mode 100644 src/internal_modules/roc_core/mov_quantile.h create mode 100644 src/tests/roc_core/test_mov_quantile.cpp diff --git a/src/internal_modules/roc_core/mov_quantile.h b/src/internal_modules/roc_core/mov_quantile.h new file mode 100644 index 000000000..58cc0d53d --- /dev/null +++ b/src/internal_modules/roc_core/mov_quantile.h @@ -0,0 +1,139 @@ +/* + * Copyright (c) 2023 Roc Streaming authors + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. + */ + +//! @file roc_core/mov_quantile.h +//! @brief Rolling window moving quantile. + +#ifndef ROC_CORE_MOV_QUANTILE_H_ +#define ROC_CORE_MOV_QUANTILE_H_ + +#include "roc_core/array.h" +#include "roc_core/iarena.h" +#include "roc_core/panic.h" + +namespace roc { +namespace core { + +template class MovQuantile { + public: + const size_t win_len_; + const size_t k_; + size_t elem_index_; + bool win_filled_; + Array heap_; + Array elem_index_heap_index_; + Array heap_index_elem_index_; + + MovQuantile(IArena& arena, const size_t win_len, const size_t k) + : win_len_(win_len) + , k_(k) + , elem_index_(0) + , win_filled_ (false) + , heap_ (arena) + , elem_index_heap_index_(arena) + , heap_index_elem_index_(arena) { + if (win_len == 0) { + roc_panic("mov quantile: window length must be greater than 0"); + } + if (k_ > win_len_){ + roc_panic("smallest element index should be less than window size"); + } + if (!heap_.resize(win_len)) { + return; + } + if (!elem_index_heap_index_.resize(win_len)) { + return; + } + if (!heap_index_elem_index_.resize(win_len)) { + return; + } + for(size_t i = 0 ; i < win_len; i++){ + elem_index_heap_index_[i] = i; + heap_index_elem_index_[i] = i; + } + } + bool is_valid(){ + return (win_len_ == 5); + } + + void swap(size_t index_1, size_t index_2){ + T temp = heap_[index_1]; + size_t temp_index = heap_index_elem_index_[index_1]; + heap_[index_1] = heap_[index_2]; + heap_index_elem_index_[index_1] = heap_index_elem_index_[index_2]; + heap_[index_2] = temp; + heap_index_elem_index_[index_2] = temp_index; + + elem_index_heap_index_[heap_index_elem_index_[index_1]] = index_2; + elem_index_heap_index_[heap_index_elem_index_[index_2]] = index_1; + } + + void heapify_up(size_t heap_index){ + //sift up + if (heap_index == 0){ + return; + } + size_t parent = (heap_index - 1)/2; + if(heap_[parent] > heap_[heap_index]){ + swap(heap_index, parent); + heapify_up(parent); + } + } + + void heapify_down(size_t heap_index, size_t heap_size){ + size_t largest = heap_index; + + size_t left = 2*heap_index + 1; + if(left < heap_size && heap_[left] < heap_[largest]) + largest = left; + size_t right = 2*heap_index + 2; + if(right < heap_size && heap_[right] < heap_[largest]) + largest = right; + + if(largest != heap_index){ + swap(heap_index,largest); + heapify_down(largest,heap_size); + } + } + + void heapify(size_t heap_index, size_t heap_size){ + size_t parent = (heap_index-1)/2; + if(heap_index!=0 && heap_[parent] > heap_[heap_index]){ + heapify_up(heap_index); + return; + } + else{ + heapify_down(heap_index,heap_size); + return; + } + } + + void add(const T& x){ + if ((elem_index_ + 1) == win_len_) + win_filled_ = true; + size_t heap_size = elem_index_ + 1; + if(win_filled_){ + heap_size = win_len_; + } + size_t heap_index = elem_index_heap_index_[elem_index_]; + heap_[heap_index] = x; + heapify(heap_index,heap_size); + elem_index_ = (elem_index_ + 1) % win_len_; + + } + + T sliding_minimum(){ + return heap_[0]; + } +}; + + +} //namespace core +} //namespace roc + +#endif \ No newline at end of file diff --git a/src/tests/roc_core/test_mov_quantile.cpp b/src/tests/roc_core/test_mov_quantile.cpp new file mode 100644 index 000000000..c9471ebf6 --- /dev/null +++ b/src/tests/roc_core/test_mov_quantile.cpp @@ -0,0 +1,40 @@ +/* + * Copyright (c) 2023 Roc Streaming authors + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. + */ + +#include + +#include "roc_core/heap_arena.h" +#include "roc_core/mov_quantile.h" + +namespace roc{ +namespace core{ + +TEST_GROUP(movquantile){ + HeapArena arena; +}; + +TEST(movquantile, testing_code){ + const size_t n = 7; + MovQuantile quant(arena,n,2); + quant.add(5); + quant.add(4); + quant.add(2); + quant.add(5); + quant.add(6); + + int64_t x = quant.sliding_minimum(); + LONGS_EQUAL((int64_t)2,x); + + quant.add(10); + quant.add(12); + quant.add(4); + int64_t x1 = quant.sliding_minimum(); + LONGS_EQUAL((int64_t)4,x1); +} +} +}