Skip to content

Commit

Permalink
Universal Sketch working.
Browse files Browse the repository at this point in the history
  • Loading branch information
ujvl committed Jul 17, 2018
1 parent 2996b33 commit d56e5ca
Show file tree
Hide file tree
Showing 5 changed files with 207 additions and 201 deletions.
1 change: 1 addition & 0 deletions libconfluo/confluo/container/sketch/count_sketch.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ class count_sketch {
typedef atomic::type<counter_t> atomic_counter_t;

// TODO defaults
count_sketch() = default;

/**
* Constructor.
Expand Down
6 changes: 3 additions & 3 deletions libconfluo/confluo/container/sketch/sketch_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ namespace sketch {
#define ELEM_SWAP(a,b) { T t=(a); (a)=(b); (b)=t; }

template<typename T>
static T median(std::vector<T> data) {
int k = (data.size() & 1) ? (data.size() / 2) : (data.size() / 2) - 1;
int i, j, l, m;
static T median(std::vector<T>& data) {
size_t k = (data.size() & 1) ? (data.size() / 2) : (data.size() / 2) - 1;
size_t i, j, l, m;
T x;
l = 0;
m = data.size() - 1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@

#include <vector>

#include "atomic.h"
#include "count_sketch.h"
#include "hash_manager.h"
#include "container/monolog/monolog_exp2_linear.h"

namespace confluo {
namespace sketch {
Expand All @@ -20,20 +20,26 @@ class substream_summary {

substream_summary() = default;

substream_summary(size_t num_estimates, size_t num_buckets, size_t num_heavy_hitters,
double hh_threshold, atomic_counter_t* l2_squared)
: hh_threshold_(hh_threshold),
num_hh_(num_heavy_hitters),
l2_squared_(l2_squared),
sketch_(num_estimates, num_buckets),
heavy_hitters_(num_heavy_hitters),
/**
* Constructor
* Builds an empty summary: a t x b count-sketch, k heavy-hitter slots and a
* freshly generated random hash used to place keys into those slots.
* @param t depth (number of estimates)
* @param b width (number of buckets)
* @param k number of heavy hitters to track
* @param a heavy hitter threshold
*/
substream_summary(size_t t, size_t b, size_t k, double a)
: hh_threshold_(a),
num_hh_(k),
// Value-initialized running L2^2 counter.
// NOTE(review): presumably this starts at zero — confirm for atomic::type<counter_t>.
l2_squared_(),
sketch_(t, b),
// One slot per tracked heavy hitter; slots are indexed by hh_hash_ modulo the slot count.
heavy_hitters_(k),
// Every summary built through this constructor draws its own random hash.
hh_hash_(pairwise_indep_hash<T>::generate_random()) {
}

substream_summary(const substream_summary& other)
: hh_threshold_(other.hh_threshold_),
num_hh_(other.num_hh_),
l2_squared_(other.l2_squared_),
l2_squared_(atomic::load(&other.l2_squared_)),
sketch_(other.sketch_),
heavy_hitters_(other.heavy_hitters_.size()),
hh_hash_(other.hh_hash_) {
Expand All @@ -45,7 +51,7 @@ class substream_summary {
substream_summary& operator=(const substream_summary& other) {
hh_threshold_ = other.hh_threshold_;
num_hh_ = other.num_hh_;
l2_squared_ = other.l2_squared_; // atomic load??
l2_squared_ = atomic::load(&other.l2_squared_);
sketch_ = other.sketch_;
heavy_hitters_ = heavy_hitters_set(other.heavy_hitters_.size());
hh_hash_ = other.hh_hash_;
Expand All @@ -58,7 +64,7 @@ class substream_summary {
void update(T key) {
counter_t old_count = sketch_.update_and_estimate(key);
counter_t update = l2_squared_update(old_count);
counter_t old_l2_sq = atomic::faa(l2_squared_, update);
counter_t old_l2_sq = atomic::faa(&l2_squared_, update);
double new_l2 = std::sqrt(old_l2_sq + update);
this->update_heavy_hitters(key, old_count + 1, new_l2);
}
Expand All @@ -71,23 +77,30 @@ class substream_summary {
return sketch_;
}

/**
 * Reports the footprint of the summary: the count-sketch storage plus the
 * heavy-hitter slots. Scalar members (threshold, counters, hash) are not counted.
 * @return size of data structure in bytes
 */
size_t storage_size() {
  size_t total_size = 0;
  total_size += sketch_.storage_size();
  // size() is an element count; scale it by the slot size so the result is in bytes.
  // sizeof is unevaluated, so this is safe even when there are no slots.
  total_size += heavy_hitters_.size() * sizeof(heavy_hitters_[0]);
  return total_size;
}

private:

void update_heavy_hitters(T key, counter_t count, double l2) {
if (count < hh_threshold_ * l2) {
return;
}
bool updated = false;
while (!updated) {
bool done = false;
while (!done) {
size_t idx = hh_hash_.apply(key) % heavy_hitters_.size();
T prev = atomic::load(&heavy_hitters_[idx]);
if (prev == key)
return;
counter_t prev_count = sketch_.estimate(prev);
if (prev_count <= count) {
updated = atomic::strong::cas(&heavy_hitters_[idx], &prev, key);
}
else
updated = true;
done = (prev_count > count) ? true : atomic::strong::cas(&heavy_hitters_[idx], &prev, key);
}
}

Expand All @@ -102,51 +115,58 @@ class substream_summary {
double hh_threshold_; // heavy hitter threshold
size_t num_hh_; // number of heavy hitters to track

atomic_counter_t* l2_squared_; // L2 norm squared
atomic_counter_t l2_squared_; // L2 norm squared
sketch sketch_;
heavy_hitters_set heavy_hitters_;
pairwise_indep_hash<T> hh_hash_;

};

template<typename T, typename counter_t = int64_t>
class universal_monitor {
class universal_sketch {

public:
typedef atomic::type<counter_t> atomic_counter_t;
typedef std::vector<atomic::type<T>> heavy_hitters_set;
template<typename g_ret_t> using g_fn = std::function<g_ret_t(counter_t)>;


universal_monitor(size_t num_estimates, size_t num_buckets, double hh_threshold, size_t num_heavy_hitters,
hash_manager<T>& monitor_hash_manager)
: universal_monitor(8 * sizeof(T), num_estimates, num_buckets, hh_threshold, num_heavy_hitters,
monitor_hash_manager) {
/**
 * Constructor that uses one layer per bit of the key type (8 * sizeof(T) layers).
 * @param t count-sketch depth (number of estimates)
 * @param b count-sketch width (number of buckets)
 * @param k number of heavy hitters to track per layer
 * @param a heavy hitter threshold
 * @param layer_hashes hash manager for layers
 */
universal_sketch(size_t t, size_t b, size_t k, double a, hash_manager<T>& layer_hashes)
    // Forward in the (l, t, b, k, a, layer_hashes) order declared by the delegated-to
    // constructor. Passing a before k would truncate the threshold into the
    // heavy-hitter count and turn the count into the threshold.
    : universal_sketch(8 * sizeof(T), t, b, k, a, layer_hashes) {
}

universal_monitor(size_t num_substreams, size_t num_estimates, size_t num_buckets,
double hh_threshold, size_t num_heavy_hitters,
hash_manager<T>& monitor_hash_manager)
: l2_squared_(0),
substream_summaries_(num_substreams),
hash_manager_(monitor_hash_manager) {
hash_manager_.guarantee_initialized(num_substreams);
for (size_t i = 0; i < num_substreams - 1; i++) {
substream_summaries_[i] = substream_summary<T, counter_t>(num_estimates, num_buckets, num_heavy_hitters,
hh_threshold, &l2_squared_);
/**
* Constructor
* Creates l independent substream summaries that share the same count-sketch
* dimensions and heavy-hitter settings, and keeps its own copy of the layer hashes.
* @param l number of layers
* @param t count-sketch depth (number of estimates)
* @param b count-sketch width (number of buckets)
* @param k number of heavy hitters to track per layer
* @param a heavy hitter threshold
* @param layer_hashes hash manager for layers
*/
universal_sketch(size_t l, size_t t, size_t b, size_t k, double a, const hash_manager<T>& layer_hashes)
: substream_summaries_(l),
layer_hashes_(layer_hashes) {
// Layer i (i >= 1) is gated by hash i - 1, so l layers need l - 1 hashes.
// NOTE(review): l - 1 wraps around when l == 0 (size_t) — confirm callers always pass l >= 1.
layer_hashes_.guarantee_initialized(l - 1);
// Replace each default-constructed summary with a configured one.
for (size_t i = 0; i < l; i++) {
substream_summaries_[i] = substream_summary<T, counter_t>(t, b, k, a);
}
}

universal_monitor(const universal_monitor& other)
: l2_squared_(atomic::load(&other.l2_squared_)),
substream_summaries_(other.substream_summaries_),
hash_manager_(other.hash_manager_) {
// Copy constructor: copies every layer summary and the layer hash manager from other.
universal_sketch(const universal_sketch& other)
: substream_summaries_(other.substream_summaries_),
layer_hashes_(other.layer_hashes_) {
}

universal_monitor& operator=(const universal_monitor& other) {
l2_squared_ = atomic::load(&other.l2_squared_);
universal_sketch& operator=(const universal_sketch& other) {
substream_summaries_ = other.substream_summaries_;
hash_manager_ = other.hash_manager_;
layer_hashes_ = other.layer_hashes_;
return *this;
}

Expand All @@ -156,75 +176,84 @@ class universal_monitor {
*/
void update(T key) {
substream_summaries_[0].update(key);
for (size_t i = 1; i < substream_summaries_.size() && to_bool(hash_manager_.hash(i - 1, key)); i++) {
for (size_t i = 1; i < substream_summaries_.size() && to_bool(layer_hashes_.hash(i - 1, key)); i++) {
substream_summaries_[i].update(key);
}
}

heavy_hitters_set& get_heavy_hitters() {
return substream_summaries_[0].get_heavy_hitters();
counter_t estimate_count(T key) {
  // Start from the top layer, which sees every key.
  counter_t result = substream_summaries_[0].estimate(key);
  // Descend while the key keeps getting sampled into the next layer; the
  // deepest layer that still contains the key supplies the final estimate.
  size_t layer = 1;
  while (layer < substream_summaries_.size() && to_bool(layer_hashes_.hash(layer - 1, key))) {
    result = substream_summaries_[layer].estimate(key);
    ++layer;
  }
  return result;
}

template<typename g_ret_t = counter_t>
g_ret_t process_heavy_hitters(g_fn<g_ret_t> g) {
g_ret_t evaluate(g_fn<g_ret_t> g) {
g_ret_t recursive_sum = 0;

// Handle last substream
int substream_i = substream_summaries_.size() - 1;
auto substream_hhs = substream_summaries_[substream_i].get_heavy_hitters();
auto substream_sketch = substream_summaries_[substream_i].get_sketch();
for (size_t hh_i = 0; hh_i < substream_hhs.size(); hh_i++) {
T hh = atomic::load(&substream_hhs[hh_i]);
size_t substream_i = substream_summaries_.size() - 1;

auto& last_substream_hhs = substream_summaries_[substream_i].get_heavy_hitters();
auto& last_substream_sketch = substream_summaries_[substream_i].get_sketch();
for (size_t hh_i = 0; hh_i < last_substream_hhs.size(); hh_i++) {
T hh = atomic::load(&last_substream_hhs[hh_i]);
// TODO handle special case
if (hh != T()) {
counter_t count = substream_sketch.estimate(hh);
counter_t count = last_substream_sketch.estimate(hh);
recursive_sum += g(count);
}
}

substream_i--;
while (substream_i >= 0) {
while (substream_i-- > 0) {

g_ret_t substream_sum = 0;
substream_hhs = substream_summaries_[substream_i].get_heavy_hitters();
substream_sketch = substream_summaries_[substream_i].get_sketch();
auto& substream_hhs = substream_summaries_[substream_i].get_heavy_hitters();
auto& substream_sketch = substream_summaries_[substream_i].get_sketch();
for (size_t hh_i = 0; hh_i < substream_hhs.size(); hh_i++) {
T hh = atomic::load(&substream_hhs[hh_i]);
// TODO handle special case
if (hh != T()) {
counter_t count = substream_sketch.estimate(hh);
g_ret_t update = ((1 - 2 * hash_manager_.hash(substream_i, hh)) * g(count));
g_ret_t update = ((1 - 2 * (layer_hashes_.hash(substream_i, hh) % 2)) * g(count));
substream_sum += update;
}
}

recursive_sum = 2 * recursive_sum + substream_sum;
}
return 0;
return recursive_sum;
}

/**
* Get the number of substreams.
* @return number of substreams
* @return size of data structure in bytes
*/
size_t num_substreams() {
return substream_summaries_.size();
size_t storage_size() {
  // Sum the footprint reported by each layer's summary.
  size_t bytes = 0;
  for (auto& layer : substream_summaries_) {
    bytes += layer.storage_size();
  }
  return bytes;
}

static universal_monitor<T, counter_t> create_parameterized(double gamma, double epsilon,
static universal_sketch<T, counter_t> create_parameterized(double gamma, double epsilon,
double hh_threshold, size_t num_heavy_hitters,
hash_manager<T>& monitor_hash_manager) {
return universal_monitor<T, counter_t>(count_sketch<T, counter_t>::perror_to_num_estimates(gamma),
hash_manager<T>& layer_hashes) {
return universal_sketch<T, counter_t>(count_sketch<T, counter_t>::perror_to_num_estimates(gamma),
count_sketch<T, counter_t>::error_margin_to_num_buckets(epsilon),
hh_threshold, num_heavy_hitters, monitor_hash_manager);
hh_threshold, num_heavy_hitters, layer_hashes);
}

private:
// Interprets a hash value as a coin flip: true exactly when the value is odd.
static inline bool to_bool(size_t hashed_value) {
  return (hashed_value & 1u) != 0;
}

atomic_counter_t l2_squared_; // L2 norm squared
std::vector<substream_summary<T, counter_t>> substream_summaries_;
hash_manager<T> hash_manager_;
hash_manager<T> layer_hashes_;

};

Expand Down
Loading

0 comments on commit d56e5ca

Please sign in to comment.