diff --git a/Cargo.lock b/Cargo.lock index 85e9f8f..30ef9f1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -355,7 +355,7 @@ dependencies = [ "futures-lite", "genawaiter", "iroh-blake3", - "iroh-io 0.6.0", + "iroh-io", "positioned-io", "range-collections", "self_cell", @@ -1378,6 +1378,30 @@ dependencies = [ "hashbrown", ] +[[package]] +name = "headers" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "322106e6bd0cba2d5ead589ddb8150a13d7c4217cf80d7c4f682ca994ccc6aa9" +dependencies = [ + "base64 0.21.7", + "bytes", + "headers-core", + "http 1.1.0", + "httpdate", + "mime", + "sha1", +] + +[[package]] +name = "headers-core" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "54b4a22553d4242c49fddb9ba998a99962b5cc6f22cb5a3482bec22522403ce4" +dependencies = [ + "http 1.1.0", +] + [[package]] name = "heck" version = "0.4.1" @@ -1764,7 +1788,7 @@ dependencies = [ "iroh-base", "iroh-bytes", "iroh-gossip", - "iroh-io 0.6.0", + "iroh-io", "iroh-metrics", "iroh-net", "iroh-sync", @@ -1843,7 +1867,7 @@ dependencies = [ "hashlink", "hex", "iroh-base", - "iroh-io 0.6.0", + "iroh-io", "iroh-metrics", "iroh-net", "num_cpus", @@ -1892,19 +1916,6 @@ dependencies = [ "tracing", ] -[[package]] -name = "iroh-io" -version = "0.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1fd67e386f948a6f09e71057b48fff51b6414f0080997495b5bdf2d1bdcdbe46" -dependencies = [ - "bytes", - "futures", - "pin-project", - "smallvec", - "tokio", -] - [[package]] name = "iroh-io" version = "0.6.0" @@ -2091,9 +2102,9 @@ checksum = "01cda141df6706de531b6c46c3a33ecca755538219bd484262fa09410c13539c" [[package]] name = "lock_api" -version = "0.4.11" +version = "0.4.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3c168f8615b12bc01f9c17e2eb0cc07dcae1940121185446edc3744920e8ef45" +checksum = "07af8b9cdd281b7915f413fa73f29ebd5d55d0d3f0155584dade1ff18cea1b17" dependencies = [ "autocfg", "scopeguard", @@ -2529,9 +2540,9 @@ checksum = "bb813b8af86854136c6922af0598d719255ecb2179515e6e7730d468f05c9cae" [[package]] name = "parking_lot" -version = "0.12.1" +version = "0.12.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3742b2c103b9f06bc9fff0a37ff4912935851bee6d36f3c02bcc755bcfec228f" +checksum = "7e4af0ca4f6caed20e900d564c242b8e5d4903fdacf31d3daf527b66fe6f42fb" dependencies = [ "lock_api", "parking_lot_core", @@ -2539,15 +2550,15 @@ dependencies = [ [[package]] name = "parking_lot_core" -version = "0.9.9" +version = "0.9.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4c42a9226546d68acdd9c0a280d17ce19bfe27a46bf68784e4066115788d008e" +checksum = "1e401f977ab385c9e4e3ab30627d6f26d00e2c73eef317493c4ec6d468726cf8" dependencies = [ "cfg-if", "libc", "redox_syscall", "smallvec", - "windows-targets 0.48.5", + "windows-targets 0.52.5", ] [[package]] @@ -3157,11 +3168,11 @@ dependencies = [ [[package]] name = "redox_syscall" -version = "0.4.1" +version = "0.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4722d768eff46b75989dd134e5c353f0d6296e5aaa3132e776cbdb56be7731aa" +checksum = "469052894dcb553421e483e4209ee581a45100d31b4018de03e5a7ad86374a7e" dependencies = [ - "bitflags 1.3.2", + "bitflags 2.5.0", ] [[package]] @@ -4388,11 +4399,16 @@ dependencies = [ "async-stream", "axum", "bisection", + "bytes", "clap", + "flume", "futures", + "headers", + "hyper 1.3.1", "iroh", "iroh-base", - "iroh-io 0.4.0", + "iroh-bytes", + "iroh-io", "lru", "md5", "mime_guess", @@ -4400,6 +4416,7 @@ dependencies = [ "quic-rpc", "quinn", "rand 0.9.0-alpha.1", + "range-collections", "redb 2.1.0", "serde", "serde_yaml", diff --git a/Cargo.toml b/Cargo.toml index 15e983e..fc65f6e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,12 +17,14 @@ async-stream = "0.3" axum = "0.7" bisection = "0.1" clap = { version = "4.5", features = ["derive"] } +flume = "0.11" iroh = { version = "0.14.0", branch = "main", git = "https://github.com/izihawa/iroh", features = [ "metrics" ] } iroh-base = { version = "0.14.0", branch = "main", git = "https://github.com/izihawa/iroh" } -iroh-io = { version = "0.4.0" } +iroh-bytes = { version = "0.14.0", branch = "main", git = "https://github.com/izihawa/iroh" } +iroh-io = { version = "0.6.0" } md5 = "0.7" percent-encoding = "2.3.1" -quic-rpc = "0.7" +quic-rpc = { version = "0.7.0", features = ["flume-transport", "quinn-transport"] } quinn = "0.10" rand = "0.9.0-alpha.1" redb = { version = "2.0.0", features = ["logging"] } @@ -40,3 +42,7 @@ lru = "0.12" tokio-task-pool = "0.1" url = "2.5.0" mime_guess = "2.0.4" +bytes = "1.6.0" +hyper = "1.3.1" +headers = "0.4.0" +range-collections = "0.4.5" diff --git a/bin/main.rs b/bin/main.rs index 32efc86..e5ae4a4 100644 --- a/bin/main.rs +++ b/bin/main.rs @@ -9,6 +9,8 @@ use axum::{ }; use clap::{Parser, Subcommand}; use futures::TryStreamExt; +use headers::{HeaderMap, HeaderMapExt}; +use hyper::header; use iroh::rpc_protocol::ShareMode; use iroh::sync::store::DownloadPolicy; use iroh_base::hash::Hash; @@ -32,6 +34,7 @@ use trident_storage::config::{Config, TableConfig}; use trident_storage::error::Error; use trident_storage::iroh_node::IrohNode; +use trident_storage::ranges::parse_byte_range; /// Simple program to greet a person #[derive(Parser)] @@ -474,6 +477,7 @@ async fn table_foreign_insert( async fn table_root_get( State(state): State, method: Method, + headers: HeaderMap, Host(subdomain): Host, Path(key): Path, ) -> Response { @@ -493,45 +497,112 @@ async fn table_root_get( } Some(table) => table, }; - table_get(State(state), method, Path((table.to_string(), key))).await + table_get( + State(state), + method, + headers, + Path((table.to_string(), key)), + ) + .await } -async fn table_get( +async fn table_get_entire( State(state): State, method: Method, Path((table, key)): Path<(String, String)>, ) -> Response { - match state.iroh_node.read().await.table_get(&table, &key).await { - Ok(Some((reader, file_size, hash))) => match method { - Method::HEAD => Response::builder() - .header("Content-Length", file_size) - .header( - "Content-Type", - mime_guess::from_ext(&key) - .first_or_octet_stream() - .to_string(), - ) - .header("X-Iroh-Hash", hash.to_string()) + let iroh_node = state.iroh_node.read().await.table_get(&table, &key).await; + let (reader, file_size, hash) = match iroh_node { + Ok(Some(iroh_node)) => iroh_node, + Ok(None) => { + return Response::builder() + .status(StatusCode::NOT_FOUND) .body(Body::default()) - .unwrap(), - Method::GET => Response::builder() - .header("Content-Length", file_size) + .unwrap() + } + Err(e) => return e.into_response(), + }; + match method { + Method::HEAD => Response::builder() + .header(header::CONTENT_LENGTH, file_size) + .header( + header::CONTENT_TYPE, + mime_guess::from_ext(&key) + .first_or_octet_stream() + .to_string(), + ) + .header("X-Iroh-Hash", hash.to_string()) + .body(Body::default()) + .unwrap(), + Method::GET => Response::builder() + .header(header::CONTENT_LENGTH, file_size) + .header( + header::CONTENT_TYPE, + mime_guess::from_ext(&key) + .first_or_octet_stream() + .to_string(), + ) + .header("X-Iroh-Hash", hash.to_string()) + .body(Body::from_stream(ReaderStream::new(reader))) + .unwrap(), + _ => unreachable!(), + } +} + +async fn table_get( + State(state): State, + method: Method, + headers: HeaderMap, + Path((table, key)): Path<(String, String)>, +) -> Response { + match headers.typed_get::() { + None => table_get_entire(State(state), method, Path((table, key))).await, + Some(range_value) => { + let byte_range = parse_byte_range(range_value).unwrap(); + let (recv, size, is_all) = state + .iroh_node + .read() + .await + .table_get_partial(&table, &key, byte_range) + .await + .unwrap(); + let status_code = if is_all { + StatusCode::OK + } else { + StatusCode::PARTIAL_CONTENT + }; + let body = Body::from_stream(recv.into_stream()); + let builder = Response::builder() + .status(status_code) + .header(header::ACCEPT_RANGES, "bytes") + .header(header::CACHE_CONTROL, "public,max-age=31536000,immutable") .header( - "Content-Type", + header::CONTENT_TYPE, mime_guess::from_ext(&key) .first_or_octet_stream() .to_string(), - ) - .header("X-Iroh-Hash", hash.to_string()) - .body(Body::from_stream(ReaderStream::new(reader))) - .unwrap(), - _ => unreachable!(), - }, - Ok(None) => Response::builder() - .status(StatusCode::NOT_FOUND) - .body(Body::default()) - .unwrap(), - Err(e) => e.into_response(), + ); + // content-length needs to be the actual repsonse size + let transfer_size = match byte_range { + (Some(start), Some(end)) => end - start, + (Some(start), None) => size - start, + (None, Some(end)) => end, + (None, None) => size, + }; + let builder = builder.header(header::CONTENT_LENGTH, transfer_size); + + let builder = if byte_range.0.is_some() || byte_range.1.is_some() { + builder + .header( + header::CONTENT_RANGE, + format_content_range(byte_range.0, byte_range.1, size), + ) + .status(StatusCode::PARTIAL_CONTENT) + } else { + builder + }; + builder.body(body).unwrap() + } } } @@ -562,3 +633,13 @@ async fn table_ls(State(state): State, Path(table): Path) -> R .unwrap(), } } + +fn format_content_range(start: Option, end: Option, size: u64) -> String { + format!( + "bytes {}-{}/{}", + start.map(|x| x.to_string()).unwrap_or_default(), + end.map(|x| (x + 1).to_string()) + .unwrap_or_else(|| size.to_string()), + size + ) +} diff --git a/src/iroh_node.rs b/src/iroh_node.rs index f0cb4dd..e2bd95b 100644 --- a/src/iroh_node.rs +++ b/src/iroh_node.rs @@ -2,6 +2,8 @@ use crate::config::{Config, StorageEngineConfig, TableConfig}; use crate::error::{Error, Result}; use crate::table::Table; use async_stream::stream; +use bytes::Bytes; +use flume::Receiver; use futures::StreamExt; use iroh::bytes::store::fs::Store; use iroh::bytes::Hash; @@ -13,9 +15,11 @@ use iroh::sync::store::DownloadPolicy; use iroh::sync::{AuthorId, NamespaceId}; use iroh::ticket::DocTicket; use iroh_base::node_addr::NodeAddr; +use iroh_bytes::get::fsm::DecodeError; use std::collections::hash_map::Entry; use std::collections::HashMap; use std::fmt::{Debug, Formatter}; +use std::result; use std::str::FromStr; use std::sync::Arc; use std::time::Duration; @@ -385,6 +389,19 @@ impl IrohNode { table.get(key).await } + pub async fn table_get_partial( + &self, + table_name: &str, + key: &str, + byte_range: (Option, Option), + ) -> Result<(Receiver>, u64, bool)> { + let table = self + .tables + .get(table_name) + .ok_or_else(|| Error::missing_table(table_name))?; + table.get_partial(key, byte_range).await + } + pub async fn table_delete(&self, table_name: &str, key: &str) -> Result { let table = self .tables diff --git a/src/lib.rs b/src/lib.rs index 1ad8cc1..3b988d5 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -23,6 +23,7 @@ pub mod error; pub mod file_shard; mod hash_ring; pub mod iroh_node; +pub mod ranges; mod table; mod utils; diff --git a/src/ranges.rs b/src/ranges.rs new file mode 100644 index 0000000..7635a2f --- /dev/null +++ b/src/ranges.rs @@ -0,0 +1,84 @@ +//! Utilities related to HTTP range requests. +use std::ops::Bound; + +use bytes::Bytes; +use headers::Range; +use iroh::bytes::store::bao_tree::ChunkNum; +use range_collections::{range_set::RangeSetRange, RangeSet2}; + +/// Given a range specified as arbitrary range bounds, normalize it into a range +/// that has inclusive start and exclusive end. +fn normalize_range(start: Bound, end: Bound) -> (Option, Option) { + match (start, end) { + (Bound::Included(start), Bound::Included(end)) => (Some(start), end.checked_add(1)), + (Bound::Included(start), Bound::Excluded(end)) => (Some(start), Some(end)), + (Bound::Included(start), Bound::Unbounded) => (Some(start), None), + (Bound::Excluded(start), Bound::Included(end)) => { + (start.checked_add(1), end.checked_add(1)) + } + (Bound::Excluded(start), Bound::Excluded(end)) => (start.checked_add(1), Some(end)), + (Bound::Excluded(start), Bound::Unbounded) => (start.checked_add(1), None), + (Bound::Unbounded, Bound::Included(end)) => (None, end.checked_add(1)), + (Bound::Unbounded, Bound::Excluded(end)) => (None, Some(end)), + (Bound::Unbounded, Bound::Unbounded) => (None, None), + } +} + +/// Convert a normalized range into a `RangeSet2` that represents the byte range. +pub fn to_byte_range(start: Option, end: Option) -> RangeSet2 { + match (start, end) { + (Some(start), Some(end)) => RangeSet2::from(start..end), + (Some(start), None) => RangeSet2::from(start..), + (None, Some(end)) => RangeSet2::from(..end), + (None, None) => RangeSet2::all(), + } +} + +/// Convert a normalized range into a `RangeSet2` that represents the chunk range. +/// +/// Ranges are rounded up so that the given byte range is completely covered by the chunk range. +pub fn to_chunk_range(start: Option, end: Option) -> RangeSet2 { + match (start, end) { + (Some(start), Some(end)) => { + RangeSet2::from(ChunkNum::full_chunks(start)..ChunkNum::chunks(end)) + } + (Some(start), None) => RangeSet2::from(ChunkNum::full_chunks(start)..), + (None, Some(end)) => RangeSet2::from(..ChunkNum::chunks(end)), + (None, None) => RangeSet2::all(), + } +} + +/// Given an incoming piece of data at an offset, and a set of ranges that are being requested, +/// split the data into parts that cover only requested ranges +pub fn slice(offset: u64, data: Bytes, ranges: RangeSet2) -> Vec { + let len = data.len() as u64; + let data_range = to_byte_range(Some(offset), offset.checked_add(len)); + let relevant = ranges & data_range; + relevant + .iter() + .map(|range| match range { + RangeSetRange::Range(range) => { + let start = (range.start - offset) as usize; + let end = (range.end - offset) as usize; + data.slice(start..end) + } + RangeSetRange::RangeFrom(range) => { + let start = (range.start - offset) as usize; + data.slice(start..) + } + }) + .collect() +} + +/// Parse the byte range from the request headers. +pub fn parse_byte_range(range: Range) -> anyhow::Result<(Option, Option)> { + println!("got range request {:?}", range); + let ranges = range.satisfiable_ranges(0).collect::>(); + if ranges.len() > 1 { + anyhow::bail!("multiple ranges not supported"); + } + let Some((start, end)) = ranges.into_iter().next() else { + anyhow::bail!("empty range"); + }; + Ok(normalize_range(start, end)) +} diff --git a/src/table.rs b/src/table.rs index e7121d0..9112573 100644 --- a/src/table.rs +++ b/src/table.rs @@ -2,10 +2,15 @@ use crate::config::{StorageEngineConfig, TableConfig}; use crate::error::{Error, Result}; use crate::file_shard::FileShard; use crate::hash_ring::HashRing; +use crate::ranges::{slice, to_byte_range, to_chunk_range}; use crate::utils::key_to_bytes; use crate::IrohDoc; use async_stream::stream; +use bytes::Bytes; use futures::{Stream, StreamExt}; +use iroh::bytes::get::fsm::{BlobContentNext, ConnectedNext, DecodeError, EndBlobNext}; +use iroh::bytes::protocol::RangeSpecSeq; +use iroh::bytes::store::bao_tree::io::BaoContentItem; use iroh::bytes::store::fs::Store; use iroh::bytes::store::{ExportMode, Map}; use iroh::bytes::Hash; @@ -24,6 +29,7 @@ use rand::thread_rng; use std::collections::{HashMap, HashSet}; use std::num::NonZeroUsize; use std::path::PathBuf; +use std::result; use std::sync::Arc; use tokio::io::AsyncRead; use tokio_task_pool::Pool; @@ -322,13 +328,7 @@ impl Table { }) .await .map_err(Error::io_error)?; - let import_result = progress.finish().await.map_err(Error::failed_download)?; - info!( - "found local_size {}, downloaded_size {}, content_len {}", - import_result.local_size, - import_result.downloaded_size, - entry.content_len() - ); + progress.finish().await.map_err(Error::failed_download)?; self.process_remote_entry(key, entry).await?; Ok(()) } @@ -474,6 +474,78 @@ impl Table { Ok(None) } + pub async fn get_partial( + &self, + key: &str, + range: (Option, Option), + ) -> Result<( + flume::Receiver>, + u64, + bool, + )> { + let (hash, _) = self.get_hash(key).await.unwrap().unwrap(); + + let connection = self.get_default_connection().await.unwrap(); + + let byte_ranges = to_byte_range(range.0, range.1); + let is_all = byte_ranges.is_all(); + let chunk_ranges = to_chunk_range(range.0, range.1); + let chunk_ranges = RangeSpecSeq::from_ranges(vec![chunk_ranges]); + + let request = iroh::bytes::protocol::GetRequest::new(hash, chunk_ranges.clone()); + let (send, recv) = flume::bounded::>(2); + + let req = iroh::bytes::get::fsm::start(connection.clone(), request); + let connected = req.next().await.unwrap(); + let ConnectedNext::StartRoot(x) = connected.next().await.unwrap() else { + panic!("unexpected") + }; + tracing::trace!("connected"); + let (mut current, size) = x.next().next().await.unwrap(); + tokio::spawn(async move { + let end = loop { + match current.next().await { + BlobContentNext::More((next, Ok(item))) => { + match item { + BaoContentItem::Leaf(leaf) => { + tracing::trace!("got leaf {} {}", leaf.offset, leaf.data.len()); + for item in slice(leaf.offset, leaf.data, byte_ranges.clone()) { + send.send_async(Ok(item)).await?; + } + } + BaoContentItem::Parent(parent) => { + tracing::trace!("got parent {:?}", parent); + } + } + current = next; + } + BlobContentNext::More((_, Err(err))) => { + send.send_async(Err(err)).await?; + anyhow::bail!("error"); + } + BlobContentNext::Done(end) => break end, + } + }; + let EndBlobNext::Closing(at_closing) = end.next() else { + anyhow::bail!("unexpected response"); + }; + let _stats = at_closing.next().await?; + Ok(()) + }); + Ok((recv, size, is_all)) + } + + /// Get the mime type for a hash from the remote node. + async fn get_default_connection(&self) -> anyhow::Result { + self.node + .magic_endpoint() + .connect( + self.node.my_addr().await.unwrap(), + iroh::bytes::protocol::ALPN, + ) + .await + } + pub fn get_all(&self) -> impl Stream> { let iroh_doc = self.iroh_doc.clone(); stream! {