Fix deadlock and race condition in proposal fetching
* Change from a broadcast channel to a multi-consumer channel. This means
  only one fetcher task will receive each proposal to be fetched, which is
  the behavior we actually want. Before, with broadcast, multiple fetchers
  were always fetching the same proposal, which is why we saw race
  conditions causing database serialization errors. It should now be
  possible to re-enable multiple workers. (See the first sketch below.)
* Use an unbounded channel. This prevents a deadlock in which a consumer
  sends back into the channel (e.g. to recursively fetch the parent of the
  proposal it has just fetched) while the channel is full, blocking the
  consumer, the very task responsible for clearing the blockage. (See the
  second sketch below.)
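For context, here is a minimal, self-contained sketch (not taken from this repository; the crate versions and the use of futures::executor::block_on are assumptions) contrasting the two channel types: with async_broadcast every receiver sees every message, while with async_channel each message is delivered to exactly one receiver.

// Sketch: an async_channel message is consumed once; async_broadcast clones it to all receivers.
// Assumed deps: async-channel = "2", async-broadcast = "0.7", futures = "0.3".
use futures::executor::block_on;

fn main() {
    block_on(async {
        // Multi-consumer queue: cloning the Receiver adds another worker,
        // but every item is still delivered to exactly one of them.
        let (tx, rx) = async_channel::unbounded::<u32>();
        let rx2 = rx.clone(); // a second "fetcher worker"
        for view in 0..3 {
            tx.send(view).await.unwrap();
        }
        assert_eq!(rx.recv().await.unwrap(), 0);
        assert_eq!(rx2.recv().await.unwrap(), 1); // rx2 gets the next item, not a copy of 0
        assert_eq!(rx.recv().await.unwrap(), 2);

        // Broadcast: every receiver sees every message, so two fetcher workers
        // would both try to fetch (and store) the same proposal.
        let (btx, mut brx1) = async_broadcast::broadcast::<u32>(8);
        let mut brx2 = brx1.clone();
        btx.broadcast(7).await.unwrap();
        assert_eq!(brx1.recv().await.unwrap(), 7);
        assert_eq!(brx2.recv().await.unwrap(), 7); // duplicate work: the same view is seen twice
    });
}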
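A second sketch (again hypothetical, with made-up view numbers) of why the channel is unbounded: the only consumer can push follow-up work, such as the parent of the proposal it just fetched, back into the queue without ever blocking on a full buffer.

use futures::executor::block_on;

fn main() {
    block_on(async {
        let (tx, rx) = async_channel::unbounded::<u64>();
        tx.send(5).await.unwrap(); // seed the queue with one view to fetch

        // A single worker drains the queue, re-enqueueing each view's "parent".
        // With a small bounded channel, this send-from-the-consumer pattern can
        // fill the buffer and block the very task that is supposed to drain it.
        while let Ok(view) = rx.try_recv() {
            println!("fetched proposal for view {view}");
            if view > 0 {
                tx.send(view - 1).await.unwrap(); // never blocks on an unbounded channel
            }
        }
    });
}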
jbearer committed Dec 9, 2024
1 parent 006ce0c commit 7ffbd17
Showing 6 changed files with 27 additions and 23 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
@@ -40,6 +40,7 @@ ark-poly = "0.4"
 ark-serialize = "0.4"
 ark-srs = "0.3.1"
 async-broadcast = "0.7.0"
+async-channel = "2"
 async-lock = "3"
 async-once-cell = "0.5"
 async-trait = "0.1"
8 changes: 8 additions & 0 deletions justfile
@@ -10,6 +10,14 @@ demo *args:
 demo-native *args: build
     scripts/demo-native {{args}}
 
+lint:
+    #!/usr/bin/env bash
+    set -euxo pipefail
+    # Use the same target dir for both `clippy` invocations
+    export CARGO_TARGET_DIR=${CARGO_TARGET_DIR:-target}
+    cargo clippy --workspace --features testing --all-targets -- -D warnings
+    cargo clippy --workspace --all-targets --manifest-path sequencer-sqlite/Cargo.toml -- -D warnings
+
 build:
     #!/usr/bin/env bash
     set -euxo pipefail
4 changes: 2 additions & 2 deletions sequencer-sqlite/Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion sequencer/Cargo.toml
@@ -43,7 +43,7 @@ vergen = { workspace = true }
 anyhow = { workspace = true }
 ark-ff = { workspace = true }
 ark-serialize = { workspace = true, features = ["derive"] }
-async-broadcast = { workspace = true }
+async-channel = { workspace = true }
 async-lock = { workspace = true }
 async-once-cell = { workspace = true }
 async-trait = { workspace = true }
33 changes: 14 additions & 19 deletions sequencer/src/context.rs
@@ -1,7 +1,7 @@
 use std::{fmt::Display, sync::Arc};
 
 use anyhow::Context;
-use async_broadcast::{broadcast, Receiver, Sender};
+use async_channel::{Receiver, Sender};
 use async_lock::RwLock;
 use clap::Parser;
 use committable::Commitment;
@@ -64,13 +64,6 @@ pub struct ProposalFetcherConfig {
     )]
     pub num_workers: usize,
 
-    #[clap(
-        long = "proposal-fetcher-channel-capacity",
-        env = "ESPRESSO_SEQUENCER_PROPOSAL_FETCHER_CHANNEL_CAPACITY",
-        default_value = "100"
-    )]
-    pub channel_capacity: usize,
-
     #[clap(
         long = "proposal-fetcher-fetch-timeout",
         env = "ESPRESSO_SEQUENCER_PROPOSAL_FETCHER_FETCH_TIMEOUT",
@@ -245,14 +238,18 @@ impl<N: ConnectedNetwork<PubKey>, P: SequencerPersistence, V: Versions> Sequence
         };
 
         // Spawn proposal fetching tasks.
-        let (send, recv) = broadcast(proposal_fetcher_cfg.channel_capacity);
-        ctx.spawn("proposal scanner", scan_proposals(ctx.handle.clone(), send));
+        let (send, recv) = async_channel::unbounded();
+        ctx.spawn(
+            "proposal scanner",
+            scan_proposals(ctx.handle.clone(), send.clone()),
+        );
         for i in 0..proposal_fetcher_cfg.num_workers {
             ctx.spawn(
                 format!("proposal fetcher {i}"),
                 fetch_proposals(
                     ctx.handle.clone(),
                     persistence.clone(),
+                    send.clone(),
                     recv.clone(),
                     proposal_fetcher_cfg.fetch_timeout,
                 ),
@@ -493,26 +490,24 @@ async fn scan_proposals<N, P, V>(
         // to the anchor. This allows state replay from the decided state.
         let parent_view = proposal.data.justify_qc.view_number;
         let parent_leaf = proposal.data.justify_qc.data.leaf_commit;
-        fetcher
-            .broadcast_direct((parent_view, parent_leaf))
-            .await
-            .ok();
+        fetcher.send((parent_view, parent_leaf)).await.ok();
     }
 }
 
 #[tracing::instrument(skip_all)]
 async fn fetch_proposals<N, P, V>(
     consensus: Arc<RwLock<Consensus<N, P, V>>>,
     persistence: Arc<impl SequencerPersistence>,
-    mut scanner: Receiver<(ViewNumber, Commitment<Leaf2<SeqTypes>>)>,
+    sender: Sender<(ViewNumber, Commitment<Leaf2<SeqTypes>>)>,
+    receiver: Receiver<(ViewNumber, Commitment<Leaf2<SeqTypes>>)>,
     fetch_timeout: Duration,
 ) where
     N: ConnectedNetwork<PubKey>,
     P: SequencerPersistence,
     V: Versions,
 {
-    let sender = scanner.new_sender();
-    while let Some((view, leaf)) = scanner.next().await {
+    let mut receiver = std::pin::pin!(receiver);
+    while let Some((view, leaf)) = receiver.next().await {
         let span = tracing::warn_span!("fetch proposal", ?view, %leaf);
         let res: anyhow::Result<()> = async {
             let anchor_view = load_anchor_view(&*persistence).await;
@@ -527,7 +522,7 @@ async fn fetch_proposals<N, P, V>(
                 // parent.
                 let view = proposal.data.justify_qc.view_number;
                 let leaf = proposal.data.justify_qc.data.leaf_commit;
-                sender.broadcast_direct((view, leaf)).await.ok();
+                sender.send((view, leaf)).await.ok();
                 return Ok(());
             }
             Err(err) => {
@@ -579,7 +574,7 @@
 
         // If we fail fetching the proposal, don't let it clog up the fetching task. Just push
         // it back onto the queue and move onto the next proposal.
-        sender.broadcast_direct((view, leaf)).await.ok();
+        sender.send((view, leaf)).await.ok();
         }
     }
 }
