Skip to content

Commit

Permalink
Merge branch 'develop' into pectra-devnet4
Browse files Browse the repository at this point in the history
  • Loading branch information
Tumas committed Nov 20, 2024
2 parents c664f18 + b5b2c95 commit 201219d
Show file tree
Hide file tree
Showing 6 changed files with 255 additions and 76 deletions.
132 changes: 132 additions & 0 deletions eth1_api/src/eth1_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -561,6 +561,7 @@ struct RawForkChoiceUpdatedResponse {
}

#[derive(Debug, Error)]
#[cfg_attr(test, derive(PartialEq, Eq))]
enum Error {
#[error("all Eth1 RPC endpoints exhausted")]
EndpointsExhausted,
Expand Down Expand Up @@ -589,6 +590,137 @@ mod tests {

use super::*;

#[tokio::test]
async fn test_eth1_endpoints_error_with_no_endpoints() -> Result<()> {
let config = Arc::new(Config::mainnet());
let auth = Arc::default();

let eth1_api = Arc::new(Eth1Api::new(
config,
Client::new(),
auth,
vec![],
None,
None,
));

assert!(eth1_api.el_offline().await);
assert_eq!(eth1_api.current_endpoint().await, None);

assert_eq!(
eth1_api
.current_head_number()
.await
.expect_err("Eth1Api with no endpoints should return an error")
.downcast::<Error>()?,
Error::NoEndpointsProvided,
);

Ok(())
}

#[tokio::test]
async fn test_eth1_endpoints_error_with_single_endpoint() -> Result<()> {
    // A single endpoint that always answers HTTP 500: requests should fail
    // with `EndpointsExhausted`, but the endpoint must remain selected,
    // since there is no alternative to fall back to.
    let server = MockServer::start();

    server.mock(|when, then| {
        when.method(Method::POST).path("/");
        then.status(500).body("{}");
    });

    let config = Arc::new(Config::mainnet());
    let auth = Arc::default();
    let server_url = server.url("/").parse()?;

    let eth1_api = Arc::new(Eth1Api::new(
        config,
        Client::new(),
        auth,
        vec![server_url],
        None,
        None,
    ));

    // No request has been made yet, so the endpoint is not yet marked offline.
    assert!(!eth1_api.el_offline().await);

    assert_eq!(
        eth1_api
            .current_head_number()
            .await
            // Fixed typo in the panic message: "a an error" -> "an error".
            .expect_err("500 response should be an error")
            .downcast::<Error>()?,
        Error::EndpointsExhausted,
    );

    // Despite the endpoint returning an error, it remains the only available option
    assert!(eth1_api.current_endpoint().await.is_some());
    assert!(eth1_api.el_offline().await);

    Ok(())
}

#[tokio::test]
async fn test_eth1_endpoints_error_with_multiple_endpoints() -> Result<()> {
    // Two endpoints on one mock server: the primary ("/") always answers
    // HTTP 500, while the fallback ("/next") returns a valid JSON-RPC
    // response. A failing primary should cause a transparent failover to
    // the fallback without marking the whole `Eth1Api` offline.
    let server = MockServer::start();

    server.mock(|when, then| {
        when.method(Method::POST).path("/");
        then.status(500).body("{}");
    });

    // 0x1d243 == 119_363, matching the assertion below.
    let body = json!({
        "jsonrpc": "2.0",
        "id": 1,
        "result": "0x1d243",
    });

    server.mock(|when, then| {
        when.method(Method::POST).path("/next");
        then.status(200).body(body.to_string());
    });

    let config = Arc::new(Config::mainnet());
    let auth = Arc::default();
    let server_url = server.url("/").parse()?;
    let next_server_url = server.url("/next").parse()?;

    let eth1_api = Arc::new(Eth1Api::new(
        config,
        Client::new(),
        auth,
        vec![server_url, next_server_url],
        None,
        None,
    ));

    // Set to use the primary endpoint which is not a fallback
    assert!(!eth1_api.el_offline().await);
    assert!(!eth1_api
        .current_endpoint()
        .await
        // Fixed typo in the panic message: "avaialble" -> "available".
        .expect("endpoint should be available")
        .is_fallback());

    // The primary returns 500, so this value is served by the fallback.
    assert_eq!(
        eth1_api
            .current_head_number()
            .await
            .expect("the fallback endpoint should be working"),
        119_363,
    );

    // Expect to use the fallback endpoint when the primary endpoint returns an error
    assert!(eth1_api
        .current_endpoint()
        .await
        // Fixed typo in the panic message: "avaialble" -> "available".
        .expect("the fallback endpoint should be available")
        .is_fallback());

    // Even though the primary endpoint is offline, eth1_api itself is not offline
    assert!(!eth1_api.el_offline().await);

    Ok(())
}

#[tokio::test]
async fn test_bellatrix_payload_deserialization_with_real_response() -> Result<()> {
let body = json!({
Expand Down
11 changes: 3 additions & 8 deletions fork_choice_control/src/block_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use types::{
nonstandard::{BlockRewards, Phase, SlashingKind},
phase0::primitives::H256,
preset::Preset,
traits::{BeaconBlock as _, BeaconState as _, SignedBeaconBlock as _},
traits::{BeaconBlock as _, SignedBeaconBlock as _},
};

#[derive(Constructor)]
Expand Down Expand Up @@ -165,16 +165,11 @@ impl<P: Preset> BlockProcessor<P> {
let block_slot = block.message().slot();

// > Make a copy of the state to avoid mutability issues
let mut state = self
let state = self
.state_cache
.before_or_at_slot(store, parent.block_root, block_slot)
.try_state_at_slot(store, parent.block_root, block_slot)?
.unwrap_or_else(|| parent.state(store));

// > Process slots (including those with no blocks) since block
if state.slot() < block_slot {
combined::process_slots(&self.chain_config, state.make_mut(), block_slot)?;
}

combined::process_block_for_gossip(&self.chain_config, &state, block)?;

Ok(None)
Expand Down
8 changes: 4 additions & 4 deletions fork_choice_control/src/mutator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -650,7 +650,7 @@ where
Ok(AggregateAndProofAction::Accept {
aggregate_and_proof,
attesting_indices,
is_superset,
is_subset_aggregate,
}) => {
if let Some(metrics) = self.metrics.as_ref() {
metrics.register_mutator_aggregate_and_proof(&["accepted"]);
Expand All @@ -671,10 +671,10 @@ where
let (gossip_id, sender) = origin.split();

if let Some(gossip_id) = gossip_id {
if is_superset {
P2pMessage::Accept(gossip_id).send(&self.p2p_tx);
} else {
if is_subset_aggregate {
P2pMessage::Ignore(gossip_id).send(&self.p2p_tx);
} else {
P2pMessage::Accept(gossip_id).send(&self.p2p_tx);
}
}

Expand Down
2 changes: 1 addition & 1 deletion fork_choice_store/src/misc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -548,7 +548,7 @@ pub enum AggregateAndProofAction<P: Preset> {
Accept {
aggregate_and_proof: Arc<SignedAggregateAndProof<P>>,
attesting_indices: AttestingIndices<P>,
is_superset: bool,
is_subset_aggregate: bool,
},
Ignore,
DelayUntilBlock(Arc<SignedAggregateAndProof<P>>, H256),
Expand Down
58 changes: 12 additions & 46 deletions fork_choice_store/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1057,13 +1057,7 @@ impl<P: Preset> Store<P> {
};

// > Check the block is valid and compute the post-state
let block_action = state_transition_for_gossip(parent)?;

if let Some(action) = block_action {
return Ok(Some(action));
}

Ok(None)
state_transition_for_gossip(parent)
}

pub fn validate_block_with_custom_state_transition(
Expand Down Expand Up @@ -1275,30 +1269,19 @@ impl<P: Preset> Store<P> {
},
);

let committee = if let Some(committee_bits) = aggregate.committee_bits() {
let committee_indices = misc::get_committee_indices::<P>(*committee_bits);
let index = aggregate
.committee_bits()
.and_then(|bits| misc::get_committee_indices::<P>(*bits).next())
.unwrap_or(index);

let mut committees = vec![];

for committee_index in committee_indices {
let committee = accessors::beacon_committee(&target_state, slot, committee_index)?;

committees.extend(committee);
}

committees.into()
} else {
accessors::beacon_committee(&target_state, slot, index)?
.into_iter()
.collect::<Box<[_]>>()
};
let committee = accessors::beacon_committee(&target_state, slot, index)?;

// > The aggregator's validator index is within the committee
ensure!(
committee.contains(&aggregator_index),
committee.into_iter().contains(&aggregator_index),
Error::AggregatorNotInCommittee {
aggregate_and_proof,
committee,
committee: committee.into_iter().collect(),
},
);

Expand Down Expand Up @@ -1333,12 +1316,12 @@ impl<P: Preset> Store<P> {
)?;

// https://github.com/ethereum/consensus-specs/pull/2847
let is_superset = self.aggregate_and_proof_supersets.check(&aggregate);
let is_subset_aggregate = !self.aggregate_and_proof_supersets.check(&aggregate);

Ok(AggregateAndProofAction::Accept {
aggregate_and_proof,
attesting_indices,
is_superset,
is_subset_aggregate,
})
}

Expand Down Expand Up @@ -1757,33 +1740,16 @@ impl<P: Preset> Store<P> {
return Ok(BlobSidecarAction::Ignore(true));
}

let mut state = self
let state = self
.state_cache
.before_or_at_slot(self, block_header.parent_root, block_header.slot)
.try_state_at_slot(self, block_header.parent_root, block_header.slot)?
.unwrap_or_else(|| {
self.chain_link(block_header.parent_root)
.or_else(|| self.chain_link_before_or_at(block_header.slot))
.map(|chain_link| chain_link.state(self))
.unwrap_or_else(|| self.head().state(self))
});

if state.slot() < block_header.slot {
if Feature::WarnOnStateCacheSlotProcessing.is_enabled() && self.is_forward_synced() {
// `Backtrace::force_capture` can be costly and a warning may be excessive,
// but this is controlled by a `Feature` that should be disabled by default.
warn!(
"processing slots for beacon state not found in state cache before state transition \
(block root: {:?}, from slot {} to {})\n{}",
block_header.parent_root,
state.slot(),
block_header.slot,
Backtrace::force_capture(),
);
}

combined::process_slots(&self.chain_config, state.make_mut(), block_header.slot)?;
}

// [REJECT] The proposer signature of blob_sidecar.signed_block_header, is valid with respect to the block_header.proposer_index pubkey.
SingleVerifier.verify_singular(
blob_sidecar
Expand Down
Loading

0 comments on commit 201219d

Please sign in to comment.