Skip to content

Commit

Permalink
Merge pull request #34 from AntelopeIO/clean-shutdown
Browse files Browse the repository at this point in the history
Drain io_context after shutdown of plugins
  • Loading branch information
heifner authored Oct 21, 2024
2 parents c1f92a5 + 24d85e1 commit 1309099
Show file tree
Hide file tree
Showing 6 changed files with 182 additions and 51 deletions.
10 changes: 5 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -94,20 +94,20 @@ Plugins can be registered by calling `appbase::application::register_plugin()`.
### Boost ASIO

AppBase maintains a singleton `application` instance which can be accessed via `appbase::app()`. This
application owns a `boost::asio::io_service` which starts running when `appbase::exec()` is called. If
application owns a `boost::asio::io_context` which starts running when `appbase::exec()` is called. If
a plugin needs to perform IO or other asynchronous operations then it should dispatch it via `application`
`io_service` which is setup to use an execution priority queue.
`io_context` which is setup to use an execution priority queue.
```
app().post( appbase::priority::low, lambda )
```
OR
```
delay_timer->async_wait( app().get_priority_queue().wrap( priority::low, lambda ) );
```
Use of `get_io_service()` directly is not recommended as the priority queue will not be respected.
Use of `get_io_context()` directly is not recommended as the priority queue will not be respected.

Because the app calls `io_service::run()` from within `application::exec()` and does not spawn any threads
all asynchronous operations posted to the io_service should be run in the same thread.
Because the app calls `io_context::run()` from within `application::exec()` and does not spawn any threads
all asynchronous operations posted to the io_context should be run in the same thread.

## Graceful Exit

Expand Down
42 changes: 25 additions & 17 deletions application_base.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -152,8 +152,8 @@ void application_base::wait_for_signal(std::shared_ptr<boost::asio::signal_set>
});
}

void application_base::setup_signal_handling_on_ios(boost::asio::io_service& ios, bool startup) {
std::shared_ptr<boost::asio::signal_set> ss = std::make_shared<boost::asio::signal_set>(ios, SIGINT, SIGTERM);
void application_base::setup_signal_handling_on_ioc(boost::asio::io_context& io_ctx, bool startup) {
std::shared_ptr<boost::asio::signal_set> ss = std::make_shared<boost::asio::signal_set>(io_ctx, SIGINT, SIGTERM);
#ifdef SIGPIPE
ss->add(SIGPIPE);
#endif
Expand All @@ -165,15 +165,15 @@ void application_base::setup_signal_handling_on_ios(boost::asio::io_service& ios
wait_for_signal(ss);
}

void application_base::startup(boost::asio::io_service& io_serv) {
void application_base::startup(boost::asio::io_context& io_ctx) {
//during startup, run a second thread to catch SIGINT/SIGTERM/SIGPIPE/SIGHUP
boost::asio::io_service startup_thread_ios;
setup_signal_handling_on_ios(startup_thread_ios, true);
std::thread startup_thread([&startup_thread_ios]() {
startup_thread_ios.run();
boost::asio::io_context startup_thread_io_ctx;
setup_signal_handling_on_ioc(startup_thread_io_ctx, true);
std::thread startup_thread([&startup_thread_io_ctx]() {
startup_thread_io_ctx.run();
});
auto clean_up_signal_thread = [&startup_thread_ios, &startup_thread]() {
startup_thread_ios.stop();
auto clean_up_signal_thread = [&startup_thread_io_ctx, &startup_thread]() {
startup_thread_io_ctx.stop();
startup_thread.join();
};

Expand All @@ -185,16 +185,16 @@ void application_base::startup(boost::asio::io_service& io_serv) {

} catch( ... ) {
clean_up_signal_thread();
shutdown();
shutdown_plugins();
throw;
}

//after startup, shut down the signal handling thread and catch the signals back on main io_service
//after startup, shut down the signal handling thread and catch the signals back on main io_context
clean_up_signal_thread();
setup_signal_handling_on_ios(io_serv, false);
setup_signal_handling_on_ioc(io_ctx, false);

#ifdef SIGHUP
std::shared_ptr<boost::asio::signal_set> sighup_set(new boost::asio::signal_set(io_serv, SIGHUP));
std::shared_ptr<boost::asio::signal_set> sighup_set(new boost::asio::signal_set(io_ctx, SIGHUP));
start_sighup_handler( sighup_set );
#endif
}
Expand Down Expand Up @@ -441,7 +441,7 @@ void application_base::handle_exception(std::exception_ptr eptr, std::string_vie
}
}

void application_base::shutdown() {
void application_base::shutdown_plugins() {
std::exception_ptr eptr = nullptr;

for(auto ritr = running_plugins.rbegin();
Expand All @@ -454,8 +454,17 @@ void application_base::shutdown() {
handle_exception(std::current_exception(), (*ritr)->name());
}
}
for(auto ritr = running_plugins.rbegin();
ritr != running_plugins.rend(); ++ritr) {

// if we caught an exception while shutting down a plugin, rethrow it so that main()
// can catch it and report the error
if (eptr)
std::rethrow_exception(eptr);
}

void application_base::destroy_plugins() {
std::exception_ptr eptr = nullptr;

for(auto ritr = running_plugins.rbegin(); ritr != running_plugins.rend(); ++ritr) {
try {
plugins.erase((*ritr)->name());
} catch(...) {
Expand All @@ -474,7 +483,6 @@ void application_base::shutdown() {
eptr = std::current_exception();
handle_exception(std::current_exception(), "plugin cleanup");
}
quit();

// if we caught an exception while shutting down a plugin, rethrow it so that main()
// can catch it and report the error
Expand Down
8 changes: 5 additions & 3 deletions examples/executor_example/application.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@
#include <appbase/application_base.hpp>
#include <appbase/execution_priority_queue.hpp>

#include <limits>

class my_executor {
public:
template <typename Func>
auto post( int priority, Func&& func ) {
return boost::asio::post(io_serv, pri_queue.wrap(priority, --order, std::forward<Func>(func)));
return boost::asio::post(io_ctx, pri_queue.wrap(priority, --order, std::forward<Func>(func)));
}

auto& get_priority_queue() { return pri_queue; }
Expand All @@ -31,11 +33,11 @@ class my_executor {

void clear() { pri_queue.clear(); }

boost::asio::io_service& get_io_service() { return io_serv; }
boost::asio::io_context& get_io_context() { return io_ctx; }

private:
// members are ordered taking into account that the last one is destructed first
boost::asio::io_service io_serv;
boost::asio::io_context io_ctx;
appbase::execution_priority_queue pri_queue;
std::size_t order = std::numeric_limits<size_t>::max(); // to maintain FIFO ordering in queue within priority
};
Expand Down
78 changes: 59 additions & 19 deletions include/appbase/application_base.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,7 @@ class application_base {
return initialize_impl(argc, argv, {find_plugin<Plugin>()...}, initialize_logging);
}

void startup(boost::asio::io_service& io_serv);
void shutdown();
void startup(boost::asio::io_context& io_ctx);

/**
* Wait until quit(), SIGINT or SIGTERM and then shutdown.
Expand All @@ -125,29 +124,51 @@ class application_base {
void exec(Executor& exec) {
std::exception_ptr eptr = nullptr;
{
auto& io_serv{exec.get_io_service()};
boost::asio::io_service::work work(io_serv);
auto& io_ctx{exec.get_io_context()};
boost::asio::executor_work_guard<boost::asio::io_context::executor_type> work = boost::asio::make_work_guard(io_ctx);
(void)work;
bool more = true;

while (more || io_serv.run_one()) {
if (is_quiting())
break;
while (more || io_ctx.run_one()) {
try {
io_serv.poll(); // queue up any ready; allowing high priority item to get into the queue
io_ctx.poll(); // queue up any ready; allowing high priority item to get into the queue
if (io_ctx.stopped())
break;
// execute the highest priority item
more = exec.execute_highest();
} catch (...) {
more = true; // so we exit the while loop without calling io_serv.run_one()
more = true; // so we exit the while loop without calling io_ctx.run_one()
quit();
eptr = std::current_exception();
handle_exception(eptr, "application loop");
}
}

quit();

try {
shutdown_plugins(); // may rethrow exceptions
} catch (...) {
if (!eptr)
eptr = std::current_exception();
}

work.reset();

try {
exec.clear(); // make sure the queue is empty
shutdown(); // may rethrow exceptions
// plugins shutdown down at this point,

// Drain the io_context of anything that could be referencing plugins.
// Note this does not call exec.execute_highest(), so only drains into the priority queue assuming nothing
// has hijacked the io_context for other purposes.
io_ctx.restart();
while (io_ctx.poll())
;
// clear priority queue of anything pushed by poll()
exec.clear();

// destroy the plugins now that all lambda that reference them have been destroyed
destroy_plugins();
} catch (...) {
if (!eptr)
eptr = std::current_exception();
Expand Down Expand Up @@ -290,6 +311,9 @@ class application_base {
}
///@}

void shutdown_plugins();
void destroy_plugins();

application_base(std::shared_ptr<void>&& e); ///< protected because application is a singleton that should be accessed via instance()

/// !!! must be dtor'ed after plugins
Expand Down Expand Up @@ -318,7 +342,7 @@ class application_base {
void print_default_config(std::ostream& os);

void wait_for_signal(std::shared_ptr<boost::asio::signal_set> ss);
void setup_signal_handling_on_ios(boost::asio::io_service& ios, bool startup);
void setup_signal_handling_on_ioc(boost::asio::io_context& io_ctx, bool startup);

void handle_exception(std::exception_ptr eptr, std::string_view origin);
};
Expand All @@ -343,12 +367,12 @@ class application_t : public application_base {
}

/**
* Post func to run on io_service with given priority.
* Post func to run on io_context with given priority.
*
* -- deprecated: use app().executor().post()
*
* @param priority can be appbase::priority::* constants or any int, larger ints run first
* @param func function to run on io_service
* @param func function to run on io_context
* @return result of boost::asio::post
*/
template <typename Func>
Expand All @@ -365,20 +389,36 @@ class application_t : public application_base {
}

/**
* Anything posted directly on this io_service is run at the highest of priority as it by-passes the
* Anything posted directly on this io_context is run at the highest of priority as it by-passes the
* priority queue and is run immediately in exec(). Use with care and consider using app().executor().post() instead.
* @return
*/
boost::asio::io_service& get_io_service() {
return executor().get_io_service();
boost::asio::io_context& get_io_context() {
return executor().get_io_context();
}

/**
* Create a timer with the main application io_context. Timer async_wait will execute on the main thread.
*
* Use with app().executor().wrap(priority::x, exec_queue::x, [](const boost::system::error_code& ec).
* For example:
* _timer.async_wait(app().executor().wrap(priority::high, exec_queue::read_write,
* [](const boost::system::error_code& ec) {
* if (!ec)
* // do something
* }));
*/
template<typename Timer>
auto make_timer() {
return Timer{get_io_context()};
}

void startup() {
application_base::startup(get_io_service());
application_base::startup(get_io_context());
}

application_t() : application_base(std::make_shared<executor_t>()) {
set_stop_executor_cb([&]() { get_io_service().stop(); });
set_stop_executor_cb([&]() { get_io_context().stop(); });
set_post_cb([&](int prio, std::function<void()> cb) { executor().post(prio, std::move(cb)); });
}

Expand Down
16 changes: 9 additions & 7 deletions include/appbase/default_executor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,23 @@
#include <appbase/application_base.hpp>
#include <appbase/execution_priority_queue.hpp>

#include <limits>

namespace appbase {

class default_executor {
public:
template <typename Func>
auto post(int priority, Func&& func) {
return boost::asio::post(io_serv, pri_queue.wrap(priority, --order, std::forward<Func>(func)));
return boost::asio::post(io_ctx, pri_queue.wrap(priority, --order, std::forward<Func>(func)));
}

/**
* Provide access to execution priority queue so it can be used to wrap functions for
* prioritized execution.
*
* Example:
* boost::asio::steady_timer timer( app().get_io_service() );
* boost::asio::steady_timer timer( app().get_io_context() );
* timer.async_wait( app().get_priority_queue().wrap(priority::low, [](){ do_something(); }) );
*/
auto& get_priority_queue() {
Expand All @@ -33,16 +35,16 @@ class default_executor {
}

/**
* Do not run io_service in any other threads, as application assumes single-threaded execution in exec().
* @return io_serivice of application
* Do not run io_context in any other threads, as application assumes single-threaded execution in exec().
* @return io_context of application
*/
boost::asio::io_service& get_io_service() {
return io_serv;
boost::asio::io_context& get_io_context() {
return io_ctx;
}

private:
// members are ordered taking into account that the last one is destructed first
boost::asio::io_service io_serv;
boost::asio::io_context io_ctx;
execution_priority_queue pri_queue;
std::size_t order = std::numeric_limits<size_t>::max(); // to maintain FIFO ordering in queue within priority
};
Expand Down
Loading

0 comments on commit 1309099

Please sign in to comment.