Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support interrupt transaction #993

Merged
merged 11 commits into from
Nov 6, 2024
Merged
37 changes: 33 additions & 4 deletions libraries/chain/controller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1006,6 +1006,8 @@ struct controller_impl {
async_t async_aggregation = async_t::yes; // by default we process incoming votes asynchronously
my_finalizers_t my_finalizers;
std::atomic<bool> writing_snapshot = false;
std::atomic<bool> applying_block = false;
platform_timer& main_thread_timer;

thread_local static platform_timer timer; // a copy for main thread and each read-only thread
#if defined(EOSIO_EOS_VM_RUNTIME_ENABLED) || defined(EOSIO_EOS_VM_JIT_RUNTIME_ENABLED)
Expand Down Expand Up @@ -1285,6 +1287,7 @@ struct controller_impl {
read_mode( cfg.read_mode ),
thread_pool(),
my_finalizers(cfg.finalizers_dir / config::safety_filename),
main_thread_timer(timer), // assumes constructor is called from main thread
wasmif( conf.wasm_runtime, conf.eosvmoc_tierup, db, conf.state_dir, conf.eosvmoc_config, !conf.profile_accounts.empty() )
{
assert(cfg.chain_thread_pool_size > 0);
Expand Down Expand Up @@ -3780,6 +3783,9 @@ struct controller_impl {
}
}

applying_block = true;
auto apply = fc::make_scoped_exit([&](){ applying_block = false; });
greg7mdp marked this conversation as resolved.
Show resolved Hide resolved

transaction_trace_ptr trace;

size_t packed_idx = 0;
Expand All @@ -3806,7 +3812,11 @@ struct controller_impl {
std::holds_alternative<transaction_id_type>(receipt.trx);

if( transaction_failed && !transaction_can_fail) {
edump((*trace));
if (trace->except->code() == interrupt_exception::code_value) {
ilog("Interrupt of trx: ${t}", ("t", *trace));
} else {
edump((*trace));
}
throw *trace->except;
}

Expand Down Expand Up @@ -3875,7 +3885,8 @@ struct controller_impl {
} catch ( const boost::interprocess::bad_alloc& ) {
throw;
} catch ( const fc::exception& e ) {
edump((e.to_detail_string()));
if (e.code() != interrupt_exception::code_value)
edump((e.to_detail_string()));
abort_block();
throw;
} catch ( const std::exception& e ) {
Expand Down Expand Up @@ -4431,8 +4442,12 @@ struct controller_impl {
} catch ( const boost::interprocess::bad_alloc& ) {
throw;
} catch (const fc::exception& e) {
elog("exception thrown while applying block ${bn} : ${id}, previous ${p}, error: ${e}",
("bn", bsp->block_num())("id", bsp->id())("p", bsp->previous())("e", e.to_detail_string()));
if (e.code() == interrupt_exception::code_value) {
ilog("interrupt while applying block ${bn} : ${id}", ("bn", bsp->block_num())("id", bsp->id()));
} else {
elog("exception thrown while applying block ${bn} : ${id}, previous ${p}, error: ${e}",
("bn", bsp->block_num())("id", bsp->id())("p", bsp->previous())("e", e.to_detail_string()));
}
except = std::current_exception();
} catch (const std::exception& e) {
elog("exception thrown while applying block ${bn} : ${id}, previous ${p}, error: ${e}",
Expand Down Expand Up @@ -4495,6 +4510,16 @@ struct controller_impl {
return applied_trxs;
}

void interrupt_transaction() {
// Only interrupt transaction if applying a block. Speculative trxs already have a deadline set so they
// have limited run time already. This is to allow killing a long-running transaction in a block being
// validated.
if (applying_block) {
ilog("Interrupting apply block");
main_thread_timer.expire_now();
}
}

// @param if_active true if instant finality is active
static checksum256_type calc_merkle( deque<digest_type>&& digests, bool if_active ) {
if (if_active) {
Expand Down Expand Up @@ -5255,6 +5280,10 @@ deque<transaction_metadata_ptr> controller::abort_block() {
return my->abort_block();
}

void controller::interrupt_transaction() {
my->interrupt_transaction();
}

boost::asio::io_context& controller::get_thread_pool() {
return my->thread_pool.get_executor();
}
Expand Down
3 changes: 3 additions & 0 deletions libraries/chain/include/eosio/chain/controller.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,9 @@ namespace eosio::chain {
*/
deque<transaction_metadata_ptr> abort_block();

/// Expected to be called from signal handler
void interrupt_transaction();

/**
*
*/
Expand Down
8 changes: 5 additions & 3 deletions libraries/chain/include/eosio/chain/exceptions.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,9 @@ namespace eosio { namespace chain {
3080005, "Transaction CPU usage is too much for the remaining allowable usage of the current block" )
FC_DECLARE_DERIVED_EXCEPTION( deadline_exception, resource_exhausted_exception,
3080006, "Transaction took too long" )
FC_DECLARE_DERIVED_EXCEPTION( leeway_deadline_exception, deadline_exception,
3081001, "Transaction reached the deadline set due to leeway on account CPU limits" )

FC_DECLARE_DERIVED_EXCEPTION( greylist_net_usage_exceeded, resource_exhausted_exception,
3080007, "Transaction exceeded the current greylisted account network usage limit" )
FC_DECLARE_DERIVED_EXCEPTION( greylist_cpu_usage_exceeded, resource_exhausted_exception,
Expand All @@ -389,9 +392,8 @@ namespace eosio { namespace chain {
3080009, "Read-only transaction eos-vm-oc compile temporary failure" )
FC_DECLARE_DERIVED_EXCEPTION( ro_trx_vm_oc_compile_permanent_failure, resource_exhausted_exception,
3080010, "Read-only transaction eos-vm-oc compile permanent failure" )

FC_DECLARE_DERIVED_EXCEPTION( leeway_deadline_exception, deadline_exception,
3081001, "Transaction reached the deadline set due to leeway on account CPU limits" )
FC_DECLARE_DERIVED_EXCEPTION( interrupt_exception, resource_exhausted_exception,
3080011, "Transaction interrupted by signal" )

FC_DECLARE_DERIVED_EXCEPTION( authorization_exception, chain_exception,
3090000, "Authorization exception" )
Expand Down
1 change: 1 addition & 0 deletions libraries/chain/include/eosio/chain/platform_timer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ struct platform_timer {

void start(fc::time_point tp);
void stop();
void expire_now();

/* Sets a callback for when timer expires. Be aware this could might fire from a signal handling context and/or
on any particular thread. Only a single callback can be registered at once; trying to register more will
Expand Down
5 changes: 5 additions & 0 deletions libraries/chain/platform_timer_asio_fallback.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,11 @@ void platform_timer::start(fc::time_point tp) {
}
}

void platform_timer::expire_now() {
expired = 1;
call_expiration_callback();
}

void platform_timer::stop() {
if(expired)
return;
Expand Down
5 changes: 5 additions & 0 deletions libraries/chain/platform_timer_kqueue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,11 @@ void platform_timer::start(fc::time_point tp) {
}
}

void platform_timer::expire_now() {
expired = 1;
call_expiration_callback();
}

void platform_timer::stop() {
if(expired)
return;
Expand Down
5 changes: 5 additions & 0 deletions libraries/chain/platform_timer_posix.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,11 @@ void platform_timer::start(fc::time_point tp) {
}
}

void platform_timer::expire_now() {
expired = 1;
call_expiration_callback();
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Feels like this may introduce a possibility of the expiration callback being called twice? I am not sure how much that matters. Do we need to test_and_set both here and in the normal expiry path to guard against that?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was under the impression that it didn't matter to be called twice. Not sure how to exactly verify that. Maybe safest to do the test_and_set.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

void platform_timer::stop() {
if(expired)
return;
Expand Down
5 changes: 4 additions & 1 deletion libraries/chain/transaction_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -459,7 +459,10 @@ namespace eosio::chain {
return;

auto now = fc::time_point::now();
if( explicit_billed_cpu_time || deadline_exception_code == deadline_exception::code_value ) {
if (explicit_billed_cpu_time) {
EOS_THROW( interrupt_exception, "interrupt signaled, ran ${bt}us, start ${s}",
("bt", now - pseudo_start)("s", start) );
} else if( deadline_exception_code == deadline_exception::code_value ) {
EOS_THROW( deadline_exception, "deadline exceeded ${billing_timer}us",
("billing_timer", now - pseudo_start)("now", now)("deadline", _deadline)("start", start) );
greg7mdp marked this conversation as resolved.
Show resolved Hide resolved
} else if( deadline_exception_code == block_cpu_usage_exceeded::code_value ) {
Expand Down
4 changes: 4 additions & 0 deletions plugins/test_control_api_plugin/test_control_api_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ void test_control_api_plugin::plugin_startup() {
TEST_CONTROL_RW_CALL(throw_on, 202, http_params_types::params_required)
}, appbase::exec_queue::read_write);

app().get_plugin<http_plugin>().add_api({
TEST_CONTROL_RW_CALL(swap_action, 202, http_params_types::params_required)
}, appbase::exec_queue::read_write);

}

void test_control_api_plugin::plugin_shutdown() {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,17 @@ class read_write {
};
empty throw_on(const throw_on_params& params) const;

private:
// produce a next block with `from` action replaced with `to` action
// requires Savanna to be active, this assumes blocks are is_proper_svnn_block
struct swap_action_params {
chain::name from; // replace from action in block to `to` action
chain::name to;
fc::crypto::private_key trx_priv_key;
fc::crypto::private_key blk_priv_key;
};
empty swap_action(const swap_action_params& params) const;

private:
test_control_ptr my;
};

Expand Down Expand Up @@ -68,3 +78,4 @@ class test_control_plugin : public plugin<test_control_plugin> {
FC_REFLECT(eosio::test_control_apis::empty, )
FC_REFLECT(eosio::test_control_apis::read_write::kill_node_on_producer_params, (producer)(where_in_sequence)(based_on_lib) )
FC_REFLECT(eosio::test_control_apis::read_write::throw_on_params, (signal)(exception) )
FC_REFLECT(eosio::test_control_apis::read_write::swap_action_params, (from)(to)(trx_priv_key)(blk_priv_key) )
91 changes: 85 additions & 6 deletions plugins/test_control_plugin/test_control_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,11 @@ class test_control_plugin_impl {
void kill_on_head(account_name prod, uint32_t where_in_seq);

void set_throw_on_options(const test_control_apis::read_write::throw_on_params& throw_options);
void set_swap_action_options(const test_control_apis::read_write::swap_action_params& swap_options);
private:
void block_start(chain::block_num_type block_num);
void accepted_block_header(const chain::block_id_type& id);
void accepted_block(const chain::block_id_type& id);
void accepted_block(const chain::block_id_type& id, const chain::signed_block_ptr& block);
void irreversible_block(const chain::block_id_type& id);
void applied_transaction();
void voted_block();
Expand All @@ -25,6 +26,9 @@ class test_control_plugin_impl {
void reset_throw();
void process_next_block_state(const chain::block_id_type& id);

void swap_action_in_block(const chain::signed_block_ptr& b);
void reset_swap_action() { _swap_on_options = {}; }

chain::controller& _chain;
struct kill_options {
account_name _producer;
Expand All @@ -35,7 +39,8 @@ class test_control_plugin_impl {
bool _track_head{false};
} _kill_options;

test_control_apis::read_write::throw_on_params _throw_options;
test_control_apis::read_write::throw_on_params _throw_options;
test_control_apis::read_write::swap_action_params _swap_on_options;

std::optional<boost::signals2::scoped_connection> _block_start_connection;
std::optional<boost::signals2::scoped_connection> _accepted_block_header_connection;
Expand All @@ -59,7 +64,7 @@ void test_control_plugin_impl::connect() {
_accepted_block_connection =
_chain.accepted_block().connect( [&]( const chain::block_signal_params& t ) {
const auto& [ block, id ] = t;
accepted_block( id );
accepted_block( id, block );
} );
_irreversible_block_connection.emplace(
_chain.irreversible_block().connect( [&]( const chain::block_signal_params& t ) {
Expand Down Expand Up @@ -96,6 +101,71 @@ void test_control_plugin_impl::reset_throw() {
_throw_options = test_control_apis::read_write::throw_on_params{};
}

void test_control_plugin_impl::swap_action_in_block(const chain::signed_block_ptr& b) {
if (b->transactions.empty())
return;

bool found = std::find_if(b->transactions.cbegin(), b->transactions.cend(), [&](const auto& t) {
return std::visit(chain::overloaded{
[](const transaction_id_type&) { return false; },
[&](const chain::packed_transaction& pt) {
for (const auto& a : pt.get_transaction().actions) {
if (a.name == _swap_on_options.from)
return true;
}
return false;
}
}, t.trx);
}) != b->transactions.cend();
if (!found)
return;

if (!b->is_proper_svnn_block()) {
elog("Block is not a Savanna block, swap_action failed.");
return;
}

auto copy_b = std::make_shared<chain::signed_block>(b->clone());
copy_b->previous = b->calculate_id();
copy_b->block_extensions.clear(); // remove QC extension since header will claim same as previous block
copy_b->timestamp = b->timestamp.next();
// swap out action
for (auto& t : copy_b->transactions) {
std::visit(chain::overloaded{
[](const transaction_id_type&) {},
[&](chain::packed_transaction& pt) {
for (auto& a : pt.get_transaction().actions) {
if (a.name == _swap_on_options.from) {
auto signed_tx = pt.get_signed_transaction();
auto& act = signed_tx.actions.back();
act.name = _swap_on_options.to;
// Re-sign the transaction
signed_tx.signatures.clear();
signed_tx.sign(_swap_on_options.trx_priv_key, _chain.get_chain_id());
// Replace the transaction
auto new_packed_tx = packed_transaction(signed_tx);
const_cast<packed_transaction&>(pt) = std::move(new_packed_tx);
}
}
}
}, t.trx);
}
// Re-calculate the transaction merkle
std::deque<chain::digest_type> trx_digests;
const auto& trxs = copy_b->transactions;
for( const auto& tr : trxs )
trx_digests.emplace_back( tr.digest() );
copy_b->transaction_mroot = chain::calculate_merkle( std::move(trx_digests) );
// Re-sign the block
copy_b->producer_signature = _swap_on_options.blk_priv_key.sign(copy_b->calculate_id());

// will be processed on the next start_block if is_new_best_head
const auto&[is_new_best_head, bh] = _chain.accept_block(copy_b->calculate_id(), copy_b);
ilog("Swapped action ${f} to ${t}, is_new_best_head ${bh}, block ${bn}",
("f", _swap_on_options.from)("t", _swap_on_options.to)("bh", is_new_best_head)("bn", bh ? bh->block_num() : 0));
reset_swap_action();
}

void test_control_plugin_impl::block_start(chain::block_num_type block_num) {
if (_throw_options.signal == "block_start")
throw_exception();
Expand All @@ -106,11 +176,13 @@ void test_control_plugin_impl::accepted_block_header(const chain::block_id_type&
throw_exception();
}

void test_control_plugin_impl::accepted_block(const chain::block_id_type& id) {
void test_control_plugin_impl::accepted_block(const chain::block_id_type& id, const chain::signed_block_ptr& block) {
if (_kill_options._track_head)
process_next_block_state(id);
if (_throw_options.signal == "accepted_block")
throw_exception();
if (!_swap_on_options.from.empty())
swap_action_in_block(block);
}

void test_control_plugin_impl::irreversible_block(const chain::block_id_type& id) {
Expand Down Expand Up @@ -185,12 +257,13 @@ void test_control_plugin_impl::kill_on_head(account_name prod, uint32_t where_in
_kill_options._track_head = true;
}

// ----------------- throw_on --------------------------------

void test_control_plugin_impl::set_throw_on_options(const test_control_apis::read_write::throw_on_params& throw_options) {
_throw_options = throw_options;
}

void test_control_plugin_impl::set_swap_action_options(const test_control_apis::read_write::swap_action_params& swap_options) {
_swap_on_options = swap_options;
}

test_control_plugin::test_control_plugin() = default;

Expand Down Expand Up @@ -230,5 +303,11 @@ empty read_write::throw_on(const read_write::throw_on_params& params) const {
return {};
}

empty read_write::swap_action(const read_write::swap_action_params& params) const {
ilog("received swap_action: ${p}", ("p", params));
my->set_swap_action_options(params);
return {};
}

} // namespace test_control_apis
} // namespace eosio
5 changes: 5 additions & 0 deletions programs/nodeos/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,8 @@ int main(int argc, char** argv)
app->set_stop_executor_cb([&app]() {
ilog("appbase quit called");
app->get_io_context().stop();
auto& chain = app->get_plugin<chain_plugin>().chain();
chain.interrupt_transaction();
});
app->set_version(htonl(short_hash));
app->set_version_string(eosio::version::version_client());
Expand Down Expand Up @@ -220,6 +222,9 @@ int main(int argc, char** argv)
elog( "database dirty flag set (likely due to unclean shutdown): replay required" );
return DATABASE_DIRTY;
}
} else if (e.code() == interrupt_exception::code_value) {
ilog("Interrupted, successfully exiting");
return SUCCESS;
}
elog( "${e}", ("e", e.to_detail_string()));
return OTHER_FAIL;
Expand Down
Loading
Loading