Skip to content

Commit

Permalink
[feat] Development
Browse files Browse the repository at this point in the history
  • Loading branch information
ppodolsky committed Feb 16, 2024
1 parent c7165b9 commit d4b30ba
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 31 deletions.
14 changes: 7 additions & 7 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ aws-credential-types = { version = "1.0", features = ["hardcoded-credentials"] }
axum = "0.7"
bisection = "0.1"
clap = { version = "4.4", features = ["derive"] }
iroh = { version = "0.12.0", branch = "main", git = "https://github.com/n0-computer/iroh", features = [ "flat-db", "metrics" ] }
iroh = { version = "0.12.0", branch = "main", git = "https://github.com/izihawa/iroh", features = [ "flat-db", "metrics" ] }
iroh-io = { version = "0.3.0" }
md5 = "0.7"
percent-encoding = "2.3.1"
Expand Down
10 changes: 7 additions & 3 deletions bin/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use tokio_util::io::{ReaderStream, StreamReader};
use tokio_util::sync::CancellationToken;
use tokio_util::task::TaskTracker;
use tower_http::trace::{self, TraceLayer};
use tracing::{info, Level};
use tracing::{info, info_span, Instrument, Level};
use tracing_subscriber::EnvFilter;
use trident_storage::config::{load_config, save_config, Config, SinkConfig, TableConfig};
use trident_storage::error::Error;
Expand Down Expand Up @@ -147,7 +147,10 @@ async fn app() -> Result<(), Error> {
.unwrap();

axum::serve(listener, app)
.with_graceful_shutdown(shutdown_signal(cancellation_token, task_tracker))
.with_graceful_shutdown(
shutdown_signal(cancellation_token, task_tracker)
.instrument(info_span!(parent: None, "shutdown_signal")),
)
.await
.unwrap();
Ok(())
Expand Down Expand Up @@ -175,8 +178,9 @@ async fn shutdown_signal(cancelation_token: CancellationToken, task_tracker: Tas
_ = ctrl_c => {},
_ = terminate => {},
}
info!("received_signal");
task_tracker.close();
cancelation_token.cancel();
info!(tasks = task_tracker.len(), "stopping_tasks");
task_tracker.wait().await;
info!("stopped_tasks");
}
Expand Down
16 changes: 9 additions & 7 deletions examples/config.yaml
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
http:
endpoint: 0.0.0.0:80
iroh:
author: h3fjmgdtvchk7sl4zatfuamzzjo2ms75pg3d6xamv6pub7cj4osa
tables: {}
path: /trident/iroh
path: /tmp/trident/iroh
bind_port: 11204
rpc_port: 4919
max_rpc_connections: 32
Expand All @@ -12,10 +13,11 @@ iroh:
default:
replicas: 1
fs_shards:
- name: shard1
path: /trident/data/shard1
weight: 1
- name: shard2
path: /trident/data/shard1
weight: 1
- name: shard1
path: /tmp/trident/data/shard1
weight: 1
- name: shard2
path: /tmp/trident/data/shard1
weight: 1
is_import_missing_enabled: false
gc_interval_secs: 86400
33 changes: 20 additions & 13 deletions src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use std::collections::{HashMap, HashSet};
use std::num::NonZeroUsize;
use std::path::PathBuf;
use std::sync::Arc;
use iroh::bytes::store::ExportMode;
use tokio::io::AsyncRead;
use tokio_task_pool::Pool;
use tokio_util::bytes;
Expand Down Expand Up @@ -76,7 +77,10 @@ impl Storage {
info!("started");
loop {
tokio::select! {
_ = cancellation_token.cancelled() => return Ok::<(), Error>(()),
_ = cancellation_token.cancelled() => {
info!("cancel");
return Ok::<(), Error>(())
},
event = stream.next() => {
if let Some(event) = event {
let event = event.unwrap();
Expand All @@ -92,14 +96,7 @@ impl Storage {
storage_clone.process_remote_entry(entry).await?;
storage_clone.process_sinks(entry).await;
}
ContentStatus::Missing => {
if entry.content_len() > 0 {
wait_list.put(entry.content_hash(), entry.clone());
} else {
storage_clone.process_remote_entry(entry).await?;
}
}
ContentStatus::Incomplete => {
ContentStatus::Missing | ContentStatus::Incomplete => {
wait_list.put(entry.content_hash(), entry.clone());
}
};
Expand All @@ -115,7 +112,7 @@ impl Storage {
};
storage_clone.process_remote_entry(entry).await?;
storage_clone.process_sinks(entry).await;
storage_clone.retain_blob_if_needed(entry).await;
storage_clone.retain_blob_if_needed(entry).await?;
}
_ => {}
};
Expand Down Expand Up @@ -159,7 +156,10 @@ impl Storage {

loop {
tokio::select! {
_ = cancellation_token.cancelled() => return Ok::<(), Error>(()),
_ = cancellation_token.cancelled() => {
info!("cancel");
return Ok::<(), Error>(())
},
entry = read_dir_stream.next_entry() => {
let entry = entry.map_err(Error::io_error)?;
if let Some(entry) = entry {
Expand Down Expand Up @@ -231,7 +231,11 @@ impl Storage {
} else {
let file_shard_path = self.get_path(key).unwrap();
self.iroh_doc()
.export_file(entry.clone(), file_shard_path.clone())
.export_file(
entry.clone(),
file_shard_path.clone(),
ExportMode::TryReference,
)
.await
.map_err(Error::storage)?
.finish()
Expand All @@ -241,8 +245,9 @@ impl Storage {
}
}

async fn retain_blob_if_needed(&self, entry: &Entry) {
async fn retain_blob_if_needed(&self, entry: &Entry) -> Result<()> {
if !self.keep_blob {
let key = std::str::from_utf8(bytes_to_key(entry.key())).unwrap();
if let Err(error) = self
.sync_client
.blobs
Expand All @@ -251,7 +256,9 @@ impl Storage {
{
warn!(error = ?error);
}
self.delete_from_fs(key).await?;
}
Ok(())
}

pub async fn delete_from_fs(&self, key: &str) -> Result<()> {
Expand Down

0 comments on commit d4b30ba

Please sign in to comment.