diff --git a/README.md b/README.md index 2f56f64..da279de 100644 --- a/README.md +++ b/README.md @@ -94,9 +94,9 @@ Plugins can be registered by calling `appbase::application::register_plugin()`. ### Boost ASIO AppBase maintains a singleton `application` instance which can be accessed via `appbase::app()`. This -application owns a `boost::asio::io_service` which starts running when `appbase::exec()` is called. If +application owns a `boost::asio::io_context` which starts running when `appbase::exec()` is called. If a plugin needs to perform IO or other asynchronous operations then it should dispatch it via `application` -`io_service` which is setup to use an execution priority queue. +`io_context` which is setup to use an execution priority queue. ``` app().post( appbase::priority::low, lambda ) ``` @@ -104,10 +104,10 @@ OR ``` delay_timer->async_wait( app().get_priority_queue().wrap( priority::low, lambda ) ); ``` -Use of `get_io_service()` directly is not recommended as the priority queue will not be respected. +Use of `get_io_context()` directly is not recommended as the priority queue will not be respected. -Because the app calls `io_service::run()` from within `application::exec()` and does not spawn any threads -all asynchronous operations posted to the io_service should be run in the same thread. +Because the app calls `io_context::run()` from within `application::exec()` and does not spawn any threads +all asynchronous operations posted to the io_context should be run in the same thread. ## Graceful Exit diff --git a/application_base.cpp b/application_base.cpp index 8d0292f..7abe81f 100644 --- a/application_base.cpp +++ b/application_base.cpp @@ -152,8 +152,8 @@ void application_base::wait_for_signal(std::shared_ptr }); } -void application_base::setup_signal_handling_on_ios(boost::asio::io_service& ios, bool startup) { - std::shared_ptr ss = std::make_shared(ios, SIGINT, SIGTERM); +void application_base::setup_signal_handling_on_ioc(boost::asio::io_context& io_ctx, bool startup) { + std::shared_ptr ss = std::make_shared(io_ctx, SIGINT, SIGTERM); #ifdef SIGPIPE ss->add(SIGPIPE); #endif @@ -165,15 +165,15 @@ void application_base::setup_signal_handling_on_ios(boost::asio::io_service& ios wait_for_signal(ss); } -void application_base::startup(boost::asio::io_service& io_serv) { +void application_base::startup(boost::asio::io_context& io_ctx) { //during startup, run a second thread to catch SIGINT/SIGTERM/SIGPIPE/SIGHUP - boost::asio::io_service startup_thread_ios; - setup_signal_handling_on_ios(startup_thread_ios, true); - std::thread startup_thread([&startup_thread_ios]() { - startup_thread_ios.run(); + boost::asio::io_context startup_thread_io_ctx; + setup_signal_handling_on_ioc(startup_thread_io_ctx, true); + std::thread startup_thread([&startup_thread_io_ctx]() { + startup_thread_io_ctx.run(); }); - auto clean_up_signal_thread = [&startup_thread_ios, &startup_thread]() { - startup_thread_ios.stop(); + auto clean_up_signal_thread = [&startup_thread_io_ctx, &startup_thread]() { + startup_thread_io_ctx.stop(); startup_thread.join(); }; @@ -185,16 +185,16 @@ void application_base::startup(boost::asio::io_service& io_serv) { } catch( ... ) { clean_up_signal_thread(); - shutdown(); + shutdown_plugins(); throw; } - //after startup, shut down the signal handling thread and catch the signals back on main io_service + //after startup, shut down the signal handling thread and catch the signals back on main io_context clean_up_signal_thread(); - setup_signal_handling_on_ios(io_serv, false); + setup_signal_handling_on_ioc(io_ctx, false); #ifdef SIGHUP - std::shared_ptr sighup_set(new boost::asio::signal_set(io_serv, SIGHUP)); + std::shared_ptr sighup_set(new boost::asio::signal_set(io_ctx, SIGHUP)); start_sighup_handler( sighup_set ); #endif } @@ -441,7 +441,7 @@ void application_base::handle_exception(std::exception_ptr eptr, std::string_vie } } -void application_base::shutdown() { +void application_base::shutdown_plugins() { std::exception_ptr eptr = nullptr; for(auto ritr = running_plugins.rbegin(); @@ -454,8 +454,17 @@ void application_base::shutdown() { handle_exception(std::current_exception(), (*ritr)->name()); } } - for(auto ritr = running_plugins.rbegin(); - ritr != running_plugins.rend(); ++ritr) { + + // if we caught an exception while shutting down a plugin, rethrow it so that main() + // can catch it and report the error + if (eptr) + std::rethrow_exception(eptr); +} + +void application_base::destroy_plugins() { + std::exception_ptr eptr = nullptr; + + for(auto ritr = running_plugins.rbegin(); ritr != running_plugins.rend(); ++ritr) { try { plugins.erase((*ritr)->name()); } catch(...) { @@ -474,7 +483,6 @@ void application_base::shutdown() { eptr = std::current_exception(); handle_exception(std::current_exception(), "plugin cleanup"); } - quit(); // if we caught an exception while shutting down a plugin, rethrow it so that main() // can catch it and report the error diff --git a/examples/executor_example/application.hpp b/examples/executor_example/application.hpp index ce357a7..e41647b 100644 --- a/examples/executor_example/application.hpp +++ b/examples/executor_example/application.hpp @@ -18,11 +18,13 @@ #include #include +#include + class my_executor { public: template auto post( int priority, Func&& func ) { - return boost::asio::post(io_serv, pri_queue.wrap(priority, --order, std::forward(func))); + return boost::asio::post(io_ctx, pri_queue.wrap(priority, --order, std::forward(func))); } auto& get_priority_queue() { return pri_queue; } @@ -31,11 +33,11 @@ class my_executor { void clear() { pri_queue.clear(); } - boost::asio::io_service& get_io_service() { return io_serv; } + boost::asio::io_context& get_io_context() { return io_ctx; } private: // members are ordered taking into account that the last one is destructed first - boost::asio::io_service io_serv; + boost::asio::io_context io_ctx; appbase::execution_priority_queue pri_queue; std::size_t order = std::numeric_limits::max(); // to maintain FIFO ordering in queue within priority }; diff --git a/include/appbase/application_base.hpp b/include/appbase/application_base.hpp index e837806..59471d2 100644 --- a/include/appbase/application_base.hpp +++ b/include/appbase/application_base.hpp @@ -114,8 +114,7 @@ class application_base { return initialize_impl(argc, argv, {find_plugin()...}, initialize_logging); } - void startup(boost::asio::io_service& io_serv); - void shutdown(); + void startup(boost::asio::io_context& io_ctx); /** * Wait until quit(), SIGINT or SIGTERM and then shutdown. @@ -125,29 +124,51 @@ class application_base { void exec(Executor& exec) { std::exception_ptr eptr = nullptr; { - auto& io_serv{exec.get_io_service()}; - boost::asio::io_service::work work(io_serv); + auto& io_ctx{exec.get_io_context()}; + boost::asio::executor_work_guard work = boost::asio::make_work_guard(io_ctx); (void)work; bool more = true; - while (more || io_serv.run_one()) { - if (is_quiting()) - break; + while (more || io_ctx.run_one()) { try { - io_serv.poll(); // queue up any ready; allowing high priority item to get into the queue + io_ctx.poll(); // queue up any ready; allowing high priority item to get into the queue + if (io_ctx.stopped()) + break; // execute the highest priority item more = exec.execute_highest(); } catch (...) { - more = true; // so we exit the while loop without calling io_serv.run_one() + more = true; // so we exit the while loop without calling io_ctx.run_one() quit(); eptr = std::current_exception(); handle_exception(eptr, "application loop"); } } + quit(); + + try { + shutdown_plugins(); // may rethrow exceptions + } catch (...) { + if (!eptr) + eptr = std::current_exception(); + } + + work.reset(); + try { - exec.clear(); // make sure the queue is empty - shutdown(); // may rethrow exceptions + // plugins shutdown down at this point, + + // Drain the io_context of anything that could be referencing plugins. + // Note this does not call exec.execute_highest(), so only drains into the priority queue assuming nothing + // has hijacked the io_context for other purposes. + io_ctx.restart(); + while (io_ctx.poll()) + ; + // clear priority queue of anything pushed by poll() + exec.clear(); + + // destroy the plugins now that all lambda that reference them have been destroyed + destroy_plugins(); } catch (...) { if (!eptr) eptr = std::current_exception(); @@ -290,6 +311,9 @@ class application_base { } ///@} + void shutdown_plugins(); + void destroy_plugins(); + application_base(std::shared_ptr&& e); ///< protected because application is a singleton that should be accessed via instance() /// !!! must be dtor'ed after plugins @@ -318,7 +342,7 @@ class application_base { void print_default_config(std::ostream& os); void wait_for_signal(std::shared_ptr ss); - void setup_signal_handling_on_ios(boost::asio::io_service& ios, bool startup); + void setup_signal_handling_on_ioc(boost::asio::io_context& io_ctx, bool startup); void handle_exception(std::exception_ptr eptr, std::string_view origin); }; @@ -343,12 +367,12 @@ class application_t : public application_base { } /** - * Post func to run on io_service with given priority. + * Post func to run on io_context with given priority. * * -- deprecated: use app().executor().post() * * @param priority can be appbase::priority::* constants or any int, larger ints run first - * @param func function to run on io_service + * @param func function to run on io_context * @return result of boost::asio::post */ template @@ -365,20 +389,36 @@ class application_t : public application_base { } /** - * Anything posted directly on this io_service is run at the highest of priority as it by-passes the + * Anything posted directly on this io_context is run at the highest of priority as it by-passes the * priority queue and is run immediately in exec(). Use with care and consider using app().executor().post() instead. * @return */ - boost::asio::io_service& get_io_service() { - return executor().get_io_service(); + boost::asio::io_context& get_io_context() { + return executor().get_io_context(); + } + + /** + * Create a timer with the main application io_context. Timer async_wait will execute on the main thread. + * + * Use with app().executor().wrap(priority::x, exec_queue::x, [](const boost::system::error_code& ec). + * For example: + * _timer.async_wait(app().executor().wrap(priority::high, exec_queue::read_write, + * [](const boost::system::error_code& ec) { + * if (!ec) + * // do something + * })); + */ + template + auto make_timer() { + return Timer{get_io_context()}; } void startup() { - application_base::startup(get_io_service()); + application_base::startup(get_io_context()); } application_t() : application_base(std::make_shared()) { - set_stop_executor_cb([&]() { get_io_service().stop(); }); + set_stop_executor_cb([&]() { get_io_context().stop(); }); set_post_cb([&](int prio, std::function cb) { executor().post(prio, std::move(cb)); }); } diff --git a/include/appbase/default_executor.hpp b/include/appbase/default_executor.hpp index f437de7..119c56c 100644 --- a/include/appbase/default_executor.hpp +++ b/include/appbase/default_executor.hpp @@ -3,13 +3,15 @@ #include #include +#include + namespace appbase { class default_executor { public: template auto post(int priority, Func&& func) { - return boost::asio::post(io_serv, pri_queue.wrap(priority, --order, std::forward(func))); + return boost::asio::post(io_ctx, pri_queue.wrap(priority, --order, std::forward(func))); } /** @@ -17,7 +19,7 @@ class default_executor { * prioritized execution. * * Example: - * boost::asio::steady_timer timer( app().get_io_service() ); + * boost::asio::steady_timer timer( app().get_io_context() ); * timer.async_wait( app().get_priority_queue().wrap(priority::low, [](){ do_something(); }) ); */ auto& get_priority_queue() { @@ -33,16 +35,16 @@ class default_executor { } /** - * Do not run io_service in any other threads, as application assumes single-threaded execution in exec(). - * @return io_serivice of application + * Do not run io_context in any other threads, as application assumes single-threaded execution in exec(). + * @return io_context of application */ - boost::asio::io_service& get_io_service() { - return io_serv; + boost::asio::io_context& get_io_context() { + return io_ctx; } private: // members are ordered taking into account that the last one is destructed first - boost::asio::io_service io_serv; + boost::asio::io_context io_ctx; execution_priority_queue pri_queue; std::size_t order = std::numeric_limits::max(); // to maintain FIFO ordering in queue within priority }; diff --git a/tests/shutdown_test.cpp b/tests/shutdown_test.cpp new file mode 100644 index 0000000..93698f9 --- /dev/null +++ b/tests/shutdown_test.cpp @@ -0,0 +1,79 @@ +#include +#include + +#include + +namespace bpo = boost::program_options; +using bpo::options_description; +using bpo::variables_map; + +static bool thing = true; +struct thing_better_be_alive { + ~thing_better_be_alive() noexcept(false) { + if(!thing) + throw "BOOM"; + } +}; + +class thready_plugin : public appbase::plugin { + public: + + template + void plugin_requires(Lambda&& l) {} + + void set_program_options( options_description& cli, options_description& cfg ) override {} + + void thread_work() { + boost::asio::post(ctx, [&]() { + thing_better_be_alive better_be; + boost::asio::post(appbase::app().get_io_context(), [&,is_it=std::move(better_be)]() { + thread_work(); + }); + }); + } + + void plugin_initialize( const variables_map& options ) {} + void plugin_startup() { + for(unsigned i = 0; i < 48*4; i++) + thread_work(); + + for(unsigned i = 0; i < 48; ++i) + threads.emplace_back([this]() { + ctx.run(); + }); + } + void plugin_shutdown() { + usleep(100000); //oh gee it takes a while to stop + ctx.stop(); + for(unsigned i = 0; i < 48; ++i) + threads[i].join(); + } + + ~thready_plugin() { + thing = false; + } + + boost::asio::io_context ctx; + boost::asio::executor_work_guard wg = boost::asio::make_work_guard(ctx); + + private: + std::vector threads; +}; + + +BOOST_AUTO_TEST_CASE(test_shutdown) +{ + appbase::application::register_plugin(); + appbase::scoped_app app; + + const char* argv[] = { "nodoes" }; + if( !app->initialize( 1, const_cast(argv) ) ) + return; + app->startup(); + boost::asio::post(appbase::app().get_io_context(), [&](){ + std::this_thread::sleep_for(std::chrono::milliseconds(5)); + app->quit(); + }); + app->exec(); + +}