Commit 6d78693

[feat] Development

ppodolsky committed Feb 15, 2024
1 parent adaa185 commit 6d78693
Showing 11 changed files with 270 additions and 223 deletions.
238 changes: 107 additions & 131 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
@@ -20,7 +20,7 @@ aws-credential-types = { version = "1.0", features = ["hardcoded-credentials"] }
 axum = "0.7"
 bisection = "0.1"
 clap = { version = "4.4", features = ["derive"] }
-iroh = { version = "0.12.0", branch = "perf-startup", git = "https://github.com/n0-computer/iroh", features = [ "flat-db", "metrics" ] }
+iroh = { version = "0.12.0", branch = "main", git = "https://github.com/n0-computer/iroh", features = [ "flat-db", "metrics" ] }
 iroh-io = { version = "0.3.0" }
 md5 = "0.7"
 percent-encoding = "2.3.1"
12 changes: 12 additions & 0 deletions bin/main.rs
@@ -10,6 +10,7 @@ use axum::{
 };
 use clap::Parser;
 use futures::TryStreamExt;
+use iroh::sync::store::DownloadPolicy;
 use serde::{Deserialize, Serialize};
 use std::collections::HashMap;
 use std::sync::Arc;
@@ -18,6 +19,7 @@ use tokio::sync::RwLock;
 use tokio_util::io::{ReaderStream, StreamReader};
 use tower_http::trace::{self, TraceLayer};
 use tracing::Level;
+use tracing_subscriber::EnvFilter;
 use trident_storage::config::{
     load_config, save_config, Config, MirroringConfig, SinkConfig, TableConfig,
 };
@@ -43,6 +45,7 @@ struct TablesCreateRequest {
 #[derive(Deserialize)]
 struct TablesImportRequest {
     ticket: String,
+    download_policy: DownloadPolicy,
     storage: Option<String>,
     mirroring: Option<MirroringConfig>,
 }
@@ -132,8 +135,16 @@ fn main() -> Result<(), Error> {
     // initialize tracing
     tracing_subscriber::fmt()
         .with_target(false)
+        .with_env_filter(EnvFilter::from_default_env())
         .compact()
         .init();
+    /*console_subscriber::ConsoleLayer::builder()
+        // set how long the console will retain data from completed tasks
+        .retention(Duration::from_secs(120))
+        // set the address the server is bound to
+        .server_addr(([0, 0, 0, 0], 6669))
+        // ... other configurations ...
+        .init();*/
     Builder::new_multi_thread()
         .enable_all()
         .build()
@@ -205,6 +216,7 @@ async fn tables_import(
         .tables_import(
             &table,
             &tables_import_request.ticket,
+            tables_import_request.download_policy,
             tables_import_request.storage.as_deref(),
             tables_import_request.mirroring,
         )
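
Editor's note: the import endpoint now takes a download policy in its request body. A minimal sketch of constructing the two policy shapes, assuming iroh 0.12's `DownloadPolicy`/`FilterKind` enums are re-exported under `iroh::sync::store` (the variant names are paraphrased from the iroh-sync API of that era, not from this repository):

    // Sketch only: `FilterKind` and the variant shapes are assumptions based on
    // iroh 0.12's iroh-sync API, not definitions taken from this commit.
    use iroh::sync::store::{DownloadPolicy, FilterKind};

    fn example_policies() -> (DownloadPolicy, DownloadPolicy) {
        // The default policy downloads content for every entry in the document.
        let everything = DownloadPolicy::default();
        // A restrictive policy: only fetch entries whose keys start with "files/".
        let only_files = DownloadPolicy::NothingExcept(vec![FilterKind::Prefix(
            b"files/".to_vec().into(),
        )]);
        (everything, only_files)
    }
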
5 changes: 5 additions & 0 deletions src/config.rs
@@ -1,4 +1,5 @@
 use crate::error::Error;
+use iroh::sync::store::DownloadPolicy;
 use serde::{Deserialize, Serialize};
 use std::collections::HashMap;
 use std::path::PathBuf;
@@ -17,6 +18,8 @@ pub enum StorageEngineConfig {
 pub struct FSStorageEngineConfig {
     pub replicas: u8,
     pub fs_shards: Vec<FSShardConfig>,
+    #[serde(default = "return_false")]
+    pub is_import_missing_enabled: bool,
 }
 
 #[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
@@ -57,6 +60,8 @@ pub struct MirroringConfig {
 #[derive(Clone, Debug, Serialize, Deserialize)]
 pub struct TableConfig {
     pub id: String,
+    #[serde(default = "DownloadPolicy::default")]
+    pub download_policy: DownloadPolicy,
     #[serde(skip_serializing_if = "Option::is_none")]
     pub mirroring: Option<MirroringConfig>,
     pub storage_engine: StorageEngineConfig,
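
Editor's note: both `#[serde(default = ...)]` attributes keep pre-existing config files deserializing unchanged. A self-contained sketch of the pattern, trimmed to the relevant fields; the `return_false` helper is referenced but not shown in this diff, so its body below is an assumption:

    use serde::Deserialize;

    // Assumed body of the helper named in `#[serde(default = "return_false")]`;
    // serde takes a function path for defaults that are not `Default::default`.
    fn return_false() -> bool {
        false
    }

    #[derive(Debug, Deserialize)]
    struct FSStorageEngineConfig {
        replicas: u8,
        #[serde(default = "return_false")]
        is_import_missing_enabled: bool,
    }

    fn main() {
        // An older config without the new field still loads; the
        // import-missing scan stays disabled unless explicitly enabled.
        let cfg: FSStorageEngineConfig = serde_json::from_str(r#"{ "replicas": 2 }"#).unwrap();
        assert!(!cfg.is_import_missing_enabled);
    }
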
21 changes: 15 additions & 6 deletions src/iroh_node.rs
@@ -13,9 +13,9 @@ use async_stream::stream;
 use futures::StreamExt;
 use iroh::bytes::Hash;
 use iroh::client::quic::RPC_ALPN;
-use iroh::net::derp::DerpMode;
 use iroh::node::{GcPolicy, Node};
 use iroh::rpc_protocol::{ProviderRequest, ProviderResponse, ShareMode};
+use iroh::sync::store::DownloadPolicy;
 use iroh::sync::{AuthorId, NamespaceId};
 use iroh::ticket::DocTicket;
 use quic_rpc::transport::quinn::QuinnServerEndpoint;
@@ -87,7 +87,6 @@ impl IrohNode {
         let mut node_builder = Node::builder(db, docs)
             .secret_key(secret_key)
             .peers_data_path(peer_data_path)
-            .derp_mode(DerpMode::Default)
             .bind_port(config_lock.iroh.bind_port)
             .rpc_endpoint(rpc_endpoint);
 
@@ -138,7 +137,11 @@ impl IrohNode {
             .await
             .map_err(Error::table)?
             .unwrap();
-        iroh_doc.start_sync(vec![]).await.unwrap();
+        iroh_doc
+            .set_download_policy(table_config.download_policy.clone())
+            .await
+            .map_err(Error::doc)?;
+        iroh_doc.start_sync(vec![]).await.map_err(Error::doc)?;
         let storage_engine = match &table_config.storage_engine {
             StorageEngineConfig::Iroh => {
                 StorageEngine::Iroh(IrohStorageEngine::new(author_id, iroh_doc.clone()))
@@ -222,7 +225,6 @@ impl IrohNode {
             Entry::Occupied(_) => Err(Error::existing_table(table_name)),
             Entry::Vacant(entry) => {
                 let iroh_doc = self.sync_client.docs.create().await.map_err(Error::table)?;
-
                 let (storage_engine, storage_engine_config) = match storage_name {
                     None => (
                         StorageEngine::Iroh(IrohStorageEngine::new(
@@ -266,6 +268,7 @@ impl IrohNode {
                     table_name.to_string(),
                     TableConfig {
                         id: iroh_doc.id().to_string(),
+                        download_policy: DownloadPolicy::default(),
                         mirroring: mirroring_config,
                         storage_engine: storage_engine_config,
                     },
@@ -280,6 +283,7 @@
         &mut self,
         table_name: &str,
         table_ticket: &str,
+        download_policy: DownloadPolicy,
         storage_name: Option<&str>,
         mirroring_config: Option<MirroringConfig>,
     ) -> Result<NamespaceId> {
@@ -289,9 +293,13 @@
         let iroh_doc = self
             .sync_client
             .docs
-            .import(DocTicket::from_str(table_ticket).unwrap())
+            .import(DocTicket::from_str(table_ticket).map_err(Error::doc)?)
             .await
             .map_err(Error::table)?;
+        iroh_doc
+            .set_download_policy(download_policy.clone())
+            .await
+            .map_err(Error::doc)?;
         let (storage_engine, storage_engine_config) = match storage_name {
             None => (
                 StorageEngine::Iroh(IrohStorageEngine::new(
@@ -336,6 +344,7 @@ impl IrohNode {
                 table_name.to_string(),
                 TableConfig {
                     id: iroh_doc.id().to_string(),
+                    download_policy: download_policy,
                     mirroring: mirroring_config,
                     storage_engine: storage_engine_config,
                 },
Expand Down Expand Up @@ -427,7 +436,7 @@ impl IrohNode {

     pub async fn table_exists(&self, table_name: &str, key: &str) -> Result<bool> {
         match self.table_storages.get(table_name) {
-            Some(table_storage) => Ok(table_storage.get_hash(key).await?.is_some()),
+            Some(table_storage) => table_storage.exists(key).await,
             None => Err(Error::missing_table(table_name)),
         }
     }
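
Editor's note: both the open and import paths apply `set_download_policy` before `start_sync`, so a restrictive policy is already in force when the first peers connect. A condensed sketch of that ordering, with names matching the diff and error plumbing trimmed:

    // Condensed from the import path above; `iroh_doc` is the document client
    // handle. Setting the policy first ensures sync never fetches content the
    // table is configured to skip.
    async fn attach(iroh_doc: &IrohDoc, download_policy: DownloadPolicy) -> Result<()> {
        iroh_doc
            .set_download_policy(download_policy)
            .await
            .map_err(Error::doc)?;
        iroh_doc.start_sync(vec![]).await.map_err(Error::doc)?;
        Ok(())
    }
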
142 changes: 81 additions & 61 deletions src/storages/fs_storage.rs
@@ -11,14 +11,13 @@ use iroh::sync::store::Query;
 use iroh::sync::{AuthorId, ContentStatus};
 use lru::LruCache;
 use std::collections::{HashMap, HashSet};
-use std::io;
 use std::num::NonZeroUsize;
 use std::path::PathBuf;
 use std::sync::Arc;
 use tokio::io::AsyncRead;
 use tokio_task_pool::Pool;
 use tokio_util::bytes;
-use tracing::{info, info_span, warn, Instrument};
+use tracing::{error, info, info_span, warn, Instrument};

 #[derive(Clone)]
 pub struct FSStorageEngine {
@@ -95,65 +94,69 @@ impl FSStorageEngine {

         let fs_storage_clone = fs_storage.clone();
 
-        tokio::spawn(async move {
-            let all_keys: Arc<HashSet<_>> = Arc::new(
-                fs_storage_clone
-                    .iroh_doc()
-                    .get_many(Query::all())
-                    .await
-                    .map_err(Error::doc)?
-                    .map(|x| bytes::Bytes::copy_from_slice(x.unwrap().key()))
-                    .collect()
-                    .await,
-            );
-            let pool = Arc::new(Pool::bounded(16));
-
-            for fs_shard in &fs_storage_clone.fs_shards.values() {
-                let fs_storage_clone = fs_storage_clone.clone();
-                let fs_shard = fs_shard.clone();
-                let all_keys = all_keys.clone();
-                let pool = pool.clone();
-                tokio::spawn(async move {
-                    let base_path = fs_shard.path().to_path_buf();
-                    let mut read_dir_stream = tokio::fs::read_dir(&base_path)
-                        .await
-                        .map_err(Error::io_error)?;
-                    while let Some(entry) = read_dir_stream
-                        .next_entry()
-                        .await
-                        .map_err(Error::io_error)?
-                    {
-                        let key = key_to_bytes(&entry.file_name().to_string_lossy());
-                        if all_keys.contains(&key) || key.starts_with(&[b'~']) {
-                            info!(action = "skipped", key = ?entry.file_name());
-                            continue;
-                        }
-                        pool.spawn({
-                            let iroh_doc = fs_storage_clone.iroh_doc().clone();
-                            async move {
-                                let import_progress = iroh_doc
-                                    .import_file(
-                                        fs_storage_clone.author_id,
-                                        key,
-                                        &entry.path(),
-                                        true,
-                                    )
-                                    .await
-                                    .map_err(Error::doc)
-                                    .unwrap();
-                                import_progress.finish().await.map_err(Error::hash).unwrap();
-                                info!(action = "imported", key = ?entry.file_name())
-                            }
-                            .instrument(info_span!(parent: None, "restore"))
-                        })
-                        .await
-                        .unwrap();
-                    }
-                    Ok::<(), Error>(())
-                });
-            }
-            Ok::<(), Error>(())
-        });
+        if fs_storage_config.is_import_missing_enabled {
+            tokio::spawn(async move {
+                let all_keys: Arc<HashSet<_>> = Arc::new(
+                    fs_storage_clone
+                        .iroh_doc()
+                        .get_many(Query::all())
+                        .await
+                        .map_err(Error::doc)?
+                        .map(|x| bytes::Bytes::copy_from_slice(x.unwrap().key()))
+                        .collect()
+                        .await,
+                );
+                let pool = Arc::new(Pool::bounded(16));
+
+                for fs_shard in fs_storage_clone.fs_shards.values() {
+                    let fs_storage_clone = fs_storage_clone.clone();
+                    let fs_shard = fs_shard.clone();
+                    let all_keys = all_keys.clone();
+                    let pool = pool.clone();
+                    tokio::spawn(async move {
+                        let base_path = fs_shard.path().to_path_buf();
+                        let mut read_dir_stream = tokio::fs::read_dir(&base_path)
+                            .await
+                            .map_err(Error::io_error)?;
+                        while let Some(entry) = read_dir_stream
+                            .next_entry()
+                            .await
+                            .map_err(Error::io_error)?
+                        {
+                            let key = key_to_bytes(&entry.file_name().to_string_lossy());
+                            if all_keys.contains(&key) || key.starts_with(&[b'~']) {
+                                continue;
+                            }
+                            pool.spawn({
+                                let iroh_doc = fs_storage_clone.iroh_doc().clone();
+                                async move {
+                                    let import_progress = iroh_doc
+                                        .import_file(
+                                            fs_storage_clone.author_id,
+                                            key,
+                                            &entry.path(),
+                                            true,
+                                        )
+                                        .await
+                                        .map_err(Error::doc)
+                                        .unwrap();
+                                    match import_progress.finish().await.map_err(Error::hash) {
+                                        Err(error) => error!(error = ?error, path = ?entry.path(), key = ?entry.file_name(), "import_progress_error"),
+                                        _ => {}
+                                    };
+                                    info!(action = "imported", key = ?entry.file_name())
+                                }
+                                .instrument(info_span!(parent: None, "restore"))
+                            })
+                            .await
+                            .unwrap();
+                        }
+                        Ok::<(), Error>(())
+                    });
+                }
+                Ok::<(), Error>(())
+            });
+        }
     Ok(fs_storage)
 }

@@ -223,10 +226,15 @@ impl FSStorageEngine {
         })
     }
 
-    pub async fn exists(&self, key: &str) -> io::Result<Option<PathBuf>> {
+    pub async fn exists(&self, key: &str) -> Result<Option<PathBuf>> {
         for file_shard_config in self.hash_ring.range(key, 1) {
             let file_shard = &self.fs_shards[&file_shard_config.name];
-            if file_shard.exists(key).await?.is_some() {
+            if file_shard
+                .exists(key)
+                .await
+                .map_err(Error::io_error)?
+                .is_some()
+            {
                 return Ok(Some(file_shard.get_path_for(key)));
             }
         }
@@ -236,4 +244,16 @@
     pub fn iroh_doc(&self) -> &IrohDoc {
         &self.iroh_doc
     }
+
+    pub async fn get(&self, key: &str) -> Result<Box<dyn AsyncRead + Unpin + Send>> {
+        for file_shard_config in self.hash_ring.range(key, 1) {
+            let file_shard = &self.fs_shards[&file_shard_config.name];
+            match file_shard.open_store(key).await {
+                Ok(Some(file)) => return Ok(Box::new(file)),
+                Ok(None) => return Err(Error::io_error("missing file")),
+                Err(e) => return Err(Error::io_error(e)),
+            }
+        }
+        Err(Error::io_error("missing shard"))
+    }
 }
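
Editor's note: the new `get` resolves a key to exactly one shard via the consistent-hash ring, mirroring `exists`. A hedged usage sketch, assuming `hash_ring.range(key, 1)` yields the single shard that owns the key:

    use tokio::io::AsyncReadExt;

    // Sketch: read a whole value through the shard-routed reader returned by
    // `FSStorageEngine::get`. Buffering the full value is for illustration only;
    // callers can also stream from the reader directly.
    async fn read_value(storage: &FSStorageEngine, key: &str) -> Result<Vec<u8>> {
        let mut reader = storage.get(key).await?;
        let mut buf = Vec::new();
        reader.read_to_end(&mut buf).await.map_err(Error::io_error)?;
        Ok(buf)
    }
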
22 changes: 20 additions & 2 deletions src/storages/iroh_storage.rs
@@ -2,7 +2,7 @@ use crate::error::{Error, Result};
 use crate::utils::key_to_bytes;
 use crate::IrohDoc;
 use iroh::bytes::Hash;
-use iroh::sync::store::Query;
+use iroh::sync::store::{Query, SortBy, SortDirection};
 use iroh::sync::AuthorId;
 use tokio::io::{AsyncRead, AsyncReadExt};
 
@@ -49,7 +49,9 @@ impl IrohStorageEngine {
     pub async fn exists(&self, key: &str) -> Result<bool> {
         Ok(self
             .iroh_doc
-            .get_one(Query::key_exact(key_to_bytes(key)))
+            .get_one(
+                Query::key_exact(key_to_bytes(key)).sort_by(SortBy::KeyAuthor, SortDirection::Asc),
+            )
             .await
             .map_err(Error::doc)?
             .is_some())
@@ -58,4 +60,20 @@
     pub const fn iroh_doc(&self) -> &IrohDoc {
         &self.iroh_doc
     }
+
+    pub async fn get(&self, key: &str) -> Result<Box<dyn AsyncRead + Unpin + Send>> {
+        Ok(Box::new(
+            self.iroh_doc()
+                .get_one(
+                    Query::key_exact(key_to_bytes(key))
+                        .sort_by(SortBy::KeyAuthor, SortDirection::Asc),
+                )
+                .await
+                .map_err(Error::doc)?
+                .ok_or_else(|| Error::missing_key(key))?
+                .content_reader(self.iroh_doc())
+                .await
+                .map_err(Error::doc)?,
+        ))
+    }
 }
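
Editor's note: both lookups now pin an explicit ordering. With several authors writing the same key, an unordered `get_one` could return any matching entry; sorting by (key, author) ascending makes the result deterministic: the entry from the lowest author id wins. An illustrative sketch reusing the query shape from the diff (the `content_hash` accessor is an assumption from the iroh-sync entry API of that era):

    // Sketch: deterministic entry selection for `key`, regardless of how many
    // authors have written it.
    async fn first_entry_hash(doc: &IrohDoc, key: &str) -> Result<Option<Hash>> {
        Ok(doc
            .get_one(Query::key_exact(key_to_bytes(key)).sort_by(SortBy::KeyAuthor, SortDirection::Asc))
            .await
            .map_err(Error::doc)?
            .map(|entry| entry.content_hash()))
    }
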