Blob not found #2478

Open
azzamsa opened this issue Jul 8, 2024 · 5 comments

Comments

@azzamsa
Contributor

azzamsa commented Jul 8, 2024

Hi, 👋

Using the code below, I can consistently reproduce a "Blob not found" error.

use std::str::FromStr;

use iroh::{
    base::base32,
    client::{docs::Entry, Doc, Iroh},
    docs::{store::Query, NamespaceSecret},
    node::Node,
};

use clap::{Parser, Subcommand};
use indicatif::HumanBytes;
use rand::Rng;
use tokio_stream::StreamExt;

pub static GLOBAL_NAMESPACE: &str = "q4hiommvh3ttec3x2y7h4le5tkx2tee762s6miu4rer6d2asi4la";

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    run().await?;
    Ok(())
}

async fn run() -> anyhow::Result<()> {
    let opts = Opts::parse();
    let storage_path = std::env::current_dir().unwrap().join("data");
    tokio::fs::create_dir_all(&storage_path).await?;

    // Initialize node
    let node = Node::persistent(&storage_path).await?.spawn().await?;
    let client = node.client();

    // Get author, `authors().default()` doesn't work
    let author_id = if let Some(first_author) = node.client().authors().list().await?.next().await {
        first_author?
    } else {
        client.authors().create().await?
    };

    match opts.cmd.as_ref() {
        Some(Command::Add { description }) => {
            let document = get_document(client).await?;

            let todo_id = gen_id().to_string();
            println!("- [ ] {}: {}", &todo_id, &description);

            // Write
            document
                .set_bytes(author_id, todo_id, description.to_owned())
                .await?;
            let document_id = document.id().to_string();

            println!("\ndocument id: {document_id}");
            println!("author id: {author_id}");
        }
        None => {
            let document = get_document(client).await?;
            let document_id = document.id().to_string();

            println!("::: Listing entry for document_id: {document_id}");
            let mut stream = document
                .get_many(Query::single_latest_per_key())
                .await
                .unwrap();
            while let Some(entry) = stream.try_next().await.unwrap() {
                println!("entry {}", fmt_entry(&entry));
                let content = entry.content_bytes(&document).await;
                println!(
                    "  content {:?}",
                    std::str::from_utf8(
                        &content
                            .map(|x| x.to_vec())
                            .unwrap_or_else(|e| format!("[Error! {e}]").into_bytes())
                    )
                    .unwrap()
                )
            }
        }
    }

    Ok(())
}

async fn get_document(client: &Iroh) -> anyhow::Result<Doc> {
    let document = client
        .docs()
        .import_namespace(iroh::docs::Capability::Write(
            NamespaceSecret::from_str(GLOBAL_NAMESPACE).unwrap(),
        ))
        .await?;
    println!("::: Using namespace '{GLOBAL_NAMESPACE}'");
    Ok(document)
}

fn fmt_entry(entry: &Entry) -> String {
    let id = entry.id();
    let key = std::str::from_utf8(id.key()).unwrap_or("<bad key>");
    let author = id.author().fmt_short();
    let hash = entry.content_hash();
    let hash = base32::fmt_short(hash.as_bytes());
    let len = HumanBytes(entry.content_len());
    format!("@{author}: {key} = {hash} ({len})",)
}

fn gen_id() -> i32 {
    let mut rng = rand::thread_rng();
    rng.gen_range(1..=90)
}

#[derive(Parser)]
#[command(name = "todos", version)]
pub struct Opts {
    #[command(subcommand)]
    pub cmd: Option<Command>,
}

#[derive(Subcommand)]
pub enum Command {
    /// Add a new todo task
    Add {
        #[arg(short, long)]
        description: String,
    },
}

Cargo.toml:

[dependencies]
tokio = { version = "1.38.0", features = ["macros", "rt-multi-thread"] }
tokio-stream = "0.1.15"

iroh = "0.19.0"

anyhow = "1.0.86"
thiserror = "1.0"
clap = { version = "4.5.8", features = ["derive"] }
rand = "0.8.5"
indicatif = "0.17.8"
iroh-blobs = "0.19.0"

How to reproduce

🐡 ❯ cargo run -- add --description "Clean my room"
::: Using namespace 'q4hiommvh3ttec3x2y7h4le5tkx2tee762s6miu4rer6d2asi4la'
- [ ] 55: Clean my room

🐡 ❯ cargo run -- add --description "Clean my room"
::: Using namespace 'q4hiommvh3ttec3x2y7h4le5tkx2tee762s6miu4rer6d2asi4la'
- [ ] 43: Clean my room

🐡 ❯ cargo run --
::: Using namespace 'q4hiommvh3ttec3x2y7h4le5tkx2tee762s6miu4rer6d2asi4la'
::: Listing entry for document_id: wqgmhfsmbxwhjmouwnu5vndvt6mp6giqxngj3ox6u4cv2l4qlola
entry @sn5lby3yeaobojnr: 43 = hplv3aekrv7dmvah (13 B)
  content "Clean my room"
entry @sn5lby3yeaobojnr: 55 = hplv3aekrv7dmvah (13 B)
  content "Clean my room"

🐡 ❯ cargo r add --description "How does it work"
::: Using namespace 'q4hiommvh3ttec3x2y7h4le5tkx2tee762s6miu4rer6d2asi4la'
- [ ] 47: How does it work

document id: wqgmhfsmbxwhjmouwnu5vndvt6mp6giqxngj3ox6u4cv2l4qlola
author id: sn5lby3yeaobojnrhkzt7xlrg6oonpxeb3kfuxdsb5nbvath3m4a

🐡 ❯ cargo run --
::: Using namespace 'q4hiommvh3ttec3x2y7h4le5tkx2tee762s6miu4rer6d2asi4la'
::: Listing entry for document_id: wqgmhfsmbxwhjmouwnu5vndvt6mp6giqxngj3ox6u4cv2l4qlola
entry @sn5lby3yeaobojnr: 43 = hplv3aekrv7dmvah (13 B)
  content "Clean my room"
entry @sn5lby3yeaobojnr: 47 = 4bhitlnjfq23tbld (16 B)
  content "[Error! Blob not found]"
entry @sn5lby3yeaobojnr: 55 = hplv3aekrv7dmvah (13 B)
  content "Clean my room"

As you can see, running the same command multiple times results in "Blob not found".
Try running cargo r add --description "How does it work" multiple times, then list the content using cargo run --; you will find content "[Error! Blob not found]".

Credits

@Frando
Member

Frando commented Jul 9, 2024

Hi @azzamsa,
thank you for the detailed report. I can replicate the issue and am now digging in.

@Frando
Member

Frando commented Jul 9, 2024

Hi,
I found the cause of your issue. You have to make sure to always call node.shutdown().await before exiting the process; otherwise, pending write transactions in the blobs or docs store may not yet be persisted. A while ago we changed the stores to not flush the storage after each write, but only after ~500ms or when shutdown is called. This greatly improves the performance of write-heavy workloads.

However, we really should clarify in the docs that shutdown must be called to make sure operations are persisted. Alternatively, we could find a way to do this reliably on drop, but that is hairy because Rust doesn't have async drop (yet).
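To make the failure mode concrete, here is a minimal, std-only Rust sketch of a write-batching store. Everything in it (BatchingStore, write, maybe_flush, exit_without_shutdown) is hypothetical and only models the behaviour described above: writes are buffered in memory and persisted once ~500ms have passed, or when shutdown is called. It is not iroh's store code; for simplicity the interval is only checked when a write arrives, whereas a real store would presumably flush on a timer.

```rust
use std::time::{Duration, Instant};

/// Hypothetical write-batching store (NOT iroh's implementation).
pub struct BatchingStore {
    pending: Vec<String>,   // accepted writes that are only in memory
    persisted: Vec<String>, // writes that survived a flush ("on disk")
    last_flush: Instant,
    interval: Duration, // assumed ~500ms, per the comment above
}

impl BatchingStore {
    pub fn new(start: Instant) -> Self {
        Self {
            pending: Vec::new(),
            persisted: Vec::new(),
            last_flush: start,
            interval: Duration::from_millis(500),
        }
    }

    /// Accept a write; it is buffered and only flushed if the interval elapsed.
    pub fn write(&mut self, value: &str, now: Instant) {
        self.pending.push(value.to_string());
        self.maybe_flush(now);
    }

    /// Flush only if at least `interval` has passed since the last flush.
    pub fn maybe_flush(&mut self, now: Instant) {
        if now.duration_since(self.last_flush) >= self.interval {
            self.flush(now);
        }
    }

    fn flush(&mut self, now: Instant) {
        self.persisted.append(&mut self.pending);
        self.last_flush = now;
    }

    /// Explicit shutdown: everything still pending is flushed first.
    pub fn shutdown(mut self, now: Instant) -> Vec<String> {
        self.flush(now);
        self.persisted
    }

    /// Simulates the process exiting without shutdown: pending writes are lost.
    pub fn exit_without_shutdown(self) -> Vec<String> {
        self.persisted
    }
}
```

A CLI that writes one entry and exits within a few milliseconds behaves like exit_without_shutdown in this model, while a long-running server eventually crosses the flush interval, which fits the observation later in the thread that a server did not hit the problem.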

In any case, for now the fix to your example would look like this:

async fn run() -> anyhow::Result<()> {
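    // Requires the `tracing-subscriber` crate, which is not in the Cargo.toml above;
    // add it to [dependencies] or drop this line.
    tracing_subscriber::fmt::init();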
    let opts = Opts::parse();
    let storage_path = std::env::current_dir().unwrap().join("data");
    tokio::fs::create_dir_all(&storage_path).await?;

    // Initialize node
    let node = Node::persistent(&storage_path).await?.spawn().await?;
    let res = run_inner(opts, &node).await;
    // Shutdown the node to make sure all writes are flushed.
    if let Err(err) = node.shutdown().await {
        println!("Error during shutdown: {err:?}");
    }
    res
}

async fn run_inner<D: iroh::blobs::store::Store>(
    opts: Opts,
    node: &Node<D>,
) -> anyhow::Result<()> {
    let client = node.client();
    // your existing code from `fn run`
    Ok(())
}
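The key detail in this fix is that the result of run_inner is stored in res before shutdown is awaited, so an error from the inner code cannot skip the shutdown the way an early ? would. The synchronous, std-only sketch below isolates that pattern. The Shutdown trait and run_then_shutdown helper are hypothetical names, not iroh APIs; a real version would be async and call node.shutdown().await.

```rust
/// Hypothetical stand-in for anything that must be shut down explicitly
/// (such as the node in the fix above). Not an iroh API.
pub trait Shutdown {
    fn shutdown(&mut self) -> Result<(), String>;
}

/// Run `work`, then ALWAYS call `shutdown`, even if `work` failed.
pub fn run_then_shutdown<N, T, F>(node: &mut N, work: F) -> Result<T, String>
where
    N: Shutdown,
    F: FnOnce(&mut N) -> Result<T, String>,
{
    // No `?` here: an early return would skip the shutdown below.
    let res = work(&mut *node);
    if let Err(err) = node.shutdown() {
        // As in the fix, a shutdown error is reported but does not replace `res`.
        eprintln!("Error during shutdown: {err:?}");
    }
    res
}
```

Returning the inner result rather than the shutdown result keeps the original error visible to the caller, which is usually the more useful one to report.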

@zicklag

zicklag commented Jul 9, 2024

Awesome, thanks for finding that!

@azzamsa you really found a couple sneaky ones. 😁 That explains why I didn't run into the issue with Weird, since it's a server that doesn't exit immediately after it's done.

@Frando what do you think about printing a warning in Node, when it's dropped, if it hasn't been shutdown yet?

@ramfox ramfox moved this to 📋 Backlog in iroh Jul 10, 2024
@Frando
Member

Frando commented Jul 10, 2024

> @Frando what do you think about printing a warning in Node, when it's dropped, if it hasn't been shutdown yet?

Yes. In the team chat we agreed that the current situation is not ideal. A warning is the minimum; ideally we should allow committing transactions manually and/or shut down automatically when the last instance of a node is dropped (which will be hairy because shutdown is async).
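A minimal, std-only Rust sketch of the "warn on drop" idea. ShutdownGuard and UNCLEAN_DROPS are hypothetical names, not part of iroh. The sketch shows why a warning is the easy part: Drop::drop is synchronous, so it cannot await an async shutdown, but it can at least report that none happened. It ignores the "last instance" question; a real node handle with clones would presumably need reference counting to decide when to warn.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

/// Counts drops that happened without a prior shutdown (for illustration/testing).
pub static UNCLEAN_DROPS: AtomicUsize = AtomicUsize::new(0);

/// Hypothetical guard a node handle could own. Not an iroh type.
pub struct ShutdownGuard {
    shut_down: bool,
}

impl ShutdownGuard {
    pub fn new() -> Self {
        Self { shut_down: false }
    }

    /// Record that an explicit (async, in a real node) shutdown has completed.
    pub fn mark_shut_down(&mut self) {
        self.shut_down = true;
    }
}

impl Drop for ShutdownGuard {
    fn drop(&mut self) {
        // `drop` is synchronous: it cannot `.await` a flush, only warn.
        if !self.shut_down {
            UNCLEAN_DROPS.fetch_add(1, Ordering::SeqCst);
            eprintln!("warning: node dropped without shutdown(); recent writes may not be persisted");
        }
    }
}
```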

@azzamsa
Contributor Author

azzamsa commented Jul 10, 2024

> In any case, the fix for your example would look like this:

Wow, it worked like a charm!

> I found the cause of your issue. You need to always call node.shutdown().await before exiting the process. Otherwise, pending write transactions in the blobs or docs store may not yet be persisted. We changed the stores a while ago to not flush the storage after each write but only after ~500ms or when shutdown is called. This greatly improves the performance of write-heavy workloads.

I should have reported this issue here instead of asking in the Discord servers. I've had this issue since I first started using Iroh. I posted the MWE with steps to reproduce in the Discord chat (I got the same error with 2-3 different approaches), but unfortunately no one responded for several days. I should have come here and posted the MWE instead, since you fixed it in just 17 minutes.

> you really found a couple sneaky ones. 😁 That explains why I didn't run into the issue with Weird, since it's a server that doesn't exit immediately after it's done.

Yeah. Finally, I can continue my work. This was a blocking issue for me. Also, thank you, @zicklag, for helping me through this.
