From a5c54b84d89adf366b11fab6fa5045f63d5fba21 Mon Sep 17 00:00:00 2001 From: hrxi Date: Mon, 25 Nov 2024 18:47:27 +0100 Subject: [PATCH] Use all available threads for aggregation verification --- handel/src/aggregation.rs | 56 ++++++++++++++++++++++----------------- 1 file changed, 31 insertions(+), 25 deletions(-) diff --git a/handel/src/aggregation.rs b/handel/src/aggregation.rs index ec2b30abd7..087ebed5cc 100644 --- a/handel/src/aggregation.rs +++ b/handel/src/aggregation.rs @@ -1,6 +1,8 @@ use std::{ + num::NonZeroUsize, pin::Pin, task::{Context, Poll}, + thread, }; use futures::{ @@ -8,6 +10,7 @@ use futures::{ stream::{BoxStream, Stream, StreamExt}, }; use nimiq_time::{interval, Interval}; +use nimiq_utils::stream::FuturesUnordered; use crate::{ config::Config, @@ -68,12 +71,15 @@ where /// Future of the currently verified pending contribution. /// There is only ever one contribution being verified at a time. - current_verification: - Option)>>, + current_verifications: FuturesUnordered< + BoxFuture<'static, (VerificationResult, PendingContribution)>, + >, /// The final result of the aggregation once it has been produced. /// A `Some(_)` value here indicates that the aggregation has finished. final_result: Option, + + available_parallelism: usize, } impl Aggregation @@ -131,8 +137,9 @@ where network: sender, start_level_interval, periodic_update_interval, - current_verification: None, + current_verifications: FuturesUnordered::new(), final_result: None, + available_parallelism: thread::available_parallelism().map_or(1, NonZeroUsize::get), } } @@ -406,7 +413,7 @@ where return Some(result); } - self.current_verification = Some(fut); + self.current_verifications.push(fut); None } @@ -466,27 +473,25 @@ where } // Poll the verification future if there is one. - if let Some(future) = &mut self.current_verification { - if let Poll::Ready((result, contribution)) = future.poll_unpin(cx) { - // If a result is produced, unset the future such that a new one can take its place. - self.current_verification = None; - - if result.is_ok() { - // If the contribution was successfully verified, apply it and return the new - // best aggregate. - best_aggregate = Some(self.apply_contribution(contribution)) - } else { - // Verification failed, ban sender. - warn!( - id = %self.protocol.identify(), - ?result, - ?contribution, - "Rejecting invalid contribution" - ); - self.network.ban_node(contribution.origin); - } + while let Poll::Ready(Some((result, contribution))) = + self.current_verifications.poll_next_unpin(cx) + { + if result.is_ok() { + // If the contribution was successfully verified, apply it and return the new + // best aggregate. + best_aggregate = Some(self.apply_contribution(contribution)); + break; + } else { + // Verification failed, ban sender. + warn!( + id = %self.protocol.identify(), + ?result, + ?contribution, + "Rejecting invalid contribution" + ); + self.network.ban_node(contribution.origin); } - }; + } // Check if the automatic update interval triggers, if so perform the update. while let Poll::Ready(_instant) = self.periodic_update_interval.poll_next_unpin(cx) { @@ -505,7 +510,8 @@ where // does not have produced a value yet. This is necessary as the verification future could // resolve immediately producing a second item for the stream. As the new best aggregate // will be returned, this stream will be polled again creating the future in the next poll. - if self.current_verification.is_none() && best_aggregate.is_none() { + if self.current_verifications.len() < self.available_parallelism && best_aggregate.is_none() + { // Get the next best pending contribution. while let Poll::Ready(Some(contribution)) = self.pending_contributions.poll_next_unpin(cx)