diff --git a/libconfluo/confluo/container/sketch/confluo_universal_sketch.h b/libconfluo/confluo/container/sketch/confluo_universal_sketch.h index ab16a4130..c623ef764 100644 --- a/libconfluo/confluo/container/sketch/confluo_universal_sketch.h +++ b/libconfluo/confluo/container/sketch/confluo_universal_sketch.h @@ -6,182 +6,11 @@ #include "atomic.h" #include "count_sketch.h" #include "hash_manager.h" -#include "priority_queue.h" +#include "substream_summary.h" namespace confluo { namespace sketch { -template -class confluo_substream_summary { - -public: - typedef atomic::type atomic_counter_t; - typedef std::vector> atomic_vector_t; - typedef count_sketch sketch_t; - typedef heavy_hitter_set heavy_hitter_set_t; - - confluo_substream_summary() = default; - - /** - * Constructor - * @param t depth (number of estimates) - * @param b width (number of buckets) - * @param k number of heavy hitters to track - * @param a heavy hitter threshold - * @param precise track exact heavy hitters - */ - confluo_substream_summary(size_t t, size_t b, size_t k, double a, bool precise = true) - : hh_threshold_(a), - num_hh_(k), - l2_squared_(), - sketch_(t, b), - heavy_hitters_(k), - hhs_precise_(), - hh_hash_(pairwise_indep_hash::generate_random()), - use_precise_hh_(precise) { - } - - confluo_substream_summary(const confluo_substream_summary& other) - : hh_threshold_(other.hh_threshold_), - num_hh_(other.num_hh_), - l2_squared_(atomic::load(&other.l2_squared_)), - sketch_(other.sketch_), - heavy_hitters_(other.heavy_hitters_.size()), - hhs_precise_(other.hhs_precise_), - hh_hash_(other.hh_hash_), - use_precise_hh_(other.use_precise_hh_) { - for (size_t i = 0; i < other.heavy_hitters_.size(); i++) { - atomic::store(&heavy_hitters_[i], atomic::load(&other.heavy_hitters_[i])); - } - } - - confluo_substream_summary& operator=(const confluo_substream_summary& other) { - hh_threshold_ = other.hh_threshold_; - num_hh_ = other.num_hh_; - l2_squared_ = atomic::load(&other.l2_squared_); - sketch_ = other.sketch_; - heavy_hitters_ = atomic_vector_t(other.heavy_hitters_.size()); - hhs_precise_ = other.hhs_precise_; - hh_hash_ = other.hh_hash_; - use_precise_hh_ = other.use_precise_hh_; - for (size_t i = 0; i < other.heavy_hitters_.size(); i++) { - atomic::store(&heavy_hitters_[i], atomic::load(&other.heavy_hitters_[i])); - } - return *this; - } - - void update(size_t key_hash) { - counter_t old_count = sketch_.update_and_estimate(key_hash); - counter_t update = l2_squared_update(old_count); - counter_t old_l2_sq = atomic::faa(&l2_squared_, update); - double new_l2 = std::sqrt(old_l2_sq + update); - if (use_precise_hh_) { - this->update_hh_pq(key_hash, old_count + 1, new_l2); - } else { - this->update_hh_approx(key_hash, old_count + 1, new_l2); - } - } - - /** - * Estimate count - * @param key key - * @return estimated count - */ - counter_t estimate(const byte_string& key) { - return sketch_.estimate(key); - } - - /** - * @return sketch - */ - sketch_t& get_sketch() { - return sketch_; - } - - atomic_vector_t& get_heavy_hitters() { - return heavy_hitters_; - } - - heavy_hitter_set_t& get_pq() { - return hhs_precise_; - } - - /** - * @return size of data structure in bytes - */ - size_t storage_size() { - size_t total_size = 0; - total_size += sketch_.storage_size(); - total_size += heavy_hitters_.size(); - return total_size; - } - -private: - /** - * Update heavy hitters priority queue - * @param key_hash key hash - * @param count frequency count - * @param l2 current l2 norm - */ - void update_hh_pq(size_t key_hash, counter_t count, double l2) { - if (count < hh_threshold_ * l2) { - return; - } - if (hhs_precise_.size() < num_hh_) { - hhs_precise_.remove_if_exists(key_hash); - hhs_precise_.pushp(key_hash, count); - } else { - auto head = hhs_precise_.top().key_; - if (sketch_.estimate(head) < count) { - hhs_precise_.pop(); - hhs_precise_.remove_if_exists(key_hash); - hhs_precise_.pushp(key_hash, count); - } - } - } - - /** - * Update heavy hitters approximate DS - * @param key_hash key hash - * @param count frequency count - * @param l2 current l2 norm - */ - void update_hh_approx(size_t key_hash, counter_t count, double l2) { - if (count < hh_threshold_ * l2) { - return; - } - bool done = false; - while (!done) { - size_t idx = hh_hash_.apply(key_hash) % heavy_hitters_.size(); - size_t prev_key_hash = atomic::load(&heavy_hitters_[idx]); - if (prev_key_hash == key_hash) - return; - counter_t prev_count = sketch_.estimate(prev_key_hash); - done = (prev_count > count) ? true : atomic::strong::cas(&heavy_hitters_[idx], &prev_key_hash, key_hash); - } - } - - /** - * L_2^2 += (c_i + 1)^2 - (c_i)^2 - * @param old_count estimate of a count before an update - */ - static inline counter_t l2_squared_update(counter_t old_count) { - return 2 * old_count + 1; - } - - double hh_threshold_; // heavy hitter threshold - size_t num_hh_; // number of heavy hitters to track (k) - - atomic_counter_t l2_squared_; // L2 norm squared - sketch_t sketch_; - atomic_vector_t heavy_hitters_; - heavy_hitter_set_t hhs_precise_; - pairwise_indep_hash hh_hash_; - - bool use_precise_hh_; - -}; - template class confluo_universal_sketch { @@ -209,7 +38,7 @@ class confluo_universal_sketch { is_valid_(true) { layer_hashes_.guarantee_initialized(l - 1); for (size_t i = 0; i < l; i++) { - substream_summaries_[i] = confluo_substream_summary(t, b, k, a, precise); + substream_summaries_[i] = substream_summary(t, b, k, a, precise); } } @@ -363,11 +192,11 @@ class confluo_universal_sketch { return hashed_value % 2; } - std::vector> substream_summaries_; + std::vector> substream_summaries_; hash_manager layer_hashes_; - schema_t schema_; - column_t column_; + schema_t schema_{}; + column_t column_{}; bool precise_hh_; atomic::type is_valid_; diff --git a/libconfluo/confluo/container/sketch/substream_summary.h b/libconfluo/confluo/container/sketch/substream_summary.h new file mode 100644 index 000000000..969a17a4e --- /dev/null +++ b/libconfluo/confluo/container/sketch/substream_summary.h @@ -0,0 +1,188 @@ +#ifndef CONFLUO_CONTAINER_SKETCH_SUBSTREAM_SUMMARY_H +#define CONFLUO_CONTAINER_SKETCH_SUBSTREAM_SUMMARY_H + +#include + +#include "atomic.h" +#include "count_sketch.h" +#include "hash_manager.h" +#include "priority_queue.h" + +namespace confluo { +namespace sketch { + +template +class substream_summary { + +public: + typedef atomic::type atomic_counter_t; + typedef std::vector> atomic_vector_t; + typedef count_sketch sketch_t; + + substream_summary() = default; + + /** + * Constructor + * @param t depth (number of estimates) + * @param b width (number of buckets) + * @param k number of heavy hitters to track + * @param a heavy hitter threshold + * @param precise track exact heavy hitters + */ + substream_summary(size_t t, size_t b, size_t k, double a, bool precise = true) + : hh_threshold_(a), + num_hh_(k), + l2_squared_(), + sketch_(t, b), + heavy_hitters_(k), + hhs_precise_(), + hh_hash_(pairwise_indep_hash::generate_random()), + use_precise_hh_(precise) { + } + + substream_summary(const substream_summary& other) + : hh_threshold_(other.hh_threshold_), + num_hh_(other.num_hh_), + l2_squared_(atomic::load(&other.l2_squared_)), + sketch_(other.sketch_), + heavy_hitters_(other.heavy_hitters_.size()), + hhs_precise_(other.hhs_precise_), + hh_hash_(other.hh_hash_), + use_precise_hh_(other.use_precise_hh_) { + for (size_t i = 0; i < other.heavy_hitters_.size(); i++) { + atomic::store(&heavy_hitters_[i], atomic::load(&other.heavy_hitters_[i])); + } + } + + substream_summary& operator=(const substream_summary& other) { + hh_threshold_ = other.hh_threshold_; + num_hh_ = other.num_hh_; + l2_squared_ = atomic::load(&other.l2_squared_); + sketch_ = other.sketch_; + heavy_hitters_ = atomic_vector_t(other.heavy_hitters_.size()); + hhs_precise_ = other.hhs_precise_; + hh_hash_ = other.hh_hash_; + use_precise_hh_ = other.use_precise_hh_; + for (size_t i = 0; i < other.heavy_hitters_.size(); i++) { + atomic::store(&heavy_hitters_[i], atomic::load(&other.heavy_hitters_[i])); + } + return *this; + } + + void update(T key) { + counter_t old_count = sketch_.update_and_estimate(key); + counter_t update = l2_squared_update(old_count); + counter_t old_l2_sq = atomic::faa(&l2_squared_, update); + double new_l2 = std::sqrt(old_l2_sq + update); + + if (use_precise_hh_) { + this->update_hh_pq(key, old_count + 1, new_l2); + } else { + this->update_hh_approx(key, old_count + 1, new_l2); + } + } + + /** + * Estimate count + * @param key key + * @return estimated count + */ + counter_t estimate(T key) { + return sketch_.estimate(key); + } + + /** + * @return sketch + */ + sketch_t& get_sketch() { + return sketch_; + } + + atomic_vector_t& get_heavy_hitters() { + return heavy_hitters_; + } + + heavy_hitter_set& get_pq() { + return hhs_precise_; + } + + /** + * @return size of data structure in bytes + */ + size_t storage_size() { + size_t total_size = 0; + total_size += sketch_.storage_size(); + total_size += heavy_hitters_.size(); + return total_size; + } + +private: + /** + * Update heavy hitters priority queue + * @param key key + * @param count frequency count + * @param l2 current l2 norm + */ + void update_hh_pq(T key, counter_t count, double l2) { + if (count < hh_threshold_ * l2) { + return; + } + if (hhs_precise_.size() < num_hh_) { + hhs_precise_.remove_if_exists(key); + hhs_precise_.pushp(key, count); + } else { + T head = hhs_precise_.top().key_; + if (sketch_.estimate(head) < count) { + hhs_precise_.pop(); + hhs_precise_.remove_if_exists(key); + hhs_precise_.pushp(key, count); + } + } + } + + /** + * Update heavy hitters approximate DS + * @param key key + * @param count frequency count + * @param l2 current l2 norm + */ + void update_hh_approx(T key, counter_t count, double l2) { + if (count < hh_threshold_ * l2) { + return; + } + bool done = false; + while (!done) { + size_t idx = hh_hash_.apply(key) % heavy_hitters_.size(); + T prev = atomic::load(&heavy_hitters_[idx]); + if (prev == key) + return; + counter_t prev_count = sketch_.estimate(prev); + done = (prev_count > count) ? true : atomic::strong::cas(&heavy_hitters_[idx], &prev, key); + } + } + + /** + * L_2^2 += (c_i + 1)^2 - (c_i)^2 + * @param old_count estimate of a count before an update + */ + static inline counter_t l2_squared_update(counter_t old_count) { + return 2 * old_count + 1; + } + + double hh_threshold_; // heavy hitter threshold + size_t num_hh_; // number of heavy hitters to track (k) + + atomic_counter_t l2_squared_; // L2 norm squared + sketch_t sketch_; + atomic_vector_t heavy_hitters_; + heavy_hitter_set hhs_precise_; + pairwise_indep_hash hh_hash_; + + bool use_precise_hh_; + +}; + +} +} + +#endif // CONFLUO_CONTAINER_SKETCH_SUBSTREAM_SUMMARY_H diff --git a/libconfluo/confluo/container/sketch/universal_sketch.h b/libconfluo/confluo/container/sketch/universal_sketch.h index 2992a77f1..1e071221d 100644 --- a/libconfluo/confluo/container/sketch/universal_sketch.h +++ b/libconfluo/confluo/container/sketch/universal_sketch.h @@ -6,182 +6,11 @@ #include "atomic.h" #include "count_sketch.h" #include "hash_manager.h" -#include "priority_queue.h" +#include "substream_summary.h" namespace confluo { namespace sketch { -template -class substream_summary { - - public: - typedef atomic::type atomic_counter_t; - typedef std::vector> atomic_vector_t; - typedef count_sketch sketch_t; - - substream_summary() = default; - - /** - * Constructor - * @param t depth (number of estimates) - * @param b width (number of buckets) - * @param k number of heavy hitters to track - * @param a heavy hitter threshold - * @param precise track exact heavy hitters - */ - substream_summary(size_t t, size_t b, size_t k, double a, bool precise = true) - : hh_threshold_(a), - num_hh_(k), - l2_squared_(), - sketch_(t, b), - heavy_hitters_(k), - hhs_precise_(), - hh_hash_(pairwise_indep_hash::generate_random()), - use_precise_hh_(precise) { - } - - substream_summary(const substream_summary& other) - : hh_threshold_(other.hh_threshold_), - num_hh_(other.num_hh_), - l2_squared_(atomic::load(&other.l2_squared_)), - sketch_(other.sketch_), - heavy_hitters_(other.heavy_hitters_.size()), - hhs_precise_(other.hhs_precise_), - hh_hash_(other.hh_hash_), - use_precise_hh_(other.use_precise_hh_) { - for (size_t i = 0; i < other.heavy_hitters_.size(); i++) { - atomic::store(&heavy_hitters_[i], atomic::load(&other.heavy_hitters_[i])); - } - } - - substream_summary& operator=(const substream_summary& other) { - hh_threshold_ = other.hh_threshold_; - num_hh_ = other.num_hh_; - l2_squared_ = atomic::load(&other.l2_squared_); - sketch_ = other.sketch_; - heavy_hitters_ = atomic_vector_t(other.heavy_hitters_.size()); - hhs_precise_ = other.hhs_precise_; - hh_hash_ = other.hh_hash_; - use_precise_hh_ = other.use_precise_hh_; - for (size_t i = 0; i < other.heavy_hitters_.size(); i++) { - atomic::store(&heavy_hitters_[i], atomic::load(&other.heavy_hitters_[i])); - } - return *this; - } - - void update(T key) { - counter_t old_count = sketch_.update_and_estimate(key); - counter_t update = l2_squared_update(old_count); - counter_t old_l2_sq = atomic::faa(&l2_squared_, update); - double new_l2 = std::sqrt(old_l2_sq + update); - - if (use_precise_hh_) { - this->update_hh_pq(key, old_count + 1, new_l2); - } else { - this->update_hh_approx(key, old_count + 1, new_l2); - } - } - - /** - * Estimate count - * @param key key - * @return estimated count - */ - counter_t estimate(T key) { - return sketch_.estimate(key); - } - - /** - * @return sketch - */ - sketch_t& get_sketch() { - return sketch_; - } - - atomic_vector_t& get_heavy_hitters() { - return heavy_hitters_; - } - - heavy_hitter_set& get_pq() { - return hhs_precise_; - } - - /** - * @return size of data structure in bytes - */ - size_t storage_size() { - size_t total_size = 0; - total_size += sketch_.storage_size(); - total_size += heavy_hitters_.size(); - return total_size; - } - - private: - /** - * Update heavy hitters priority queue - * @param key key - * @param count frequency count - * @param l2 current l2 norm - */ - void update_hh_pq(T key, counter_t count, double l2) { - if (count < hh_threshold_ * l2) { - return; - } - if (hhs_precise_.size() < num_hh_) { - hhs_precise_.remove_if_exists(key); - hhs_precise_.pushp(key, count); - } else { - T head = hhs_precise_.top().key_; - if (sketch_.estimate(head) < count) { - hhs_precise_.pop(); - hhs_precise_.remove_if_exists(key); - hhs_precise_.pushp(key, count); - } - } - } - - /** - * Update heavy hitters approximate DS - * @param key key - * @param count frequency count - * @param l2 current l2 norm - */ - void update_hh_approx(T key, counter_t count, double l2) { - if (count < hh_threshold_ * l2) { - return; - } - bool done = false; - while (!done) { - size_t idx = hh_hash_.apply(key) % heavy_hitters_.size(); - T prev = atomic::load(&heavy_hitters_[idx]); - if (prev == key) - return; - counter_t prev_count = sketch_.estimate(prev); - done = (prev_count > count) ? true : atomic::strong::cas(&heavy_hitters_[idx], &prev, key); - } - } - - /** - * L_2^2 += (c_i + 1)^2 - (c_i)^2 - * @param old_count estimate of a count before an update - */ - static inline counter_t l2_squared_update(counter_t old_count) { - return 2 * old_count + 1; - } - - double hh_threshold_; // heavy hitter threshold - size_t num_hh_; // number of heavy hitters to track (k) - - atomic_counter_t l2_squared_; // L2 norm squared - sketch_t sketch_; - atomic_vector_t heavy_hitters_; - heavy_hitter_set hhs_precise_; - pairwise_indep_hash hh_hash_; - - bool use_precise_hh_; - -}; - template class universal_sketch { diff --git a/libconfluo/src/container/sketch/hash_manager.cc b/libconfluo/src/container/sketch/hash_manager.cc index 059bbb4b9..6029ecd35 100644 --- a/libconfluo/src/container/sketch/hash_manager.cc +++ b/libconfluo/src/container/sketch/hash_manager.cc @@ -14,7 +14,6 @@ pairwise_indep_hash::pairwise_indep_hash(size_t a, size_t b) b_(b) { } - pairwise_indep_hash pairwise_indep_hash::generate_random() { return { utils::rand_utils::rand_uint64(PRIME), utils::rand_utils::rand_uint64(PRIME) }; }