Skip to content

Commit

Permalink
fix: do not panic on blobs db IO error (#2400)
Browse files Browse the repository at this point in the history
## Description

* We `unwrap`ped an `io::Result` in the RPC handler for
`BlobsReadAtRequest`. Changed to return an error instead.
* On the client, report each of the distinct error cases that can occur
with its own message, to make debugging easier.

## Breaking Changes

<!-- Optional, if there are any breaking changes document them,
including how to migrate older code. -->

## Notes & open questions

<!-- Any notes, remarks or open questions you have to make about the PR.
-->

## Change checklist

- [x] Self-review.
- [x] ~~Documentation updates if relevant.~~
- [x] ~~Tests if relevant.~~
- [x] All breaking changes documented.
  • Loading branch information
Frando authored Jun 24, 2024
1 parent aba70ea commit 38e8ce0
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 17 deletions.
3 changes: 2 additions & 1 deletion iroh/src/client/blobs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -798,7 +798,8 @@ impl Reader {
let (size, is_complete) = match stream.next().await {
Some(Ok(BlobReadAtResponse::Entry { size, is_complete })) => (size, is_complete),
Some(Err(err)) => return Err(err),
None | Some(Ok(_)) => return Err(anyhow!("Expected header frame")),
Some(Ok(_)) => return Err(anyhow!("Expected header frame, but got data frame")),
None => return Err(anyhow!("Expected header frame, but RPC stream was dropped")),
};

let stream = stream.map(|item| match item {
Expand Down
23 changes: 7 additions & 16 deletions iroh/src/node/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -910,27 +910,18 @@ impl<D: BaoStore> Handler<D> {
let (tx, rx) = flume::bounded(RPC_BLOB_GET_CHANNEL_CAP);
let db = self.inner.db.clone();
self.inner.rt.spawn_pinned(move || async move {
let entry = db.get(&req.hash).await.unwrap();
if let Err(err) = read_loop(
req.offset,
req.len,
entry,
tx.clone(),
RPC_BLOB_GET_CHUNK_SIZE,
)
.await
{
if let Err(err) = read_loop(req, db, tx.clone(), RPC_BLOB_GET_CHUNK_SIZE).await {
tx.send_async(RpcResult::Err(err.into())).await.ok();
}
});

async fn read_loop(
offset: u64,
len: Option<usize>,
entry: Option<impl MapEntry>,
async fn read_loop<D: iroh_blobs::store::Store>(
req: BlobReadAtRequest,
db: D,
tx: flume::Sender<RpcResult<BlobReadAtResponse>>,
max_chunk_size: usize,
) -> anyhow::Result<()> {
let entry = db.get(&req.hash).await?;
let entry = entry.ok_or_else(|| anyhow!("Blob not found"))?;
let size = entry.size();
tx.send_async(Ok(BlobReadAtResponse::Entry {
Expand All @@ -940,7 +931,7 @@ impl<D: BaoStore> Handler<D> {
.await?;
let mut reader = entry.data_reader().await?;

let len = len.unwrap_or((size.value() - offset) as usize);
let len = req.len.unwrap_or((size.value() - req.offset) as usize);

let (num_chunks, chunk_size) = if len <= max_chunk_size {
(1, len)
Expand All @@ -957,7 +948,7 @@ impl<D: BaoStore> Handler<D> {
} else {
chunk_size
};
let chunk = reader.read_at(offset + read, chunk_size).await?;
let chunk = reader.read_at(req.offset + read, chunk_size).await?;
let chunk_len = chunk.len();
if !chunk.is_empty() {
tx.send_async(Ok(BlobReadAtResponse::Data { chunk }))
Expand Down

0 comments on commit 38e8ce0

Please sign in to comment.