Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support interrupt transaction #993

Merged
merged 11 commits into from
Nov 6, 2024
Merged
2 changes: 1 addition & 1 deletion libraries/appbase
47 changes: 42 additions & 5 deletions libraries/chain/controller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1006,6 +1006,8 @@ struct controller_impl {
async_t async_aggregation = async_t::yes; // by default we process incoming votes asynchronously
my_finalizers_t my_finalizers;
std::atomic<bool> writing_snapshot = false;
std::atomic<bool> applying_block = false;
platform_timer& main_thread_timer;

thread_local static platform_timer timer; // a copy for main thread and each read-only thread
#if defined(EOSIO_EOS_VM_RUNTIME_ENABLED) || defined(EOSIO_EOS_VM_JIT_RUNTIME_ENABLED)
Expand Down Expand Up @@ -1285,6 +1287,7 @@ struct controller_impl {
read_mode( cfg.read_mode ),
thread_pool(),
my_finalizers(cfg.finalizers_dir / config::safety_filename),
main_thread_timer(timer), // assumes constructor is called from main thread
wasmif( conf.wasm_runtime, conf.eosvmoc_tierup, db, conf.state_dir, conf.eosvmoc_config, !conf.profile_accounts.empty() )
{
assert(cfg.chain_thread_pool_size > 0);
Expand Down Expand Up @@ -3780,6 +3783,9 @@ struct controller_impl {
}
}

applying_block = true;
auto apply = fc::make_scoped_exit([&](){ applying_block = false; });
greg7mdp marked this conversation as resolved.
Show resolved Hide resolved

transaction_trace_ptr trace;

size_t packed_idx = 0;
Expand All @@ -3806,7 +3812,11 @@ struct controller_impl {
std::holds_alternative<transaction_id_type>(receipt.trx);

if( transaction_failed && !transaction_can_fail) {
edump((*trace));
if (trace->except->code() == interrupt_exception::code_value) {
ilog("Interrupt of trx id: ${id}", ("id", trace->id));
} else {
edump((*trace));
}
throw *trace->except;
}

Expand Down Expand Up @@ -3875,7 +3885,8 @@ struct controller_impl {
} catch ( const boost::interprocess::bad_alloc& ) {
throw;
} catch ( const fc::exception& e ) {
edump((e.to_detail_string()));
if (e.code() != interrupt_exception::code_value)
edump((e.to_detail_string()));
abort_block();
throw;
} catch ( const std::exception& e ) {
Expand Down Expand Up @@ -4359,7 +4370,15 @@ struct controller_impl {
log_irreversible();
transition_to_savanna_if_needed();
return controller::apply_blocks_result::complete;
} FC_LOG_AND_RETHROW( )
} catch (fc::exception& e) {
if (e.code() != interrupt_exception::code_value) {
wlog("${d}", ("d",e.to_detail_string()));
FC_RETHROW_EXCEPTION(e, warn, "rethrow");
}
throw;
} catch (...) {
try { throw; } FC_LOG_AND_RETHROW()
}
}

controller::apply_blocks_result maybe_apply_blocks( const forked_callback_t& forked_cb, const trx_meta_cache_lookup& trx_lookup )
Expand Down Expand Up @@ -4431,8 +4450,12 @@ struct controller_impl {
} catch ( const boost::interprocess::bad_alloc& ) {
throw;
} catch (const fc::exception& e) {
elog("exception thrown while applying block ${bn} : ${id}, previous ${p}, error: ${e}",
("bn", bsp->block_num())("id", bsp->id())("p", bsp->previous())("e", e.to_detail_string()));
if (e.code() == interrupt_exception::code_value) {
ilog("interrupt while applying block ${bn} : ${id}", ("bn", bsp->block_num())("id", bsp->id()));
} else {
elog("exception thrown while applying block ${bn} : ${id}, previous ${p}, error: ${e}",
("bn", bsp->block_num())("id", bsp->id())("p", bsp->previous())("e", e.to_detail_string()));
}
except = std::current_exception();
} catch (const std::exception& e) {
elog("exception thrown while applying block ${bn} : ${id}, previous ${p}, error: ${e}",
Expand Down Expand Up @@ -4495,6 +4518,16 @@ struct controller_impl {
return applied_trxs;
}

// Cut short the transaction that is currently executing, but only while a block is being applied.
// Speculative transactions are left alone: they already run under a deadline, so their run time
// is bounded. The purpose is to allow killing a long-running transaction that belongs to a block
// being validated.
// NOTE(review): the public declaration says this is expected to be called from a signal handler;
// confirm that logging via ilog is acceptable in that calling context.
void interrupt_transaction() {
   if (!applying_block)
      return; // nothing is being applied, so there is nothing to interrupt

   ilog("Interrupting apply block");
   main_thread_timer.expire_now();
}

// @param if_active true if instant finality is active
static checksum256_type calc_merkle( deque<digest_type>&& digests, bool if_active ) {
if (if_active) {
Expand Down Expand Up @@ -5255,6 +5288,10 @@ deque<transaction_metadata_ptr> controller::abort_block() {
return my->abort_block();
}

// Public entry point: simply forwards the request to the implementation object `my`.
void controller::interrupt_transaction() {
my->interrupt_transaction();
}

boost::asio::io_context& controller::get_thread_pool() {
return my->thread_pool.get_executor();
}
Expand Down
3 changes: 3 additions & 0 deletions libraries/chain/include/eosio/chain/controller.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,9 @@ namespace eosio::chain {
*/
deque<transaction_metadata_ptr> abort_block();

/// Interrupts the transaction being executed while a block is being applied; does nothing otherwise.
/// Expected to be called from signal handler
void interrupt_transaction();

/**
*
*/
Expand Down
8 changes: 5 additions & 3 deletions libraries/chain/include/eosio/chain/exceptions.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,9 @@ namespace eosio { namespace chain {
3080005, "Transaction CPU usage is too much for the remaining allowable usage of the current block" )
FC_DECLARE_DERIVED_EXCEPTION( deadline_exception, resource_exhausted_exception,
3080006, "Transaction took too long" )
FC_DECLARE_DERIVED_EXCEPTION( leeway_deadline_exception, deadline_exception,
3081001, "Transaction reached the deadline set due to leeway on account CPU limits" )

FC_DECLARE_DERIVED_EXCEPTION( greylist_net_usage_exceeded, resource_exhausted_exception,
3080007, "Transaction exceeded the current greylisted account network usage limit" )
FC_DECLARE_DERIVED_EXCEPTION( greylist_cpu_usage_exceeded, resource_exhausted_exception,
Expand All @@ -389,9 +392,8 @@ namespace eosio { namespace chain {
3080009, "Read-only transaction eos-vm-oc compile temporary failure" )
FC_DECLARE_DERIVED_EXCEPTION( ro_trx_vm_oc_compile_permanent_failure, resource_exhausted_exception,
3080010, "Read-only transaction eos-vm-oc compile permanent failure" )

FC_DECLARE_DERIVED_EXCEPTION( leeway_deadline_exception, deadline_exception,
3081001, "Transaction reached the deadline set due to leeway on account CPU limits" )
FC_DECLARE_DERIVED_EXCEPTION( interrupt_exception, resource_exhausted_exception,
3080011, "Transaction interrupted by signal" )

FC_DECLARE_DERIVED_EXCEPTION( authorization_exception, chain_exception,
3090000, "Authorization exception" )
Expand Down
1 change: 1 addition & 0 deletions libraries/chain/include/eosio/chain/platform_timer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ struct platform_timer {

void start(fc::time_point tp);
void stop();
void expire_now();

/* Sets a callback for when timer expires. Be aware this might fire from a signal handling context and/or
on any particular thread. Only a single callback can be registered at once; trying to register more will
Expand Down
27 changes: 12 additions & 15 deletions libraries/chain/platform_timer_asio_fallback.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,39 +57,36 @@ platform_timer::~platform_timer() {

void platform_timer::start(fc::time_point tp) {
if(tp == fc::time_point::maximum()) {
expired = 0;
expired = false;
return;
}
fc::microseconds x = tp.time_since_epoch() - fc::time_point::now().time_since_epoch();
if(x.count() <= 0)
expired = 1;
expired = true;
else {
#if 0
std::promise<void> p;
auto f = p.get_future();
checktime_ios->post([&p,this]() {
expired = 0;
p.set_value();
});
f.get();
#endif
expired = 0;
expired = false;
my->timer->expires_after(std::chrono::microseconds(x.count()));
my->timer->async_wait([this](const boost::system::error_code& ec) {
if(ec)
return;
expired = 1;
call_expiration_callback();
expire_now();
});
}
}

void platform_timer::expire_now() {
bool expected = false;
if (expired.compare_exchange_strong(expected, true)) {
call_expiration_callback();
}
}

void platform_timer::stop() {
if(expired)
return;

my->timer->cancel();
expired = 1;
expired = true;
}

}}
20 changes: 13 additions & 7 deletions libraries/chain/platform_timer_kqueue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,7 @@ platform_timer::platform_timer() {

if(c == 1 && anEvent.filter == EVFILT_TIMER) {
platform_timer* self = (platform_timer*)anEvent.udata;
self->expired = 1;
self->call_expiration_callback();
self->expire_now();
}
else if(c == 1 && anEvent.filter == EVFILT_USER)
return;
Expand Down Expand Up @@ -90,19 +89,26 @@ platform_timer::~platform_timer() {

void platform_timer::start(fc::time_point tp) {
if(tp == fc::time_point::maximum()) {
expired = 0;
expired = false;
return;
}
fc::microseconds x = tp.time_since_epoch() - fc::time_point::now().time_since_epoch();
if(x.count() <= 0)
expired = 1;
expired = true;
else {
struct kevent64_s aTimerEvent;
EV_SET64(&aTimerEvent, my->timerid, EVFILT_TIMER, EV_ADD|EV_ENABLE|EV_ONESHOT, NOTE_USECONDS|NOTE_CRITICAL, x.count(), (uint64_t)this, 0, 0);

expired = 0;
expired = false;
if(kevent64(kqueue_fd, &aTimerEvent, 1, NULL, 0, KEVENT_FLAG_IMMEDIATE, NULL) != 0)
expired = 1;
expired = true;
}
}

void platform_timer::expire_now() {
bool expected = false;
if (expired.compare_exchange_strong(expected, true)) {
call_expiration_callback();
}
}

Expand All @@ -113,7 +119,7 @@ void platform_timer::stop() {
struct kevent64_s stop_timer_event;
EV_SET64(&stop_timer_event, my->timerid, EVFILT_TIMER, EV_DELETE, 0, 0, 0, 0, 0);
kevent64(kqueue_fd, &stop_timer_event, 1, NULL, 0, KEVENT_FLAG_IMMEDIATE, NULL);
expired = 1;
expired = true;
}

}}
30 changes: 19 additions & 11 deletions libraries/chain/platform_timer_posix.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,14 @@
#include <fc/fwd_impl.hpp>
#include <fc/exception/exception.hpp>

#include <atomic>
#include <mutex>

#include <signal.h>
#include <time.h>
#include <sys/types.h>

namespace eosio { namespace chain {
namespace eosio::chain {

static_assert(std::atomic_bool::is_always_lock_free, "Only lock-free atomics AS-safe.");

Expand All @@ -19,18 +21,17 @@ struct platform_timer::impl {

static void sig_handler(int, siginfo_t* si, void*) {
platform_timer* self = (platform_timer*)si->si_value.sival_ptr;
self->expired = 1;
self->call_expiration_callback();
self->expire_now();
}
};

platform_timer::platform_timer() {
static_assert(sizeof(impl) <= fwd_size);

static bool initialized;
static std::mutex initalized_mutex;
static std::mutex initialized_mutex;

if(std::lock_guard guard(initalized_mutex); !initialized) {
if(std::lock_guard guard(initialized_mutex); !initialized) {
struct sigaction act;
sigemptyset(&act.sa_mask);
act.sa_sigaction = impl::sig_handler;
Expand All @@ -55,19 +56,26 @@ platform_timer::~platform_timer() {

void platform_timer::start(fc::time_point tp) {
if(tp == fc::time_point::maximum()) {
expired = 0;
expired = false;
return;
}
fc::microseconds x = tp.time_since_epoch() - fc::time_point::now().time_since_epoch();
if(x.count() <= 0)
expired = 1;
expired = true;
else {
time_t secs = x.count() / 1000000;
long nsec = (x.count() - (secs*1000000)) * 1000;
struct itimerspec enable = {{0, 0}, {secs, nsec}};
expired = 0;
expired = false;
if(timer_settime(my->timerid, 0, &enable, NULL) != 0)
expired = 1;
expired = true;
}
}

void platform_timer::expire_now() {
bool expected = false;
if (expired.compare_exchange_strong(expected, true)) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess you can save a line of code using atomic_flag, but it doesn't really matter

call_expiration_callback();
}
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Feels like this may introduce a possibility of the expiration callback being called twice? I am not sure how much that matters. Do we need to test_and_set both here and in the normal expiry path to guard against that?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was under the impression that it didn't matter to be called twice. Not sure how to exactly verify that. Maybe safest to do the test_and_set.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

Expand All @@ -76,7 +84,7 @@ void platform_timer::stop() {
return;
struct itimerspec disable = {{0, 0}, {0, 0}};
timer_settime(my->timerid, 0, &disable, NULL);
expired = 1;
expired = true;
}

}}
}
5 changes: 4 additions & 1 deletion libraries/chain/transaction_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -459,7 +459,10 @@ namespace eosio::chain {
return;

auto now = fc::time_point::now();
if( explicit_billed_cpu_time || deadline_exception_code == deadline_exception::code_value ) {
if (explicit_billed_cpu_time && block_deadline > now) {
EOS_THROW( interrupt_exception, "interrupt signaled, ran ${bt}us, start ${s}",
("bt", now - pseudo_start)("s", start) );
} else if( explicit_billed_cpu_time || deadline_exception_code == deadline_exception::code_value ) {
EOS_THROW( deadline_exception, "deadline exceeded ${billing_timer}us",
("billing_timer", now - pseudo_start)("now", now)("deadline", _deadline)("start", start) );
greg7mdp marked this conversation as resolved.
Show resolved Hide resolved
} else if( deadline_exception_code == block_cpu_usage_exceeded::code_value ) {
Expand Down
4 changes: 4 additions & 0 deletions plugins/test_control_api_plugin/test_control_api_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ void test_control_api_plugin::plugin_startup() {
TEST_CONTROL_RW_CALL(throw_on, 202, http_params_types::params_required)
}, appbase::exec_queue::read_write);

app().get_plugin<http_plugin>().add_api({
TEST_CONTROL_RW_CALL(swap_action, 202, http_params_types::params_required)
}, appbase::exec_queue::read_write);

}

void test_control_api_plugin::plugin_shutdown() {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,17 @@ class read_write {
};
empty throw_on(const throw_on_params& params) const;

private:
// Parameters for swap_action: produce the next block with the `from` action replaced by the `to` action.
// Requires Savanna to be active; assumes blocks satisfy is_proper_svnn_block.
struct swap_action_params {
chain::name from; // action in the block that gets replaced by `to`
chain::name to; // replacement action
fc::crypto::private_key trx_priv_key; // presumably used to sign the modified transaction -- TODO confirm
fc::crypto::private_key blk_priv_key; // presumably used to sign the modified block -- TODO confirm
};
empty swap_action(const swap_action_params& params) const;

private:
test_control_ptr my;
};

Expand Down Expand Up @@ -68,3 +78,4 @@ class test_control_plugin : public plugin<test_control_plugin> {
FC_REFLECT(eosio::test_control_apis::empty, )
FC_REFLECT(eosio::test_control_apis::read_write::kill_node_on_producer_params, (producer)(where_in_sequence)(based_on_lib) )
FC_REFLECT(eosio::test_control_apis::read_write::throw_on_params, (signal)(exception) )
FC_REFLECT(eosio::test_control_apis::read_write::swap_action_params, (from)(to)(trx_priv_key)(blk_priv_key) )
Loading