Skip to content

Commit

Permalink
[feat] Add Range queries
Browse files Browse the repository at this point in the history
  • Loading branch information
ppodolsky committed Apr 26, 2024
1 parent eb998fe commit e17219b
Show file tree
Hide file tree
Showing 7 changed files with 75 additions and 192 deletions.
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ clap = { version = "4.5", features = ["derive"] }
flume = "0.11"
iroh = { version = "0.14.0", branch = "main", git = "https://github.com/izihawa/iroh", features = [ "metrics" ] }
iroh-base = { version = "0.14.0", branch = "main", git = "https://github.com/izihawa/iroh" }
iroh-bytes = { version = "0.14.0", branch = "main", git = "https://github.com/izihawa/iroh" }
iroh-io = { version = "0.6.0" }
md5 = "0.7"
percent-encoding = "2.3.1"
Expand Down
139 changes: 64 additions & 75 deletions bin/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use axum::{
};
use clap::{Parser, Subcommand};
use futures::TryStreamExt;
use headers::{HeaderMap, HeaderMapExt};
use headers::{HeaderMap, HeaderMapExt, Range};
use hyper::header;
use iroh::rpc_protocol::ShareMode;
use iroh::sync::store::DownloadPolicy;
Expand Down Expand Up @@ -511,15 +511,16 @@ async fn table_root_get(
)
.await
}

async fn table_get_entire(
async fn table_get(
State(state): State<AppState>,
method: Method,
headers: HeaderMap,
Path((table, key)): Path<(String, String)>,
) -> Response {
let iroh_node = state.iroh_node.read().await.table_get(&table, &key).await;
let (reader, file_size, hash) = match iroh_node {
Ok(Some(iroh_node)) => iroh_node,
let iroh_node = state.iroh_node.read().await;
let entry = iroh_node.table_get(&table, &key).await;
let entry = match entry {
Ok(Some(entry)) => entry,
Ok(None) => {
return Response::builder()
.status(StatusCode::NOT_FOUND)
Expand All @@ -528,86 +529,74 @@ async fn table_get_entire(
}
Err(e) => return e.into_response(),
};
match method {
Method::HEAD => Response::builder()
.header(header::CONTENT_LENGTH, file_size)
.header(
header::CONTENT_TYPE,
mime_guess::from_ext(&key)
.first_or_octet_stream()
.to_string(),
)
.header("X-Iroh-Hash", hash.to_string())
.body(Body::default())
.unwrap(),
Method::GET => Response::builder()
.header(header::CONTENT_LENGTH, file_size)
.header(
header::CONTENT_TYPE,
mime_guess::from_ext(&key)
.first_or_octet_stream()
.to_string(),
)
.header("X-Iroh-Hash", hash.to_string())
.body(Body::from_stream(ReaderStream::new(reader)))
.unwrap(),
_ => unreachable!(),
}
}

async fn table_get(
State(state): State<AppState>,
method: Method,
headers: HeaderMap,
Path((table, key)): Path<(String, String)>,
) -> Response {
match headers.typed_get::<headers::Range>() {
None => table_get_entire(State(state), method, Path((table, key))).await,
Some(range_value) => {
let byte_range = parse_byte_range(range_value).unwrap();
let (recv, size, is_all) = state
.iroh_node
.read()
.await
.table_get_partial(&table, &key, byte_range)
let response_builder =
Response::builder().header("X-Iroh-Hash", entry.content_hash().to_string());

let (reader, response_builder) = match headers.typed_get::<Range>() {
None => (
iroh_node
.client()
.blobs
.read(entry.content_hash())
.await
.unwrap();
let status_code = if is_all {
StatusCode::OK
} else {
StatusCode::PARTIAL_CONTENT
};
let body = Body::from_stream(recv.into_stream());
let builder = Response::builder()
.status(status_code)
.header(header::ACCEPT_RANGES, "bytes")
.map_err(Error::blobs),
response_builder
.status(StatusCode::OK)
.header(header::CONTENT_LENGTH, entry.content_len())
.header(
header::CONTENT_TYPE,
mime_guess::from_ext(&key)
mime_guess::from_path(&key)
.first_or_octet_stream()
.to_string(),
);
// content-length needs to be the actual repsonse size
let transfer_size = match byte_range {
(Some(start), Some(end)) => end - start,
(Some(start), None) => size - start,
(None, Some(end)) => end,
(None, None) => size,
),
),
Some(range_value) => {
let (start, end) = match parse_byte_range(range_value).map_err(Error::blobs) {
Ok((start, end)) => (start, end),
Err(e) => return e.into_response(),
};
let builder = builder.header(header::CONTENT_LENGTH, transfer_size);

let builder = if byte_range.0.is_some() || byte_range.1.is_some() {
builder
let offset = start.unwrap_or(0);
let length = end.map(|end| end - offset);
let definite_length = length.unwrap_or(entry.content_len() - offset);
(
iroh_node
.client()
.blobs
.read_at(entry.content_hash(), offset, length.map(|x| x as usize))
.await
.map_err(Error::blobs),
response_builder
.status(StatusCode::PARTIAL_CONTENT)
.header(header::ACCEPT_RANGES, "bytes")
.header(header::CONTENT_LENGTH, definite_length)
.header(
header::CONTENT_RANGE,
format_content_range(byte_range.0, byte_range.1, size),
format_content_range(start, end, definite_length),
)
.status(StatusCode::PARTIAL_CONTENT)
} else {
builder
};
builder.body(body).unwrap()
.header(
header::CONTENT_TYPE,
if definite_length == entry.content_len() {
mime_guess::from_path(&key)
.first_or_octet_stream()
.to_string()
} else {
mime_guess::mime::OCTET_STREAM.to_string()
},
),
)
}
};
let reader = match reader {
Ok(reader) => reader,
Err(e) => return e.into_response(),
};
match method {
Method::HEAD => response_builder.body(Body::default()).unwrap(),
Method::GET => response_builder
.body(Body::from_stream(ReaderStream::new(reader)))
.unwrap(),
_ => unreachable!(),
}
}

Expand Down
25 changes: 6 additions & 19 deletions src/iroh_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@ use crate::config::{Config, StorageEngineConfig, TableConfig};
use crate::error::{Error, Result};
use crate::table::Table;
use async_stream::stream;
use bytes::Bytes;
use flume::Receiver;
use futures::StreamExt;
use iroh::bytes::store::fs::Store;
use iroh::bytes::Hash;
Expand All @@ -15,11 +13,9 @@ use iroh::sync::store::DownloadPolicy;
use iroh::sync::{AuthorId, NamespaceId};
use iroh::ticket::DocTicket;
use iroh_base::node_addr::NodeAddr;
use iroh_bytes::get::fsm::DecodeError;
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::fmt::{Debug, Formatter};
use std::result;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
Expand Down Expand Up @@ -90,7 +86,7 @@ impl IrohNode {
.docs
.list()
.await
.unwrap()
.map_err(Error::io_error)?
.map(|x| x.unwrap().0)
.collect::<Vec<_>>()
.await
Expand Down Expand Up @@ -176,6 +172,10 @@ impl IrohNode {
Ok(iroh_node)
}

pub fn client(&self) -> &iroh::client::mem::Iroh {
self.node.client()
}

pub async fn tables_ls(&self) -> HashMap<String, TableConfig> {
self.config.read().await.iroh.tables.clone()
}
Expand Down Expand Up @@ -381,27 +381,14 @@ impl IrohNode {
&self,
table_name: &str,
key: &str,
) -> Result<Option<(Box<dyn AsyncRead + Unpin + Send>, u64, Hash)>> {
) -> Result<Option<iroh::client::Entry>> {
let table = self
.tables
.get(table_name)
.ok_or_else(|| Error::missing_table(table_name))?;
table.get(key).await
}

pub async fn table_get_partial(
&self,
table_name: &str,
key: &str,
byte_range: (Option<u64>, Option<u64>),
) -> Result<(Receiver<result::Result<Bytes, DecodeError>>, u64, bool)> {
let table = self
.tables
.get(table_name)
.ok_or_else(|| Error::missing_table(table_name))?;
table.get_partial(key, byte_range).await
}

pub async fn table_delete(&self, table_name: &str, key: &str) -> Result<usize> {
let table = self
.tables
Expand Down
1 change: 0 additions & 1 deletion src/ranges.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,6 @@ pub fn slice(offset: u64, data: Bytes, ranges: RangeSet2<u64>) -> Vec<Bytes> {

/// Parse the byte range from the request headers.
pub fn parse_byte_range(range: Range) -> anyhow::Result<(Option<u64>, Option<u64>)> {
println!("got range request {:?}", range);
let ranges = range.satisfiable_ranges(0).collect::<Vec<_>>();
if ranges.len() > 1 {
anyhow::bail!("multiple ranges not supported");
Expand Down
Loading

0 comments on commit e17219b

Please sign in to comment.