Skip to content

Commit

Permalink
3966 Remove clone from Membership and wrap it in Arc<RwLock>>
Browse files Browse the repository at this point in the history
  • Loading branch information
pls148 committed Dec 19, 2024
1 parent a927746 commit c36daea
Show file tree
Hide file tree
Showing 56 changed files with 787 additions and 446 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ name = "whitelist-push-cdn"
path = "push-cdn/whitelist-adapter.rs"

[dependencies]
async-lock = { workspace = true }
async-trait = { workspace = true }

cdn-broker = { workspace = true, features = ["global-permits"] }
Expand Down
66 changes: 38 additions & 28 deletions crates/examples/infra/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use std::{
time::Instant,
};

use async_lock::RwLock;
use async_trait::async_trait;
use cdn_broker::reexports::crypto::signature::KeyPair;
use chrono::Utc;
Expand Down Expand Up @@ -350,13 +351,17 @@ pub trait RunDa<
config: NetworkConfig<TYPES::SignatureKey>,
validator_config: ValidatorConfig<TYPES::SignatureKey>,
libp2p_advertise_address: Option<String>,
membership: &Arc<RwLock<<TYPES as NodeType>::Membership>>,
) -> Self;

/// Initializes the genesis state and HotShot instance; does not start HotShot consensus
/// # Panics if it cannot generate a genesis block, fails to initialize HotShot, or cannot
/// get the anchored view
/// Note: sequencing leaf does not have state, so does not return state
async fn initialize_state_and_hotshot(&self) -> SystemContextHandle<TYPES, NODE, V> {
async fn initialize_state_and_hotshot(
&self,
membership: Arc<RwLock<<TYPES as NodeType>::Membership>>,
) -> SystemContextHandle<TYPES, NODE, V> {
let initializer =
hotshot::HotShotInitializer::<TYPES>::from_genesis::<V>(TestInstanceState::default())
.await
Expand All @@ -371,20 +376,6 @@ pub trait RunDa<

let network = self.network();

let all_nodes = if cfg!(feature = "fixed-leader-election") {
let mut vec = config.config.known_nodes_with_stake.clone();
vec.truncate(config.config.fixed_leader_for_gpuvid);
vec
} else {
config.config.known_nodes_with_stake.clone()
};

let da_nodes = config.config.known_da_nodes.clone();

// Create the quorum membership from all nodes, specifying the committee
// as the known da nodes
let memberships = <TYPES as NodeType>::Membership::new(all_nodes, da_nodes);

let marketplace_config = MarketplaceConfig {
auction_results_provider: TestAuctionResultsProvider::<TYPES>::default().into(),
// TODO: we need to pass a valid fallback builder url here somehow
Expand All @@ -396,7 +387,7 @@ pub trait RunDa<
sk,
config.node_index,
config.config,
memberships,
membership,
Arc::from(network),
initializer,
ConsensusMetricsValue::default(),
Expand Down Expand Up @@ -526,13 +517,15 @@ pub trait RunDa<
}
}
}
let consensus_lock = context.hotshot.consensus();
let consensus = consensus_lock.read().await;
let num_eligible_leaders = context
.hotshot
.memberships
.read()
.await
.committee_leaders(TYPES::View::genesis(), TYPES::Epoch::genesis())
.len();
let consensus_lock = context.hotshot.consensus();
let consensus = consensus_lock.read().await;
let total_num_views = usize::try_from(consensus.locked_view().u64()).unwrap();
// `failed_num_views` could include uncommitted views
let failed_num_views = total_num_views - num_successful_commits;
Expand Down Expand Up @@ -622,6 +615,7 @@ where
config: NetworkConfig<TYPES::SignatureKey>,
validator_config: ValidatorConfig<TYPES::SignatureKey>,
_libp2p_advertise_address: Option<String>,
_membership: &Arc<RwLock<<TYPES as NodeType>::Membership>>,
) -> PushCdnDaRun<TYPES> {
// Convert to the Push-CDN-compatible type
let keypair = KeyPair {
Expand Down Expand Up @@ -708,6 +702,7 @@ where
config: NetworkConfig<TYPES::SignatureKey>,
validator_config: ValidatorConfig<TYPES::SignatureKey>,
libp2p_advertise_address: Option<String>,
membership: &Arc<RwLock<<TYPES as NodeType>::Membership>>,
) -> Libp2pDaRun<TYPES> {
// Extrapolate keys for ease of use
let public_key = &validator_config.public_key;
Expand Down Expand Up @@ -736,19 +731,14 @@ where
.to_string()
};

// Create the quorum membership from the list of known nodes
let all_nodes = config.config.known_nodes_with_stake.clone();
let da_nodes = config.config.known_da_nodes.clone();
let quorum_membership = TYPES::Membership::new(all_nodes, da_nodes);

// Derive the bind address
let bind_address =
derive_libp2p_multiaddr(&bind_address).expect("failed to derive bind address");

// Create the Libp2p network
let libp2p_network = Libp2pNetwork::from_config(
config.clone(),
quorum_membership,
Arc::clone(membership),
GossipConfig::default(),
RequestResponseConfig::default(),
bind_address,
Expand Down Expand Up @@ -820,6 +810,7 @@ where
config: NetworkConfig<TYPES::SignatureKey>,
validator_config: ValidatorConfig<TYPES::SignatureKey>,
libp2p_advertise_address: Option<String>,
membership: &Arc<RwLock<<TYPES as NodeType>::Membership>>,
) -> CombinedDaRun<TYPES> {
// Initialize our Libp2p network
let libp2p_network: Libp2pDaRun<TYPES> = <Libp2pDaRun<TYPES> as RunDa<
Expand All @@ -831,6 +822,7 @@ where
config.clone(),
validator_config.clone(),
libp2p_advertise_address.clone(),
membership,
)
.await;

Expand All @@ -844,6 +836,7 @@ where
config.clone(),
validator_config.clone(),
libp2p_advertise_address,
membership,
)
.await;

Expand Down Expand Up @@ -878,6 +871,7 @@ where
}
}

#[allow(clippy::too_many_lines)]
/// Main entry point for validators
/// # Panics
/// if unable to get the local ip address
Expand Down Expand Up @@ -974,11 +968,27 @@ pub async fn main_entry_point<
.join(",")
);

let all_nodes = if cfg!(feature = "fixed-leader-election") {
let mut vec = run_config.config.known_nodes_with_stake.clone();
vec.truncate(run_config.config.fixed_leader_for_gpuvid);
vec
} else {
run_config.config.known_nodes_with_stake.clone()
};
let membership = Arc::new(RwLock::new(<TYPES as NodeType>::Membership::new(
all_nodes,
run_config.config.known_da_nodes.clone(),
)));

info!("Initializing networking");
let run =
RUNDA::initialize_networking(run_config.clone(), validator_config, args.advertise_address)
.await;
let hotshot = run.initialize_state_and_hotshot().await;
let run = RUNDA::initialize_networking(
run_config.clone(),
validator_config,
args.advertise_address,
&membership,
)
.await;
let hotshot = run.initialize_state_and_hotshot(membership).await;

if let Some(task) = builder_task {
task.start(Box::new(hotshot.event_stream()));
Expand Down
25 changes: 17 additions & 8 deletions crates/hotshot/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ pub struct SystemContext<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versi
pub network: Arc<I::Network>,

/// Memberships used by consensus
pub memberships: Arc<TYPES::Membership>,
pub memberships: Arc<RwLock<TYPES::Membership>>,

/// the metrics that the implementor is using.
metrics: Arc<ConsensusMetricsValue>,
Expand Down Expand Up @@ -199,7 +199,7 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> SystemContext<T
private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
nonce: u64,
config: HotShotConfig<TYPES::SignatureKey>,
memberships: TYPES::Membership,
memberships: Arc<RwLock<TYPES::Membership>>,
network: Arc<I::Network>,
initializer: HotShotInitializer<TYPES>,
metrics: ConsensusMetricsValue,
Expand Down Expand Up @@ -252,7 +252,7 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> SystemContext<T
private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
nonce: u64,
config: HotShotConfig<TYPES::SignatureKey>,
memberships: TYPES::Membership,
memberships: Arc<RwLock<TYPES::Membership>>,
network: Arc<I::Network>,
initializer: HotShotInitializer<TYPES>,
metrics: ConsensusMetricsValue,
Expand Down Expand Up @@ -365,7 +365,7 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> SystemContext<T
start_view: initializer.start_view,
start_epoch: initializer.start_epoch,
network,
memberships: Arc::new(memberships),
memberships,
metrics: Arc::clone(&consensus_metrics),
internal_event_stream: (internal_tx, internal_rx.deactivate()),
output_event_stream: (external_tx.clone(), external_rx.clone().deactivate()),
Expand Down Expand Up @@ -512,6 +512,15 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> SystemContext<T
})?;

spawn(async move {
let memberships_da_committee_members = api
.memberships
.read()
.await
.da_committee_members(view_number, epoch)
.iter()
.cloned()
.collect();

join! {
// TODO We should have a function that can return a network error if there is one
// but first we'd need to ensure our network implementations can support that
Expand All @@ -523,7 +532,7 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> SystemContext<T
api
.network.da_broadcast_message(
serialized_message,
api.memberships.da_committee_members(view_number, epoch).iter().cloned().collect(),
memberships_da_committee_members,
BroadcastDelay::None,
),
api
Expand Down Expand Up @@ -608,7 +617,7 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> SystemContext<T
private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
node_id: u64,
config: HotShotConfig<TYPES::SignatureKey>,
memberships: TYPES::Membership,
memberships: Arc<RwLock<TYPES::Membership>>,
network: Arc<I::Network>,
initializer: HotShotInitializer<TYPES>,
metrics: ConsensusMetricsValue,
Expand Down Expand Up @@ -771,7 +780,7 @@ where
private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
nonce: u64,
config: HotShotConfig<TYPES::SignatureKey>,
memberships: TYPES::Membership,
memberships: Arc<RwLock<TYPES::Membership>>,
network: Arc<I::Network>,
initializer: HotShotInitializer<TYPES>,
metrics: ConsensusMetricsValue,
Expand All @@ -787,7 +796,7 @@ where
private_key.clone(),
nonce,
config.clone(),
memberships.clone(),
Arc::clone(&memberships),
Arc::clone(&network),
initializer.clone(),
metrics.clone(),
Expand Down
13 changes: 7 additions & 6 deletions crates/hotshot/src/tasks/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ pub fn add_response_task<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versi
) {
let state = NetworkResponseState::<TYPES>::new(
handle.hotshot.consensus(),
(*handle.hotshot.memberships).clone().into(),
Arc::clone(&handle.hotshot.memberships),
handle.public_key().clone(),
handle.private_key().clone(),
handle.hotshot.id,
Expand Down Expand Up @@ -190,7 +190,7 @@ pub fn add_network_event_task<
>(
handle: &mut SystemContextHandle<TYPES, I, V>,
network: Arc<NET>,
membership: TYPES::Membership,
membership: Arc<RwLock<TYPES::Membership>>,
) {
let network_state: NetworkEventTaskState<_, V, _, _> = NetworkEventTaskState {
network,
Expand Down Expand Up @@ -321,7 +321,7 @@ where
private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
nonce: u64,
config: HotShotConfig<TYPES::SignatureKey>,
memberships: TYPES::Membership,
memberships: Arc<RwLock<TYPES::Membership>>,
network: Arc<I::Network>,
initializer: HotShotInitializer<TYPES>,
metrics: ConsensusMetricsValue,
Expand Down Expand Up @@ -516,16 +516,17 @@ where
/// Adds the `NetworkEventTaskState` tasks possibly modifying them as well.
fn add_network_event_tasks(&self, handle: &mut SystemContextHandle<TYPES, I, V>) {
let network = Arc::clone(&handle.network);
let memberships = Arc::clone(&handle.memberships);

self.add_network_event_task(handle, Arc::clone(&network), (*handle.memberships).clone());
self.add_network_event_task(handle, network, memberships);
}

/// Adds a `NetworkEventTaskState` task. Can be reimplemented to modify its behaviour.
fn add_network_event_task(
&self,
handle: &mut SystemContextHandle<TYPES, I, V>,
channel: Arc<<I as NodeImplementation<TYPES>>::Network>,
membership: TYPES::Membership,
membership: Arc<RwLock<TYPES::Membership>>,
) {
add_network_event_task(handle, channel, membership);
}
Expand Down Expand Up @@ -563,6 +564,6 @@ pub fn add_network_event_tasks<TYPES: NodeType, I: NodeImplementation<TYPES>, V:
add_network_event_task(
handle,
Arc::clone(&handle.network),
(*handle.memberships).clone(),
Arc::clone(&handle.memberships),
);
}
Loading

0 comments on commit c36daea

Please sign in to comment.