diff --git a/Cargo.lock b/Cargo.lock index 52eedcf5c..ef8632271 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2835,7 +2835,6 @@ dependencies = [ "contract-bindings", "derive_more 1.0.0", "diff-test-bn254", - "dyn-clone", "ethers", "fluent-asserter", "futures", @@ -8628,7 +8627,6 @@ dependencies = [ "derivative", "derive_more 1.0.0", "dotenvy", - "dyn-clone", "escargot", "espresso-macros", "espresso-types", diff --git a/Cargo.toml b/Cargo.toml index c52556dc4..630d32d08 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -55,7 +55,6 @@ cld = "0.5" derive_more = { version = "1.0", features = ["full"] } es-version = { git = "https://github.com/EspressoSystems/es-version.git", branch = "main" } dotenvy = "0.15" -dyn-clone = "1.0" ethers = { version = "2.0", features = ["solc", "ws"] } futures = "0.3" tokio = { version = "1", default-features = false, features = [ diff --git a/sequencer-sqlite/Cargo.lock b/sequencer-sqlite/Cargo.lock index f7c10eeea..0779e290f 100644 --- a/sequencer-sqlite/Cargo.lock +++ b/sequencer-sqlite/Cargo.lock @@ -2753,7 +2753,6 @@ dependencies = [ "contract-bindings", "derive_more 1.0.0", "diff-test-bn254", - "dyn-clone", "ethers", "fluent-asserter", "futures", @@ -8352,7 +8351,6 @@ dependencies = [ "derivative", "derive_more 1.0.0", "dotenvy", - "dyn-clone", "espresso-types", "ethers", "futures", diff --git a/sequencer/Cargo.toml b/sequencer/Cargo.toml index b95b182af..d05d74920 100644 --- a/sequencer/Cargo.toml +++ b/sequencer/Cargo.toml @@ -62,7 +62,6 @@ csv = "1" derivative = "2.2" derive_more = { workspace = true } dotenvy = { workspace = true } -dyn-clone = { workspace = true } espresso-types = { path = "../types" } ethers = { workspace = true } futures = { workspace = true } diff --git a/sequencer/api/migrations/postgres/V401__archive_provider.sql b/sequencer/api/migrations/postgres/V401__archive_provider.sql new file mode 100644 index 000000000..2c733d5b9 --- /dev/null +++ b/sequencer/api/migrations/postgres/V401__archive_provider.sql @@ -0,0 +1,21 @@ +-- Add information needed for consensus storage to act as a provider for archive recovery. + +-- Add payload hash to DA proposal, since the query service requests missing payloads by hash. +ALTER TABLE da_proposal + ADD COLUMN payload_hash VARCHAR; +CREATE INDEX da_proposal_payload_hash_idx ON da_proposal (payload_hash); + +-- Add payload hash to VID share, since the query service requests missing VID common by payload +-- hash. +ALTER TABLE vid_share + ADD COLUMN payload_hash VARCHAR; +CREATE INDEX vid_share_payload_hash_idx ON vid_share (payload_hash); + +-- Add QC storage, since the query service requires missing leaves to be fetched alongside a QC with +-- that leaf hash. +CREATE TABLE quorum_certificate ( + view BIGINT PRIMARY KEY, + leaf_hash VARCHAR NOT NULL, + data BYTEA NOT NULL +); +CREATE INDEX quorum_certificate_leaf_hash_idx ON quorum_certificate (leaf_hash); diff --git a/sequencer/api/migrations/sqlite/V201__archive_provider.sql b/sequencer/api/migrations/sqlite/V201__archive_provider.sql new file mode 100644 index 000000000..1c1779d32 --- /dev/null +++ b/sequencer/api/migrations/sqlite/V201__archive_provider.sql @@ -0,0 +1,21 @@ +-- Add information needed for consensus storage to act as a provider for archive recovery. + +-- Add payload hash to DA proposal, since the query service requests missing payloads by hash. 
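-- For illustration only (these statements are not part of the migration): once payload_hash
-- and the quorum_certificate table below are in place, the archive provider added in
-- sequencer/src/persistence/sql.rs recovers missing data with point lookups such as
--   SELECT data FROM da_proposal WHERE payload_hash = $1 LIMIT 1;
--   SELECT data FROM vid_share WHERE payload_hash = $1 LIMIT 1;
--   SELECT data FROM quorum_proposals WHERE leaf_hash = $1 LIMIT 1;
--   SELECT data FROM quorum_certificate WHERE leaf_hash = $1 LIMIT 1;
-- which is why each new column is indexed on the hash it is queried by.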
+ALTER TABLE da_proposal + ADD COLUMN payload_hash VARCHAR; +CREATE INDEX da_proposal_payload_hash_idx ON da_proposal (payload_hash); + +-- Add payload hash to VID share, since the query service requests missing VID common by payload +-- hash. +ALTER TABLE vid_share + ADD COLUMN payload_hash VARCHAR; +CREATE INDEX vid_share_payload_hash_idx ON vid_share (payload_hash); + +-- Add QC storage, since the query service requires missing leaves to be fetched alongside a QC with +-- that leaf hash. +CREATE TABLE quorum_certificate ( + view BIGINT PRIMARY KEY, + leaf_hash VARCHAR NOT NULL, + data BLOB NOT NULL +); +CREATE INDEX quorum_certificate_leaf_hash_idx ON quorum_certificate (leaf_hash); diff --git a/sequencer/api/public-env-vars.toml b/sequencer/api/public-env-vars.toml index 9c9d20346..c58af6c35 100644 --- a/sequencer/api/public-env-vars.toml +++ b/sequencer/api/public-env-vars.toml @@ -60,6 +60,9 @@ variables = [ "ESPRESSO_SEQUENCER_CATCHUP_MAX_RETRY_DELAY", "ESPRESSO_SEQUENCER_CDN_ENDPOINT", "ESPRESSO_SEQUENCER_CHUNK_FETCH_DELAY", + "ESPRESSO_SEQUENCER_CONSENSUS_STORAGE_MINIMUM_RETENTION", + "ESPRESSO_SEQUENCER_CONSENSUS_STORAGE_TARGET_RETENTION", + "ESPRESSO_SEQUENCER_CONSENSUS_STORAGE_TARGET_USAGE", "ESPRESSO_SEQUENCER_FETCH_RATE_LIMIT", "ESPRESSO_SEQUENCER_HOTSHOT_ADDRESS", "ESPRESSO_SEQUENCER_HOTSHOT_EVENT_STREAMING_API_PORT", diff --git a/sequencer/src/api.rs b/sequencer/src/api.rs index a3cd5c593..21b1fe343 100644 --- a/sequencer/src/api.rs +++ b/sequencer/src/api.rs @@ -1066,10 +1066,15 @@ mod api_tests { }; use hotshot_types::drb::{INITIAL_DRB_RESULT, INITIAL_DRB_SEED_INPUT}; use hotshot_types::{ - data::QuorumProposal2, event::LeafInfo, simple_certificate::QuorumCertificate, - traits::node_implementation::ConsensusTime, + data::{DaProposal, QuorumProposal2, VidDisperseShare}, + event::LeafInfo, + message::Proposal, + simple_certificate::QuorumCertificate, + traits::{node_implementation::ConsensusTime, signature_key::SignatureKey, EncodeBytes}, + vid::vid_scheme, }; + use jf_vid::VidScheme; use portpicker::pick_unused_port; use sequencer_utils::test_utils::setup_test; use std::fmt::Debug; @@ -1226,6 +1231,7 @@ mod api_tests { } setup_test(); + let (pubkey, privkey) = PubKey::generated_from_seed_indexed([0; 32], 1); let storage = D::create_storage().await; let persistence = D::persistence_options(&storage).create().await.unwrap(); @@ -1240,11 +1246,13 @@ mod api_tests { // Create two non-consecutive leaf chains. let mut chain1 = vec![]; + let genesis = Leaf::genesis(&Default::default(), &NodeState::mock()).await; + let payload = genesis.block_payload().unwrap(); + let payload_bytes_arc = payload.encode(); + let disperse = vid_scheme(2).disperse(payload_bytes_arc.clone()).unwrap(); + let payload_commitment = disperse.commit; let mut quorum_proposal = QuorumProposal2:: { - block_header: Leaf::genesis(&Default::default(), &NodeState::mock()) - .await - .block_header() - .clone(), + block_header: genesis.block_header().clone(), view_number: ViewNumber::genesis(), justify_qc: QuorumCertificate::genesis::( &ValidatedState::default(), @@ -1274,6 +1282,50 @@ mod api_tests { qc.data.leaf_commit = Committable::commit(&leaf); justify_qc = qc.clone(); chain1.push((leaf.clone(), qc.clone())); + + // Include a quorum proposal for each leaf. 
+ let quorum_proposal_signature = + PubKey::sign(&privkey, &bincode::serialize(&quorum_proposal).unwrap()) + .expect("Failed to sign quorum_proposal"); + persistence + .append_quorum_proposal(&Proposal { + data: quorum_proposal.clone(), + signature: quorum_proposal_signature, + _pd: Default::default(), + }) + .await + .unwrap(); + + // Include VID information for each leaf. + let share = VidDisperseShare:: { + view_number: leaf.view_number(), + payload_commitment, + share: disperse.shares[0].clone(), + common: disperse.common.clone(), + recipient_key: pubkey, + }; + persistence + .append_vid(&share.to_proposal(&privkey).unwrap()) + .await + .unwrap(); + + // Include payload information for each leaf. + let block_payload_signature = + PubKey::sign(&privkey, &payload_bytes_arc).expect("Failed to sign block payload"); + let da_proposal_inner = DaProposal:: { + encoded_transactions: payload_bytes_arc.clone(), + metadata: payload.ns_table().clone(), + view_number: leaf.view_number(), + }; + let da_proposal = Proposal { + data: da_proposal_inner, + signature: block_payload_signature, + _pd: Default::default(), + }; + persistence + .append_da(&da_proposal, payload_commitment) + .await + .unwrap(); } // Split into two chains. let mut chain2 = chain1.split_off(2); @@ -1312,7 +1364,8 @@ mod api_tests { .await .unwrap(); - // Check that the leaves were moved to archive storage. + // Check that the leaves were moved to archive storage, along with payload and VID + // information. for (leaf, qc) in chain1.iter().chain(&chain2) { tracing::info!(height = leaf.height(), "check archive"); let qd = data_source.get_leaf(leaf.height() as usize).await.await; @@ -1320,7 +1373,128 @@ mod api_tests { let stored_qc = qd.qc().clone().to_qc2(); assert_eq!(&stored_leaf, leaf); assert_eq!(&stored_qc, qc); + + data_source + .get_block(leaf.height() as usize) + .await + .try_resolve() + .ok() + .unwrap(); + data_source + .get_vid_common(leaf.height() as usize) + .await + .try_resolve() + .ok() + .unwrap(); + + // Check that all data has been garbage collected for the decided views. + assert!(persistence + .load_da_proposal(leaf.view_number()) + .await + .unwrap() + .is_none()); + assert!(persistence + .load_vid_share(leaf.view_number()) + .await + .unwrap() + .is_none()); + assert!(persistence + .load_quorum_proposal(leaf.view_number()) + .await + .is_err()); } + + // Check that data has _not_ been garbage collected for the missing view. + assert!(persistence + .load_da_proposal(ViewNumber::new(2)) + .await + .unwrap() + .is_some()); + assert!(persistence + .load_vid_share(ViewNumber::new(2)) + .await + .unwrap() + .is_some()); + persistence + .load_quorum_proposal(ViewNumber::new(2)) + .await + .unwrap(); + } + + #[tokio::test(flavor = "multi_thread")] + pub async fn test_decide_missing_data() + where + D: TestableSequencerDataSource + Debug + 'static, + { + setup_test(); + + let storage = D::create_storage().await; + let persistence = D::persistence_options(&storage).create().await.unwrap(); + let data_source: Arc> = + Arc::new(StorageState::new( + D::create(D::persistence_options(&storage), Default::default(), false) + .await + .unwrap(), + ApiState::new(future::pending()), + )); + let consumer = ApiEventConsumer::from(data_source.clone()); + + let mut qc = QuorumCertificate::genesis::( + &ValidatedState::default(), + &NodeState::mock(), + ) + .await + .to_qc2(); + let leaf = Leaf::genesis(&ValidatedState::default(), &NodeState::mock()).await; + + // Append the genesis leaf. 
We don't use this for the test, because the update function will + // automatically fill in the missing data for genesis. We just append this to get into a + // consistent state to then append the leaf from view 1, which will have missing data. + tracing::info!(?leaf, ?qc, "decide genesis leaf"); + persistence + .append_decided_leaves( + leaf.view_number(), + [(&leaf_info(leaf.clone().into()), qc.clone())], + &consumer, + ) + .await + .unwrap(); + + // Create another leaf, with missing data. + let mut block_header = leaf.block_header().clone(); + *block_header.height_mut() += 1; + let qp = QuorumProposal2 { + block_header, + view_number: leaf.view_number() + 1, + justify_qc: qc.clone(), + upgrade_certificate: None, + view_change_evidence: None, + drb_seed: INITIAL_DRB_SEED_INPUT, + drb_result: INITIAL_DRB_RESULT, + }; + + let leaf = Leaf2::from_quorum_proposal(&qp); + qc.view_number = leaf.view_number(); + qc.data.leaf_commit = Committable::commit(&leaf); + + // Decide a leaf without the corresponding payload or VID. + tracing::info!(?leaf, ?qc, "append leaf 1"); + persistence + .append_decided_leaves( + leaf.view_number(), + [(&leaf_info(leaf.clone()), qc)], + &consumer, + ) + .await + .unwrap(); + + // Check that we still processed the leaf. + assert_eq!( + leaf, + data_source.get_leaf(1).await.await.leaf().clone().into() + ); + assert!(data_source.get_vid_common(1).await.is_pending()); + assert!(data_source.get_block(1).await.is_pending()); } fn leaf_info(leaf: Leaf2) -> LeafInfo { diff --git a/sequencer/src/api/options.rs b/sequencer/src/api/options.rs index e640f6ab0..7d7d01284 100644 --- a/sequencer/src/api/options.rs +++ b/sequencer/src/api/options.rs @@ -3,7 +3,7 @@ use anyhow::{bail, Context}; use clap::Parser; use espresso_types::{ - v0::traits::{EventConsumer, NullEventConsumer, SequencerPersistence}, + v0::traits::{EventConsumer, NullEventConsumer, PersistenceOptions, SequencerPersistence}, BlockMerkleTree, PubKey, }; use futures::{ @@ -13,6 +13,7 @@ use futures::{ use hotshot_events_service::events::Error as EventStreamingError; use hotshot_query_service::{ data_source::{ExtensibleDataSource, MetricsDataSource}, + fetching::provider::QueryServiceProvider, status::{self, UpdateStatusData}, ApiState as AppState, Error, }; @@ -27,7 +28,7 @@ use vbs::version::StaticVersionType; use super::{ data_source::{ - provider, CatchupDataSource, HotShotConfigDataSource, NodeStateDataSource, + provider, CatchupDataSource, HotShotConfigDataSource, NodeStateDataSource, Provider, SequencerDataSource, StateSignatureDataSource, SubmitDataSource, }, endpoints, fs, sql, @@ -333,12 +334,18 @@ impl Options { N: ConnectedNetwork, P: SequencerPersistence, { - let ds = sql::DataSource::create( - mod_opt.clone(), - provider::(query_opt.peers.clone(), bind_version), - false, - ) - .await?; + let mut provider = Provider::default(); + + // Use the database itself as a fetching provider: sometimes we can fetch data that is + // missing from the query service from ephemeral consensus storage. + provider = provider.with_provider(mod_opt.clone().create().await?); + // If that fails, fetch missing data from peers. 
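// As a concrete sketch (the URL is hypothetical, not taken from any config), a node with a
// single peer https://query.example.com ends up with a provider stack that falls back in the
// order the providers are added:
//   1. the local consensus database created from `mod_opt` above, then
//   2. a QueryServiceProvider pointed at https://query.example.com, built by the loop below.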
+ for peer in query_opt.peers { + tracing::info!("will fetch missing data from {peer}"); + provider = provider.with_provider(QueryServiceProvider::new(peer, bind_version)); + } + + let ds = sql::DataSource::create(mod_opt.clone(), provider, false).await?; let (metrics, ds, mut app) = self .init_app_modules(ds, state.clone(), bind_version) .await?; diff --git a/sequencer/src/api/sql.rs b/sequencer/src/api/sql.rs index f08a55a79..c265f8974 100644 --- a/sequencer/src/api/sql.rs +++ b/sequencer/src/api/sql.rs @@ -53,7 +53,7 @@ impl SequencerDataSource for DataSource { let fetch_limit = opt.fetch_rate_limit; let active_fetch_delay = opt.active_fetch_delay; let chunk_fetch_delay = opt.chunk_fetch_delay; - let mut cfg = Config::try_from(opt)?; + let mut cfg = Config::try_from(&opt)?; if reset { cfg = cfg.reset_schema(); @@ -471,7 +471,9 @@ mod impl_testable_data_source { #[cfg(feature = "embedded-db")] { - let opt = crate::persistence::sql::SqliteOptions { path: db.path() }; + let opt = crate::persistence::sql::SqliteOptions { + path: Some(db.path()), + }; opt.into() } } diff --git a/sequencer/src/context.rs b/sequencer/src/context.rs index d0cbfe61e..141ff9e4f 100644 --- a/sequencer/src/context.rs +++ b/sequencer/src/context.rs @@ -32,6 +32,7 @@ use hotshot_types::{ }, PeerConfig, ValidatorConfig, }; +use std::fmt::Debug; use tracing::{Instrument, Level}; use url::Url; @@ -311,7 +312,7 @@ impl, P: SequencerPersistence, V: Versions> Sequence /// /// When this context is dropped or [`shut_down`](Self::shut_down), background tasks will be /// cancelled in the reverse order that they were spawned. - pub fn spawn(&mut self, name: impl Display, task: impl Future + Send + 'static) { + pub fn spawn(&mut self, name: impl Display, task: impl Future + Send + 'static) { self.tasks.spawn(name, task); } @@ -322,7 +323,11 @@ impl, P: SequencerPersistence, V: Versions> Sequence /// /// The only difference between a short-lived background task and a [long-lived](Self::spawn) /// one is how urgently logging related to the task is treated. - pub fn spawn_short_lived(&mut self, name: impl Display, task: impl Future + Send + 'static) { + pub fn spawn_short_lived( + &mut self, + name: impl Display, + task: impl Future + Send + 'static, + ) { self.tasks.spawn_short_lived(name, task); } @@ -442,8 +447,8 @@ macro_rules! spawn_with_log_level { spawn( async move { tracing::event!($lvl, "spawning background task"); - $task.await; - tracing::event!($lvl, "background task exited"); + let res = $task.await; + tracing::event!($lvl, ?res, "background task exited"); } .instrument(span), ) @@ -457,7 +462,7 @@ impl TaskList { /// /// When this [`TaskList`] is dropped or [`shut_down`](Self::shut_down), background tasks will /// be cancelled in the reverse order that they were spawned. - pub fn spawn(&mut self, name: impl Display, task: impl Future + Send + 'static) { + pub fn spawn(&mut self, name: impl Display, task: impl Future + Send + 'static) { spawn_with_log_level!(self, Level::INFO, name, task); } @@ -468,7 +473,11 @@ impl TaskList { /// /// The only difference between a short-lived background task and a [long-lived](Self::spawn) /// one is how urgently logging related to the task is treated. 
- pub fn spawn_short_lived(&mut self, name: impl Display, task: impl Future + Send + 'static) { + pub fn spawn_short_lived( + &mut self, + name: impl Display, + task: impl Future + Send + 'static, + ) { spawn_with_log_level!(self, Level::DEBUG, name, task); } diff --git a/sequencer/src/persistence.rs b/sequencer/src/persistence.rs index 5c0d201a5..c70592019 100644 --- a/sequencer/src/persistence.rs +++ b/sequencer/src/persistence.rs @@ -23,16 +23,20 @@ pub trait ChainConfigPersistence: Sized + Send + Sync { #[cfg(any(test, feature = "testing"))] mod testing { - use espresso_types::v0::traits::SequencerPersistence; + use espresso_types::v0::traits::{PersistenceOptions, SequencerPersistence}; use super::*; #[allow(dead_code)] #[async_trait] pub trait TestablePersistence: SequencerPersistence { - type Storage; + type Storage: Sync; async fn tmp_storage() -> Self::Storage; - async fn connect(storage: &Self::Storage) -> Self; + fn options(storage: &Self::Storage) -> impl PersistenceOptions; + + async fn connect(storage: &Self::Storage) -> Self { + Self::options(storage).create().await.unwrap() + } } } @@ -45,7 +49,8 @@ mod persistence_tests { use async_lock::RwLock; use committable::Committable; use espresso_types::{ - traits::EventConsumer, Event, Leaf, Leaf2, NodeState, PubKey, SeqTypes, ValidatedState, + traits::{EventConsumer, NullEventConsumer, PersistenceOptions}, + Event, Leaf, Leaf2, NodeState, PubKey, SeqTypes, ValidatedState, }; use hotshot::types::{BLSPubKey, SignatureKey}; use hotshot_example_types::node_types::TestVersions; @@ -747,4 +752,132 @@ mod persistence_tests { assert!(info.leaf.block_payload().is_some()); } } + + #[tokio::test(flavor = "multi_thread")] + pub async fn test_pruning() { + setup_test(); + + let tmp = P::tmp_storage().await; + + let mut options = P::options(&tmp); + options.set_view_retention(1); + let storage = options.create().await.unwrap(); + + // Add some "old" data, from view 0. 
+ let leaf = Leaf::genesis(&ValidatedState::default(), &NodeState::mock()).await; + let leaf_payload = leaf.block_payload().unwrap(); + let leaf_payload_bytes_arc = leaf_payload.encode(); + let disperse = vid_scheme(2) + .disperse(leaf_payload_bytes_arc.clone()) + .unwrap(); + let payload_commitment = disperse.commit; + let (pubkey, privkey) = BLSPubKey::generated_from_seed_indexed([0; 32], 1); + let vid_share = VidDisperseShare:: { + view_number: ViewNumber::new(0), + payload_commitment, + share: disperse.shares[0].clone(), + common: disperse.common, + recipient_key: pubkey, + } + .to_proposal(&privkey) + .unwrap() + .clone(); + + let quorum_proposal = QuorumProposal2:: { + block_header: leaf.block_header().clone(), + view_number: ViewNumber::genesis(), + justify_qc: QuorumCertificate::genesis::( + &ValidatedState::default(), + &NodeState::mock(), + ) + .await + .to_qc2(), + upgrade_certificate: None, + view_change_evidence: None, + drb_seed: INITIAL_DRB_SEED_INPUT, + drb_result: INITIAL_DRB_RESULT, + }; + let quorum_proposal_signature = + BLSPubKey::sign(&privkey, &bincode::serialize(&quorum_proposal).unwrap()) + .expect("Failed to sign quorum proposal"); + let quorum_proposal = Proposal { + data: quorum_proposal, + signature: quorum_proposal_signature, + _pd: Default::default(), + }; + + let block_payload_signature = BLSPubKey::sign(&privkey, &leaf_payload_bytes_arc) + .expect("Failed to sign block payload"); + let da_proposal = Proposal { + data: DaProposal:: { + encoded_transactions: leaf_payload_bytes_arc, + metadata: leaf_payload.ns_table().clone(), + view_number: ViewNumber::new(0), + }, + signature: block_payload_signature, + _pd: Default::default(), + }; + + storage + .append_da(&da_proposal, payload_commitment) + .await + .unwrap(); + storage.append_vid(&vid_share).await.unwrap(); + storage + .append_quorum_proposal(&quorum_proposal) + .await + .unwrap(); + + // Decide a newer view, view 1. + storage + .append_decided_leaves(ViewNumber::new(1), [], &NullEventConsumer) + .await + .unwrap(); + + // The old data is not more than the retention period (1 view) old, so it should not be + // GCed. + assert_eq!( + storage + .load_da_proposal(ViewNumber::new(0)) + .await + .unwrap() + .unwrap(), + da_proposal + ); + assert_eq!( + storage + .load_vid_share(ViewNumber::new(0)) + .await + .unwrap() + .unwrap(), + vid_share + ); + assert_eq!( + storage + .load_quorum_proposal(ViewNumber::new(0)) + .await + .unwrap(), + quorum_proposal + ); + + // Decide an even newer view, triggering GC of the old data. 
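// With the retention set to 1 view above, deciding view 2 moves the prune threshold to
// view 2 - 1 = view 1, and the view-0 artifacts fall strictly below it, so they are now
// garbage collected. (After the view-1 decide the threshold was view 1 - 1 = view 0, which
// kept them.)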
+ storage + .append_decided_leaves(ViewNumber::new(2), [], &NullEventConsumer) + .await + .unwrap(); + assert!(storage + .load_da_proposal(ViewNumber::new(0)) + .await + .unwrap() + .is_none()); + assert!(storage + .load_vid_share(ViewNumber::new(0)) + .await + .unwrap() + .is_none()); + assert!(storage + .load_quorum_proposal(ViewNumber::new(0)) + .await + .is_err()); + } } diff --git a/sequencer/src/persistence/fs.rs b/sequencer/src/persistence/fs.rs index e482090e7..43e5541aa 100644 --- a/sequencer/src/persistence/fs.rs +++ b/sequencer/src/persistence/fs.rs @@ -12,7 +12,10 @@ use hotshot_types::{ event::{Event, EventType, HotShotAction, LeafInfo}, message::{convert_proposal, Proposal}, simple_certificate::{QuorumCertificate, QuorumCertificate2, UpgradeCertificate}, - traits::{block_contents::BlockPayload, node_implementation::ConsensusTime}, + traits::{ + block_contents::{BlockHeader, BlockPayload}, + node_implementation::ConsensusTime, + }, utils::View, vid::VidSchemeType, vote::HasViewNumber, @@ -23,6 +26,7 @@ use std::{ collections::BTreeMap, fs::{self, File, OpenOptions}, io::{Read, Seek, SeekFrom, Write}, + ops::RangeInclusive, path::{Path, PathBuf}, }; @@ -39,6 +43,24 @@ pub struct Options { #[clap(long, env = "ESPRESSO_SEQUENCER_STORE_UNDECIDED_STATE", hide = true)] store_undecided_state: bool, + + /// Number of views to retain in consensus storage before data that hasn't been archived is + /// garbage collected. + /// + /// The longer this is, the more certain that all data will eventually be archived, even if + /// there are temporary problems with archive storage or partially missing data. This can be set + /// very large, as most data is garbage collected as soon as it is finalized by consensus. This + /// setting only applies to views which never get decided (ie forks in consensus) and views for + /// which this node is partially offline. These should be exceptionally rare. + /// + /// The default of 130000 views equates to approximately 3 days (259200 seconds) at an average + /// view time of 2s. + #[clap( + long, + env = "ESPRESSO_SEQUENCER_CONSENSUS_VIEW_RETENTION", + default_value = "130000" + )] + pub(crate) consensus_view_retention: u64, } impl Default for Options { @@ -52,6 +74,7 @@ impl Options { Self { path, store_undecided_state: false, + consensus_view_retention: 130000, } } @@ -64,13 +87,21 @@ impl Options { impl PersistenceOptions for Options { type Persistence = Persistence; + fn set_view_retention(&mut self, view_retention: u64) { + self.consensus_view_retention = view_retention; + } + async fn create(&mut self) -> anyhow::Result { let path = self.path.clone(); let store_undecided_state = self.store_undecided_state; + let view_retention = self.consensus_view_retention; Ok(Persistence { store_undecided_state, - inner: Arc::new(RwLock::new(Inner { path })), + inner: Arc::new(RwLock::new(Inner { + path, + view_retention, + })), }) } @@ -93,6 +124,7 @@ pub struct Persistence { #[derive(Debug)] struct Inner { path: PathBuf, + view_retention: u64, } impl Inner { @@ -178,47 +210,64 @@ impl Inner { Ok(()) } - fn collect_garbage(&mut self, view: ViewNumber) -> anyhow::Result<()> { + fn collect_garbage( + &mut self, + view: ViewNumber, + intervals: &[RangeInclusive], + ) -> anyhow::Result<()> { let view_number = view.u64(); + let prune_view = view.saturating_sub(self.view_retention); - let delete_files = |view_number: u64, dir_path: PathBuf| -> anyhow::Result<()> { - if !dir_path.is_dir() { - return Ok(()); - } - - for entry in fs::read_dir(dir_path)? 
{ - let entry = entry?; - let path = entry.path(); + let delete_files = + |intervals: &[RangeInclusive], keep, dir_path: PathBuf| -> anyhow::Result<()> { + if !dir_path.is_dir() { + return Ok(()); + } - if let Some(file) = path.file_stem().and_then(|n| n.to_str()) { - if let Ok(v) = file.parse::() { - if v <= view_number { - fs::remove_file(&path)?; + for entry in fs::read_dir(dir_path)? { + let entry = entry?; + let path = entry.path(); + + if let Some(file) = path.file_stem().and_then(|n| n.to_str()) { + if let Ok(v) = file.parse::() { + // If the view is the anchor view, keep it no matter what. + if let Some(keep) = keep { + if keep == v { + continue; + } + } + // Otherwise, delete it if it is time to prune this view _or_ if the + // given intervals, which we've already successfully processed, contain + // the view; in this case we simply don't need it anymore. + if v < prune_view || intervals.iter().any(|i| i.contains(&v)) { + fs::remove_file(&path)?; + } } } } - } - Ok(()) - }; + Ok(()) + }; - delete_files(view_number, self.da_dir_path())?; - delete_files(view_number, self.vid_dir_path())?; - delete_files(view_number, self.quorum_proposals_dir_path())?; + delete_files(intervals, None, self.da_dir_path())?; + delete_files(intervals, None, self.vid_dir_path())?; + delete_files(intervals, None, self.quorum_proposals_dir_path())?; // Save the most recent leaf as it will be our anchor point if the node restarts. - if view_number > 0 { - delete_files(view_number - 1, self.decided_leaf_path())?; - } + delete_files(intervals, Some(view_number), self.decided_leaf_path())?; Ok(()) } + /// Generate events based on persisted decided leaves. + /// + /// Returns a list of closed intervals of views which can be safely deleted, as all leaves + /// within these view ranges have been processed by the event consumer. async fn generate_decide_events( &self, view: ViewNumber, consumer: &impl EventConsumer, - ) -> anyhow::Result<()> { + ) -> anyhow::Result>> { // Generate a decide event for each leaf, to be processed by the event consumer. We make a // separate event for each leaf because it is possible we have non-consecutive leaves in our // storage, which would not be valid as a single decide with a single leaf chain. @@ -286,7 +335,10 @@ impl Inner { } } + let mut intervals = vec![]; + let mut current_interval = None; for (view, (leaf, qc)) in leaves { + let height = leaf.leaf.block_header().block_number(); consumer .handle_event(&Event { view_number: ViewNumber::new(view), @@ -297,9 +349,27 @@ impl Inner { }, }) .await?; + if let Some((start, end, current_height)) = current_interval.as_mut() { + if height == *current_height + 1 { + // If we have a chain of consecutive leaves, extend the current interval of + // views which are safe to delete. + *current_height += 1; + *end = view; + } else { + // Otherwise, end the current interval and start a new one. + intervals.push(*start..=*end); + current_interval = Some((view, view, height)); + } + } else { + // Start a new interval. + current_interval = Some((view, view, height)); + } + } + if let Some((start, end, _)) = current_interval { + intervals.push(start..=end); } - Ok(()) + Ok(intervals) } fn load_da_proposal( @@ -477,20 +547,21 @@ impl SequencerPersistence for Persistence { )?; } - // Event processing failure is not an error, since by this point we have at least managed to - // persist the decided leaves successfully, and the event processing will just run again at - // the next decide. 
If there is an error here, we just log it and return early with success - // to prevent GC from running before the decided leaves are processed. - if let Err(err) = inner.generate_decide_events(view, consumer).await { - tracing::warn!(?view, "event processing failed: {err:#}"); - return Ok(()); - } - - if let Err(err) = inner.collect_garbage(view) { - // Similarly, garbage collection is not an error. We have done everything we strictly - // needed to do, and GC will run again at the next decide. Log the error but do not - // return it. - tracing::warn!(?view, "GC failed: {err:#}"); + match inner.generate_decide_events(view, consumer).await { + Err(err) => { + // Event processing failure is not an error, since by this point we have at least + // managed to persist the decided leaves successfully, and the event processing will + // just run again at the next decide. + tracing::warn!(?view, "event processing failed: {err:#}"); + } + Ok(intervals) => { + if let Err(err) = inner.collect_garbage(view, &intervals) { + // Similarly, garbage collection is not an error. We have done everything we + // strictly needed to do, and GC will run again at the next decide. Log the + // error but do not return it. + tracing::warn!(?view, "GC failed: {err:#}"); + } + } } Ok(()) @@ -856,8 +927,8 @@ mod testing { TempDir::new().unwrap() } - async fn connect(storage: &Self::Storage) -> Self { - Options::new(storage.path().into()).create().await.unwrap() + fn options(storage: &Self::Storage) -> impl PersistenceOptions { + Options::new(storage.path().into()) } } } diff --git a/sequencer/src/persistence/no_storage.rs b/sequencer/src/persistence/no_storage.rs index f7b34aeb1..e80e89205 100644 --- a/sequencer/src/persistence/no_storage.rs +++ b/sequencer/src/persistence/no_storage.rs @@ -29,6 +29,8 @@ pub struct Options; impl PersistenceOptions for Options { type Persistence = NoStorage; + fn set_view_retention(&mut self, _: u64) {} + async fn create(&mut self) -> anyhow::Result { Ok(NoStorage) } diff --git a/sequencer/src/persistence/sql.rs b/sequencer/src/persistence/sql.rs index 4cb250d9b..61c624355 100644 --- a/sequencer/src/persistence/sql.rs +++ b/sequencer/src/persistence/sql.rs @@ -10,13 +10,22 @@ use espresso_types::{ BackoffParams, Leaf, Leaf2, NetworkConfig, Payload, }; use futures::stream::StreamExt; -use hotshot_query_service::data_source::storage::sql::{syntax_helpers::MAX_FN, Db}; -use hotshot_query_service::data_source::{ - storage::{ - pruning::PrunerCfg, - sql::{include_migrations, query_as, Config, SqlStorage}, +use hotshot_query_service::{ + availability::LeafQueryData, + data_source::{ + storage::{ + pruning::PrunerCfg, + sql::{ + include_migrations, query_as, syntax_helpers::MAX_FN, Config, Db, SqlStorage, + Transaction, TransactionMode, Write, + }, + }, + Transaction as _, VersionedDataSource, + }, + fetching::{ + request::{LeafRequest, PayloadRequest, VidCommonRequest}, + Provider, }, - Transaction as _, VersionedDataSource, }; use hotshot_types::{ consensus::CommitmentMap, @@ -24,12 +33,15 @@ use hotshot_types::{ event::{Event, EventType, HotShotAction, LeafInfo}, message::{convert_proposal, Proposal}, simple_certificate::{QuorumCertificate, QuorumCertificate2, UpgradeCertificate}, - traits::{node_implementation::ConsensusTime, BlockPayload}, + traits::{ + block_contents::{BlockHeader, BlockPayload}, + node_implementation::ConsensusTime, + }, utils::View, - vid::VidSchemeType, + vid::{VidCommitment, VidCommon}, vote::HasViewNumber, }; -use jf_vid::VidScheme; +use itertools::Itertools; use 
sqlx::Row; use sqlx::{query, Executor}; use std::{collections::BTreeMap, path::PathBuf, str::FromStr, sync::Arc, time::Duration}; @@ -37,7 +49,7 @@ use std::{collections::BTreeMap, path::PathBuf, str::FromStr, sync::Arc, time::D use crate::{catchup::SqlStateCatchup, SeqTypes, ViewNumber}; /// Options for Postgres-backed persistence. -#[derive(Parser, Clone, Derivative, Default)] +#[derive(Parser, Clone, Derivative)] #[derivative(Debug)] pub struct PostgresOptions { /// Hostname for the remote Postgres database server. @@ -67,6 +79,12 @@ pub struct PostgresOptions { pub(crate) use_tls: bool, } +impl Default for PostgresOptions { + fn default() -> Self { + Self::parse_from(std::iter::empty::()) + } +} + #[derive(Parser, Clone, Derivative, Default, From, Into)] #[derivative(Debug)] pub struct SqliteOptions { @@ -77,7 +95,7 @@ pub struct SqliteOptions { env = "ESPRESSO_SEQUENCER_STORAGE_PATH", value_parser = build_sqlite_path )] - pub(crate) path: PathBuf, + pub(crate) path: Option, } pub fn build_sqlite_path(path: &str) -> anyhow::Result { @@ -93,7 +111,7 @@ pub fn build_sqlite_path(path: &str) -> anyhow::Result { } /// Options for database-backed persistence, supporting both Postgres and SQLite. -#[derive(Parser, Clone, Derivative, Default, From, Into)] +#[derive(Parser, Clone, Derivative, From, Into)] #[derivative(Debug)] pub struct Options { #[cfg(not(feature = "embedded-db"))] @@ -134,6 +152,10 @@ pub struct Options { #[clap(flatten)] pub(crate) pruning: PruningOptions, + /// Pruning parameters for ephemeral consensus storage. + #[clap(flatten)] + pub(crate) consensus_pruning: ConsensusPruningOptions, + #[clap(long, env = "ESPRESSO_SEQUENCER_STORE_UNDECIDED_STATE", hide = true)] pub(crate) store_undecided_state: bool, @@ -211,6 +233,12 @@ pub struct Options { pub(crate) pool: Option>, } +impl Default for Options { + fn default() -> Self { + Self::parse_from(std::iter::empty::()) + } +} + #[cfg(not(feature = "embedded-db"))] impl From for Config { fn from(opt: PostgresOptions) -> Self { @@ -254,7 +282,10 @@ impl From for Config { fn from(opt: SqliteOptions) -> Self { let mut cfg = Config::default(); - cfg = cfg.db_path(opt.path); + if let Some(path) = opt.path { + cfg = cfg.db_path(path); + } + cfg = cfg.max_connections(20); cfg = cfg.idle_connection_timeout(Duration::from_secs(120)); cfg = cfg.connection_timeout(Duration::from_secs(10240)); @@ -290,17 +321,17 @@ impl From for Options { } } } -impl TryFrom for Config { +impl TryFrom<&Options> for Config { type Error = anyhow::Error; - fn try_from(opt: Options) -> Result { - let mut cfg = match opt.uri { + fn try_from(opt: &Options) -> Result { + let mut cfg = match &opt.uri { Some(uri) => uri.parse()?, None => Self::default(), }; - if let Some(pool) = opt.pool { - cfg = cfg.pool(pool); + if let Some(pool) = &opt.pool { + cfg = cfg.pool(pool.clone()); } cfg = cfg.max_connections(opt.max_connections); @@ -315,10 +346,10 @@ impl TryFrom for Config { "$CARGO_MANIFEST_DIR/api/migrations/postgres" )); - let pg_options = opt.postgres_options; + let pg_options = &opt.postgres_options; - if let Some(host) = pg_options.host { - cfg = cfg.host(host); + if let Some(host) = &pg_options.host { + cfg = cfg.host(host.clone()); } if let Some(port) = pg_options.port { @@ -348,7 +379,9 @@ impl TryFrom for Config { "$CARGO_MANIFEST_DIR/api/migrations/sqlite" )); - cfg = cfg.db_path(opt.sqlite_options.path); + if let Some(path) = &opt.sqlite_options.path { + cfg = cfg.db_path(path.clone()); + } } if opt.prune { @@ -363,7 +396,7 @@ impl TryFrom for Config { } 
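// The `Default` impls above use a small clap idiom: `Parser::parse_from` over an empty
// argument list produces a value populated only from environment variables and declared
// `default_value`s. A minimal, self-contained sketch of the pattern (the `Demo` struct is
// hypothetical, not part of this crate):
//
//     use clap::Parser;
//
//     #[derive(Parser, Clone, Debug)]
//     struct Demo {
//         /// Mirrors options like ESPRESSO_SEQUENCER_CONSENSUS_STORAGE_TARGET_RETENTION.
//         #[clap(long, env = "DEMO_TARGET_RETENTION", default_value = "302000")]
//         target_retention: u64,
//     }
//
//     impl Default for Demo {
//         fn default() -> Self {
//             // No CLI arguments: fall back to the env var, then the declared default.
//             Self::parse_from(std::iter::empty::<String>())
//         }
//     }
//
//     // Demo::default().target_retention == 302000 when DEMO_TARGET_RETENTION is unset.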
/// Pruning parameters. -#[derive(Parser, Clone, Debug, Default)] +#[derive(Parser, Clone, Copy, Debug)] pub struct PruningOptions { /// Threshold for pruning, specified in bytes. /// If the disk usage surpasses this threshold, pruning is initiated for data older than the specified minimum retention period. @@ -437,16 +470,79 @@ impl From for PrunerCfg { } } +/// Pruning parameters for ephemeral consensus storage. +#[derive(Parser, Clone, Copy, Debug)] +pub struct ConsensusPruningOptions { + /// Number of views to try to retain in consensus storage before data that hasn't been archived + /// is garbage collected. + /// + /// The longer this is, the more certain that all data will eventually be archived, even if + /// there are temporary problems with archive storage or partially missing data. This can be set + /// very large, as most data is garbage collected as soon as it is finalized by consensus. This + /// setting only applies to views which never get decided (ie forks in consensus) and views for + /// which this node is partially offline. These should be exceptionally rare. + /// + /// Note that in extreme scenarios, data may be garbage collected even before TARGET_RETENTION + /// views, if consensus storage exceeds TARGET_USAGE. For a hard lower bound on how long + /// consensus data will be retained, see MINIMUM_RETENTION. + /// + /// The default of 302000 views equates to approximately 1 week (604800 seconds) at an average + /// view time of 2s. + #[clap( + name = "TARGET_RETENTION", + long = "consensus-storage-target-retention", + env = "ESPRESSO_SEQUENCER_CONSENSUS_STORAGE_TARGET_RETENTION", + default_value = "302000" + )] + target_retention: u64, + + /// Minimum number of views to try to retain in consensus storage before data that hasn't been + /// archived is garbage collected. + /// + /// This bound allows data to be retained even if consensus storage occupies more than + /// TARGET_USAGE. This can be used to ensure sufficient time to move consensus data to archival + /// storage as necessary, even under extreme circumstances where otherwise garbage collection + /// would kick in based on TARGET_RETENTION. + /// + /// The default of 130000 views equates to approximately 3 days (259200 seconds) at an average + /// view time of 2s. + #[clap( + name = "MINIMUM_RETENTION", + long = "consensus-storage-minimum-retention", + env = "ESPRESSO_SEQUENCER_CONSENSUS_STORAGE_MINIMUM_RETENTION", + default_value = "130000" + )] + minimum_retention: u64, + + /// Amount (in bytes) of data to retain in consensus storage before garbage collecting more + /// aggressively. + /// + /// See also TARGET_RETENTION and MINIMUM_RETENTION. 
+ #[clap( + name = "TARGET_USAGE", + long = "consensus-storage-target-usage", + env = "ESPRESSO_SEQUENCER_CONSENSUS_STORAGE_TARGET_USAGE", + default_value = "1000000000" + )] + target_usage: u64, +} + #[async_trait] impl PersistenceOptions for Options { type Persistence = Persistence; + fn set_view_retention(&mut self, view_retention: u64) { + self.consensus_pruning.target_retention = view_retention; + self.consensus_pruning.minimum_retention = view_retention; + } + async fn create(&mut self) -> anyhow::Result { let store_undecided_state = self.store_undecided_state; - let config = self.clone().try_into()?; + let config = (&*self).try_into()?; let persistence = Persistence { store_undecided_state, db: SqlStorage::connect(config).await?, + gc_opt: self.consensus_pruning, }; persistence.migrate_quorum_proposal_leaf_hashes().await?; self.pool = Some(persistence.db.pool()); @@ -454,16 +550,17 @@ impl PersistenceOptions for Options { } async fn reset(self) -> anyhow::Result<()> { - SqlStorage::connect(Config::try_from(self)?.reset_schema()).await?; + SqlStorage::connect(Config::try_from(&self)?.reset_schema()).await?; Ok(()) } } /// Postgres-backed persistence. -#[derive(Clone)] +#[derive(Clone, Debug)] pub struct Persistence { db: SqlStorage, store_undecided_state: bool, + gc_opt: ConsensusPruningOptions, } impl Persistence { @@ -501,6 +598,313 @@ impl Persistence { tx.commit().await } + + async fn generate_decide_events(&self, consumer: &impl EventConsumer) -> anyhow::Result<()> { + let mut last_processed_view: Option = self + .db + .read() + .await? + .fetch_optional("SELECT last_processed_view FROM event_stream WHERE id = 1 LIMIT 1") + .await? + .map(|row| row.get("last_processed_view")); + loop { + // In SQLite, overlapping read and write transactions can lead to database errors. To + // avoid this: + // - start a read transaction to query and collect all the necessary data. + // - Commit (or implicitly drop) the read transaction once the data is fetched. + // - use the collected data to generate a "decide" event for the consumer. + // - begin a write transaction to delete the data and update the event stream. + let mut tx = self.db.read().await?; + + // Collect a chain of consecutive leaves, starting from the first view after the last + // decide. This will correspond to a decide event, and defines a range of views which + // can be garbage collected. This may even include views for which there was no leaf, + // for which we might still have artifacts like proposals that never finalized. + let from_view = match last_processed_view { + Some(v) => v + 1, + None => 0, + }; + + let mut parent = None; + let mut rows = query("SELECT leaf, qc FROM anchor_leaf WHERE view >= $1 ORDER BY view") + .bind(from_view) + .fetch(tx.as_mut()); + let mut leaves = vec![]; + let mut final_qc = None; + while let Some(row) = rows.next().await { + let row = match row { + Ok(row) => row, + Err(err) => { + // If there's an error getting a row, try generating an event with the rows + // we do have. + tracing::warn!("error loading row: {err:#}"); + break; + } + }; + + let leaf_data: Vec = row.get("leaf"); + let leaf = bincode::deserialize::(&leaf_data)?; + let qc_data: Vec = row.get("qc"); + let qc = bincode::deserialize::>(&qc_data)?; + let height = leaf.block_header().block_number(); + + // Ensure we are only dealing with a consecutive chain of leaves. 
We don't want to + // garbage collect any views for which we missed a leaf or decide event; at least + // not right away, in case we need to recover that data later. + if let Some(parent) = parent { + if height != parent + 1 { + tracing::debug!( + height, + parent, + "ending decide event at non-consecutive leaf" + ); + break; + } + } + parent = Some(height); + leaves.push(leaf); + final_qc = Some(qc); + } + drop(rows); + + let Some(final_qc) = final_qc else { + // End event processing when there are no more decided views. + tracing::debug!(from_view, "no new leaves at decide"); + return Ok(()); + }; + + // Find the range of views encompassed by this leaf chain. All data in this range can be + // processed by the consumer and then deleted. + let from_view = leaves[0].view_number(); + let to_view = leaves[leaves.len() - 1].view_number(); + + // Collect VID shares for the decide event. + let mut vid_shares = tx + .fetch_all( + query("SELECT view, data FROM vid_share where view >= $1 AND view <= $2") + .bind(from_view.u64() as i64) + .bind(to_view.u64() as i64), + ) + .await? + .into_iter() + .map(|row| { + let view: i64 = row.get("view"); + let data: Vec = row.get("data"); + let vid_proposal = bincode::deserialize::< + Proposal>, + >(&data)?; + Ok((view as u64, vid_proposal.data)) + }) + .collect::>>()?; + + // Collect DA proposals for the decide event. + let mut da_proposals = tx + .fetch_all( + query("SELECT view, data FROM da_proposal where view >= $1 AND view <= $2") + .bind(from_view.u64() as i64) + .bind(to_view.u64() as i64), + ) + .await? + .into_iter() + .map(|row| { + let view: i64 = row.get("view"); + let data: Vec = row.get("data"); + let da_proposal = + bincode::deserialize::>>(&data)?; + Ok((view as u64, da_proposal.data)) + }) + .collect::>>()?; + + drop(tx); + + // Collate all the information by view number and construct a chain of leaves. + let leaf_chain = leaves + .into_iter() + // Go in reverse chronological order, as expected by Decide events. + .rev() + .map(|mut leaf| { + let view = leaf.view_number(); + + // Include the VID share if available. + let vid_share = vid_shares.remove(&view); + if vid_share.is_none() { + tracing::debug!(?view, "VID share not available at decide"); + } + + // Fill in the full block payload using the DA proposals we had persisted. + if let Some(proposal) = da_proposals.remove(&view) { + let payload = + Payload::from_bytes(&proposal.encoded_transactions, &proposal.metadata); + leaf.fill_block_payload_unchecked(payload); + } else if view == ViewNumber::genesis() { + // We don't get a DA proposal for the genesis view, but we know what the + // payload always is. + leaf.fill_block_payload_unchecked(Payload::empty().0); + } else { + tracing::debug!(?view, "DA proposal not available at decide"); + } + + LeafInfo { + leaf: leaf.into(), + vid_share, + // Note: the following fields are not used in Decide event processing, and + // should be removed. For now, we just default them. + state: Default::default(), + delta: Default::default(), + } + }) + .collect(); + + // Generate decide event for the consumer. 
+ tracing::debug!(?to_view, ?final_qc, ?leaf_chain, "generating decide event"); + consumer + .handle_event(&Event { + view_number: to_view, + event: EventType::Decide { + leaf_chain: Arc::new(leaf_chain), + qc: Arc::new(final_qc.to_qc2()), + block_size: None, + }, + }) + .await?; + + let mut tx = self.db.write().await?; + + // Now that we have definitely processed leaves up to `to_view`, we can update + // `last_processed_view` so we don't process these leaves again. We may still fail at + // this point, or shut down, and fail to complete this update. At worst this will lead + // to us sending a duplicate decide event the next time we are called; this is fine as + // the event consumer is required to be idempotent. + tx.upsert( + "event_stream", + ["id", "last_processed_view"], + ["id"], + [(1i32, to_view.u64() as i64)], + ) + .await?; + + // Delete the data that has been fully processed. + tx.execute( + query("DELETE FROM vid_share where view >= $1 AND view <= $2") + .bind(from_view.u64() as i64) + .bind(to_view.u64() as i64), + ) + .await?; + tx.execute( + query("DELETE FROM da_proposal where view >= $1 AND view <= $2") + .bind(from_view.u64() as i64) + .bind(to_view.u64() as i64), + ) + .await?; + tx.execute( + query("DELETE FROM quorum_proposals where view >= $1 AND view <= $2") + .bind(from_view.u64() as i64) + .bind(to_view.u64() as i64), + ) + .await?; + tx.execute( + query("DELETE FROM quorum_certificate where view >= $1 AND view <= $2") + .bind(from_view.u64() as i64) + .bind(to_view.u64() as i64), + ) + .await?; + + // Clean up leaves, but do not delete the most recent one (all leaves with a view number + // less than the given value). This is necessary to ensure that, in case of a restart, + // we can resume from the last decided leaf. + tx.execute( + query("DELETE FROM anchor_leaf WHERE view >= $1 AND view < $2") + .bind(from_view.u64() as i64) + .bind(to_view.u64() as i64), + ) + .await?; + + tx.commit().await?; + last_processed_view = Some(to_view.u64() as i64); + } + } + + #[tracing::instrument(skip(self))] + async fn prune(&self, cur_view: ViewNumber) -> anyhow::Result<()> { + let mut tx = self.db.write().await?; + + // Prune everything older than the target retention period. + prune_to_view( + &mut tx, + cur_view.u64().saturating_sub(self.gc_opt.target_retention), + ) + .await?; + + // Check our storage usage; if necessary we will prune more aggressively (up to the minimum + // retention) to get below the target usage. 
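// For reference, with PRUNE_TABLES as defined below the generated usage query expands to
// (SQLite):
//   SELECT sum(pgsize) FROM dbstat WHERE name IN
//     ('anchor_leaf','vid_share','da_proposal','quorum_proposals','quorum_certificate')
// and (Postgres):
//   SELECT pg_table_size('anchor_leaf') + pg_table_size('vid_share') + pg_table_size('da_proposal')
//        + pg_table_size('quorum_proposals') + pg_table_size('quorum_certificate')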
+ #[cfg(feature = "embedded-db")] + let usage_query = format!( + "SELECT sum(pgsize) FROM dbstat WHERE name IN ({})", + PRUNE_TABLES + .iter() + .map(|table| format!("'{table}'")) + .join(",") + ); + + #[cfg(not(feature = "embedded-db"))] + let usage_query = { + let table_sizes = PRUNE_TABLES + .iter() + .map(|table| format!("pg_table_size('{table}')")) + .join(" + "); + format!("SELECT {table_sizes}") + }; + + let (usage,): (i64,) = query_as(&usage_query).fetch_one(tx.as_mut()).await?; + tracing::debug!(usage, "consensus storage usage after pruning"); + + if (usage as u64) > self.gc_opt.target_usage { + tracing::warn!( + usage, + gc_opt = ?self.gc_opt, + "consensus storage is running out of space, pruning to minimum retention" + ); + prune_to_view( + &mut tx, + cur_view.u64().saturating_sub(self.gc_opt.minimum_retention), + ) + .await?; + } + + tx.commit().await + } +} + +const PRUNE_TABLES: &[&str] = &[ + "anchor_leaf", + "vid_share", + "da_proposal", + "quorum_proposals", + "quorum_certificate", +]; + +async fn prune_to_view(tx: &mut Transaction, view: u64) -> anyhow::Result<()> { + if view == 0 { + // Nothing to prune, the entire chain is younger than the retention period. + return Ok(()); + } + tracing::debug!(view, "pruning consensus storage"); + + for table in PRUNE_TABLES { + let res = query(&format!("DELETE FROM {table} WHERE view < $1")) + .bind(view as i64) + .execute(tx.as_mut()) + .await + .context(format!("pruning {table}"))?; + if res.rows_affected() > 0 { + tracing::info!( + "garbage collected {} rows from {table}", + res.rows_affected() + ); + } + } + + Ok(()) } #[async_trait] @@ -575,13 +979,18 @@ impl SequencerPersistence for Persistence { // Generate an event for the new leaves and, only if it succeeds, clean up data we no longer // need. - let consumer = dyn_clone::clone(consumer); - - if let Err(err) = collect_garbage(self, view, consumer).await { + if let Err(err) = self.generate_decide_events(consumer).await { // GC/event processing failure is not an error, since by this point we have at least // managed to persist the decided leaves successfully, and GC will just run again at the // next decide. Log an error but do not return it. - tracing::warn!(?view, "GC/event processing failed: {err:#}"); + tracing::warn!(?view, "event processing failed: {err:#}"); + return Ok(()); + } + + // Garbage collect data which was not included in any decide event, but which at this point + // is old enough to just forget about. 
+ if let Err(err) = self.prune(view).await { + tracing::warn!(?view, "pruning failed: {err:#}"); } Ok(()) @@ -740,24 +1149,25 @@ impl SequencerPersistence for Persistence { &self, proposal: &Proposal>, ) -> anyhow::Result<()> { - let data = &proposal.data; - let view = data.view_number().u64(); + let view = proposal.data.view_number.u64(); + let payload_hash = proposal.data.payload_commitment; let data_bytes = bincode::serialize(proposal).unwrap(); let mut tx = self.db.write().await?; tx.upsert( "vid_share", - ["view", "data"], + ["view", "data", "payload_hash"], ["view"], - [(view as i64, data_bytes)], + [(view as i64, data_bytes, payload_hash.to_string())], ) .await?; tx.commit().await } + async fn append_da( &self, proposal: &Proposal>, - _vid_commit: ::Commit, + vid_commit: VidCommitment, ) -> anyhow::Result<()> { let data = &proposal.data; let view = data.view_number().u64(); @@ -766,13 +1176,14 @@ impl SequencerPersistence for Persistence { let mut tx = self.db.write().await?; tx.upsert( "da_proposal", - ["view", "data"], + ["view", "data", "payload_hash"], ["view"], - [(view as i64, data_bytes)], + [(view as i64, data_bytes, vid_commit.to_string())], ) .await?; tx.commit().await } + async fn record_action(&self, view: ViewNumber, action: HotShotAction) -> anyhow::Result<()> { // Todo Remove this after https://github.com/EspressoSystems/espresso-sequencer/issues/1931 if !matches!(action, HotShotAction::Propose | HotShotAction::Vote) { @@ -829,6 +1240,22 @@ impl SequencerPersistence for Persistence { [(view_number as i64, leaf_hash.to_string(), proposal_bytes)], ) .await?; + + // We also keep track of any QC we see in case we need it to recover our archival storage. + let justify_qc = &proposal.data.justify_qc; + let justify_qc_bytes = bincode::serialize(&justify_qc).context("serializing QC")?; + tx.upsert( + "quorum_certificate", + ["view", "leaf_hash", "data"], + ["view"], + [( + justify_qc.view_number.u64() as i64, + justify_qc.data.leaf_commit.to_string(), + &justify_qc_bytes, + )], + ) + .await?; + tx.commit().await } @@ -883,177 +1310,144 @@ impl SequencerPersistence for Persistence { } } -async fn collect_garbage( - storage: &Persistence, - view: ViewNumber, - consumer: impl EventConsumer, -) -> anyhow::Result<()> { - // In SQLite, overlapping read and write transactions can lead to database errors. - // To avoid this: - // - start a read transaction to query and collect all the necessary data. - // - Commit (or implicitly drop) the read transaction once the data is fetched. - // - use the collected data to generate a "decide" event for the consumer. - // - begin a write transaction to delete the data and update the event stream. - let mut tx = storage.db.read().await?; - - // collect VID shares. - let mut vid_shares = tx - .fetch_all(query("SELECT * FROM vid_share where view <= $1").bind(view.u64() as i64)) - .await? - .into_iter() - .map(|row| { - let view: i64 = row.get("view"); - let data: Vec = row.get("data"); - let vid_proposal = - bincode::deserialize::>>(&data)?; - Ok((view as u64, vid_proposal.data)) - }) - .collect::>>()?; - - // collect DA proposals. - let mut da_proposals = tx - .fetch_all(query("SELECT * FROM da_proposal where view <= $1").bind(view.u64() as i64)) - .await? 
- .into_iter() - .map(|row| { - let view: i64 = row.get("view"); - let data: Vec = row.get("data"); - let da_proposal = - bincode::deserialize::>>(&data)?; - Ok((view as u64, da_proposal.data)) - }) - .collect::>>()?; +#[async_trait] +impl Provider for Persistence { + #[tracing::instrument(skip(self))] + async fn fetch(&self, req: VidCommonRequest) -> Option { + let mut tx = match self.db.read().await { + Ok(tx) => tx, + Err(err) => { + tracing::warn!("could not open transaction: {err:#}"); + return None; + } + }; - // collect leaves - let mut leaves = tx - .fetch_all( - query("SELECT view, leaf, qc FROM anchor_leaf WHERE view <= $1") - .bind(view.u64() as i64), + let bytes = match query_as::<(Vec,)>( + "SELECT data FROM vid_share WHERE payload_hash = $1 LIMIT 1", ) - .await? - .into_iter() - .map(|row| { - let view: i64 = row.get("view"); - let leaf_data: Vec = row.get("leaf"); - let leaf = bincode::deserialize::(&leaf_data)?; - let qc_data: Vec = row.get("qc"); - let qc = bincode::deserialize::>(&qc_data)?; - Ok((view as u64, (leaf, qc))) - }) - .collect::>>()?; - - // Exclude from the decide event any leaves which have definitely already been processed. We may - // have selected an already-processed leaf because the oldest leaf -- the last leaf processed in - // the previous decide event -- remained in the database to serve as the anchor leaf, so our - // query would have returned it. In fact, this will almost always be the case, but there are two - // cases where it might not be, and we must process this leaf after all: - // - // 1. The oldest leaf is the genesis leaf, and there _is_ no previous decide event - // 2. We previously stored some leaves in the database and then failed while processing the - // decide event, or shut down before generating the decide event, and so we are only now - // generating the decide event for those previous leaves. - // - // Since these cases (particularly case 2) are hard to account for explicitly, we just use a - // persistent value in the database to remember how far we have successfully processed the event - // stream. - let last_processed_view: Option = tx - .fetch_optional(query( - "SELECT last_processed_view FROM event_stream WHERE id = 1 LIMIT 1", - )) - .await? - .map(|row| row.get("last_processed_view")); - let leaves = if let Some(v) = last_processed_view { - let new_leaves = leaves.split_off(&((v as u64) + 1)); - if !leaves.is_empty() { - tracing::debug!( - v, - remaining_leaves = new_leaves.len(), - ?leaves, - "excluding already-processed leaves from decide event" - ); - } - new_leaves - } else { - leaves - }; + .bind(req.0.to_string()) + .fetch_one(tx.as_mut()) + .await + { + Ok((bytes,)) => bytes, + Err(err) => { + tracing::warn!("error loading VID share: {err:#}"); + return None; + } + }; - drop(tx); + let share: Proposal> = + match bincode::deserialize(&bytes) { + Ok(share) => share, + Err(err) => { + tracing::warn!("error decoding VID share: {err:#}"); + return None; + } + }; - // Generate a decide event for each leaf, to be processed by the event consumer. We make a - // separate event for each leaf because it is possible we have non-consecutive leaves in our - // storage, which would not be valid as a single decide with a single leaf chain. - for (view, (mut leaf, qc)) in leaves { - // Include the VID share if available. 
- let vid_share = vid_shares.remove(&view); - if vid_share.is_none() { - tracing::debug!(view, "VID share not available at decide"); - } + Some(share.data.common) + } +} - // Fill in the full block payload using the DA proposals we had persisted. - if let Some(proposal) = da_proposals.remove(&view) { - let payload = Payload::from_bytes(&proposal.encoded_transactions, &proposal.metadata); - leaf.fill_block_payload_unchecked(payload); - } else if view == ViewNumber::genesis().u64() { - // We don't get a DA proposal for the genesis view, but we know what the payload always - // is. - leaf.fill_block_payload_unchecked(Payload::empty().0); - } else { - tracing::debug!(view, "DA proposal not available at decide"); - } +#[async_trait] +impl Provider for Persistence { + #[tracing::instrument(skip(self))] + async fn fetch(&self, req: PayloadRequest) -> Option { + let mut tx = match self.db.read().await { + Ok(tx) => tx, + Err(err) => { + tracing::warn!("could not open transaction: {err:#}"); + return None; + } + }; - let leaf_info = LeafInfo { - leaf: leaf.into(), - vid_share, + let bytes = match query_as::<(Vec,)>( + "SELECT data FROM da_proposal WHERE payload_hash = $1 LIMIT 1", + ) + .bind(req.0.to_string()) + .fetch_one(tx.as_mut()) + .await + { + Ok((bytes,)) => bytes, + Err(err) => { + tracing::warn!("error loading DA proposal: {err:#}"); + return None; + } + }; - // Note: the following fields are not used in Decide event processing, and - // should be removed. For now, we just default them. - state: Default::default(), - delta: Default::default(), + let proposal: Proposal> = match bincode::deserialize(&bytes) + { + Ok(proposal) => proposal, + Err(err) => { + tracing::warn!("error decoding DA proposal: {err:#}"); + return None; + } }; - tracing::debug!(?view, ?qc, ?leaf_info, "generating decide event"); - consumer - .handle_event(&Event { - view_number: ViewNumber::new(view), - event: EventType::Decide { - leaf_chain: Arc::new(vec![leaf_info]), - qc: Arc::new(qc.to_qc2()), - block_size: None, - }, - }) - .await?; + + Some(Payload::from_bytes( + &proposal.data.encoded_transactions, + &proposal.data.metadata, + )) } +} - let mut tx = storage.db.write().await?; - // Now that we have definitely processed leaves up to `view`, we can update - // `last_processed_view` so we don't process these leaves again. We may still fail at this - // point, or shut down, and fail to complete this update. At worst this will lead to us sending - // a duplicate decide event the next time we are called; this is fine as the event consumer is - // required to be idempotent. - tx.upsert( - "event_stream", - ["id", "last_processed_view"], - ["id"], - [(1i32, view.u64() as i64)], - ) - .await?; - - tx.execute(query("DELETE FROM vid_share where view <= $1").bind(view.u64() as i64)) - .await?; +#[async_trait] +impl Provider> for Persistence { + #[tracing::instrument(skip(self))] + async fn fetch(&self, req: LeafRequest) -> Option> { + let mut tx = match self.db.read().await { + Ok(tx) => tx, + Err(err) => { + tracing::warn!("could not open transaction: {err:#}"); + return None; + } + }; - tx.execute(query("DELETE FROM da_proposal where view <= $1").bind(view.u64() as i64)) - .await?; + let (leaf, qc) = match fetch_leaf_from_proposals(&mut tx, req).await { + Ok(res) => res, + Err(err) => { + tracing::info!("requested leaf not found in undecided proposals: {err:#}"); + return None; + } + }; - // Clean up leaves, but do not delete the most recent one (all leaves with a view number less than the given value). 
-    // This is necessary to ensure that, in case of a restart, we can resume from the last decided leaf.
-    tx.execute(query("DELETE FROM anchor_leaf WHERE view < $1").bind(view.u64() as i64))
-        .await?;
+        match LeafQueryData::new(leaf, qc) {
+            Ok(leaf) => Some(leaf),
+            Err(err) => {
+                tracing::warn!("fetched invalid leaf: {err:#}");
+                None
+            }
+        }
+    }
+}

-    // Clean up old proposals. These are not part of the decide event we generate for the consumer,
-    // so we don't need to return them.
-    tx.execute(query("DELETE FROM quorum_proposals where view <= $1").bind(view.u64() as i64))
-        .await?;
+async fn fetch_leaf_from_proposals(
+    tx: &mut Transaction,
+    req: LeafRequest<SeqTypes>,
+) -> anyhow::Result<(Leaf, QuorumCertificate<SeqTypes>)> {
+    // Look for a quorum proposal corresponding to this leaf.
+    let (proposal_bytes,) =
+        query_as::<(Vec<u8>,)>("SELECT data FROM quorum_proposals WHERE leaf_hash = $1 LIMIT 1")
+            .bind(req.expected_leaf.to_string())
+            .fetch_one(tx.as_mut())
+            .await
+            .context("fetching proposal")?;

-    tx.commit().await
+    // Look for a QC corresponding to this leaf.
+    let (qc_bytes,) =
+        query_as::<(Vec<u8>,)>("SELECT data FROM quorum_certificate WHERE leaf_hash = $1 LIMIT 1")
+            .bind(req.expected_leaf.to_string())
+            .fetch_one(tx.as_mut())
+            .await
+            .context("fetching QC")?;
+
+    let proposal: Proposal<QuorumProposal<SeqTypes>> =
+        bincode::deserialize(&proposal_bytes).context("deserializing quorum proposal")?;
+    let qc: QuorumCertificate<SeqTypes> =
+        bincode::deserialize(&qc_bytes).context("deserializing quorum certificate")?;
+
+    let leaf = Leaf::from_quorum_proposal(&proposal.data);
+    Ok((leaf, qc))
 }

 #[cfg(test)]
@@ -1070,24 +1464,26 @@ mod testing {
         Arc::new(TmpDb::init().await)
     }

-    async fn connect(db: &Self::Storage) -> Self {
+    #[allow(refining_impl_trait)]
+    fn options(db: &Self::Storage) -> Options {
         #[cfg(not(feature = "embedded-db"))]
         {
-            let mut opt: Options = PostgresOptions {
+            PostgresOptions {
                 port: Some(db.port()),
                 host: Some(db.host()),
                 user: Some("postgres".into()),
                 password: Some("password".into()),
                 ..Default::default()
             }
-            .into();
-            opt.create().await.unwrap()
+            .into()
         }
         #[cfg(feature = "embedded-db")]
         {
-            let mut opt: Options = SqliteOptions { path: db.path() }.into();
-            opt.create().await.unwrap()
+            SqliteOptions {
+                path: Some(db.path()),
+            }
+            .into()
         }
     }
 }
@@ -1108,17 +1504,22 @@ mod test {
     use super::*;
     use crate::{persistence::testing::TestablePersistence, BLSPubKey, PubKey};
-    use espresso_types::{Leaf, NodeState, ValidatedState};
+    use espresso_types::{traits::NullEventConsumer, Leaf, NodeState, ValidatedState};
     use futures::stream::TryStreamExt;
     use hotshot_example_types::node_types::TestVersions;
     use hotshot_types::{
         drb::{INITIAL_DRB_RESULT, INITIAL_DRB_SEED_INPUT},
         simple_certificate::QuorumCertificate,
-        traits::signature_key::SignatureKey,
+        traits::{block_contents::vid_commitment, signature_key::SignatureKey, EncodeBytes},
+        vid::vid_scheme,
     };
+    use jf_vid::VidScheme;
+    use sequencer_utils::test_utils::setup_test;

     #[tokio::test(flavor = "multi_thread")]
     async fn test_quorum_proposals_leaf_hash_migration() {
+        setup_test();
+
         // Create some quorum proposals to test with.
         let leaf: Leaf2 = Leaf::genesis(&ValidatedState::default(), &NodeState::mock())
             .await
@@ -1192,4 +1593,256 @@ mod test {
             );
         }
     }
+
+    #[tokio::test(flavor = "multi_thread")]
+    async fn test_fetching_providers() {
+        setup_test();
+
+        let tmp = Persistence::tmp_storage().await;
+        let storage = Persistence::connect(&tmp).await;
+
+        // Mock up some data.
+        let leaf = Leaf::genesis(&ValidatedState::default(), &NodeState::mock()).await;
+        let leaf_payload = leaf.block_payload().unwrap();
+        let leaf_payload_bytes_arc = leaf_payload.encode();
+        let disperse = vid_scheme(2)
+            .disperse(leaf_payload_bytes_arc.clone())
+            .unwrap();
+        let payload_commitment = disperse.commit;
+        let (pubkey, privkey) = BLSPubKey::generated_from_seed_indexed([0; 32], 1);
+        let vid_share = VidDisperseShare::<SeqTypes> {
+            view_number: ViewNumber::new(0),
+            payload_commitment,
+            share: disperse.shares[0].clone(),
+            common: disperse.common,
+            recipient_key: pubkey,
+        }
+        .to_proposal(&privkey)
+        .unwrap()
+        .clone();
+
+        let quorum_proposal = QuorumProposal2::<SeqTypes> {
+            block_header: leaf.block_header().clone(),
+            view_number: leaf.view_number(),
+            justify_qc: leaf.justify_qc().to_qc2(),
+            upgrade_certificate: None,
+            view_change_evidence: None,
+            drb_seed: INITIAL_DRB_SEED_INPUT,
+            drb_result: INITIAL_DRB_RESULT,
+        };
+        let quorum_proposal_signature =
+            BLSPubKey::sign(&privkey, &bincode::serialize(&quorum_proposal).unwrap())
+                .expect("Failed to sign quorum proposal");
+        let quorum_proposal = Proposal {
+            data: quorum_proposal,
+            signature: quorum_proposal_signature,
+            _pd: Default::default(),
+        };
+
+        let block_payload_signature = BLSPubKey::sign(&privkey, &leaf_payload_bytes_arc)
+            .expect("Failed to sign block payload");
+        let da_proposal = Proposal {
+            data: DaProposal::<SeqTypes> {
+                encoded_transactions: leaf_payload_bytes_arc,
+                metadata: leaf_payload.ns_table().clone(),
+                view_number: ViewNumber::new(0),
+            },
+            signature: block_payload_signature,
+            _pd: Default::default(),
+        };
+
+        let mut next_quorum_proposal = quorum_proposal.clone();
+        next_quorum_proposal.data.view_number += 1;
+        next_quorum_proposal.data.justify_qc.view_number += 1;
+        next_quorum_proposal.data.justify_qc.data.leaf_commit =
+            Committable::commit(&leaf.clone().into());
+        let qc = &next_quorum_proposal.data.justify_qc;
+
+        // Add to database.
+        storage
+            .append_da(&da_proposal, payload_commitment)
+            .await
+            .unwrap();
+        storage.append_vid(&vid_share).await.unwrap();
+        storage
+            .append_quorum_proposal(&quorum_proposal)
+            .await
+            .unwrap();
+
+        // Add an extra quorum proposal so we have a QC pointing back at `leaf`.
+        storage
+            .append_quorum_proposal(&next_quorum_proposal)
+            .await
+            .unwrap();
+
+        // Fetch it as if we were rebuilding an archive.
+        assert_eq!(
+            vid_share.data.common,
+            storage
+                .fetch(VidCommonRequest(vid_share.data.payload_commitment))
+                .await
+                .unwrap()
+        );
+        assert_eq!(
+            leaf_payload,
+            storage
+                .fetch(PayloadRequest(vid_share.data.payload_commitment))
+                .await
+                .unwrap()
+        );
+        assert_eq!(
+            LeafQueryData::new(leaf.clone(), qc.clone().to_qc()).unwrap(),
+            storage
+                .fetch(LeafRequest::new(
+                    leaf.block_header().block_number(),
+                    Committable::commit(&leaf),
+                    qc.clone().to_qc().commit()
+                ))
+                .await
+                .unwrap()
+        );
+    }
+
+    /// Test conditions that trigger pruning.
+    ///
+    /// This is a configurable test that exercises different GC configurations via `pruning_opt`.
+    /// The test populates the database with some data for view 1, asserts that it is retained for
+    /// view 2, and then asserts that it is pruned by view 3. Various configurations can produce
+    /// this behavior, with the data being retained and then pruned through different logic and
+    /// code paths.
+    async fn test_pruning_helper(pruning_opt: ConsensusPruningOptions) {
+        setup_test();
+
+        let tmp = Persistence::tmp_storage().await;
+        let mut opt = Persistence::options(&tmp);
+        opt.consensus_pruning = pruning_opt;
+        let storage = opt.create().await.unwrap();
+
+        let data_view = ViewNumber::new(1);
+
+        // Populate some data.
+        let leaf = Leaf::genesis(&ValidatedState::default(), &NodeState::mock()).await;
+        let leaf_payload = leaf.block_payload().unwrap();
+        let leaf_payload_bytes_arc = leaf_payload.encode();
+
+        let disperse = vid_scheme(2)
+            .disperse(leaf_payload_bytes_arc.clone())
+            .unwrap();
+        let payload_commitment = vid_commitment(&leaf_payload_bytes_arc, 2);
+        let (pubkey, privkey) = BLSPubKey::generated_from_seed_indexed([0; 32], 1);
+        let vid = VidDisperseShare::<SeqTypes> {
+            view_number: data_view,
+            payload_commitment,
+            share: disperse.shares[0].clone(),
+            common: disperse.common,
+            recipient_key: pubkey,
+        }
+        .to_proposal(&privkey)
+        .unwrap()
+        .clone();
+        let quorum_proposal = QuorumProposal2::<SeqTypes> {
+            block_header: leaf.block_header().clone(),
+            view_number: data_view,
+            justify_qc: QuorumCertificate::genesis::<TestVersions>(
+                &ValidatedState::default(),
+                &NodeState::mock(),
+            )
+            .await
+            .to_qc2(),
+            upgrade_certificate: None,
+            view_change_evidence: None,
+            drb_seed: INITIAL_DRB_SEED_INPUT,
+            drb_result: INITIAL_DRB_RESULT,
+        };
+        let quorum_proposal_signature =
+            BLSPubKey::sign(&privkey, &bincode::serialize(&quorum_proposal).unwrap())
+                .expect("Failed to sign quorum proposal");
+        let quorum_proposal = Proposal {
+            data: quorum_proposal,
+            signature: quorum_proposal_signature,
+            _pd: Default::default(),
+        };
+
+        let block_payload_signature = BLSPubKey::sign(&privkey, &leaf_payload_bytes_arc)
+            .expect("Failed to sign block payload");
+        let da_proposal = Proposal {
+            data: DaProposal::<SeqTypes> {
+                encoded_transactions: leaf_payload_bytes_arc.clone(),
+                metadata: leaf_payload.ns_table().clone(),
+                view_number: data_view,
+            },
+            signature: block_payload_signature,
+            _pd: Default::default(),
+        };
+
+        tracing::info!(?vid, ?da_proposal, ?quorum_proposal, "append data");
+        storage.append_vid(&vid).await.unwrap();
+        storage
+            .append_da(&da_proposal, payload_commitment)
+            .await
+            .unwrap();
+        storage
+            .append_quorum_proposal(&quorum_proposal)
+            .await
+            .unwrap();
+
+        // The first decide doesn't trigger any garbage collection, even though our usage exceeds
+        // the target, because of the minimum retention.
+        tracing::info!("decide view 1");
+        storage
+            .append_decided_leaves(data_view + 1, [], &NullEventConsumer)
+            .await
+            .unwrap();
+        assert_eq!(
+            storage.load_vid_share(data_view).await.unwrap().unwrap(),
+            vid
+        );
+        assert_eq!(
+            storage.load_da_proposal(data_view).await.unwrap().unwrap(),
+            da_proposal
+        );
+        assert_eq!(
+            storage.load_quorum_proposal(data_view).await.unwrap(),
+            quorum_proposal
+        );
+
+        // After another view, our data is beyond the minimum retention (though not the target
+        // retention) so it gets pruned.
+        tracing::info!("decide view 2");
+        storage
+            .append_decided_leaves(data_view + 2, [], &NullEventConsumer)
+            .await
+            .unwrap();
+        assert!(storage.load_vid_share(data_view).await.unwrap().is_none(),);
+        assert!(storage.load_da_proposal(data_view).await.unwrap().is_none());
+        storage.load_quorum_proposal(data_view).await.unwrap_err();
+    }
+
+    #[tokio::test(flavor = "multi_thread")]
+    async fn test_pruning_minimum_retention() {
+        test_pruning_helper(ConsensusPruningOptions {
+            // Use a very low target usage, to show that we still retain data up to the minimum
+            // retention even when usage is above target.
+            target_usage: 0,
+            minimum_retention: 1,
+            // Use a very high target retention, so that pruning is only triggered by the minimum
+            // retention.
+            target_retention: u64::MAX,
+        })
+        .await
+    }
+
+    #[tokio::test(flavor = "multi_thread")]
+    async fn test_pruning_target_retention() {
+        test_pruning_helper(ConsensusPruningOptions {
+            target_retention: 1,
+            // Use a very low minimum retention, so that data is only kept around due to the target
+            // retention.
+            minimum_retention: 0,
+            // Use a very high target usage, so that pruning is only triggered by the target
+            // retention.
+            target_usage: u64::MAX,
+        })
+        .await
+    }
 }
diff --git a/types/Cargo.toml b/types/Cargo.toml
index a789916a2..32d0c8e38 100644
--- a/types/Cargo.toml
+++ b/types/Cargo.toml
@@ -22,7 +22,6 @@ committable = { workspace = true }
 contract-bindings = { path = "../contract-bindings" }
 derive_more = { workspace = true }
 diff-test-bn254 = { git = "https://github.com/EspressoSystems/solidity-bn254.git" }
-dyn-clone = { workspace = true }
 ethers = { workspace = true }
 fluent-asserter = "0.1.9"
 futures = { workspace = true }
diff --git a/types/src/v0/traits.rs b/types/src/v0/traits.rs
index ec4eea666..17b33ad7a 100644
--- a/types/src/v0/traits.rs
+++ b/types/src/v0/traits.rs
@@ -5,7 +5,6 @@ use std::{cmp::max, collections::BTreeMap, fmt::Debug, ops::Range, sync::Arc};
 use anyhow::{bail, ensure, Context};
 use async_trait::async_trait;
 use committable::{Commitment, Committable};
-use dyn_clone::DynClone;
 use futures::{FutureExt, TryFutureExt};
 use hotshot::{types::EventType, HotShotInitializer};
 use hotshot_types::{
@@ -415,6 +414,7 @@ impl StateCatchup for Vec {
 pub trait PersistenceOptions: Clone + Send + Sync + 'static {
     type Persistence: SequencerPersistence;

+    fn set_view_retention(&mut self, view_retention: u64);
     async fn create(&mut self) -> anyhow::Result<Self::Persistence>;
     async fn reset(self) -> anyhow::Result<()>;
 }
@@ -693,16 +693,13 @@ pub trait SequencerPersistence: Sized + Send + Sync + Clone + 'static {
 }

 #[async_trait]
-pub trait EventConsumer: Debug + DynClone + Send + Sync {
+pub trait EventConsumer: Debug + Send + Sync {
     async fn handle_event(&self, event: &Event) -> anyhow::Result<()>;
 }

-dyn_clone::clone_trait_object!(EventConsumer);
-
 #[async_trait]
 impl<T> EventConsumer for Box<T>
 where
-    Self: Clone,
     T: EventConsumer + ?Sized,
 {
     async fn handle_event(&self, event: &Event) -> anyhow::Result<()> {
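Taken together, the three `Provider` implementations added to `Persistence` above let a node answer archive-recovery fetches for VID common data, block payloads, and leaves out of its own consensus storage, keyed by the payload-hash and leaf-hash columns added in the migrations. The sketch below shows the fallback pattern this enables. It is a simplified stand-in, not code from this diff: the `Provider` trait here is a minimal imitation of the query service's trait, and `fetch_with_fallback` is a hypothetical helper.

// Minimal sketch of the provider-fallback pattern, under the assumptions noted above.
use async_trait::async_trait;

#[async_trait]
pub trait Provider<Req: Send + 'static>: Send + Sync {
    type Response: Send;

    async fn fetch(&self, req: Req) -> Option<Self::Response>;
}

/// Consult each provider in order (e.g. local consensus storage first, then a remote
/// archive peer) and return the first successful result.
pub async fn fetch_with_fallback<Req, Res>(
    providers: &[&dyn Provider<Req, Response = Res>],
    req: Req,
) -> Option<Res>
where
    Req: Clone + Send + 'static,
    Res: Send,
{
    for provider in providers {
        if let Some(res) = provider.fetch(req.clone()).await {
            return Some(res);
        }
        // A miss is normal: the local store may already have garbage-collected this
        // view, so fall through to the next provider.
    }
    None
}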
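The two pruning tests above pin down how the three retention knobs interact; they correspond to the ESPRESSO_SEQUENCER_CONSENSUS_STORAGE_MINIMUM_RETENTION, TARGET_RETENTION, and TARGET_USAGE variables added to public-env-vars.toml. One way to read the rule those tests exercise is sketched below. The `RetentionConfig` struct and `should_prune` function are illustrative names only, not the actual fields of `ConsensusPruningOptions` or the code path inside `Persistence`.

/// Illustrative retention rule matching the behavior asserted by the pruning tests.
struct RetentionConfig {
    /// Data younger than this is never pruned (MINIMUM_RETENTION).
    minimum_retention: u64,
    /// Data older than this is always pruned (TARGET_RETENTION).
    target_retention: u64,
    /// Usage above which pruning kicks in between the two bounds (TARGET_USAGE).
    target_usage: u64,
}

fn should_prune(age: u64, usage: u64, cfg: &RetentionConfig) -> bool {
    if age <= cfg.minimum_retention {
        // Never prune within the minimum retention window, even if usage exceeds the
        // target (the first decide in `test_pruning_helper`).
        return false;
    }
    if age > cfg.target_retention {
        // Beyond the target retention, prune unconditionally
        // (`test_pruning_target_retention`).
        return true;
    }
    // In between, prune only while usage is above the target
    // (`test_pruning_minimum_retention`, which sets `target_usage: 0`).
    usage > cfg.target_usage
}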
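With the `DynClone` bound dropped, an `EventConsumer` only needs to be `Debug + Send + Sync`, and boxed consumers keep working through the blanket implementation shown above. A minimal consumer might look like the following sketch; it assumes it sits next to the trait in types/src/v0/traits.rs so that `Event`, `async_trait`, `anyhow`, and `tracing` are already in scope, and it is illustrative only (the `NullEventConsumer` used by the tests plays a similar role when events can simply be discarded).

/// Illustrative consumer that logs the view of each event it receives.
#[derive(Debug)]
pub struct LoggingEventConsumer;

#[async_trait]
impl EventConsumer for LoggingEventConsumer {
    async fn handle_event(&self, event: &Event) -> anyhow::Result<()> {
        // Consumers must be idempotent: persistence may re-deliver a decide event if the
        // node restarts before it records how far the event stream has been processed.
        tracing::debug!(view = ?event.view_number, "handling event");
        Ok(())
    }
}

// Boxing still works via the blanket impl, even though consumers are no longer `Clone`:
// let consumer: Box<dyn EventConsumer> = Box::new(LoggingEventConsumer);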