Skip to content

Commit

Permalink
Fixed SIGINT interrupts and exception handling inside block_log::for_…
Browse files Browse the repository at this point in the history
…each_block
  • Loading branch information
vogel76 committed Apr 5, 2023
1 parent b2f5fa9 commit faa8b1d
Showing 1 changed file with 41 additions and 23 deletions.
64 changes: 41 additions & 23 deletions libraries/chain/block_log.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -802,20 +802,7 @@ namespace hive { namespace chain {
FC_THROW("unknown purpose");
}

std::shared_ptr< std::thread > queue_filler_thread;

BOOST_SCOPE_EXIT( &queue_filler_thread, &stop_requested, &block_queue_condition ) {
ilog("Queue filler thread is joining.");
if( queue_filler_thread )
{
stop_requested = true;
block_queue_condition.notify_one();
queue_filler_thread->join();
}
ilog("Queue filler thread was joined.");
} BOOST_SCOPE_EXIT_END

queue_filler_thread = std::make_shared<std::thread>([&]() {
std::thread queue_filler_thread([&]() {
fc::set_thread_name("for_each_io"); // tells the OS the thread's name
fc::thread::current().set_name("for_each_io"); // tells fc the thread's name for logging
for (uint32_t block_number = starting_block_number; block_number <= ending_block_number; ++block_number)
Expand All @@ -826,36 +813,67 @@ namespace hive { namespace chain {
while (block_queue.size() >= max_blocks_to_prefetch && !stop_requested)
block_queue_condition.wait(lock);
if (stop_requested)
{
ilog("Leaving the queue thread");
return;
}
block_queue.push(full_block);
block_queue_condition.notify_one();
}
block_queue_condition.notify_one();
hive::chain::blockchain_worker_thread_pool::get_instance().enqueue_work(full_block, worker_thread_processing);
}

ilog("Exiting the queue thread");
});

for (uint32_t block_number = starting_block_number; block_number <= ending_block_number; ++block_number)
{
std::shared_ptr<full_block_type> full_block;
{
std::unique_lock<std::mutex> lock(block_queue_mutex);
while (block_queue.empty())
while (block_queue.empty() && !stop_requested)
block_queue_condition.wait(lock);
full_block = block_queue.front();
block_queue.pop();

if(!stop_requested)
{
full_block = block_queue.front();
block_queue.pop();
}

block_queue_condition.notify_one();
}
block_queue_condition.notify_one();

if (!processor(full_block))
try
{
if(!stop_requested)
stop_requested = !processor(full_block);

if (stop_requested)
{
ilog("Attempting to break a block processing loop and request block queue stop.");
std::unique_lock<std::mutex> lock(block_queue_mutex);
stop_requested = true;
block_queue_condition.notify_one();
ilog("Block queue stop requested.");
break;
}
block_queue_condition.notify_one();
break;
}
FC_CAPTURE_CALL_LOG_AND_RETHROW([&]()
{
{
std::unique_lock<std::mutex> lock(block_queue_mutex);
stop_requested = true;
block_queue_condition.notify_one();
}

ilog("Attempting to join queue_filler_thread...");
queue_filler_thread.join();
ilog("queue_filler_thread joined.");
}, ());
}

ilog("Attempting to join queue_filler_thread...");
queue_filler_thread.join();
ilog("queue_filler_thread joined.");
}

void block_log::truncate(uint32_t new_head_block_num)
Expand Down

0 comments on commit faa8b1d

Please sign in to comment.