diff --git a/libconsensus b/libconsensus index 61de2c014..2b6f9e518 160000 --- a/libconsensus +++ b/libconsensus @@ -1 +1 @@ -Subproject commit 61de2c014b7e3f9be6de6dc0afd1b6523ea0427c +Subproject commit 2b6f9e51810e76e033a79c4b19f29f624680c1e9 diff --git a/libdevcore/Worker.h b/libdevcore/Worker.h index 5ae5a936a..a0c850347 100644 --- a/libdevcore/Worker.h +++ b/libdevcore/Worker.h @@ -60,9 +60,11 @@ class Worker { /// Starts worker thread; causes startedWorking() to be called. void startWorking(); +public: /// Stop worker thread; causes call to stopWorking(). void stopWorking(); +protected: /// Returns if worker thread is present. bool isWorking() const { Guard l( x_work ); diff --git a/libethereum/BlockChain.h b/libethereum/BlockChain.h index 75fd6690f..dcfb0ce84 100644 --- a/libethereum/BlockChain.h +++ b/libethereum/BlockChain.h @@ -450,10 +450,12 @@ class BlockChain { /// Initialise everything and ready for openning the database. void init( ChainParams const& _p ); /// Open the database. +public: void open( boost::filesystem::path const& _path, WithExisting _we ); /// Finalise everything and close the database. void close(); +private: void rotateDBIfNeeded(); ImportRoute insertBlockAndExtras( VerifiedBlockRef const& _block, bytesConstRef _receipts, diff --git a/libethereum/BlockQueue.cpp b/libethereum/BlockQueue.cpp index 358dc004d..3e87e4756 100644 --- a/libethereum/BlockQueue.cpp +++ b/libethereum/BlockQueue.cpp @@ -49,11 +49,16 @@ BlockQueue::BlockQueue() { // Allow some room for other activity unsigned verifierThreads = 1; // needed for JsonRpcTests (real mining) // std::max( // thread::hardware_concurrency(), 3U ) - 2U; - for ( unsigned i = 0; i < verifierThreads; ++i ) + for ( unsigned i = 0; i < verifierThreads; ++i ) { + if ( this->m_deleting ) + return; m_verifiers.emplace_back( [=]() { + if ( this->m_deleting ) + return; setThreadName( "blockVerifier" + toString( i ) ); this->verifierBody(); } ); + } } BlockQueue::~BlockQueue() { diff --git a/libethereum/Client.cpp b/libethereum/Client.cpp index af03033bc..efa48aea4 100644 --- a/libethereum/Client.cpp +++ b/libethereum/Client.cpp @@ -119,11 +119,11 @@ Client::Client( ChainParams const& _params, int _networkID, } Client::~Client() { - if ( m_snapshotHashComputing != nullptr ) - m_snapshotHashComputing->join(); + stopWorking(); +} - m_new_block_watch.uninstallAll(); - m_new_pending_transaction_watch.uninstallAll(); +void Client::stopWorking() { + Worker::stopWorking(); if ( m_skaleHost ) m_skaleHost->stopWorking(); // TODO Find and document a systematic way to sart/stop all @@ -131,18 +131,44 @@ Client::~Client() { else cerror << "Instance of SkaleHost was not properly created."; + if ( m_snapshotHashComputing != nullptr ) { + try { + if ( m_snapshotHashComputing->joinable() ) + m_snapshotHashComputing->join(); + } catch ( ... ) { + } + } + + m_new_block_watch.uninstallAll(); + m_new_pending_transaction_watch.uninstallAll(); + m_signalled.notify_all(); // to wake up the thread from Client::doWork() - stopWorking(); m_tq.HandleDestruction(); // l_sergiy: destroy transaction queue earlier m_bq.stop(); // l_sergiy: added to stop block queue processing - terminate(); + m_bc.close(); + LOG( m_logger ) << cc::success( "Blockchain is closed" ); - if ( !m_skaleHost || m_skaleHost->exitedForcefully() == false ) + bool isForcefulExit = + ( !m_skaleHost || m_skaleHost->exitedForcefully() == false ) ? false : true; + if ( !isForcefulExit ) { delete_lock_file( m_dbPath ); + LOG( m_logger ) << cc::success( "Deleted lock file " ) + << cc::p( boost::filesystem::canonical( m_dbPath ).string() + + std::string( "/skaled.lock" ) ); + } else { + LOG( m_logger ) << cc::fatal( "ATTENTION:" ) << " " << cc::error( "Deleted lock file " ) + << cc::p( boost::filesystem::canonical( m_dbPath ).string() + + std::string( "/skaled.lock" ) ) + << cc::error( " after foreceful exit" ); + } + LOG( m_logger ).flush(); + + terminate(); } + void Client::injectSkaleHost( std::shared_ptr< SkaleHost > _skaleHost ) { assert( !m_skaleHost ); @@ -224,7 +250,7 @@ ImportResult Client::queueBlock( bytes const& _block, bool _isSafe ) { } tuple< ImportRoute, bool, unsigned > Client::syncQueue( unsigned _max ) { - stopWorking(); + Worker::stopWorking(); return bc().sync( m_bq, m_state, _max ); } @@ -1006,12 +1032,12 @@ ExecutionResult Client::call( Address const& _from, u256 _value, Address _dest, temp.mutableState().addBalance( _from, ( u256 )( t.gas() * t.gasPrice() + t.value() ) ); ret = temp.execute( bc().lastBlockHashes(), t, Permanence::Reverted ); } catch ( InvalidNonce const& in ) { - std::cout << "exception in client call(1):" - << boost::current_exception_diagnostic_information() << std::endl; + LOG( m_logger ) << "exception in client call(1):" + << boost::current_exception_diagnostic_information() << std::endl; throw std::runtime_error( "call with invalid nonce" ); } catch ( ... ) { - std::cout << "exception in client call(2):" - << boost::current_exception_diagnostic_information() << std::endl; + LOG( m_logger ) << "exception in client call(2):" + << boost::current_exception_diagnostic_information() << std::endl; throw; } return ret; diff --git a/libethereum/Client.h b/libethereum/Client.h index 3ff2f115f..34c5d3684 100644 --- a/libethereum/Client.h +++ b/libethereum/Client.h @@ -90,6 +90,8 @@ class Client : public ClientBase, protected Worker { /// Destructor. virtual ~Client(); + void stopWorking(); + void injectSkaleHost( std::shared_ptr< SkaleHost > _skaleHost = nullptr ); /// Get information on this chain. diff --git a/libethereum/SkaleHost.cpp b/libethereum/SkaleHost.cpp index a941ef59e..9b42c77e9 100644 --- a/libethereum/SkaleHost.cpp +++ b/libethereum/SkaleHost.cpp @@ -660,7 +660,9 @@ void SkaleHost::stopWorking() { // if we could not lock from 1st attempt - then exit forcefully! if ( !locked ) { m_exitedForcefully = true; - clog( VerbosityWarning, "skale-host" ) << "Forcefully shutting down consensus!"; + clog( VerbosityWarning, "skale-host" ) + << cc::fatal( "ATTENTION:" ) << " " + << cc::error( "Forcefully shutting down consensus!" ); } diff --git a/libethereum/ValidationSchemes.cpp b/libethereum/ValidationSchemes.cpp index 8a03a1e0f..5d72fe0dd 100644 --- a/libethereum/ValidationSchemes.cpp +++ b/libethereum/ValidationSchemes.cpp @@ -153,9 +153,28 @@ void validateConfigJson( js::mObject const& _obj ) { {"logLevel", {{js::str_type}, JsonFieldPresence::Optional}}, {"logLevelConfig", {{js::str_type}, JsonFieldPresence::Optional}}, {"logLevelProposal", {{js::str_type}, JsonFieldPresence::Optional}}, + {"aa", {{js::str_type}, JsonFieldPresence::Optional}}, + {"acceptors", {{js::int_type}, JsonFieldPresence::Optional}}, + {"adminOrigins", {{js::array_type}, JsonFieldPresence::Optional}}, + {"db-path", {{js::str_type}, JsonFieldPresence::Optional}}, + {"ipcpath", {{js::str_type}, JsonFieldPresence::Optional}}, + {"enable-debug-behavior-apis", {{js::bool_type}, JsonFieldPresence::Optional}}, + {"unsafe-transactions", {{js::bool_type}, JsonFieldPresence::Optional}}, + {"web3-trace", {{js::bool_type}, JsonFieldPresence::Optional}}, + {"web3-shutdown", {{js::bool_type}, JsonFieldPresence::Optional}}, + {"unsafe-transactions", {{js::bool_type}, JsonFieldPresence::Optional}}, + {"max-connections", {{js::int_type}, JsonFieldPresence::Optional}}, + {"max-http-queues", {{js::int_type}, JsonFieldPresence::Optional}}, + {"ws-mode", {{js::str_type}, JsonFieldPresence::Optional}}, + {"ws-log", {{js::str_type}, JsonFieldPresence::Optional}}, {"imaMainNet", {{js::str_type}, JsonFieldPresence::Optional}}, {"imaMessageProxySChain", {{js::str_type}, JsonFieldPresence::Optional}}, - {"imaMessageProxyMainNet", {{js::str_type}, JsonFieldPresence::Optional}}} ); + {"imaMessageProxyMainNet", {{js::str_type}, JsonFieldPresence::Optional}}, + {"imaMessageProxySChain", {{js::str_type}, JsonFieldPresence::Optional}}, + {"imaMessageProxyMainNet", {{js::str_type}, JsonFieldPresence::Optional}}, + {"imaCallerAddressSChain", {{js::str_type}, JsonFieldPresence::Optional}}, + {"imaCallerAddressMainNet", {{js::str_type}, JsonFieldPresence::Optional}}, + {"wallets", {{js::obj_type}, JsonFieldPresence::Optional}}} ); std::string keyShareName = ""; try { @@ -207,9 +226,13 @@ void validateConfigJson( js::mObject const& _obj ) { {"ip6", {{js::str_type}, JsonFieldPresence::Optional}}, {"basePort6", {{js::int_type}, JsonFieldPresence::Optional}}, {"httpRpcPort", {{js::int_type}, JsonFieldPresence::Optional}}, + {"httpRpcPort6", {{js::int_type}, JsonFieldPresence::Optional}}, {"httpsRpcPort", {{js::int_type}, JsonFieldPresence::Optional}}, + {"httpsRpcPort6", {{js::int_type}, JsonFieldPresence::Optional}}, {"wsRpcPort", {{js::int_type}, JsonFieldPresence::Optional}}, + {"wsRpcPort6", {{js::int_type}, JsonFieldPresence::Optional}}, {"wssRpcPort", {{js::int_type}, JsonFieldPresence::Optional}}, + {"wssRpcPort6", {{js::int_type}, JsonFieldPresence::Optional}}, {"schainIndex", {{js::int_type}, JsonFieldPresence::Required}}, {"publicKey", {{js::str_type}, JsonFieldPresence::Optional}}, {"blsPublicKey0", {{js::str_type}, JsonFieldPresence::Optional}}, diff --git a/libskutils/include/skutils/dispatch.h b/libskutils/include/skutils/dispatch.h index 90aefc68e..d0a12bb70 100644 --- a/libskutils/include/skutils/dispatch.h +++ b/libskutils/include/skutils/dispatch.h @@ -664,6 +664,7 @@ class queue : public ref_retain_release { class domain : public ref_retain_release { public: + static std::atomic_bool g_bVerboseDispatchThreadDetailsLogging; typedef skutils::multithreading::recursive_mutex_type mutex_type; typedef std::lock_guard< mutex_type > lock_type; @@ -708,6 +709,7 @@ class domain : public ref_retain_release { // skutils::thread_pool thread_pool_; std::atomic_size_t cntRunningThreads_; + std::atomic_size_t cntStartTestedThreads_; // std::atomic_uint64_t decrease_accumulators_counter_, decrease_accumulators_period_; // diff --git a/libskutils/include/skutils/thread_pool.h b/libskutils/include/skutils/thread_pool.h index 52ac741b2..64aeb4e1a 100644 --- a/libskutils/include/skutils/thread_pool.h +++ b/libskutils/include/skutils/thread_pool.h @@ -146,6 +146,8 @@ class thread_pool { // wake up one thread if its waiting conditional_lock_.notify_one(); } + void notify_one() { conditional_lock_.notify_one(); } + void notify_all() { conditional_lock_.notify_all(); } }; /// class thread_pool }; // namespace skutils diff --git a/libskutils/src/dispatch.cpp b/libskutils/src/dispatch.cpp index 362ebc544..bfe3f8a0c 100644 --- a/libskutils/src/dispatch.cpp +++ b/libskutils/src/dispatch.cpp @@ -1114,6 +1114,8 @@ bool queue::job_run() { // fetch first asynchronously stored job and run //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +std::atomic_bool domain::g_bVerboseDispatchThreadDetailsLogging = false; + domain::domain( const size_t nNumberOfThreads, // = 0 // 0 means use CPU count const size_t nQueueLimit // = 0 ) @@ -1130,6 +1132,7 @@ domain::domain( const size_t nNumberOfThreads, // = 0 // 0 means use CPU count thread_pool_( ( nNumberOfThreads > 0 ) ? nNumberOfThreads : skutils::tools::cpu_count(), nQueueLimit ), cntRunningThreads_( 0 ), + cntStartTestedThreads_( 0 ), decrease_accumulators_counter_( uint64_t( 0 ) ), decrease_accumulators_period_( uint64_t( 1000 ) * uint64_t( 1000 ) ) // rare enough { @@ -1256,12 +1259,12 @@ void domain::impl_startup( size_t nWaitMilliSeconds /*= size_t(-1)*/ ) { // if( ! shutdown_flag_ ) // return; shutdown_flag_ = false; - size_t idxThread, cntThreads = thread_pool_.number_of_threads(); - if ( cntThreads == 0 ) + size_t idxThread, cntThreadsInPool = thread_pool_.number_of_threads(); + if ( cntThreadsInPool == 0 ) throw std::runtime_error( "dispatch domain failed to initialize thread pool" ); // init thread pool - size_t cntThreadsToStart = cntThreads; - for ( size_t cntThreadStartupAttempts = cntThreads * 2; cntThreadStartupAttempts != 0; + size_t cntThreadsToStart = cntThreadsInPool; + for ( size_t cntThreadStartupAttempts = cntThreadsInPool * 2; cntThreadStartupAttempts != 0; --cntThreadStartupAttempts ) { std::atomic_size_t cntFailedToStartThreads; cntFailedToStartThreads = 0; @@ -1276,38 +1279,89 @@ void domain::impl_startup( size_t nWaitMilliSeconds /*= size_t(-1)*/ ) { std::this_thread::sleep_for( std::chrono::milliseconds( 500 ) ); } try { - thread_pool_.safe_submit_without_future_te( [this, strPerformanceQueueName]() { - ++cntRunningThreads_; - try { - size_t nTaskNumberInThisThread = 0; - for ( ; true; ) { - if ( shutdown_flag_ ) - break; - { // block - std::unique_lock< fetch_mutex_type > lock( fetch_mutex_ ); - fetch_lock_.wait( lock ); - } // block - if ( shutdown_flag_ ) - break; + thread_pool_.safe_submit_without_future_te( + [this, strPerformanceQueueName, idxThread, cntThreadsToStart]() { + ++cntRunningThreads_; + ++cntStartTestedThreads_; + try { + if ( g_bVerboseDispatchThreadDetailsLogging ) { + std::string strThreadStartupMessage = + cc::deep_note( "Dispatch:" ) + " " + + cc::debug( "Started thread " ) + cc::size10( idxThread ) + + cc::debug( " of " ) + cc::size10( cntThreadsToStart ) + + cc::debug( ", have " ) + + cc::size10( size_t( cntRunningThreads_ ) ) + + cc::debug( " running thread(s)" ) + "\n"; + std::cout << strThreadStartupMessage; + std::cout.flush(); + } + size_t nTaskNumberInThisThread = 0; for ( ; true; ) { - // - std::string strPerformanceActionName = skutils::tools::format( - "task %zu", nTaskNumberInThisThread++ ); - skutils::task::performance::action a( - strPerformanceQueueName, strPerformanceActionName ); - // - if ( !run_one() ) + if ( shutdown_flag_ ) break; + { // block + std::unique_lock< fetch_mutex_type > lock( fetch_mutex_ ); + fetch_lock_.wait( lock ); + } // block if ( shutdown_flag_ ) break; - // fetch_lock_.notify_one(); // spread the work into other - // threads - } - } /// for( ; true ; ) - } catch ( ... ) { - } - --cntRunningThreads_; - } ); + for ( ; true; ) { + // + std::string strPerformanceActionName = + skutils::tools::format( + "task %zu", nTaskNumberInThisThread++ ); + skutils::task::performance::action a( + strPerformanceQueueName, strPerformanceActionName ); + // + if ( !run_one() ) + break; + if ( shutdown_flag_ ) + break; + // fetch_lock_.notify_one(); // spread the work into other + // threads + } + } /// for( ; true ; ) + } catch ( const std::exception& ex ) { + std::string strError( ex.what() ); + if ( strError.empty() ) + strError = "Exception without description"; + std::string strErrorMessage = + cc::deep_note( "Dispatch:" ) + " " + + cc::fatal( "CRITICAL ERROR:" ) + + cc::error( "Got exception in thread " ) + + cc::size10( idxThread ) + cc::error( " of " ) + + cc::size10( cntThreadsToStart ) + cc::error( ", have " ) + + cc::size10( size_t( cntRunningThreads_ ) ) + + cc::error( " running threads, exception info: " ) + + cc::warn( strError ) + "\n"; + std::cout << strErrorMessage; + std::cout.flush(); + } catch ( ... ) { + std::string strErrorMessage = + cc::deep_note( "Dispatch:" ) + " " + + cc::fatal( "CRITICAL ERROR:" ) + + cc::error( "Got exception in thread " ) + + cc::size10( idxThread ) + cc::error( " of " ) + + cc::size10( cntThreadsToStart ) + cc::error( ", have " ) + + cc::size10( size_t( cntRunningThreads_ ) ) + + cc::error( " running threads, exception info: " ) + + cc::warn( "Unknown exception" ) + "\n"; + std::cout << strErrorMessage; + std::cout.flush(); + } + --cntRunningThreads_; + if ( g_bVerboseDispatchThreadDetailsLogging ) { + std::string strThreadFinalMessage = + cc::deep_note( "Dispatch:" ) + " " + + cc::debug( "Exiting thread " ) + cc::size10( idxThread ) + + cc::debug( " of " ) + cc::size10( cntThreadsToStart ) + + cc::debug( ", have " ) + + cc::size10( size_t( cntRunningThreads_ ) ) + + cc::debug( " running thread(s)" ) + "\n"; + std::cout << strThreadFinalMessage; + std::cout.flush(); + } + } ); } catch ( std::exception& ex ) { strError = ex.what(); if ( strError.empty() ) @@ -1317,9 +1371,14 @@ void domain::impl_startup( size_t nWaitMilliSeconds /*= size_t(-1)*/ ) { } if ( strError.empty() ) break; - std::cout << "Failed submit initialization task for the \"" - << strPerformanceQueueName << "\" queue at attempt " << idxAttempt - << " of " << cntAttempts << ", error is: " << strError << "\n"; + std::string strErrorMessage = + cc::deep_note( "Dispatch:" ) + " " + cc::fatal( "CRITICAL ERROR:" ) + + cc::error( " Failed submit initialization task for the " ) + + cc::info( strPerformanceQueueName ) + cc::error( " queue at attempt " ) + + cc::size10( idxAttempt ) + cc::error( " of " ) + cc::size10( cntAttempts ) + + cc::error( ", error is: " ) + cc::warn( strError ) + "\n"; + std::cout << strErrorMessage; + std::cout.flush(); } // for( size_t idxAttempt = 0; idxAttempt < 3; ++ idxAttempt ) { if ( !strError.empty() ) { ++cntFailedToStartThreads; @@ -1332,19 +1391,36 @@ void domain::impl_startup( size_t nWaitMilliSeconds /*= size_t(-1)*/ ) { std::this_thread::sleep_for( std::chrono::milliseconds( 10 ) ); } // for ( size_t cntThreadStartupAttempts = 1; cntThreadStartupAttempts != 0; // --cntThreadStartupAttempts ) - size_t cntStartedAndRunningThreads = size_t( cntRunningThreads_ ); + thread_pool_.notify_all(); // faster encueued call processing here because we knew we did + // submit first calls above + size_t cntStartedAndRunningThreads = size_t( cntStartTestedThreads_ ); size_t cntWaitAttempts = - ( nWaitMilliSeconds == 0 || nWaitMilliSeconds == size_t( -1 ) ) ? 3000 : nWaitMilliSeconds; + ( nWaitMilliSeconds == 0 || nWaitMilliSeconds == size_t( -1 ) ) ? 10000 : nWaitMilliSeconds; for ( size_t idxWaitAttempt = 0; idxWaitAttempt < cntWaitAttempts; ++idxWaitAttempt ) { - if ( cntStartedAndRunningThreads == cntThreads ) + if ( cntStartedAndRunningThreads == cntThreadsInPool ) break; std::this_thread::sleep_for( std::chrono::milliseconds( 1 ) ); - cntStartedAndRunningThreads = size_t( cntRunningThreads_ ); + cntStartedAndRunningThreads = size_t( cntStartTestedThreads_ ); } - if ( cntStartedAndRunningThreads != cntThreads ) { + if ( cntStartedAndRunningThreads != cntThreadsInPool ) { fetch_lock_.notify_all(); // notify earlier - throw std::runtime_error( - "dispatch domain failed to initialize all threads in thread pool" ); + // throw std::runtime_error( + // "dispatch domain failed to initialize all threads in thread pool" ); + std::string strWarningMessage = + cc::deep_note( "Dispatch:" ) + " " + cc::warn( "WARNING: expected " ) + + cc::size10( size_t( cntThreadsInPool ) ) + + cc::warn( " threads in pool to be started at this time but have " ) + + cc::size10( size_t( cntStartedAndRunningThreads ) ) + cc::warn( ", startup is slow!" ) + + "\n"; + std::cout << strWarningMessage; + std::cout.flush(); + } else { + std::string strSuccessMessage = cc::deep_note( "Dispatch:" ) + " " + + cc::success( "Have all " ) + + cc::size10( size_t( cntThreadsInPool ) ) + + cc::success( " threads in pool started fast" ) + "\n"; + std::cout << strSuccessMessage; + std::cout.flush(); } } void domain::impl_shutdown() { @@ -1360,6 +1436,13 @@ void domain::impl_shutdown() { size_t cntThreads = thread_pool_.number_of_threads(); if ( cntThreads > 0 ) { for ( ; true; ) { + if ( g_bVerboseDispatchThreadDetailsLogging ) { + std::string strMessage = cc::deep_note( "Dispatch:" ) + " " + cc::debug( "Have " ) + + cc::size10( size_t( cntRunningThreads_ ) ) + + cc::debug( " thread(s) still running..." ) + "\n"; + std::cout << strMessage; + std::cout.flush(); + } shutdown_flag_ = true; fetch_lock_.notify_all(); size_t cntRunningThreads = size_t( cntRunningThreads_ ); @@ -1368,17 +1451,40 @@ void domain::impl_shutdown() { std::this_thread::sleep_for( std::chrono::milliseconds( 1 ) ); } // for( ; true; ) } // if( cntThreads > 0 ) + std::cout << cc::deep_note( "Dispatch:" ) + " " + cc::success( "All threads stopped" ) + "\n"; + std::cout.flush(); // wait loop to shutdown if ( pLoop ) { + if ( g_bVerboseDispatchThreadDetailsLogging ) { + std::cout << cc::deep_note( "Dispatch:" ) + " " + + cc::debug( "Waiting for dispatch loop" ) + "\n"; + std::cout.flush(); + } pLoop->wait(); try { + if ( g_bVerboseDispatchThreadDetailsLogging ) { + std::cout << cc::deep_note( "Dispatch:" ) + " " + + cc::debug( "Stopping for dispatch loop" ) + "\n"; + std::cout.flush(); + } if ( loop_thread_.joinable() ) loop_thread_.join(); + std::cout << cc::deep_note( "Dispatch:" ) + " " + + cc::success( "Dispatch loop stopped" ) + "\n"; + std::cout.flush(); } catch ( ... ) { } } // if( pLoop ) // shutdown, remove all queues + if ( g_bVerboseDispatchThreadDetailsLogging ) { + std::cout << cc::deep_note( "Dispatch:" ) + " " + cc::debug( "Removing dispatch queues" ) + + "\n"; + std::cout.flush(); + } queue_remove_all(); + std::cout << cc::deep_note( "Dispatch:" ) + " " + cc::success( "All dispatch queues removed" ) + + "\n"; + std::cout.flush(); } queue_ptr_t domain::impl_find_queue_to_run() { // find queue with minimal accumulator and // remove it from with_jobs_ diff --git a/libskutils/src/thread_pool.cpp b/libskutils/src/thread_pool.cpp index 732c8b764..e3feaf188 100644 --- a/libskutils/src/thread_pool.cpp +++ b/libskutils/src/thread_pool.cpp @@ -19,7 +19,8 @@ void thread_pool::worker::invoke() { if ( pool_->queue_.empty() ) { pool_->conditional_lock_.wait( lock ); } - was_dequeued = pool_->queue_.dequeue( func ); + if ( !pool_->shutdown_flag_ ) + was_dequeued = pool_->queue_.dequeue( func ); } // block if ( was_dequeued ) { try { @@ -64,7 +65,9 @@ void thread_pool::shutdown() { threads_[i].join(); } catch ( ... ) { } + conditional_lock_.notify_all(); } + threads_.clear(); } }; // namespace skutils diff --git a/skaled/main.cpp b/skaled/main.cpp index 0889082b9..90034bf73 100644 --- a/skaled/main.cpp +++ b/skaled/main.cpp @@ -314,6 +314,9 @@ get_machine_ip_addresses_6() { // first-interface name, second-address return listIfaceInfos6; } +static std::unique_ptr< Client > g_client; +unique_ptr< ModularServer<> > g_jsonrpcIpcServer; + int main( int argc, char** argv ) try { cc::_on_ = false; cc::_max_value_size_ = 2048; @@ -326,6 +329,15 @@ int main( int argc, char** argv ) try { if ( nSignalNo == SIGPIPE ) return; bool stopWasRaisedBefore = skutils::signal::g_bStop; + if ( !stopWasRaisedBefore ) { + if ( g_jsonrpcIpcServer.get() ) { + g_jsonrpcIpcServer->StopListening(); + g_jsonrpcIpcServer.reset( nullptr ); + } + if ( g_client ) { + g_client->stopWorking(); + } + } skutils::signal::g_bStop = true; std::string strMessagePrefix = stopWasRaisedBefore ? cc::error( "\nStop flag was already raised on. " ) + @@ -1347,7 +1359,6 @@ int main( int argc, char** argv ) try { // chainParams, withExisting, nodeMode == NodeMode::Full ? caps : set< string >(), false // ); - std::unique_ptr< Client > client; std::shared_ptr< GasPricer > gasPricer; auto rotationFlagDirPath = configPath.parent_path(); @@ -1360,12 +1371,12 @@ int main( int argc, char** argv ) try { NoProof::init(); if ( chainParams.sealEngineName == Ethash::name() ) { - client.reset( new eth::EthashClient( chainParams, ( int ) chainParams.networkID, + g_client.reset( new eth::EthashClient( chainParams, ( int ) chainParams.networkID, shared_ptr< GasPricer >(), snapshotManager, instanceMonitor, getDataDir(), withExisting, TransactionQueue::Limits{c_transactionQueueSize, 1024}, isStartedFromSnapshot ) ); } else if ( chainParams.sealEngineName == NoProof::name() ) { - client.reset( new eth::Client( chainParams, ( int ) chainParams.networkID, + g_client.reset( new eth::Client( chainParams, ( int ) chainParams.networkID, shared_ptr< GasPricer >(), snapshotManager, instanceMonitor, getDataDir(), withExisting, TransactionQueue::Limits{c_transactionQueueSize, 1024}, isStartedFromSnapshot ) ); @@ -1373,32 +1384,33 @@ int main( int argc, char** argv ) try { BOOST_THROW_EXCEPTION( ChainParamsInvalid() << errinfo_comment( "Unknown seal engine: " + chainParams.sealEngineName ) ); - client->setAuthor( chainParams.sChain.owner ); + g_client->setAuthor( chainParams.sChain.owner ); - DefaultConsensusFactory cons_fact( *client ); + DefaultConsensusFactory cons_fact( *g_client ); setenv( "DATA_DIR", getDataDir().c_str(), 0 ); std::shared_ptr< SkaleHost > skaleHost = - std::make_shared< SkaleHost >( *client, &cons_fact ); + std::make_shared< SkaleHost >( *g_client, &cons_fact ); gasPricer = std::make_shared< ConsensusGasPricer >( *skaleHost ); - client->setGasPricer( gasPricer ); - client->injectSkaleHost( skaleHost ); - client->startWorking(); + g_client->setGasPricer( gasPricer ); + g_client->injectSkaleHost( skaleHost ); + g_client->startWorking(); const auto* buildinfo = skale_get_buildinfo(); - client->setExtraData( rlpList( 0, string{buildinfo->project_version}.substr( 0, 5 ) + "++" + - string{buildinfo->git_commit_hash}.substr( 0, 4 ) + - string{buildinfo->build_type}.substr( 0, 1 ) + - string{buildinfo->system_name}.substr( 0, 5 ) + - string{buildinfo->compiler_id}.substr( 0, 3 ) ) ); + g_client->setExtraData( + rlpList( 0, string{buildinfo->project_version}.substr( 0, 5 ) + "++" + + string{buildinfo->git_commit_hash}.substr( 0, 4 ) + + string{buildinfo->build_type}.substr( 0, 1 ) + + string{buildinfo->system_name}.substr( 0, 5 ) + + string{buildinfo->compiler_id}.substr( 0, 3 ) ) ); } auto toNumber = [&]( string const& s ) -> unsigned { if ( s == "latest" ) - return client->number(); + return g_client->number(); if ( s.size() == 64 || ( s.size() == 66 && s.substr( 0, 2 ) == "0x" ) ) - return client->blockChain().number( h256( s ) ); + return g_client->blockChain().number( h256( s ) ); try { return static_cast< unsigned int >( stoul( s ) ); } catch ( ... ) { @@ -1413,7 +1425,7 @@ int main( int argc, char** argv ) try { unsigned last = toNumber( exportTo ); for ( unsigned i = toNumber( exportFrom ); i <= last; ++i ) { - bytes block = client->blockChain().block( client->blockChain().numberHash( i ) ); + bytes block = g_client->blockChain().block( g_client->blockChain().numberHash( i ) ); switch ( exportFormat ) { case Format::Binary: out.write( reinterpret_cast< char const* >( block.data() ), @@ -1447,12 +1459,12 @@ int main( int argc, char** argv ) try { unsigned imported = 0; unsigned block_no = static_cast< unsigned int >( -1 ); - cout << "Skipping " << client->syncStatus().currentBlockNumber + 1 << " blocks.\n"; + cout << "Skipping " << g_client->syncStatus().currentBlockNumber + 1 << " blocks.\n"; MICROPROFILE_ENTERI( "main", "bunch 10s", MP_LIGHTGRAY ); while ( in.peek() != -1 && ( !exitHandler.shouldExit() ) ) { bytes block( 8 ); { - if ( block_no >= client->number() ) { + if ( block_no >= g_client->number() ) { MICROPROFILE_ENTERI( "main", "in.read", -1 ); } in.read( reinterpret_cast< char* >( block.data() ), @@ -1461,7 +1473,7 @@ int main( int argc, char** argv ) try { if ( block.size() >= 8 ) { in.read( reinterpret_cast< char* >( block.data() + 8 ), std::streamsize( block.size() ) - 8 ); - if ( block_no >= client->number() ) { + if ( block_no >= g_client->number() ) { MICROPROFILE_LEAVE(); } } else { @@ -1470,10 +1482,10 @@ int main( int argc, char** argv ) try { } block_no++; - if ( block_no <= client->number() ) + if ( block_no <= g_client->number() ) continue; - switch ( client->queueBlock( block, safeImport ) ) { + switch ( g_client->queueBlock( block, safeImport ) ) { case ImportResult::Success: good++; break; @@ -1496,7 +1508,7 @@ int main( int argc, char** argv ) try { } // sync chain with queue - tuple< ImportRoute, bool, unsigned > r = client->syncQueue( 10 ); + tuple< ImportRoute, bool, unsigned > r = g_client->syncQueue( 10 ); imported += get< 2 >( r ); double e = @@ -1510,9 +1522,9 @@ int main( int argc, char** argv ) try { cout << i << " more imported at " << i / d << " blocks/s. " << imported << " imported in " << e << " seconds at " << ( round( imported * 10 / e ) / 10 ) << " blocks/s (#" - << client->number() << ")" + << g_client->number() << ")" << "\n"; - fprintf( client->performance_fd, "%d\t%.2lf\n", client->number(), i / d ); + fprintf( g_client->performance_fd, "%d\t%.2lf\n", g_client->number(), i / d ); last = static_cast< unsigned >( e ); lastImported = imported; MICROPROFILE_ENTERI( "main", "bunch 10s", MP_LIGHTGRAY ); @@ -1526,14 +1538,14 @@ int main( int argc, char** argv ) try { MICROPROFILE_SCOPEI( "main", "sleep 1 sec", MP_DIMGREY ); this_thread::sleep_for( chrono::seconds( 1 ) ); } - tie( ignore, moreToImport, ignore ) = client->syncQueue( 100000 ); + tie( ignore, moreToImport, ignore ) = g_client->syncQueue( 100000 ); } double e = chrono::duration_cast< chrono::milliseconds >( chrono::steady_clock::now() - t ) .count() / 1000.0; cout << imported << " imported in " << e << " seconds at " - << ( round( imported * 10 / e ) / 10 ) << " blocks/s (#" << client->number() + << ( round( imported * 10 / e ) / 10 ) << " blocks/s (#" << g_client->number() << ")\n"; } ); // thread th.join(); @@ -1574,12 +1586,12 @@ int main( int argc, char** argv ) try { if ( mode == OperationMode::ImportSnapshot ) { try { - auto stateImporter = client->createStateImporter(); - auto blockChainImporter = client->createBlockChainImporter(); + auto stateImporter = g_client->createStateImporter(); + auto blockChainImporter = g_client->createBlockChainImporter(); SnapshotImporter importer( *stateImporter, *blockChainImporter ); auto snapshotStorage( createSnapshotStorage( filename ) ); - importer.import( *snapshotStorage, client->blockChain().genesisHash() ); + importer.import( *snapshotStorage, g_client->blockChain().genesisHash() ); // continue with regular sync from the snapshot block } catch ( ... ) { cerr << "Error during importing the snapshot: " @@ -1589,14 +1601,13 @@ int main( int argc, char** argv ) try { } if ( nodeMode == NodeMode::Full ) { - client->setSealer( m.minerType() ); + g_client->setSealer( m.minerType() ); if ( networkID != NoNetworkID ) - client->setNetworkId( networkID ); + g_client->setNetworkId( networkID ); } - cout << "Mining Beneficiary: " << client->author() << endl; + cout << "Mining Beneficiary: " << g_client->author() << endl; - unique_ptr< ModularServer<> > jsonrpcIpcServer; unique_ptr< rpc::SessionManager > sessionManager; unique_ptr< SimpleAccountHolder > accountHolder; @@ -1645,19 +1656,19 @@ int main( int argc, char** argv ) try { if ( !alwaysConfirm || allowedDestinations.count( _t.to ) ) return true; - string r = - getResponse( _t.userReadable( isProxy, - [&]( TransactionSkeleton const& _t ) -> pair< bool, string > { - h256 contractCodeHash = client->postState().codeHash( _t.to ); - if ( contractCodeHash == EmptySHA3 ) - return std::make_pair( false, std::string() ); - // TODO: actually figure out the natspec. we'll need the - // natspec database here though. - return std::make_pair( true, std::string() ); - }, - [&]( Address const& _a ) { return _a.hex(); } ) + - "\nEnter yes/no/always (always to this address): ", - {"yes", "n", "N", "no", "NO", "always"} ); + string r = getResponse( + _t.userReadable( isProxy, + [&]( TransactionSkeleton const& _t ) -> pair< bool, string > { + h256 contractCodeHash = g_client->postState().codeHash( _t.to ); + if ( contractCodeHash == EmptySHA3 ) + return std::make_pair( false, std::string() ); + // TODO: actually figure out the natspec. we'll need the + // natspec database here though. + return std::make_pair( true, std::string() ); + }, + [&]( Address const& _a ) { return _a.hex(); } ) + + "\nEnter yes/no/always (always to this address): ", + {"yes", "n", "N", "no", "NO", "always"} ); if ( r == "always" ) allowedDestinations.insert( _t.to ); return r == "yes" || r == "always"; @@ -1686,13 +1697,13 @@ int main( int argc, char** argv ) try { sessionManager.reset( new rpc::SessionManager() ); accountHolder.reset( new SimpleAccountHolder( - [&]() { return client.get(); }, getAccountPassword, keyManager, authenticator ) ); + [&]() { return g_client.get(); }, getAccountPassword, keyManager, authenticator ) ); - auto ethFace = new rpc::Eth( *client, *accountHolder.get() ); + auto ethFace = new rpc::Eth( *g_client, *accountHolder.get() ); /// skale - auto skaleFace = new rpc::Skale( *client ); + auto skaleFace = new rpc::Skale( *g_client ); /// skaleStatsFace - auto skaleStatsFace = new rpc::SkaleStats( configPath.string(), *client ); + auto skaleStatsFace = new rpc::SkaleStats( configPath.string(), *g_client ); std::string argv_string; { @@ -1702,19 +1713,19 @@ int main( int argc, char** argv ) try { argv_string = ss.str(); } - jsonrpcIpcServer.reset( new FullServer( ethFace, + g_jsonrpcIpcServer.reset( new FullServer( ethFace, skaleFace, /// skale skaleStatsFace, /// skaleStats new rpc::Net( chainParams ), new rpc::Web3( clientVersion() ), - new rpc::Personal( keyManager, *accountHolder, *client ), - new rpc::AdminEth( *client, *gasPricer.get(), keyManager, *sessionManager.get() ), - bEnabledDebugBehaviorAPIs ? new rpc::Debug( *client, argv_string ) : nullptr, + new rpc::Personal( keyManager, *accountHolder, *g_client ), + new rpc::AdminEth( *g_client, *gasPricer.get(), keyManager, *sessionManager.get() ), + bEnabledDebugBehaviorAPIs ? new rpc::Debug( *g_client, argv_string ) : nullptr, nullptr ) ); if ( is_ipc ) { try { auto ipcConnector = new IpcServer( "geth" ); - jsonrpcIpcServer->addConnector( ipcConnector ); + g_jsonrpcIpcServer->addConnector( ipcConnector ); if ( !ipcConnector->StartListening() ) { clog( VerbosityError, "main" ) << "Cannot start listening for RPC requests on ipc port: " @@ -2014,7 +2025,7 @@ int main( int argc, char** argv ) try { return skaleFace->impl_skale_downloadSnapshotFragmentBinary( joRequest ); }; auto skale_server_connector = new SkaleServerOverride( chainParams, - fn_binary_snapshot_download, cntServers, client.get(), chainParams.nodeInfo.ip, + fn_binary_snapshot_download, cntServers, g_client.get(), chainParams.nodeInfo.ip, nExplicitPortHTTP4, chainParams.nodeInfo.ip6, nExplicitPortHTTP6, chainParams.nodeInfo.ip, nExplicitPortHTTPS4, chainParams.nodeInfo.ip6, nExplicitPortHTTPS6, chainParams.nodeInfo.ip, nExplicitPortWS4, @@ -2030,7 +2041,7 @@ int main( int argc, char** argv ) try { // skale_server_connector->m_bTraceCalls = bTraceJsonRpcCalls; skale_server_connector->max_connection_set( maxConnections ); - jsonrpcIpcServer->addConnector( skale_server_connector ); + g_jsonrpcIpcServer->addConnector( skale_server_connector ); if ( !skale_server_connector->StartListening() ) { // TODO Will it delete itself? return EX_IOERR; } @@ -2237,17 +2248,23 @@ int main( int argc, char** argv ) try { dev::setThreadName( "main" ); - if ( client ) { - unsigned int n = client->blockChain().details().number; + if ( g_client ) { + unsigned int n = g_client->blockChain().details().number; unsigned int mining = 0; while ( !exitHandler.shouldExit() ) - stopSealingAfterXBlocks( client.get(), n, mining ); - } else + stopSealingAfterXBlocks( g_client.get(), n, mining ); + } else { while ( !exitHandler.shouldExit() ) this_thread::sleep_for( chrono::milliseconds( 1000 ) ); - - if ( jsonrpcIpcServer.get() ) - jsonrpcIpcServer->StopListening(); + } + if ( g_jsonrpcIpcServer.get() ) { + g_jsonrpcIpcServer->StopListening(); + g_jsonrpcIpcServer.reset( nullptr ); + } + if ( g_client ) { + g_client->stopWorking(); + g_client.reset( nullptr ); + } std::cerr << localeconv()->decimal_point << std::endl; @@ -2264,21 +2281,25 @@ int main( int argc, char** argv ) try { } catch ( const Client::CreationException& ex ) { clog( VerbosityError, "main" ) << dev::nested_exception_what( ex ); // TODO close microprofile!! + g_client.reset( nullptr ); return EXIT_FAILURE; } catch ( const SkaleHost::CreationException& ex ) { clog( VerbosityError, "main" ) << dev::nested_exception_what( ex ); // TODO close microprofile!! + g_client.reset( nullptr ); return EXIT_FAILURE; } catch ( const std::exception& ex ) { clog( VerbosityError, "main" ) << "CRITICAL " << dev::nested_exception_what( ex ); clog( VerbosityError, "main" ) << "\n" << skutils::signal::generate_stack_trace() << "\n" << std::endl; + g_client.reset( nullptr ); return EXIT_FAILURE; } catch ( ... ) { clog( VerbosityError, "main" ) << "CRITICAL unknown error"; clog( VerbosityError, "main" ) << "\n" << skutils::signal::generate_stack_trace() << "\n" << std::endl; + g_client.reset( nullptr ); return EXIT_FAILURE; }