Skip to content

Commit

Permalink
wrap committee state in Arc<RwLock<>>
Browse files Browse the repository at this point in the history
  • Loading branch information
imabdulbasit committed Dec 12, 2024
1 parent 7f2eec7 commit 6320f3f
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 43 deletions.
2 changes: 1 addition & 1 deletion sequencer/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ impl<N: ConnectedNetwork<PubKey>, P: SequencerPersistence, V: Versions> Sequence
pub async fn init(
network_config: NetworkConfig<PubKey>,
validator_config: ValidatorConfig<<SeqTypes as NodeType>::SignatureKey>,
membership: Arc<RwLock<StaticCommittee>>,
membership: StaticCommittee,
instance_state: NodeState,
persistence: P,
network: Arc<N>,
Expand Down
4 changes: 2 additions & 2 deletions sequencer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -480,12 +480,12 @@ pub async fn init_node<P: SequencerPersistence, V: Versions>(
};

// Create the HotShot membership
let membership = Arc::new(RwLock::new(StaticCommittee::new_stake(
let membership = StaticCommittee::new_stake(
network_config.config.known_nodes_with_stake.clone(),
network_config.config.known_nodes_with_stake.clone(),
&instance_state,
network_config.config.epoch_height,
)));
);

// Initialize the Libp2p network
let network = {
Expand Down
87 changes: 48 additions & 39 deletions types/src/v0/impls/stake_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ impl StakeTables {
}

#[derive(Clone, Debug)]
pub struct StaticCommittee {
pub struct CommitteeState {
/// The nodes eligible for leadership.
/// NOTE: This is currently a hack because the DA leader needs to be the quorum
/// leader but without voting rights.
Expand All @@ -108,6 +108,10 @@ pub struct StaticCommittee {

/// TODO:
indexed_da_stake_table: BTreeMap<Epoch, HashMap<PubKey, StakeTableEntry<PubKey>>>,
}
#[derive(Clone, Debug)]
pub struct StaticCommittee {
state: Arc<RwLock<CommitteeState>>,

/// Number of blocks in an epoch
_epoch_size: u64,
Expand Down Expand Up @@ -158,11 +162,11 @@ impl StaticCommittee {
.get_stake_table(l1_block_height, self._contract_address.unwrap())
.await?;

// This works because `get_stake_table` is fetching *all*
// update events and building the table for us. We will need
// more subtlety when start fetching only the events since last update.
self.stake_table = HashSet::from_iter(updates.consensus_stake_table.0);
self.da_stake_table = HashSet::from_iter(updates.da_stake_table.0);
// // This works because `get_stake_table` is fetching *all*
// // update events and building the table for us. We will need
// // more subtlety when start fetching only the events since last update.
// self.stake_table = HashSet::from_iter(updates.consensus_stake_table.0);
// self.da_stake_table = HashSet::from_iter(updates.da_stake_table.0);

Ok(())
}
Expand Down Expand Up @@ -216,20 +220,24 @@ impl StaticCommittee {
let indexed_da_stake_table: BTreeMap<Epoch, _> =
BTreeMap::from([(Epoch::genesis(), indexed_da_stake_table)]);

Self {
let state = CommitteeState {
eligible_leaders,
stake_table: HashSet::from_iter(members),
da_stake_table: HashSet::from_iter(da_members),
indexed_stake_table,
indexed_da_stake_table,
};

Self {
state: Arc::new(RwLock::new(state)),
_epoch_size: epoch_size,
l1_client: instance_state.l1_client.clone(),
_contract_address: instance_state.chain_config.stake_table_contract,
}
}

async fn update_loop(committee: Arc<RwLock<StaticCommittee>>) -> impl Future<Output = ()> {
let l1_client = { committee.read().await.l1_client.clone() };
async fn update_loop(committee: StaticCommittee) -> impl Future<Output = ()> {
let l1_client = committee.l1_client.clone();

let retry_delay = l1_client.retry_delay;

Expand All @@ -248,16 +256,16 @@ impl StaticCommittee {

loop {
{
let mut committee_lock = committee.write().await;
if let Err(err) =
committee_lock.update_stake_table(l1_block_number).await
{
tracing::warn!(
?epoch,
?l1_block_number,
"error updating stake table. err {err}"
);
}
// let mut committee_lock = committee.write().await;
// if let Err(err) =
// committee_lock.update_stake_table(l1_block_number).await
// {
// tracing::warn!(
// ?epoch,
// ?l1_block_number,
// "error updating stake table. err {err}"
// );
// }
}

sleep(retry_delay).await;
Expand All @@ -273,7 +281,7 @@ impl StaticCommittee {
#[error("Could not lookup leader")] // TODO error variants? message?
pub struct LeaderLookupError;

impl Membership<SeqTypes> for Arc<RwLock<StaticCommittee>> {
impl Membership<SeqTypes> for StaticCommittee {
type Error = LeaderLookupError;

// DO NOT USE. Dummy constructor to comply w/ trait.
Expand Down Expand Up @@ -304,20 +312,21 @@ impl Membership<SeqTypes> for Arc<RwLock<StaticCommittee>> {
.filter(|entry| entry.stake() > U256::zero())
.collect();

let committee = StaticCommittee {
let state = CommitteeState {
eligible_leaders,
stake_table: HashSet::from_iter(members),
da_stake_table: HashSet::from_iter(da_members),
// TODO: ??
indexed_stake_table: BTreeMap::new(),
indexed_da_stake_table: BTreeMap::new(),
};
let committee = StaticCommittee {
state: Arc::new(RwLock::new(state)),
_epoch_size: 12, // TODO get the real number from config (I think)
l1_client: L1Client::http(Url::from_str("http:://ab.b").unwrap()),
_contract_address: None,
};

let committee = Arc::new(RwLock::new(committee));

let _handle = task::spawn(StaticCommittee::update_loop(committee.clone()));
committee
}
Expand All @@ -327,15 +336,15 @@ impl Membership<SeqTypes> for Arc<RwLock<StaticCommittee>> {
fn stake_table(&self, _epoch: Epoch) -> Vec<StakeTableEntry<PubKey>> {
let sc = tokio::runtime::Runtime::new()
.unwrap()
.block_on(async { self.read().await });
.block_on(async { self.state.read().await });

sc.stake_table.clone().into_iter().collect()
}
/// Get the stake table for the current view
fn da_stake_table(&self, _epoch: Epoch) -> Vec<StakeTableEntry<PubKey>> {
let sc = tokio::runtime::Runtime::new()
.unwrap()
.block_on(async { self.read().await });
.block_on(async { self.state.read().await });
sc.da_stake_table.clone().into_iter().collect()
}

Expand All @@ -347,7 +356,7 @@ impl Membership<SeqTypes> for Arc<RwLock<StaticCommittee>> {
) -> BTreeSet<PubKey> {
let sc = tokio::runtime::Runtime::new()
.unwrap()
.block_on(async { self.read().await });
.block_on(async { self.state.read().await });
sc.stake_table.iter().map(PubKey::public_key).collect()
}

Expand All @@ -359,7 +368,7 @@ impl Membership<SeqTypes> for Arc<RwLock<StaticCommittee>> {
) -> BTreeSet<PubKey> {
let sc = tokio::runtime::Runtime::new()
.unwrap()
.block_on(async { self.read().await });
.block_on(async { self.state.read().await });

sc.da_stake_table.iter().map(PubKey::public_key).collect()
}
Expand All @@ -372,7 +381,7 @@ impl Membership<SeqTypes> for Arc<RwLock<StaticCommittee>> {
) -> BTreeSet<PubKey> {
let sc = tokio::runtime::Runtime::new()
.unwrap()
.block_on(async { self.read().await });
.block_on(async { self.state.read().await });

sc.eligible_leaders.iter().map(PubKey::public_key).collect()
}
Expand All @@ -381,7 +390,7 @@ impl Membership<SeqTypes> for Arc<RwLock<StaticCommittee>> {
fn stake(&self, pub_key: &PubKey, epoch: Epoch) -> Option<StakeTableEntry<PubKey>> {
let sc = tokio::runtime::Runtime::new()
.unwrap()
.block_on(async { self.read().await });
.block_on(async { self.state.read().await });

// Only return the stake if it is above zero

Expand All @@ -394,7 +403,7 @@ impl Membership<SeqTypes> for Arc<RwLock<StaticCommittee>> {
fn da_stake(&self, pub_key: &PubKey, epoch: Epoch) -> Option<StakeTableEntry<PubKey>> {
let sc = tokio::runtime::Runtime::new()
.unwrap()
.block_on(async { self.read().await });
.block_on(async { self.state.read().await });

// Only return the stake if it is above zero
sc.indexed_da_stake_table
Expand All @@ -406,7 +415,7 @@ impl Membership<SeqTypes> for Arc<RwLock<StaticCommittee>> {
fn has_stake(&self, pub_key: &PubKey, epoch: Epoch) -> bool {
let sc = tokio::runtime::Runtime::new()
.unwrap()
.block_on(async { self.read().await });
.block_on(async { self.state.read().await });

sc.indexed_stake_table
.get(&epoch)
Expand All @@ -418,7 +427,7 @@ impl Membership<SeqTypes> for Arc<RwLock<StaticCommittee>> {
fn has_da_stake(&self, pub_key: &PubKey, epoch: Epoch) -> bool {
let sc = tokio::runtime::Runtime::new()
.unwrap()
.block_on(async { self.read().await });
.block_on(async { self.state.read().await });

sc.indexed_da_stake_table
.get(&epoch)
Expand All @@ -434,7 +443,7 @@ impl Membership<SeqTypes> for Arc<RwLock<StaticCommittee>> {
) -> Result<PubKey, Self::Error> {
let sc = tokio::runtime::Runtime::new()
.unwrap()
.block_on(async { self.read().await });
.block_on(async { self.state.read().await });

let index = *view_number as usize % sc.eligible_leaders.len();
let res = sc.eligible_leaders[index].clone();
Expand All @@ -445,7 +454,7 @@ impl Membership<SeqTypes> for Arc<RwLock<StaticCommittee>> {
fn total_nodes(&self, _epoch: Epoch) -> usize {
let sc = tokio::runtime::Runtime::new()
.unwrap()
.block_on(async { self.read().await });
.block_on(async { self.state.read().await });

sc.stake_table.len()
}
Expand All @@ -454,7 +463,7 @@ impl Membership<SeqTypes> for Arc<RwLock<StaticCommittee>> {
fn da_total_nodes(&self, _epoch: Epoch) -> usize {
let sc = tokio::runtime::Runtime::new()
.unwrap()
.block_on(async { self.read().await });
.block_on(async { self.state.read().await });

sc.da_stake_table.len()
}
Expand All @@ -463,7 +472,7 @@ impl Membership<SeqTypes> for Arc<RwLock<StaticCommittee>> {
fn success_threshold(&self, _epoch: Epoch) -> NonZeroU64 {
let sc = tokio::runtime::Runtime::new()
.unwrap()
.block_on(async { self.read().await });
.block_on(async { self.state.read().await });

NonZeroU64::new(((sc.stake_table.len() as u64 * 2) / 3) + 1).unwrap()
}
Expand All @@ -472,7 +481,7 @@ impl Membership<SeqTypes> for Arc<RwLock<StaticCommittee>> {
fn da_success_threshold(&self, _epoch: Epoch) -> NonZeroU64 {
let sc = tokio::runtime::Runtime::new()
.unwrap()
.block_on(async { self.read().await });
.block_on(async { self.state.read().await });

NonZeroU64::new(((sc.da_stake_table.len() as u64 * 2) / 3) + 1).unwrap()
}
Expand All @@ -481,7 +490,7 @@ impl Membership<SeqTypes> for Arc<RwLock<StaticCommittee>> {
fn failure_threshold(&self, _epoch: Epoch) -> NonZeroU64 {
let sc = tokio::runtime::Runtime::new()
.unwrap()
.block_on(async { self.read().await });
.block_on(async { self.state.read().await });

NonZeroU64::new(((sc.stake_table.len() as u64) / 3) + 1).unwrap()
}
Expand All @@ -490,7 +499,7 @@ impl Membership<SeqTypes> for Arc<RwLock<StaticCommittee>> {
fn upgrade_threshold(&self, _epoch: Epoch) -> NonZeroU64 {
let sc = tokio::runtime::Runtime::new()
.unwrap()
.block_on(async { self.read().await });
.block_on(async { self.state.read().await });

NonZeroU64::new(max(
(sc.stake_table.len() as u64 * 9) / 10,
Expand Down
2 changes: 1 addition & 1 deletion types/src/v0/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ impl NodeType for SeqTypes {
type Transaction = Transaction;
type InstanceState = NodeState;
type ValidatedState = ValidatedState;
type Membership = Arc<RwLock<StaticCommittee>>;
type Membership = StaticCommittee;
type BuilderSignatureKey = FeeAccount;
type AuctionResult = SolverAuctionResults;
}
Expand Down

0 comments on commit 6320f3f

Please sign in to comment.