From 815f022f70a2d6407e33e5aa9e885c42a4759470 Mon Sep 17 00:00:00 2001 From: Harihara Kadayam Date: Thu, 26 Oct 2023 13:46:47 -0700 Subject: [PATCH] New concurrent insertable vector --- conanfile.py | 2 +- include/sisl/fds/concurrent_insert_vector.hpp | 122 ++++++++++++++++++ src/fds/CMakeLists.txt | 14 ++ .../tests/concurrent_insert_vector_bench.cpp | 118 +++++++++++++++++ .../tests/test_concurrent_insert_vector.cpp | 117 +++++++++++++++++ 5 files changed, 372 insertions(+), 1 deletion(-) create mode 100644 include/sisl/fds/concurrent_insert_vector.hpp create mode 100644 src/fds/tests/concurrent_insert_vector_bench.cpp create mode 100644 src/fds/tests/test_concurrent_insert_vector.cpp diff --git a/conanfile.py b/conanfile.py index 74b61bc2..aa778835 100644 --- a/conanfile.py +++ b/conanfile.py @@ -8,7 +8,7 @@ class SISLConan(ConanFile): name = "sisl" - version = "10.2.2" + version = "10.2.3" homepage = "https://github.com/eBay/sisl" description = "Library for fast data structures, utilities" diff --git a/include/sisl/fds/concurrent_insert_vector.hpp b/include/sisl/fds/concurrent_insert_vector.hpp new file mode 100644 index 00000000..35a49542 --- /dev/null +++ b/include/sisl/fds/concurrent_insert_vector.hpp @@ -0,0 +1,122 @@ +/********************************************************************************* + * Modifications Copyright 2017-2019 eBay Inc. + * + * Author/Developer(s): Harihara Kadayam + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed + * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR + * CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + *********************************************************************************/ +#pragma once + +#include +#include + +#include + +namespace sisl { + +// +// This data structure provides a vector where concurrent threads can safely emplace or push back the data into. +// However, it does not guarantee any access or iterations happen during the insertion. It is the responsibility of the +// user to synchornoize this behavior. This data structure is useful when the user wants to insert data into a vector +// concurrently in a fast manner and then iterate over the data later. If the user wants a vector implementation which +// reads concurrently with writer, they can use sisl::ThreadVector. This data structure is provided as a replacement for +// simplistic cases where insertion and iteration never happen concurrently. As a result it provides better performance +// than even sisl::ThreadVector and better debuggability. +// +// Benchmark shows atleast 10x better performance on more than 4 threads concurrently inserting with mutex. +// +template < typename T > +class ConcurrentInsertVector { +private: + ExitSafeThreadBuffer< std::vector< T >, size_t > tvector_; + std::vector< std::vector< T > const* > per_thread_vec_ptrs_; + +public: + struct iterator { + size_t next_thread{0}; + size_t next_id_in_thread{0}; + ConcurrentInsertVector const& vec; + + iterator(ConcurrentInsertVector const& v) : vec{v} {} + iterator(ConcurrentInsertVector const& v, bool end_iterator) : vec{v} { + if (end_iterator) { next_thread = vec.per_thread_vec_ptrs_.size(); } + } + + void operator++() { + ++next_id_in_thread; + if (next_id_in_thread >= vec.per_thread_vec_ptrs_[next_thread]->size()) { + ++next_thread; + next_id_in_thread = 0; + } + } + + bool operator==(iterator const& other) const { + return (next_thread == other.next_thread) && (next_id_in_thread == other.next_id_in_thread); + } + + bool operator!=(iterator const& other) const { return !(*this == other); } + + T const& operator*() const { return vec.per_thread_vec_ptrs_[next_thread]->at(next_id_in_thread); } + T const* operator->() const { return &(vec.per_thread_vec_ptrs_[next_thread]->at(next_id_in_thread)); } + }; + + ConcurrentInsertVector() = default; + ConcurrentInsertVector(size_t size) : tvector_{size} {} + ConcurrentInsertVector(const ConcurrentInsertVector&) = delete; + ConcurrentInsertVector(ConcurrentInsertVector&&) noexcept = delete; + ConcurrentInsertVector& operator=(const ConcurrentInsertVector&) = delete; + ConcurrentInsertVector& operator=(ConcurrentInsertVector&&) noexcept = delete; + ~ConcurrentInsertVector() = default; + + template < typename InputType, + typename = typename std::enable_if< + std::is_convertible< typename std::decay< InputType >::type, T >::value >::type > + void push_back(InputType&& ele) { + tvector_.get()->push_back(std::forward< InputType >(ele)); + } + + template < class... Args > + void emplace_back(Args&&... args) { + tvector_.get()->push_back(std::forward< Args >(args)...); + } + + iterator begin() { + tvector_.access_all_threads([this](std::vector< T > const* tvec, bool, bool) { + if (tvec) { per_thread_vec_ptrs_.push_back(tvec); } + return false; + }); + return iterator{*this}; + } + + iterator end() { return iterator{*this, true /* end_iterator */}; } + + void foreach_entry(auto&& cb) { + tvector_.access_all_threads([this, &cb](std::vector< T > const* tvec, bool, bool) { + if (tvec) { + for (auto const& e : *tvec) { + cb(e); + } + } + return false; + }); + } + + size_t size() { + size_t sz{0}; + tvector_.access_all_threads([this, &sz](std::vector< T > const* tvec, bool, bool) { + if (tvec) { sz += tvec->size; } + }); + return sz; + } +}; + +} // namespace sisl diff --git a/src/fds/CMakeLists.txt b/src/fds/CMakeLists.txt index b53fe5cd..f500ece9 100644 --- a/src/fds/CMakeLists.txt +++ b/src/fds/CMakeLists.txt @@ -45,6 +45,20 @@ if (DEFINED ENABLE_TESTING) target_link_libraries(test_compact_bitset sisl ${COMMON_DEPS} GTest::gtest) add_test(NAME CompactBitset COMMAND test_compact_bitset) + add_executable(test_concurrent_insert_vector) + target_sources(test_concurrent_insert_vector PRIVATE + tests/test_concurrent_insert_vector.cpp + ) + target_link_libraries(test_concurrent_insert_vector sisl ${COMMON_DEPS} GTest::gtest) + add_test(NAME ConcurrentInsertVector COMMAND test_concurrent_insert_vector) + + add_executable(concurrent_insert_vector_bench) + target_sources(concurrent_insert_vector_bench PRIVATE + tests/concurrent_insert_vector_bench.cpp + ) + target_link_libraries(concurrent_insert_vector_bench sisl ${COMMON_DEPS} benchmark::benchmark) + add_test(NAME ConcurrentVectorBench COMMAND concurrent_insert_vector_bench) + add_executable(obj_allocator_benchmark) target_sources(obj_allocator_benchmark PRIVATE tests/obj_allocator_benchmark.cpp diff --git a/src/fds/tests/concurrent_insert_vector_bench.cpp b/src/fds/tests/concurrent_insert_vector_bench.cpp new file mode 100644 index 00000000..64e40df6 --- /dev/null +++ b/src/fds/tests/concurrent_insert_vector_bench.cpp @@ -0,0 +1,118 @@ +/********************************************************************************* + * Modifications Copyright 2017-2019 eBay Inc. + * + * Author/Developer(s): Harihara Kadayam + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed + * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR + * CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + *********************************************************************************/ +#include +#include +#include +#include + +#include +#include +#include + +using namespace sisl; + +static constexpr uint32_t NUM_THREADS = 1; +std::unique_ptr< std::vector< uint64_t > > glob_lock_vector; +std::mutex glob_vector_mutex; + +std::unique_ptr< sisl::ConcurrentInsertVector< uint64_t > > glob_cvec; + +/*void setup() { + auto const nentries = SISL_OPTIONS["num_entries"].as< uint32_t >(); + auto const per_thread_count = nentries / nthreads; + + glob_lock_vector = std::make_unique< std::vector< uint64_t > >(); + glob_lock_vector->reserve(SISL_OPTIONS["num_entries"].as< uint32_t >()); + glob_cvec = std::make_unique< sisl::ConcurrentInsertVector< uint64_t > >((size_t)per_thread_count); +} */ + +void test_locked_vector_insert(benchmark::State& state) { + // auto const per_thread_count = nentries / state.threads(); + + LOGINFO("Running on {} iterations in {} threads", state.iterations(), state.threads()); + std::cout << "Running on iterations=" << state.iterations() << " in threads=" << state.threads() << "\n"; + glob_lock_vector = std::make_unique< std::vector< uint64_t > >(); + glob_lock_vector->reserve(state.iterations()); + + uint64_t i{0}; + for (auto s : state) { // Loops upto iteration count + std::lock_guard< std::mutex > lg(glob_vector_mutex); + glob_lock_vector->emplace_back(++i); + } +} + +void test_concurrent_vector_insert(benchmark::State& state) { + std::cout << "Running on iterations=" << state.iterations() << " in threads=" << state.threads() << "\n"; + auto const per_thread_count = state.iterations() / state.threads(); + glob_cvec = std::make_unique< sisl::ConcurrentInsertVector< uint64_t > >((size_t)per_thread_count); + + uint64_t i{0}; + for (auto s : state) { // Loops upto iteration count + glob_cvec->emplace_back(++i); + } +} + +#if 0 +void test_wisr_vector_insert(benchmark::State& state) { + for (auto s : state) { + for (size_t i{0}; i < NENTRIES_PER_THREAD; ++i) { + glob_wisr_vector->emplace_back(i); + } + } +} + +void test_locked_vector_read(benchmark::State& state) { + uint64_t ret; + for (auto s : state) { // Loops upto iteration count + std::lock_guard< std::mutex > lg(glob_vector_mutex); + for (auto& v : *glob_lock_vector) { + benchmark::DoNotOptimize(ret = v * 2); + } + } +} + +void test_wisr_vector_read(benchmark::State& state) { + uint64_t ret; + for (auto s : state) { // Loops upto iteration count + auto vec = glob_wisr_vector->get_copy_and_reset(); + for (auto& v : *vec) { + benchmark::DoNotOptimize(ret = v * 2); + } + } +} +#endif + +BENCHMARK(test_locked_vector_insert)->Threads(NUM_THREADS); +BENCHMARK(test_concurrent_vector_insert)->Threads(NUM_THREADS); + +#if 0 +BENCHMARK(test_wisr_vector_insert) + ->Iterations(SISL_OPTIONS["num_iterations"].as< uint32_t >()) + ->Threads(SISL_OPTIONS["num_threads"].as< uint32_t >()); +BENCHMARK(test_locked_vector_insert)->Iterations(SISL_OPTIONS["num_iterations"].as< uint32_t >())->Threads(1); +BENCHMARK(test_wisr_vector_insert)->Iterations(SISL_OPTIONS["num_iterations"].as< uint32_t >())->Threads(1); +BENCHMARK(test_locked_vector_read)->Iterations(SISL_OPTIONS["num_iterations"].as< uint32_t >())->Threads(1); +BENCHMARK(test_wisr_vector_read)->Iterations(SISL_OPTIONS["num_iterations"].as< uint32_t >())->Threads(1); +#endif + +int main(int argc, char** argv) { + int parsed_argc{argc}; + ::benchmark::Initialize(&parsed_argc, argv); + + // setup(); + ::benchmark::RunSpecifiedBenchmarks(); +} diff --git a/src/fds/tests/test_concurrent_insert_vector.cpp b/src/fds/tests/test_concurrent_insert_vector.cpp new file mode 100644 index 00000000..6ee76346 --- /dev/null +++ b/src/fds/tests/test_concurrent_insert_vector.cpp @@ -0,0 +1,117 @@ +/********************************************************************************* + * Modifications Copyright 2017-2019 eBay Inc. + * + * Author/Developer(s): Harihara Kadayam + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed + * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR + * CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + *********************************************************************************/ +#include +#include +#include +#include + +#include +#include + +#include + +#include +#include + +using namespace sisl; + +SISL_OPTIONS_ENABLE(logging, test_concurrent_insert_vector) + +class ConcurrentInsertVectorTest : public testing::Test { +protected: + ConcurrentInsertVector< uint32_t > m_cvec; + std::vector< std::thread > m_threads; + +public: + ConcurrentInsertVectorTest() : testing::Test() {} + // m_cvec{ConcurrentInsertVector< uint32_t >{s_cast< size_t >(SISL_OPTIONS["num_entries"].as< uint32_t + // >())}} { + + ConcurrentInsertVectorTest(const ConcurrentInsertVectorTest&) = delete; + ConcurrentInsertVectorTest(ConcurrentInsertVectorTest&&) noexcept = delete; + ConcurrentInsertVectorTest& operator=(const ConcurrentInsertVectorTest&) = delete; + ConcurrentInsertVectorTest& operator=(ConcurrentInsertVectorTest&&) noexcept = delete; + virtual ~ConcurrentInsertVectorTest() override = default; + +protected: + void insert_and_wait() { + auto const nthreads = SISL_OPTIONS["num_threads"].as< uint32_t >(); + auto const per_thread_count = SISL_OPTIONS["num_entries"].as< uint32_t >() / nthreads; + for (size_t i{0}; i < nthreads; ++i) { + m_threads.emplace_back( + [this](uint32_t start, uint32_t count) { + for (uint32_t i{0}; i < count; ++i) { + m_cvec.push_back(start + i); + } + }, + i * per_thread_count, per_thread_count); + } + + for (auto& thr : m_threads) { + thr.join(); + } + } + + void validate_all() { + sisl::Bitset bset{SISL_OPTIONS["num_entries"].as< uint32_t >()}; + m_cvec.foreach_entry([&bset](uint32_t const& e) { bset.set_bit(e); }); + ASSERT_EQ(bset.get_next_reset_bit(0), sisl::Bitset::npos) << "Access didn't receive all entries"; + } + + void validate_all_by_iteration() { + sisl::Bitset bset{SISL_OPTIONS["num_entries"].as< uint32_t >()}; + for (const auto& e : m_cvec) { + bset.set_bit(e); + } + ASSERT_EQ(bset.get_next_reset_bit(0), sisl::Bitset::npos) << "Access didn't receive all entries"; + } +}; + +TEST_F(ConcurrentInsertVectorTest, concurrent_insertion) { + LOGINFO("Step1: Inserting {} entries in parallel in {} threads and wait", + SISL_OPTIONS["num_entries"].as< uint32_t >(), SISL_OPTIONS["num_threads"].as< uint32_t >()); + insert_and_wait(); + + LOGINFO("Step2: Validating all entries are inserted"); + validate_all(); + + LOGINFO("Step3: Validating all entries again to ensure it is readable multipled times"); + validate_all(); + + LOGINFO("Step4: Validating all entries by iterator"); + validate_all_by_iteration(); + + LOGINFO("Step5: Validating all entries again by iterator to ensure it is readable multipled times"); + validate_all_by_iteration(); +} + +SISL_OPTION_GROUP(test_concurrent_insert_vector, + (num_entries, "", "num_entries", "num_entries", + ::cxxopts::value< uint32_t >()->default_value("10000"), "number"), + (num_threads, "", "num_threads", "num_threads", ::cxxopts::value< uint32_t >()->default_value("8"), + "number")) + +int main(int argc, char* argv[]) { + int parsed_argc{argc}; + ::testing::InitGoogleTest(&parsed_argc, argv); + SISL_OPTIONS_LOAD(parsed_argc, argv, logging, test_concurrent_insert_vector); + + sisl::logging::SetLogger("test_concurrent_insert_vector"); + spdlog::set_pattern("[%D %T%z] [%^%l%$] [%n] [%t] %v"); + + return RUN_ALL_TESTS(); +}