Skip to content

Commit

Permalink
Function blockchain_worker_thread_pool::set_thread_pool_size execut…
Browse files Browse the repository at this point in the history
…es a lazy init.
  • Loading branch information
Mariusz-Trela authored and vogel76 committed Oct 24, 2023
1 parent a2bfcd2 commit 7ac4b50
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 29 deletions.
51 changes: 26 additions & 25 deletions libraries/chain/blockchain_worker_thread_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,14 @@

namespace hive { namespace chain {

namespace
{
uint32_t thread_pool_size = 0;
}

struct blockchain_worker_thread_pool::impl
{
private:

uint32_t thread_pool_size = 0;

public:

struct work_request_type
{
struct transaction_work_request_type
Expand Down Expand Up @@ -66,6 +67,7 @@ struct blockchain_worker_thread_pool::impl
void perform_work(const std::weak_ptr<full_block_type>& full_block, data_source_type data_source);
void perform_work(const work_request_type::transaction_work_request_type& transaction_work_request, data_source_type data_source);
void thread_function();
void lazy_init( uint32_t new_thread_pool_size );
};

blockchain_worker_thread_pool::impl::impl( appbase::application& app, enqueue_work_type&& enqueue_work ): theApp( app )
Expand Down Expand Up @@ -117,22 +119,10 @@ void blockchain_worker_thread_pool::impl::thread_function()
}
}

void blockchain_worker_thread_pool::impl_deleter::operator()(blockchain_worker_thread_pool::impl* ptr) const
void blockchain_worker_thread_pool::impl::lazy_init( uint32_t new_thread_pool_size )
{
delete ptr;
}

//std::shared_ptr<std::thread> fill_queue_thread = std::make_shared<std::thread>([&](){ fill_pending_queue(input_block_log_path / "block_log"); });
blockchain_worker_thread_pool::blockchain_worker_thread_pool( appbase::application& app ) :
my(std::unique_ptr<impl, impl_deleter>( new impl( app, [this]( const std::vector<std::shared_ptr<full_transaction_type>>& full_transactions, data_source_type data_source,
std::optional<uint32_t> block_number ){ this->enqueue_work( full_transactions, data_source, block_number ); } ) ) )
{
lazy_init();
}

void blockchain_worker_thread_pool::lazy_init()
{
if( (thread_pool_size == 0) || not my->threads.empty() )
thread_pool_size = new_thread_pool_size;
if( (thread_pool_size == 0) || not threads.empty() )
{
if( thread_pool_size == 0 )
{
Expand All @@ -151,18 +141,30 @@ void blockchain_worker_thread_pool::lazy_init()
ilog("Emplacing worker threads");
for (unsigned i = 1; i <= thread_pool_size; ++i)
{
my->threads.emplace_back([i, this]() {
threads.emplace_back([i, this]() {
std::ostringstream thread_name_stream;
thread_name_stream << "worker_" << i << "_of_" << thread_pool_size;
std::string thread_name = thread_name_stream.str();
fc::set_thread_name(thread_name.c_str()); // tells the OS the thread's name
fc::thread::current().set_name(thread_name); // tells fc the thread's name for logging
my->thread_function();
thread_function();
});
}
ilog("Emplacing worker threads done");
}

void blockchain_worker_thread_pool::impl_deleter::operator()(blockchain_worker_thread_pool::impl* ptr) const
{
delete ptr;
}

//std::shared_ptr<std::thread> fill_queue_thread = std::make_shared<std::thread>([&](){ fill_pending_queue(input_block_log_path / "block_log"); });
blockchain_worker_thread_pool::blockchain_worker_thread_pool( appbase::application& app ) :
my(std::unique_ptr<impl, impl_deleter>( new impl( app, [this]( const std::vector<std::shared_ptr<full_transaction_type>>& full_transactions, data_source_type data_source,
std::optional<uint32_t> block_number ){ this->enqueue_work( full_transactions, data_source, block_number ); } ) ) )
{
}

void blockchain_worker_thread_pool::impl::perform_work(const std::weak_ptr<full_block_type>& full_block_weak_ptr, data_source_type data_source)
{
try
Expand Down Expand Up @@ -509,10 +511,9 @@ void blockchain_worker_thread_pool::shutdown()
ilog("worker threads successfully shut down");
}

// this only works if called before the singleton instance is created
/* static */ void blockchain_worker_thread_pool::set_thread_pool_size(uint32_t new_thread_pool_size)
void blockchain_worker_thread_pool::set_thread_pool_size(uint32_t new_thread_pool_size)
{
thread_pool_size = new_thread_pool_size;
my->lazy_init( new_thread_pool_size );
}

} } // end namespace hive::chain
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ class blockchain_worker_thread_pool
void operator()(blockchain_worker_thread_pool::impl* ptr) const;
};

void lazy_init();
std::unique_ptr<impl, impl_deleter> my;
public:

Expand Down Expand Up @@ -73,7 +72,7 @@ class blockchain_worker_thread_pool
void set_last_checkpoint(uint32_t last_checkpoint);

void shutdown();
static void set_thread_pool_size(uint32_t thread_pool_size);
void set_thread_pool_size(uint32_t thread_pool_size);
};

} } // end namespace hive::chain
Expand Down
2 changes: 1 addition & 1 deletion libraries/plugins/chain/chain_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1345,7 +1345,7 @@ void chain_plugin::plugin_initialize(const variables_map& options)
}
#endif
uint32_t blockchain_thread_pool_size = options.at("blockchain-thread-pool-size").as<uint32_t>();
blockchain_worker_thread_pool::set_thread_pool_size(blockchain_thread_pool_size);
get_thread_pool().set_thread_pool_size(blockchain_thread_pool_size);

if (my->validate_during_replay)
get_thread_pool().set_validate_during_replay();
Expand Down
2 changes: 1 addition & 1 deletion programs/util/block_log_util.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -748,7 +748,7 @@ int main(int argc, char** argv)
return options_map.count("help") ? 0 : 1;
}

hive::chain::blockchain_worker_thread_pool::set_thread_pool_size(options_map["jobs"].as<int>());
thread_pool.set_thread_pool_size(options_map["jobs"].as<int>());
BOOST_SCOPE_EXIT(&thread_pool) { thread_pool.shutdown(); } BOOST_SCOPE_EXIT_END

if (options_map.count("verbose") || options_map.count("debug"))
Expand Down

0 comments on commit 7ac4b50

Please sign in to comment.