Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create socket with provided executor #909

Merged
merged 11 commits into from
Oct 8, 2024
47 changes: 37 additions & 10 deletions libraries/libfc/include/fc/network/listener.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,30 +58,47 @@ struct listener_base<boost::asio::local::stream_protocol> {
/// @note Users should use fc::create_listener() instead, this class is the implementation
/// detail for fc::create_listener().
///
/// @tparam Protocol either \c boost::asio::ip::tcp or \c boost::asio::local::stream_protocol
/// @tparam Executor The executor for the acceptor and acceptor timer
/// @tparam SocketExecutorFn Function that returns executor to be used for the newly accepted socket:
/// ExecutorType(const endpoint_type&)
/// @tparam CreateSession Callback Function to call after each accepted connection, provides newly created socket
/// and the Executor of the newly accepted socket: void(Socket&& socket, ExecutorType& ex)
/// Note that the ExecutorType provided by SocketExecutorFn can be retrieved from the socket
/// via socket.get_executor().
///
/////////////////////////////////////////////////////////////////////////////////////////////
template <typename Protocol, typename Executor, typename CreateSession>
struct listener : listener_base<Protocol>, std::enable_shared_from_this<listener<Protocol, Executor, CreateSession>> {
template <typename Protocol, typename Executor, typename SocketExecutorFn, typename CreateSession>
greg7mdp marked this conversation as resolved.
Show resolved Hide resolved
struct listener : listener_base<Protocol>, std::enable_shared_from_this<listener<Protocol, Executor, SocketExecutorFn, CreateSession>> {
private:
typename Protocol::acceptor acceptor_;
boost::asio::deadline_timer accept_error_timer_;
boost::posix_time::time_duration accept_timeout_;
logger& logger_;
std::string extra_listening_log_info_;
SocketExecutorFn socket_executor_fn_;
CreateSession create_session_;

public:
using endpoint_type = typename Protocol::endpoint;
listener(Executor& executor, logger& logger, boost::posix_time::time_duration accept_timeout,
const std::string& local_address, const endpoint_type& endpoint,
const std::string& extra_listening_log_info, const CreateSession& create_session)
const std::string& extra_listening_log_info,
SocketExecutorFn socket_executor_fn, CreateSession create_session)
: listener_base<Protocol>(local_address), acceptor_(executor, endpoint), accept_error_timer_(executor),
accept_timeout_(accept_timeout), logger_(logger), extra_listening_log_info_(extra_listening_log_info),
create_session_(create_session) {}
socket_executor_fn_(std::move(socket_executor_fn)), create_session_(std::move(create_session)) {}

const auto& acceptor() const { return acceptor_; }

void do_accept() {
acceptor_.async_accept([self = this->shared_from_this()](boost::system::error_code ec, auto&& peer_socket) {
boost::system::error_code ec;
auto ep = acceptor_.local_endpoint(ec);
if (ec) {
fc_wlog(logger_, "Unable to retrieve local_endpoint of acceptor, error: ${e}", ("e", ec.message()));
}
acceptor_.async_accept(socket_executor_fn_(ep),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the purpose for the changes here and for passing the local endpoint as a parameter to the socket_executor_fn_? (this local endpoint parameter is not used by any of ship/http/p2p)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought it might be useful. If someone was wanting to choose which executor according to which endpoint. I'm happy to remove it since we don't have a current use-case.

[self = this->shared_from_this()](boost::system::error_code ec, auto&& peer_socket) {
self->on_accept(ec, std::forward<decltype(peer_socket)>(peer_socket));
});
}
Expand Down Expand Up @@ -152,16 +169,26 @@ struct listener : listener_base<Protocol>, std::enable_shared_from_this<listener
/// The lifetime of the created listener objects is controlled by \c executor, the created objects will be destroyed
/// when \c executor.stop() is called.
///
/// Each socket is created with the executor returned by SocketExecutorFn.
///
/// @note
/// This function is not thread safe for Unix socket because it will temporarily change working directory without any
/// lock. Any code which depends the current working directory (such as opening files with relative paths) in other
/// threads should be protected.
///
/// @tparam Protocol either \c boost::asio::ip::tcp or \c boost::asio::local::stream_protocol
/// @tparam Executor The executor for the acceptor and acceptor timer
/// @tparam SocketExecutorFn Function that returns executor to be used for the newly accepted socket:
/// ExecutorType(const endpoint_type&)
/// @tparam CreateSession Callback Function to call after each accepted connection, provides newly created socket
/// and the Executor of the newly accepted socket: void(Socket&& socket, ExecutorType& ex)
/// Note that the ExecutorType provided by SocketExecutorFn can be retrieved from the socket
/// via socket.get_executor().
/// @throws std::system_error or boost::system::system_error
template <typename Protocol, typename Executor, typename CreateSession>
template <typename Protocol, typename Executor, typename SocketExecutorFn, typename CreateSession>
greg7mdp marked this conversation as resolved.
Show resolved Hide resolved
void create_listener(Executor& executor, logger& logger, boost::posix_time::time_duration accept_timeout,
const std::string& address, const std::string& extra_listening_log_info,
const SocketExecutorFn& socket_executor_fn,
const CreateSession& create_session) {
using tcp = boost::asio::ip::tcp;
if constexpr (std::is_same_v<Protocol, tcp>) {
Expand All @@ -186,8 +213,8 @@ void create_listener(Executor& executor, logger& logger, boost::posix_time::time
auto create_listener = [&](const auto& endpoint) {
const auto& ip_addr = endpoint.address();
try {
auto listener = std::make_shared<fc::listener<Protocol, Executor, CreateSession>>(
executor, logger, accept_timeout, address, endpoint, extra_listening_log_info, create_session);
auto listener = std::make_shared<fc::listener<Protocol, Executor, SocketExecutorFn, CreateSession>>(
executor, logger, accept_timeout, address, endpoint, extra_listening_log_info, socket_executor_fn, create_session);
listener->log_listening(endpoint, address);
listener->do_accept();
++listened;
Expand Down Expand Up @@ -256,8 +283,8 @@ void create_listener(Executor& executor, logger& logger, boost::posix_time::time
fs::remove(sock_path);
}

auto listener = std::make_shared<fc::listener<stream_protocol, Executor, CreateSession>>(
executor, logger, accept_timeout, address, endpoint, extra_listening_log_info, create_session);
auto listener = std::make_shared<fc::listener<stream_protocol, Executor, SocketExecutorFn, CreateSession>>(
executor, logger, accept_timeout, address, endpoint, extra_listening_log_info, socket_executor_fn, create_session);
listener->log_listening(endpoint, address);
listener->do_accept();
}
Expand Down
4 changes: 3 additions & 1 deletion plugins/http_plugin/http_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,9 @@ namespace eosio {
};

fc::create_listener<Protocol>(plugin_state->thread_pool.get_executor(), logger(), accept_timeout, address,
extra_listening_log_info, create_session);
extra_listening_log_info,
[this](const auto&) -> boost::asio::io_context& { return plugin_state->thread_pool.get_executor(); },
create_session);
}

void create_beast_server(const std::string& address, api_category_set categories) {
Expand Down
41 changes: 23 additions & 18 deletions plugins/net_plugin/net_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -829,8 +829,8 @@ namespace eosio {
std::atomic<boost::asio::ip::port_type> remote_endpoint_port{0};

public:
boost::asio::io_context::strand strand;
std::shared_ptr<tcp::socket> socket; // only accessed through strand after construction
boost::asio::strand<tcp::socket::executor_type> strand;
std::shared_ptr<tcp::socket> socket; // only accessed through strand after construction

fc::message_buffer<1024*1024> pending_message_buffer;
std::size_t outstanding_read_bytes{0}; // accessed only from strand threads
Expand Down Expand Up @@ -1176,8 +1176,8 @@ namespace eosio {

connection::connection( const string& endpoint, const string& listen_address )
: peer_addr( endpoint ),
strand( my_impl->thread_pool.get_executor() ),
socket( new tcp::socket( my_impl->thread_pool.get_executor() ) ),
strand( boost::asio::make_strand(my_impl->thread_pool.get_executor()) ),
socket( new tcp::socket( strand ) ),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now that the socket is always being created with a strand (both here and via the listener), that should mean that all the bind_executors to the strand can be removed. Not sure you want to tackle that now or not (going in to 1.0 probably not, but going in to main maybe)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I left it mainly because it was targeting 1.0. At this point, maybe it would be cleaner. In many places you would have boost::asio::post(socket.get_executor(), [](){}) instead which is not really any difference I don't think.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You would have to move the timers to using the strand too (afaict not a problem) but after that it does seem like all 5 bind_executors could be removed. I am not seeing the need for boost::asio::post(socket.get_executor(), [](){}) in those same spots. Anyways, it's not something that needs to be done now just a clean up I noticed since we're constructing these more idiomaticly now.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, you were talking specifically about the bind_executor. I was thinking of the boost::asio::post(strand,.

listen_address( listen_address ),
log_p2p_address( endpoint ),
connection_id( ++my_impl->current_connection_id ),
Expand All @@ -1194,7 +1194,7 @@ namespace eosio {
connection::connection(tcp::socket&& s, const string& listen_address, size_t block_sync_rate_limit)
: peer_addr(),
block_sync_rate_limit(block_sync_rate_limit),
strand( my_impl->thread_pool.get_executor() ),
strand( s.get_executor() ),
socket( new tcp::socket( std::move(s) ) ),
listen_address( listen_address ),
connection_id( ++my_impl->current_connection_id ),
Expand Down Expand Up @@ -1353,7 +1353,7 @@ namespace eosio {

void connection::close( bool reconnect, bool shutdown ) {
set_state(connection_state::closing);
strand.post( [self = shared_from_this(), reconnect, shutdown]() {
boost::asio::post(strand, [self = shared_from_this(), reconnect, shutdown]() {
self->_close( reconnect, shutdown );
});
}
Expand Down Expand Up @@ -1472,7 +1472,7 @@ namespace eosio {
void connection::send_handshake() {
if (closed())
return;
strand.post( [c = shared_from_this()]() {
boost::asio::post(strand, [c = shared_from_this()]() {
fc::unique_lock g_conn( c->conn_mtx );
if( c->populate_handshake( c->last_handshake_sent ) ) {
static_assert( std::is_same_v<decltype( c->sent_handshake_count ), int16_t>, "INT16_MAX based on int16_t" );
Expand Down Expand Up @@ -1565,7 +1565,7 @@ namespace eosio {
std::vector<boost::asio::const_buffer> bufs;
buffer_queue.fill_out_buffer( bufs );

strand.post( [c{std::move(c)}, bufs{std::move(bufs)}]() {
boost::asio::post(strand, [c{std::move(c)}, bufs{std::move(bufs)}]() {
boost::asio::async_write( *c->socket, bufs,
boost::asio::bind_executor( c->strand, [c, socket=c->socket]( boost::system::error_code ec, std::size_t w ) {
try {
Expand Down Expand Up @@ -2073,7 +2073,7 @@ namespace eosio {
sync_source = new_sync_source;
request_sent = true;
sync_active_time = std::chrono::steady_clock::now();
new_sync_source->strand.post( [new_sync_source, start, end, fork_head_num=chain_info.fork_head_num, lib=chain_info.lib_num]() {
boost::asio::post(new_sync_source->strand, [new_sync_source, start, end, fork_head_num=chain_info.fork_head_num, lib=chain_info.lib_num]() {
peer_ilog( new_sync_source, "requesting range ${s} to ${e}, fhead ${h}, lib ${lib}", ("s", start)("e", end)("h", fork_head_num)("lib", lib) );
new_sync_source->request_sync_blocks( start, end );
} );
Expand Down Expand Up @@ -2669,7 +2669,7 @@ namespace eosio {

send_buffer_type sb = buff_factory.get_send_buffer( b );

cp->strand.post( [cp, bnum, sb{std::move(sb)}]() {
boost::asio::post(cp->strand, [cp, bnum, sb{std::move(sb)}]() {
cp->latest_blk_time = std::chrono::steady_clock::now();
bool has_block = cp->peer_lib_num >= bnum;
if( !has_block ) {
Expand All @@ -2687,7 +2687,7 @@ namespace eosio {
my_impl->connections.for_each_block_connection( [exclude_peer, msg{std::move(msg)}]( auto& cp ) {
if( !cp->current() ) return true;
if( cp->connection_id == exclude_peer ) return true;
cp->strand.post( [cp, msg]() {
boost::asio::post(cp->strand, [cp, msg]() {
if (cp->protocol_version >= proto_savanna) {
if (vote_logger.is_enabled(fc::log_level::debug))
peer_dlog(cp, "sending vote msg");
Expand Down Expand Up @@ -2716,7 +2716,7 @@ namespace eosio {

send_buffer_type sb = buff_factory.get_send_buffer( trx );
fc_dlog( logger, "sending trx: ${id}, to connection - ${cid}", ("id", trx->id())("cid", cp->connection_id) );
cp->strand.post( [cp, sb{std::move(sb)}]() {
boost::asio::post(cp->strand, [cp, sb{std::move(sb)}]() {
cp->enqueue_buffer( sb, no_reason );
} );
} );
Expand Down Expand Up @@ -2764,7 +2764,7 @@ namespace eosio {
}
}
connection_ptr c = shared_from_this();
strand.post([c]() {
boost::asio::post(strand, [c]() {
my_impl->connections.connect(c);
});
return true;
Expand Down Expand Up @@ -2841,7 +2841,7 @@ namespace eosio {
});

connection_ptr new_connection = std::make_shared<connection>(std::move(socket), listen_address, limit);
new_connection->strand.post([new_connection, this]() {
boost::asio::post(new_connection->strand, [new_connection, this]() {
if (new_connection->start_session()) {
connections.add(new_connection);
}
Expand Down Expand Up @@ -3728,7 +3728,7 @@ namespace eosio {
// may have come in on a different connection and posted into dispatcher strand before this one
if( block_header::num_from_id(id) <= lib_num || my_impl->dispatcher.have_block( id ) || cc.block_exists( id ) ) { // thread-safe
my_impl->dispatcher.add_peer_block( id, c->connection_id );
c->strand.post( [c, id, ptr{std::move(ptr)}]() {
boost::asio::post(c->strand, [c, id, ptr{std::move(ptr)}]() {
const fc::microseconds age(fc::time_point::now() - ptr->timestamp);
my_impl->sync_master->sync_recv_block( c, id, block_header::num_from_id(id), age );
});
Expand Down Expand Up @@ -3775,7 +3775,7 @@ namespace eosio {
fc_dlog(logger, "unlinkable_block ${bn} : ${id}, previous ${pn} : ${pid}",
("bn", ptr->block_num())("id", id)("pn", block_header::num_from_id(ptr->previous))("pid", ptr->previous));
}
c->strand.post( [c, id, blk_num=ptr->block_num(), close_mode]() {
boost::asio::post(c->strand, [c, id, blk_num=ptr->block_num(), close_mode]() {
my_impl->sync_master->rejected_block( c, blk_num, close_mode );
my_impl->dispatcher.rejected_block( id );
});
Expand Down Expand Up @@ -3847,7 +3847,7 @@ namespace eosio {
auto current_time = std::chrono::steady_clock::now();
my->connections.for_each_connection( [current_time]( const connection_ptr& c ) {
if( c->socket_is_open() ) {
c->strand.post([c, current_time]() {
boost::asio::post(c->strand, [c, current_time]() {
c->check_heartbeat(current_time);
} );
}
Expand Down Expand Up @@ -4400,7 +4400,12 @@ namespace eosio {

fc::create_listener<tcp>(
my->thread_pool.get_executor(), logger, accept_timeout, listen_addr, extra_listening_log_info,
[my = my, addr = p2p_addr, block_sync_rate_limit = block_sync_rate_limit](tcp::socket&& socket) { fc_dlog( logger, "start listening on ${addr} with peer sync throttle ${limit}", ("addr", addr)("limit", block_sync_rate_limit)); my->create_session(std::move(socket), addr, block_sync_rate_limit); });
[my = my](const auto&) { return boost::asio::make_strand(my->thread_pool.get_executor()); },
[my = my, addr = p2p_addr, block_sync_rate_limit = block_sync_rate_limit](tcp::socket&& socket) {
fc_dlog( logger, "start listening on ${addr} with peer sync throttle ${limit}",
("addr", addr)("limit", block_sync_rate_limit));
my->create_session(std::move(socket), addr, block_sync_rate_limit);
});
} catch (const plugin_config_exception& e) {
fc_elog( logger, "${msg}", ("msg", e.top_message()));
app().quit();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,17 +28,17 @@ class session_base {
virtual ~session_base() = default;
};

template<typename SocketType, typename Executor, typename GetBlockID, typename GetBlock, typename OnDone>
template<typename SocketType, typename GetBlockID, typename GetBlock, typename OnDone>
requires std::is_same_v<SocketType, boost::asio::ip::tcp::socket> || std::is_same_v<SocketType, boost::asio::local::stream_protocol::socket>
class session final : public session_base {
using coro_throwing_stream = boost::asio::use_awaitable_t<>::as_default_on_t<boost::beast::websocket::stream<SocketType>>;
using coro_nonthrowing_steadytimer = boost::asio::as_tuple_t<boost::asio::use_awaitable_t<>>::as_default_on_t<boost::asio::steady_timer>;

public:
session(SocketType&& s, Executor&& st, chain::controller& controller,
std::optional<log_catalog>& trace_log, std::optional<log_catalog>& chain_state_log, std::optional<log_catalog>& finality_data_log,
GetBlockID&& get_block_id, GetBlock&& get_block, OnDone&& on_done, fc::logger& logger) :
strand(std::move(st)), stream(std::move(s)), wake_timer(strand), controller(controller),
session(SocketType&& s, chain::controller& controller,
std::optional<log_catalog>& trace_log, std::optional<log_catalog>& chain_state_log, std::optional<log_catalog>& finality_data_log,
GetBlockID&& get_block_id, GetBlock&& get_block, OnDone&& on_done, fc::logger& logger) :
strand(s.get_executor()), stream(std::move(s)), wake_timer(strand), controller(controller),
trace_log(trace_log), chain_state_log(chain_state_log), finality_data_log(finality_data_log),
get_block_id(get_block_id), get_block(get_block), on_done(on_done), logger(logger), remote_endpoint_string(get_remote_endpoint_string()) {
fc_ilog(logger, "incoming state history connection from ${a}", ("a", remote_endpoint_string));
Expand Down Expand Up @@ -307,7 +307,7 @@ class session final : public session_base {

private:
///these items must only ever be touched by the session's strand
Executor strand;
SocketType::executor_type strand;
coro_throwing_stream stream;
coro_nonthrowing_steadytimer wake_timer;
unsigned coros_running = 0;
Expand Down
10 changes: 5 additions & 5 deletions plugins/state_history_plugin/state_history_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,11 +100,12 @@ struct state_history_plugin_impl {
template <typename Protocol>
void create_listener(const std::string& address) {
const boost::posix_time::milliseconds accept_timeout(200);
// connections set must only be modified by main thread; run listener on ship thread so sockets use default executor of the ship thread
fc::create_listener<Protocol>(thread_pool.get_executor(), _log, accept_timeout, address, "", [this](Protocol::socket&& socket) {
boost::asio::post(app().get_io_service(), [this, socket{std::move(socket)}]() mutable {
// connections set must only be modified by main thread; run listener on main thread so callback is on main thread
fc::create_listener<Protocol>(app().get_io_service(), _log, accept_timeout, address, "",
[this](const auto&) { return boost::asio::make_strand(thread_pool.get_executor()); },
[this](Protocol::socket&& socket) {
catch_and_log([this, &socket]() {
connections.emplace(new session(std::move(socket), boost::asio::make_strand(thread_pool.get_executor()), chain_plug->chain(),
connections.emplace(new session(std::move(socket), chain_plug->chain(),
trace_log, chain_state_log, finality_data_log,
[this](const chain::block_num_type block_num) {
return get_block_id(block_num);
Expand All @@ -119,7 +120,6 @@ struct state_history_plugin_impl {
}, _log));
});
});
});
}

void listen(){
Expand Down
Loading