From d56e5cab269e49fa3fe894f2f25a1c270634b1b4 Mon Sep 17 00:00:00 2001 From: Ujval Misra Date: Tue, 17 Jul 2018 10:06:13 -0700 Subject: [PATCH] Universal Sketch working. --- .../confluo/container/sketch/count_sketch.h | 1 + .../confluo/container/sketch/sketch_utils.h | 6 +- ...universal_monitor.h => universal_sketch.h} | 167 ++++++++++-------- .../container/sketch/count_min_sketch_test.h | 129 -------------- .../test/container/sketch/count_sketch_test.h | 105 +++++++++++ 5 files changed, 207 insertions(+), 201 deletions(-) rename libconfluo/confluo/container/sketch/{universal_monitor.h => universal_sketch.h} (53%) delete mode 100644 libconfluo/test/container/sketch/count_min_sketch_test.h create mode 100644 libconfluo/test/container/sketch/count_sketch_test.h diff --git a/libconfluo/confluo/container/sketch/count_sketch.h b/libconfluo/confluo/container/sketch/count_sketch.h index ae8259889..5acad2311 100644 --- a/libconfluo/confluo/container/sketch/count_sketch.h +++ b/libconfluo/confluo/container/sketch/count_sketch.h @@ -23,6 +23,7 @@ class count_sketch { typedef atomic::type atomic_counter_t; // TODO defaults + count_sketch() = default; /** * Constructor. diff --git a/libconfluo/confluo/container/sketch/sketch_utils.h b/libconfluo/confluo/container/sketch/sketch_utils.h index bad824a03..14e063ba0 100644 --- a/libconfluo/confluo/container/sketch/sketch_utils.h +++ b/libconfluo/confluo/container/sketch/sketch_utils.h @@ -12,9 +12,9 @@ namespace sketch { #define ELEM_SWAP(a,b) { T t=(a); (a)=(b); (b)=t; } template -static T median(std::vector data) { - int k = (data.size() & 1) ? (data.size() / 2) : (data.size() / 2) - 1; - int i, j, l, m; +static T median(std::vector& data) { + size_t k = (data.size() & 1) ? (data.size() / 2) : (data.size() / 2) - 1; + size_t i, j, l, m; T x; l = 0; m = data.size() - 1; diff --git a/libconfluo/confluo/container/sketch/universal_monitor.h b/libconfluo/confluo/container/sketch/universal_sketch.h similarity index 53% rename from libconfluo/confluo/container/sketch/universal_monitor.h rename to libconfluo/confluo/container/sketch/universal_sketch.h index 08e1effaa..a0e5a68a3 100644 --- a/libconfluo/confluo/container/sketch/universal_monitor.h +++ b/libconfluo/confluo/container/sketch/universal_sketch.h @@ -3,9 +3,9 @@ #include +#include "atomic.h" #include "count_sketch.h" #include "hash_manager.h" -#include "container/monolog/monolog_exp2_linear.h" namespace confluo { namespace sketch { @@ -20,20 +20,26 @@ class substream_summary { substream_summary() = default; - substream_summary(size_t num_estimates, size_t num_buckets, size_t num_heavy_hitters, - double hh_threshold, atomic_counter_t* l2_squared) - : hh_threshold_(hh_threshold), - num_hh_(num_heavy_hitters), - l2_squared_(l2_squared), - sketch_(num_estimates, num_buckets), - heavy_hitters_(num_heavy_hitters), + /** + * Constructor + * @param t depth (number of estimates) + * @param b width (number of buckets) + * @param k number of heavy hitters to track + * @param a heavy hitter threshold + */ + substream_summary(size_t t, size_t b, size_t k, double a) + : hh_threshold_(a), + num_hh_(k), + l2_squared_(), + sketch_(t, b), + heavy_hitters_(k), hh_hash_(pairwise_indep_hash::generate_random()) { } substream_summary(const substream_summary& other) : hh_threshold_(other.hh_threshold_), num_hh_(other.num_hh_), - l2_squared_(other.l2_squared_), + l2_squared_(atomic::load(&other.l2_squared_)), sketch_(other.sketch_), heavy_hitters_(other.heavy_hitters_.size()), hh_hash_(other.hh_hash_) { @@ -45,7 +51,7 @@ class substream_summary { substream_summary& operator=(const substream_summary& other) { hh_threshold_ = other.hh_threshold_; num_hh_ = other.num_hh_; - l2_squared_ = other.l2_squared_; // atomic load?? + l2_squared_ = atomic::load(&other.l2_squared_); sketch_ = other.sketch_; heavy_hitters_ = heavy_hitters_set(other.heavy_hitters_.size()); hh_hash_ = other.hh_hash_; @@ -58,7 +64,7 @@ class substream_summary { void update(T key) { counter_t old_count = sketch_.update_and_estimate(key); counter_t update = l2_squared_update(old_count); - counter_t old_l2_sq = atomic::faa(l2_squared_, update); + counter_t old_l2_sq = atomic::faa(&l2_squared_, update); double new_l2 = std::sqrt(old_l2_sq + update); this->update_heavy_hitters(key, old_count + 1, new_l2); } @@ -71,23 +77,30 @@ class substream_summary { return sketch_; } + /** + * @return size of data structure in bytes + */ + size_t storage_size() { + size_t total_size = 0; + total_size += sketch_.storage_size(); + total_size += heavy_hitters_.size(); + return total_size; + } + private: + void update_heavy_hitters(T key, counter_t count, double l2) { if (count < hh_threshold_ * l2) { return; } - bool updated = false; - while (!updated) { + bool done = false; + while (!done) { size_t idx = hh_hash_.apply(key) % heavy_hitters_.size(); T prev = atomic::load(&heavy_hitters_[idx]); if (prev == key) return; counter_t prev_count = sketch_.estimate(prev); - if (prev_count <= count) { - updated = atomic::strong::cas(&heavy_hitters_[idx], &prev, key); - } - else - updated = true; + done = (prev_count > count) ? true : atomic::strong::cas(&heavy_hitters_[idx], &prev, key); } } @@ -102,7 +115,7 @@ class substream_summary { double hh_threshold_; // heavy hitter threshold size_t num_hh_; // number of heavy hitters to track - atomic_counter_t* l2_squared_; // L2 norm squared + atomic_counter_t l2_squared_; // L2 norm squared sketch sketch_; heavy_hitters_set heavy_hitters_; pairwise_indep_hash hh_hash_; @@ -110,43 +123,50 @@ class substream_summary { }; template -class universal_monitor { +class universal_sketch { public: - typedef atomic::type atomic_counter_t; typedef std::vector> heavy_hitters_set; template using g_fn = std::function; - - universal_monitor(size_t num_estimates, size_t num_buckets, double hh_threshold, size_t num_heavy_hitters, - hash_manager& monitor_hash_manager) - : universal_monitor(8 * sizeof(T), num_estimates, num_buckets, hh_threshold, num_heavy_hitters, - monitor_hash_manager) { + /** + * Constructor + * @param t count-sketch depth (number of estimates) + * @param b count-sketch width (number of buckets) + * @param k number of heavy hitters to track per layer + * @param a heavy hitter threshold + * @param layer_hashes hash manager for layers + */ + universal_sketch(size_t t, size_t b, size_t k, double a, hash_manager& layer_hashes) + : universal_sketch(8 * sizeof(T), t, b, a, k, layer_hashes) { } - universal_monitor(size_t num_substreams, size_t num_estimates, size_t num_buckets, - double hh_threshold, size_t num_heavy_hitters, - hash_manager& monitor_hash_manager) - : l2_squared_(0), - substream_summaries_(num_substreams), - hash_manager_(monitor_hash_manager) { - hash_manager_.guarantee_initialized(num_substreams); - for (size_t i = 0; i < num_substreams - 1; i++) { - substream_summaries_[i] = substream_summary(num_estimates, num_buckets, num_heavy_hitters, - hh_threshold, &l2_squared_); + /** + * Constructor + * @param l number of layers + * @param t count-sketch depth (number of estimates) + * @param b count-sketch width (number of buckets) + * @param k number of heavy hitters to track per layer + * @param a heavy hitter threshold + * @param layer_hashes hash manager for layers + */ + universal_sketch(size_t l, size_t t, size_t b, size_t k, double a, const hash_manager& layer_hashes) + : substream_summaries_(l), + layer_hashes_(layer_hashes) { + layer_hashes_.guarantee_initialized(l - 1); + for (size_t i = 0; i < l; i++) { + substream_summaries_[i] = substream_summary(t, b, k, a); } } - universal_monitor(const universal_monitor& other) - : l2_squared_(atomic::load(&other.l2_squared_)), - substream_summaries_(other.substream_summaries_), - hash_manager_(other.hash_manager_) { + universal_sketch(const universal_sketch& other) + : substream_summaries_(other.substream_summaries_), + layer_hashes_(other.layer_hashes_) { } - universal_monitor& operator=(const universal_monitor& other) { - l2_squared_ = atomic::load(&other.l2_squared_); + universal_sketch& operator=(const universal_sketch& other) { substream_summaries_ = other.substream_summaries_; - hash_manager_ = other.hash_manager_; + layer_hashes_ = other.layer_hashes_; return *this; } @@ -156,65 +176,75 @@ class universal_monitor { */ void update(T key) { substream_summaries_[0].update(key); - for (size_t i = 1; i < substream_summaries_.size() && to_bool(hash_manager_.hash(i - 1, key)); i++) { + for (size_t i = 1; i < substream_summaries_.size() && to_bool(layer_hashes_.hash(i - 1, key)); i++) { substream_summaries_[i].update(key); } } - heavy_hitters_set& get_heavy_hitters() { - return substream_summaries_[0].get_heavy_hitters(); + counter_t estimate_count(T key) { + counter_t est = substream_summaries_[0].estimate(key); + // Refine count using lower layers. + for (size_t i = 1; i < substream_summaries_.size() && to_bool(layer_hashes_.hash(i - 1, key)); i++) { + est = substream_summaries_[i].estimate(key); + } + return est; } template - g_ret_t process_heavy_hitters(g_fn g) { + g_ret_t evaluate(g_fn g) { g_ret_t recursive_sum = 0; // Handle last substream - int substream_i = substream_summaries_.size() - 1; - auto substream_hhs = substream_summaries_[substream_i].get_heavy_hitters(); - auto substream_sketch = substream_summaries_[substream_i].get_sketch(); - for (size_t hh_i = 0; hh_i < substream_hhs.size(); hh_i++) { - T hh = atomic::load(&substream_hhs[hh_i]); + size_t substream_i = substream_summaries_.size() - 1; + + auto& last_substream_hhs = substream_summaries_[substream_i].get_heavy_hitters(); + auto& last_substream_sketch = substream_summaries_[substream_i].get_sketch(); + for (size_t hh_i = 0; hh_i < last_substream_hhs.size(); hh_i++) { + T hh = atomic::load(&last_substream_hhs[hh_i]); + // TODO handle special case if (hh != T()) { - counter_t count = substream_sketch.estimate(hh); + counter_t count = last_substream_sketch.estimate(hh); recursive_sum += g(count); } } - substream_i--; - while (substream_i >= 0) { + while (substream_i-- > 0) { g_ret_t substream_sum = 0; - substream_hhs = substream_summaries_[substream_i].get_heavy_hitters(); - substream_sketch = substream_summaries_[substream_i].get_sketch(); + auto& substream_hhs = substream_summaries_[substream_i].get_heavy_hitters(); + auto& substream_sketch = substream_summaries_[substream_i].get_sketch(); for (size_t hh_i = 0; hh_i < substream_hhs.size(); hh_i++) { T hh = atomic::load(&substream_hhs[hh_i]); + // TODO handle special case if (hh != T()) { counter_t count = substream_sketch.estimate(hh); - g_ret_t update = ((1 - 2 * hash_manager_.hash(substream_i, hh)) * g(count)); + g_ret_t update = ((1 - 2 * (layer_hashes_.hash(substream_i, hh) % 2)) * g(count)); substream_sum += update; } } recursive_sum = 2 * recursive_sum + substream_sum; } - return 0; + return recursive_sum; } /** - * Get the number of substreams. - * @return number of substreams + * @return size of data structure in bytes */ - size_t num_substreams() { - return substream_summaries_.size(); + size_t storage_size() { + size_t total_size = 0; + for (size_t i = 0; i < substream_summaries_.size(); i++) { + total_size += substream_summaries_[i].storage_size(); + } + return total_size; } - static universal_monitor create_parameterized(double gamma, double epsilon, + static universal_sketch create_parameterized(double gamma, double epsilon, double hh_threshold, size_t num_heavy_hitters, - hash_manager& monitor_hash_manager) { - return universal_monitor(count_sketch::perror_to_num_estimates(gamma), + hash_manager& layer_hashes) { + return universal_sketch(count_sketch::perror_to_num_estimates(gamma), count_sketch::error_margin_to_num_buckets(epsilon), - hh_threshold, num_heavy_hitters, monitor_hash_manager); + hh_threshold, num_heavy_hitters, layer_hashes); } private: @@ -222,9 +252,8 @@ class universal_monitor { return bool(hashed_value % 2); } - atomic_counter_t l2_squared_; // L2 norm squared std::vector> substream_summaries_; - hash_manager hash_manager_; + hash_manager layer_hashes_; }; diff --git a/libconfluo/test/container/sketch/count_min_sketch_test.h b/libconfluo/test/container/sketch/count_min_sketch_test.h deleted file mode 100644 index 71416a26c..000000000 --- a/libconfluo/test/container/sketch/count_min_sketch_test.h +++ /dev/null @@ -1,129 +0,0 @@ -#ifndef TEST_CONTAINER_SKETCH_COUNT_MIN_SKETCH_TEST_H_ -#define TEST_CONTAINER_SKETCH_COUNT_MIN_SKETCH_TEST_H_ - -#include "container/sketch/count_min_sketch.h" -#include "container/sketch/universal_monitor.h" -#include "gtest/gtest.h" - -using namespace ::confluo::sketch; - -class CountSketchTest : public testing::Test { - public: - static const int N = 1000; -}; - -const int CountSketchTest::N; - -//TEST_F(CountSketchTest, SimpleHeavyFlowsEstimateTest) { -// -// hash_manager manager; -// size_t A[10][2] = { -// {1, 3543}, -// {2, 7932}, -// {3, 8234}, -// {4, 48}, -// {5, 58}, -// {6, 238}, -// {7, 732}, -// {8, 10038}, -// {9, 78}, -// {327, 78923} -// }; -// -// auto cs1 = count_sketch::create_parameterized(0.1, 0.1, manager); -// for (int i = 0; i < 10; i++) { -// for (int j = 0; j < A[i][1]; j++) { -// cs1.update(A[i][0]); -// } -// } -// for (int i = 0; i < 10; i++) { -// ASSERT_EQ(cs1.estimate(A[i][0]), A[i][1]); -// } -//} -// - -TEST_F(CountSketchTest, EstimateTest) { - hash_manager manager; - auto cs = count_sketch::create_parameterized(0.01, 0.01, manager); - - for (int i = 0; i < N; i++) { - for (int j = 0; j < i; j++) - cs.update(i); - } - - for (int i = N - 1; i >= 0; i--) { - ASSERT_EQ(cs.estimate(i), i); - } -} - -//TEST_F(CountSketchTest, BenchTest) { -// hash_manager monitor_manager; -// hash_manager sketch_manager; -// double hh_thresh = 0.1; -// size_t num_hh = 10; -// universal_monitor univ_mon = universal_monitor::create_parameterized(0.01, 0.01, hh_thresh, num_hh, -// monitor_manager, -// sketch_manager); -// size_t num_ops = 0; -// size_t time = 0; -// for (size_t i = 0; i < N; i++) { -// for (int j = 0; j < i; j++) { -// int64_t start = utils::time_utils::cur_ns(); -// univ_mon.update(i); -// int64_t stop = utils::time_utils::cur_ns(); -// time += (stop - start); -// num_ops++; -// } -// } -// -// LOG_INFO << "Avg Latency: " << time/num_ops << "ns."; -// -//} - -//TEST_F(CountSketchTest, UniversalMonitorTest) { -// hash_manager monitor_manager; -// hash_manager sketch_manager; -// double hh_thresh = 0.1; -// size_t num_hh = 10; -// universal_monitor univ_mon = universal_monitor::create_parameterized(0.01, 0.01, hh_thresh, num_hh, -// monitor_manager, -// sketch_manager); -// -// size_t total = 0; -// int A[11][2] = { -// {0, 0}, -// {1, 3543}, -// {2, 7932}, -// {3, 32234}, -// {4, 48}, -// {5, 58}, -// {6, 238}, -// {7, 732}, -// {8, 10038}, -// {9, 78}, -// {327, 78923} -// }; -// size_t cdf[11] = {0,0,0,0,0,0,0,0,0,0,0}; -// -// for (size_t i = 1; i < 11; i++) { -// total += A[i][1]; -// cdf[i] = total; -// } -// -// for (int i = 0; i < 10000; i++) { -// size_t rand = utils::rand_utils::rand_int64(total); -// for (int k = 1; k < 11; k++) { -// if (rand > cdf[k - 1] && rand < cdf[k]) { -// univ_mon.update(A[k][0]); -// } -// } -// } -// -// std::vector>& hhs = univ_mon.get_heavy_hitters(); -// for (int i = 0; i < hhs.size(); i++) { -// LOG_INFO << atomic::load(&hhs[i]); -// } -// -//} - -#endif /* TEST_CONTAINER_SKETCH_COUNT_MIN_SKETCH_TEST_H_ */ diff --git a/libconfluo/test/container/sketch/count_sketch_test.h b/libconfluo/test/container/sketch/count_sketch_test.h new file mode 100644 index 000000000..78944e240 --- /dev/null +++ b/libconfluo/test/container/sketch/count_sketch_test.h @@ -0,0 +1,105 @@ +#ifndef TEST_CONTAINER_SKETCH_COUNT_MIN_SKETCH_TEST_H_ +#define TEST_CONTAINER_SKETCH_COUNT_MIN_SKETCH_TEST_H_ + +#include + +#include "/Users/Ujval/dev/research/univmon_extension/simulator/V0.3/countSketch.h" +#include "container/sketch/count_sketch.h" +#include "container/sketch/universal_monitor.h" +#include "gtest/gtest.h" + +using namespace ::confluo::sketch; + +class CountSketchTest : public testing::Test { + public: + static const int N = 1000; +}; + +const int CountSketchTest::N; + +TEST_F(CountSketchTest, EstimateAccuracyTest) { + double epsilon = 0.0; + + std::ofstream out("sketch_error.out"); + + std::random_device rd; + std::mt19937 e2(rd()); + std::normal_distribution dist(0, 1000); + std::map hist; + for (int n = 0; n < 10000; n++) { + hist[std::abs(std::round(dist(e2)))]++; + } + + size_t num_rows[] = { 4, 8, 16, 32 }; + size_t counters_per_row[] = { 8000 }; + + for (int i = 0; i < 4; i++) { + for (int j = 0; j < 1; j++) { + + count_sketch cs(num_rows[i], counters_per_row[j]); + CountSketch alan_cs(counters_per_row[j], num_rows[i], 10000); + + for (auto p : hist) { + for (int i = 0; i < p.second; i++) { + cs.update(p.first); + alan_cs.add(p.first); + } + } + + double num_measurements = 0; + + double violator_count = 0; + std::vector errors; + std::vector alan_errors; + + size_t negative_count = 0; + for (auto p : hist) { + int64_t actual = p.second; + num_measurements++; + + // My estimate + int64_t est = cs.estimate(p.first); + double diff = std::abs(est - actual); + double error = diff/actual; + errors.push_back(error); + + //if (error > epsilon) { + // violator_count++; + //} + if (est < 0) { + negative_count++; + } + + // Alan estimate + int alan_est = alan_cs.estimate(p.first); + double alan_diff = std::abs(alan_est - actual); + double alan_error = alan_diff/actual; + alan_errors.push_back(alan_error); + + } + + // Compute overall stats + std::sort(errors.begin(), errors.end()); + double average = std::accumulate(errors.begin(), errors.end(), 0.0)/errors.size(); + + std::sort(alan_errors.begin(), alan_errors.end()); + double alan_average = std::accumulate(alan_errors.begin(), alan_errors.end(), 0.0)/alan_errors.size(); + + // Print + LOG_INFO << "Dimensions: " << num_rows[i] << " x " << counters_per_row[j]; + LOG_INFO << "Sketch size: " << cs.storage_size() << "B"; + LOG_INFO << "Negative rate: " << negative_count/num_measurements; + LOG_INFO << "Median error: " << errors[int(errors.size()/2)]; + // LOG_INFO << "Median error (alan): " << alan_errors[int(alan_errors.size()/2)] << "\n"; + + //LOG_INFO<< "Incorrectness rate: " << violator_count/num_measurements; + //LOG_INFO << "Mean error: " << average << "\n"; + //std::cout << "Failure rate: " << violator_count/num_measurements << "\n"; + //summary_out << num_rows[i] << " " << counters_per_row[j] << " " << violator_count/num_measurements << + // " " << errors[int(errors.size()/2)] << "\n"; + + } + } +} + +#endif /* TEST_CONTAINER_SKETCH_COUNT_MIN_SKETCH_TEST_H_ */