Skip to content

Commit

Permalink
New concurrent insertable vector
Browse files Browse the repository at this point in the history
  • Loading branch information
hkadayam committed Oct 26, 2023
1 parent 91a6835 commit 815f022
Show file tree
Hide file tree
Showing 5 changed files with 372 additions and 1 deletion.
2 changes: 1 addition & 1 deletion conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

class SISLConan(ConanFile):
name = "sisl"
version = "10.2.2"
version = "10.2.3"

homepage = "https://github.com/eBay/sisl"
description = "Library for fast data structures, utilities"
Expand Down
122 changes: 122 additions & 0 deletions include/sisl/fds/concurrent_insert_vector.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
/*********************************************************************************
* Modifications Copyright 2017-2019 eBay Inc.
*
* Author/Developer(s): Harihara Kadayam
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed
* under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
*********************************************************************************/
#pragma once

#include <cstdint>
#include <vector>

#include <sisl/utility/thread_buffer.hpp>

namespace sisl {

//
// This data structure provides a vector where concurrent threads can safely emplace or push back the data into.
// However, it does not guarantee any access or iterations happen during the insertion. It is the responsibility of the
// user to synchornoize this behavior. This data structure is useful when the user wants to insert data into a vector
// concurrently in a fast manner and then iterate over the data later. If the user wants a vector implementation which
// reads concurrently with writer, they can use sisl::ThreadVector. This data structure is provided as a replacement for
// simplistic cases where insertion and iteration never happen concurrently. As a result it provides better performance
// than even sisl::ThreadVector and better debuggability.
//
// Benchmark shows atleast 10x better performance on more than 4 threads concurrently inserting with mutex.
//
template < typename T >
class ConcurrentInsertVector {
private:
ExitSafeThreadBuffer< std::vector< T >, size_t > tvector_;
std::vector< std::vector< T > const* > per_thread_vec_ptrs_;

public:
struct iterator {
size_t next_thread{0};
size_t next_id_in_thread{0};
ConcurrentInsertVector const& vec;

iterator(ConcurrentInsertVector const& v) : vec{v} {}
iterator(ConcurrentInsertVector const& v, bool end_iterator) : vec{v} {
if (end_iterator) { next_thread = vec.per_thread_vec_ptrs_.size(); }
}

void operator++() {
++next_id_in_thread;
if (next_id_in_thread >= vec.per_thread_vec_ptrs_[next_thread]->size()) {
++next_thread;
next_id_in_thread = 0;
}
}

bool operator==(iterator const& other) const {
return (next_thread == other.next_thread) && (next_id_in_thread == other.next_id_in_thread);
}

bool operator!=(iterator const& other) const { return !(*this == other); }

T const& operator*() const { return vec.per_thread_vec_ptrs_[next_thread]->at(next_id_in_thread); }
T const* operator->() const { return &(vec.per_thread_vec_ptrs_[next_thread]->at(next_id_in_thread)); }
};

ConcurrentInsertVector() = default;
ConcurrentInsertVector(size_t size) : tvector_{size} {}
ConcurrentInsertVector(const ConcurrentInsertVector&) = delete;
ConcurrentInsertVector(ConcurrentInsertVector&&) noexcept = delete;
ConcurrentInsertVector& operator=(const ConcurrentInsertVector&) = delete;
ConcurrentInsertVector& operator=(ConcurrentInsertVector&&) noexcept = delete;
~ConcurrentInsertVector() = default;

template < typename InputType,
typename = typename std::enable_if<
std::is_convertible< typename std::decay< InputType >::type, T >::value >::type >
void push_back(InputType&& ele) {
tvector_.get()->push_back(std::forward< InputType >(ele));
}

template < class... Args >
void emplace_back(Args&&... args) {
tvector_.get()->push_back(std::forward< Args >(args)...);
}

iterator begin() {
tvector_.access_all_threads([this](std::vector< T > const* tvec, bool, bool) {
if (tvec) { per_thread_vec_ptrs_.push_back(tvec); }
return false;
});
return iterator{*this};
}

iterator end() { return iterator{*this, true /* end_iterator */}; }

void foreach_entry(auto&& cb) {
tvector_.access_all_threads([this, &cb](std::vector< T > const* tvec, bool, bool) {
if (tvec) {
for (auto const& e : *tvec) {
cb(e);
}
}
return false;
});
}

size_t size() {
size_t sz{0};
tvector_.access_all_threads([this, &sz](std::vector< T > const* tvec, bool, bool) {
if (tvec) { sz += tvec->size; }
});
return sz;
}
};

} // namespace sisl
14 changes: 14 additions & 0 deletions src/fds/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,20 @@ if (DEFINED ENABLE_TESTING)
target_link_libraries(test_compact_bitset sisl ${COMMON_DEPS} GTest::gtest)
add_test(NAME CompactBitset COMMAND test_compact_bitset)

add_executable(test_concurrent_insert_vector)
target_sources(test_concurrent_insert_vector PRIVATE
tests/test_concurrent_insert_vector.cpp
)
target_link_libraries(test_concurrent_insert_vector sisl ${COMMON_DEPS} GTest::gtest)
add_test(NAME ConcurrentInsertVector COMMAND test_concurrent_insert_vector)

add_executable(concurrent_insert_vector_bench)
target_sources(concurrent_insert_vector_bench PRIVATE
tests/concurrent_insert_vector_bench.cpp
)
target_link_libraries(concurrent_insert_vector_bench sisl ${COMMON_DEPS} benchmark::benchmark)
add_test(NAME ConcurrentVectorBench COMMAND concurrent_insert_vector_bench)

add_executable(obj_allocator_benchmark)
target_sources(obj_allocator_benchmark PRIVATE
tests/obj_allocator_benchmark.cpp
Expand Down
118 changes: 118 additions & 0 deletions src/fds/tests/concurrent_insert_vector_bench.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
/*********************************************************************************
* Modifications Copyright 2017-2019 eBay Inc.
*
* Author/Developer(s): Harihara Kadayam
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed
* under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
*********************************************************************************/
#include <cstdint>
#include <memory>
#include <mutex>
#include <vector>

#include <benchmark/benchmark.h>
#include <sisl/logging/logging.h>
#include <sisl/fds/concurrent_insert_vector.hpp>

using namespace sisl;

static constexpr uint32_t NUM_THREADS = 1;
std::unique_ptr< std::vector< uint64_t > > glob_lock_vector;
std::mutex glob_vector_mutex;

std::unique_ptr< sisl::ConcurrentInsertVector< uint64_t > > glob_cvec;

/*void setup() {
auto const nentries = SISL_OPTIONS["num_entries"].as< uint32_t >();
auto const per_thread_count = nentries / nthreads;
glob_lock_vector = std::make_unique< std::vector< uint64_t > >();
glob_lock_vector->reserve(SISL_OPTIONS["num_entries"].as< uint32_t >());
glob_cvec = std::make_unique< sisl::ConcurrentInsertVector< uint64_t > >((size_t)per_thread_count);
} */

void test_locked_vector_insert(benchmark::State& state) {
// auto const per_thread_count = nentries / state.threads();

LOGINFO("Running on {} iterations in {} threads", state.iterations(), state.threads());
std::cout << "Running on iterations=" << state.iterations() << " in threads=" << state.threads() << "\n";
glob_lock_vector = std::make_unique< std::vector< uint64_t > >();
glob_lock_vector->reserve(state.iterations());

uint64_t i{0};
for (auto s : state) { // Loops upto iteration count
std::lock_guard< std::mutex > lg(glob_vector_mutex);
glob_lock_vector->emplace_back(++i);
}
}

void test_concurrent_vector_insert(benchmark::State& state) {
std::cout << "Running on iterations=" << state.iterations() << " in threads=" << state.threads() << "\n";
auto const per_thread_count = state.iterations() / state.threads();
glob_cvec = std::make_unique< sisl::ConcurrentInsertVector< uint64_t > >((size_t)per_thread_count);

uint64_t i{0};
for (auto s : state) { // Loops upto iteration count
glob_cvec->emplace_back(++i);
}
}

#if 0
void test_wisr_vector_insert(benchmark::State& state) {
for (auto s : state) {
for (size_t i{0}; i < NENTRIES_PER_THREAD; ++i) {
glob_wisr_vector->emplace_back(i);
}
}
}

void test_locked_vector_read(benchmark::State& state) {
uint64_t ret;
for (auto s : state) { // Loops upto iteration count
std::lock_guard< std::mutex > lg(glob_vector_mutex);
for (auto& v : *glob_lock_vector) {
benchmark::DoNotOptimize(ret = v * 2);
}
}
}

void test_wisr_vector_read(benchmark::State& state) {
uint64_t ret;
for (auto s : state) { // Loops upto iteration count
auto vec = glob_wisr_vector->get_copy_and_reset();
for (auto& v : *vec) {
benchmark::DoNotOptimize(ret = v * 2);
}
}
}
#endif

BENCHMARK(test_locked_vector_insert)->Threads(NUM_THREADS);
BENCHMARK(test_concurrent_vector_insert)->Threads(NUM_THREADS);

#if 0
BENCHMARK(test_wisr_vector_insert)
->Iterations(SISL_OPTIONS["num_iterations"].as< uint32_t >())
->Threads(SISL_OPTIONS["num_threads"].as< uint32_t >());
BENCHMARK(test_locked_vector_insert)->Iterations(SISL_OPTIONS["num_iterations"].as< uint32_t >())->Threads(1);
BENCHMARK(test_wisr_vector_insert)->Iterations(SISL_OPTIONS["num_iterations"].as< uint32_t >())->Threads(1);
BENCHMARK(test_locked_vector_read)->Iterations(SISL_OPTIONS["num_iterations"].as< uint32_t >())->Threads(1);
BENCHMARK(test_wisr_vector_read)->Iterations(SISL_OPTIONS["num_iterations"].as< uint32_t >())->Threads(1);
#endif

int main(int argc, char** argv) {
int parsed_argc{argc};
::benchmark::Initialize(&parsed_argc, argv);

// setup();
::benchmark::RunSpecifiedBenchmarks();
}
117 changes: 117 additions & 0 deletions src/fds/tests/test_concurrent_insert_vector.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
/*********************************************************************************
* Modifications Copyright 2017-2019 eBay Inc.
*
* Author/Developer(s): Harihara Kadayam
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed
* under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
*********************************************************************************/
#include <cstdint>
#include <iostream>
#include <boost/dynamic_bitset.hpp>
#include <random>

#include <sisl/logging/logging.h>
#include <sisl/options/options.h>

#include <gtest/gtest.h>

#include <sisl/fds/concurrent_insert_vector.hpp>
#include <sisl/fds/bitset.hpp>

using namespace sisl;

SISL_OPTIONS_ENABLE(logging, test_concurrent_insert_vector)

class ConcurrentInsertVectorTest : public testing::Test {
protected:
ConcurrentInsertVector< uint32_t > m_cvec;
std::vector< std::thread > m_threads;

public:
ConcurrentInsertVectorTest() : testing::Test() {}
// m_cvec{ConcurrentInsertVector< uint32_t >{s_cast< size_t >(SISL_OPTIONS["num_entries"].as< uint32_t
// >())}} {

ConcurrentInsertVectorTest(const ConcurrentInsertVectorTest&) = delete;
ConcurrentInsertVectorTest(ConcurrentInsertVectorTest&&) noexcept = delete;
ConcurrentInsertVectorTest& operator=(const ConcurrentInsertVectorTest&) = delete;
ConcurrentInsertVectorTest& operator=(ConcurrentInsertVectorTest&&) noexcept = delete;
virtual ~ConcurrentInsertVectorTest() override = default;

protected:
void insert_and_wait() {
auto const nthreads = SISL_OPTIONS["num_threads"].as< uint32_t >();
auto const per_thread_count = SISL_OPTIONS["num_entries"].as< uint32_t >() / nthreads;
for (size_t i{0}; i < nthreads; ++i) {
m_threads.emplace_back(
[this](uint32_t start, uint32_t count) {
for (uint32_t i{0}; i < count; ++i) {
m_cvec.push_back(start + i);
}
},
i * per_thread_count, per_thread_count);
}

for (auto& thr : m_threads) {
thr.join();
}
}

void validate_all() {
sisl::Bitset bset{SISL_OPTIONS["num_entries"].as< uint32_t >()};
m_cvec.foreach_entry([&bset](uint32_t const& e) { bset.set_bit(e); });
ASSERT_EQ(bset.get_next_reset_bit(0), sisl::Bitset::npos) << "Access didn't receive all entries";
}

void validate_all_by_iteration() {
sisl::Bitset bset{SISL_OPTIONS["num_entries"].as< uint32_t >()};
for (const auto& e : m_cvec) {
bset.set_bit(e);
}
ASSERT_EQ(bset.get_next_reset_bit(0), sisl::Bitset::npos) << "Access didn't receive all entries";
}
};

TEST_F(ConcurrentInsertVectorTest, concurrent_insertion) {
LOGINFO("Step1: Inserting {} entries in parallel in {} threads and wait",
SISL_OPTIONS["num_entries"].as< uint32_t >(), SISL_OPTIONS["num_threads"].as< uint32_t >());
insert_and_wait();

LOGINFO("Step2: Validating all entries are inserted");
validate_all();

LOGINFO("Step3: Validating all entries again to ensure it is readable multipled times");
validate_all();

LOGINFO("Step4: Validating all entries by iterator");
validate_all_by_iteration();

LOGINFO("Step5: Validating all entries again by iterator to ensure it is readable multipled times");
validate_all_by_iteration();
}

SISL_OPTION_GROUP(test_concurrent_insert_vector,
(num_entries, "", "num_entries", "num_entries",
::cxxopts::value< uint32_t >()->default_value("10000"), "number"),
(num_threads, "", "num_threads", "num_threads", ::cxxopts::value< uint32_t >()->default_value("8"),
"number"))

int main(int argc, char* argv[]) {
int parsed_argc{argc};
::testing::InitGoogleTest(&parsed_argc, argv);
SISL_OPTIONS_LOAD(parsed_argc, argv, logging, test_concurrent_insert_vector);

sisl::logging::SetLogger("test_concurrent_insert_vector");
spdlog::set_pattern("[%D %T%z] [%^%l%$] [%n] [%t] %v");

return RUN_ALL_TESTS();
}

0 comments on commit 815f022

Please sign in to comment.