Skip to content

Commit

Permalink
[consensus] sync improvements to help slow nodes sync better (#15364)
Browse files Browse the repository at this point in the history
* [qs] grace period before GC committed batches

* [consensus] trigger sync based on remote LI timestamp

(cherry picked from commit d672085)
  • Loading branch information
ibalajiarun committed Nov 22, 2024
1 parent 527870e commit 45ed5f9
Show file tree
Hide file tree
Showing 5 changed files with 21 additions and 5 deletions.
7 changes: 7 additions & 0 deletions consensus/src/block_storage/sync_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,13 @@ impl BlockStore {
&& !self.block_exists(li.commit_info().id()))
|| self.commit_root().round() + 30.max(2 * self.vote_back_pressure_limit)
< li.commit_info().round()
// If the LI commit block timestamp is at least 30 secs ahead of our own commit root's
// timestamp, sync to the ledger info
|| li
.commit_info()
.timestamp_usecs()
.saturating_sub(self.commit_root().timestamp_usecs())
>= Duration::from_secs(30).as_micros() as u64
}

/// Checks if quorum certificate can be inserted in block store without RPC
Expand Down
11 changes: 9 additions & 2 deletions consensus/src/quorum_store/batch_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ pub struct BatchStore {
batch_quota: usize,
validator_signer: ValidatorSigner,
persist_subscribers: DashMap<HashValue, Vec<oneshot::Sender<PersistedValue>>>,
expiration_buffer_usecs: u64,
}

impl BatchStore {
Expand All @@ -128,6 +129,7 @@ impl BatchStore {
db_quota: usize,
batch_quota: usize,
validator_signer: ValidatorSigner,
expiration_buffer_usecs: u64,
) -> Self {
let db_clone = db.clone();
let batch_store = Self {
Expand All @@ -142,6 +144,7 @@ impl BatchStore {
batch_quota,
validator_signer,
persist_subscribers: DashMap::new(),
expiration_buffer_usecs,
};
let db_content = db_clone
.get_all_batches()
Expand Down Expand Up @@ -283,15 +286,19 @@ impl BatchStore {
// pub(crate) for testing
#[allow(clippy::unwrap_used)]
pub(crate) fn clear_expired_payload(&self, certified_time: u64) -> Vec<HashValue> {
let expired_digests = self.expirations.lock().unwrap().expire(certified_time);
// To help slow nodes catch up via execution without going to state sync, we keep batches for an extra
// grace period (`expiration_buffer_usecs`, configured as 60 seconds by the quorum store builder) after
// their expiration time. This helps remote peers fetch batches that just expired but are still within
// their execution window.
let expiration_time = certified_time.saturating_sub(self.expiration_buffer_usecs);
let expired_digests = self.expirations.lock().unwrap().expire(expiration_time);
let mut ret = Vec::new();
for h in expired_digests {
let removed_value = match self.db_cache.entry(h) {
Occupied(entry) => {
// We need to check up-to-date expiration again because receiving the same
// digest with a higher expiration would update the persisted value and
// effectively extend the expiration.
if entry.get().expiration() <= certified_time {
if entry.get().expiration() <= expiration_time {
self.persist_subscribers.remove(entry.get().digest());
Some(entry.remove())
} else {
Expand Down
1 change: 1 addition & 0 deletions consensus/src/quorum_store/quorum_store_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,7 @@ impl InnerBuilder {
self.config.db_quota,
self.config.batch_quota,
signer,
Duration::from_secs(60).as_micros() as u64,
));
self.batch_store = Some(batch_store.clone());
let batch_reader = Arc::new(BatchReaderImpl::new(batch_store.clone(), batch_requester));
Expand Down
1 change: 1 addition & 0 deletions consensus/src/quorum_store/tests/batch_store_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ pub fn batch_store_for_test(memory_quota: usize) -> Arc<BatchStore> {
2001, // db quota
2001, // batch quota
signers[0].clone(),
0,
))
}

Expand Down
6 changes: 3 additions & 3 deletions testsuite/forge-cli/src/suites/realistic_environment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -303,9 +303,9 @@ pub(crate) fn realistic_env_max_load_test(
.add_system_metrics_threshold(SystemMetricsThreshold::new(
// Check that we don't use more than 25 CPU cores for more than 15% of the time.
MetricsThreshold::new(25.0, 15),
// Memory starts around 7GB, and grows around 1.4GB/hr in this test.
// Memory starts around 8GB, and grows around 1.4GB/hr in this test.
// Check that we don't use more than final expected memory for more than 20% of the time.
MetricsThreshold::new_gb(7.0 + 1.4 * (duration_secs as f64 / 3600.0), 20),
MetricsThreshold::new_gb(8.0 + 1.4 * (duration_secs as f64 / 3600.0), 20),
))
.add_no_restarts()
.add_wait_for_catchup_s(
Expand All @@ -316,7 +316,7 @@ pub(crate) fn realistic_env_max_load_test(
.add_latency_threshold(4.5, LatencyType::P70)
.add_chain_progress(StateProgressThreshold {
max_non_epoch_no_progress_secs: 15.0,
max_epoch_no_progress_secs: 15.0,
max_epoch_no_progress_secs: 16.0,
max_non_epoch_round_gap: 4,
max_epoch_round_gap: 4,
});
Expand Down

0 comments on commit 45ed5f9

Please sign in to comment.