diff --git a/examples/config.yaml b/examples/config.yaml index c15e722..c0f339d 100644 --- a/examples/config.yaml +++ b/examples/config.yaml @@ -9,10 +9,9 @@ iroh: max_rpc_connections: 32 max_rpc_streams: 1024 sinks: {} - fs_storages: + storages: default: - replicas: 1 - fs_shards: + shards: - name: shard1 path: /tmp/trident/data/shard1 weight: 1 diff --git a/src/config.rs b/src/config.rs index af1dff2..ca58503 100644 --- a/src/config.rs +++ b/src/config.rs @@ -13,8 +13,7 @@ fn return_true() -> bool { #[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)] pub struct StorageEngineConfig { - pub replicas: u8, - pub fs_shards: Vec, + pub shards: Vec, #[serde(default = "return_false")] pub is_import_missing_enabled: bool, } @@ -83,7 +82,7 @@ pub struct IrohConfig { #[serde(default = "HashMap::new")] pub sinks: HashMap, #[serde(default = "HashMap::new")] - pub fs_storages: HashMap, + pub storages: HashMap, pub gc_interval_secs: Option, } @@ -103,7 +102,7 @@ pub async fn load_config(config_path: &str) -> Result { pub async fn save_config(config_path: &str, config: &Config) -> Result<(), Error> { tokio::fs::write( config_path, - serde_yaml::to_string(config).unwrap().as_bytes(), + serde_yaml::to_string(config).expect("unreachable").as_bytes(), ) .await .map_err(Error::io_error) diff --git a/src/error.rs b/src/error.rs index 6b7e8bf..aa272df 100644 --- a/src/error.rs +++ b/src/error.rs @@ -19,8 +19,6 @@ pub enum Error { TableTicket { description: String }, #[error("blobs: {description}")] Blobs { description: String }, - #[error("hash_error: {description}")] - Hash { description: String }, #[error("entry_error: {description}")] Entry { description: String }, #[error("io_error: {description}")] @@ -39,6 +37,8 @@ pub enum Error { ExistingTable { description: String }, #[error("storage: {description}")] Storage { description: String }, + #[error("key: {description}")] + IncorrectKey { description: String }, } impl Error { @@ -78,12 +78,6 @@ impl Error { } } - pub fn hash(error: impl Display) -> Self { - Error::Hash { - description: error.to_string(), - } - } - pub fn entry(error: impl Display) -> Self { Error::Entry { description: error.to_string(), @@ -136,6 +130,12 @@ impl Error { description: error.to_string(), } } + + pub fn incorrect_key(error: impl Display) -> Self { + Error::IncorrectKey { + description: error.to_string(), + } + } } impl IntoResponse for Error { diff --git a/src/iroh_node.rs b/src/iroh_node.rs index 99b47e2..f790f1b 100644 --- a/src/iroh_node.rs +++ b/src/iroh_node.rs @@ -2,7 +2,6 @@ use crate::config::{Config, SinkConfig, StorageEngineConfig, TableConfig}; use crate::error::{Error, Result}; use crate::sinks::{IpfsSink, S3Sink, Sink}; use crate::storage::Storage; -use crate::utils::bytes_to_key; use crate::IrohClient; use async_stream::stream; use futures::StreamExt; @@ -113,7 +112,7 @@ impl IrohNode { } let author_id = match &config_lock.iroh.author { - Some(author) => AuthorId::from_str(author).unwrap(), + Some(author) => AuthorId::from_str(author).map_err(Error::author)?, None => { let author_id = sync_client.authors.create().await.map_err(Error::author)?; config_lock.iroh.author = Some(author_id.to_string()); @@ -136,14 +135,14 @@ impl IrohNode { } let mut table_storages = HashMap::new(); - let storage_configs = config_lock.iroh.fs_storages.clone(); + let storage_configs = config_lock.iroh.storages.clone(); for (table_name, table_config) in &mut config_lock.iroh.tables { let iroh_doc = sync_client .docs - .open(NamespaceId::from_str(&table_config.id).unwrap()) + .open(NamespaceId::from_str(&table_config.id).map_err(Error::storage)?) .await .map_err(Error::table)? - .unwrap(); + .ok_or_else(|| Error::table(format!("{} does not exist", table_config.id)))?; iroh_doc .set_download_policy(table_config.download_policy.clone()) .await @@ -169,7 +168,7 @@ impl IrohNode { table_storages.insert(table_name.clone(), storage_engine); } - let fs_storage_configs = config_lock.iroh.fs_storages.clone(); + let fs_storage_configs = config_lock.iroh.storages.clone(); drop(config_lock); @@ -396,10 +395,15 @@ impl IrohNode { } pub fn table_keys(&self, table_name: &str) -> Option>> { - self.table_storages.get(table_name).cloned().map_or_else(|| None, |table_storage| Some(stream! { - for await el in table_storage.get_all() { - yield Ok(format!("{}\n", std::str::from_utf8(bytes_to_key(el.unwrap().key())).unwrap())) - } - })) + self.table_storages.get(table_name).cloned().map_or_else( + || None, + |table_storage| { + Some(stream! { + for await el in table_storage.get_all() { + yield Ok(format!("{}\n", std::str::from_utf8(el.unwrap().key()).unwrap())) + } + }) + }, + ) } } diff --git a/src/sinks/ipfs_sink.rs b/src/sinks/ipfs_sink.rs index 5f75990..2b9acba 100644 --- a/src/sinks/ipfs_sink.rs +++ b/src/sinks/ipfs_sink.rs @@ -1,7 +1,7 @@ use crate::config::IpfsConfig; use crate::error::Error; use crate::sinks::Sink; -use crate::utils::{bytes_to_key, FRAGMENT}; +use crate::utils::FRAGMENT; use axum::async_trait; use percent_encoding::utf8_percent_encode; use reqwest::header::HeaderMap; @@ -32,19 +32,18 @@ impl Sink for IpfsSink { &self.name } - async fn send(&self, key: &[u8], path: &Path) -> Result<(), Error> { + async fn send(&self, key: &str, path: &Path) -> Result<(), Error> { // ToDo: Remove allocating and return stream // https://github.com/awslabs/aws-sdk-rust/discussions/361 - let encoded_key = - utf8_percent_encode(std::str::from_utf8(bytes_to_key(key)).unwrap(), FRAGMENT) - .collect::() - .to_lowercase(); + let encoded_key = utf8_percent_encode(key, FRAGMENT) + .collect::() + .to_lowercase(); let mut headers = HeaderMap::new(); headers.insert("Abspath", path.to_string_lossy().parse().unwrap()); - let file_part = reqwest::multipart::Part::bytes(tokio::fs::read(path).await.unwrap()) + let file_part = reqwest::multipart::Part::bytes(tokio::fs::read(path).await.map_err(Error::io_error)?) .file_name(encoded_key) .headers(headers) .mime_str("application/octet-stream") @@ -61,7 +60,8 @@ impl Sink for IpfsSink { .await .map_err(Error::sink)?; if !res.status().is_success() { - return Err(Error::sink(res.text().await.unwrap())); + let res_text = res.text().await.map_err(Error::sink)?; + return Err(Error::sink(res_text)); } Ok(()) } diff --git a/src/sinks/mod.rs b/src/sinks/mod.rs index 8c7ec7e..78e9e01 100644 --- a/src/sinks/mod.rs +++ b/src/sinks/mod.rs @@ -11,5 +11,5 @@ pub use s3_sink::S3Sink; #[async_trait] pub trait Sink: Send + Sync { fn name(&self) -> &str; - async fn send(&self, key: &[u8], path: &Path) -> Result<(), Error>; + async fn send(&self, key: &str, path: &Path) -> Result<(), Error>; } diff --git a/src/sinks/s3_sink.rs b/src/sinks/s3_sink.rs index 05f4e18..e2dfe75 100644 --- a/src/sinks/s3_sink.rs +++ b/src/sinks/s3_sink.rs @@ -1,7 +1,7 @@ use crate::config::S3Config; use crate::error::Error; use crate::sinks::Sink; -use crate::utils::{bytes_to_key, FRAGMENT}; +use crate::utils::FRAGMENT; use aws_credential_types::Credentials; use aws_sdk_s3::config::{BehaviorVersion, Region}; use aws_sdk_s3::primitives::ByteStream; @@ -52,13 +52,12 @@ impl Sink for S3Sink { &self.name } - async fn send(&self, key: &[u8], path: &Path) -> Result<(), Error> { + async fn send(&self, key: &str, path: &Path) -> Result<(), Error> { // ToDo: Remove allocating and return stream // https://github.com/awslabs/aws-sdk-rust/discussions/361 - let encoded_key = - utf8_percent_encode(std::str::from_utf8(bytes_to_key(key)).unwrap(), FRAGMENT) - .collect::() - .to_lowercase(); + let encoded_key = utf8_percent_encode(key, FRAGMENT) + .collect::() + .to_lowercase(); let body = ByteStream::from_path(Path::new(path)) .await .map_err(Error::sink)?; diff --git a/src/storage.rs b/src/storage.rs index 9032a7b..6b6e602 100644 --- a/src/storage.rs +++ b/src/storage.rs @@ -3,10 +3,11 @@ use crate::error::{Error, Result}; use crate::file_shard::FileShard; use crate::hash_ring::HashRing; use crate::sinks::Sink; -use crate::utils::{bytes_to_key, key_to_bytes}; +use crate::utils::key_to_bytes; use crate::{IrohClient, IrohDoc}; use async_stream::stream; use futures::{Stream, StreamExt}; +use iroh::bytes::store::ExportMode; use iroh::bytes::Hash; use iroh::client::{Entry, LiveEvent}; use iroh::rpc_protocol::ShareMode; @@ -18,7 +19,6 @@ use std::collections::{HashMap, HashSet}; use std::num::NonZeroUsize; use std::path::PathBuf; use std::sync::Arc; -use iroh::bytes::store::ExportMode; use tokio::io::AsyncRead; use tokio_task_pool::Pool; use tokio_util::bytes; @@ -32,8 +32,7 @@ pub struct Storage { iroh_doc: IrohDoc, sync_client: IrohClient, hash_ring: HashRing, - fs_shards: HashMap, - replicas: u8, + shards: HashMap, sinks: Vec>, keep_blob: bool, } @@ -50,29 +49,32 @@ impl Storage { cancellation_token: CancellationToken, task_tracker: TaskTracker, ) -> Result { - let mut fs_shards = HashMap::new(); - for fs_shard in &storage_config.fs_shards { - fs_shards.insert( - fs_shard.name.clone(), - FileShard::new(&fs_shard.path.join(table_name)).await?, + let mut shards = HashMap::new(); + for shard in &storage_config.shards { + shards.insert( + shard.name.clone(), + FileShard::new(&shard.path.join(table_name)).await?, ); } let storage = Storage { author_id, iroh_doc: iroh_doc.clone(), sync_client, - hash_ring: HashRing::with_hasher(storage_config.fs_shards.iter()), - fs_shards, - replicas: storage_config.replicas, + hash_ring: HashRing::with_hasher(storage_config.shards.iter()), + shards, sinks, keep_blob, }; let storage_clone = storage.clone(); + let mut stream = storage_clone + .iroh_doc() + .subscribe() + .await + .map_err(Error::storage)?; task_tracker.spawn({ let cancellation_token = cancellation_token.clone(); async move { - let mut stream = storage_clone.iroh_doc().subscribe().await.unwrap(); let mut wait_list = LruCache::new(NonZeroUsize::new(4096).expect("not possible")); info!("started"); loop { @@ -82,19 +84,26 @@ impl Storage { return Ok::<(), Error>(()) }, event = stream.next() => { - if let Some(event) = event { - let event = event.unwrap(); - match &event { + let event = match event { + Some(Ok(event)) => event, + Some(Err(error)) => { + warn!(error = ?error); + continue; + } + None => return Ok::<(), Error>(()), + }; + match &event { LiveEvent::InsertRemote { entry, content_status, .. } => { + let key = std::str::from_utf8(entry.key()).map_err(Error::incorrect_key)?; info!(event = ?event); match content_status { ContentStatus::Complete => { - storage_clone.process_remote_entry(entry).await?; - storage_clone.process_sinks(entry).await; + storage_clone.process_remote_entry(key, entry).await?; + storage_clone.process_sinks(key).await?; } ContentStatus::Missing | ContentStatus::Incomplete => { wait_list.put(entry.content_hash(), entry.clone()); @@ -102,7 +111,8 @@ impl Storage { }; } LiveEvent::InsertLocal { entry } => { - storage_clone.process_sinks(entry).await; + let key = std::str::from_utf8(entry.key()).map_err(Error::incorrect_key)?; + storage_clone.process_sinks(key).await?; } LiveEvent::ContentReady { hash } => { info!(event = ?event); @@ -110,15 +120,13 @@ impl Storage { warn!(action = "skipped_absent_hash", hash = ?hash); continue; }; - storage_clone.process_remote_entry(entry).await?; - storage_clone.process_sinks(entry).await; - storage_clone.retain_blob_if_needed(entry).await?; + let key = std::str::from_utf8(entry.key()).map_err(Error::incorrect_key)?; + storage_clone.process_remote_entry(key, entry).await?; + storage_clone.process_sinks(key).await?; + storage_clone.retain_blob_if_needed(key, entry.content_hash()).await?; } _ => {} }; - } else { - return Ok::<(), Error>(()) - } } } } @@ -126,12 +134,12 @@ impl Storage { .instrument(info_span!(parent: None, "fs_sync", table_id = iroh_doc.id().to_string())) }); - let fs_storage_clone = storage.clone(); + let storage_clone = storage.clone(); if storage_config.is_import_missing_enabled { task_tracker.spawn(async move { let all_keys: Arc> = Arc::new( - fs_storage_clone + storage_clone .iroh_doc() .get_many(Query::all()) .await @@ -142,14 +150,14 @@ impl Storage { ); let pool = Arc::new(Pool::bounded(16)); - for fs_shard in fs_storage_clone.fs_shards.values() { - let fs_storage_clone = fs_storage_clone.clone(); - let fs_shard = fs_shard.clone(); + for shard in storage_clone.shards.values() { + let storage_clone = storage_clone.clone(); + let shard = shard.clone(); let all_keys = all_keys.clone(); let pool = pool.clone(); let cancellation_token = cancellation_token.clone(); tokio::spawn(async move { - let base_path = fs_shard.path().to_path_buf(); + let base_path = shard.path().to_path_buf(); let mut read_dir_stream = tokio::fs::read_dir(&base_path) .await .map_err(Error::io_error)?; @@ -168,19 +176,24 @@ impl Storage { continue; } pool.spawn({ - let iroh_doc = fs_storage_clone.iroh_doc().clone(); + let iroh_doc = storage_clone.iroh_doc().clone(); async move { - let import_progress = iroh_doc + let import_progress = match iroh_doc .import_file( - fs_storage_clone.author_id, + storage_clone.author_id, key, &entry.path(), true, ) .await - .map_err(Error::doc) - .unwrap(); - if let Err(error) = import_progress.finish().await.map_err(Error::hash) { + .map_err(Error::doc) { + Ok(import_progress) => import_progress, + Err(error) => { + error!(error = ?error, path = ?entry.path(), key = ?entry.file_name(), "import_progress_error"); + return; + } + }; + if let Err(error) = import_progress.finish().await.map_err(Error::storage) { error!(error = ?error, path = ?entry.path(), key = ?entry.file_name(), "import_progress_error"); } info!(action = "imported", key = ?entry.file_name()) @@ -188,7 +201,7 @@ impl Storage { .instrument(info_span!(parent: None, "restore")) }) .await - .unwrap(); + .map_err(Error::io_error)?; } else { return Ok::<(), Error>(()) } @@ -203,57 +216,46 @@ impl Storage { Ok(storage) } - fn get_path(&self, key: &str) -> Result { + fn get_path(&self, key: &str) -> PathBuf { if let Some(file_shard_config) = self.hash_ring.range(key, 1).into_iter().next() { - let file_shard = &self.fs_shards[&file_shard_config.name]; - return Ok(file_shard.get_path_for(key)); + let file_shard = &self.shards[&file_shard_config.name]; + return file_shard.get_path_for(key); } - Err(Error::storage("missing file shards")) + unreachable!() } - async fn process_sinks(&self, entry: &Entry) { - let key = std::str::from_utf8(bytes_to_key(entry.key())).unwrap(); - let file_shard_path = self.get_path(key).unwrap(); + async fn process_sinks(&self, key: &str) -> Result<()> { + let shard_path = self.get_path(key); for sink in &self.sinks { - if let Err(error) = sink.send(entry.key(), &file_shard_path).await { + if let Err(error) = sink.send(key, &shard_path).await { warn!(error = ?error); continue; } info!(action = "send", sink = sink.name(), key = ?key); } + Ok(()) } - async fn process_remote_entry(&self, entry: &Entry) -> Result> { - let key = std::str::from_utf8(bytes_to_key(entry.key())).unwrap(); + async fn process_remote_entry(&self, key: &str, entry: &Entry) -> Result> { if entry.content_len() == 0 { self.delete_from_fs(key).await?; Ok(None) } else { - let file_shard_path = self.get_path(key).unwrap(); + let shard_path = self.get_path(key); self.iroh_doc() - .export_file( - entry.clone(), - file_shard_path.clone(), - ExportMode::TryReference, - ) + .export_file(entry.clone(), shard_path.clone(), ExportMode::TryReference) .await .map_err(Error::storage)? .finish() .await .map_err(Error::storage)?; - Ok(Some(file_shard_path)) + Ok(Some(shard_path)) } } - async fn retain_blob_if_needed(&self, entry: &Entry) -> Result<()> { + async fn retain_blob_if_needed(&self, key: &str, hash: Hash) -> Result<()> { if !self.keep_blob { - let key = std::str::from_utf8(bytes_to_key(entry.key())).unwrap(); - if let Err(error) = self - .sync_client - .blobs - .delete_blob(entry.content_hash()) - .await - { + if let Err(error) = self.sync_client.blobs.delete_blob(hash).await { warn!(error = ?error); } self.delete_from_fs(key).await?; @@ -264,7 +266,7 @@ impl Storage { pub async fn delete_from_fs(&self, key: &str) -> Result<()> { info!("delete_from_fs {:?} {:?}", self.iroh_doc().id(), key); if let Some(file_shard_config) = self.hash_ring.range(key, 1).into_iter().next() { - self.fs_shards[&file_shard_config.name] + self.shards[&file_shard_config.name] .delete(key) .await .map_err(Error::io_error)?; @@ -285,7 +287,7 @@ impl Storage { pub async fn insert(&self, key: &str, value: S) -> Result { if let Some(file_shard_config) = self.hash_ring.range(key, 1).into_iter().next() { - let file_shard = &self.fs_shards[&file_shard_config.name]; + let file_shard = &self.shards[&file_shard_config.name]; let data_path = file_shard .insert(key, value) .await @@ -295,7 +297,7 @@ impl Storage { .import_file(self.author_id, key_to_bytes(key), &data_path, true) .await .map_err(Error::doc)?; - return Ok(import_progress.finish().await.map_err(Error::hash)?.hash); + return Ok(import_progress.finish().await.map_err(Error::storage)?.hash); } Err(Error::FileShard { description: "missing file shards".to_string(), @@ -304,7 +306,7 @@ impl Storage { pub async fn exists(&self, key: &str) -> Result> { for file_shard_config in self.hash_ring.range(key, 1) { - let file_shard = &self.fs_shards[&file_shard_config.name]; + let file_shard = &self.shards[&file_shard_config.name]; if file_shard .exists(key) .await @@ -337,9 +339,9 @@ impl Storage { ))) } None => { - for file_shard_config in self.hash_ring.range(key, 1) { - let file_shard = &self.fs_shards[&file_shard_config.name]; - return match file_shard.open_store(key).await { + if let Some(shard_config) = self.hash_ring.range(key, 1).into_iter().next() { + let shard = &self.shards[&shard_config.name]; + return match shard.open_store(key).await { Ok(Some(file)) => Ok(Some(Box::new(file))), Ok(None) => Ok(None), Err(e) => Err(Error::io_error(e)), @@ -375,6 +377,6 @@ impl Storage { self.iroh_doc .set_hash(self.author_id, key_to_bytes(key), hash, size) .await - .map_err(Error::hash) + .map_err(Error::storage) } } diff --git a/src/utils.rs b/src/utils.rs index d71d045..5eda6fb 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -13,8 +13,3 @@ pub const FRAGMENT: &AsciiSet = &CONTROLS pub fn key_to_bytes(key: &str) -> bytes::Bytes { bytes::Bytes::copy_from_slice(key.as_bytes()) } - -#[inline] -pub fn bytes_to_key(b: &[u8]) -> &[u8] { - b -}