diff --git a/Cargo.lock b/Cargo.lock index 7f36cb2..b8f1a46 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -234,9 +234,9 @@ checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" [[package]] name = "aws-config" -version = "1.1.2" +version = "1.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7e64b72d4bdbb41a73d27709c65a25b6e4bfc8321bf70fa3a8b19ce7d4eb81b0" +checksum = "91bb1df3e5f0e47475498570cdd10ab0370bc1d7af3151aa0161d5f5876b6908" dependencies = [ "aws-credential-types", "aws-http", @@ -265,9 +265,9 @@ dependencies = [ [[package]] name = "aws-credential-types" -version = "1.1.2" +version = "1.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4a7cb3510b95492bd9014b60e2e3bee3e48bc516e220316f8e6b60df18b47331" +checksum = "d67c6836a1009b23e3f4cd1457c83e0aa49a490d9c3033b53c3f7b8cf2facc0f" dependencies = [ "aws-smithy-async", "aws-smithy-runtime-api", @@ -277,9 +277,9 @@ dependencies = [ [[package]] name = "aws-http" -version = "0.60.2" +version = "0.60.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a95d41abe4e941399fdb4bc2f54713eac3c839d98151875948bb24e66ab658f2" +checksum = "a081f0c4576f7549dd255987d2d23920eccaa90cdd21d6440f91e0d7537f0e0d" dependencies = [ "aws-smithy-runtime-api", "aws-smithy-types", @@ -293,9 +293,9 @@ dependencies = [ [[package]] name = "aws-runtime" -version = "1.1.2" +version = "1.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "233cca219c6705d525ace011d6f9bc51aaf32fce5b4c41661d2d7ff22d9b4d49" +checksum = "ab7bf4b9b083e6dc86e2bb4fb09a4daca97e12cc0bc174a6f51fe624c23aa87f" dependencies = [ "aws-credential-types", "aws-http", @@ -315,9 +315,9 @@ dependencies = [ [[package]] name = "aws-sdk-s3" -version = "1.12.0" +version = "1.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "634fbe5b6591ee2e281cd2ba8641e9bd752dbf5bf338924d6ad4bd5a3304fe31" +checksum = "4f94d9842a304e066d1eddea61c703fb217ce5a1063cc087d07c8de1db4535f7" dependencies = [ "aws-credential-types", "aws-http", @@ -345,9 +345,9 @@ dependencies = [ [[package]] name = "aws-sdk-sso" -version = "1.10.0" +version = "1.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ee41005e0f3a19ae749c7953d9e1f1ef8d2183f76f64966e346fa41c1ba0ed44" +checksum = "865b1b85249196e4868a48a4b6683d60e19b82371b869297bc9982bc9f47de4b" dependencies = [ "aws-credential-types", "aws-http", @@ -368,9 +368,9 @@ dependencies = [ [[package]] name = "aws-sdk-ssooidc" -version = "1.10.0" +version = "1.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fa08168f8a27505e7b90f922c32a489feb1f2133878981a15138bebc849ac09c" +checksum = "beadaf5c48f5d923329ec87b7db6eea5e4ce2af4cc9f96d9f85d520f34573f40" dependencies = [ "aws-credential-types", "aws-http", @@ -391,9 +391,9 @@ dependencies = [ [[package]] name = "aws-sdk-sts" -version = "1.10.0" +version = "1.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "29102eff04d50ef70f11a48823db33e33c6cc5f027bfb6ff4864efbd5f1f66f3" +checksum = "262a6d40c8e11eea633ba541a39745cebfcee9c1683020cee63413048b5c6188" dependencies = [ "aws-credential-types", "aws-http", @@ -415,9 +415,9 @@ dependencies = [ [[package]] name = "aws-sigv4" -version = "1.1.2" +version = "1.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b92384b39aedb258aa734fe0e7b2ffcd13f33e68227251a72cd2635e0acc8f1a" +checksum = "511879249616f30e30fd2fa81edb4833784f65dd5d56053b7de2e2bcb583dda7" dependencies = [ "aws-credential-types", "aws-smithy-eventstream", @@ -430,6 +430,7 @@ dependencies = [ "hex", "hmac", "http 0.2.11", + "http 1.0.0", "once_cell", "p256 0.11.1", "percent-encoding", @@ -599,9 +600,9 @@ dependencies = [ [[package]] name = "aws-types" -version = "1.1.2" +version = "1.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8549aa62c5b7db5c57ab915200ee214b4f5d8f19b29a4a8fa0b3ad3bca1380e3" +checksum = "ee2739d97d47f47cdf0d27982019a405dcc736df25925d1a75049f1faa79df88" dependencies = [ "aws-credential-types", "aws-smithy-async", @@ -728,6 +729,15 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4c7f02d4ea65f2c1853089ffd8d2787bdbc63de2f0d29dedbcf8ccdfa0ccd4cf" +[[package]] +name = "base64" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b25d992356d2eb0ed82172f5248873db5560c4721f564b13cb5193bda5e668e" +dependencies = [ + "byteorder", +] + [[package]] name = "base64" version = "0.13.1" @@ -863,9 +873,9 @@ dependencies = [ [[package]] name = "chrono" -version = "0.4.31" +version = "0.4.32" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7f2c685bad3eb3d45a01354cedb7d5faa66194d1d58ba6e267a8de788f79db38" +checksum = "41daef31d7a747c5c847246f36de49ced6f7403b4cdabc807a97b5cc184cda7a" dependencies = [ "android-tzdata", "iana-time-zone", @@ -873,7 +883,7 @@ dependencies = [ "num-traits", "serde", "wasm-bindgen", - "windows-targets 0.48.5", + "windows-targets 0.52.0", ] [[package]] @@ -2406,7 +2416,7 @@ checksum = "8f518f335dce6725a761382244631d86cf0ccb2863413590b31338feb467f9c3" [[package]] name = "iroh" version = "0.12.0" -source = "git+https://github.com/n0-computer/iroh#7965836513fcece047f54358d3ef11e8797f9d02" +source = "git+https://github.com/n0-computer/iroh?branch=perf-startup#4fbb3bf064ea51ef4127147aa6d70427da4ad4eb" dependencies = [ "anyhow", "bao-tree", @@ -2465,7 +2475,7 @@ dependencies = [ [[package]] name = "iroh-base" version = "0.12.0" -source = "git+https://github.com/n0-computer/iroh#7965836513fcece047f54358d3ef11e8797f9d02" +source = "git+https://github.com/n0-computer/iroh?branch=perf-startup#4fbb3bf064ea51ef4127147aa6d70427da4ad4eb" dependencies = [ "anyhow", "bao-tree", @@ -2473,6 +2483,7 @@ dependencies = [ "hex", "multibase", "postcard", + "redb", "serde", "serde-error", "thiserror", @@ -2494,7 +2505,7 @@ dependencies = [ [[package]] name = "iroh-bytes" version = "0.12.0" -source = "git+https://github.com/n0-computer/iroh#7965836513fcece047f54358d3ef11e8797f9d02" +source = "git+https://github.com/n0-computer/iroh?branch=perf-startup#4fbb3bf064ea51ef4127147aa6d70427da4ad4eb" dependencies = [ "anyhow", "bao-tree", @@ -2510,10 +2521,12 @@ dependencies = [ "iroh-io", "num_cpus", "once_cell", + "paths-as-strings", "postcard", "quinn", "rand", "range-collections", + "redb", "reflink-copy", "self_cell", "serde", @@ -2529,7 +2542,7 @@ dependencies = [ [[package]] name = "iroh-gossip" version = "0.12.0" -source = "git+https://github.com/n0-computer/iroh#7965836513fcece047f54358d3ef11e8797f9d02" +source = "git+https://github.com/n0-computer/iroh?branch=perf-startup#4fbb3bf064ea51ef4127147aa6d70427da4ad4eb" dependencies = [ "anyhow", "bytes", @@ -2570,7 +2583,7 @@ dependencies = [ [[package]] name = "iroh-metrics" version = "0.12.0" -source = "git+https://github.com/n0-computer/iroh#7965836513fcece047f54358d3ef11e8797f9d02" +source = "git+https://github.com/n0-computer/iroh?branch=perf-startup#4fbb3bf064ea51ef4127147aa6d70427da4ad4eb" dependencies = [ "anyhow", "erased_set", @@ -2590,7 +2603,7 @@ dependencies = [ [[package]] name = "iroh-net" version = "0.12.0" -source = "git+https://github.com/n0-computer/iroh#7965836513fcece047f54358d3ef11e8797f9d02" +source = "git+https://github.com/n0-computer/iroh?branch=perf-startup#4fbb3bf064ea51ef4127147aa6d70427da4ad4eb" dependencies = [ "aead", "anyhow", @@ -2665,7 +2678,7 @@ dependencies = [ [[package]] name = "iroh-sync" version = "0.12.0" -source = "git+https://github.com/n0-computer/iroh#7965836513fcece047f54358d3ef11e8797f9d02" +source = "git+https://github.com/n0-computer/iroh?branch=perf-startup#4fbb3bf064ea51ef4127147aa6d70427da4ad4eb" dependencies = [ "anyhow", "bytes", @@ -3184,9 +3197,9 @@ dependencies = [ [[package]] name = "ouroboros" -version = "0.18.2" +version = "0.18.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a50b637ffd883b2733a8483599fb6136b9dcedaa1850f7ac08b9b6f9f2061208" +checksum = "97b7be5a8a3462b752f4be3ff2b2bf2f7f1d00834902e46be2a4d68b87b0573c" dependencies = [ "aliasable", "ouroboros_macro", @@ -3195,9 +3208,9 @@ dependencies = [ [[package]] name = "ouroboros_macro" -version = "0.18.2" +version = "0.18.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3633d65683f13b9bcfaa3150880b018899fb0e5d0542f4adaea4f503fdb5eabf" +checksum = "b645dcde5f119c2c454a92d0dfa271a2a3b205da92e4292a68ead4bdbfde1f33" dependencies = [ "heck", "itertools", @@ -3309,6 +3322,15 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8835116a5c179084a830efb3adc117ab007512b535bc1a21c991d3b32a6b44dd" +[[package]] +name = "paths-as-strings" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7963201bed34799ddd0f54a2e1e4c5082313ca260db118822109f344f13b285c" +dependencies = [ + "base64 0.10.1", +] + [[package]] name = "pem" version = "2.0.1" @@ -3649,9 +3671,9 @@ checksum = "dc375e1527247fe1a97d8b7156678dfe7c1af2fc075c9a4db3690ecd2a148068" [[package]] name = "proc-macro2" -version = "1.0.76" +version = "1.0.78" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "95fc56cda0b5c3325f5fbbd7ff9fda9e02bb00bb3dac51252d2f1bfa1cb8cc8c" +checksum = "e2422ad645d89c99f8f3e6b88a9fdeca7fabeac836b1002371c4367c8f984aae" dependencies = [ "unicode-ident", ] @@ -3935,13 +3957,13 @@ dependencies = [ [[package]] name = "regex" -version = "1.10.2" +version = "1.10.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "380b951a9c5e80ddfd6136919eef32310721aa4aacd4889a8d39124b026ab343" +checksum = "b62dbe01f0b06f9d8dc7d49e05a0785f153b00b2c227856282f671e0318c9b15" dependencies = [ "aho-corasick", "memchr", - "regex-automata 0.4.3", + "regex-automata 0.4.4", "regex-syntax 0.8.2", ] @@ -3956,9 +3978,9 @@ dependencies = [ [[package]] name = "regex-automata" -version = "0.4.3" +version = "0.4.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5f804c7828047e88b2d32e2d7fe5a105da8ee3264f01902f796c8e067dc2483f" +checksum = "3b7fa1134405e2ec9353fd416b17f8dacd46c473d7d3fd1cf202706a14eb792a" dependencies = [ "aho-corasick", "memchr", @@ -4567,9 +4589,9 @@ dependencies = [ [[package]] name = "smallvec" -version = "1.12.0" +version = "1.13.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2593d31f82ead8df961d8bd23a64c2ccf2eb5dd34b0a34bfb4dd54011c72009e" +checksum = "e6ecd384b10a64542d77071bd64bd7b231f4ed5940fba55e98c3de13824cf3d7" dependencies = [ "serde", ] @@ -5026,6 +5048,15 @@ dependencies = [ "tokio-util", ] +[[package]] +name = "tokio-task-pool" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "75b979d664a052ebb638f0eb4d1134bed9552a5975b82c0c61bf55ff27ff8953" +dependencies = [ + "tokio", +] + [[package]] name = "tokio-util" version = "0.7.10" @@ -5219,6 +5250,7 @@ dependencies = [ "futures", "iroh", "iroh-io", + "lru", "md5", "percent-encoding", "quic-rpc", @@ -5228,6 +5260,7 @@ dependencies = [ "thiserror", "tokio", "tokio-stream", + "tokio-task-pool", "tokio-util", "tower-http", "tracing", @@ -5409,9 +5442,9 @@ checksum = "711b9620af191e0cdc7468a8d14e709c3dcdb115b36f838e601583af800a370a" [[package]] name = "uuid" -version = "1.6.1" +version = "1.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5e395fcf16a7a3d8127ec99782007af141946b4795001f876d54fb0d55978560" +checksum = "f00cc9702ca12d3c81455259621e676d0f7251cec66a21e98fe2e9a37db93b2a" [[package]] name = "valuable" @@ -5599,8 +5632,6 @@ version = "0.48.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e686886bc078bc1b0b600cac0147aadb815089b6e4da64016cbd754b6342700f" dependencies = [ - "windows-implement", - "windows-interface", "windows-targets 0.48.5", ] @@ -5621,6 +5652,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e48a53791691ab099e5e2ad123536d0fff50652600abaf43bbf952894110d0be" dependencies = [ "windows-core 0.52.0", + "windows-implement", + "windows-interface", "windows-targets 0.52.0", ] @@ -5644,24 +5677,24 @@ dependencies = [ [[package]] name = "windows-implement" -version = "0.48.0" +version = "0.52.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5e2ee588991b9e7e6c8338edf3333fbe4da35dc72092643958ebb43f0ab2c49c" +checksum = "12168c33176773b86799be25e2a2ba07c7aab9968b37541f1094dbd7a60c8946" dependencies = [ "proc-macro2", "quote", - "syn 1.0.109", + "syn 2.0.48", ] [[package]] name = "windows-interface" -version = "0.48.0" +version = "0.52.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e6fb8df20c9bcaa8ad6ab513f7b40104840c8867d5751126e4df3b08388d0cc7" +checksum = "9d8dc32e0095a7eeccebd0e3f09e9509365ecb3fc6ac4d6f5f14a3f6392942d1" dependencies = [ "proc-macro2", "quote", - "syn 1.0.109", + "syn 2.0.48", ] [[package]] @@ -5817,16 +5850,16 @@ dependencies = [ [[package]] name = "wmi" -version = "0.13.1" +version = "0.13.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ced703d10188571ce53582c2932ce640ed3c413cff7ee6e2d961f9abdb6a63d1" +checksum = "fff298e96fd8ef6bb55dcb2a7fd2f26969f962bf428ffa6b267457dd804d64d8" dependencies = [ "chrono", "futures", "log", "serde", "thiserror", - "windows 0.48.0", + "windows 0.52.0", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index acd200b..b73a8e3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,7 +20,7 @@ aws-credential-types = { version = "1.0", features = ["hardcoded-credentials"] } axum = "0.7" bisection = "0.1" clap = { version = "4.4", features = ["derive"] } -iroh = { version = "0.12.0", git = "https://github.com/n0-computer/iroh", features = [ "flat-db", "metrics" ] } +iroh = { version = "0.12.0", branch = "perf-startup", git = "https://github.com/n0-computer/iroh", features = [ "flat-db", "metrics" ] } iroh-io = { version = "0.3.0" } md5 = "0.7" percent-encoding = "2.3.1" @@ -36,3 +36,5 @@ tower-http = { version = "0.5", features = ["trace"] } tracing = "0.1" tracing-subscriber = "0.3" futures = "0.3" +lru = "0.12.1" +tokio-task-pool = "0.1.5" diff --git a/examples/config.yaml b/examples/config.yaml index e66e479..177473f 100644 --- a/examples/config.yaml +++ b/examples/config.yaml @@ -5,7 +5,7 @@ iroh: path: /trident/iroh bind_port: 11204 rpc_port: 4919 - max_rpc_connections: 16 + max_rpc_connections: 32 max_rpc_streams: 1024 sinks: {} fs_storages: @@ -14,7 +14,8 @@ iroh: fs_shards: - name: shard1 path: /trident/data/shard1 - weight: 20 + weight: 1 - name: shard2 - path: /trident/data/shard2 - weight: 20 \ No newline at end of file + path: /trident/data/shard1 + weight: 1 + gc_interval_secs: 86400 diff --git a/src/config.rs b/src/config.rs index a827c0e..41d219e 100644 --- a/src/config.rs +++ b/src/config.rs @@ -81,6 +81,7 @@ pub struct IrohConfig { pub sinks: HashMap, #[serde(default = "HashMap::new")] pub fs_storages: HashMap, + pub gc_interval_secs: Option, } #[derive(Clone, Debug, Serialize, Deserialize)] diff --git a/src/iroh_node.rs b/src/iroh_node.rs index 2348e54..b0a87a0 100644 --- a/src/iroh_node.rs +++ b/src/iroh_node.rs @@ -10,10 +10,11 @@ use crate::storages::{Storage, StorageEngine}; use crate::utils::bytes_to_key; use crate::IrohClient; use async_stream::stream; +use futures::StreamExt; use iroh::bytes::Hash; use iroh::client::quic::RPC_ALPN; use iroh::net::derp::DerpMode; -use iroh::node::Node; +use iroh::node::{GcPolicy, Node}; use iroh::rpc_protocol::{ProviderRequest, ProviderResponse, ShareMode}; use iroh::sync::{AuthorId, NamespaceId}; use iroh::ticket::DocTicket; @@ -24,9 +25,11 @@ use std::collections::HashMap; use std::net::{Ipv4Addr, SocketAddrV4}; use std::str::FromStr; use std::sync::Arc; +use std::time::Duration; use tokio::io::AsyncRead; use tokio::sync::RwLock; use tokio_stream::Stream; +use tracing::info; pub struct IrohNode { sync_client: IrohClient, @@ -81,18 +84,32 @@ impl IrohNode { QuinnServerEndpoint::::new(rpc_quinn_endpoint) .map_err(Error::node_create)?; - let node = Node::builder(db, docs) + let mut node_builder = Node::builder(db, docs) .secret_key(secret_key) .peers_data_path(peer_data_path) .derp_mode(DerpMode::Default) .bind_port(config_lock.iroh.bind_port) - .rpc_endpoint(rpc_endpoint) - .spawn() - .await - .map_err(Error::node_create)?; + .rpc_endpoint(rpc_endpoint); + if let Some(gc_interval_secs) = config_lock.iroh.gc_interval_secs { + node_builder = + node_builder.gc_policy(GcPolicy::Interval(Duration::from_secs(gc_interval_secs))) + } + let node = node_builder.spawn().await.map_err(Error::node_create)?; let sync_client = node.client(); + for doc in sync_client + .docs + .list() + .await + .unwrap() + .map(|x| x.unwrap().0) + .collect::>() + .await + { + info!(action = "all_docs", "{}", doc); + } + let author_id = match &config_lock.iroh.author { Some(author) => AuthorId::from_str(author).unwrap(), None => { @@ -135,19 +152,23 @@ impl IrohNode { .await?, ), }; - let mirroring = table_config.mirroring.clone().map(|mirroring_config| { - Mirroring::new( - iroh_doc.clone(), - mirroring_config - .sinks - .clone() - .into_iter() - .map(|sink_name| sinks[&sink_name].clone()) - .collect(), - sync_client.clone(), - mirroring_config.delete_after_mirroring, - ) - }); + let mirroring = match &table_config.mirroring { + Some(mirroring_config) => Some( + Mirroring::new( + iroh_doc.clone(), + mirroring_config + .sinks + .clone() + .into_iter() + .map(|sink_name| sinks[&sink_name].clone()) + .collect(), + sync_client.clone(), + mirroring_config.delete_after_mirroring, + ) + .await?, + ), + None => None, + }; let storage = Storage::new(storage_engine, mirroring); table_storages.insert(table_name.clone(), storage); } @@ -222,19 +243,23 @@ impl IrohNode { StorageEngineConfig::FS(storage_name.to_string()), ), }; - let mirroring = mirroring_config.clone().map(|mirroring_config| { - Mirroring::new( - iroh_doc.clone(), - mirroring_config - .sinks - .clone() - .into_iter() - .map(|sink_name| self.sinks[&sink_name].clone()) - .collect(), - self.sync_client.clone(), - mirroring_config.delete_after_mirroring, - ) - }); + let mirroring = match &mirroring_config { + Some(mirroring_config) => Some( + Mirroring::new( + iroh_doc.clone(), + mirroring_config + .sinks + .clone() + .into_iter() + .map(|sink_name| self.sinks[&sink_name].clone()) + .collect(), + self.sync_client.clone(), + mirroring_config.delete_after_mirroring, + ) + .await?, + ), + None => None, + }; let storage = Storage::new(storage_engine, mirroring); entry.insert(storage); self.config.write().await.iroh.tables.insert( @@ -287,20 +312,23 @@ impl IrohNode { StorageEngineConfig::FS(storage_name.to_string()), ), }; - - let mirroring = mirroring_config.clone().map(|mirroring_config| { - Mirroring::new( - iroh_doc.clone(), - mirroring_config - .sinks - .clone() - .into_iter() - .map(|sink_name| self.sinks[&sink_name].clone()) - .collect(), - self.sync_client.clone(), - mirroring_config.delete_after_mirroring, - ) - }); + let mirroring = match &mirroring_config { + Some(mirroring_config) => Some( + Mirroring::new( + iroh_doc.clone(), + mirroring_config + .sinks + .clone() + .into_iter() + .map(|sink_name| self.sinks[&sink_name].clone()) + .collect(), + self.sync_client.clone(), + mirroring_config.delete_after_mirroring, + ) + .await?, + ), + None => None, + }; let storage = Storage::new(storage_engine, mirroring); entry.insert(storage); @@ -321,14 +349,21 @@ impl IrohNode { pub async fn tables_drop(&mut self, table_name: &str) -> Result<()> { match self.table_storages.remove(table_name) { None => Err(Error::missing_table(table_name)), - Some(_) => Ok(self - .config - .write() - .await - .iroh - .tables - .remove(table_name) - .unwrap()), + Some(table_storage) => { + self.sync_client + .docs + .drop_doc(table_storage.iroh_doc().id()) + .await + .map_err(Error::doc)?; + Ok(self + .config + .write() + .await + .iroh + .tables + .remove(table_name) + .unwrap()) + } }?; Ok(()) } @@ -361,9 +396,7 @@ impl IrohNode { let Some((from_hash, from_size)) = from_table.get_hash(from_key).await? else { return Err(Error::missing_key(from_key)); }; - to_table - .insert_hash(to_key, from_hash.clone(), from_size) - .await?; + to_table.insert_hash(to_key, from_hash, from_size).await?; Ok(from_hash) } diff --git a/src/lib.rs b/src/lib.rs index b18b357..cb506fe 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,15 +1,18 @@ -#![warn( - clippy::all, - clippy::restriction, - clippy::pedantic, - clippy::nursery, - clippy::cargo -)] -#![allow( - clippy::implicit_return, - clippy::missing_docs_in_private_items, - clippy::multiple_unsafe_ops_per_block, - clippy::question_mark_used +#![deny( + non_shorthand_field_patterns, + no_mangle_generic_items, + overflowing_literals, + path_statements, + unused_allocation, + unused_comparisons, + unused_parens, + while_true, + trivial_numeric_casts, + unused_extern_crates, + unused_import_braces, + unused_qualifications, + unused_must_use, + clippy::unwrap_used )] use iroh::rpc_protocol::{ProviderRequest, ProviderResponse}; diff --git a/src/sinks.rs b/src/sinks.rs index c97004d..7528159 100644 --- a/src/sinks.rs +++ b/src/sinks.rs @@ -6,11 +6,12 @@ use aws_sdk_s3::config::{BehaviorVersion, Region}; use aws_sdk_s3::primitives::ByteStream; use axum::async_trait; use percent_encoding::utf8_percent_encode; +use tokio_util::bytes; #[async_trait] pub trait Sink: Send + Sync { fn name(&self) -> &str; - async fn send(&self, key: &[u8], value: Vec) -> Result<(), Error>; + async fn send(&self, key: &[u8], value: bytes::Bytes) -> Result<(), Error>; } pub struct S3Sink { @@ -56,12 +57,13 @@ impl Sink for S3Sink { &self.name } - async fn send(&self, key: &[u8], value: Vec) -> Result<(), Error> { + async fn send(&self, key: &[u8], value: bytes::Bytes) -> Result<(), Error> { // ToDo: Remove allocating and return stream // https://github.com/awslabs/aws-sdk-rust/discussions/361 - let encoded_key = utf8_percent_encode(std::str::from_utf8(bytes_to_key(key)).unwrap(), FRAGMENT) - .collect::() - .to_lowercase(); + let encoded_key = + utf8_percent_encode(std::str::from_utf8(bytes_to_key(key)).unwrap(), FRAGMENT) + .collect::() + .to_lowercase(); self.client .put_object() diff --git a/src/storages/fs_storage.rs b/src/storages/fs_storage.rs index 66e976d..cd5f3d0 100644 --- a/src/storages/fs_storage.rs +++ b/src/storages/fs_storage.rs @@ -9,11 +9,16 @@ use iroh::bytes::Hash; use iroh::client::{Entry, LiveEvent}; use iroh::sync::store::Query; use iroh::sync::{AuthorId, ContentStatus}; -use std::collections::HashMap; +use lru::LruCache; +use std::collections::{HashMap, HashSet}; use std::io; +use std::num::NonZeroUsize; use std::path::PathBuf; +use std::sync::Arc; use tokio::io::AsyncRead; -use tracing::{info, info_span, Instrument}; +use tokio_task_pool::Pool; +use tokio_util::bytes; +use tracing::{info, info_span, warn, Instrument}; #[derive(Clone)] pub struct FSStorageEngine { @@ -41,11 +46,11 @@ impl FSStorageEngine { fs_shards, replicas: fs_storage_config.replicas, }; + let fs_storage_clone = fs_storage.clone(); tokio::spawn({ - let fs_storage = fs_storage.clone(); async move { - let mut stream = fs_storage.iroh_doc().subscribe().await.unwrap(); - let mut wait_list = HashMap::new(); + let mut stream = fs_storage_clone.iroh_doc().subscribe().await.unwrap(); + let mut wait_list = LruCache::new(NonZeroUsize::new(1024).expect("not possible")); info!("started"); while let Some(event) = stream.next().await { let event = event.unwrap(); @@ -58,39 +63,58 @@ impl FSStorageEngine { info!(event = ?event); match content_status { ContentStatus::Complete => { - fs_storage.process_remote_entry(entry).await?; + fs_storage_clone.process_remote_entry(entry).await?; } ContentStatus::Missing => { if entry.content_len() > 0 { - wait_list.insert(entry.content_hash(), entry.clone()); + wait_list.put(entry.content_hash(), entry.clone()); } else { - fs_storage.process_remote_entry(entry).await?; + fs_storage_clone.process_remote_entry(entry).await?; } } ContentStatus::Incomplete => { - wait_list.insert(entry.content_hash(), entry.clone()); + wait_list.put(entry.content_hash(), entry.clone()); } }; } LiveEvent::ContentReady { hash } => { info!(event = ?event); - fs_storage - .process_remote_entry(&wait_list.remove(hash).unwrap()) - .await?; + let Some(hash) = &wait_list.pop(hash) else { + warn!(action = "skipped_absent_hash", hash = ?hash); + continue; + }; + fs_storage_clone.process_remote_entry(hash).await?; } _ => {} }; } Ok::<(), Error>(()) } - .instrument(info_span!("fs_sync", table_id = iroh_doc.id().to_string())) + .instrument(info_span!(parent: None, "fs_sync", table_id = iroh_doc.id().to_string())) }); - for (_, fs_shard) in &fs_storage.fs_shards { - tokio::spawn({ - let fs_storage = fs_storage.clone(); - let base_path = fs_shard.path().to_path_buf(); - info!("started"); - async move { + + let fs_storage_clone = fs_storage.clone(); + + tokio::spawn(async move { + let all_keys: Arc> = Arc::new( + fs_storage_clone + .iroh_doc() + .get_many(Query::all()) + .await + .map_err(Error::doc)? + .map(|x| bytes::Bytes::copy_from_slice(x.unwrap().key())) + .collect() + .await, + ); + let pool = Arc::new(Pool::bounded(16)); + + for fs_shard in &fs_storage_clone.fs_shards.values() { + let fs_storage_clone = fs_storage_clone.clone(); + let fs_shard = fs_shard.clone(); + let all_keys = all_keys.clone(); + let pool = pool.clone(); + tokio::spawn(async move { + let base_path = fs_shard.path().to_path_buf(); let mut read_dir_stream = tokio::fs::read_dir(&base_path) .await .map_err(Error::io_error)?; @@ -100,44 +124,39 @@ impl FSStorageEngine { .map_err(Error::io_error)? { let key = key_to_bytes(&entry.file_name().to_string_lossy()); - let exists_in_iroh = fs_storage - .iroh_doc() - .get_one(Query::key_exact(&key)) - .await - .map_err(Error::doc)? - .is_some(); - if !exists_in_iroh { - info!("importing: {:?}", entry.path()); - let import_progress = fs_storage - .iroh_doc() - .import_file(fs_storage.author_id, key, &entry.path(), true) - .await - .map_err(Error::doc)?; - import_progress.finish().await.map_err(Error::hash)?; - } else { - info!("skipping: {:?}", entry.path()); + if all_keys.contains(&key) || key.starts_with(&[b'~']) { + info!(action = "skipped", key = ?entry.file_name()); + continue; } + pool.spawn({ + let iroh_doc = fs_storage_clone.iroh_doc().clone(); + async move { + let import_progress = iroh_doc + .import_file( + fs_storage_clone.author_id, + key, + &entry.path(), + true, + ) + .await + .map_err(Error::doc) + .unwrap(); + import_progress.finish().await.map_err(Error::hash).unwrap(); + info!(action = "imported", key = ?entry.file_name()) + } + .instrument(info_span!(parent: None, "restore")) + }) + .await + .unwrap(); } Ok::<(), Error>(()) - } - .instrument(info_span!("restore")) - }); - } + }); + } + Ok::<(), Error>(()) + }); Ok(fs_storage) } - pub async fn get(&self, key: &str) -> Result { - if let Some(file_shard_config) = self.hash_ring.range(key, 1).into_iter().next() { - let file_shard = &self.fs_shards[&file_shard_config.name]; - return file_shard - .open_store(key) - .await - .map_err(Error::io_error)? - .ok_or_else(|| Error::missing_key(key)); - } - Err(Error::storage("missing file shards")) - } - fn get_path(&self, key: &str) -> Result { if let Some(file_shard_config) = self.hash_ring.range(key, 1).into_iter().next() { let file_shard = &self.fs_shards[&file_shard_config.name]; @@ -182,7 +201,6 @@ impl FSStorageEngine { .del(self.author_id, key_to_bytes(key)) .await .map_err(Error::missing_key)?; - self.delete_from_fs(key).await?; Ok(removed_items) } diff --git a/src/storages/iroh_storage.rs b/src/storages/iroh_storage.rs index fefc542..32e6bb5 100644 --- a/src/storages/iroh_storage.rs +++ b/src/storages/iroh_storage.rs @@ -20,19 +20,6 @@ impl IrohStorageEngine { } } - pub async fn get(&self, key: &str) -> Result { - Ok(Box::new( - self.iroh_doc - .get_one(Query::key_exact(key_to_bytes(key))) - .await - .map_err(Error::doc)? - .ok_or_else(|| Error::missing_key(key))? - .content_reader(&self.iroh_doc) - .await - .map_err(Error::doc)?, - )) - } - pub async fn delete(&self, key: &str) -> Result { self.iroh_doc .del(self.author_id, key_to_bytes(key)) diff --git a/src/storages/mirroring.rs b/src/storages/mirroring.rs index 155a51c..dac3d6e 100644 --- a/src/storages/mirroring.rs +++ b/src/storages/mirroring.rs @@ -1,80 +1,90 @@ -use crate::error::Error; +use crate::error::{Error, Result}; use crate::sinks::Sink; use crate::{IrohClient, IrohDoc}; use futures::StreamExt; use iroh::client::LiveEvent; -use std::collections::HashMap; +use lru::LruCache; +use std::num::NonZeroUsize; use std::sync::Arc; -use tokio::io::AsyncReadExt; use tokio::task::JoinHandle; use tracing::{info, info_span, warn, Instrument}; #[derive(Clone)] pub struct Mirroring { - thread: Arc>>, + thread: Arc>>, } impl Mirroring { - #[must_use] - pub fn new( + pub async fn new( iroh_doc: IrohDoc, sinks: Vec>, sync_client: IrohClient, delete_after_mirroring: bool, - ) -> Self { + ) -> Result { let table_id = iroh_doc.id().to_string(); + let mut stream = iroh_doc.subscribe().await.map_err(Error::doc)?; let thread = tokio::spawn( async move { - let mut stream = iroh_doc.subscribe().await.unwrap(); - let mut wait_list = HashMap::new(); + let mut wait_list = LruCache::new(NonZeroUsize::new(1024).expect("not_possible")); info!("started"); while let Some(event) = stream.next().await { - let event = event.unwrap(); + let event = match event { + Ok(event) => event, + Err(error) => { + warn!(error = ?error); + continue + } + }; match &event { LiveEvent::InsertLocal { entry } => { info!(event = ?event); - let mut reader = match entry.content_reader(&sync_client).await { - Ok(reader) => reader, - Err(error) => { - warn!(error = ?error); - continue; - } - }; - let mut buffer = vec![]; - if let Err(error) = reader.read_to_end(&mut buffer).await { - warn!(error = ?error); - continue; - } - for sink in &sinks { - if let Err(error) = sink.send(entry.key(), buffer.clone()).await { - warn!(error = ?error); - continue; + match sync_client.blobs.read(entry.content_hash()).await { + Ok(mut reader) => { + let bytes = match reader.read_to_bytes().await { + Ok(bytes) => bytes, + Err(error) => { + warn!(error = ?error); + continue; + } + }; + for sink in &sinks { + if let Err(error) = + sink.send(entry.key(), bytes.clone()).await + { + warn!(error = ?error); + continue; + } + info!(action = "sent", sink = sink.name(), key = ?std::str::from_utf8(entry.key())); + } } + Err(error) => warn!(error = ?error), } } LiveEvent::InsertRemote { entry, .. } => { info!(event = ?event); - wait_list.insert(entry.content_hash(), entry.key().to_vec()); + wait_list.put(entry.content_hash(), entry.key().to_vec()); } LiveEvent::ContentReady { hash } => { info!(event = ?event); - let Some(key) = wait_list.remove(hash) else { + let Some(key) = wait_list.get(hash) else { warn!(error = "missing_key_in_wait_list"); continue; }; match sync_client.blobs.read(*hash).await { Ok(mut reader) => { - let mut buffer = vec![]; - if let Err(error) = reader.read_to_end(&mut buffer).await { - warn!(error = ?error); - continue; - } + let bytes = match reader.read_to_bytes().await { + Ok(bytes) => bytes, + Err(error) => { + warn!(error = ?error); + continue; + } + }; for sink in &sinks { - info!(action = "send", sink = sink.name()); - if let Err(error) = sink.send(&key, buffer.clone()).await { + if let Err(error) = sink.send(key, bytes.clone()).await { warn!(error = ?error); continue; } + info!(action = "sent", sink = sink.name(), key = ?std::str::from_utf8(key)); } if delete_after_mirroring { if let Err(error) = @@ -94,10 +104,10 @@ impl Mirroring { warn!("stopped_mirroring"); Ok(()) } - .instrument(info_span!("mirroring", table_id = table_id)), + .instrument(info_span!(parent: None, "mirroring", table_id = table_id)), ); - Self { + Ok(Self { thread: Arc::new(thread), - } + }) } } diff --git a/src/storages/mod.rs b/src/storages/mod.rs index b745b92..4a517b7 100644 --- a/src/storages/mod.rs +++ b/src/storages/mod.rs @@ -36,10 +36,16 @@ impl Storage { } pub async fn get(&self, key: &str) -> Result> { - match &self.engine { - StorageEngine::FS(storage) => Ok(Box::new(storage.get(key).await?)), - StorageEngine::Iroh(storage) => Ok(Box::new(storage.get(key).await?)), - } + Ok(Box::new( + self.iroh_doc() + .get_one(Query::key_exact(key_to_bytes(key))) + .await + .map_err(Error::doc)? + .ok_or_else(|| Error::missing_key(key))? + .content_reader(self.iroh_doc()) + .await + .map_err(Error::doc)?, + )) } pub async fn insert(&self, key: &str, value: S) -> Result { diff --git a/src/utils.rs b/src/utils.rs index d6cfd05..d71d045 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -9,12 +9,12 @@ pub const FRAGMENT: &AsciiSet = &CONTROLS .add(b'`') .add(b'/'); +#[inline] pub fn key_to_bytes(key: &str) -> bytes::Bytes { - let mut v = Vec::from(key.as_bytes()); - v.push(b'\0'); - bytes::Bytes::from(v) + bytes::Bytes::copy_from_slice(key.as_bytes()) } +#[inline] pub fn bytes_to_key(b: &[u8]) -> &[u8] { - b.strip_suffix(b"\0").unwrap_or(b) + b } diff --git a/trident-py/pyproject.toml b/trident-py/pyproject.toml index 681e26e..28c0add 100644 --- a/trident-py/pyproject.toml +++ b/trident-py/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "izihawa-trident-client" -version = "1.0.2" +version = "1.0.5" description = "" authors = ["Pasha Podolsky "] readme = "README.md" diff --git a/trident-py/trident/client.py b/trident-py/trident/client.py index 3770fdc..d751722 100644 --- a/trident-py/trident/client.py +++ b/trident-py/trident/client.py @@ -85,9 +85,8 @@ async def table_get_chunks(self, table: str, key: str, timeout: float = None) -> async def table_ls(self, table) -> typing.AsyncGenerator[str, None]: response = await self.get(f"/tables/{table}/") - async for data, _ in response.content.iter_chunks(): - for line in data.split('\n'): - yield line + async for line in response.content: + yield line.decode()[:-1] async def table_exists(self, table: str, key: str) -> bool: url = f"/tables/{table}/{key}/exists/"