Skip to content

Commit

Permalink
Merge pull request #907 from EspressoSystems/feat/random-transactions
Browse files Browse the repository at this point in the history
Add a program to submit random transactions
  • Loading branch information
jbearer authored Dec 26, 2023
2 parents afe873c + 4c8f79d commit e9127fc
Show file tree
Hide file tree
Showing 7 changed files with 206 additions and 18 deletions.
8 changes: 8 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,6 @@ jf-utils = { git = "https://github.com/EspressoSystems/jellyfish" }
surf-disco = { git = "https://github.com/EspressoSystems/surf-disco", tag = "v0.4.6" }
tide-disco = { git = "https://github.com/EspressoSystems/tide-disco", tag = "v0.4.6" }

bytesize = "1.3"
itertools = "0.12"
rand_chacha = "0.3"
2 changes: 2 additions & 0 deletions sequencer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ async-compatibility-layer = { git = "https://github.com/EspressoSystems/async-co
async-std = "1.12.0"
async-trait = "0.1.75"
bincode = "1.3.3"
bytesize = { workspace = true }
clap = { version = "4.3", features = ["derive", "env"] }
cld = "0.5"
commit = { git = "https://github.com/EspressoSystems/commit" }
Expand All @@ -39,6 +40,7 @@ futures = "0.3"
include_dir = "0.7"
itertools = { workspace = true }
lazy_static = "1.4"
rand_chacha = "0.3"
serde_json = "1.0"
sha2 = "0.10" # TODO temporary, used only for VID, should be set in hotshot
time = "0.3"
Expand Down
16 changes: 1 addition & 15 deletions sequencer/src/bin/orchestrator.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
use async_compatibility_layer::logging::{setup_backtrace, setup_logging};
use clap::Parser;
use cld::ClDuration;
use hotshot::traits::election::static_committee::StaticElectionConfig;
use hotshot::types::SignatureKey;
use hotshot_orchestrator::{config::NetworkConfig, run_orchestrator};
use sequencer::{PubKey, MAX_NMT_DEPTH};
use sequencer::{options::parse_duration, PubKey, MAX_NMT_DEPTH};
use snafu::Snafu;
use std::fmt::{self, Display, Formatter};
use std::num::{NonZeroUsize, ParseIntError};
Expand Down Expand Up @@ -108,19 +107,6 @@ struct Args {
max_transactions: Option<NonZeroUsize>,
}

#[derive(Clone, Debug, Snafu)]
struct ParseDurationError {
reason: String,
}

fn parse_duration(s: &str) -> Result<Duration, ParseDurationError> {
ClDuration::from_str(s)
.map(Duration::from)
.map_err(|err| ParseDurationError {
reason: err.to_string(),
})
}

#[derive(Clone, Copy, Debug, PartialEq, Eq)]
struct Ratio {
numerator: u64,
Expand Down
188 changes: 188 additions & 0 deletions sequencer/src/bin/submit-transactions.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
//! Utility program to submit random transactions to an Espresso Sequencer.
use async_compatibility_layer::logging::{setup_backtrace, setup_logging};
use async_std::task::{sleep, spawn};
use bytesize::ByteSize;
use clap::Parser;
use commit::{Commitment, Committable};
use derive_more::From;
use futures::{
channel::mpsc::{self, Sender},
sink::SinkExt,
stream::StreamExt,
};
use hotshot_query_service::{
availability::{BlockQueryData, QueryablePayload},
Error,
};
use rand::{Rng, RngCore, SeedableRng};
use rand_chacha::ChaChaRng;
use sequencer::{options::parse_duration, SeqTypes, Transaction};
use snafu::Snafu;
use std::{collections::HashSet, time::Duration};
use surf_disco::{Client, Url};

/// Submit random transactions to an Espresso Sequencer.
#[derive(Clone, Debug, Parser)]
struct Options {
/// Minimum size of transaction to submit.
#[clap(long, default_value = "1", value_parser = parse_size)]
min_size: usize,

/// Maximum size of transaction to submit.
#[clap(long, default_value = "1kb", value_parser = parse_size)]
max_size: usize,

/// Minimum namespace ID to submit to.
#[clap(long, default_value = "10000")]
min_namespace: u64,

/// Maximum namespace ID to submit to.
#[clap(long, default_value = "10010")]
max_namespace: u64,

/// Optional delay between submitting transactions.
///
/// Can be used to moderate the rate of submission.
#[clap(long, value_parser = parse_duration)]
delay: Option<Duration>,

/// Maximum number of unprocessed transaction submissions.
///
/// This can be used to apply backpressure so that the tasks submitting transactions do not get
/// too far ahead of the task processing results.
#[clap(long, default_value = "1000")]
channel_bound: usize,

/// Seed for reproducible randomness.
#[clap(long)]
seed: Option<u64>,

/// Number of parallel tasks to run.
#[clap(short, long, default_value = "1")]
jobs: usize,

/// URL of the query service.
url: Url,
}

#[derive(Clone, Debug, From, Snafu)]
struct ParseSizeError {
msg: String,
}

fn parse_size(s: &str) -> Result<usize, ParseSizeError> {
Ok(s.parse::<ByteSize>()?.0 as usize)
}

#[async_std::main]
async fn main() {
setup_backtrace();
setup_logging();

let opt = Options::parse();
let (sender, mut receiver) = mpsc::channel(opt.channel_bound);

let seed = opt.seed.unwrap_or_else(random_seed);
tracing::info!("PRNG seed: {seed}");
let mut rng = ChaChaRng::seed_from_u64(seed);

// Subscribe to block stream so we can check that our transactions are getting sequenced.
let client = Client::<Error>::new(opt.url.clone());
let block_height: usize = client
.get("status/latest_block_height")
.send()
.await
.unwrap();
let mut blocks = client
.socket(&format!("availability/stream/blocks/{}", block_height - 1))
.subscribe()
.await
.unwrap();
tracing::info!("listening for blocks starting at {block_height}");

// Spawn tasks to submit transactions.
for _ in 0..opt.jobs {
spawn(submit_transactions(
opt.clone(),
sender.clone(),
ChaChaRng::from_rng(&mut rng).unwrap(),
));
}

// Keep track of the results.
let mut pending = HashSet::new();
while let Some(block) = blocks.next().await {
let block: BlockQueryData<SeqTypes> = match block {
Ok(block) => block,
Err(err) => {
tracing::warn!("error getting block: {err}");
continue;
}
};
tracing::info!("got block {}", block.height());

// Get all transactions which were submitted before this block.
while let Ok(Some(tx)) = receiver.try_next() {
pending.insert(tx);
}

// Clear pending transactions from the block.
for (_, tx) in block.payload().enumerate() {
if pending.remove(&tx.commit()) {
tracing::debug!("got transaction {}", tx.commit());
}
}

tracing::info!("{} transactions still pending", pending.len());
}
tracing::info!(
"block stream ended with {} transactions still pending",
pending.len()
);
}

async fn submit_transactions(
opt: Options,
mut sender: Sender<Commitment<Transaction>>,
mut rng: ChaChaRng,
) {
let client = Client::<Error>::new(opt.url.clone());
loop {
let tx = random_transaction(&opt, &mut rng);
let hash = tx.commit();
tracing::debug!(
"submitting transaction {hash} for namespace {} of size {}",
tx.vm(),
tx.payload().len()
);
if let Err(err) = client
.post::<()>("submit/submit")
.body_binary(&tx)
.unwrap()
.send()
.await
{
tracing::error!("failed to submit transaction: {err}");
}
sender.send(hash).await.ok();

if let Some(delay) = opt.delay {
sleep(delay).await;
}
}
}

fn random_transaction(opt: &Options, rng: &mut ChaChaRng) -> Transaction {
let vm = rng.gen_range(opt.min_namespace..=opt.max_namespace);

let len = rng.gen_range(opt.min_size..=opt.max_size);
let mut payload = vec![0; len];
rng.fill_bytes(&mut payload);

Transaction::new(vm.into(), payload)
}

fn random_seed() -> u64 {
ChaChaRng::from_entropy().next_u64()
}
4 changes: 2 additions & 2 deletions sequencer/src/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,11 +95,11 @@ impl Options {
}

#[derive(Clone, Debug, Snafu)]
struct ParseDurationError {
pub struct ParseDurationError {
reason: String,
}

fn parse_duration(s: &str) -> Result<Duration, ParseDurationError> {
pub fn parse_duration(s: &str) -> Result<Duration, ParseDurationError> {
ClDuration::from_str(s)
.map(Duration::from)
.map_err(|err| ParseDurationError {
Expand Down
4 changes: 3 additions & 1 deletion sequencer/src/vm.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use ark_serialize::{CanonicalDeserialize, CanonicalSerialize};
use derive_more::{From, Into};
use derive_more::{Display, From, Into};
use jf_primitives::merkle_tree::namespaced_merkle_tree::Namespace;
use serde::{Deserialize, Serialize};

Expand All @@ -11,6 +11,7 @@ use crate::transaction::{ApplicationTransaction, Transaction};
Serialize,
Deserialize,
Debug,
Display,
PartialEq,
Eq,
Hash,
Expand All @@ -22,6 +23,7 @@ use crate::transaction::{ApplicationTransaction, Transaction};
PartialOrd,
Ord,
)]
#[display(fmt = "{_0}")]
pub struct VmId(pub(crate) u64);

pub trait Vm {
Expand Down

0 comments on commit e9127fc

Please sign in to comment.