Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reduce risk of losing data due to garbage collection or restarts #2252

Merged
merged 18 commits into from
Dec 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ cld = "0.5"
derive_more = { version = "1.0", features = ["full"] }
es-version = { git = "https://github.com/EspressoSystems/es-version.git", branch = "main" }
dotenvy = "0.15"
dyn-clone = "1.0"
ethers = { version = "2.0", features = ["solc", "ws"] }
futures = "0.3"
tokio = { version = "1", default-features = false, features = [
Expand Down
2 changes: 0 additions & 2 deletions sequencer-sqlite/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion sequencer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@ csv = "1"
derivative = "2.2"
derive_more = { workspace = true }
dotenvy = { workspace = true }
dyn-clone = { workspace = true }
espresso-types = { path = "../types" }
ethers = { workspace = true }
futures = { workspace = true }
Expand Down
21 changes: 21 additions & 0 deletions sequencer/api/migrations/postgres/V401__archive_provider.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
-- Add information needed for consensus storage to act as a provider for archive recovery.
--
-- Purpose: lets the ephemeral consensus (HotShot) database answer the query service's
-- fetch-by-hash requests, so data not yet archived can still be recovered after a
-- restart or garbage collection. NOTE(review): column/index names must match the
-- queries issued by the fetching provider — verify against the Rust persistence layer.

-- Add payload hash to DA proposal, since the query service requests missing payloads by hash.
-- Nullable by design: rows written before this migration carry no hash.
ALTER TABLE da_proposal
ADD COLUMN payload_hash VARCHAR;
-- Index enables efficient lookup of a DA proposal by its payload hash.
CREATE INDEX da_proposal_payload_hash_idx ON da_proposal (payload_hash);

-- Add payload hash to VID share, since the query service requests missing VID common by payload
-- hash.
ALTER TABLE vid_share
ADD COLUMN payload_hash VARCHAR;
-- Index enables efficient lookup of a VID share by its payload hash.
CREATE INDEX vid_share_payload_hash_idx ON vid_share (payload_hash);

-- Add QC storage, since the query service requires missing leaves to be fetched alongside a QC with
-- that leaf hash.
CREATE TABLE quorum_certificate (
    -- Consensus view number; at most one stored QC per view.
    view BIGINT PRIMARY KEY,
    -- Hash of the leaf this certificate commits to; the lookup key for leaf recovery.
    leaf_hash VARCHAR NOT NULL,
    -- Serialized certificate bytes (opaque to SQL).
    data BYTEA NOT NULL
);
-- Index enables efficient lookup of a QC by the leaf hash it certifies.
CREATE INDEX quorum_certificate_leaf_hash_idx ON quorum_certificate (leaf_hash);
21 changes: 21 additions & 0 deletions sequencer/api/migrations/sqlite/V201__archive_provider.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
-- Add information needed for consensus storage to act as a provider for archive recovery.
--
-- SQLite twin of the postgres V401 migration: same schema change, with BLOB in place
-- of BYTEA. Keep the two migrations in sync if either is ever revised.

-- Add payload hash to DA proposal, since the query service requests missing payloads by hash.
-- Nullable by design: rows written before this migration carry no hash.
ALTER TABLE da_proposal
ADD COLUMN payload_hash VARCHAR;
-- Index enables efficient lookup of a DA proposal by its payload hash.
CREATE INDEX da_proposal_payload_hash_idx ON da_proposal (payload_hash);

-- Add payload hash to VID share, since the query service requests missing VID common by payload
-- hash.
ALTER TABLE vid_share
ADD COLUMN payload_hash VARCHAR;
-- Index enables efficient lookup of a VID share by its payload hash.
CREATE INDEX vid_share_payload_hash_idx ON vid_share (payload_hash);

-- Add QC storage, since the query service requires missing leaves to be fetched alongside a QC with
-- that leaf hash.
CREATE TABLE quorum_certificate (
    -- Consensus view number; at most one stored QC per view.
    view BIGINT PRIMARY KEY,
    -- Hash of the leaf this certificate commits to; the lookup key for leaf recovery.
    leaf_hash VARCHAR NOT NULL,
    -- Serialized certificate bytes (opaque to SQL).
    data BLOB NOT NULL
);
-- Index enables efficient lookup of a QC by the leaf hash it certifies.
CREATE INDEX quorum_certificate_leaf_hash_idx ON quorum_certificate (leaf_hash);
3 changes: 3 additions & 0 deletions sequencer/api/public-env-vars.toml
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ variables = [
"ESPRESSO_SEQUENCER_CATCHUP_MAX_RETRY_DELAY",
"ESPRESSO_SEQUENCER_CDN_ENDPOINT",
"ESPRESSO_SEQUENCER_CHUNK_FETCH_DELAY",
"ESPRESSO_SEQUENCER_CONSENSUS_STORAGE_MINIMUM_RETENTION",
"ESPRESSO_SEQUENCER_CONSENSUS_STORAGE_TARGET_RETENTION",
"ESPRESSO_SEQUENCER_CONSENSUS_STORAGE_TARGET_USAGE",
"ESPRESSO_SEQUENCER_FETCH_RATE_LIMIT",
"ESPRESSO_SEQUENCER_HOTSHOT_ADDRESS",
"ESPRESSO_SEQUENCER_HOTSHOT_EVENT_STREAMING_API_PORT",
Expand Down
188 changes: 181 additions & 7 deletions sequencer/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1066,10 +1066,15 @@ mod api_tests {
};
use hotshot_types::drb::{INITIAL_DRB_RESULT, INITIAL_DRB_SEED_INPUT};
use hotshot_types::{
data::QuorumProposal2, event::LeafInfo, simple_certificate::QuorumCertificate,
traits::node_implementation::ConsensusTime,
data::{DaProposal, QuorumProposal2, VidDisperseShare},
event::LeafInfo,
message::Proposal,
simple_certificate::QuorumCertificate,
traits::{node_implementation::ConsensusTime, signature_key::SignatureKey, EncodeBytes},
vid::vid_scheme,
};

use jf_vid::VidScheme;
use portpicker::pick_unused_port;
use sequencer_utils::test_utils::setup_test;
use std::fmt::Debug;
Expand Down Expand Up @@ -1226,6 +1231,7 @@ mod api_tests {
}

setup_test();
let (pubkey, privkey) = PubKey::generated_from_seed_indexed([0; 32], 1);

let storage = D::create_storage().await;
let persistence = D::persistence_options(&storage).create().await.unwrap();
Expand All @@ -1240,11 +1246,13 @@ mod api_tests {
// Create two non-consecutive leaf chains.
let mut chain1 = vec![];

let genesis = Leaf::genesis(&Default::default(), &NodeState::mock()).await;
let payload = genesis.block_payload().unwrap();
let payload_bytes_arc = payload.encode();
let disperse = vid_scheme(2).disperse(payload_bytes_arc.clone()).unwrap();
let payload_commitment = disperse.commit;
let mut quorum_proposal = QuorumProposal2::<SeqTypes> {
block_header: Leaf::genesis(&Default::default(), &NodeState::mock())
.await
.block_header()
.clone(),
block_header: genesis.block_header().clone(),
view_number: ViewNumber::genesis(),
justify_qc: QuorumCertificate::genesis::<MockSequencerVersions>(
&ValidatedState::default(),
Expand Down Expand Up @@ -1274,6 +1282,50 @@ mod api_tests {
qc.data.leaf_commit = Committable::commit(&leaf);
justify_qc = qc.clone();
chain1.push((leaf.clone(), qc.clone()));

// Include a quorum proposal for each leaf.
let quorum_proposal_signature =
PubKey::sign(&privkey, &bincode::serialize(&quorum_proposal).unwrap())
.expect("Failed to sign quorum_proposal");
persistence
.append_quorum_proposal(&Proposal {
data: quorum_proposal.clone(),
signature: quorum_proposal_signature,
_pd: Default::default(),
})
.await
.unwrap();

// Include VID information for each leaf.
let share = VidDisperseShare::<SeqTypes> {
view_number: leaf.view_number(),
payload_commitment,
share: disperse.shares[0].clone(),
common: disperse.common.clone(),
recipient_key: pubkey,
};
persistence
.append_vid(&share.to_proposal(&privkey).unwrap())
.await
.unwrap();

// Include payload information for each leaf.
let block_payload_signature =
PubKey::sign(&privkey, &payload_bytes_arc).expect("Failed to sign block payload");
let da_proposal_inner = DaProposal::<SeqTypes> {
encoded_transactions: payload_bytes_arc.clone(),
metadata: payload.ns_table().clone(),
view_number: leaf.view_number(),
};
let da_proposal = Proposal {
data: da_proposal_inner,
signature: block_payload_signature,
_pd: Default::default(),
};
persistence
.append_da(&da_proposal, payload_commitment)
.await
.unwrap();
}
// Split into two chains.
let mut chain2 = chain1.split_off(2);
Expand Down Expand Up @@ -1312,15 +1364,137 @@ mod api_tests {
.await
.unwrap();

// Check that the leaves were moved to archive storage.
// Check that the leaves were moved to archive storage, along with payload and VID
// information.
for (leaf, qc) in chain1.iter().chain(&chain2) {
tracing::info!(height = leaf.height(), "check archive");
let qd = data_source.get_leaf(leaf.height() as usize).await.await;
let stored_leaf: Leaf2 = qd.leaf().clone().into();
let stored_qc = qd.qc().clone().to_qc2();
assert_eq!(&stored_leaf, leaf);
assert_eq!(&stored_qc, qc);

data_source
.get_block(leaf.height() as usize)
.await
.try_resolve()
.ok()
.unwrap();
data_source
.get_vid_common(leaf.height() as usize)
.await
.try_resolve()
.ok()
.unwrap();

// Check that all data has been garbage collected for the decided views.
assert!(persistence
.load_da_proposal(leaf.view_number())
.await
.unwrap()
.is_none());
assert!(persistence
.load_vid_share(leaf.view_number())
.await
.unwrap()
.is_none());
assert!(persistence
.load_quorum_proposal(leaf.view_number())
.await
.is_err());
}

// Check that data has _not_ been garbage collected for the missing view.
assert!(persistence
.load_da_proposal(ViewNumber::new(2))
.await
.unwrap()
.is_some());
assert!(persistence
.load_vid_share(ViewNumber::new(2))
.await
.unwrap()
.is_some());
persistence
.load_quorum_proposal(ViewNumber::new(2))
.await
.unwrap();
}

#[tokio::test(flavor = "multi_thread")]
/// Regression test: deciding a leaf whose payload and VID data are missing from
/// consensus storage must not stall the event pipeline — the leaf itself is still
/// delivered to the query-service data source, while the payload and VID common
/// remain pending (to be fetched later from a provider).
pub async fn test_decide_missing_data<D>()
where
    D: TestableSequencerDataSource + Debug + 'static,
{
    setup_test();

    // Consensus persistence and the query-service data source share the same
    // backing storage; the consumer forwards decide events into the data source.
    let storage = D::create_storage().await;
    let persistence = D::persistence_options(&storage).create().await.unwrap();
    let data_source: Arc<StorageState<network::Memory, NoStorage, _, MockSequencerVersions>> =
        Arc::new(StorageState::new(
            D::create(D::persistence_options(&storage), Default::default(), false)
                .await
                .unwrap(),
            ApiState::new(future::pending()),
        ));
    let consumer = ApiEventConsumer::from(data_source.clone());

    // Start from the genesis QC/leaf pair.
    let mut qc = QuorumCertificate::genesis::<MockSequencerVersions>(
        &ValidatedState::default(),
        &NodeState::mock(),
    )
    .await
    .to_qc2();
    let leaf = Leaf::genesis(&ValidatedState::default(), &NodeState::mock()).await;

    // Append the genesis leaf. We don't use this for the test, because the update function will
    // automatically fill in the missing data for genesis. We just append this to get into a
    // consistent state to then append the leaf from view 1, which will have missing data.
    tracing::info!(?leaf, ?qc, "decide genesis leaf");
    persistence
        .append_decided_leaves(
            leaf.view_number(),
            [(&leaf_info(leaf.clone().into()), qc.clone())],
            &consumer,
        )
        .await
        .unwrap();

    // Create another leaf, with missing data: a view-1 proposal extending genesis,
    // for which no DA proposal or VID share is ever written to storage.
    let mut block_header = leaf.block_header().clone();
    *block_header.height_mut() += 1;
    let qp = QuorumProposal2 {
        block_header,
        view_number: leaf.view_number() + 1,
        justify_qc: qc.clone(),
        upgrade_certificate: None,
        view_change_evidence: None,
        drb_seed: INITIAL_DRB_SEED_INPUT,
        drb_result: INITIAL_DRB_RESULT,
    };

    // Re-point the QC at the new leaf so the (leaf, qc) pair is internally consistent.
    let leaf = Leaf2::from_quorum_proposal(&qp);
    qc.view_number = leaf.view_number();
    qc.data.leaf_commit = Committable::commit(&leaf);

    // Decide a leaf without the corresponding payload or VID.
    tracing::info!(?leaf, ?qc, "append leaf 1");
    persistence
        .append_decided_leaves(
            leaf.view_number(),
            [(&leaf_info(leaf.clone()), qc)],
            &consumer,
        )
        .await
        .unwrap();

    // Check that we still processed the leaf: it is retrievable at height 1, while
    // the payload and VID common for that height are still pending (not resolved),
    // since they were never persisted.
    assert_eq!(
        leaf,
        data_source.get_leaf(1).await.await.leaf().clone().into()
    );
    assert!(data_source.get_vid_common(1).await.is_pending());
    assert!(data_source.get_block(1).await.is_pending());
}

fn leaf_info(leaf: Leaf2) -> LeafInfo<SeqTypes> {
Expand Down
23 changes: 15 additions & 8 deletions sequencer/src/api/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
use anyhow::{bail, Context};
use clap::Parser;
use espresso_types::{
v0::traits::{EventConsumer, NullEventConsumer, SequencerPersistence},
v0::traits::{EventConsumer, NullEventConsumer, PersistenceOptions, SequencerPersistence},
BlockMerkleTree, PubKey,
};
use futures::{
Expand All @@ -13,6 +13,7 @@ use futures::{
use hotshot_events_service::events::Error as EventStreamingError;
use hotshot_query_service::{
data_source::{ExtensibleDataSource, MetricsDataSource},
fetching::provider::QueryServiceProvider,
status::{self, UpdateStatusData},
ApiState as AppState, Error,
};
Expand All @@ -27,7 +28,7 @@ use vbs::version::StaticVersionType;

use super::{
data_source::{
provider, CatchupDataSource, HotShotConfigDataSource, NodeStateDataSource,
provider, CatchupDataSource, HotShotConfigDataSource, NodeStateDataSource, Provider,
SequencerDataSource, StateSignatureDataSource, SubmitDataSource,
},
endpoints, fs, sql,
Expand Down Expand Up @@ -333,12 +334,18 @@ impl Options {
N: ConnectedNetwork<PubKey>,
P: SequencerPersistence,
{
let ds = sql::DataSource::create(
mod_opt.clone(),
provider::<V>(query_opt.peers.clone(), bind_version),
false,
)
.await?;
let mut provider = Provider::default();

// Use the database itself as a fetching provider: sometimes we can fetch data that is
// missing from the query service from ephemeral consensus storage.
provider = provider.with_provider(mod_opt.clone().create().await?);
// If that fails, fetch missing data from peers.
for peer in query_opt.peers {
tracing::info!("will fetch missing data from {peer}");
provider = provider.with_provider(QueryServiceProvider::new(peer, bind_version));
}

let ds = sql::DataSource::create(mod_opt.clone(), provider, false).await?;
let (metrics, ds, mut app) = self
.init_app_modules(ds, state.clone(), bind_version)
.await?;
Expand Down
Loading
Loading