feat(iroh-bytes)!: refactor downloader queue and add progress reporting (#2085)

## Description

This PR contains changes to the downloader:

* Remove the `Role::Provider` vs `Role::Candidate` distinction. We added
this back when we did not have content propagation in docs, and it no
longer makes much sense for our architecture. Either we know/assume a
node has something, or we do not. The "in-between" state did not make
sense anymore IMO.
* Rework the queuing logic to be based on a simple queue of pending
downloads. Before, if a download could not be started because the
concurrency limits were reached, the download was considered failed and
inserted, with a delay, into the retry queue. Now, if a download cannot
be started, we just wait until a slot frees up, and then start it. Note
that the queue is, for now, quite simple - if the next download in the
queue cannot be started (e.g. because all provider nodes are currently
busy with other downloads), we do not try to start the second download
in the queue, but instead wait until a slot is freed up. We could
certainly optimize this by "jumping the queue" in certain cases; this
would, however, also need more logic to make sure that downloads cannot
be "forgotten". Therefore, for now the queue is handled strictly in
order.
* The retry behavior is refactored: We now retry nodes (not downloads as
before) up to a retry limit, with an increasing timeout. If a download
can only proceed with a retrying node, it is parked and the next item in
the queue is processed. The download is unparked if the retrying node
successfully connects.
* Add progress reporting to downloads managed through the downloader.
For this I wrote a `SharedProgress` handler that allows subscribing to
already running downloads: if an intent is registered for hash A, that
download is started, and, while it is running, another intent is
registered for the same hash, the second intent now receives a
`DownloadProgress::InitialState` which contains a `TransferProgress`
that functions as a reducer for the progress events. This can even be
used from the client, and further events can be reduced/merged into that
struct. The PR contains a test for this concurrent progress reporting.
* Expose the downloader in the iroh node. Download requests via the RPC
API can now set a `DownloadMode` enum either to `Direct` or to `Queued`:
the former behaves as before (issue an iroh-bytes request directly,
without a queue or concurrency limits) and the latter adds the download
to the downloader queue.
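
The queue and parking rules from the list above can be sketched roughly
as follows. This is a minimal illustration under stated assumptions, not
the code in this PR: `Queue`, `NodeState`, `Step`, `next_step` and
`unpark` are hypothetical names, downloads and nodes are plain `u32` ids,
and putting unparked downloads back at the front of the queue is a guess.

```rust
use std::collections::{HashMap, VecDeque};

/// Hypothetical, simplified view of what the downloader knows about a node.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum NodeState {
    Idle,
    Busy,
    Retrying,
}

/// What the (hypothetical) scheduler decides to do next.
#[derive(Debug, PartialEq, Eq)]
enum Step {
    Start { download: u32, node: u32 },
    Wait,
}

/// A pending download: its id and the ids of the nodes assumed to have it.
type Pending = (u32, Vec<u32>);

#[derive(Default)]
struct Queue {
    pending: VecDeque<Pending>,
    parked: Vec<Pending>,
}

/// Nodes we know nothing about are treated as idle (an assumption).
fn state_of(nodes: &HashMap<u32, NodeState>, node: u32) -> NodeState {
    nodes.get(&node).copied().unwrap_or(NodeState::Idle)
}

impl Queue {
    /// Look only at the head of the queue; never jump over it.
    fn next_step(&mut self, nodes: &HashMap<u32, NodeState>) -> Step {
        while let Some((download, candidates)) = self.pending.front().cloned() {
            let idle = candidates
                .iter()
                .copied()
                .find(|n| state_of(nodes, *n) == NodeState::Idle);
            if let Some(node) = idle {
                self.pending.pop_front();
                return Step::Start { download, node };
            }
            if candidates.iter().any(|n| state_of(nodes, *n) == NodeState::Busy) {
                // A slot will free up eventually, so wait in order.
                return Step::Wait;
            }
            // Only retrying nodes are left: park and look at the next item.
            self.pending.pop_front();
            self.parked.push((download, candidates));
        }
        Step::Wait
    }

    /// Called when a retrying node connects again.
    fn unpark(&mut self, node: u32) {
        let (ready, still_parked): (Vec<Pending>, Vec<Pending>) =
            std::mem::take(&mut self.parked)
                .into_iter()
                .partition(|(_, candidates)| candidates.contains(&node));
        self.parked = still_parked;
        // Assumption: unparked downloads go back to the front, in order.
        for item in ready.into_iter().rev() {
            self.pending.push_front(item);
        }
    }
}
```

In this sketch a busy head of the queue blocks everything behind it,
while a head that depends only on retrying nodes is moved aside so that
later downloads can proceed.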

## Breaking changes

Changes in `iroh`:
* The `BlobDownloadRequest` has a new field `mode` to select between
direct and queued downloads, and now contains a list of `nodes` in place
of the single `node` it had before
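
As a rough illustration of the new request shape, here is a simplified
stand-in. Only the names `BlobDownloadRequest`, `DownloadMode`, `Direct`,
`Queued`, `mode` and `nodes` come from this PR; the `String` field types,
the `Route` enum and the `route` function are hypothetical, and the real
request has fields that are omitted here.

```rust
/// The two download modes named in this PR.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum DownloadMode {
    /// Issue the request directly, without queue or concurrency limits.
    Direct,
    /// Hand the download to the downloader queue.
    Queued,
}

/// Simplified stand-in for the RPC request; field types are assumptions.
struct BlobDownloadRequest {
    /// Stand-in for the blob hash.
    hash: String,
    /// Now a list of nodes instead of a single node.
    nodes: Vec<String>,
    /// New field selecting direct or queued download.
    mode: DownloadMode,
}

/// Hypothetical description of where a request ends up.
#[derive(Debug, PartialEq, Eq)]
enum Route {
    DirectRequest,
    DownloaderQueue,
}

/// Hypothetical dispatch on the mode of a request.
fn route(req: &BlobDownloadRequest) -> Route {
    match req.mode {
        DownloadMode::Direct => Route::DirectRequest,
        DownloadMode::Queued => Route::DownloaderQueue,
    }
}
```

Callers that previously passed a single `node` would, under this sketch,
pass a one-element `nodes` list and pick a `mode` explicitly.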

Changes in `iroh_bytes`: 
* `Role` enum is removed
* `Downloader::queue` now takes a `DownloadRequest` with more options
than before
* `DownloadProgress` has a new variant `InitialState` which is emitted
when attaching to an already-running download
* `ConcurrencyLimits` gained a new field
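
To show how a subscriber might consume `InitialState`, here is a minimal
reducer sketch. The variants other than `InitialState`, the fields of
`TransferProgress` and the method name `on_progress` are simplified
assumptions made for illustration and do not mirror the real types.

```rust
/// Simplified stand-in for the reducer state; the fields are assumptions.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
struct TransferProgress {
    size: Option<u64>,
    offset: u64,
    done: bool,
}

/// Simplified stand-in for the event enum; only `InitialState` is named
/// in this PR, the other variants are hypothetical.
#[derive(Debug, Clone)]
enum DownloadProgress {
    /// Snapshot sent to a subscriber that attaches to a running download.
    InitialState(TransferProgress),
    Found { size: u64 },
    Progress { offset: u64 },
    Done,
}

impl TransferProgress {
    /// Merge one event into the current state.
    fn on_progress(&mut self, event: DownloadProgress) {
        match event {
            // A late subscriber replaces its empty state with the snapshot.
            DownloadProgress::InitialState(state) => *self = state,
            DownloadProgress::Found { size } => self.size = Some(size),
            DownloadProgress::Progress { offset } => self.offset = offset,
            DownloadProgress::Done => self.done = true,
        }
    }
}
```

With this shape, a subscriber that attaches late and one that saw every
event from the start end up with the same state once they have processed
the same tail of events.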

Other changes:
* `SetTagOption` was moved from `iroh` to `iroh-bytes`

## Notes & open questions

* Another followup improves content downloading in docs: #2127.

* A few more tests around the queuing behavior would be great

* I have a partially done followup which adds a hook for content
discovery to the downloader


## Change checklist

- [x] Self-review.
- [x] Documentation updates if relevant.
- [x] Tests if relevant.

---------

Co-authored-by: Friedel Ziegelmayer <[email protected]>
Frando and dignifiedquire authored Apr 22, 2024
1 parent f508830 commit 93290e3
Showing 25 changed files with 2,188 additions and 894 deletions.
11 changes: 11 additions & 0 deletions Cargo.lock

28 changes: 25 additions & 3 deletions iroh-base/src/hash.rs
@@ -7,7 +7,7 @@ use bao_tree::blake3;
 use postcard::experimental::max_size::MaxSize;
 use serde::{de, Deserialize, Deserializer, Serialize, Serializer};
 
-use crate::base32::{parse_array_hex_or_base32, HexOrBase32ParseError};
+use crate::base32::{self, parse_array_hex_or_base32, HexOrBase32ParseError};
 
 /// Hash type used throughout.
 #[derive(PartialEq, Eq, Copy, Clone, Hash)]
@@ -54,6 +54,12 @@ impl Hash {
     pub fn to_hex(&self) -> String {
         self.0.to_hex().to_string()
     }
+
+    /// Convert to a base32 string limited to the first 10 bytes for a friendly string
+    /// representation of the hash.
+    pub fn fmt_short(&self) -> String {
+        base32::fmt_short(self.as_bytes())
+    }
 }
 
 impl AsRef<[u8]> for Hash {
@@ -173,7 +179,18 @@ impl MaxSize for Hash {
 
 /// A format identifier
 #[derive(
-    Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize, Default, Debug, MaxSize,
+    Clone,
+    Copy,
+    PartialEq,
+    Eq,
+    PartialOrd,
+    Ord,
+    Serialize,
+    Deserialize,
+    Default,
+    Debug,
+    MaxSize,
+    Hash,
 )]
 pub enum BlobFormat {
     /// Raw blob
@@ -205,7 +222,7 @@ impl BlobFormat {
 }
 
 /// A hash and format pair
-#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, MaxSize)]
+#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, MaxSize, Hash)]
 pub struct HashAndFormat {
     /// The hash
     pub hash: Hash,
@@ -289,6 +306,11 @@ mod redb_support {
 }
 
 impl HashAndFormat {
+    /// Create a new hash and format pair.
+    pub fn new(hash: Hash, format: BlobFormat) -> Self {
+        Self { hash, format }
+    }
+
     /// Create a new hash and format pair, using the default (raw) format.
     pub fn raw(hash: Hash) -> Self {
         Self {
4 changes: 3 additions & 1 deletion iroh-bytes/Cargo.toml
@@ -25,6 +25,7 @@ flume = "0.11"
 futures = "0.3.25"
 futures-buffered = "0.2.4"
 genawaiter = { version = "0.99.1", features = ["futures03"] }
+hashlink = { version = "0.9.0", optional = true }
 hex = "0.4.3"
 iroh-base = { version = "0.14.0", features = ["redb"], path = "../iroh-base" }
 iroh-io = { version = "0.6.0", features = ["stats"] }
@@ -51,6 +52,7 @@ tracing-futures = "0.2.5"
 
 [dev-dependencies]
 http-body = "0.4.5"
+iroh-bytes = { path = ".", features = ["downloader"] }
 iroh-test = { path = "../iroh-test" }
 proptest = "1.0.0"
 serde_json = "1.0.107"
@@ -63,8 +65,8 @@ tempfile = "3.10.0"
 
 [features]
 default = ["fs-store"]
+downloader = ["iroh-net", "parking_lot", "tokio-util/time", "hashlink"]
 fs-store = ["reflink-copy", "redb", "redb_v1", "tempfile"]
-downloader = ["iroh-net", "parking_lot", "tokio-util/time"]
 metrics = ["iroh-metrics"]
 
 [[example]]