Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(tenderdash-abci): deadlock on multiple msgs received #41

Merged
merged 1 commit into from
Oct 5, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 103 additions & 8 deletions abci/src/server/codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,15 @@ use bytes::{Buf, BufMut, BytesMut};
use futures::{SinkExt, StreamExt};
use prost::Message;
use proto::abci::{Request, Response};
use tokio::sync::{
mpsc::{self, Receiver, Sender},
Mutex,
use tokio::{
io::{AsyncRead, AsyncWrite},
sync::{
mpsc::{self, Receiver, Sender},
Mutex,
},
};
use tokio_util::{
codec::{Decoder, Encoder},
codec::{Decoder, Encoder, Framed},
net::Listener,
};

Expand Down Expand Up @@ -63,7 +66,7 @@ impl<'a> Codec {
async fn worker<L>(
listener: Arc<Mutex<L>>,
request_tx: Sender<proto::abci::Request>,
mut response_rx: Receiver<proto::abci::Response>,
response_rx: Receiver<proto::abci::Response>,
cancel: CancellationToken,
) where
L: Listener + Send + Sync,
Expand All @@ -87,13 +90,24 @@ impl<'a> Codec {
tracing::info!(?address, "accepted connection");

let stream = Box::pin(stream);
let mut codec = tokio_util::codec::Framed::new(stream, Coder {});
let codec = Framed::new(stream, Coder {});

Self::process_worker_queues(codec, request_tx, response_rx, cancel).await;
}
async fn process_worker_queues<L: AsyncRead + AsyncWrite + Unpin>(
mut codec: Framed<L, Coder>,
request_tx: Sender<proto::abci::Request>,
mut response_rx: Receiver<proto::abci::Response>,
cancel: CancellationToken,
) {
loop {
tokio::select! {
request = codec.next() => match request {
// Only read next message if we have capacity in request_tx to process it.
// Otherwise, we might block the codec worker on request_tx.send() and never
// process the next message from the response_rx stream.
request = codec.next(), if request_tx.capacity() > 0 => match request {
Some(Ok(i)) => {
if let Err(error) = request_tx.send(i).await {
if let Err(error) = request_tx.try_send(i) {
tracing::error!(?error, "unable to forward request for processing");
cancel.cancel();
}
Expand Down Expand Up @@ -189,3 +203,84 @@ impl Encoder<proto::abci::Response> for Coder {
Ok(())
}
}

#[cfg(test)]
mod test {
    use prost::Message;
    use tenderdash_proto::abci;
    use tokio::{
        io::AsyncWriteExt,
        sync::mpsc,
        time::{sleep, timeout, Duration},
    };
    use tokio_util::sync::CancellationToken;

    /// Number of requests written to the wire before any response is sent.
    const REQUESTS_ON_WIRE: usize = 5;

    /// Upper bound for every step that would block forever if the deadlock
    /// came back; turns a hang into a test failure.
    const STEP_TIMEOUT: Duration = Duration::from_secs(5);

    /// Regression test for the codec receiving several requests without a
    /// response in between.
    ///
    /// Scenario: multiple requests are written to the wire, then the test
    /// dequeues one request, enqueues one response and dequeues another
    /// request. Each of these steps is guarded by [`STEP_TIMEOUT`] so that a
    /// regression is reported as a failure instead of hanging the test run.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn test_codec_msg_msg_resp() {
        // Logging is best-effort: another test may already have installed a subscriber.
        tracing_subscriber::fmt()
            .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
            .with_ansi(true)
            .try_init()
            .ok();

        // Both queues hold a single message, so the worker runs out of
        // request capacity as soon as one request is queued.
        let (request_tx, mut request_rx) = mpsc::channel::<abci::Request>(1);
        let (response_tx, response_rx) = mpsc::channel::<abci::Response>(1);
        let cancel = CancellationToken::new();

        // In-memory duplex pipe standing in for the network connection.
        let (mut client, server) = tokio::io::duplex(10240);

        let codec = tokio_util::codec::Framed::new(server, super::Coder {});

        // Run the queue-processing loop in the background.
        let hdl = tokio::spawn(super::Codec::process_worker_queues(
            codec,
            request_tx,
            response_rx,
            cancel.clone(),
        ));

        // We send several requests over the wire, without reading any response.
        for n in 0..REQUESTS_ON_WIRE {
            let encoded = abci::Request {
                value: Some(abci::request::Value::Echo(abci::RequestEcho {
                    message: format!("hello {}", n),
                })),
            }
            .encode_length_delimited_to_vec();

            client.write_all(&encoded).await.unwrap();
        }

        // Now, give the codec a moment to process the requests.
        // The bug we fixed was that the codec would not process the second
        // request until a response was sent.
        sleep(Duration::from_millis(10)).await;

        // Then, we read one request
        tracing::debug!("MAIN THREAD: reading request 1");
        timeout(STEP_TIMEOUT, request_rx.recv())
            .await
            .expect("timed out waiting for request 1")
            .expect("dequeue request 1");
        tracing::debug!("MAIN THREAD: dequeued request 1");

        // Then, we send a response
        tracing::debug!("MAIN THREAD: sending response 1");
        timeout(
            STEP_TIMEOUT,
            response_tx.send(abci::Response {
                value: Some(abci::response::Value::Echo(abci::ResponseEcho {
                    message: "hello".to_string(),
                })),
            }),
        )
        .await
        .expect("timed out enqueuing response 1")
        .expect("enqueue response 1");
        tracing::debug!("MAIN THREAD: enqueued response 1");

        // Then, we read second request
        tracing::debug!("MAIN THREAD: reading request 2");
        timeout(STEP_TIMEOUT, request_rx.recv())
            .await
            .expect("timed out waiting for request 2")
            .expect("dequeue request 2");
        tracing::debug!("MAIN THREAD: dequeued request 2");

        // Success :)

        // Cleanup: ask the worker to stop and make sure it actually does.
        cancel.cancel();
        timeout(STEP_TIMEOUT, hdl)
            .await
            .expect("timed out waiting for worker to stop")
            .unwrap();
    }
}
Loading