Skip to content

Commit

Permalink
[feat] Development
Browse files Browse the repository at this point in the history
  • Loading branch information
ppodolsky committed Jan 25, 2024
1 parent a4c60bd commit adaa185
Show file tree
Hide file tree
Showing 14 changed files with 345 additions and 250 deletions.
145 changes: 89 additions & 56 deletions Cargo.lock

Large diffs are not rendered by default.

4 changes: 3 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ aws-credential-types = { version = "1.0", features = ["hardcoded-credentials"] }
axum = "0.7"
bisection = "0.1"
clap = { version = "4.4", features = ["derive"] }
iroh = { version = "0.12.0", git = "https://github.com/n0-computer/iroh", features = [ "flat-db", "metrics" ] }
iroh = { version = "0.12.0", branch = "perf-startup", git = "https://github.com/n0-computer/iroh", features = [ "flat-db", "metrics" ] }
iroh-io = { version = "0.3.0" }
md5 = "0.7"
percent-encoding = "2.3.1"
Expand All @@ -36,3 +36,5 @@ tower-http = { version = "0.5", features = ["trace"] }
tracing = "0.1"
tracing-subscriber = "0.3"
futures = "0.3"
lru = "0.12.1"
tokio-task-pool = "0.1.5"
9 changes: 5 additions & 4 deletions examples/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ iroh:
path: /trident/iroh
bind_port: 11204
rpc_port: 4919
max_rpc_connections: 16
max_rpc_connections: 32
max_rpc_streams: 1024
sinks: {}
fs_storages:
Expand All @@ -14,7 +14,8 @@ iroh:
fs_shards:
- name: shard1
path: /trident/data/shard1
weight: 20
weight: 1
- name: shard2
path: /trident/data/shard2
weight: 20
path: /trident/data/shard1
weight: 1
gc_interval_secs: 86400
1 change: 1 addition & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ pub struct IrohConfig {
pub sinks: HashMap<String, SinkConfig>,
#[serde(default = "HashMap::new")]
pub fs_storages: HashMap<String, FSStorageEngineConfig>,
pub gc_interval_secs: Option<u64>,
}

#[derive(Clone, Debug, Serialize, Deserialize)]
Expand Down
147 changes: 90 additions & 57 deletions src/iroh_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,11 @@ use crate::storages::{Storage, StorageEngine};
use crate::utils::bytes_to_key;
use crate::IrohClient;
use async_stream::stream;
use futures::StreamExt;
use iroh::bytes::Hash;
use iroh::client::quic::RPC_ALPN;
use iroh::net::derp::DerpMode;
use iroh::node::Node;
use iroh::node::{GcPolicy, Node};
use iroh::rpc_protocol::{ProviderRequest, ProviderResponse, ShareMode};
use iroh::sync::{AuthorId, NamespaceId};
use iroh::ticket::DocTicket;
Expand All @@ -24,9 +25,11 @@ use std::collections::HashMap;
use std::net::{Ipv4Addr, SocketAddrV4};
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
use tokio::io::AsyncRead;
use tokio::sync::RwLock;
use tokio_stream::Stream;
use tracing::info;

pub struct IrohNode {
sync_client: IrohClient,
Expand Down Expand Up @@ -81,18 +84,32 @@ impl IrohNode {
QuinnServerEndpoint::<ProviderRequest, ProviderResponse>::new(rpc_quinn_endpoint)
.map_err(Error::node_create)?;

let node = Node::builder(db, docs)
let mut node_builder = Node::builder(db, docs)
.secret_key(secret_key)
.peers_data_path(peer_data_path)
.derp_mode(DerpMode::Default)
.bind_port(config_lock.iroh.bind_port)
.rpc_endpoint(rpc_endpoint)
.spawn()
.await
.map_err(Error::node_create)?;
.rpc_endpoint(rpc_endpoint);

if let Some(gc_interval_secs) = config_lock.iroh.gc_interval_secs {
node_builder =
node_builder.gc_policy(GcPolicy::Interval(Duration::from_secs(gc_interval_secs)))
}
let node = node_builder.spawn().await.map_err(Error::node_create)?;
let sync_client = node.client();

for doc in sync_client
.docs
.list()
.await
.unwrap()
.map(|x| x.unwrap().0)
.collect::<Vec<_>>()
.await
{
info!(action = "all_docs", "{}", doc);
}

let author_id = match &config_lock.iroh.author {
Some(author) => AuthorId::from_str(author).unwrap(),
None => {
Expand Down Expand Up @@ -135,19 +152,23 @@ impl IrohNode {
.await?,
),
};
let mirroring = table_config.mirroring.clone().map(|mirroring_config| {
Mirroring::new(
iroh_doc.clone(),
mirroring_config
.sinks
.clone()
.into_iter()
.map(|sink_name| sinks[&sink_name].clone())
.collect(),
sync_client.clone(),
mirroring_config.delete_after_mirroring,
)
});
let mirroring = match &table_config.mirroring {
Some(mirroring_config) => Some(
Mirroring::new(
iroh_doc.clone(),
mirroring_config
.sinks
.clone()
.into_iter()
.map(|sink_name| sinks[&sink_name].clone())
.collect(),
sync_client.clone(),
mirroring_config.delete_after_mirroring,
)
.await?,
),
None => None,
};
let storage = Storage::new(storage_engine, mirroring);
table_storages.insert(table_name.clone(), storage);
}
Expand Down Expand Up @@ -222,19 +243,23 @@ impl IrohNode {
StorageEngineConfig::FS(storage_name.to_string()),
),
};
let mirroring = mirroring_config.clone().map(|mirroring_config| {
Mirroring::new(
iroh_doc.clone(),
mirroring_config
.sinks
.clone()
.into_iter()
.map(|sink_name| self.sinks[&sink_name].clone())
.collect(),
self.sync_client.clone(),
mirroring_config.delete_after_mirroring,
)
});
let mirroring = match &mirroring_config {
Some(mirroring_config) => Some(
Mirroring::new(
iroh_doc.clone(),
mirroring_config
.sinks
.clone()
.into_iter()
.map(|sink_name| self.sinks[&sink_name].clone())
.collect(),
self.sync_client.clone(),
mirroring_config.delete_after_mirroring,
)
.await?,
),
None => None,
};
let storage = Storage::new(storage_engine, mirroring);
entry.insert(storage);
self.config.write().await.iroh.tables.insert(
Expand Down Expand Up @@ -287,20 +312,23 @@ impl IrohNode {
StorageEngineConfig::FS(storage_name.to_string()),
),
};

let mirroring = mirroring_config.clone().map(|mirroring_config| {
Mirroring::new(
iroh_doc.clone(),
mirroring_config
.sinks
.clone()
.into_iter()
.map(|sink_name| self.sinks[&sink_name].clone())
.collect(),
self.sync_client.clone(),
mirroring_config.delete_after_mirroring,
)
});
let mirroring = match &mirroring_config {
Some(mirroring_config) => Some(
Mirroring::new(
iroh_doc.clone(),
mirroring_config
.sinks
.clone()
.into_iter()
.map(|sink_name| self.sinks[&sink_name].clone())
.collect(),
self.sync_client.clone(),
mirroring_config.delete_after_mirroring,
)
.await?,
),
None => None,
};
let storage = Storage::new(storage_engine, mirroring);

entry.insert(storage);
Expand All @@ -321,14 +349,21 @@ impl IrohNode {
pub async fn tables_drop(&mut self, table_name: &str) -> Result<()> {
match self.table_storages.remove(table_name) {
None => Err(Error::missing_table(table_name)),
Some(_) => Ok(self
.config
.write()
.await
.iroh
.tables
.remove(table_name)
.unwrap()),
Some(table_storage) => {
self.sync_client
.docs
.drop_doc(table_storage.iroh_doc().id())
.await
.map_err(Error::doc)?;
Ok(self
.config
.write()
.await
.iroh
.tables
.remove(table_name)
.unwrap())
}
}?;
Ok(())
}
Expand Down Expand Up @@ -361,9 +396,7 @@ impl IrohNode {
let Some((from_hash, from_size)) = from_table.get_hash(from_key).await? else {
return Err(Error::missing_key(from_key));
};
to_table
.insert_hash(to_key, from_hash.clone(), from_size)
.await?;
to_table.insert_hash(to_key, from_hash, from_size).await?;
Ok(from_hash)
}

Expand Down
27 changes: 15 additions & 12 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
#![warn(
clippy::all,
clippy::restriction,
clippy::pedantic,
clippy::nursery,
clippy::cargo
)]
#![allow(
clippy::implicit_return,
clippy::missing_docs_in_private_items,
clippy::multiple_unsafe_ops_per_block,
clippy::question_mark_used
#![deny(
non_shorthand_field_patterns,
no_mangle_generic_items,
overflowing_literals,
path_statements,
unused_allocation,
unused_comparisons,
unused_parens,
while_true,
trivial_numeric_casts,
unused_extern_crates,
unused_import_braces,
unused_qualifications,
unused_must_use,
clippy::unwrap_used
)]

use iroh::rpc_protocol::{ProviderRequest, ProviderResponse};
Expand Down
12 changes: 7 additions & 5 deletions src/sinks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@ use aws_sdk_s3::config::{BehaviorVersion, Region};
use aws_sdk_s3::primitives::ByteStream;
use axum::async_trait;
use percent_encoding::utf8_percent_encode;
use tokio_util::bytes;

#[async_trait]
pub trait Sink: Send + Sync {
fn name(&self) -> &str;
async fn send(&self, key: &[u8], value: Vec<u8>) -> Result<(), Error>;
async fn send(&self, key: &[u8], value: bytes::Bytes) -> Result<(), Error>;
}

pub struct S3Sink {
Expand Down Expand Up @@ -56,12 +57,13 @@ impl Sink for S3Sink {
&self.name
}

async fn send(&self, key: &[u8], value: Vec<u8>) -> Result<(), Error> {
async fn send(&self, key: &[u8], value: bytes::Bytes) -> Result<(), Error> {
// ToDo: Remove allocating and return stream
// https://github.com/awslabs/aws-sdk-rust/discussions/361
let encoded_key = utf8_percent_encode(std::str::from_utf8(bytes_to_key(key)).unwrap(), FRAGMENT)
.collect::<String>()
.to_lowercase();
let encoded_key =
utf8_percent_encode(std::str::from_utf8(bytes_to_key(key)).unwrap(), FRAGMENT)
.collect::<String>()
.to_lowercase();

self.client
.put_object()
Expand Down
Loading

0 comments on commit adaa185

Please sign in to comment.