From 7ac4b5050aba8a56ce515af6767bcc3dd536e86c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9CMariusz=20Trela=E2=80=9D?= Date: Tue, 10 Oct 2023 13:51:05 +0200 Subject: [PATCH] Function `blockchain_worker_thread_pool::set_thread_pool_size` executes a lazy init. --- .../chain/blockchain_worker_thread_pool.cpp | 51 ++++++++++--------- .../chain/blockchain_worker_thread_pool.hpp | 3 +- libraries/plugins/chain/chain_plugin.cpp | 2 +- programs/util/block_log_util.cpp | 2 +- 4 files changed, 29 insertions(+), 29 deletions(-) diff --git a/libraries/chain/blockchain_worker_thread_pool.cpp b/libraries/chain/blockchain_worker_thread_pool.cpp index 4b9ed0702d..0aa8260405 100644 --- a/libraries/chain/blockchain_worker_thread_pool.cpp +++ b/libraries/chain/blockchain_worker_thread_pool.cpp @@ -16,13 +16,14 @@ namespace hive { namespace chain { -namespace -{ - uint32_t thread_pool_size = 0; -} - struct blockchain_worker_thread_pool::impl { + private: + + uint32_t thread_pool_size = 0; + + public: + struct work_request_type { struct transaction_work_request_type @@ -66,6 +67,7 @@ struct blockchain_worker_thread_pool::impl void perform_work(const std::weak_ptr& full_block, data_source_type data_source); void perform_work(const work_request_type::transaction_work_request_type& transaction_work_request, data_source_type data_source); void thread_function(); + void lazy_init( uint32_t new_thread_pool_size ); }; blockchain_worker_thread_pool::impl::impl( appbase::application& app, enqueue_work_type&& enqueue_work ): theApp( app ) @@ -117,22 +119,10 @@ void blockchain_worker_thread_pool::impl::thread_function() } } -void blockchain_worker_thread_pool::impl_deleter::operator()(blockchain_worker_thread_pool::impl* ptr) const +void blockchain_worker_thread_pool::impl::lazy_init( uint32_t new_thread_pool_size ) { - delete ptr; -} - -//std::shared_ptr fill_queue_thread = std::make_shared([&](){ fill_pending_queue(input_block_log_path / "block_log"); }); -blockchain_worker_thread_pool::blockchain_worker_thread_pool( appbase::application& app ) : - my(std::unique_ptr( new impl( app, [this]( const std::vector>& full_transactions, data_source_type data_source, - std::optional block_number ){ this->enqueue_work( full_transactions, data_source, block_number ); } ) ) ) -{ - lazy_init(); -} - -void blockchain_worker_thread_pool::lazy_init() -{ - if( (thread_pool_size == 0) || not my->threads.empty() ) + thread_pool_size = new_thread_pool_size; + if( (thread_pool_size == 0) || not threads.empty() ) { if( thread_pool_size == 0 ) { @@ -151,18 +141,30 @@ void blockchain_worker_thread_pool::lazy_init() ilog("Emplacing worker threads"); for (unsigned i = 1; i <= thread_pool_size; ++i) { - my->threads.emplace_back([i, this]() { + threads.emplace_back([i, this]() { std::ostringstream thread_name_stream; thread_name_stream << "worker_" << i << "_of_" << thread_pool_size; std::string thread_name = thread_name_stream.str(); fc::set_thread_name(thread_name.c_str()); // tells the OS the thread's name fc::thread::current().set_name(thread_name); // tells fc the thread's name for logging - my->thread_function(); + thread_function(); }); } ilog("Emplacing worker threads done"); } +void blockchain_worker_thread_pool::impl_deleter::operator()(blockchain_worker_thread_pool::impl* ptr) const +{ + delete ptr; +} + +//std::shared_ptr fill_queue_thread = std::make_shared([&](){ fill_pending_queue(input_block_log_path / "block_log"); }); +blockchain_worker_thread_pool::blockchain_worker_thread_pool( appbase::application& app ) : + my(std::unique_ptr( new impl( app, [this]( const std::vector>& full_transactions, data_source_type data_source, + std::optional block_number ){ this->enqueue_work( full_transactions, data_source, block_number ); } ) ) ) +{ +} + void blockchain_worker_thread_pool::impl::perform_work(const std::weak_ptr& full_block_weak_ptr, data_source_type data_source) { try @@ -509,10 +511,9 @@ void blockchain_worker_thread_pool::shutdown() ilog("worker threads successfully shut down"); } -// this only works if called before the singleton instance is created -/* static */ void blockchain_worker_thread_pool::set_thread_pool_size(uint32_t new_thread_pool_size) +void blockchain_worker_thread_pool::set_thread_pool_size(uint32_t new_thread_pool_size) { - thread_pool_size = new_thread_pool_size; + my->lazy_init( new_thread_pool_size ); } } } // end namespace hive::chain diff --git a/libraries/chain/include/hive/chain/blockchain_worker_thread_pool.hpp b/libraries/chain/include/hive/chain/blockchain_worker_thread_pool.hpp index 510c3d7459..3d50c810b0 100644 --- a/libraries/chain/include/hive/chain/blockchain_worker_thread_pool.hpp +++ b/libraries/chain/include/hive/chain/blockchain_worker_thread_pool.hpp @@ -37,7 +37,6 @@ class blockchain_worker_thread_pool void operator()(blockchain_worker_thread_pool::impl* ptr) const; }; - void lazy_init(); std::unique_ptr my; public: @@ -73,7 +72,7 @@ class blockchain_worker_thread_pool void set_last_checkpoint(uint32_t last_checkpoint); void shutdown(); - static void set_thread_pool_size(uint32_t thread_pool_size); + void set_thread_pool_size(uint32_t thread_pool_size); }; } } // end namespace hive::chain diff --git a/libraries/plugins/chain/chain_plugin.cpp b/libraries/plugins/chain/chain_plugin.cpp index 4ca95f3f95..69b51364d6 100644 --- a/libraries/plugins/chain/chain_plugin.cpp +++ b/libraries/plugins/chain/chain_plugin.cpp @@ -1345,7 +1345,7 @@ void chain_plugin::plugin_initialize(const variables_map& options) } #endif uint32_t blockchain_thread_pool_size = options.at("blockchain-thread-pool-size").as(); - blockchain_worker_thread_pool::set_thread_pool_size(blockchain_thread_pool_size); + get_thread_pool().set_thread_pool_size(blockchain_thread_pool_size); if (my->validate_during_replay) get_thread_pool().set_validate_during_replay(); diff --git a/programs/util/block_log_util.cpp b/programs/util/block_log_util.cpp index ac7edf390a..1e52cd1415 100644 --- a/programs/util/block_log_util.cpp +++ b/programs/util/block_log_util.cpp @@ -748,7 +748,7 @@ int main(int argc, char** argv) return options_map.count("help") ? 0 : 1; } - hive::chain::blockchain_worker_thread_pool::set_thread_pool_size(options_map["jobs"].as()); + thread_pool.set_thread_pool_size(options_map["jobs"].as()); BOOST_SCOPE_EXIT(&thread_pool) { thread_pool.shutdown(); } BOOST_SCOPE_EXIT_END if (options_map.count("verbose") || options_map.count("debug"))