diff --git a/runtime-sdk/src/dispatcher.rs b/runtime-sdk/src/dispatcher.rs index 1d02962ead..518bfc5727 100644 --- a/runtime-sdk/src/dispatcher.rs +++ b/runtime-sdk/src/dispatcher.rs @@ -31,7 +31,7 @@ use crate::{ error::{Error as _, RuntimeError}, event::IntoTags, keymanager::{KeyManagerClient, KeyManagerError}, - module::{self, BlockHandler, MethodHandler, TransactionHandler}, + module::{self, BlockHandler, InMsgHandler, InMsgResult, MethodHandler, TransactionHandler}, modules, modules::core::API as _, runtime::Runtime, @@ -537,7 +537,7 @@ impl Dispatcher { messages, block_tags: block_tags.into_tags(), tx_reject_hashes: vec![], - in_msgs_count: 0, // TODO: Support processing incoming messages. + in_msgs_count: 0, }) } } @@ -547,17 +547,63 @@ impl transaction::dispatcher::Dispatcher for Dispatche &self, rt_ctx: transaction::Context<'_>, batch: &TxnBatch, - _in_msgs: &[roothash::IncomingMessage], + in_msgs: &[roothash::IncomingMessage], ) -> Result { - self.execute_batch_common( + let mut in_msgs_count = 0; + + let mut result = self.execute_batch_common( rt_ctx, |ctx| -> Result, RuntimeError> { // If prefetch limit is set enable prefetch. let prefetch_enabled = R::PREFETCH_LIMIT > 0; + let mut results = Vec::with_capacity(batch.len()); + + // Process incoming messages first. + let mut batch_it = batch.iter(); + 'inmsg: for in_msg in in_msgs { + match R::IncomingMessagesHandler::process_in_msg(ctx, &in_msg) { + InMsgResult::Skip => { + // Skip, but treat as processed. + in_msgs_count += 1; + } + InMsgResult::Execute(raw_tx, tx) => { + // Verify that the transaction has been included in the batch. + match batch_it.next() { + None => { + // Nothing in the batch when there should be an incoming message. + return Err(Error::MalformedTransactionInBatch(anyhow!( + "missing incoming message" + )) + .into()); + } + Some(batch_tx) if batch_tx != raw_tx => { + // Incoming message does not match what is in the batch. + return Err(Error::MalformedTransactionInBatch(anyhow!( + "mismatched incoming message" + )) + .into()); + } + _ => { + // Everything is ok. + } + } + + // Further execute the inner transaction. The transaction has already + // passed checks so it is ok to include in a block. + let tx_size = raw_tx.len().try_into().unwrap(); + let index = results.len(); + results.push(Self::execute_tx(ctx, tx_size, tx, index)?); + in_msgs_count += 1; + } + InMsgResult::Stop => break 'inmsg, + } + } + + let inmsg_txs = results.len(); let mut txs = Vec::with_capacity(batch.len()); let mut prefixes: BTreeSet = BTreeSet::new(); - for tx in batch.iter() { + for tx in batch.iter().skip(inmsg_txs) { let tx_size = tx.len().try_into().map_err(|_| { Error::MalformedTransactionInBatch(anyhow!("transaction too large")) })?; @@ -580,24 +626,29 @@ impl transaction::dispatcher::Dispatcher for Dispatche } // Execute the batch. - let mut results = Vec::with_capacity(batch.len()); - for (index, (tx_size, tx)) in txs.into_iter().enumerate() { + for (index, (tx_size, tx)) in txs.into_iter().skip(inmsg_txs).enumerate() { results.push(Self::execute_tx(ctx, tx_size, tx, index)?); } Ok(results) }, - ) + )?; + + // Include number of processed incoming messages in the final result. + result.in_msgs_count = in_msgs_count; + + Ok(result) } fn schedule_and_execute_batch( &self, rt_ctx: transaction::Context<'_>, batch: &mut TxnBatch, - _in_msgs: &[roothash::IncomingMessage], + in_msgs: &[roothash::IncomingMessage], ) -> Result { let cfg = R::SCHEDULE_CONTROL; let mut tx_reject_hashes = Vec::new(); + let mut in_msgs_count = 0; let mut result = self.execute_batch_common( rt_ctx, @@ -607,13 +658,35 @@ impl transaction::dispatcher::Dispatcher for Dispatche // The idea is to keep scheduling transactions as long as we have some space // available in the block as determined by gas use. let mut new_batch = Vec::new(); - let mut results = Vec::with_capacity(batch.len()); + let mut results = Vec::with_capacity(in_msgs.len() + batch.len()); let mut requested_batch_len = cfg.initial_batch_size; + + // Process incoming messages first. + 'inmsg: for in_msg in in_msgs { + match R::IncomingMessagesHandler::process_in_msg(ctx, &in_msg) { + InMsgResult::Skip => { + // Skip, but treat as processed. + in_msgs_count += 1; + } + InMsgResult::Execute(raw_tx, tx) => { + // Further execute the inner transaction. The transaction has already + // passed checks so it is ok to include in a block. + let tx_size = raw_tx.len().try_into().unwrap(); + let index = new_batch.len(); + new_batch.push(raw_tx.to_owned()); + results.push(Self::execute_tx(ctx, tx_size, tx, index)?); + + in_msgs_count += 1; + } + InMsgResult::Stop => break 'inmsg, + } + } + + // Process regular transactions. 'batch: loop { // Remember length of last batch. let last_batch_len = batch.len(); let last_batch_tx_hash = batch.last().map(|raw_tx| Hash::digest_bytes(raw_tx)); - for raw_tx in batch.drain(..) { // If we don't have enough gas for processing even the cheapest transaction // we are done. Same if we reached the runtime-imposed maximum tx count. @@ -689,8 +762,10 @@ impl transaction::dispatcher::Dispatcher for Dispatche }, )?; - // Include rejected transaction hashes in the final result. + // Include rejected transaction hashes and number of processed incoming messages in the + // final result. result.tx_reject_hashes = tx_reject_hashes; + result.in_msgs_count = in_msgs_count; Ok(result) } @@ -877,6 +952,7 @@ mod test { core::Genesis { parameters: core::Parameters { max_batch_gas: u64::MAX, + max_inmsg_gas: 0, max_tx_size: 32 * 1024, max_tx_signers: 1, max_multisig_signers: 8, @@ -885,6 +961,7 @@ mod test { auth_signature: 0, auth_multisig_signer: 0, callformat_x25519_deoxysii: 0, + inmsg_base: 0, }, min_gas_price: BTreeMap::from([(token::Denomination::NATIVE, 0)]), }, diff --git a/runtime-sdk/src/error.rs b/runtime-sdk/src/error.rs index 3339e4f706..7a62f8c731 100644 --- a/runtime-sdk/src/error.rs +++ b/runtime-sdk/src/error.rs @@ -1,4 +1,6 @@ //! Error types for runtimes. +use std::fmt::Display; + pub use oasis_core_runtime::types::Error as RuntimeError; use crate::{dispatcher, module::CallResult}; @@ -56,6 +58,18 @@ pub trait Error: std::error::Error { { Err(self) } + + /// Converts the error into a serializable error. + fn into_serializable(self) -> SerializableError + where + Self: Sized, + { + SerializableError { + module: self.module_name().to_owned(), + code: self.code(), + message: self.to_string(), + } + } } impl Error for std::convert::Infallible { @@ -68,6 +82,47 @@ impl Error for std::convert::Infallible { } } +/// A standardized serialized implementation for an error. +#[derive(Debug, Default, Clone, thiserror::Error, cbor::Encode, cbor::Decode)] +pub struct SerializableError { + pub module: String, + pub code: u32, + pub message: String, +} + +impl Display for SerializableError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.message) + } +} + +impl Error for SerializableError { + fn module_name(&self) -> &str { + &self.module + } + + fn code(&self) -> u32 { + self.code + } +} + +impl From for SerializableError { + fn from(result: CallResult) -> Self { + match result { + CallResult::Failed { + module, + code, + message, + } => Self { + module, + code, + message, + }, + _ => Default::default(), + } + } +} + #[cfg(test)] mod test { use super::*; diff --git a/runtime-sdk/src/lib.rs b/runtime-sdk/src/lib.rs index b002cb057f..a52e11ee88 100644 --- a/runtime-sdk/src/lib.rs +++ b/runtime-sdk/src/lib.rs @@ -3,6 +3,7 @@ #![deny(rust_2018_idioms, unreachable_pub)] #![forbid(unsafe_code)] #![feature(int_log)] +#![feature(associated_type_defaults)] pub mod callformat; pub mod config; diff --git a/runtime-sdk/src/module.rs b/runtime-sdk/src/module.rs index 6e9eecfa0e..257389404d 100644 --- a/runtime-sdk/src/module.rs +++ b/runtime-sdk/src/module.rs @@ -9,6 +9,7 @@ use impl_trait_for_tuples::impl_for_tuples; use crate::{ context::{Context, TxContext}, + core::consensus::roothash, dispatcher, error, error::Error as _, event, modules, @@ -565,6 +566,39 @@ impl ModuleInfoHandler for Tuple { } } +/// Incoming message handler. +pub trait InMsgHandler { + /// Process an incoming message. + fn process_in_msg<'a, C: Context>( + ctx: &mut C, + in_msg: &'a roothash::IncomingMessage, + ) -> InMsgResult<'a>; +} + +/// Result of processing an incoming message. +#[derive(Debug)] +pub enum InMsgResult<'a> { + /// Skip to next incoming message, but count as processed. + Skip, + /// Add to batch/verify inclusion and execute. + Execute(&'a [u8], Transaction), + /// Stop processing incoming messages. + Stop, +} + +/// An incoming message handler which discards all incoming messages. +pub struct InMsgDiscard; + +impl InMsgHandler for InMsgDiscard { + fn process_in_msg<'a, C: Context>( + _ctx: &mut C, + _in_msg: &'a roothash::IncomingMessage, + ) -> InMsgResult<'a> { + // Just skip all messages without doing anything. + InMsgResult::Skip + } +} + /// A runtime module. pub trait Module { /// Module name. @@ -591,6 +625,11 @@ pub trait Module { /// Set the module's parameters. fn set_params(store: S, params: Self::Parameters) { + params + .validate_basic() + .map_err(|_| ()) + .expect("module parameters are invalid"); + let store = storage::PrefixStore::new(store, &Self::NAME); let mut store = storage::TypedStore::new(store); store.insert(Self::Parameters::STORE_KEY, params); diff --git a/runtime-sdk/src/modules/consensus/mod.rs b/runtime-sdk/src/modules/consensus/mod.rs index 98325218fb..0986738018 100644 --- a/runtime-sdk/src/modules/consensus/mod.rs +++ b/runtime-sdk/src/modules/consensus/mod.rs @@ -1,8 +1,6 @@ //! Consensus module. //! //! Low level consensus module for communicating with the consensus layer. -use std::str::FromStr; - use thiserror::Error; use oasis_core_runtime::{ @@ -44,7 +42,7 @@ pub struct Parameters { impl Default for Parameters { fn default() -> Self { Self { - consensus_denomination: token::Denomination::from_str("TEST").unwrap(), + consensus_denomination: "TEST".parse().unwrap(), consensus_scaling_factor: 1, } } diff --git a/runtime-sdk/src/modules/consensus_inmsg/config.rs b/runtime-sdk/src/modules/consensus_inmsg/config.rs new file mode 100644 index 0000000000..1f4e4c64c3 --- /dev/null +++ b/runtime-sdk/src/modules/consensus_inmsg/config.rs @@ -0,0 +1,15 @@ +use crate::modules; + +/// Incoming message handler configuration. +pub trait Config: 'static { + /// The accounts module to use. + type Accounts: modules::accounts::API; + /// The consensus module to use. + type Consensus: modules::consensus::API; + + /// Maximum number of outgoing consensus message slots that an incoming message can claim. + /// + /// When this is configured to be greater than zero it allows incoming messages to also emit + /// consensus messages as a result of executing a transaction. + const MAX_CONSENSUS_MSG_SLOTS_PER_TX: u32 = 1; +} diff --git a/runtime-sdk/src/modules/consensus_inmsg/events.rs b/runtime-sdk/src/modules/consensus_inmsg/events.rs new file mode 100644 index 0000000000..5b238d63bd --- /dev/null +++ b/runtime-sdk/src/modules/consensus_inmsg/events.rs @@ -0,0 +1,16 @@ +use super::MODULE_NAME; +use crate::error; + +/// Events emitted by the consensus incoming message handler module. +#[derive(Debug, cbor::Encode, oasis_runtime_sdk_macros::Event)] +#[cbor(untagged)] +pub enum Event { + #[sdk_event(code = 1)] + Processed { + id: u64, + #[cbor(optional)] + tag: u64, + #[cbor(optional)] + error: Option, + }, +} diff --git a/runtime-sdk/src/modules/consensus_inmsg/mod.rs b/runtime-sdk/src/modules/consensus_inmsg/mod.rs new file mode 100644 index 0000000000..7268fe4199 --- /dev/null +++ b/runtime-sdk/src/modules/consensus_inmsg/mod.rs @@ -0,0 +1,178 @@ +//! Handling incoming messages from the consensus layer. + +mod config; +mod events; +#[cfg(test)] +mod test; + +use std::convert::TryInto; + +use anyhow::anyhow; + +use crate::{ + context::{Context, Mode}, + core::consensus::roothash::IncomingMessage, + dispatcher::{self, DispatchResult}, + error::Error as _, + module::{CallResult, InMsgHandler, InMsgResult}, + modules::{ + accounts::API as _, + consensus::API as _, + core::{Error, API as _}, + }, + runtime::Runtime, + types::token, +}; + +pub use config::Config; +pub use events::Event; + +/// Unique module name. +const MODULE_NAME: &str = "consensus_inmsg"; + +/// Incoming message handler. +pub struct InMsgTx { + _cfg: std::marker::PhantomData, +} + +impl InMsgHandler for InMsgTx { + fn process_in_msg<'a, C: Context>(ctx: &mut C, in_msg: &'a IncomingMessage) -> InMsgResult<'a> { + // Determine whether we should stop processing incoming messages based on remaining gas. + let base_gas = ::Core::gas_costs(ctx).inmsg_base; + let max_batch_gas = ::Core::max_batch_gas(ctx); + let max_inmsg_gas = ::Core::max_inmsg_gas(ctx); + let remaining_gas = ::Core::remaining_batch_gas(ctx); + let min_remaining_gas = max_batch_gas + .saturating_sub(max_inmsg_gas) + .saturating_add(base_gas); + + if remaining_gas <= min_remaining_gas { + return InMsgResult::Stop; + } + + // By default, the address to mint the attached tokens into is the caller specified in the + // incoming message (authenticated by the consensus layer). + let mut mint_address = in_msg.caller.clone().into(); + let mut error = None; + + // Capture the result so we make sure to mint the tokens even in case of a bad transaction. + let mut result = match &in_msg.data[..] { + &[] => { + // The incoming message does not contain a transaciton. In this case we only perform + // the deposit and don't execute anything else. + InMsgResult::Skip + } + raw_tx => { + // The incoming message contains a transaction. In this case it must be a valid + // transaction and we execute it. If the transaction is malformed, it is skipped. + match dispatcher::Dispatcher::::decode_tx(ctx, raw_tx) { + Err(_) => { + error = Some(Error::MalformedTransaction(anyhow!("decoding failed"))); + InMsgResult::Skip + } + Ok(tx) => { + // In case the transaction is valid, the mint address is the signer of the + // contained transaction. + mint_address = tx.auth_info.signer_info[0].address_spec.address(); + + // In case the transaction cannot actually fit in the allocated space, skip + // as we will never be able to include it. + if tx.auth_info.fee.gas > max_inmsg_gas { + error = Some(Error::OutOfGas(max_inmsg_gas, tx.auth_info.fee.gas)); + InMsgResult::Skip + } else if tx.auth_info.fee.consensus_messages + > Cfg::MAX_CONSENSUS_MSG_SLOTS_PER_TX + { + error = Some(Error::OutOfMessageSlots); + InMsgResult::Skip + } else { + // If we don't have enough gas remaining to process the inner transaction, + // we need to stop processing incoming messages. + if tx.auth_info.fee.gas > remaining_gas { + return InMsgResult::Stop; + } + // Same if we don't have enough consensus message slots. + if tx.auth_info.fee.consensus_messages > ctx.remaining_messages() { + return InMsgResult::Stop; + } + + // We still need to do transaction checks. However those checks may + // fail if the minted tokens are not available. + // + // Given that a failing check can only change the result from Execute + // to Skip (but not Stop), this is fine. + InMsgResult::Execute(raw_tx, tx) + } + } + } + } + }; + + // Charge base gas for processing an incoming message. If there is not enough gas, stop + // processing further incoming messages. + if let Err(_) = ::Core::use_batch_gas(ctx, base_gas) { + return InMsgResult::Stop; + } + + // Mint tokens into the given address. + let amount_fee = + Cfg::Consensus::amount_from_consensus(ctx, in_msg.fee.clone().try_into().unwrap()) + .unwrap(); + let amount_deposit = + Cfg::Consensus::amount_from_consensus(ctx, in_msg.tokens.clone().try_into().unwrap()) + .unwrap(); + let denomination = Cfg::Consensus::consensus_denomination(ctx).unwrap(); + Cfg::Accounts::mint( + ctx, + mint_address, + &token::BaseUnits::new(amount_fee + amount_deposit, denomination.clone()), + ) + .unwrap(); + + // Move fee into the accumulator. + Cfg::Accounts::move_into_fee_accumulator( + ctx, + mint_address, + &token::BaseUnits::new(amount_fee, denomination), + ) + .unwrap(); + + // Perform transaction checks before deciding to execute the transaction. In case the check + // fails we do not bother executing the transaction and just skip it. + // + // This also takes care of potential duplicate transactions. + if let InMsgResult::Execute(raw_tx, tx) = result { + let check_result = ctx.with_child(Mode::CheckTx, |mut ctx| { + dispatcher::Dispatcher::::dispatch_tx( + &mut ctx, + raw_tx.len().try_into().unwrap(), + tx.clone(), + 0, + ) + }); + result = match check_result { + Err(err) => { + error = Some(Error::TxCheckFailed(err.into_serializable())); + InMsgResult::Skip + } + Ok(DispatchResult { + result: result @ CallResult::Failed { .. }, + .. + }) => { + error = Some(Error::TxCheckFailed(result.into())); + InMsgResult::Skip + } + _ => InMsgResult::Execute(raw_tx, tx), + }; + } + + // Emit incoming message processed event. + ctx.emit_event(Event::Processed { + id: in_msg.id, + tag: in_msg.tag, + error: error.map(Error::into_serializable), + }); + + result + } +} diff --git a/runtime-sdk/src/modules/consensus_inmsg/test.rs b/runtime-sdk/src/modules/consensus_inmsg/test.rs new file mode 100644 index 0000000000..13c7382d35 --- /dev/null +++ b/runtime-sdk/src/modules/consensus_inmsg/test.rs @@ -0,0 +1,355 @@ +use std::collections::BTreeMap; + +use crate::{ + context::{Context, Mode}, + core::consensus::roothash::IncomingMessage, + crypto::signature, + error::SerializableError, + event::IntoTags, + module::{InMsgHandler, InMsgResult, MigrationHandler, Module}, + modules, + runtime::Runtime, + testing::{keys, mock}, + types::{token, transaction}, + Version, +}; + +struct Config; + +impl modules::core::Config for Config {} + +impl super::Config for Config { + type Accounts = modules::accounts::Module; + type Consensus = modules::consensus::Module; +} + +type Core = modules::core::Module; + +type Accounts = modules::accounts::Module; + +type InMsgTx = super::InMsgTx; + +struct TestRuntime; + +impl Runtime for TestRuntime { + const VERSION: Version = Version::new(0, 0, 0); + + type Core = Core; + + type Modules = (Core, Accounts, modules::consensus::Module); + + fn genesis_state() -> ::Genesis { + Default::default() + } +} + +#[test] +fn test_process_in_msg_no_gas() { + let mut mock = mock::Mock::default(); + let mut ctx = mock.create_ctx(); + + Core::set_params( + ctx.runtime_state(), + modules::core::Parameters { + max_batch_gas: 1_000_000, + max_inmsg_gas: 0, + ..Default::default() + }, + ); + + let in_msg = IncomingMessage { + id: 42, + caller: keys::alice::address().into(), + tag: 1000, + fee: 1000u128.into(), + tokens: 2000u128.into(), + data: vec![], + }; + let decision = InMsgTx::process_in_msg(&mut ctx, &in_msg); + assert!( + matches!(decision, InMsgResult::Stop), + "should stop due to max_inmsg_gas being zero" + ); +} + +#[test] +fn test_process_in_msg_no_tx() { + let mut mock = mock::Mock::default(); + let mut ctx = mock.create_ctx(); + + Core::set_params( + ctx.runtime_state(), + modules::core::Parameters { + max_batch_gas: 1_000_000, + max_inmsg_gas: 500_000, + ..Default::default() + }, + ); + + let in_msg = IncomingMessage { + id: 42, + caller: keys::alice::address().into(), + tag: 1000, + fee: 1000u128.into(), + tokens: 2000u128.into(), + data: vec![], + }; + let decision = InMsgTx::process_in_msg(&mut ctx, &in_msg); + + assert!( + matches!(decision, InMsgResult::Skip), + "should skip as message does not contain a tx" + ); + + let (etags, _) = ctx.commit(); + let tags = etags.into_tags(); + assert_eq!(tags.len(), 2, "2 events should be emitted"); + assert_eq!(tags[0].key, b"accounts\x00\x00\x00\x03"); // accounts.Mint (code = 3) event + assert_eq!(tags[1].key, b"consensus_inmsg\x00\x00\x00\x01"); // consensus_inmsg.Processed (code = 1) event + + let expected_mint = modules::accounts::Event::Mint { + owner: keys::alice::address(), + amount: token::BaseUnits::new(3000, "TEST".parse().unwrap()), // Default consensus denomination is TEST. + }; + assert_eq!(tags[0].value, cbor::to_vec(vec![expected_mint])); + + let expected_processed = super::Event::Processed { + id: 42, + tag: 1000, + error: None, + }; + assert_eq!(tags[1].value, cbor::to_vec(vec![expected_processed])); +} + +#[test] +fn test_process_in_msg_tx_malformed() { + let mut mock = mock::Mock::default(); + let mut ctx = mock.create_ctx(); + + Core::set_params( + ctx.runtime_state(), + modules::core::Parameters { + max_batch_gas: 1_000_000, + max_inmsg_gas: 500_000, + ..Default::default() + }, + ); + + let in_msg = IncomingMessage { + id: 42, + caller: keys::alice::address().into(), + tag: 1000, + fee: 1000u128.into(), + tokens: 2000u128.into(), + data: b"not a valid transaction".to_vec(), + }; + let decision = InMsgTx::process_in_msg(&mut ctx, &in_msg); + + assert!( + matches!(decision, InMsgResult::Skip), + "should skip as tx is malformed" + ); + + let (etags, _) = ctx.commit(); + let tags = etags.into_tags(); + assert_eq!(tags.len(), 2, "2 events should be emitted"); + assert_eq!(tags[0].key, b"accounts\x00\x00\x00\x03"); // accounts.Mint (code = 3) event + assert_eq!(tags[1].key, b"consensus_inmsg\x00\x00\x00\x01"); // consensus_inmsg.Processed (code = 1) event + + let expected_mint = modules::accounts::Event::Mint { + owner: keys::alice::address(), + amount: token::BaseUnits::new(3000, "TEST".parse().unwrap()), // Default consensus denomination is TEST. + }; + assert_eq!(tags[0].value, cbor::to_vec(vec![expected_mint])); + + let expected_processed = super::Event::Processed { + id: 42, + tag: 1000, + error: Some(SerializableError { + module: "core".to_owned(), + code: 1, + message: "malformed transaction: decoding failed".to_owned(), + }), + }; + assert_eq!(tags[1].value, cbor::to_vec(vec![expected_processed])); +} + +#[test] +fn test_process_in_msg_tx() { + let _guard = signature::context::test_using_chain_context(); + let mut mock = mock::Mock::default(); + let mut ctx = mock.create_ctx_for_runtime::(Mode::ExecuteTx); + + Core::set_params( + ctx.runtime_state(), + modules::core::Parameters { + max_batch_gas: 1_000_000, + max_inmsg_gas: 500_000, + max_tx_size: 32 * 1024, + max_tx_signers: 1, + min_gas_price: BTreeMap::from([("TEST".parse().unwrap(), 0)]), + ..Default::default() + }, + ); + + signature::context::set_chain_context(Default::default(), "test"); + + let tx = transaction::Transaction { + version: transaction::LATEST_TRANSACTION_VERSION, + call: transaction::Call { + format: transaction::CallFormat::Plain, + method: "accounts.Transfer".to_owned(), + body: cbor::to_value(modules::accounts::types::Transfer { + to: keys::bob::address(), + amount: token::BaseUnits::new(1_000, "TEST".parse().unwrap()), + }), + ..Default::default() + }, + auth_info: transaction::AuthInfo { + signer_info: vec![transaction::SignerInfo::new_sigspec( + keys::alice::sigspec(), + 0, + )], + fee: transaction::Fee { + amount: token::BaseUnits::new(10, "TEST".parse().unwrap()), + gas: 1000, + consensus_messages: 0, + }, + ..Default::default() + }, + }; + let tx = cbor::to_vec(tx); + let signature = keys::alice::signer() + .context_sign( + &signature::context::get_chain_context_for(transaction::SIGNATURE_CONTEXT_BASE), + &tx, + ) + .unwrap(); + let utx = + transaction::UnverifiedTransaction(tx, vec![transaction::AuthProof::Signature(signature)]); + + let in_msg = IncomingMessage { + id: 42, + caller: keys::alice::address().into(), + tag: 1000, + fee: 1000u128.into(), + tokens: 2000u128.into(), + data: cbor::to_vec(utx), + }; + let decision = InMsgTx::process_in_msg(&mut ctx, &in_msg); + + assert!( + matches!(decision, InMsgResult::Execute(..)), + "should execute tx" + ); + + let (etags, _) = ctx.commit(); + let tags = etags.into_tags(); + assert_eq!(tags.len(), 2, "2 events should be emitted"); + assert_eq!(tags[0].key, b"accounts\x00\x00\x00\x03"); // accounts.Mint (code = 3) event + assert_eq!(tags[1].key, b"consensus_inmsg\x00\x00\x00\x01"); // consensus_inmsg.Processed (code = 1) event + + let expected_mint = modules::accounts::Event::Mint { + owner: keys::alice::address(), + amount: token::BaseUnits::new(3000, "TEST".parse().unwrap()), // Default consensus denomination is TEST. + }; + assert_eq!(tags[0].value, cbor::to_vec(vec![expected_mint])); + + let expected_processed = super::Event::Processed { + id: 42, + tag: 1000, + error: None, + }; + assert_eq!(tags[1].value, cbor::to_vec(vec![expected_processed])); +} + +#[test] +fn test_process_in_msg_tx_fail_checks() { + let _guard = signature::context::test_using_chain_context(); + let mut mock = mock::Mock::default(); + let mut ctx = mock.create_ctx_for_runtime::(Mode::ExecuteTx); + + Core::set_params( + ctx.runtime_state(), + modules::core::Parameters { + max_batch_gas: 1_000_000, + max_inmsg_gas: 500_000, + max_tx_size: 32 * 1024, + max_tx_signers: 1, + min_gas_price: BTreeMap::from([("TEST".parse().unwrap(), 0)]), + ..Default::default() + }, + ); + + signature::context::set_chain_context(Default::default(), "test"); + + let tx = transaction::Transaction { + version: transaction::LATEST_TRANSACTION_VERSION, + call: transaction::Call { + format: transaction::CallFormat::Plain, + method: "accounts.Transfer".to_owned(), + body: cbor::to_value(modules::accounts::types::Transfer { + to: keys::bob::address(), + amount: token::BaseUnits::new(1_000, "TEST".parse().unwrap()), + }), + ..Default::default() + }, + auth_info: transaction::AuthInfo { + signer_info: vec![transaction::SignerInfo::new_sigspec( + keys::alice::sigspec(), + 0, + )], + fee: transaction::Fee { + // Set a fee that we don't have the funds to pay. + amount: token::BaseUnits::new(10_000, "TEST".parse().unwrap()), + gas: 1000, + consensus_messages: 0, + }, + ..Default::default() + }, + }; + let tx = cbor::to_vec(tx); + let signature = keys::alice::signer() + .context_sign( + &signature::context::get_chain_context_for(transaction::SIGNATURE_CONTEXT_BASE), + &tx, + ) + .unwrap(); + let utx = + transaction::UnverifiedTransaction(tx, vec![transaction::AuthProof::Signature(signature)]); + + let in_msg = IncomingMessage { + id: 42, + caller: keys::alice::address().into(), + tag: 1000, + fee: 1000u128.into(), + tokens: 2000u128.into(), + data: cbor::to_vec(utx), + }; + let decision = InMsgTx::process_in_msg(&mut ctx, &in_msg); + + assert!(matches!(decision, InMsgResult::Skip), "should skip tx"); + + let (etags, _) = ctx.commit(); + let tags = etags.into_tags(); + assert_eq!(tags.len(), 2, "2 events should be emitted"); + assert_eq!(tags[0].key, b"accounts\x00\x00\x00\x03"); // accounts.Mint (code = 3) event + assert_eq!(tags[1].key, b"consensus_inmsg\x00\x00\x00\x01"); // consensus_inmsg.Processed (code = 1) event + + let expected_mint = modules::accounts::Event::Mint { + owner: keys::alice::address(), + amount: token::BaseUnits::new(3000, "TEST".parse().unwrap()), // Default consensus denomination is TEST. + }; + assert_eq!(tags[0].value, cbor::to_vec(vec![expected_mint])); + + let expected_processed = super::Event::Processed { + id: 42, + tag: 1000, + error: Some(SerializableError { + module: "core".to_owned(), + code: 5, + message: "check failed: insufficient balance to pay fees".to_owned(), + }), + }; + assert_eq!(tags[1].value, cbor::to_vec(vec![expected_processed])); +} diff --git a/runtime-sdk/src/modules/core/mod.rs b/runtime-sdk/src/modules/core/mod.rs index 1a0daee94d..4a67123097 100644 --- a/runtime-sdk/src/modules/core/mod.rs +++ b/runtime-sdk/src/modules/core/mod.rs @@ -13,7 +13,7 @@ use crate::{ callformat, context::{BatchContext, Context, TxContext}, dispatcher, - error::Error as SDKError, + error::{Error as SDKError, SerializableError}, module::{ self, CallResult, InvariantHandler as _, MethodHandler as _, Module as _, ModuleInfoHandler as _, @@ -132,6 +132,10 @@ pub enum Error { #[error("{0}")] #[sdk_error(transparent)] TxSimulationFailed(#[from] TxSimulationFailure), + + #[error("check failed: {0}")] + #[sdk_error(transparent)] + TxCheckFailed(#[from] SerializableError), } impl Error { @@ -214,12 +218,15 @@ pub struct GasCosts { pub auth_multisig_signer: u64, pub callformat_x25519_deoxysii: u64, + + pub inmsg_base: u64, } /// Parameters for the core module. #[derive(Clone, Debug, Default, cbor::Encode, cbor::Decode)] pub struct Parameters { pub max_batch_gas: u64, + pub max_inmsg_gas: u64, pub max_tx_size: u32, pub max_tx_signers: u32, pub max_multisig_signers: u32, @@ -228,7 +235,17 @@ pub struct Parameters { } impl module::Parameters for Parameters { - type Error = std::convert::Infallible; + type Error = Error; + + fn validate_basic(&self) -> Result<(), Self::Error> { + if self.max_inmsg_gas > self.max_batch_gas { + return Err(Error::InvalidArgument(anyhow!( + "max_inmsg_gas > max_batch_gas" + ))); + } + + Ok(()) + } } pub trait API { @@ -257,6 +274,12 @@ pub trait API { /// Configured maximum amount of gas that can be used in a batch. fn max_batch_gas(ctx: &mut C) -> u64; + /// Configured maximum amount of gas that can be used for incoming messages. + fn max_inmsg_gas(ctx: &mut C) -> u64; + + /// Gas costs related to the core module. + fn gas_costs(ctx: &mut C) -> GasCosts; + /// Configured minimum gas price. fn min_gas_price(ctx: &mut C, denom: &token::Denomination) -> u128; @@ -424,6 +447,14 @@ impl API for Module { Self::params(ctx.runtime_state()).max_batch_gas } + fn max_inmsg_gas(ctx: &mut C) -> u64 { + Self::params(ctx.runtime_state()).max_inmsg_gas + } + + fn gas_costs(ctx: &mut C) -> GasCosts { + Self::params(ctx.runtime_state()).gas_costs + } + fn min_gas_price(ctx: &mut C, denom: &token::Denomination) -> u128 { Self::params(ctx.runtime_state()) .min_gas_price diff --git a/runtime-sdk/src/modules/core/test.rs b/runtime-sdk/src/modules/core/test.rs index 8b1fb8fe1e..7219172787 100644 --- a/runtime-sdk/src/modules/core/test.rs +++ b/runtime-sdk/src/modules/core/test.rs @@ -28,6 +28,7 @@ fn test_use_gas() { ctx.runtime_state(), Parameters { max_batch_gas: BLOCK_MAX_GAS, + max_inmsg_gas: 0, max_tx_size: 32 * 1024, max_tx_signers: 8, max_multisig_signers: 8, @@ -127,6 +128,7 @@ fn test_query_min_gas_price() { ctx.runtime_state(), Parameters { max_batch_gas: 10000, + max_inmsg_gas: 0, max_tx_size: 32 * 1024, max_tx_signers: 8, max_multisig_signers: 8, @@ -298,6 +300,7 @@ impl Runtime for GasWasterRuntime { super::Genesis { parameters: Parameters { max_batch_gas: u64::MAX, + max_inmsg_gas: 0, max_tx_size: 32 * 1024, max_tx_signers: 8, max_multisig_signers: 8, @@ -306,6 +309,7 @@ impl Runtime for GasWasterRuntime { auth_signature: Self::AUTH_SIGNATURE_GAS, auth_multisig_signer: Self::AUTH_MULTISIG_GAS, callformat_x25519_deoxysii: 0, + inmsg_base: 0, }, min_gas_price: { let mut mgp = BTreeMap::new(); @@ -659,6 +663,7 @@ fn test_approve_unverified_tx() { ctx.runtime_state(), Parameters { max_batch_gas: u64::MAX, + max_inmsg_gas: 0, max_tx_size: 32 * 1024, max_tx_signers: 2, max_multisig_signers: 2, @@ -758,6 +763,7 @@ fn test_min_gas_price() { ctx.runtime_state(), Parameters { max_batch_gas: u64::MAX, + max_inmsg_gas: 0, max_tx_size: 32 * 1024, max_tx_signers: 8, max_multisig_signers: 8, @@ -766,6 +772,7 @@ fn test_min_gas_price() { auth_signature: GasWasterRuntime::AUTH_SIGNATURE_GAS, auth_multisig_signer: GasWasterRuntime::AUTH_MULTISIG_GAS, callformat_x25519_deoxysii: 0, + inmsg_base: 0, }, min_gas_price: { let mut mgp = BTreeMap::new(); @@ -941,6 +948,7 @@ fn test_gas_used_events() { ctx.runtime_state(), Parameters { max_batch_gas: 1_000_000, + max_inmsg_gas: 0, max_tx_size: 32 * 1024, max_tx_signers: 8, max_multisig_signers: 8, diff --git a/runtime-sdk/src/modules/mod.rs b/runtime-sdk/src/modules/mod.rs index a26dd435cb..716c61005b 100644 --- a/runtime-sdk/src/modules/mod.rs +++ b/runtime-sdk/src/modules/mod.rs @@ -3,5 +3,6 @@ pub mod accounts; pub mod consensus; pub mod consensus_accounts; +pub mod consensus_inmsg; pub mod core; pub mod rewards; diff --git a/runtime-sdk/src/modules/rewards/mod.rs b/runtime-sdk/src/modules/rewards/mod.rs index 2f6a07e3fd..1005275150 100644 --- a/runtime-sdk/src/modules/rewards/mod.rs +++ b/runtime-sdk/src/modules/rewards/mod.rs @@ -8,7 +8,7 @@ use thiserror::Error; use crate::{ context::Context, core::consensus::beacon, - module::{self, Module as _, Parameters as _}, + module::{self, Module as _}, modules, sdk_derive, storage, types::address::{Address, SignatureAddressSpec}, }; @@ -99,11 +99,6 @@ impl Module {} impl Module { /// Initialize state from genesis. fn init(ctx: &mut C, genesis: Genesis) { - genesis - .parameters - .validate_basic() - .expect("invalid genesis parameters"); - // Set genesis parameters. Self::set_params(ctx.runtime_state(), genesis.parameters); } diff --git a/runtime-sdk/src/runtime.rs b/runtime-sdk/src/runtime.rs index ba9ce1e7cc..fb108a2ed7 100644 --- a/runtime-sdk/src/runtime.rs +++ b/runtime-sdk/src/runtime.rs @@ -16,8 +16,8 @@ use crate::{ crypto, dispatcher, keymanager::{KeyManagerClient, TrustedPolicySigners}, module::{ - BlockHandler, InvariantHandler, MethodHandler, MigrationHandler, ModuleInfoHandler, - TransactionHandler, + BlockHandler, InMsgDiscard, InMsgHandler, InvariantHandler, MethodHandler, + MigrationHandler, ModuleInfoHandler, TransactionHandler, }, modules, storage, }; @@ -37,6 +37,8 @@ pub trait Runtime { /// Module that provides the core API. type Core: modules::core::API; + /// Incoming message handler. + type IncomingMessagesHandler: InMsgHandler = InMsgDiscard; /// Supported modules. type Modules: TransactionHandler diff --git a/runtime-sdk/src/types/address.rs b/runtime-sdk/src/types/address.rs index 00b32297a1..9f2a75e04f 100644 --- a/runtime-sdk/src/types/address.rs +++ b/runtime-sdk/src/types/address.rs @@ -254,6 +254,12 @@ impl From
for ConsensusAddress { } } +impl From for Address { + fn from(addr: ConsensusAddress) -> Address { + Address::from_bytes(addr.as_ref()).unwrap() + } +} + #[cfg(test)] mod test { use super::*; diff --git a/tests/e2e/runtime.go b/tests/e2e/runtime.go index bd3b0f20cf..492b1f213d 100644 --- a/tests/e2e/runtime.go +++ b/tests/e2e/runtime.go @@ -139,11 +139,11 @@ func (sc *RuntimeScenario) Fixture() (*oasis.NetworkFixture, error) { Parameters: api.ConsensusParameters{ MaxAllowances: 10, }, - TotalSupply: *quantity.NewFromUint64(200), + TotalSupply: *quantity.NewFromUint64(300), Ledger: map[api.Address]*api.Account{ api.Address(testing.Alice.Address): { General: api.GeneralAccount{ - Balance: *quantity.NewFromUint64(100), + Balance: *quantity.NewFromUint64(200), Allowances: map[api.Address]quantity.Quantity{ api.NewRuntimeAddress(runtimeID): *quantity.NewFromUint64(100), }, @@ -198,6 +198,7 @@ func (sc *RuntimeScenario) Fixture() (*oasis.NetworkFixture, error) { TxnScheduler: registry.TxnSchedulerParameters{ MaxBatchSize: 1000, MaxBatchSizeBytes: 16 * 1024 * 1024, // 16 MB. + MaxInMessages: 128, BatchFlushTimeout: 1 * time.Second, ProposerTimeout: 30, }, diff --git a/tests/e2e/scenarios.go b/tests/e2e/scenarios.go index c1922723cb..8f5defafb7 100644 --- a/tests/e2e/scenarios.go +++ b/tests/e2e/scenarios.go @@ -35,7 +35,11 @@ var ( }) // SimpleConsensusRuntime is the simple-consensus runtime test. - SimpleConsensusRuntime *RuntimeScenario = NewRuntimeScenario("test-runtime-simple-consensus", []RunTestFunction{SimpleConsensusTest, ConsensusAccountsParametersTest}) + SimpleConsensusRuntime *RuntimeScenario = NewRuntimeScenario("test-runtime-simple-consensus", []RunTestFunction{ + SimpleConsensusTest, + ConsensusAccountsParametersTest, + ConsensusIncomingMessageBasicTest, + }) // SimpleEVMRuntime is the simple-evm runtime test. SimpleEVMRuntime *RuntimeScenario = NewRuntimeScenario("test-runtime-simple-evm", []RunTestFunction{ diff --git a/tests/e2e/simple_consensus.go b/tests/e2e/simple_consensus.go index 6cbc53b8ce..0fc3b18a4f 100644 --- a/tests/e2e/simple_consensus.go +++ b/tests/e2e/simple_consensus.go @@ -7,9 +7,12 @@ import ( "google.golang.org/grpc" + coreSignature "github.com/oasisprotocol/oasis-core/go/common/crypto/signature" "github.com/oasisprotocol/oasis-core/go/common/logging" "github.com/oasisprotocol/oasis-core/go/common/quantity" consensus "github.com/oasisprotocol/oasis-core/go/consensus/api" + "github.com/oasisprotocol/oasis-core/go/consensus/api/transaction" + roothash "github.com/oasisprotocol/oasis-core/go/roothash/api" staking "github.com/oasisprotocol/oasis-core/go/staking/api" "github.com/oasisprotocol/oasis-sdk/client-sdk/go/client" @@ -133,6 +136,28 @@ func makeWithdrawCheck(from types.Address, nonce uint64, to types.Address, amoun } } +func makeMintCheck(owner types.Address, amount types.BaseUnits) func(e client.DecodedEvent) bool { + return func(e client.DecodedEvent) bool { + ae, ok := e.(*accounts.Event) + if !ok { + return false + } + if ae.Mint == nil { + return false + } + if !ae.Mint.Owner.Equal(owner) { + return false + } + if ae.Mint.Amount.Amount.Cmp(&amount.Amount) != 0 { + return false + } + if ae.Mint.Amount.Denomination != amount.Denomination { + return false + } + return true + } +} + func SimpleConsensusTest(sc *RuntimeScenario, log *logging.Logger, conn *grpc.ClientConn, rtc client.RuntimeClient) error { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) defer cancel() @@ -341,3 +366,56 @@ func ConsensusAccountsParametersTest(sc *RuntimeScenario, log *logging.Logger, c return nil } + +// ConsensusIncomingMessageBasicTest tests handling of basic incoming messages. +func ConsensusIncomingMessageBasicTest(sc *RuntimeScenario, log *logging.Logger, conn *grpc.ClientConn, rtc client.RuntimeClient) error { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) + defer cancel() + + cons := consensus.NewConsensusClient(conn) + consDenomination := types.Denomination("TEST") + + accounts := accounts.NewV1(rtc) + acCh, err := rtc.WatchEvents(ctx, []client.EventDecoder{accounts}, false) + if err != nil { + return err + } + + chainContext, err := cons.GetChainContext(ctx) + if err != nil { + return fmt.Errorf("failed to get chain context: %w", err) + } + + coreSignature.UnsafeResetChainContext() + coreSignature.SetChainContext(chainContext) + + // Generate a simple SubmitMsg transaction without any data. + tx := roothash.NewSubmitMsgTx(0, &transaction.Fee{Gas: 10_000}, &roothash.SubmitMsg{ + ID: runtimeID, + Fee: *quantity.NewFromUint64(10), + Tokens: *quantity.NewFromUint64(50), + }) + signer := testing.Alice.Signer.(interface{ Unwrap() coreSignature.Signer }).Unwrap() + sigTx, err := transaction.Sign(signer, tx) + if err != nil { + return fmt.Errorf("failed to sign SubmitMsg transaction: %w", err) + } + + err = cons.SubmitTx(ctx, sigTx) + if err != nil { + return fmt.Errorf("failed to execute SubmitMsg transaction: %w", err) + } + + // Wait for the message to be processed. + // NOTE: The test runtime uses a scaling factor of 1000 so all balances in the runtime are + // 1000x larger than in the consensus layer. + amount := types.NewBaseUnits(*quantity.NewFromUint64(60_000), consDenomination) + if err = ensureRuntimeEvent(log, acCh, makeMintCheck(testing.Alice.Address, amount)); err != nil { + return fmt.Errorf("ensuring alice mint runtime event: %w", err) + } + + // TODO: Test with transaction. + // TODO: Test with duplicate transactions (e.g. two different incoming msgs containing same transaction in same round). + + return nil +} diff --git a/tests/runtimes/benchmarking/src/lib.rs b/tests/runtimes/benchmarking/src/lib.rs index a43c1a3e80..38933519cc 100644 --- a/tests/runtimes/benchmarking/src/lib.rs +++ b/tests/runtimes/benchmarking/src/lib.rs @@ -47,6 +47,7 @@ impl sdk::Runtime for Runtime { modules::core::Genesis { parameters: modules::core::Parameters { max_batch_gas: 10_000_000, + max_inmsg_gas: 0, max_tx_size: 32 * 1024, max_tx_signers: 8, max_multisig_signers: 8, diff --git a/tests/runtimes/simple-consensus/src/lib.rs b/tests/runtimes/simple-consensus/src/lib.rs index ddb8687557..3269c5622c 100644 --- a/tests/runtimes/simple-consensus/src/lib.rs +++ b/tests/runtimes/simple-consensus/src/lib.rs @@ -7,6 +7,11 @@ pub struct Config; impl modules::core::Config for Config {} +impl modules::consensus_inmsg::Config for Config { + type Accounts = modules::accounts::Module; + type Consensus = modules::consensus::Module; +} + /// Simple consensus runtime. pub struct Runtime; @@ -22,6 +27,8 @@ impl sdk::Runtime for Runtime { type Core = modules::core::Module; + type IncomingMessagesHandler = modules::consensus_inmsg::InMsgTx; + type Modules = ( modules::accounts::Module, modules::consensus::Module, @@ -64,6 +71,7 @@ impl sdk::Runtime for Runtime { modules::core::Genesis { parameters: modules::core::Parameters { max_batch_gas: 10_000, + max_inmsg_gas: 5_000, max_tx_size: 32 * 1024, max_tx_signers: 8, max_multisig_signers: 8, diff --git a/tests/runtimes/simple-contracts/src/lib.rs b/tests/runtimes/simple-contracts/src/lib.rs index b3d2a1c190..2b9620f0eb 100644 --- a/tests/runtimes/simple-contracts/src/lib.rs +++ b/tests/runtimes/simple-contracts/src/lib.rs @@ -72,6 +72,7 @@ impl sdk::Runtime for Runtime { modules::core::Genesis { parameters: modules::core::Parameters { max_batch_gas: 10_000_000, + max_inmsg_gas: 0, max_tx_size: 512 * 1024, max_tx_signers: 8, max_multisig_signers: 8, diff --git a/tests/runtimes/simple-evm/src/lib.rs b/tests/runtimes/simple-evm/src/lib.rs index fbceacb3f0..89f6cc0fe2 100644 --- a/tests/runtimes/simple-evm/src/lib.rs +++ b/tests/runtimes/simple-evm/src/lib.rs @@ -77,6 +77,7 @@ impl sdk::Runtime for Runtime { modules::core::Genesis { parameters: modules::core::Parameters { max_batch_gas: 1_000_000, + max_inmsg_gas: 500_000, max_tx_size: 32 * 1024, max_tx_signers: 8, max_multisig_signers: 8, diff --git a/tests/runtimes/simple-keyvalue/src/lib.rs b/tests/runtimes/simple-keyvalue/src/lib.rs index 59739b489e..7ac96b7020 100644 --- a/tests/runtimes/simple-keyvalue/src/lib.rs +++ b/tests/runtimes/simple-keyvalue/src/lib.rs @@ -126,6 +126,7 @@ impl sdk::Runtime for Runtime { modules::core::Genesis { parameters: modules::core::Parameters { max_batch_gas: 2_000, + max_inmsg_gas: 1_000, max_tx_size: 32 * 1024, max_tx_signers: 8, max_multisig_signers: 8, @@ -134,6 +135,7 @@ impl sdk::Runtime for Runtime { auth_signature: 10, auth_multisig_signer: 10, callformat_x25519_deoxysii: 50, + inmsg_base: 10, }, min_gas_price: { let mut mgp = BTreeMap::new(); diff --git a/tests/runtimes/simple-keyvalue/src/test.rs b/tests/runtimes/simple-keyvalue/src/test.rs index f6be5ebd67..8e6a4f86b0 100644 --- a/tests/runtimes/simple-keyvalue/src/test.rs +++ b/tests/runtimes/simple-keyvalue/src/test.rs @@ -16,6 +16,7 @@ fn test_impl_for_tuple() { ctx.runtime_state(), core::Parameters { max_batch_gas: u64::MAX, + max_inmsg_gas: 0, max_tx_size: 32 * 1024, max_tx_signers: 1, max_multisig_signers: 1,