diff --git a/.env b/.env
index b6806503b..e3f99d775 100644
--- a/.env
+++ b/.env
@@ -15,21 +15,21 @@ ASYNC_STD_THREAD_COUNT=1
 PROVER_RAYON_NUM_THREADS=2
 
 # Internal port inside container
-ESPRESSO_CDN_SERVER_PORT=40000
+ESPRESSO_CDN_SERVER_PORT=30000
 ESPRESSO_CDN_SERVER_METRICS_PORT=9090
-ESPRESSO_ORCHESTRATOR_PORT=40001
+ESPRESSO_ORCHESTRATOR_PORT=30001
 ESPRESSO_ORCHESTRATOR_NUM_NODES=5
 ESPRESSO_ORCHESTRATOR_START_DELAY=5s
 ESPRESSO_ORCHESTRATOR_NEXT_VIEW_TIMEOUT=12s
 ESPRESSO_ORCHESTRATOR_BUILDER_TIMEOUT=2s
 ESPRESSO_SEQUENCER_CDN_ENDPOINT=marshal-0:${ESPRESSO_CDN_SERVER_PORT}
 ESPRESSO_SEQUENCER_ORCHESTRATOR_URL=http://orchestrator:${ESPRESSO_ORCHESTRATOR_PORT}
-ESPRESSO_SEQUENCER_API_PORT=44000
-ESPRESSO_SEQUENCER_HOTSHOT_EVENT_STREAMING_API_PORT=42000
-ESPRESSO_SEQUENCER1_API_PORT=44001
-ESPRESSO_SEQUENCER2_API_PORT=44002
-ESPRESSO_SEQUENCER3_API_PORT=44003
-ESPRESSO_SEQUENCER4_API_PORT=44004
+ESPRESSO_SEQUENCER_API_PORT=24000
+ESPRESSO_SEQUENCER_HOTSHOT_EVENT_STREAMING_API_PORT=22000
+ESPRESSO_SEQUENCER1_API_PORT=24001
+ESPRESSO_SEQUENCER2_API_PORT=24002
+ESPRESSO_SEQUENCER3_API_PORT=24003
+ESPRESSO_SEQUENCER4_API_PORT=24004
 ESPRESSO_SEQUENCER_MAX_BLOCK_SIZE=1mb
 ESPRESSO_SEQUENCER_BASE_FEE=1
 ESPRESSO_SEQUENCER_URL=http://sequencer0:${ESPRESSO_SEQUENCER_API_PORT}
@@ -42,10 +42,10 @@ ESPRESSO_SEQUENCER_L1_EVENTS_MAX_BLOCK_RANGE=1
 ESPRESSO_SEQUENCER_ETH_MNEMONIC="test test test test test test test test test test test junk"
 # The first account is the permission less builder, the last are sequencer0 to 4
 ESPRESSO_SEQUENCER_PREFUNDED_BUILDER_ACCOUNTS=0x23618e81E3f5cdF7f54C3d65f7FBc0aBf5B21E8f
-ESPRESSO_COMMITMENT_TASK_PORT=60000
+ESPRESSO_COMMITMENT_TASK_PORT=30010
 ESPRESSO_SEQUENCER0_DB_PORT=5432
 ESPRESSO_SEQUENCER1_DB_PORT=5433
-ESPRESSO_STATE_RELAY_SERVER_PORT=40004
+ESPRESSO_STATE_RELAY_SERVER_PORT=30011
 ESPRESSO_STATE_RELAY_SERVER_URL=http://state-relay-server:${ESPRESSO_STATE_RELAY_SERVER_PORT}
 
 ESPRESSO_BLOCK_EXPLORER_PORT=3000
@@ -90,16 +90,16 @@ ETHERSCAN_API_KEY="placeholder"
 
 # Temporary flags for state relay server, should remove after integrating with stake table
 # Related issue: [https://github.com/EspressoSystems/espresso-sequencer/issues/1022]
-ESPRESSO_STATE_SIGNATURE_WEIGHT_THRESHOLD=3
+ESPRESSO_STATE_SIGNATURE_TOTAL_STAKE=5
 
 # Prover service
-ESPRESSO_PROVER_SERVICE_PORT=40050
+ESPRESSO_PROVER_SERVICE_PORT=30050
 ESPRESSO_STATE_PROVER_UPDATE_INTERVAL=10m
 
 # Builder service
 ESPRESSO_BUILDER_L1_PROVIDER=${ESPRESSO_SEQUENCER_L1_PROVIDER}
 ESPRESSO_BUILDER_ETH_MNEMONIC=${ESPRESSO_SEQUENCER_ETH_MNEMONIC}
-ESPRESSO_BUILDER_SERVER_PORT=41003
+ESPRESSO_BUILDER_SERVER_PORT=31003
 ESPRESSO_BUILDER_CHANNEL_CAPACITY=1024
 ESPRESSO_BUILDER_INIT_NODE_COUNT=$ESPRESSO_ORCHESTRATOR_NUM_NODES
 ESPRESSO_BUILDER_BOOTSTRAPPED_VIEW=0
@@ -108,14 +108,14 @@ ESPRESSO_BUILDER_BUFFER_VIEW_NUM_COUNT=50
 
 # Load generator
 ESPRESSO_SUBMIT_TRANSACTIONS_DELAY=2s
-ESPRESSO_SUBMIT_TRANSACTIONS_PUBLIC_PORT=44010
-ESPRESSO_SUBMIT_TRANSACTIONS_PRIVATE_PORT=44020
+ESPRESSO_SUBMIT_TRANSACTIONS_PUBLIC_PORT=24010
+ESPRESSO_SUBMIT_TRANSACTIONS_PRIVATE_PORT=24020
 
 # Query service stress test
-ESPRESSO_NASTY_CLIENT_PORT=44011
+ESPRESSO_NASTY_CLIENT_PORT=24011
 
 # Query service stress test
-ESPRESSO_NASTY_CLIENT_PORT=44011
+ESPRESSO_NASTY_CLIENT_PORT=24011
 
 # Openzeppelin Defender Deployment Profile
 DEFENDER_KEY=
"$ESPRESSO_STATE_RELAY_SERVER_PORT:$ESPRESSO_STATE_RELAY_SERVER_PORT" environment: - ESPRESSO_STATE_RELAY_SERVER_PORT - - ESPRESSO_STATE_SIGNATURE_WEIGHT_THRESHOLD + - ESPRESSO_STATE_SIGNATURE_TOTAL_STAKE - RUST_LOG - RUST_LOG_FORMAT - ASYNC_STD_THREAD_COUNT @@ -223,7 +223,7 @@ services: image: ghcr.io/espressosystems/espresso-sequencer/sequencer:main ports: - "$ESPRESSO_SEQUENCER1_API_PORT:$ESPRESSO_SEQUENCER_API_PORT" - command: sequencer -- storage-sql -- http -- query -- catchup -- state -- explorer + command: sequencer -- storage-sql -- http -- query -- catchup -- status -- state -- explorer environment: - ESPRESSO_SEQUENCER_ORCHESTRATOR_URL - ESPRESSO_SEQUENCER_CDN_ENDPOINT @@ -462,7 +462,7 @@ services: - "$ESPRESSO_NASTY_CLIENT_PORT:$ESPRESSO_NASTY_CLIENT_PORT" environment: # Point the nasty client at sequencer1, the only one running the state API. - - ESPRESSO_SEQUENCER_URL=http://localhost:$ESPRESSO_SEQUENCER1_API_PORT + - ESPRESSO_SEQUENCER_URL=http://sequencer1:$ESPRESSO_SEQUENCER_API_PORT - ESPRESSO_NASTY_CLIENT_PORT - RUST_LOG - RUST_LOG_FORMAT diff --git a/hotshot-state-prover/src/bin/state-prover.rs b/hotshot-state-prover/src/bin/state-prover.rs index 1d9f94fb2..3e2f33082 100644 --- a/hotshot-state-prover/src/bin/state-prover.rs +++ b/hotshot-state-prover/src/bin/state-prover.rs @@ -29,6 +29,10 @@ struct Args { #[clap(short, long = "freq", value_parser = parse_duration, default_value = "10m", env = "ESPRESSO_STATE_PROVER_UPDATE_INTERVAL")] update_interval: Duration, + /// Interval between retries if a state update fails + #[clap(long = "retry-freq", value_parser = parse_duration, default_value = "2s", env = "ESPRESSO_STATE_PROVER_RETRY_INTERVAL")] + retry_interval: Duration, + /// URL of layer 1 Ethereum JSON-RPC provider. #[clap( long, @@ -99,6 +103,7 @@ async fn main() { let config = StateProverConfig { relay_server: args.relay_server.clone(), update_interval: args.update_interval, + retry_interval: args.retry_interval, l1_provider: args.l1_provider.clone(), light_client_address: args.light_client_address, eth_signing_key: MnemonicBuilder::::default() diff --git a/hotshot-state-prover/src/service.rs b/hotshot-state-prover/src/service.rs index 7d9448c06..afaff42f8 100644 --- a/hotshot-state-prover/src/service.rs +++ b/hotshot-state-prover/src/service.rs @@ -5,7 +5,7 @@ use anyhow::anyhow; use async_std::{ io, sync::Arc, - task::{sleep, spawn}, + task::{sleep, spawn, spawn_blocking}, }; use contract_bindings::light_client::{LightClient, LightClientErrors}; use displaydoc::Display; @@ -61,6 +61,8 @@ pub struct StateProverConfig { pub relay_server: Url, /// Interval between light client state update pub update_interval: Duration, + /// Interval between retries if a state update fails + pub retry_interval: Duration, /// URL of layer 1 Ethereum JSON-RPC provider. pub l1_provider: Url, /// Address of LightClient contract on layer 1. @@ -274,13 +276,14 @@ pub async fn submit_state_and_proof( pub async fn sync_state( st: &StakeTable, - proving_key: &ProvingKey, + proving_key: Arc, relay_server_client: &Client, config: &StateProverConfig, ) -> Result<(), ProverError> { tracing::info!("Start syncing light client state."); let bundle = fetch_latest_state(relay_server_client).await?; + tracing::info!("Bundle accumulated weight: {}", bundle.accumulated_weight); tracing::info!("Latest HotShot block height: {}", bundle.state.block_height); let old_state = read_contract_state(config).await?; tracing::info!( @@ -322,24 +325,22 @@ pub async fn sync_state( )); } - // TODO this assert fails. 
diff --git a/hotshot-state-prover/src/service.rs b/hotshot-state-prover/src/service.rs
index 7d9448c06..afaff42f8 100644
--- a/hotshot-state-prover/src/service.rs
+++ b/hotshot-state-prover/src/service.rs
@@ -5,7 +5,7 @@ use anyhow::anyhow;
 use async_std::{
     io,
     sync::Arc,
-    task::{sleep, spawn},
+    task::{sleep, spawn, spawn_blocking},
 };
 use contract_bindings::light_client::{LightClient, LightClientErrors};
 use displaydoc::Display;
@@ -61,6 +61,8 @@ pub struct StateProverConfig {
     pub relay_server: Url,
     /// Interval between light client state update
     pub update_interval: Duration,
+    /// Interval between retries if a state update fails
+    pub retry_interval: Duration,
     /// URL of layer 1 Ethereum JSON-RPC provider.
     pub l1_provider: Url,
     /// Address of LightClient contract on layer 1.
@@ -274,13 +276,14 @@ pub async fn submit_state_and_proof(
 
 pub async fn sync_state(
     st: &StakeTable,
-    proving_key: &ProvingKey,
+    proving_key: Arc<ProvingKey>,
     relay_server_client: &Client,
     config: &StateProverConfig,
 ) -> Result<(), ProverError> {
     tracing::info!("Start syncing light client state.");
 
     let bundle = fetch_latest_state(relay_server_client).await?;
+    tracing::info!("Bundle accumulated weight: {}", bundle.accumulated_weight);
     tracing::info!("Latest HotShot block height: {}", bundle.state.block_height);
     let old_state = read_contract_state(config).await?;
     tracing::info!(
@@ -322,24 +325,22 @@ pub async fn sync_state(
         ));
     }
 
-    // TODO this assert fails. See https://github.com/EspressoSystems/espresso-sequencer/issues/1161
-    // assert_eq!(
-    //     bundle.state.stake_table_comm,
-    //     st.commitment(SnapshotVersion::LastEpochStart).unwrap()
-    // );
-
     tracing::info!("Collected latest state and signatures. Start generating SNARK proof.");
     let proof_gen_start = Instant::now();
-    let (proof, public_input) = generate_state_update_proof::<_, _, _, _>(
-        &mut ark_std::rand::thread_rng(),
-        proving_key,
-        &entries,
-        signer_bit_vec,
-        signatures,
-        &bundle.state,
-        &threshold,
-        config.stake_table_capacity,
-    )?;
+    let stake_table_capacity = config.stake_table_capacity;
+    let (proof, public_input) = spawn_blocking(move || {
+        generate_state_update_proof::<_, _, _, _>(
+            &mut ark_std::rand::thread_rng(),
+            &proving_key,
+            &entries,
+            signer_bit_vec,
+            signatures,
+            &bundle.state,
+            &threshold,
+            stake_table_capacity,
+        )
+    })
+    .await?;
     let proof_gen_elapsed = Instant::now().signed_duration_since(proof_gen_start);
     tracing::info!("Proof generation completed. Elapsed: {proof_gen_elapsed:.3}");
 
@@ -394,24 +395,21 @@ pub async fn run_prover_service(
         }
     }
 
-    let proving_key = async_std::task::block_on(async move {
-        Arc::new(load_proving_key(config.stake_table_capacity))
-    });
+    let stake_table_capacity = config.stake_table_capacity;
+    let proving_key =
+        spawn_blocking(move || Arc::new(load_proving_key(stake_table_capacity))).await;
 
     let update_interval = config.update_interval;
+    let retry_interval = config.retry_interval;
 
     loop {
-        let st = st.clone();
-        let proving_key = proving_key.clone();
-        let relay_server_client = relay_server_client.clone();
-        let config = config.clone();
-        // Use block_on to avoid blocking the async runtime with this computationally heavy task
-        async_std::task::block_on(async move {
-            if let Err(err) = sync_state(&st, &proving_key, &relay_server_client, &config).await {
-                tracing::error!("Cannot sync the light client state: {}", err);
-            }
-        });
-        tracing::info!("Sleeping for {:?}", update_interval);
-        sleep(update_interval).await;
+        if let Err(err) = sync_state(&st, proving_key.clone(), &relay_server_client, &config).await
+        {
+            tracing::error!("Cannot sync the light client state, will retry: {}", err);
+            sleep(retry_interval).await;
+        } else {
+            tracing::info!("Sleeping for {:?}", update_interval);
+            sleep(update_interval).await;
+        }
     }
 }
 
@@ -420,10 +418,12 @@ pub async fn run_prover_once(config: StateProverConfig,
     let st =
         init_stake_table_from_orchestrator(&config.orchestrator_url, config.stake_table_capacity)
             .await;
-    let proving_key = load_proving_key(config.stake_table_capacity);
+    let stake_table_capacity = config.stake_table_capacity;
+    let proving_key =
+        spawn_blocking(move || Arc::new(load_proving_key(stake_table_capacity))).await;
     let relay_server_client = Client::::new(config.relay_server.clone());
 
-    sync_state(&st, &proving_key, &relay_server_client, &config)
+    sync_state(&st, proving_key, &relay_server_client, &config)
         .await
         .expect("Error syncing the light client state.");
 }
@@ -624,6 +624,7 @@ mod test {
             Self {
                 relay_server: Url::parse("http://localhost").unwrap(),
                 update_interval: Duration::default(),
+                retry_interval: Duration::default(),
                 l1_provider: Url::parse("http://localhost").unwrap(),
                 light_client_address: Address::default(),
                 eth_signing_key: SigningKey::random(&mut test_rng()),
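
Note on the hotshot-state-prover/src/service.rs change above: proving-key loading and SNARK proof generation are CPU-bound, so they now run via spawn_blocking rather than block_on, and a failed sync_state is retried after the new retry_interval instead of waiting out a full update_interval. Below is a minimal, self-contained sketch of that scheduling pattern only; run_loop, the String error type, and the closure body are placeholders, not the prover's actual API.

    // Sketch of the retry/update scheduling pattern (placeholder names; the real
    // loop calls sync_state and loads a proving key through spawn_blocking).
    use std::time::Duration;
    use async_std::task::{sleep, spawn_blocking};

    async fn run_loop(update_interval: Duration, retry_interval: Duration) {
        loop {
            // Offload the CPU-heavy step to a blocking thread so the async
            // executor stays responsive while the proof is being generated.
            let result: Result<(), String> =
                spawn_blocking(|| /* generate and submit the proof here */ Ok(())).await;
            match result {
                // On success, wait the normal update interval before the next sync.
                Ok(()) => sleep(update_interval).await,
                // On failure, log and retry sooner instead of skipping a full cycle.
                Err(err) => {
                    eprintln!("sync failed, will retry: {err}");
                    sleep(retry_interval).await;
                }
            }
        }
    }

Compared with wrapping the call in block_on, spawn_blocking moves the heavy work off the async executor's thread, and the error branch now sleeps for the shorter retry interval rather than the full update interval.
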
diff --git a/sequencer/src/bin/state-relay-server.rs b/sequencer/src/bin/state-relay-server.rs
index cb14dcc62..2aef99c31 100644
--- a/sequencer/src/bin/state-relay-server.rs
+++ b/sequencer/src/bin/state-relay-server.rs
@@ -14,16 +14,15 @@ struct Args {
     )]
     port: u16,
 
-    /// Threshold to form an available state signature package.
+    /// Total amount of stake.
     /// WARNING: this is a temporary flag, should remove after integrating with stake table.
     /// Related issue: [https://github.com/EspressoSystems/espresso-sequencer/issues/1022]
     #[clap(
-        short,
         long,
-        env = "ESPRESSO_STATE_SIGNATURE_WEIGHT_THRESHOLD",
-        default_value = "3"
+        env = "ESPRESSO_STATE_SIGNATURE_TOTAL_STAKE",
+        default_value = "5"
     )]
-    threshold: u64,
+    total_stake: u64,
 }
 
 #[async_std::main]
@@ -32,11 +31,12 @@ async fn main() {
     setup_backtrace();
     let args = Args::parse();
+    let threshold = ((2 * args.total_stake) / 3) + 1;
 
-    tracing::info!("starting state relay server on port {}", args.port);
+    tracing::info!(port = args.port, threshold, "starting state relay server");
 
     run_relay_server(
         None,
-        args.threshold,
+        threshold,
         format!("http://0.0.0.0:{}", args.port).parse().unwrap(),
         SEQUENCER_VERSION,
     )
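
The state-relay-server change above replaces the explicit threshold flag with a total-stake flag (ESPRESSO_STATE_SIGNATURE_TOTAL_STAKE, default 5) and derives the threshold itself as floor(2 * total_stake / 3) + 1. A standalone arithmetic check of that formula (not part of the server code):

    // Standalone check of the threshold formula used in main() above.
    fn threshold(total_stake: u64) -> u64 {
        ((2 * total_stake) / 3) + 1
    }

    fn main() {
        // With the new default total stake of 5: floor(10 / 3) + 1 = 4.
        assert_eq!(threshold(5), 4);
        // A total stake of 3 would require all 3 units to sign.
        assert_eq!(threshold(3), 3);
        println!("threshold(5) = {}", threshold(5));
    }

With the default total stake of 5, signatures covering at least 4 units of stake are therefore required before a state signature package is considered available.
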
diff --git a/sequencer/src/bin/submit-transactions.rs b/sequencer/src/bin/submit-transactions.rs
index 8c76d457f..2dd1e38b0 100644
--- a/sequencer/src/bin/submit-transactions.rs
+++ b/sequencer/src/bin/submit-transactions.rs
@@ -41,6 +41,19 @@ struct Options {
     #[clap(long, name = "MAX_SIZE", default_value = "1kb", value_parser = parse_size, env = "ESPRESSO_SUBMIT_TRANSACTIONS_MAX_SIZE")]
     max_size: u64,
 
+    /// Minimum size of a batch of transactions to submit.
+    ///
+    /// Batches will be a random count between MIN_BATCH_SIZE and MAX_BATCH_SIZE, with the distribution skewed toward smaller batches.
+    /// This is done by drawing a random size S on each iteration and submitting the accumulated transactions once their count reaches S.
+    #[clap(long, name = "MIN_BATCH_SIZE", default_value = "1", value_parser = parse_size, env = "ESPRESSO_SUBMIT_TRANSACTIONS_MIN_BATCH_SIZE")]
+    min_batch_size: u64,
+
+    /// Maximum size of a batch of transactions to submit.
+    ///
+    /// Batches will be a random count between MIN_BATCH_SIZE and MAX_BATCH_SIZE, with the distribution skewed toward smaller batches.
+    #[clap(long, name = "MAX_BATCH_SIZE", default_value = "20", value_parser = parse_size, env = "ESPRESSO_SUBMIT_TRANSACTIONS_MAX_BATCH_SIZE")]
+    max_batch_size: u64,
+
     /// Minimum namespace ID to submit to.
     #[clap(
         long,
@@ -119,6 +132,9 @@ impl Options {
             .clone()
             .unwrap_or_else(|| self.url.join("submit").unwrap())
     }
+
+    fn use_public_mempool(&self) -> bool {
+        self.submit_url.is_none()
+    }
 }
 
 #[async_std::main]
@@ -241,28 +257,59 @@ async fn submit_transactions(
     // mean `opt.delay`, or parameter `\lambda = 1 / opt.delay`.
     let delay_distr = rand_distr::Exp::<f64>::new(1f64 / opt.delay.as_millis() as f64).unwrap();
 
+    let mut txns = Vec::new();
+    let mut hashes = Vec::new();
     loop {
         let tx = random_transaction(&opt, &mut rng);
         let hash = tx.commit();
         tracing::info!(
-            "submitting transaction {hash} for namespace {} of size {}",
+            "Adding transaction {hash} for namespace {} of size {}",
             tx.namespace(),
             tx.payload().len()
         );
-        if let Err(err) = client
-            .post::<()>("submit")
-            .body_binary(&tx)
-            .unwrap()
-            .send()
-            .await
-        {
-            tracing::error!("failed to submit transaction: {err}");
+        txns.push(tx);
+        hashes.push(hash);
+
+        let randomized_batch_size = if opt.use_public_mempool() {
+            1
+        } else {
+            rng.gen_range(opt.min_batch_size..=opt.max_batch_size)
+        };
+        let txns_batch_count = txns.len() as u64;
+        if randomized_batch_size <= txns_batch_count {
+            if let Err(err) = if txns_batch_count == 1 {
+                // occasionally test the 'submit' endpoint, just for coverage
+                client
+                    .post::<()>("submit")
+                    .body_binary(&txns[0])
+                    .unwrap()
+                    .send()
+                    .await
+            } else {
+                client
+                    .post::<()>("batch")
+                    .body_binary(&txns)
+                    .unwrap()
+                    .send()
+                    .await
+            } {
+                tracing::error!("failed to submit batch of {txns_batch_count} transactions: {err}");
+            } else {
+                tracing::info!("submitted batch of {txns_batch_count} transactions");
+                let submitted_at = Instant::now();
+                for hash in hashes.iter() {
+                    sender
+                        .send(SubmittedTransaction {
+                            hash: *hash,
+                            submitted_at,
+                        })
+                        .await
+                        .ok();
+                }
+            }
+            txns.clear();
+            hashes.clear();
         }
-        let submitted_at = Instant::now();
-        sender
-            .send(SubmittedTransaction { hash, submitted_at })
-            .await
-            .ok();
 
         let delay = Duration::from_millis(delay_distr.sample(&mut rng) as u64);
         tracing::info!("sleeping for {delay:?}");
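
The submit-transactions.rs change above accumulates transactions and flushes them once a freshly drawn target size is reached; a new target is drawn on every iteration, so batches typically flush well before MAX_BATCH_SIZE, and the public-mempool path (no separate submit URL configured) keeps batches at exactly one transaction. A rough standalone illustration of just that flushing rule, assuming a uniform draw between the new defaults of 1 and 20; the rand-based loop and the pending counter are illustrative, not the load generator's code:

    // Illustration of the batch-flush rule: draw a target size each iteration
    // and flush once the accumulated count reaches it.
    use rand::{rngs::StdRng, Rng, SeedableRng};

    fn main() {
        let (min_batch, max_batch) = (1u64, 20u64); // mirrors the new CLI defaults
        let mut rng = StdRng::seed_from_u64(42);
        let mut pending = 0u64; // transactions accumulated since the last flush

        for _ in 0..100 {
            pending += 1; // one new transaction per iteration
            let target = rng.gen_range(min_batch..=max_batch);
            if target <= pending {
                println!("flushing a batch of {pending}");
                pending = 0;
            }
        }
    }

Single-transaction batches still go through the original submit endpoint, while larger batches are posted to the batch endpoint, as in the diff above.
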