From d4b30ba46eeb64b0f5dfaf17d493a1af1f9004d7 Mon Sep 17 00:00:00 2001 From: Pasha Date: Fri, 16 Feb 2024 18:36:18 +0300 Subject: [PATCH] [feat] Development --- Cargo.lock | 14 +++++++------- Cargo.toml | 2 +- bin/main.rs | 10 +++++++--- examples/config.yaml | 16 +++++++++------- src/storage.rs | 33 ++++++++++++++++++++------------- 5 files changed, 44 insertions(+), 31 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a820d3a..17016c8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2386,7 +2386,7 @@ checksum = "8f518f335dce6725a761382244631d86cf0ccb2863413590b31338feb467f9c3" [[package]] name = "iroh" version = "0.12.0" -source = "git+https://github.com/n0-computer/iroh?branch=main#983edcc0910d55035205759107fc3b318243480e" +source = "git+https://github.com/izihawa/iroh?branch=main#224095304067518b3fe886c8ec118089214fd24a" dependencies = [ "anyhow", "bao-tree", @@ -2445,7 +2445,7 @@ dependencies = [ [[package]] name = "iroh-base" version = "0.12.0" -source = "git+https://github.com/n0-computer/iroh?branch=main#983edcc0910d55035205759107fc3b318243480e" +source = "git+https://github.com/izihawa/iroh?branch=main#224095304067518b3fe886c8ec118089214fd24a" dependencies = [ "anyhow", "bao-tree", @@ -2474,7 +2474,7 @@ dependencies = [ [[package]] name = "iroh-bytes" version = "0.12.0" -source = "git+https://github.com/n0-computer/iroh?branch=main#983edcc0910d55035205759107fc3b318243480e" +source = "git+https://github.com/izihawa/iroh?branch=main#224095304067518b3fe886c8ec118089214fd24a" dependencies = [ "anyhow", "bao-tree", @@ -2512,7 +2512,7 @@ dependencies = [ [[package]] name = "iroh-gossip" version = "0.12.0" -source = "git+https://github.com/n0-computer/iroh?branch=main#983edcc0910d55035205759107fc3b318243480e" +source = "git+https://github.com/izihawa/iroh?branch=main#224095304067518b3fe886c8ec118089214fd24a" dependencies = [ "anyhow", "bytes", @@ -2566,7 +2566,7 @@ dependencies = [ [[package]] name = "iroh-metrics" version = "0.12.0" -source = "git+https://github.com/n0-computer/iroh?branch=main#983edcc0910d55035205759107fc3b318243480e" +source = "git+https://github.com/izihawa/iroh?branch=main#224095304067518b3fe886c8ec118089214fd24a" dependencies = [ "anyhow", "erased_set", @@ -2586,7 +2586,7 @@ dependencies = [ [[package]] name = "iroh-net" version = "0.12.0" -source = "git+https://github.com/n0-computer/iroh?branch=main#983edcc0910d55035205759107fc3b318243480e" +source = "git+https://github.com/izihawa/iroh?branch=main#224095304067518b3fe886c8ec118089214fd24a" dependencies = [ "aead", "anyhow", @@ -2661,7 +2661,7 @@ dependencies = [ [[package]] name = "iroh-sync" version = "0.12.0" -source = "git+https://github.com/n0-computer/iroh?branch=main#983edcc0910d55035205759107fc3b318243480e" +source = "git+https://github.com/izihawa/iroh?branch=main#224095304067518b3fe886c8ec118089214fd24a" dependencies = [ "anyhow", "bytes", diff --git a/Cargo.toml b/Cargo.toml index 349d54b..9d89496 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,7 +20,7 @@ aws-credential-types = { version = "1.0", features = ["hardcoded-credentials"] } axum = "0.7" bisection = "0.1" clap = { version = "4.4", features = ["derive"] } -iroh = { version = "0.12.0", branch = "main", git = "https://github.com/n0-computer/iroh", features = [ "flat-db", "metrics" ] } +iroh = { version = "0.12.0", branch = "main", git = "https://github.com/izihawa/iroh", features = [ "flat-db", "metrics" ] } iroh-io = { version = "0.3.0" } md5 = "0.7" percent-encoding = "2.3.1" diff --git a/bin/main.rs b/bin/main.rs index 4c69758..c5c70ae 100644 --- a/bin/main.rs +++ b/bin/main.rs @@ -21,7 +21,7 @@ use tokio_util::io::{ReaderStream, StreamReader}; use tokio_util::sync::CancellationToken; use tokio_util::task::TaskTracker; use tower_http::trace::{self, TraceLayer}; -use tracing::{info, Level}; +use tracing::{info, info_span, Instrument, Level}; use tracing_subscriber::EnvFilter; use trident_storage::config::{load_config, save_config, Config, SinkConfig, TableConfig}; use trident_storage::error::Error; @@ -147,7 +147,10 @@ async fn app() -> Result<(), Error> { .unwrap(); axum::serve(listener, app) - .with_graceful_shutdown(shutdown_signal(cancellation_token, task_tracker)) + .with_graceful_shutdown( + shutdown_signal(cancellation_token, task_tracker) + .instrument(info_span!(parent: None, "shutdown_signal")), + ) .await .unwrap(); Ok(()) @@ -175,8 +178,9 @@ async fn shutdown_signal(cancelation_token: CancellationToken, task_tracker: Tas _ = ctrl_c => {}, _ = terminate => {}, } - info!("received_signal"); + task_tracker.close(); cancelation_token.cancel(); + info!(tasks = task_tracker.len(), "stopping_tasks"); task_tracker.wait().await; info!("stopped_tasks"); } diff --git a/examples/config.yaml b/examples/config.yaml index 177473f..c15e722 100644 --- a/examples/config.yaml +++ b/examples/config.yaml @@ -1,8 +1,9 @@ http: endpoint: 0.0.0.0:80 iroh: + author: h3fjmgdtvchk7sl4zatfuamzzjo2ms75pg3d6xamv6pub7cj4osa tables: {} - path: /trident/iroh + path: /tmp/trident/iroh bind_port: 11204 rpc_port: 4919 max_rpc_connections: 32 @@ -12,10 +13,11 @@ iroh: default: replicas: 1 fs_shards: - - name: shard1 - path: /trident/data/shard1 - weight: 1 - - name: shard2 - path: /trident/data/shard1 - weight: 1 + - name: shard1 + path: /tmp/trident/data/shard1 + weight: 1 + - name: shard2 + path: /tmp/trident/data/shard1 + weight: 1 + is_import_missing_enabled: false gc_interval_secs: 86400 diff --git a/src/storage.rs b/src/storage.rs index 5db73c8..9032a7b 100644 --- a/src/storage.rs +++ b/src/storage.rs @@ -18,6 +18,7 @@ use std::collections::{HashMap, HashSet}; use std::num::NonZeroUsize; use std::path::PathBuf; use std::sync::Arc; +use iroh::bytes::store::ExportMode; use tokio::io::AsyncRead; use tokio_task_pool::Pool; use tokio_util::bytes; @@ -76,7 +77,10 @@ impl Storage { info!("started"); loop { tokio::select! { - _ = cancellation_token.cancelled() => return Ok::<(), Error>(()), + _ = cancellation_token.cancelled() => { + info!("cancel"); + return Ok::<(), Error>(()) + }, event = stream.next() => { if let Some(event) = event { let event = event.unwrap(); @@ -92,14 +96,7 @@ impl Storage { storage_clone.process_remote_entry(entry).await?; storage_clone.process_sinks(entry).await; } - ContentStatus::Missing => { - if entry.content_len() > 0 { - wait_list.put(entry.content_hash(), entry.clone()); - } else { - storage_clone.process_remote_entry(entry).await?; - } - } - ContentStatus::Incomplete => { + ContentStatus::Missing | ContentStatus::Incomplete => { wait_list.put(entry.content_hash(), entry.clone()); } }; @@ -115,7 +112,7 @@ impl Storage { }; storage_clone.process_remote_entry(entry).await?; storage_clone.process_sinks(entry).await; - storage_clone.retain_blob_if_needed(entry).await; + storage_clone.retain_blob_if_needed(entry).await?; } _ => {} }; @@ -159,7 +156,10 @@ impl Storage { loop { tokio::select! { - _ = cancellation_token.cancelled() => return Ok::<(), Error>(()), + _ = cancellation_token.cancelled() => { + info!("cancel"); + return Ok::<(), Error>(()) + }, entry = read_dir_stream.next_entry() => { let entry = entry.map_err(Error::io_error)?; if let Some(entry) = entry { @@ -231,7 +231,11 @@ impl Storage { } else { let file_shard_path = self.get_path(key).unwrap(); self.iroh_doc() - .export_file(entry.clone(), file_shard_path.clone()) + .export_file( + entry.clone(), + file_shard_path.clone(), + ExportMode::TryReference, + ) .await .map_err(Error::storage)? .finish() @@ -241,8 +245,9 @@ impl Storage { } } - async fn retain_blob_if_needed(&self, entry: &Entry) { + async fn retain_blob_if_needed(&self, entry: &Entry) -> Result<()> { if !self.keep_blob { + let key = std::str::from_utf8(bytes_to_key(entry.key())).unwrap(); if let Err(error) = self .sync_client .blobs @@ -251,7 +256,9 @@ impl Storage { { warn!(error = ?error); } + self.delete_from_fs(key).await?; } + Ok(()) } pub async fn delete_from_fs(&self, key: &str) -> Result<()> {