Skip to content

Commit

Permalink
Separated out code into separate files, used common substream_summary.
Browse files Browse the repository at this point in the history
  • Loading branch information
ujvl committed Jul 24, 2018
1 parent 8b7a084 commit 0bc97dd
Show file tree
Hide file tree
Showing 9 changed files with 255 additions and 405 deletions.
27 changes: 15 additions & 12 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,10 @@ set(CMAKE_CXX_FLAGS_RELWITHDEBINFO "${CMAKE_CXX_FLAGS_RELWITHDEBINFO} -Wall -ped
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fno-strict-aliasing")
set(CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -ldl")

set(CMAKE_BUILD_TYPE RelWithDebInfo)
if (NOT CMAKE_BUILD_TYPE)
set(CMAKE_BUILD_TYPE "RelWithDebInfo" CACHE STRING "" FORCE)
endif()
message("lol here it is: ${CMAKE_BUILD_TYPE}")

if ("${CMAKE_SOURCE_DIR}" STREQUAL "${CMAKE_BINARY_DIR}")
message(FATAL_ERROR "In-source builds are not allowed.")
Expand Down Expand Up @@ -69,23 +72,23 @@ add_subdirectory(libconfluo)

if (BUILD_RPC)
# RPC Framework
add_subdirectory(librpc)
#add_subdirectory(librpc)

# Python Client
if (WITH_PY_CLIENT)
add_subdirectory(pyclient)
endif()

# Java Client
if (WITH_JAVA_CLIENT)
add_subdirectory(javaclient)
endif()
# if (WITH_PY_CLIENT)
# add_subdirectory(pyclient)
# endif()
#
# # Java Client
# if (WITH_JAVA_CLIENT)
# add_subdirectory(javaclient)
# endif()
endif()

# Confluo examples
if (BUILD_EXAMPLES)
add_subdirectory(examples/libtimeseries)
add_subdirectory(examples/libstreaming)
# add_subdirectory(examples/libtimeseries)
# add_subdirectory(examples/libstreaming)
endif()

if (BUILD_DOC)
Expand Down
2 changes: 1 addition & 1 deletion build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ set -e

mkdir -p build
cd build
cmake ..
cmake "$@" ..

START=$(date +%s)
make
Expand Down
4 changes: 3 additions & 1 deletion libconfluo/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ add_library(confluo STATIC
confluo/container/sketch/hash_manager.h
confluo/container/sketch/priority_queue.h
confluo/container/sketch/sketch_utils.h
confluo/container/sketch/substream_summary.h
confluo/container/sketch/universal_sketch.h
confluo/schema/field.h
confluo/schema/record.h
Expand Down Expand Up @@ -174,6 +175,7 @@ add_library(confluo STATIC
src/container/cursor/alert_cursor.cc
src/container/cursor/offset_cursors.cc
src/container/cursor/record_cursors.cc
src/container/sketch/hash_manager.cc
src/parser/aggregate_parser.cc
src/parser/expression_compiler.cc
src/parser/expression_parser.cc
Expand Down Expand Up @@ -219,7 +221,7 @@ add_library(confluo STATIC
src/types/raw_data.cc
src/types/numeric.cc
src/types/type_properties.cc
src/types/type_manager.cc)
src/types/type_manager.cc )
target_link_libraries(confluo confluoutils ${CMAKE_THREAD_LIBS_INIT} ${lz4_STATIC_LIB})
add_dependencies(confluo lz4)

Expand Down
181 changes: 5 additions & 176 deletions libconfluo/confluo/container/sketch/confluo_universal_sketch.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,182 +6,11 @@
#include "atomic.h"
#include "count_sketch.h"
#include "hash_manager.h"
#include "priority_queue.h"
#include "substream_summary.h"

namespace confluo {
namespace sketch {

/**
 * Summary of a single substream: a count sketch over key hashes, a running
 * squared L2 norm of the estimated counts, and a bounded set of heavy hitters
 * that is tracked either in a priority-queue-backed set (precise mode) or in
 * a fixed-size table of atomically updated key hashes (approximate mode).
 */
template<typename counter_t = int64_t>
class confluo_substream_summary {

 public:
  typedef atomic::type<counter_t> atomic_counter_t;
  typedef std::vector<atomic::type<size_t>> atomic_vector_t;
  typedef count_sketch<counter_t> sketch_t;
  typedef heavy_hitter_set<size_t, counter_t> heavy_hitter_set_t;

  confluo_substream_summary() = default;

  /**
   * Constructor
   * @param t depth (number of estimates)
   * @param b width (number of buckets)
   * @param k number of heavy hitters to track
   * @param a heavy hitter threshold
   * @param precise track exact heavy hitters
   */
  confluo_substream_summary(size_t t, size_t b, size_t k, double a, bool precise = true)
      : hh_threshold_(a),
        num_hh_(k),
        l2_squared_(),
        sketch_(t, b),
        heavy_hitters_(k),
        hhs_precise_(),
        hh_hash_(pairwise_indep_hash::generate_random()),
        use_precise_hh_(precise) {
  }

  /**
   * Copy constructor
   * The atomic members (L2 norm and heavy hitter slots) are copied through
   * explicit atomic load/store pairs, one slot at a time.
   * @param other summary to copy from
   */
  confluo_substream_summary(const confluo_substream_summary& other)
      : hh_threshold_(other.hh_threshold_),
        num_hh_(other.num_hh_),
        l2_squared_(atomic::load(&other.l2_squared_)),
        sketch_(other.sketch_),
        heavy_hitters_(other.heavy_hitters_.size()),
        hhs_precise_(other.hhs_precise_),
        hh_hash_(other.hh_hash_),
        use_precise_hh_(other.use_precise_hh_) {
    for (size_t i = 0; i < other.heavy_hitters_.size(); i++) {
      atomic::store(&heavy_hitters_[i], atomic::load(&other.heavy_hitters_[i]));
    }
  }

  /**
   * Copy assignment
   * @param other summary to copy from
   * @return reference to this summary
   */
  confluo_substream_summary& operator=(const confluo_substream_summary& other) {
    // Self-assignment must be a no-op: heavy_hitters_ is replaced by a fresh
    // vector below before its slots are copied, so without this guard
    // `x = x` would wipe the approximate heavy hitter table.
    if (this == &other) {
      return *this;
    }
    hh_threshold_ = other.hh_threshold_;
    num_hh_ = other.num_hh_;
    l2_squared_ = atomic::load(&other.l2_squared_);
    sketch_ = other.sketch_;
    heavy_hitters_ = atomic_vector_t(other.heavy_hitters_.size());
    hhs_precise_ = other.hhs_precise_;
    hh_hash_ = other.hh_hash_;
    use_precise_hh_ = other.use_precise_hh_;
    for (size_t i = 0; i < other.heavy_hitters_.size(); i++) {
      atomic::store(&heavy_hitters_[i], atomic::load(&other.heavy_hitters_[i]));
    }
    return *this;
  }

  /**
   * Record one occurrence of a key: update the sketch, fold the change into
   * the squared L2 norm, and refresh the heavy hitter structure selected at
   * construction time.
   * @param key_hash hash of the key being recorded
   */
  void update(size_t key_hash) {
    counter_t old_count = sketch_.update_and_estimate(key_hash);
    counter_t update = l2_squared_update(old_count);
    // The value returned by faa is treated as the norm before this update,
    // so the delta is added again to obtain the current norm.
    counter_t old_l2_sq = atomic::faa(&l2_squared_, update);
    double new_l2 = std::sqrt(old_l2_sq + update);
    if (use_precise_hh_) {
      this->update_hh_pq(key_hash, old_count + 1, new_l2);
    } else {
      this->update_hh_approx(key_hash, old_count + 1, new_l2);
    }
  }

  /**
   * Estimate count
   * @param key key
   * @return estimated count
   */
  counter_t estimate(const byte_string& key) {
    return sketch_.estimate(key);
  }

  /**
   * @return sketch
   */
  sketch_t& get_sketch() {
    return sketch_;
  }

  /**
   * @return table of heavy hitter key hashes used in approximate mode
   */
  atomic_vector_t& get_heavy_hitters() {
    return heavy_hitters_;
  }

  /**
   * @return heavy hitter set used in precise mode
   */
  heavy_hitter_set_t& get_pq() {
    return hhs_precise_;
  }

  /**
   * @return size of data structure in bytes
   */
  size_t storage_size() {
    size_t total_size = 0;
    total_size += sketch_.storage_size();
    // Report bytes, not slots: scale the slot count by the slot size.
    // NOTE: the precise heavy hitter set (hhs_precise_) is not accounted for.
    total_size += heavy_hitters_.size() * sizeof(atomic::type<size_t>);
    return total_size;
  }

 private:
  /**
   * Update heavy hitters priority queue
   * @param key_hash key hash
   * @param count frequency count
   * @param l2 current l2 norm
   */
  void update_hh_pq(size_t key_hash, counter_t count, double l2) {
    // Keys below the threshold relative to the L2 norm are not heavy hitters.
    if (count < hh_threshold_ * l2) {
      return;
    }
    if (hhs_precise_.size() < num_hh_) {
      // Room left: drop any existing entry for this key and re-insert it
      // with the new count.
      hhs_precise_.remove_if_exists(key_hash);
      hhs_precise_.pushp(key_hash, count);
    } else {
      // Set is full: evict the head only if its current estimate is smaller
      // than the incoming key's count.
      auto head = hhs_precise_.top().key_;
      if (sketch_.estimate(head) < count) {
        hhs_precise_.pop();
        hhs_precise_.remove_if_exists(key_hash);
        hhs_precise_.pushp(key_hash, count);
      }
    }
  }

  /**
   * Update heavy hitters approximate DS
   * @param key_hash key hash
   * @param count frequency count
   * @param l2 current l2 norm
   */
  void update_hh_approx(size_t key_hash, counter_t count, double l2) {
    if (count < hh_threshold_ * l2) {
      return;
    }
    // An empty table (default-constructed summary or k == 0) has no slot to
    // claim; bail out instead of taking a modulo by zero below.
    if (heavy_hitters_.empty()) {
      return;
    }
    // The slot depends only on the key hash and the table size, so compute it
    // once rather than on every CAS retry.
    const size_t idx = hh_hash_.apply<size_t>(key_hash) % heavy_hitters_.size();
    bool done = false;
    while (!done) {
      size_t prev_key_hash = atomic::load(&heavy_hitters_[idx]);
      if (prev_key_hash == key_hash)
        return;
      counter_t prev_count = sketch_.estimate(prev_key_hash);
      // Keep the current occupant if it is heavier; otherwise try to replace
      // it and retry when the CAS does not succeed.
      done = (prev_count > count) ? true : atomic::strong::cas(&heavy_hitters_[idx], &prev_key_hash, key_hash);
    }
  }

  /**
   * L_2^2 += (c_i + 1)^2 - (c_i)^2
   * @param old_count estimate of a count before an update
   * @return amount by which the squared L2 norm grows, i.e. 2 * old_count + 1
   */
  static inline counter_t l2_squared_update(counter_t old_count) {
    return 2 * old_count + 1;
  }

  double hh_threshold_;  // heavy hitter threshold
  size_t num_hh_;  // number of heavy hitters to track (k)

  atomic_counter_t l2_squared_;  // L2 norm squared
  sketch_t sketch_;  // count sketch backing all estimates
  atomic_vector_t heavy_hitters_;  // approximate-mode table of key hashes
  heavy_hitter_set_t hhs_precise_;  // precise-mode heavy hitter set
  pairwise_indep_hash hh_hash_;  // maps a key hash to a slot in heavy_hitters_

  bool use_precise_hh_;  // true: use hhs_precise_; false: use heavy_hitters_

};

template<typename counter_t = int64_t>
class confluo_universal_sketch {

Expand Down Expand Up @@ -209,7 +38,7 @@ class confluo_universal_sketch {
is_valid_(true) {
layer_hashes_.guarantee_initialized(l - 1);
for (size_t i = 0; i < l; i++) {
substream_summaries_[i] = confluo_substream_summary<counter_t>(t, b, k, a, precise);
substream_summaries_[i] = substream_summary<size_t, counter_t>(t, b, k, a, precise);
}
}

Expand Down Expand Up @@ -363,11 +192,11 @@ class confluo_universal_sketch {
return hashed_value % 2;
}

std::vector<confluo_substream_summary<counter_t>> substream_summaries_;
std::vector<substream_summary<size_t, counter_t>> substream_summaries_;
hash_manager layer_hashes_;

schema_t schema_;
column_t column_;
schema_t schema_{};
column_t column_{};

bool precise_hh_;
atomic::type<bool> is_valid_;
Expand Down
Loading

0 comments on commit 0bc97dd

Please sign in to comment.