Skip to content

Commit

Permalink
Use all available threads for aggregation verification
Browse files Browse the repository at this point in the history
  • Loading branch information
hrxi committed Nov 25, 2024
1 parent 8336a5b commit a5c54b8
Showing 1 changed file with 31 additions and 25 deletions.
56 changes: 31 additions & 25 deletions handel/src/aggregation.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
use std::{
num::NonZeroUsize,
pin::Pin,
task::{Context, Poll},
thread,
};

use futures::{
future::{BoxFuture, FutureExt},
stream::{BoxStream, Stream, StreamExt},
};
use nimiq_time::{interval, Interval};
use nimiq_utils::stream::FuturesUnordered;

use crate::{
config::Config,
Expand Down Expand Up @@ -68,12 +71,15 @@ where

/// Future of the currently verified pending contribution.
/// There is only ever one contribution being verified at a time.
current_verification:
Option<BoxFuture<'static, (VerificationResult, PendingContribution<P::Contribution>)>>,
current_verifications: FuturesUnordered<
BoxFuture<'static, (VerificationResult, PendingContribution<P::Contribution>)>,
>,

/// The final result of the aggregation once it has been produced.
/// A `Some(_)` value here indicates that the aggregation has finished.
final_result: Option<P::Contribution>,

available_parallelism: usize,
}

impl<TId, P, N> Aggregation<TId, P, N>
Expand Down Expand Up @@ -131,8 +137,9 @@ where
network: sender,
start_level_interval,
periodic_update_interval,
current_verification: None,
current_verifications: FuturesUnordered::new(),
final_result: None,
available_parallelism: thread::available_parallelism().map_or(1, NonZeroUsize::get),
}
}

Expand Down Expand Up @@ -406,7 +413,7 @@ where
return Some(result);
}

self.current_verification = Some(fut);
self.current_verifications.push(fut);
None
}

Expand Down Expand Up @@ -466,27 +473,25 @@ where
}

// Poll the verification future if there is one.
if let Some(future) = &mut self.current_verification {
if let Poll::Ready((result, contribution)) = future.poll_unpin(cx) {
// If a result is produced, unset the future such that a new one can take its place.
self.current_verification = None;

if result.is_ok() {
// If the contribution was successfully verified, apply it and return the new
// best aggregate.
best_aggregate = Some(self.apply_contribution(contribution))
} else {
// Verification failed, ban sender.
warn!(
id = %self.protocol.identify(),
?result,
?contribution,
"Rejecting invalid contribution"
);
self.network.ban_node(contribution.origin);
}
while let Poll::Ready(Some((result, contribution))) =
self.current_verifications.poll_next_unpin(cx)
{
if result.is_ok() {
// If the contribution was successfully verified, apply it and return the new
// best aggregate.
best_aggregate = Some(self.apply_contribution(contribution));
break;
} else {
// Verification failed, ban sender.
warn!(
id = %self.protocol.identify(),
?result,
?contribution,
"Rejecting invalid contribution"
);
self.network.ban_node(contribution.origin);
}
};
}

// Check if the automatic update interval triggers, if so perform the update.
while let Poll::Ready(_instant) = self.periodic_update_interval.poll_next_unpin(cx) {
Expand All @@ -505,7 +510,8 @@ where
// does not have produced a value yet. This is necessary as the verification future could
// resolve immediately producing a second item for the stream. As the new best aggregate
// will be returned, this stream will be polled again creating the future in the next poll.
if self.current_verification.is_none() && best_aggregate.is_none() {
if self.current_verifications.len() < self.available_parallelism && best_aggregate.is_none()
{
// Get the next best pending contribution.
while let Poll::Ready(Some(contribution)) =
self.pending_contributions.poll_next_unpin(cx)
Expand Down

0 comments on commit a5c54b8

Please sign in to comment.