
Commit

[feat] Development
ppodolsky committed Feb 15, 2024
1 parent 6d78693 commit c7165b9
Showing 15 changed files with 850 additions and 901 deletions.
356 changes: 201 additions & 155 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
@@ -38,3 +38,4 @@ tracing-subscriber = "0.3"
futures = "0.3"
lru = "0.12.1"
tokio-task-pool = "0.1.5"
reqwest = { version = "0.11.24", default-features = false, features = ["rustls", "multipart"] }
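
The new reqwest dependency (TLS via rustls, multipart enabled) pairs with the IPFS sink added in src/config.rs below. A minimal sketch of how a multipart upload to a Kubo-style IPFS HTTP API (POST /api/v0/add) could look — the endpoint, function name, and return handling are assumptions, not code from this commit:

use reqwest::multipart::{Form, Part};

// Hypothetical helper, not from this commit: push a named blob to an IPFS
// HTTP API and return the raw JSON response (which contains the CID).
async fn ipfs_add(api_base_url: &str, name: &str, bytes: Vec<u8>) -> Result<String, reqwest::Error> {
    let part = Part::bytes(bytes).file_name(name.to_string());
    let form = Form::new().part("file", part);
    let response = reqwest::Client::new()
        .post(format!("{}/api/v0/add", api_base_url))
        .multipart(form)
        .send()
        .await?
        .error_for_status()?;
    response.text().await
}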
90 changes: 73 additions & 17 deletions bin/main.rs
@@ -15,18 +15,23 @@ use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use tokio::runtime::Builder;
use tokio::signal;
use tokio::sync::RwLock;
use tokio_util::io::{ReaderStream, StreamReader};
use tokio_util::sync::CancellationToken;
use tokio_util::task::TaskTracker;
use tower_http::trace::{self, TraceLayer};
use tracing::Level;
use tracing::{info, Level};
use tracing_subscriber::EnvFilter;
use trident_storage::config::{
load_config, save_config, Config, MirroringConfig, SinkConfig, TableConfig,
};
use trident_storage::config::{load_config, save_config, Config, SinkConfig, TableConfig};
use trident_storage::error::Error;

use trident_storage::iroh_node::IrohNode;

fn return_true() -> bool {
true
}

/// Simple program to greet a person
#[derive(Parser, Debug)]
#[command(author, version, about, long_about = None)]
@@ -38,16 +43,22 @@ struct Args {

#[derive(Deserialize)]
struct TablesCreateRequest {
storage: Option<String>,
mirroring: Option<MirroringConfig>,
storage: String,
#[serde(default)]
sinks: Vec<String>,
#[serde(default = "return_true")]
keep_blob: bool,
}

#[derive(Deserialize)]
struct TablesImportRequest {
ticket: String,
download_policy: DownloadPolicy,
storage: Option<String>,
mirroring: Option<MirroringConfig>,
storage: String,
#[serde(default)]
sinks: Vec<String>,
#[serde(default = "return_true")]
keep_blob: bool,
}

#[derive(Deserialize)]
@@ -83,10 +94,16 @@ struct TablesLsResponse {
pub tables: HashMap<String, TableConfig>,
}

async fn create_app(args: Args) -> Result<AppState, Error> {
async fn create_app(
args: Args,
cancellation_token: CancellationToken,
task_tracker: TaskTracker,
) -> Result<AppState, Error> {
let config = Arc::new(RwLock::new(load_config(&args.config).await?));
let state = AppState {
iroh_node: Arc::new(RwLock::new(IrohNode::new(config.clone()).await?)),
iroh_node: Arc::new(RwLock::new(
IrohNode::new(config.clone(), cancellation_token, task_tracker).await?,
)),
config: config.clone(),
config_path: args.config.clone(),
};
@@ -96,8 +113,10 @@ async fn create_app(args: Args) -> Result<AppState, Error> {

async fn app() -> Result<(), Error> {
let args = Args::parse();
let cancellation_token = CancellationToken::new();
let task_tracker = TaskTracker::new();

let state = create_app(args).await?;
let state = create_app(args, cancellation_token.clone(), task_tracker.clone()).await?;
let config = state.config.clone();

// build our application with a route
@@ -127,10 +146,41 @@ async fn app() -> Result<(), Error> {
.await
.unwrap();

axum::serve(listener, app).await.unwrap();
axum::serve(listener, app)
.with_graceful_shutdown(shutdown_signal(cancellation_token, task_tracker))
.await
.unwrap();
Ok(())
}

async fn shutdown_signal(cancelation_token: CancellationToken, task_tracker: TaskTracker) {
let ctrl_c = async {
signal::ctrl_c()
.await
.expect("failed to install Ctrl+C handler");
};

#[cfg(unix)]
let terminate = async {
signal::unix::signal(signal::unix::SignalKind::terminate())
.expect("failed to install signal handler")
.recv()
.await;
};

#[cfg(not(unix))]
let terminate = std::future::pending::<()>();

tokio::select! {
_ = ctrl_c => {},
_ = terminate => {},
}
info!("received_signal");
cancelation_token.cancel();
task_tracker.wait().await;
info!("stopped_tasks");
}
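
For context, a hypothetical background task wired to the same pair of primitives (not part of this diff; names are illustrative): shutdown_signal cancels the token and then waits on the tracker, so long-running work should select on cancelled() and be spawned through the TaskTracker.

use std::time::Duration;
use tokio_util::sync::CancellationToken;
use tokio_util::task::TaskTracker;

// Hypothetical worker, not from this commit: exits promptly once
// shutdown_signal cancels the token; TaskTracker::wait() resolves after the
// tracker is closed and all tracked tasks like this one have finished.
fn spawn_periodic_worker(task_tracker: &TaskTracker, cancellation_token: CancellationToken) {
    task_tracker.spawn(async move {
        loop {
            tokio::select! {
                _ = cancellation_token.cancelled() => break,
                _ = tokio::time::sleep(Duration::from_secs(60)) => {
                    // periodic work (e.g. GC or mirroring) would go here
                }
            }
        }
    });
}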

fn main() -> Result<(), Error> {
// initialize tracing
tracing_subscriber::fmt()
@@ -187,8 +237,9 @@ async fn tables_create(
.await
.tables_create(
&table,
tables_create_request.storage.as_deref(),
tables_create_request.mirroring,
&tables_create_request.storage,
tables_create_request.sinks,
tables_create_request.keep_blob,
)
.await
{
@@ -217,8 +268,9 @@ async fn tables_import(
&table,
&tables_import_request.ticket,
tables_import_request.download_policy,
tables_import_request.storage.as_deref(),
tables_import_request.mirroring,
&tables_import_request.storage,
tables_import_request.sinks,
tables_import_request.keep_blob,
)
.await
{
@@ -313,9 +365,13 @@ async fn table_get(
Path((table, key)): Path<(String, String)>,
) -> Response {
match state.iroh_node.read().await.table_get(&table, &key).await {
Ok(reader) => Response::builder()
Ok(Some(reader)) => Response::builder()
.body(Body::from_stream(ReaderStream::new(reader)))
.unwrap(),
Ok(None) => Response::builder()
.status(StatusCode::NOT_FOUND)
.body(Body::default())
.unwrap(),
Err(e) => e.into_response(),
}
}
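
With the handler changes above, storage becomes a required field while sinks and keep_blob fall back to serde defaults. Assuming the create endpoint takes a JSON body (not shown in this hunk), a request under the new TablesCreateRequest schema would look roughly like this — field values are illustrative:

// Hypothetical example, not from this commit.
fn example_tables_create_body() -> serde_json::Value {
    serde_json::json!({
        "storage": "default",   // now a required String (was Option<String>)
        "sinks": ["s3"],        // #[serde(default)] — may be omitted
        "keep_blob": true       // defaults to true via return_true()
    })
}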
38 changes: 18 additions & 20 deletions src/config.rs
@@ -7,23 +7,20 @@ use std::path::PathBuf;
fn return_false() -> bool {
false
}

#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub enum StorageEngineConfig {
FS(String),
Iroh,
fn return_true() -> bool {
true
}

#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub struct FSStorageEngineConfig {
pub struct StorageEngineConfig {
pub replicas: u8,
pub fs_shards: Vec<FSShardConfig>,
pub fs_shards: Vec<ShardConfig>,
#[serde(default = "return_false")]
pub is_import_missing_enabled: bool,
}

#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub struct FSShardConfig {
pub struct ShardConfig {
pub name: String,
pub path: PathBuf,
pub weight: usize,
@@ -44,27 +41,28 @@ pub struct S3Config {
}

#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum SinkConfig {
S3(S3Config),
pub struct IpfsConfig {
pub api_base_url: String,
pub in_place: bool,
}

#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct MirroringConfig {
#[serde(default = "Vec::new")]
pub sinks: Vec<String>,
#[serde(default = "return_false")]
pub delete_after_mirroring: bool,
#[serde(rename_all = "snake_case")]
pub enum SinkConfig {
S3(S3Config),
Ipfs(IpfsConfig),
}

#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct TableConfig {
pub id: String,
#[serde(default = "DownloadPolicy::default")]
pub download_policy: DownloadPolicy,
#[serde(skip_serializing_if = "Option::is_none")]
pub mirroring: Option<MirroringConfig>,
pub storage_engine: StorageEngineConfig,
#[serde(default = "Vec::new")]
pub sinks: Vec<String>,
pub storage_name: String,
#[serde(default = "return_true")]
pub keep_blob: bool,
}

#[derive(Clone, Debug, Serialize, Deserialize)]
Expand All @@ -85,7 +83,7 @@ pub struct IrohConfig {
#[serde(default = "HashMap::new")]
pub sinks: HashMap<String, SinkConfig>,
#[serde(default = "HashMap::new")]
pub fs_storages: HashMap<String, FSStorageEngineConfig>,
pub fs_storages: HashMap<String, StorageEngineConfig>,
pub gc_interval_secs: Option<u64>,
}

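
Since SinkConfig is an externally tagged enum with rename_all = "snake_case", a sink entry is keyed by its lowercased variant name (s3 or ipfs). A sketch of how the new IPFS variant deserializes — JSON is used here purely for illustration; the on-disk config format is not shown in this diff:

use trident_storage::config::SinkConfig;

// Hypothetical example, not from this commit: an "ipfs" sink entry.
fn example_ipfs_sink() -> Result<SinkConfig, serde_json::Error> {
    serde_json::from_value(serde_json::json!({
        "ipfs": {
            "api_base_url": "http://127.0.0.1:5001",
            "in_place": false
        }
    }))
}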
12 changes: 6 additions & 6 deletions src/hash_ring.rs
@@ -1,10 +1,10 @@
use crate::config::FSShardConfig;
use crate::config::ShardConfig;
use bisection::bisect_right;
use std::collections::HashMap;
use std::collections::{BinaryHeap, HashSet};
use std::hash::{Hash, Hasher};

impl Hash for FSShardConfig {
impl Hash for ShardConfig {
fn hash<H: Hasher>(&self, state: &mut H) {
self.path.hash(state)
}
Expand All @@ -13,12 +13,12 @@ impl Hash for FSShardConfig {
#[derive(Clone)]
pub struct HashRing {
v_nodes: usize,
ring: HashMap<u128, FSShardConfig>,
ring: HashMap<u128, ShardConfig>,
sorted_keys: Vec<u128>,
}

impl HashRing {
pub fn with_hasher<'a>(nodes: impl Iterator<Item = &'a FSShardConfig>) -> HashRing {
pub fn with_hasher<'a>(nodes: impl Iterator<Item = &'a ShardConfig>) -> HashRing {
let mut new_hash_ring = HashRing {
v_nodes: 160,
ring: HashMap::new(),
Expand All @@ -29,7 +29,7 @@ impl HashRing {
}

/// Adds a node to the hash ring
pub fn add_nodes<'a>(&mut self, nodes: impl Iterator<Item = &'a FSShardConfig>) {
pub fn add_nodes<'a>(&mut self, nodes: impl Iterator<Item = &'a ShardConfig>) {
for node in nodes {
for i in 0..self.v_nodes * node.weight {
let node_name = format!("{}-{}", &node.name, i);
Expand All @@ -41,7 +41,7 @@ impl HashRing {
self.sorted_keys = BinaryHeap::from(self.sorted_keys.clone()).into_sorted_vec();
}

pub fn range(&self, key: &str, size: usize) -> Vec<&FSShardConfig> {
pub fn range(&self, key: &str, size: usize) -> Vec<&ShardConfig> {
let mut result = Vec::with_capacity(size);
let mut visited = HashSet::new();
let position = if let Some(position) = self.get_pos(key) {
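
The rename from FSShardConfig to ShardConfig is mechanical; the ring still places 160 virtual nodes per unit of shard weight and deduplicates shards while walking clockwise from a key's position. A hypothetical usage sketch (module paths and use of the replicas field are assumptions, not code from this commit):

use std::path::PathBuf;
use trident_storage::config::StorageEngineConfig;
use trident_storage::hash_ring::HashRing;

// Hypothetical helper, not from this commit: resolve the shard paths that
// should hold `key`, one per replica. Rebuilding the ring per call is for
// brevity only; real code would cache it.
fn replica_paths(storage: &StorageEngineConfig, key: &str) -> Vec<PathBuf> {
    let ring = HashRing::with_hasher(storage.fs_shards.iter());
    ring.range(key, storage.replicas as usize)
        .into_iter()
        .map(|shard| shard.path.clone())
        .collect()
}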
