diff --git a/Cargo.lock b/Cargo.lock
index feaff9c45..e91e9f7ef 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1305,6 +1305,12 @@ dependencies = [
  "serde",
 ]
 
+[[package]]
+name = "bytesize"
+version = "1.3.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a3e368af43e418a04d52505cf3dbc23dda4e3407ae2fa99fd0e4f308ce546acc"
+
 [[package]]
 name = "bzip2"
 version = "0.4.4"
@@ -7486,6 +7492,7 @@ dependencies = [
  "async-std",
  "async-trait",
  "bincode",
+ "bytesize",
  "clap",
  "cld",
  "commit",
@@ -7510,6 +7517,7 @@ dependencies = [
  "lazy_static",
  "portpicker",
  "rand 0.8.5",
+ "rand_chacha 0.3.1",
  "sequencer-utils",
  "serde",
  "serde_json",
diff --git a/Cargo.toml b/Cargo.toml
index 65edf1998..9d8dee4fd 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -30,4 +30,6 @@ jf-utils = { git = "https://github.com/EspressoSystems/jellyfish" }
 surf-disco = { git = "https://github.com/EspressoSystems/surf-disco", tag = "v0.4.6" }
 tide-disco = { git = "https://github.com/EspressoSystems/tide-disco", tag = "v0.4.6" }
 
+bytesize = "1.3"
 itertools = "0.12"
+rand_chacha = "0.3"
diff --git a/sequencer/Cargo.toml b/sequencer/Cargo.toml
index 02749d71e..ca985a704 100644
--- a/sequencer/Cargo.toml
+++ b/sequencer/Cargo.toml
@@ -27,6 +27,7 @@ async-compatibility-layer = { git = "https://github.com/EspressoSystems/async-co
 async-std = "1.12.0"
 async-trait = "0.1.75"
 bincode = "1.3.3"
+bytesize = { workspace = true }
 clap = { version = "4.3", features = ["derive", "env"] }
 cld = "0.5"
 commit = { git = "https://github.com/EspressoSystems/commit" }
@@ -39,6 +40,7 @@ futures = "0.3"
 include_dir = "0.7"
 itertools = { workspace = true }
 lazy_static = "1.4"
+rand_chacha = "0.3"
 serde_json = "1.0"
 sha2 = "0.10" # TODO temporary, used only for VID, should be set in hotshot
 time = "0.3"
diff --git a/sequencer/src/bin/orchestrator.rs b/sequencer/src/bin/orchestrator.rs
index a6d67562a..20402c883 100644
--- a/sequencer/src/bin/orchestrator.rs
+++ b/sequencer/src/bin/orchestrator.rs
@@ -1,10 +1,9 @@
 use async_compatibility_layer::logging::{setup_backtrace, setup_logging};
 use clap::Parser;
-use cld::ClDuration;
 use hotshot::traits::election::static_committee::StaticElectionConfig;
 use hotshot::types::SignatureKey;
 use hotshot_orchestrator::{config::NetworkConfig, run_orchestrator};
-use sequencer::{PubKey, MAX_NMT_DEPTH};
+use sequencer::{options::parse_duration, PubKey, MAX_NMT_DEPTH};
 use snafu::Snafu;
 use std::fmt::{self, Display, Formatter};
 use std::num::{NonZeroUsize, ParseIntError};
@@ -108,19 +107,6 @@ struct Args {
     max_transactions: Option<NonZeroUsize>,
 }
 
-#[derive(Clone, Debug, Snafu)]
-struct ParseDurationError {
-    reason: String,
-}
-
-fn parse_duration(s: &str) -> Result<Duration, ParseDurationError> {
-    ClDuration::from_str(s)
-        .map(Duration::from)
-        .map_err(|err| ParseDurationError {
-            reason: err.to_string(),
-        })
-}
-
 #[derive(Clone, Copy, Debug, PartialEq, Eq)]
 struct Ratio {
     numerator: u64,
diff --git a/sequencer/src/bin/submit-transactions.rs b/sequencer/src/bin/submit-transactions.rs
new file mode 100644
index 000000000..17c8615e4
--- /dev/null
+++ b/sequencer/src/bin/submit-transactions.rs
@@ -0,0 +1,188 @@
+//! Utility program to submit random transactions to an Espresso Sequencer.
+
+use async_compatibility_layer::logging::{setup_backtrace, setup_logging};
+use async_std::task::{sleep, spawn};
+use bytesize::ByteSize;
+use clap::Parser;
+use commit::{Commitment, Committable};
+use derive_more::From;
+use futures::{
+    channel::mpsc::{self, Sender},
+    sink::SinkExt,
+    stream::StreamExt,
+};
+use hotshot_query_service::{
+    availability::{BlockQueryData, QueryablePayload},
+    Error,
+};
+use rand::{Rng, RngCore, SeedableRng};
+use rand_chacha::ChaChaRng;
+use sequencer::{options::parse_duration, SeqTypes, Transaction};
+use snafu::Snafu;
+use std::{collections::HashSet, time::Duration};
+use surf_disco::{Client, Url};
+
+/// Submit random transactions to an Espresso Sequencer.
+#[derive(Clone, Debug, Parser)]
+struct Options {
+    /// Minimum size of transaction to submit.
+    #[clap(long, default_value = "1", value_parser = parse_size)]
+    min_size: usize,
+
+    /// Maximum size of transaction to submit.
+    #[clap(long, default_value = "1kb", value_parser = parse_size)]
+    max_size: usize,
+
+    /// Minimum namespace ID to submit to.
+    #[clap(long, default_value = "10000")]
+    min_namespace: u64,
+
+    /// Maximum namespace ID to submit to.
+    #[clap(long, default_value = "10010")]
+    max_namespace: u64,
+
+    /// Optional delay between submitting transactions.
+    ///
+    /// Can be used to moderate the rate of submission.
+    #[clap(long, value_parser = parse_duration)]
+    delay: Option<Duration>,
+
+    /// Maximum number of unprocessed transaction submissions.
+    ///
+    /// This can be used to apply backpressure so that the tasks submitting transactions do not get
+    /// too far ahead of the task processing results.
+    #[clap(long, default_value = "1000")]
+    channel_bound: usize,
+
+    /// Seed for reproducible randomness.
+    #[clap(long)]
+    seed: Option<u64>,
+
+    /// Number of parallel tasks to run.
+    #[clap(short, long, default_value = "1")]
+    jobs: usize,
+
+    /// URL of the query service.
+    url: Url,
+}
+
+#[derive(Clone, Debug, From, Snafu)]
+struct ParseSizeError {
+    msg: String,
+}
+
+fn parse_size(s: &str) -> Result<usize, ParseSizeError> {
+    Ok(s.parse::<ByteSize>()?.0 as usize)
+}
+
+#[async_std::main]
+async fn main() {
+    setup_backtrace();
+    setup_logging();
+
+    let opt = Options::parse();
+    let (sender, mut receiver) = mpsc::channel(opt.channel_bound);
+
+    let seed = opt.seed.unwrap_or_else(random_seed);
+    tracing::info!("PRNG seed: {seed}");
+    let mut rng = ChaChaRng::seed_from_u64(seed);
+
+    // Subscribe to block stream so we can check that our transactions are getting sequenced.
+    let client = Client::<Error>::new(opt.url.clone());
+    let block_height: usize = client
+        .get("status/latest_block_height")
+        .send()
+        .await
+        .unwrap();
+    let mut blocks = client
+        .socket(&format!("availability/stream/blocks/{}", block_height - 1))
+        .subscribe()
+        .await
+        .unwrap();
+    tracing::info!("listening for blocks starting at {block_height}");
+
+    // Spawn tasks to submit transactions.
+    for _ in 0..opt.jobs {
+        spawn(submit_transactions(
+            opt.clone(),
+            sender.clone(),
+            ChaChaRng::from_rng(&mut rng).unwrap(),
+        ));
+    }
+
+    // Keep track of the results.
+    let mut pending = HashSet::new();
+    while let Some(block) = blocks.next().await {
+        let block: BlockQueryData<SeqTypes> = match block {
+            Ok(block) => block,
+            Err(err) => {
+                tracing::warn!("error getting block: {err}");
+                continue;
+            }
+        };
+        tracing::info!("got block {}", block.height());
+
+        // Get all transactions which were submitted before this block.
+        while let Ok(Some(tx)) = receiver.try_next() {
+            pending.insert(tx);
+        }
+
+        // Clear pending transactions from the block.
+        for (_, tx) in block.payload().enumerate() {
+            if pending.remove(&tx.commit()) {
+                tracing::debug!("got transaction {}", tx.commit());
+            }
+        }
+
+        tracing::info!("{} transactions still pending", pending.len());
+    }
+    tracing::info!(
+        "block stream ended with {} transactions still pending",
+        pending.len()
+    );
+}
+
+async fn submit_transactions(
+    opt: Options,
+    mut sender: Sender<Commitment<Transaction>>,
+    mut rng: ChaChaRng,
+) {
+    let client = Client::<Error>::new(opt.url.clone());
+    loop {
+        let tx = random_transaction(&opt, &mut rng);
+        let hash = tx.commit();
+        tracing::debug!(
+            "submitting transaction {hash} for namespace {} of size {}",
+            tx.vm(),
+            tx.payload().len()
+        );
+        if let Err(err) = client
+            .post::<()>("submit/submit")
+            .body_binary(&tx)
+            .unwrap()
+            .send()
+            .await
+        {
+            tracing::error!("failed to submit transaction: {err}");
+        }
+        sender.send(hash).await.ok();
+
+        if let Some(delay) = opt.delay {
+            sleep(delay).await;
+        }
+    }
+}
+
+fn random_transaction(opt: &Options, rng: &mut ChaChaRng) -> Transaction {
+    let vm = rng.gen_range(opt.min_namespace..=opt.max_namespace);
+
+    let len = rng.gen_range(opt.min_size..=opt.max_size);
+    let mut payload = vec![0; len];
+    rng.fill_bytes(&mut payload);
+
+    Transaction::new(vm.into(), payload)
+}
+
+fn random_seed() -> u64 {
+    ChaChaRng::from_entropy().next_u64()
+}
diff --git a/sequencer/src/options.rs b/sequencer/src/options.rs
index 815cacb3a..0c7fabb6a 100644
--- a/sequencer/src/options.rs
+++ b/sequencer/src/options.rs
@@ -95,11 +95,11 @@ impl Options {
 }
 
 #[derive(Clone, Debug, Snafu)]
-struct ParseDurationError {
+pub struct ParseDurationError {
     reason: String,
 }
 
-fn parse_duration(s: &str) -> Result<Duration, ParseDurationError> {
+pub fn parse_duration(s: &str) -> Result<Duration, ParseDurationError> {
     ClDuration::from_str(s)
         .map(Duration::from)
         .map_err(|err| ParseDurationError {
diff --git a/sequencer/src/vm.rs b/sequencer/src/vm.rs
index 75358e63a..070ea6708 100644
--- a/sequencer/src/vm.rs
+++ b/sequencer/src/vm.rs
@@ -1,5 +1,5 @@
 use ark_serialize::{CanonicalDeserialize, CanonicalSerialize};
-use derive_more::{From, Into};
+use derive_more::{Display, From, Into};
 use jf_primitives::merkle_tree::namespaced_merkle_tree::Namespace;
 use serde::{Deserialize, Serialize};
 
@@ -11,6 +11,7 @@ use crate::transaction::{ApplicationTransaction, Transaction};
     Serialize,
     Deserialize,
     Debug,
+    Display,
     PartialEq,
     Eq,
     Hash,
@@ -22,6 +23,7 @@ use crate::transaction::{ApplicationTransaction, Transaction};
     PartialOrd,
    Ord,
 )]
+#[display(fmt = "{_0}")]
 pub struct VmId(pub(crate) u64);
 
 pub trait Vm {