Skip to content

Commit

Permalink
Add direct offer w/ running census process to state bridge (#1422)
Browse files Browse the repository at this point in the history
* feat(portal-bridge): add direct offer / census feature to state bridge
  • Loading branch information
njgheorghita authored Sep 12, 2024
1 parent aab46ab commit e58bcb4
Show file tree
Hide file tree
Showing 12 changed files with 528 additions and 220 deletions.
18 changes: 15 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ async-trait = "0.1.68"
bytes = "1.3.0"
chrono = "0.4.38"
clap = { version = "4.2.1", features = ["derive"] }
delay_map = "0.4.0"
directories = "3.0"
discv5 = { version = "0.4.1", features = ["serde"] }
e2store = { path = "e2store" }
Expand Down
1 change: 1 addition & 0 deletions portal-bridge/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ anyhow.workspace = true
async-trait.workspace = true
chrono.workspace = true
clap.workspace = true
delay_map.workspace = true
discv5.workspace = true
e2store.workspace = true
eth_trie.workspace = true
Expand Down
13 changes: 7 additions & 6 deletions portal-bridge/src/bridge/era1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,9 @@ pub struct Era1Bridge {
pub era1_files: Vec<String>,
pub http_client: Client,
pub metrics: BridgeMetricsReporter,
pub gossip_limit: usize,
// Semaphore used to limit the amount of active gossip transfers
// to make sure we don't overwhelm the trin client
pub gossip_semaphore: Arc<Semaphore>,
pub execution_api: ExecutionApi,
}

Expand All @@ -69,6 +71,7 @@ impl Era1Bridge {
.try_into()?;
let era1_files = get_shuffled_era1_files(&http_client).await?;
let metrics = BridgeMetricsReporter::new("era1".to_string(), &format!("{mode:?}"));
let gossip_semaphore = Arc::new(Semaphore::new(gossip_limit));
Ok(Self {
mode,
portal_client,
Expand All @@ -77,7 +80,7 @@ impl Era1Bridge {
era1_files,
http_client,
metrics,
gossip_limit,
gossip_semaphore,
execution_api,
})
}
Expand Down Expand Up @@ -228,9 +231,6 @@ impl Era1Bridge {

async fn gossip_era1(&self, era1_path: String, gossip_range: Option<Range<u64>>, hunt: bool) {
info!("Processing era1 file at path: {era1_path:?}");
// We are using a semaphore to limit the amount of active gossip transfers to make sure
// we don't overwhelm the trin client
let gossip_send_semaphore = Arc::new(Semaphore::new(self.gossip_limit));

let raw_era1 = self
.http_client
Expand Down Expand Up @@ -263,7 +263,8 @@ impl Era1Bridge {
continue;
}
}
let permit = gossip_send_semaphore
let permit = self
.gossip_semaphore
.clone()
.acquire_owned()
.await
Expand Down
23 changes: 14 additions & 9 deletions portal-bridge/src/bridge/history.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,9 @@ pub struct HistoryBridge {
pub header_oracle: HeaderOracle,
pub epoch_acc_path: PathBuf,
pub metrics: BridgeMetricsReporter,
pub gossip_limit: usize,
// Semaphore used to limit the amount of active gossip transfers
// to make sure we don't overwhelm the trin client
pub gossip_semaphore: Arc<Semaphore>,
}

impl HistoryBridge {
Expand All @@ -55,14 +57,15 @@ impl HistoryBridge {
gossip_limit: usize,
) -> Self {
let metrics = BridgeMetricsReporter::new("history".to_string(), &format!("{mode:?}"));
let gossip_semaphore = Arc::new(Semaphore::new(gossip_limit));
Self {
mode,
portal_client,
execution_api,
header_oracle,
epoch_acc_path,
metrics,
gossip_limit,
gossip_semaphore,
}
}
}
Expand Down Expand Up @@ -177,10 +180,6 @@ impl HistoryBridge {
// epoch_acc gets set on the first iteration of the loop
let mut current_epoch_index = u64::MAX;

// We are using a semaphore to limit the amount of active gossip transfers to make sure we
// don't overwhelm the trin client
let gossip_send_semaphore = Arc::new(Semaphore::new(self.gossip_limit));

info!("fetching headers in range: {gossip_range:?}");
let mut epoch_acc = None;
let mut serve_full_block_handles = vec![];
Expand All @@ -203,9 +202,7 @@ impl HistoryBridge {
} else if height > MERGE_BLOCK_NUMBER {
epoch_acc = None;
}
let permit = gossip_send_semaphore.clone().acquire_owned().await.expect(
"acquire_owned() can only error on semaphore close, this should be impossible",
);
let permit = self.acquire_gossip_permit().await;
self.metrics.report_current_block(height as i64);
serve_full_block_handles.push(Self::spawn_serve_full_block(
height,
Expand Down Expand Up @@ -408,4 +405,12 @@ impl HistoryBridge {
metrics.stop_process_timer(timer);
result
}

async fn acquire_gossip_permit(&self) -> OwnedSemaphorePermit {
self.gossip_semaphore
.clone()
.acquire_owned()
.await
.expect("to be able to acquire semaphore")
}
}
Loading

0 comments on commit e58bcb4

Please sign in to comment.