From 7ffbd1778236db898e2aff0f58c8f1844fe08e4f Mon Sep 17 00:00:00 2001
From: Jeb Bearer
Date: Mon, 9 Dec 2024 10:01:05 -0800
Subject: [PATCH] Fix deadlock and race condition in proposal fetching

* Change from broadcast channel to multi-consumer channel. This means only
  one fetcher task will receive each proposal to be fetched, which is the
  actual behavior we want. Before, with broadcast, we had multiple fetchers
  always fetching the same proposal, which is why we saw race conditions
  causing database serialization errors. It should now be possible to
  reenable multiple workers.
* Use an unbounded channel. This prevents a deadlock where a consumer sends
  back into the channel (e.g. to recursively fetch the parent of the
  proposal it had just fetched), but the channel is full, blocking the
  consumer, the very task responsible for clearing the blockage.
---
 Cargo.lock                  |  2 +-
 Cargo.toml                  |  1 +
 justfile                    |  8 ++++++++
 sequencer-sqlite/Cargo.lock |  4 ++--
 sequencer/Cargo.toml        |  2 +-
 sequencer/src/context.rs    | 33 ++++++++++++++-------------------
 6 files changed, 27 insertions(+), 23 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 727655059..9a06129d8 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -8595,7 +8595,7 @@ dependencies = [
  "anyhow",
  "ark-ff",
  "ark-serialize",
- "async-broadcast",
+ "async-channel 2.3.1",
  "async-lock 3.4.0",
  "async-once-cell",
  "async-trait",
diff --git a/Cargo.toml b/Cargo.toml
index 011701805..12738fd66 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -40,6 +40,7 @@ ark-poly = "0.4"
 ark-serialize = "0.4"
 ark-srs = "0.3.1"
 async-broadcast = "0.7.0"
+async-channel = "2"
 async-lock = "3"
 async-once-cell = "0.5"
 async-trait = "0.1"
diff --git a/justfile b/justfile
index 5d7521a92..ef2f31483 100644
--- a/justfile
+++ b/justfile
@@ -10,6 +10,14 @@ demo *args:
 demo-native *args: build
     scripts/demo-native {{args}}
 
+lint:
+    #!/usr/bin/env bash
+    set -euxo pipefail
+    # Use the same target dir for both `clippy` invocations
+    export CARGO_TARGET_DIR=${CARGO_TARGET_DIR:-target}
+    cargo clippy --workspace --features testing --all-targets -- -D warnings
+    cargo clippy --workspace --all-targets --manifest-path sequencer-sqlite/Cargo.toml -- -D warnings
+
 build:
     #!/usr/bin/env bash
     set -euxo pipefail
diff --git a/sequencer-sqlite/Cargo.lock b/sequencer-sqlite/Cargo.lock
index ad03aad19..b2f2e85e6 100644
--- a/sequencer-sqlite/Cargo.lock
+++ b/sequencer-sqlite/Cargo.lock
@@ -4083,7 +4083,7 @@ dependencies = [
 [[package]]
 name = "hotshot-query-service"
 version = "0.1.75"
-source = "git+https://github.com/EspressoSystems/hotshot-query-service?branch=hotshot/0.5.82#5e2c984d19da3826f4cc8d80c5cf1a84dcd377f7"
+source = "git+https://github.com/EspressoSystems/hotshot-query-service?tag=v0.1.75#dffefa160f441a663723a67bc54efedb11a88b02"
 dependencies = [
  "anyhow",
  "ark-serialize",
@@ -8319,7 +8319,7 @@ dependencies = [
  "anyhow",
  "ark-ff",
  "ark-serialize",
- "async-broadcast",
+ "async-channel 2.3.1",
  "async-lock 3.4.0",
  "async-once-cell",
  "async-trait",
diff --git a/sequencer/Cargo.toml b/sequencer/Cargo.toml
index 5fb19d8f6..f9948fc79 100644
--- a/sequencer/Cargo.toml
+++ b/sequencer/Cargo.toml
@@ -43,7 +43,7 @@ vergen = { workspace = true }
 anyhow = { workspace = true }
 ark-ff = { workspace = true }
 ark-serialize = { workspace = true, features = ["derive"] }
-async-broadcast = { workspace = true }
+async-channel = { workspace = true }
 async-lock = { workspace = true }
 async-once-cell = { workspace = true }
 async-trait = { workspace = true }
diff --git a/sequencer/src/context.rs b/sequencer/src/context.rs
index 06e27e3b1..c14b7454a 100644
--- a/sequencer/src/context.rs
+++ b/sequencer/src/context.rs
@@ -1,7 +1,7 @@
 use std::{fmt::Display, sync::Arc};
 
 use anyhow::Context;
-use async_broadcast::{broadcast, Receiver, Sender};
+use async_channel::{Receiver, Sender};
 use async_lock::RwLock;
 use clap::Parser;
 use committable::Commitment;
@@ -64,13 +64,6 @@ pub struct ProposalFetcherConfig {
     )]
     pub num_workers: usize,
 
-    #[clap(
-        long = "proposal-fetcher-channel-capacity",
-        env = "ESPRESSO_SEQUENCER_PROPOSAL_FETCHER_CHANNEL_CAPACITY",
-        default_value = "100"
-    )]
-    pub channel_capacity: usize,
-
     #[clap(
         long = "proposal-fetcher-fetch-timeout",
         env = "ESPRESSO_SEQUENCER_PROPOSAL_FETCHER_FETCH_TIMEOUT",
@@ -245,14 +238,18 @@ impl<N: ConnectedNetwork<PubKey>, P: SequencerPersistence, V: Versions> Sequence
         };
 
         // Spawn proposal fetching tasks.
-        let (send, recv) = broadcast(proposal_fetcher_cfg.channel_capacity);
-        ctx.spawn("proposal scanner", scan_proposals(ctx.handle.clone(), send));
+        let (send, recv) = async_channel::unbounded();
+        ctx.spawn(
+            "proposal scanner",
+            scan_proposals(ctx.handle.clone(), send.clone()),
+        );
         for i in 0..proposal_fetcher_cfg.num_workers {
             ctx.spawn(
                 format!("proposal fetcher {i}"),
                 fetch_proposals(
                     ctx.handle.clone(),
                     persistence.clone(),
+                    send.clone(),
                     recv.clone(),
                     proposal_fetcher_cfg.fetch_timeout,
                 ),
@@ -493,10 +490,7 @@ async fn scan_proposals<N, P, V>(
             // to the anchor. This allows state replay from the decided state.
             let parent_view = proposal.data.justify_qc.view_number;
             let parent_leaf = proposal.data.justify_qc.data.leaf_commit;
-            fetcher
-                .broadcast_direct((parent_view, parent_leaf))
-                .await
-                .ok();
+            fetcher.send((parent_view, parent_leaf)).await.ok();
         }
     }
 
@@ -504,15 +498,16 @@ async fn fetch_proposals<N, P, V>(
     consensus: Arc<RwLock<Consensus<N, P, V>>>,
     persistence: Arc<P>,
-    mut scanner: Receiver<(ViewNumber, Commitment<Leaf<SeqTypes>>)>,
+    sender: Sender<(ViewNumber, Commitment<Leaf<SeqTypes>>)>,
+    receiver: Receiver<(ViewNumber, Commitment<Leaf<SeqTypes>>)>,
     fetch_timeout: Duration,
 ) where
     N: ConnectedNetwork<PubKey>,
     P: SequencerPersistence,
     V: Versions,
 {
-    let sender = scanner.new_sender();
-    while let Some((view, leaf)) = scanner.next().await {
+    let mut receiver = std::pin::pin!(receiver);
+    while let Some((view, leaf)) = receiver.next().await {
         let span = tracing::warn_span!("fetch proposal", ?view, %leaf);
         let res: anyhow::Result<()> = async {
             let anchor_view = load_anchor_view(&*persistence).await;
@@ -527,7 +522,7 @@ async fn fetch_proposals<N, P, V>(
                     // parent.
                     let view = proposal.data.justify_qc.view_number;
                     let leaf = proposal.data.justify_qc.data.leaf_commit;
-                    sender.broadcast_direct((view, leaf)).await.ok();
+                    sender.send((view, leaf)).await.ok();
                     return Ok(());
                 }
                 Err(err) => {
@@ -579,7 +574,7 @@ async fn fetch_proposals<N, P, V>(
 
             // If we fail fetching the proposal, don't let it clog up the fetching task. Just push
             // it back onto the queue and move onto the next proposal.
-            sender.broadcast_direct((view, leaf)).await.ok();
+            sender.send((view, leaf)).await.ok();
         }
     }
 }
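
Note (not part of the patch): below is a minimal sketch of the two async-channel properties the fix relies on, assuming the async-channel 2.x and futures 0.3 crates; futures::executor::block_on is used only to drive the example. Cloned receivers share one queue, so each queued proposal is delivered to exactly one fetcher worker, and sends on an unbounded channel never wait for capacity, so a worker can re-queue follow-up work for itself without blocking. (The removed new_sender call was an async-broadcast API, which is why the patch now threads a separate send handle into fetch_proposals.)

use async_channel::unbounded;
use futures::executor::block_on;

fn main() {
    block_on(async {
        // Unbounded MPMC channel, like the one the patch creates with
        // async_channel::unbounded().
        let (send, recv) = unbounded::<u64>();

        // Cloned receivers share the same queue: each message is delivered to
        // exactly one of them, so two fetcher workers never race on one proposal.
        let recv2 = recv.clone();
        send.send(1).await.unwrap();
        send.send(2).await.unwrap();
        assert_eq!(recv.recv().await.unwrap(), 1);
        assert_eq!(recv2.recv().await.unwrap(), 2);

        // Unbounded sends complete immediately even when no receiver is waiting,
        // so a worker that pushes work (a parent view, or a failed fetch) back
        // into its own queue cannot block itself the way a full bounded channel can.
        for view in 3..100u64 {
            send.send(view).await.unwrap();
        }
        assert_eq!(recv.len(), 97);
    });
}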