From 38e8ce0695504fe4d1c6ee27fcdbd9ded02a4c3b Mon Sep 17 00:00:00 2001 From: Franz Heinzmann Date: Mon, 24 Jun 2024 12:03:43 +0200 Subject: [PATCH] fix: do not panic on blobs db IO error (#2400) ## Description * We `unwrap`ed an `io::Result` in the RPC handler for `BlobsReadAtRequest`. Changed to return an error instead. * Report the different error cases that can occur differently on the client to help debugging. ## Breaking Changes ## Notes & open questions ## Change checklist - [x] Self-review. - [x] ~~Documentation updates if relevant.~~ - [x] ~~Tests if relevant.~~ - [x] All breaking changes documented. --- iroh/src/client/blobs.rs | 3 ++- iroh/src/node/rpc.rs | 23 +++++++---------------- 2 files changed, 9 insertions(+), 17 deletions(-) diff --git a/iroh/src/client/blobs.rs b/iroh/src/client/blobs.rs index 861060b751..e199e0f8d1 100644 --- a/iroh/src/client/blobs.rs +++ b/iroh/src/client/blobs.rs @@ -798,7 +798,8 @@ impl Reader { let (size, is_complete) = match stream.next().await { Some(Ok(BlobReadAtResponse::Entry { size, is_complete })) => (size, is_complete), Some(Err(err)) => return Err(err), - None | Some(Ok(_)) => return Err(anyhow!("Expected header frame")), + Some(Ok(_)) => return Err(anyhow!("Expected header frame, but got data frame")), + None => return Err(anyhow!("Expected header frame, but RPC stream was dropped")), }; let stream = stream.map(|item| match item { diff --git a/iroh/src/node/rpc.rs b/iroh/src/node/rpc.rs index cc79655e87..2c9b49fe38 100644 --- a/iroh/src/node/rpc.rs +++ b/iroh/src/node/rpc.rs @@ -910,27 +910,18 @@ impl Handler { let (tx, rx) = flume::bounded(RPC_BLOB_GET_CHANNEL_CAP); let db = self.inner.db.clone(); self.inner.rt.spawn_pinned(move || async move { - let entry = db.get(&req.hash).await.unwrap(); - if let Err(err) = read_loop( - req.offset, - req.len, - entry, - tx.clone(), - RPC_BLOB_GET_CHUNK_SIZE, - ) - .await - { + if let Err(err) = read_loop(req, db, tx.clone(), RPC_BLOB_GET_CHUNK_SIZE).await { tx.send_async(RpcResult::Err(err.into())).await.ok(); } }); - async fn read_loop( - offset: u64, - len: Option, - entry: Option, + async fn read_loop( + req: BlobReadAtRequest, + db: D, tx: flume::Sender>, max_chunk_size: usize, ) -> anyhow::Result<()> { + let entry = db.get(&req.hash).await?; let entry = entry.ok_or_else(|| anyhow!("Blob not found"))?; let size = entry.size(); tx.send_async(Ok(BlobReadAtResponse::Entry { @@ -940,7 +931,7 @@ impl Handler { .await?; let mut reader = entry.data_reader().await?; - let len = len.unwrap_or((size.value() - offset) as usize); + let len = req.len.unwrap_or((size.value() - req.offset) as usize); let (num_chunks, chunk_size) = if len <= max_chunk_size { (1, len) @@ -957,7 +948,7 @@ impl Handler { } else { chunk_size }; - let chunk = reader.read_at(offset + read, chunk_size).await?; + let chunk = reader.read_at(req.offset + read, chunk_size).await?; let chunk_len = chunk.len(); if !chunk.is_empty() { tx.send_async(Ok(BlobReadAtResponse::Data { chunk }))