From 7f680b75070242470c04e97d2e6cd1b5f78f35df Mon Sep 17 00:00:00 2001 From: dignifiedquire Date: Mon, 2 Dec 2024 10:40:04 +0100 Subject: [PATCH 01/21] fix(iroh-dns-server): remove accidental blocking from store --- iroh-dns-server/src/store.rs | 14 +-- iroh-dns-server/src/store/signed_packets.rs | 95 ++++++++++++--------- 2 files changed, 63 insertions(+), 46 deletions(-) diff --git a/iroh-dns-server/src/store.rs b/iroh-dns-server/src/store.rs index d7cb595b39..89f3ca9f43 100644 --- a/iroh-dns-server/src/store.rs +++ b/iroh-dns-server/src/store.rs @@ -6,8 +6,8 @@ use anyhow::Result; use hickory_proto::rr::{Name, RecordSet, RecordType, RrKey}; use iroh_metrics::inc; use lru::LruCache; -use parking_lot::Mutex; use pkarr::{mainline::dht::DhtSettings, PkarrClient, SignedPacket}; +use tokio::sync::Mutex; use tracing::{debug, trace}; use ttl_cache::TtlCache; @@ -97,14 +97,15 @@ impl ZoneStore { record_type: RecordType, ) -> Result>> { tracing::info!("{} {}", name, record_type); - if let Some(rset) = self.cache.lock().resolve(pubkey, name, record_type) { + if let Some(rset) = self.cache.lock().await.resolve(pubkey, name, record_type) { return Ok(Some(rset)); } - if let Some(packet) = self.store.get(pubkey)? { + if let Some(packet) = self.store.get(pubkey).await? { return self .cache .lock() + .await .insert_and_resolve(&packet, name, record_type); }; @@ -120,6 +121,7 @@ impl ZoneStore { return self .cache .lock() + .await .insert_and_resolve_dht(&packet, name, record_type); } else { debug!("DHT resolve failed"); @@ -132,7 +134,7 @@ impl ZoneStore { // allow unused async: this will be async soon. #[allow(clippy::unused_async)] pub async fn get_signed_packet(&self, pubkey: &PublicKeyBytes) -> Result> { - self.store.get(pubkey) + self.store.get(pubkey).await } /// Insert a signed packet into the cache and the store. @@ -143,9 +145,9 @@ impl ZoneStore { #[allow(clippy::unused_async)] pub async fn insert(&self, signed_packet: SignedPacket, _source: PacketSource) -> Result { let pubkey = PublicKeyBytes::from_signed_packet(&signed_packet); - if self.store.upsert(signed_packet)? { + if self.store.upsert(signed_packet).await? { inc!(Metrics, pkarr_publish_update); - self.cache.lock().remove(&pubkey); + self.cache.lock().await.remove(&pubkey); Ok(true) } else { inc!(Metrics, pkarr_publish_noop); diff --git a/iroh-dns-server/src/store/signed_packets.rs b/iroh-dns-server/src/store/signed_packets.rs index 13964e66c6..f68a501605 100644 --- a/iroh-dns-server/src/store/signed_packets.rs +++ b/iroh-dns-server/src/store/signed_packets.rs @@ -1,4 +1,4 @@ -use std::path::Path; +use std::{path::Path, sync::Arc}; use anyhow::{Context, Result}; use iroh_metrics::inc; @@ -14,7 +14,7 @@ const SIGNED_PACKETS_TABLE: TableDefinition<&SignedPacketsKey, &[u8]> = #[derive(Debug)] pub struct SignedPacketStore { - db: Database, + db: Arc, } impl SignedPacketStore { @@ -47,53 +47,68 @@ impl SignedPacketStore { let _table = write_tx.open_table(SIGNED_PACKETS_TABLE)?; } write_tx.commit()?; - Ok(Self { db }) + Ok(Self { db: Arc::new(db) }) } - pub fn upsert(&self, packet: SignedPacket) -> Result { + pub async fn upsert(&self, packet: SignedPacket) -> Result { let key = PublicKeyBytes::from_signed_packet(&packet); - let tx = self.db.begin_write()?; - let mut replaced = false; - { - let mut table = tx.open_table(SIGNED_PACKETS_TABLE)?; - if let Some(existing) = get_packet(&table, &key)? { - if existing.more_recent_than(&packet) { - return Ok(false); - } else { - replaced = true; + let db = self.db.clone(); + tokio::task::spawn_blocking(move || { + let tx = db.begin_write()?; + let mut replaced = false; + { + let mut table = tx.open_table(SIGNED_PACKETS_TABLE)?; + if let Some(existing) = get_packet(&table, &key)? { + if existing.more_recent_than(&packet) { + return Ok(false); + } else { + replaced = true; + } } + let value = packet.as_bytes(); + table.insert(key.as_bytes(), &value[..])?; } - let value = packet.as_bytes(); - table.insert(key.as_bytes(), &value[..])?; - } - tx.commit()?; - if replaced { - inc!(Metrics, store_packets_updated); - } else { - inc!(Metrics, store_packets_inserted); - } - Ok(true) + tx.commit()?; + if replaced { + inc!(Metrics, store_packets_updated); + } else { + inc!(Metrics, store_packets_inserted); + } + Ok(true) + }) + .await? } - pub fn get(&self, key: &PublicKeyBytes) -> Result> { - let tx = self.db.begin_read()?; - let table = tx.open_table(SIGNED_PACKETS_TABLE)?; - get_packet(&table, key) + pub async fn get(&self, key: &PublicKeyBytes) -> Result> { + let db = self.db.clone(); + let key = key.clone(); + let res = tokio::task::spawn_blocking(move || { + let tx = db.begin_read()?; + let table = tx.open_table(SIGNED_PACKETS_TABLE)?; + get_packet(&table, &key) + }) + .await??; + Ok(res) } - pub fn remove(&self, key: &PublicKeyBytes) -> Result { - let tx = self.db.begin_write()?; - let updated = { - let mut table = tx.open_table(SIGNED_PACKETS_TABLE)?; - let did_remove = table.remove(key.as_bytes())?.is_some(); - #[allow(clippy::let_and_return)] - did_remove - }; - tx.commit()?; - if updated { - inc!(Metrics, store_packets_removed) - } - Ok(updated) + pub async fn remove(&self, key: &PublicKeyBytes) -> Result { + let db = self.db.clone(); + let key = key.clone(); + tokio::task::spawn_blocking(move || { + let tx = db.begin_write()?; + let updated = { + let mut table = tx.open_table(SIGNED_PACKETS_TABLE)?; + let did_remove = table.remove(key.as_bytes())?.is_some(); + #[allow(clippy::let_and_return)] + did_remove + }; + tx.commit()?; + if updated { + inc!(Metrics, store_packets_removed) + } + Ok(updated) + }) + .await? } } From 3fb51ae849edf8d7eff567e4b7cb28325133d26c Mon Sep 17 00:00:00 2001 From: dignifiedquire Date: Mon, 2 Dec 2024 11:23:23 +0100 Subject: [PATCH 02/21] happy clippy --- iroh-dns-server/src/store/signed_packets.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/iroh-dns-server/src/store/signed_packets.rs b/iroh-dns-server/src/store/signed_packets.rs index f68a501605..220af863e6 100644 --- a/iroh-dns-server/src/store/signed_packets.rs +++ b/iroh-dns-server/src/store/signed_packets.rs @@ -81,7 +81,7 @@ impl SignedPacketStore { pub async fn get(&self, key: &PublicKeyBytes) -> Result> { let db = self.db.clone(); - let key = key.clone(); + let key = *key; let res = tokio::task::spawn_blocking(move || { let tx = db.begin_read()?; let table = tx.open_table(SIGNED_PACKETS_TABLE)?; @@ -93,7 +93,7 @@ impl SignedPacketStore { pub async fn remove(&self, key: &PublicKeyBytes) -> Result { let db = self.db.clone(); - let key = key.clone(); + let key = *key; tokio::task::spawn_blocking(move || { let tx = db.begin_write()?; let updated = { From 642b6c292733e33c479694dc2f743f76ca41390b Mon Sep 17 00:00:00 2001 From: Asmir Avdicevic Date: Mon, 2 Dec 2024 13:17:55 +0100 Subject: [PATCH 03/21] feat(benchmark): iroh-dns-server write benchmark --- Cargo.lock | 1 + iroh-dns-server/Cargo.toml | 7 ++++- iroh-dns-server/benches/write.rs | 54 ++++++++++++++++++++++++++++++++ iroh-dns-server/src/lib.rs | 2 +- 4 files changed, 62 insertions(+), 2 deletions(-) create mode 100644 iroh-dns-server/benches/write.rs diff --git a/Cargo.lock b/Cargo.lock index b4185ce95c..b2ceb7607d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2528,6 +2528,7 @@ dependencies = [ "base64-url", "bytes", "clap", + "criterion", "derive_more", "dirs-next", "futures-lite 2.5.0", diff --git a/iroh-dns-server/Cargo.toml b/iroh-dns-server/Cargo.toml index c8771a8c2e..c8b5fe9191 100644 --- a/iroh-dns-server/Cargo.toml +++ b/iroh-dns-server/Cargo.toml @@ -41,7 +41,7 @@ rustls-pemfile = { version = "2.1" } serde = { version = "1", features = ["derive"] } struct_iterable = "0.1.1" strum = { version = "0.26", features = ["derive"] } -tokio = { version = "1.36.0", features = ["full"] } +tokio = { version = "1", features = ["full"] } tokio-rustls = { version = "0.26", default-features = false, features = [ "logging", "ring", @@ -59,10 +59,15 @@ url = "2.5" z32 = "1.1.1" [dev-dependencies] +criterion = "0.5.1" hickory-resolver = "=0.25.0-alpha.2" iroh = { version = "0.28.0", path = "../iroh" } iroh-test = { version = "0.28.0", path = "../iroh-test" } pkarr = { version = "2.2.0", features = ["rand"] } +[[bench]] +name = "write" +harness = false + [package.metadata.docs.rs] all-features = true diff --git a/iroh-dns-server/benches/write.rs b/iroh-dns-server/benches/write.rs new file mode 100644 index 0000000000..e610f3f39b --- /dev/null +++ b/iroh-dns-server/benches/write.rs @@ -0,0 +1,54 @@ +use anyhow::Result; +use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion, Throughput}; +use iroh::discovery::pkarr::PkarrRelayClient; +use iroh::dns::node_info::NodeInfo; +use iroh::key::SecretKey; +use iroh_dns_server::config::Config; +use iroh_dns_server::server::Server; +use iroh_dns_server::store::ZoneStore; +use tokio::runtime::Runtime; + +const LOCALHOST_PKARR: &str = "http://localhost:8080/pkarr"; + +async fn start_dns_server(config: Config) -> Result { + let store = ZoneStore::persistent(Config::signed_packet_store_path()?)?; + Server::spawn(config, store).await +} + +fn benchmark_dns_server(c: &mut Criterion) { + let mut group = c.benchmark_group("dns_server_writes"); + for iters in [10 as u64, 100 as u64, 500 as u64].iter() { + group.throughput(Throughput::Elements(*iters)); + group.bench_with_input(BenchmarkId::from_parameter(iters), iters, |b, &iters| { + b.iter(|| { + let rt = Runtime::new().unwrap(); + rt.block_on(async move { + let config = Config::load("./config.dev.toml").await.unwrap(); + let server = start_dns_server(config).await.unwrap(); + + let secret_key = SecretKey::generate(); + let node_id = secret_key.public(); + + let pkarr_relay = LOCALHOST_PKARR.parse().expect("valid url"); + let relay_url = Some("http://localhost:8080".parse().unwrap()); + let pkarr = PkarrRelayClient::new(pkarr_relay); + let node_info = NodeInfo::new(node_id, relay_url, Default::default()); + let signed_packet = node_info.to_pkarr_signed_packet(&secret_key, 30).unwrap(); + + let start = std::time::Instant::now(); + for _ in 0..iters { + pkarr.publish(&signed_packet).await.unwrap(); + } + let duration = start.elapsed(); + + server.shutdown().await.unwrap(); + + duration + }) + }); + }); + } +} + +criterion_group!(benches, benchmark_dns_server); +criterion_main!(benches); diff --git a/iroh-dns-server/src/lib.rs b/iroh-dns-server/src/lib.rs index d95b68b2a0..36c21a9449 100644 --- a/iroh-dns-server/src/lib.rs +++ b/iroh-dns-server/src/lib.rs @@ -8,7 +8,7 @@ pub mod http; pub mod metrics; pub mod server; pub mod state; -mod store; +pub mod store; mod util; #[cfg(test)] From 058fc8fd056e5a768a9c62574c69fb00e0e0b34c Mon Sep 17 00:00:00 2001 From: Asmir Avdicevic Date: Mon, 2 Dec 2024 14:49:56 +0100 Subject: [PATCH 04/21] adjustments --- iroh-dns-server/benches/write.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/iroh-dns-server/benches/write.rs b/iroh-dns-server/benches/write.rs index e610f3f39b..d6e72b3835 100644 --- a/iroh-dns-server/benches/write.rs +++ b/iroh-dns-server/benches/write.rs @@ -17,7 +17,8 @@ async fn start_dns_server(config: Config) -> Result { fn benchmark_dns_server(c: &mut Criterion) { let mut group = c.benchmark_group("dns_server_writes"); - for iters in [10 as u64, 100 as u64, 500 as u64].iter() { + group.sample_size(10); + for iters in [10_u64, 100_u64, 250_u64, 1000_u64].iter() { group.throughput(Throughput::Elements(*iters)); group.bench_with_input(BenchmarkId::from_parameter(iters), iters, |b, &iters| { b.iter(|| { From 19f657a474558fada677c9ea34797c6cdd8f67f3 Mon Sep 17 00:00:00 2001 From: Asmir Avdicevic Date: Mon, 2 Dec 2024 15:02:24 +0100 Subject: [PATCH 05/21] fmt & cleanup exports --- iroh-dns-server/benches/write.rs | 8 ++------ iroh-dns-server/src/lib.rs | 5 ++++- 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/iroh-dns-server/benches/write.rs b/iroh-dns-server/benches/write.rs index d6e72b3835..143a2b0917 100644 --- a/iroh-dns-server/benches/write.rs +++ b/iroh-dns-server/benches/write.rs @@ -1,11 +1,7 @@ use anyhow::Result; use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion, Throughput}; -use iroh::discovery::pkarr::PkarrRelayClient; -use iroh::dns::node_info::NodeInfo; -use iroh::key::SecretKey; -use iroh_dns_server::config::Config; -use iroh_dns_server::server::Server; -use iroh_dns_server::store::ZoneStore; +use iroh::{discovery::pkarr::PkarrRelayClient, dns::node_info::NodeInfo, key::SecretKey}; +use iroh_dns_server::{config::Config, server::Server, ZoneStore}; use tokio::runtime::Runtime; const LOCALHOST_PKARR: &str = "http://localhost:8080/pkarr"; diff --git a/iroh-dns-server/src/lib.rs b/iroh-dns-server/src/lib.rs index 36c21a9449..8c18327e6e 100644 --- a/iroh-dns-server/src/lib.rs +++ b/iroh-dns-server/src/lib.rs @@ -8,9 +8,12 @@ pub mod http; pub mod metrics; pub mod server; pub mod state; -pub mod store; +mod store; mod util; +// Re-export to be able to construct your own dns-server +pub use store::ZoneStore; + #[cfg(test)] mod tests { use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr}; From 1839f5ce545275dd12b0fe899f9fcc28875dcc69 Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Tue, 3 Dec 2024 11:15:45 +0200 Subject: [PATCH 06/21] Move db ops into an actor and implement write batching --- iroh-dns-server/src/store/signed_packets.rs | 205 ++++++++++++++------ 1 file changed, 147 insertions(+), 58 deletions(-) diff --git a/iroh-dns-server/src/store/signed_packets.rs b/iroh-dns-server/src/store/signed_packets.rs index 220af863e6..4afbad64ed 100644 --- a/iroh-dns-server/src/store/signed_packets.rs +++ b/iroh-dns-server/src/store/signed_packets.rs @@ -1,9 +1,11 @@ -use std::{path::Path, sync::Arc}; +use std::{path::Path, result, time::Duration}; use anyhow::{Context, Result}; use iroh_metrics::inc; use pkarr::SignedPacket; use redb::{backends::InMemoryBackend, Database, ReadableTable, TableDefinition}; +use tokio::sync::{mpsc, oneshot}; +use tokio_util::{sync::CancellationToken, task::AbortOnDropHandle}; use tracing::info; use crate::{metrics::Metrics, util::PublicKeyBytes}; @@ -14,7 +16,121 @@ const SIGNED_PACKETS_TABLE: TableDefinition<&SignedPacketsKey, &[u8]> = #[derive(Debug)] pub struct SignedPacketStore { - db: Arc, + send: mpsc::Sender, + cancel: CancellationToken, + _task: AbortOnDropHandle<()>, +} + +impl Drop for SignedPacketStore { + fn drop(&mut self) { + self.cancel.cancel(); + } +} + +enum Message { + Upsert { + packet: SignedPacket, + res: oneshot::Sender, + }, + Get { + key: PublicKeyBytes, + res: oneshot::Sender>, + }, + Remove { + key: PublicKeyBytes, + res: oneshot::Sender, + }, +} + +struct Actor { + db: Database, + recv: mpsc::Receiver, + cancel: CancellationToken, + max_batch_size: usize, + max_batch_time: Duration, +} + +impl Actor { + async fn run(self) { + match self.run0().await { + Ok(()) => {} + Err(e) => { + tracing::error!("packet store actor failed: {:?}", e); + } + } + } + + async fn run0(mut self) -> anyhow::Result<()> { + loop { + let transaction = self.db.begin_write()?; + let mut tables = Tables::new(&transaction)?; + let timeout = tokio::time::sleep(self.max_batch_time); + tokio::pin!(timeout); + loop { + for _ in 0..self.max_batch_size { + tokio::select! { + _ = self.cancel.cancelled() => { + drop(tables); + transaction.commit()?; + return Ok(()); + } + _ = &mut timeout => break, + Some(msg) = self.recv.recv() => { + match msg { + Message::Get { key, res } => { + let packet = get_packet(&tables.signed_packets, &key)?; + res.send(packet).ok(); + } + Message::Upsert { packet, res } => { + let key = PublicKeyBytes::from_signed_packet(&packet); + let mut replaced = false; + if let Some(existing) = get_packet(&tables.signed_packets, &key)? { + if existing.more_recent_than(&packet) { + res.send(false).ok(); + continue; + } else { + replaced = true; + } + } + let value = packet.as_bytes(); + tables.signed_packets.insert(key.as_bytes(), &value[..])?; + if replaced { + inc!(Metrics, store_packets_updated); + } else { + inc!(Metrics, store_packets_inserted); + } + res.send(true).ok(); + } + Message::Remove { key, res } => { + let updated = + tables.signed_packets.remove(key.as_bytes())?.is_some() + ; + if updated { + inc!(Metrics, store_packets_removed); + } + res.send(updated).ok(); + } + } + } + } + } + } + } + } +} + +/// A struct similar to [`redb::Table`] but for all tables that make up the +/// signed packet store. +pub(super) struct Tables<'a> { + pub signed_packets: redb::Table<'a, &'static SignedPacketsKey, &'static [u8]>, +} + +impl<'txn> Tables<'txn> { + pub fn new(tx: &'txn redb::WriteTransaction) -> result::Result { + Ok(Self { + signed_packets: tx.open_table(SIGNED_PACKETS_TABLE)?, + }) + } } impl SignedPacketStore { @@ -42,73 +158,46 @@ impl SignedPacketStore { } pub fn open(db: Database) -> Result { + // create tables let write_tx = db.begin_write()?; - { - let _table = write_tx.open_table(SIGNED_PACKETS_TABLE)?; - } + let _ = Tables::new(&write_tx)?; write_tx.commit()?; - Ok(Self { db: Arc::new(db) }) + let (send, recv) = mpsc::channel(1024); + let cancel = CancellationToken::new(); + let cancel2 = cancel.clone(); + let actor = Actor { + db, + recv, + cancel: cancel2, + max_batch_size: 1024, + max_batch_time: Duration::from_secs(1), + }; + let task = tokio::spawn(async move { actor.run().await }); + Ok(Self { + send, + cancel, + _task: AbortOnDropHandle::new(task), + }) } pub async fn upsert(&self, packet: SignedPacket) -> Result { - let key = PublicKeyBytes::from_signed_packet(&packet); - let db = self.db.clone(); - tokio::task::spawn_blocking(move || { - let tx = db.begin_write()?; - let mut replaced = false; - { - let mut table = tx.open_table(SIGNED_PACKETS_TABLE)?; - if let Some(existing) = get_packet(&table, &key)? { - if existing.more_recent_than(&packet) { - return Ok(false); - } else { - replaced = true; - } - } - let value = packet.as_bytes(); - table.insert(key.as_bytes(), &value[..])?; - } - tx.commit()?; - if replaced { - inc!(Metrics, store_packets_updated); - } else { - inc!(Metrics, store_packets_inserted); - } - Ok(true) - }) - .await? + let (tx, rx) = oneshot::channel(); + self.send.send(Message::Upsert { packet, res: tx }).await?; + Ok(rx.await?) } pub async fn get(&self, key: &PublicKeyBytes) -> Result> { - let db = self.db.clone(); - let key = *key; - let res = tokio::task::spawn_blocking(move || { - let tx = db.begin_read()?; - let table = tx.open_table(SIGNED_PACKETS_TABLE)?; - get_packet(&table, &key) - }) - .await??; - Ok(res) + let (tx, rx) = oneshot::channel(); + self.send.send(Message::Get { key: *key, res: tx }).await?; + Ok(rx.await?) } pub async fn remove(&self, key: &PublicKeyBytes) -> Result { - let db = self.db.clone(); - let key = *key; - tokio::task::spawn_blocking(move || { - let tx = db.begin_write()?; - let updated = { - let mut table = tx.open_table(SIGNED_PACKETS_TABLE)?; - let did_remove = table.remove(key.as_bytes())?.is_some(); - #[allow(clippy::let_and_return)] - did_remove - }; - tx.commit()?; - if updated { - inc!(Metrics, store_packets_removed) - } - Ok(updated) - }) - .await? + let (tx, rx) = oneshot::channel(); + self.send + .send(Message::Remove { key: *key, res: tx }) + .await?; + Ok(rx.await?) } } From cf00d2b5e17d0d8a9d6612fd0b1ef589273fd244 Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Tue, 3 Dec 2024 11:17:49 +0200 Subject: [PATCH 07/21] increase batch size --- iroh-dns-server/src/store/signed_packets.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/iroh-dns-server/src/store/signed_packets.rs b/iroh-dns-server/src/store/signed_packets.rs index 4afbad64ed..3970590117 100644 --- a/iroh-dns-server/src/store/signed_packets.rs +++ b/iroh-dns-server/src/store/signed_packets.rs @@ -169,7 +169,7 @@ impl SignedPacketStore { db, recv, cancel: cancel2, - max_batch_size: 1024, + max_batch_size: 1024 * 64, max_batch_time: Duration::from_secs(1), }; let task = tokio::spawn(async move { actor.run().await }); From 478a2affc1d2016fbdea1182ac9e23ed2da51d27 Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Tue, 3 Dec 2024 12:02:40 +0200 Subject: [PATCH 08/21] donate a thread to the tokio runtime so we can safely block Also fix some stupid bugs --- iroh-dns-server/src/store/signed_packets.rs | 104 +++++++++++--------- 1 file changed, 58 insertions(+), 46 deletions(-) diff --git a/iroh-dns-server/src/store/signed_packets.rs b/iroh-dns-server/src/store/signed_packets.rs index 3970590117..868bd8e7f5 100644 --- a/iroh-dns-server/src/store/signed_packets.rs +++ b/iroh-dns-server/src/store/signed_packets.rs @@ -5,7 +5,7 @@ use iroh_metrics::inc; use pkarr::SignedPacket; use redb::{backends::InMemoryBackend, Database, ReadableTable, TableDefinition}; use tokio::sync::{mpsc, oneshot}; -use tokio_util::{sync::CancellationToken, task::AbortOnDropHandle}; +use tokio_util::sync::CancellationToken; use tracing::info; use crate::{metrics::Metrics, util::PublicKeyBytes}; @@ -18,12 +18,18 @@ const SIGNED_PACKETS_TABLE: TableDefinition<&SignedPacketsKey, &[u8]> = pub struct SignedPacketStore { send: mpsc::Sender, cancel: CancellationToken, - _task: AbortOnDropHandle<()>, + thread: Option>, } impl Drop for SignedPacketStore { fn drop(&mut self) { + // cancel the actor self.cancel.cancel(); + // join the thread. This is important so that Drop implementations that + // are called from the actor thread can complete before we return. + if let Some(thread) = self.thread.take() { + let _ = thread.join(); + } } } @@ -51,70 +57,71 @@ struct Actor { } impl Actor { - async fn run(self) { + async fn run(mut self) { match self.run0().await { Ok(()) => {} Err(e) => { + self.cancel.cancel(); tracing::error!("packet store actor failed: {:?}", e); } } } - async fn run0(mut self) -> anyhow::Result<()> { + async fn run0(&mut self) -> anyhow::Result<()> { loop { let transaction = self.db.begin_write()?; let mut tables = Tables::new(&transaction)?; let timeout = tokio::time::sleep(self.max_batch_time); tokio::pin!(timeout); - loop { - for _ in 0..self.max_batch_size { - tokio::select! { - _ = self.cancel.cancelled() => { - drop(tables); - transaction.commit()?; - return Ok(()); - } - _ = &mut timeout => break, - Some(msg) = self.recv.recv() => { - match msg { - Message::Get { key, res } => { - let packet = get_packet(&tables.signed_packets, &key)?; - res.send(packet).ok(); - } - Message::Upsert { packet, res } => { - let key = PublicKeyBytes::from_signed_packet(&packet); - let mut replaced = false; - if let Some(existing) = get_packet(&tables.signed_packets, &key)? { - if existing.more_recent_than(&packet) { - res.send(false).ok(); - continue; - } else { - replaced = true; - } - } - let value = packet.as_bytes(); - tables.signed_packets.insert(key.as_bytes(), &value[..])?; - if replaced { - inc!(Metrics, store_packets_updated); + for _ in 0..self.max_batch_size { + tokio::select! { + _ = self.cancel.cancelled() => { + drop(tables); + transaction.commit()?; + return Ok(()); + } + _ = &mut timeout => break, + Some(msg) = self.recv.recv() => { + match msg { + Message::Get { key, res } => { + let packet = get_packet(&tables.signed_packets, &key)?; + res.send(packet).ok(); + } + Message::Upsert { packet, res } => { + let key = PublicKeyBytes::from_signed_packet(&packet); + let mut replaced = false; + if let Some(existing) = get_packet(&tables.signed_packets, &key)? { + if existing.more_recent_than(&packet) { + res.send(false).ok(); + continue; } else { - inc!(Metrics, store_packets_inserted); + replaced = true; } - res.send(true).ok(); } - Message::Remove { key, res } => { - let updated = - tables.signed_packets.remove(key.as_bytes())?.is_some() - ; - if updated { - inc!(Metrics, store_packets_removed); - } - res.send(updated).ok(); + let value = packet.as_bytes(); + tables.signed_packets.insert(key.as_bytes(), &value[..])?; + if replaced { + inc!(Metrics, store_packets_updated); + } else { + inc!(Metrics, store_packets_inserted); + } + res.send(true).ok(); + } + Message::Remove { key, res } => { + let updated = + tables.signed_packets.remove(key.as_bytes())?.is_some() + ; + if updated { + inc!(Metrics, store_packets_removed); } + res.send(updated).ok(); } } } } } + drop(tables); + transaction.commit()?; } } } @@ -172,11 +179,16 @@ impl SignedPacketStore { max_batch_size: 1024 * 64, max_batch_time: Duration::from_secs(1), }; - let task = tokio::spawn(async move { actor.run().await }); + let handle = tokio::runtime::Handle::try_current()?; + // start an io thread and donate it to the tokio runtime so we can do blocking IO + // inside the thread despite being in a tokio runtime + let thread = std::thread::spawn(move || { + handle.block_on(actor.run()); + }); Ok(Self { send, cancel, - _task: AbortOnDropHandle::new(task), + thread: Some(thread), }) } From c0e0ede8dc610b06516abb72ef25ace4644bd238 Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Tue, 3 Dec 2024 12:27:52 +0200 Subject: [PATCH 09/21] Name the thread --- iroh-dns-server/src/store/signed_packets.rs | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/iroh-dns-server/src/store/signed_packets.rs b/iroh-dns-server/src/store/signed_packets.rs index 868bd8e7f5..2a3e10fda3 100644 --- a/iroh-dns-server/src/store/signed_packets.rs +++ b/iroh-dns-server/src/store/signed_packets.rs @@ -179,12 +179,14 @@ impl SignedPacketStore { max_batch_size: 1024 * 64, max_batch_time: Duration::from_secs(1), }; - let handle = tokio::runtime::Handle::try_current()?; // start an io thread and donate it to the tokio runtime so we can do blocking IO // inside the thread despite being in a tokio runtime - let thread = std::thread::spawn(move || { - handle.block_on(actor.run()); - }); + let handle = tokio::runtime::Handle::try_current()?; + let thread = std::thread::Builder::new() + .name("packet-store-actor".into()) + .spawn(move || { + handle.block_on(actor.run()); + })?; Ok(Self { send, cancel, From 405f80ef5a94b7115e81098529c7ed63353839a1 Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Tue, 3 Dec 2024 12:30:12 +0200 Subject: [PATCH 10/21] Add constants for max batch time and size --- iroh-dns-server/src/store/signed_packets.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/iroh-dns-server/src/store/signed_packets.rs b/iroh-dns-server/src/store/signed_packets.rs index 2a3e10fda3..63c5f66c7e 100644 --- a/iroh-dns-server/src/store/signed_packets.rs +++ b/iroh-dns-server/src/store/signed_packets.rs @@ -13,6 +13,8 @@ use crate::{metrics::Metrics, util::PublicKeyBytes}; pub type SignedPacketsKey = [u8; 32]; const SIGNED_PACKETS_TABLE: TableDefinition<&SignedPacketsKey, &[u8]> = TableDefinition::new("signed-packets-1"); +const MAX_BATCH_SIZE: usize = 1024 * 64; +const MAX_BATCH_TIME: Duration = Duration::from_secs(1); #[derive(Debug)] pub struct SignedPacketStore { @@ -176,8 +178,8 @@ impl SignedPacketStore { db, recv, cancel: cancel2, - max_batch_size: 1024 * 64, - max_batch_time: Duration::from_secs(1), + max_batch_size: MAX_BATCH_SIZE, + max_batch_time: MAX_BATCH_TIME, }; // start an io thread and donate it to the tokio runtime so we can do blocking IO // inside the thread despite being in a tokio runtime From 3864b3389d334bc8fd376e7b5f34ef42d3840636 Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Tue, 3 Dec 2024 14:26:14 +0200 Subject: [PATCH 11/21] Add eviction --- iroh-dns-server/src/store/signed_packets.rs | 73 +++++++++++++++++++-- 1 file changed, 68 insertions(+), 5 deletions(-) diff --git a/iroh-dns-server/src/store/signed_packets.rs b/iroh-dns-server/src/store/signed_packets.rs index 63c5f66c7e..2c8eb54b22 100644 --- a/iroh-dns-server/src/store/signed_packets.rs +++ b/iroh-dns-server/src/store/signed_packets.rs @@ -1,8 +1,9 @@ use std::{path::Path, result, time::Duration}; use anyhow::{Context, Result}; +use bytes::Bytes; use iroh_metrics::inc; -use pkarr::SignedPacket; +use pkarr::{system_time, SignedPacket}; use redb::{backends::InMemoryBackend, Database, ReadableTable, TableDefinition}; use tokio::sync::{mpsc, oneshot}; use tokio_util::sync::CancellationToken; @@ -48,16 +49,35 @@ enum Message { key: PublicKeyBytes, res: oneshot::Sender, }, + Snapshot { + res: oneshot::Sender, + }, + CheckExpired { + key: Bytes, + }, } struct Actor { db: Database, recv: mpsc::Receiver, cancel: CancellationToken, + options: Options, +} + +pub struct Options { max_batch_size: usize, max_batch_time: Duration, } +impl Default for Options { + fn default() -> Self { + Self { + max_batch_size: MAX_BATCH_SIZE, + max_batch_time: MAX_BATCH_TIME, + } + } +} + impl Actor { async fn run(mut self) { match self.run0().await { @@ -73,9 +93,9 @@ impl Actor { loop { let transaction = self.db.begin_write()?; let mut tables = Tables::new(&transaction)?; - let timeout = tokio::time::sleep(self.max_batch_time); + let timeout = tokio::time::sleep(self.options.max_batch_time); tokio::pin!(timeout); - for _ in 0..self.max_batch_size { + for _ in 0..self.options.max_batch_size { tokio::select! { _ = self.cancel.cancelled() => { drop(tables); @@ -118,6 +138,12 @@ impl Actor { } res.send(updated).ok(); } + Message::Snapshot { res } => { + res.send(Snapshot::new(&self.db)?).ok(); + } + Message::CheckExpired { key } => { + todo!() + } } } } @@ -142,6 +168,19 @@ impl<'txn> Tables<'txn> { } } +pub(super) struct Snapshot { + pub signed_packets: redb::ReadOnlyTable<&'static SignedPacketsKey, &'static [u8]>, +} + +impl Snapshot { + pub fn new(db: &Database) -> Result { + let tx = db.begin_read()?; + Ok(Self { + signed_packets: tx.open_table(SIGNED_PACKETS_TABLE)?, + }) + } +} + impl SignedPacketStore { pub fn persistent(path: impl AsRef) -> Result { let path = path.as_ref(); @@ -178,8 +217,7 @@ impl SignedPacketStore { db, recv, cancel: cancel2, - max_batch_size: MAX_BATCH_SIZE, - max_batch_time: MAX_BATCH_TIME, + options: Default::default(), }; // start an io thread and donate it to the tokio runtime so we can do blocking IO // inside the thread despite being in a tokio runtime @@ -227,3 +265,28 @@ fn get_packet( let packet = SignedPacket::from_bytes(&row.value().to_vec().into())?; Ok(Some(packet)) } + +async fn evict_task(send: mpsc::Sender, expiry: Duration) -> anyhow::Result<()> { + let expiry_ms = expiry.as_micros() as u64; + loop { + let (tx, rx) = oneshot::channel(); + let _ = send.send(Message::Snapshot { res: tx }).await.ok(); + let Ok(snapshot) = rx.await else { + break; + }; + let expired = system_time() - expiry_ms; + for item in snapshot.signed_packets.iter()? { + let (key, value) = item?; + let value = Bytes::copy_from_slice(value.value()); + let packet = SignedPacket::from_bytes(&value)?; + if packet.timestamp() < expired { + let _ = send + .send(Message::CheckExpired { + key: Bytes::copy_from_slice(key.value()), + }) + .await?; + } + } + } + Ok(()) +} From 136da0c1cff752f85d6165f718d091cbb4dd52af Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Tue, 3 Dec 2024 15:39:04 +0200 Subject: [PATCH 12/21] WIP add eviction --- iroh-dns-server/src/lib.rs | 8 ++- iroh-dns-server/src/store/signed_packets.rs | 76 +++++++++++++++++---- 2 files changed, 69 insertions(+), 15 deletions(-) diff --git a/iroh-dns-server/src/lib.rs b/iroh-dns-server/src/lib.rs index 8c18327e6e..75eded5875 100644 --- a/iroh-dns-server/src/lib.rs +++ b/iroh-dns-server/src/lib.rs @@ -16,7 +16,10 @@ pub use store::ZoneStore; #[cfg(test)] mod tests { - use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr}; + use std::{ + net::{Ipv4Addr, Ipv6Addr, SocketAddr}, + time::Duration, + }; use anyhow::Result; use hickory_resolver::{ @@ -174,6 +177,9 @@ mod tests { assert_eq!(res.node_id, node_id); assert_eq!(res.info.relay_url.map(Url::from), Some(relay_url)); + println!("Sleeping for 1 hour"); + tokio::time::sleep(Duration::from_secs(3600)).await; + server.shutdown().await?; Ok(()) } diff --git a/iroh-dns-server/src/store/signed_packets.rs b/iroh-dns-server/src/store/signed_packets.rs index 2c8eb54b22..c875893786 100644 --- a/iroh-dns-server/src/store/signed_packets.rs +++ b/iroh-dns-server/src/store/signed_packets.rs @@ -7,7 +7,7 @@ use pkarr::{system_time, SignedPacket}; use redb::{backends::InMemoryBackend, Database, ReadableTable, TableDefinition}; use tokio::sync::{mpsc, oneshot}; use tokio_util::sync::CancellationToken; -use tracing::info; +use tracing::{error, info}; use crate::{metrics::Metrics, util::PublicKeyBytes}; @@ -21,16 +21,21 @@ const MAX_BATCH_TIME: Duration = Duration::from_secs(1); pub struct SignedPacketStore { send: mpsc::Sender, cancel: CancellationToken, - thread: Option>, + write_thread: Option>, + evict_thread: Option>, } impl Drop for SignedPacketStore { fn drop(&mut self) { + println!("Dropping SignedPacketStore"); // cancel the actor self.cancel.cancel(); // join the thread. This is important so that Drop implementations that // are called from the actor thread can complete before we return. - if let Some(thread) = self.thread.take() { + if let Some(thread) = self.write_thread.take() { + let _ = thread.join(); + } + if let Some(thread) = self.evict_thread.take() { let _ = thread.join(); } } @@ -53,7 +58,7 @@ enum Message { res: oneshot::Sender, }, CheckExpired { - key: Bytes, + key: PublicKeyBytes, }, } @@ -64,9 +69,16 @@ struct Actor { options: Options, } +#[derive(Debug, Clone, Copy)] pub struct Options { + /// Maximum number of packets to process in a single write transaction. max_batch_size: usize, + /// Maximum time to keep a write transaction open. max_batch_time: Duration, + /// Time to keep packets in the store before eviction. + eviction: Duration, + /// Pause between eviction checks. + eviction_interval: Duration, } impl Default for Options { @@ -74,6 +86,8 @@ impl Default for Options { Self { max_batch_size: MAX_BATCH_SIZE, max_batch_time: MAX_BATCH_TIME, + eviction: Duration::from_secs(10), + eviction_interval: Duration::from_secs(10), } } } @@ -90,10 +104,12 @@ impl Actor { } async fn run0(&mut self) -> anyhow::Result<()> { + let expired_us = Duration::from_secs(10).as_micros() as u64; loop { let transaction = self.db.begin_write()?; let mut tables = Tables::new(&transaction)?; let timeout = tokio::time::sleep(self.options.max_batch_time); + let expired = system_time() - expired_us; tokio::pin!(timeout); for _ in 0..self.options.max_batch_size { tokio::select! { @@ -142,7 +158,13 @@ impl Actor { res.send(Snapshot::new(&self.db)?).ok(); } Message::CheckExpired { key } => { - todo!() + if let Some(packet) = get_packet(&tables.signed_packets, &key)? { + if packet.timestamp() < expired { + println!("Removing expired packet"); + let _ = tables.signed_packets.remove(key.as_bytes())?; + // inc!(Metrics, store_packets_expired); + } + } } } } @@ -211,26 +233,36 @@ impl SignedPacketStore { let _ = Tables::new(&write_tx)?; write_tx.commit()?; let (send, recv) = mpsc::channel(1024); + let send2 = send.clone(); let cancel = CancellationToken::new(); let cancel2 = cancel.clone(); + let cancel3 = cancel.clone(); + let options = Default::default(); let actor = Actor { db, recv, cancel: cancel2, - options: Default::default(), + options, }; // start an io thread and donate it to the tokio runtime so we can do blocking IO // inside the thread despite being in a tokio runtime let handle = tokio::runtime::Handle::try_current()?; - let thread = std::thread::Builder::new() + let write_thread = std::thread::Builder::new() .name("packet-store-actor".into()) .spawn(move || { handle.block_on(actor.run()); })?; + let handle = tokio::runtime::Handle::try_current()?; + let evict_thread = std::thread::Builder::new() + .name("packet-store-evict".into()) + .spawn(move || { + handle.block_on(evict_task(send2, options, cancel3)); + })?; Ok(Self { send, cancel, - thread: Some(thread), + write_thread: Some(write_thread), + evict_thread: Some(evict_thread), }) } @@ -266,27 +298,43 @@ fn get_packet( Ok(Some(packet)) } -async fn evict_task(send: mpsc::Sender, expiry: Duration) -> anyhow::Result<()> { - let expiry_ms = expiry.as_micros() as u64; +async fn evict_task(send: mpsc::Sender, options: Options, cancel: CancellationToken) { + let cancel2 = cancel.clone(); + let _ = cancel2 + .run_until_cancelled(async move { + info!("starting evict task"); + if let Err(cause) = evict_task_inner(send, options).await { + error!("evict task failed: {:?}", cause); + } + // when we are done for whatever reason we want to shut down the actor + cancel.cancel(); + }) + .await; +} + +/// Periodically check for expired packets and remove them. +async fn evict_task_inner(send: mpsc::Sender, options: Options) -> anyhow::Result<()> { + let expiry_ms = options.eviction.as_micros() as u64; loop { let (tx, rx) = oneshot::channel(); let _ = send.send(Message::Snapshot { res: tx }).await.ok(); let Ok(snapshot) = rx.await else { - break; + anyhow::bail!("failed to get snapshot"); }; let expired = system_time() - expiry_ms; for item in snapshot.signed_packets.iter()? { - let (key, value) = item?; + let (_, value) = item?; let value = Bytes::copy_from_slice(value.value()); let packet = SignedPacket::from_bytes(&value)?; if packet.timestamp() < expired { let _ = send .send(Message::CheckExpired { - key: Bytes::copy_from_slice(key.value()), + key: PublicKeyBytes::from_signed_packet(&packet), }) .await?; } } + // sleep for the eviction interval so we don't constantly check + tokio::time::sleep(options.eviction_interval).await; } - Ok(()) } From 02f8f2a24eb2ff3d580d0202e5029c2f42ab303b Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Tue, 3 Dec 2024 17:20:19 +0200 Subject: [PATCH 13/21] Wire up eviction, including configuration --- Cargo.lock | 18 ++++ iroh-dns-server/Cargo.toml | 2 + iroh-dns-server/benches/write.rs | 2 +- iroh-dns-server/src/config.rs | 53 +++++++++ iroh-dns-server/src/lib.rs | 44 +++++++- iroh-dns-server/src/metrics.rs | 2 + iroh-dns-server/src/server.rs | 13 ++- iroh-dns-server/src/store.rs | 9 +- iroh-dns-server/src/store/signed_packets.rs | 113 ++++++++++++-------- 9 files changed, 196 insertions(+), 60 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index b2ceb7607d..5f89c790ce 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2039,6 +2039,22 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "humantime" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4" + +[[package]] +name = "humantime-serde" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "57a3db5ea5923d99402c94e9feb261dc5ee9b4efa158b0315f788cf549cc200c" +dependencies = [ + "humantime", + "serde", +] + [[package]] name = "hyper" version = "1.5.1" @@ -2537,6 +2553,7 @@ dependencies = [ "hickory-resolver", "hickory-server", "http 1.1.0", + "humantime-serde", "iroh", "iroh-metrics", "iroh-test 0.28.0", @@ -2551,6 +2568,7 @@ dependencies = [ "serde", "struct_iterable", "strum", + "testresult", "tokio", "tokio-rustls", "tokio-rustls-acme", diff --git a/iroh-dns-server/Cargo.toml b/iroh-dns-server/Cargo.toml index c8b5fe9191..58f259d549 100644 --- a/iroh-dns-server/Cargo.toml +++ b/iroh-dns-server/Cargo.toml @@ -29,6 +29,7 @@ governor = "0.6.3" #needs new release of tower_governor for 0.7.0 hickory-proto = "=0.25.0-alpha.2" hickory-server = { version = "=0.25.0-alpha.2", features = ["dns-over-rustls"] } http = "1.0.0" +humantime-serde = "1.1.1" iroh-metrics = { version = "0.28.0", path = "../iroh-metrics" } lru = "0.12.3" parking_lot = "0.12.1" @@ -64,6 +65,7 @@ hickory-resolver = "=0.25.0-alpha.2" iroh = { version = "0.28.0", path = "../iroh" } iroh-test = { version = "0.28.0", path = "../iroh-test" } pkarr = { version = "2.2.0", features = ["rand"] } +testresult = "0.4.1" [[bench]] name = "write" diff --git a/iroh-dns-server/benches/write.rs b/iroh-dns-server/benches/write.rs index 143a2b0917..52924672f3 100644 --- a/iroh-dns-server/benches/write.rs +++ b/iroh-dns-server/benches/write.rs @@ -7,7 +7,7 @@ use tokio::runtime::Runtime; const LOCALHOST_PKARR: &str = "http://localhost:8080/pkarr"; async fn start_dns_server(config: Config) -> Result { - let store = ZoneStore::persistent(Config::signed_packet_store_path()?)?; + let store = ZoneStore::persistent(Config::signed_packet_store_path()?, Default::default())?; Server::spawn(config, store).await } diff --git a/iroh-dns-server/src/config.rs b/iroh-dns-server/src/config.rs index 732d65e4e8..ba8409d24a 100644 --- a/iroh-dns-server/src/config.rs +++ b/iroh-dns-server/src/config.rs @@ -4,6 +4,7 @@ use std::{ env, net::{IpAddr, Ipv4Addr, SocketAddr}, path::{Path, PathBuf}, + time::Duration, }; use anyhow::{anyhow, Context, Result}; @@ -13,6 +14,7 @@ use tracing::info; use crate::{ dns::DnsConfig, http::{CertMode, HttpConfig, HttpsConfig, RateLimitConfig}, + store::ZoneStoreOptions, }; const DEFAULT_METRICS_ADDR: SocketAddr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 9117); @@ -44,11 +46,61 @@ pub struct Config { /// Config for the mainline lookup. pub mainline: Option, + /// Config for the zone store. + pub zone_store: Option, + /// Config for pkarr rate limit #[serde(default)] pub pkarr_put_rate_limit: RateLimitConfig, } +/// The config for the store. +#[derive(Debug, Serialize, Deserialize, Clone)] +pub struct StoreConfig { + /// Maximum number of packets to process in a single write transaction. + max_batch_size: usize, + + /// Maximum time to keep a write transaction open. + #[serde(with = "humantime_serde")] + max_batch_time: Duration, + + /// Time to keep packets in the store before eviction. + #[serde(with = "humantime_serde")] + eviction: Duration, + + /// Pause between eviction checks. + #[serde(with = "humantime_serde")] + eviction_interval: Duration, +} + +impl Default for StoreConfig { + fn default() -> Self { + ZoneStoreOptions::default().into() + } +} + +impl From for StoreConfig { + fn from(value: ZoneStoreOptions) -> Self { + Self { + max_batch_size: value.max_batch_size, + max_batch_time: value.max_batch_time, + eviction: value.eviction, + eviction_interval: value.eviction_interval, + } + } +} + +impl From for ZoneStoreOptions { + fn from(value: StoreConfig) -> Self { + Self { + max_batch_size: value.max_batch_size, + max_batch_time: value.max_batch_time, + eviction: value.eviction, + eviction_interval: value.eviction_interval, + } + } +} + /// The config for the metrics server. #[derive(Debug, Serialize, Deserialize)] pub struct MetricsConfig { @@ -187,6 +239,7 @@ impl Default for Config { rr_aaaa: None, rr_ns: Some("ns1.irohdns.example.".to_string()), }, + zone_store: None, metrics: None, mainline: None, pkarr_put_rate_limit: RateLimitConfig::default(), diff --git a/iroh-dns-server/src/lib.rs b/iroh-dns-server/src/lib.rs index 75eded5875..6afeb939bd 100644 --- a/iroh-dns-server/src/lib.rs +++ b/iroh-dns-server/src/lib.rs @@ -32,9 +32,16 @@ mod tests { key::SecretKey, }; use pkarr::{PkarrClient, SignedPacket}; + use testresult::TestResult; use url::Url; - use crate::{config::BootstrapOption, server::Server}; + use crate::{ + config::BootstrapOption, + server::Server, + store::{PacketSource, ZoneStoreOptions}, + util::PublicKeyBytes, + ZoneStore, + }; #[tokio::test] async fn pkarr_publish_dns_resolve() -> Result<()> { @@ -177,13 +184,39 @@ mod tests { assert_eq!(res.node_id, node_id); assert_eq!(res.info.relay_url.map(Url::from), Some(relay_url)); - println!("Sleeping for 1 hour"); - tokio::time::sleep(Duration::from_secs(3600)).await; - server.shutdown().await?; Ok(()) } + #[tokio::test] + async fn store_eviction() -> TestResult<()> { + iroh_test::logging::setup_multithreaded(); + let options = ZoneStoreOptions { + eviction: Duration::from_millis(100), + eviction_interval: Duration::from_millis(100), + max_batch_time: Duration::from_millis(100), + ..Default::default() + }; + let store = ZoneStore::in_memory(options)?; + + // create a signed packet + let secret_key = SecretKey::generate(); + let node_id = secret_key.public(); + let relay_url: Url = "https://relay.example.".parse()?; + let node_info = NodeInfo::new(node_id, Some(relay_url.clone()), Default::default()); + let signed_packet = node_info.to_pkarr_signed_packet(&secret_key, 30)?; + let key = PublicKeyBytes::from_signed_packet(&signed_packet); + + store + .insert(signed_packet, PacketSource::PkarrPublish) + .await?; + + tokio::time::sleep(Duration::from_secs(1)).await; + let entry = store.get_signed_packet(&key).await?; + assert!(entry.is_none()); + Ok(()) + } + #[tokio::test] async fn integration_mainline() -> Result<()> { iroh_test::logging::setup_multithreaded(); @@ -194,7 +227,8 @@ mod tests { // spawn our server with mainline support let (server, nameserver, _http_url) = - Server::spawn_for_tests_with_mainline(Some(BootstrapOption::Custom(bootstrap))).await?; + Server::spawn_for_tests_with_options(Some(BootstrapOption::Custom(bootstrap)), None) + .await?; let origin = "irohdns.example."; diff --git a/iroh-dns-server/src/metrics.rs b/iroh-dns-server/src/metrics.rs index 82d715dd5a..1e0cc21088 100644 --- a/iroh-dns-server/src/metrics.rs +++ b/iroh-dns-server/src/metrics.rs @@ -22,6 +22,7 @@ pub struct Metrics { pub store_packets_inserted: Counter, pub store_packets_removed: Counter, pub store_packets_updated: Counter, + pub store_packets_expired: Counter, } impl Default for Metrics { @@ -44,6 +45,7 @@ impl Default for Metrics { store_packets_inserted: Counter::new("Signed packets inserted into the store"), store_packets_removed: Counter::new("Signed packets removed from the store"), store_packets_updated: Counter::new("Number of updates to existing packets"), + store_packets_expired: Counter::new("Number of expired packets"), } } } diff --git a/iroh-dns-server/src/server.rs b/iroh-dns-server/src/server.rs index 865c5cecf8..e40e3de1fd 100644 --- a/iroh-dns-server/src/server.rs +++ b/iroh-dns-server/src/server.rs @@ -14,7 +14,11 @@ use crate::{ /// Spawn the server and run until the `Ctrl-C` signal is received, then shutdown. pub async fn run_with_config_until_ctrl_c(config: Config) -> Result<()> { - let mut store = ZoneStore::persistent(Config::signed_packet_store_path()?)?; + let zone_store_options = config.zone_store.clone().unwrap_or_default(); + let mut store = ZoneStore::persistent( + Config::signed_packet_store_path()?, + zone_store_options.into(), + )?; if let Some(bootstrap) = config.mainline_enabled() { info!("mainline fallback enabled"); store = store.with_mainline_fallback(bootstrap); @@ -96,14 +100,15 @@ impl Server { /// HTTP server. #[cfg(test)] pub async fn spawn_for_tests() -> Result<(Self, std::net::SocketAddr, url::Url)> { - Self::spawn_for_tests_with_mainline(None).await + Self::spawn_for_tests_with_options(None, None).await } /// Spawn a server suitable for testing, while optionally enabling mainline with custom /// bootstrap addresses. #[cfg(test)] - pub async fn spawn_for_tests_with_mainline( + pub async fn spawn_for_tests_with_options( mainline: Option, + options: Option, ) -> Result<(Self, std::net::SocketAddr, url::Url)> { use std::net::{IpAddr, Ipv4Addr}; @@ -117,7 +122,7 @@ impl Server { config.https = None; config.metrics = Some(MetricsConfig::disabled()); - let mut store = ZoneStore::in_memory()?; + let mut store = ZoneStore::in_memory(options.unwrap_or_default())?; if let Some(bootstrap) = mainline { info!("mainline fallback enabled"); store = store.with_mainline_fallback(bootstrap); diff --git a/iroh-dns-server/src/store.rs b/iroh-dns-server/src/store.rs index 89f3ca9f43..3286fe0132 100644 --- a/iroh-dns-server/src/store.rs +++ b/iroh-dns-server/src/store.rs @@ -19,6 +19,7 @@ use crate::{ }; mod signed_packets; +pub use signed_packets::Options as ZoneStoreOptions; /// Cache up to 1 million pkarr zones by default pub const DEFAULT_CACHE_CAPACITY: usize = 1024 * 1024; @@ -44,14 +45,14 @@ pub struct ZoneStore { impl ZoneStore { /// Create a persistent store - pub fn persistent(path: impl AsRef) -> Result { - let packet_store = SignedPacketStore::persistent(path)?; + pub fn persistent(path: impl AsRef, options: ZoneStoreOptions) -> Result { + let packet_store = SignedPacketStore::persistent(path, options)?; Ok(Self::new(packet_store)) } /// Create an in-memory store. - pub fn in_memory() -> Result { - let packet_store = SignedPacketStore::in_memory()?; + pub fn in_memory(options: ZoneStoreOptions) -> Result { + let packet_store = SignedPacketStore::in_memory(options)?; Ok(Self::new(packet_store)) } diff --git a/iroh-dns-server/src/store/signed_packets.rs b/iroh-dns-server/src/store/signed_packets.rs index c875893786..130b0cd2b5 100644 --- a/iroh-dns-server/src/store/signed_packets.rs +++ b/iroh-dns-server/src/store/signed_packets.rs @@ -1,4 +1,4 @@ -use std::{path::Path, result, time::Duration}; +use std::{future::Future, path::Path, result, time::Duration}; use anyhow::{Context, Result}; use bytes::Bytes; @@ -7,37 +7,27 @@ use pkarr::{system_time, SignedPacket}; use redb::{backends::InMemoryBackend, Database, ReadableTable, TableDefinition}; use tokio::sync::{mpsc, oneshot}; use tokio_util::sync::CancellationToken; -use tracing::{error, info}; +use tracing::{debug, error, info, trace}; use crate::{metrics::Metrics, util::PublicKeyBytes}; pub type SignedPacketsKey = [u8; 32]; const SIGNED_PACKETS_TABLE: TableDefinition<&SignedPacketsKey, &[u8]> = TableDefinition::new("signed-packets-1"); -const MAX_BATCH_SIZE: usize = 1024 * 64; -const MAX_BATCH_TIME: Duration = Duration::from_secs(1); #[derive(Debug)] pub struct SignedPacketStore { send: mpsc::Sender, cancel: CancellationToken, - write_thread: Option>, - evict_thread: Option>, + _write_thread: IoThread, + _evict_thread: IoThread, } impl Drop for SignedPacketStore { fn drop(&mut self) { - println!("Dropping SignedPacketStore"); // cancel the actor self.cancel.cancel(); - // join the thread. This is important so that Drop implementations that - // are called from the actor thread can complete before we return. - if let Some(thread) = self.write_thread.take() { - let _ = thread.join(); - } - if let Some(thread) = self.evict_thread.take() { - let _ = thread.join(); - } + // after cancellation, the two threads will be joined } } @@ -72,22 +62,22 @@ struct Actor { #[derive(Debug, Clone, Copy)] pub struct Options { /// Maximum number of packets to process in a single write transaction. - max_batch_size: usize, + pub max_batch_size: usize, /// Maximum time to keep a write transaction open. - max_batch_time: Duration, + pub max_batch_time: Duration, /// Time to keep packets in the store before eviction. - eviction: Duration, + pub eviction: Duration, /// Pause between eviction checks. - eviction_interval: Duration, + pub eviction_interval: Duration, } impl Default for Options { fn default() -> Self { Self { - max_batch_size: MAX_BATCH_SIZE, - max_batch_time: MAX_BATCH_TIME, - eviction: Duration::from_secs(10), - eviction_interval: Duration::from_secs(10), + max_batch_size: 1024 * 64, + max_batch_time: Duration::from_secs(1), + eviction: Duration::from_secs(3600 * 24 * 7), + eviction_interval: Duration::from_secs(3600 * 24), } } } @@ -104,7 +94,7 @@ impl Actor { } async fn run0(&mut self) -> anyhow::Result<()> { - let expired_us = Duration::from_secs(10).as_micros() as u64; + let expired_us = self.options.eviction.as_micros() as u64; loop { let transaction = self.db.begin_write()?; let mut tables = Tables::new(&transaction)?; @@ -160,9 +150,8 @@ impl Actor { Message::CheckExpired { key } => { if let Some(packet) = get_packet(&tables.signed_packets, &key)? { if packet.timestamp() < expired { - println!("Removing expired packet"); let _ = tables.signed_packets.remove(key.as_bytes())?; - // inc!(Metrics, store_packets_expired); + inc!(Metrics, store_packets_expired); } } } @@ -204,7 +193,7 @@ impl Snapshot { } impl SignedPacketStore { - pub fn persistent(path: impl AsRef) -> Result { + pub fn persistent(path: impl AsRef, options: Options) -> Result { let path = path.as_ref(); info!("loading packet database from {}", path.to_string_lossy()); if let Some(parent) = path.parent() { @@ -218,16 +207,16 @@ impl SignedPacketStore { let db = Database::builder() .create(path) .context("failed to open packet database")?; - Self::open(db) + Self::open(db, options) } - pub fn in_memory() -> Result { + pub fn in_memory(options: Options) -> Result { info!("using in-memory packet database"); let db = Database::builder().create_with_backend(InMemoryBackend::new())?; - Self::open(db) + Self::open(db, options) } - pub fn open(db: Database) -> Result { + pub fn open(db: Database, options: Options) -> Result { // create tables let write_tx = db.begin_write()?; let _ = Tables::new(&write_tx)?; @@ -237,7 +226,6 @@ impl SignedPacketStore { let cancel = CancellationToken::new(); let cancel2 = cancel.clone(); let cancel3 = cancel.clone(); - let options = Default::default(); let actor = Actor { db, recv, @@ -246,23 +234,15 @@ impl SignedPacketStore { }; // start an io thread and donate it to the tokio runtime so we can do blocking IO // inside the thread despite being in a tokio runtime - let handle = tokio::runtime::Handle::try_current()?; - let write_thread = std::thread::Builder::new() - .name("packet-store-actor".into()) - .spawn(move || { - handle.block_on(actor.run()); - })?; - let handle = tokio::runtime::Handle::try_current()?; - let evict_thread = std::thread::Builder::new() - .name("packet-store-evict".into()) - .spawn(move || { - handle.block_on(evict_task(send2, options, cancel3)); - })?; + let _write_thread = IoThread::new("packet-store-actor", move || actor.run())?; + let _evict_thread = IoThread::new("packet-store-evict", move || { + evict_task(send2, options, cancel3) + })?; Ok(Self { send, cancel, - write_thread: Some(write_thread), - evict_thread: Some(evict_thread), + _write_thread, + _evict_thread, }) } @@ -321,12 +301,14 @@ async fn evict_task_inner(send: mpsc::Sender, options: Options) -> anyh let Ok(snapshot) = rx.await else { anyhow::bail!("failed to get snapshot"); }; + debug!("got snapshot"); let expired = system_time() - expiry_ms; for item in snapshot.signed_packets.iter()? { let (_, value) = item?; let value = Bytes::copy_from_slice(value.value()); let packet = SignedPacket::from_bytes(&value)?; if packet.timestamp() < expired { + debug!("evicting expired packet {}", packet.public_key()); let _ = send .send(Message::CheckExpired { key: PublicKeyBytes::from_signed_packet(&packet), @@ -338,3 +320,42 @@ async fn evict_task_inner(send: mpsc::Sender, options: Options) -> anyh tokio::time::sleep(options.eviction_interval).await; } } + +/// An io thread that drives a future to completion on the current tokio runtime +/// +/// Inside the future, blocking IO can be done without blocking one of the tokio +/// pool threads. +#[derive(Debug)] +struct IoThread { + handle: Option>, +} + +impl IoThread { + /// Spawn a new io thread. + /// + /// Calling this function requires that the current thread is running in a + /// tokio runtime. It is up to the caller to make sure the future exits, + /// e.g. by using a cancellation token. Otherwise, drop will block. + fn new(name: &str, f: F) -> Result + where + F: FnOnce() -> Fut + Send + 'static, + Fut: Future, + { + let rt = tokio::runtime::Handle::try_current()?; + let handle = std::thread::Builder::new() + .name(name.into()) + .spawn(move || rt.block_on(f())) + .context("failed to spawn thread")?; + Ok(Self { + handle: Some(handle), + }) + } +} + +impl Drop for IoThread { + fn drop(&mut self) { + if let Some(handle) = self.handle.take() { + let _ = handle.join(); + } + } +} From 453dffbffc2d7fec11174239a8b04f0add7350ce Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Tue, 3 Dec 2024 19:01:38 +0200 Subject: [PATCH 14/21] clippy --- iroh-dns-server/src/store/signed_packets.rs | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/iroh-dns-server/src/store/signed_packets.rs b/iroh-dns-server/src/store/signed_packets.rs index 3608baa539..d8d827b7a2 100644 --- a/iroh-dns-server/src/store/signed_packets.rs +++ b/iroh-dns-server/src/store/signed_packets.rs @@ -309,11 +309,10 @@ async fn evict_task_inner(send: mpsc::Sender, options: Options) -> anyh let packet = SignedPacket::from_bytes(&value)?; if packet.timestamp() < expired { debug!("evicting expired packet {}", packet.public_key()); - let _ = send - .send(Message::CheckExpired { - key: PublicKeyBytes::from_signed_packet(&packet), - }) - .await?; + send.send(Message::CheckExpired { + key: PublicKeyBytes::from_signed_packet(&packet), + }) + .await?; } } // sleep for the eviction interval so we don't constantly check From 691f7586d32559a680966120ead928ab88d45b32 Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Wed, 4 Dec 2024 16:53:13 +0200 Subject: [PATCH 15/21] Try out the other way to do eviction a second table that has time -> Set --- iroh-dns-server/src/store/signed_packets.rs | 47 ++++++++++++++------- iroh-dns-server/src/util.rs | 4 ++ 2 files changed, 35 insertions(+), 16 deletions(-) diff --git a/iroh-dns-server/src/store/signed_packets.rs b/iroh-dns-server/src/store/signed_packets.rs index d8d827b7a2..d96fe06f9f 100644 --- a/iroh-dns-server/src/store/signed_packets.rs +++ b/iroh-dns-server/src/store/signed_packets.rs @@ -1,10 +1,11 @@ use std::{future::Future, path::Path, result, time::Duration}; use anyhow::{Context, Result}; -use bytes::Bytes; use iroh_metrics::inc; use pkarr::{system_time, SignedPacket}; -use redb::{backends::InMemoryBackend, Database, ReadableTable, TableDefinition}; +use redb::{ + backends::InMemoryBackend, Database, MultimapTableDefinition, ReadableTable, TableDefinition, +}; use tokio::sync::{mpsc, oneshot}; use tokio_util::sync::CancellationToken; use tracing::{debug, error, info}; @@ -14,6 +15,8 @@ use crate::{metrics::Metrics, util::PublicKeyBytes}; pub type SignedPacketsKey = [u8; 32]; const SIGNED_PACKETS_TABLE: TableDefinition<&SignedPacketsKey, &[u8]> = TableDefinition::new("signed-packets-1"); +const UPDATE_TIME_TABLE: MultimapTableDefinition<[u8; 8], SignedPacketsKey> = + MultimapTableDefinition::new("update-time-1"); #[derive(Debug)] pub struct SignedPacketStore { @@ -48,6 +51,7 @@ enum Message { res: oneshot::Sender, }, CheckExpired { + time: [u8; 8], key: PublicKeyBytes, }, } @@ -123,11 +127,13 @@ impl Actor { res.send(false).ok(); continue; } else { + tables.update_time.remove(&packet.timestamp().to_be_bytes(), key.as_bytes())?; replaced = true; } } let value = packet.as_bytes(); tables.signed_packets.insert(key.as_bytes(), &value[..])?; + tables.update_time.insert(&packet.timestamp().to_be_bytes(), key.as_bytes())?; if replaced { inc!(Metrics, store_packets_updated); } else { @@ -136,9 +142,14 @@ impl Actor { res.send(true).ok(); } Message::Remove { key, res } => { - let updated = - tables.signed_packets.remove(key.as_bytes())?.is_some() - ; + let updated = if let Some(row) = tables.signed_packets.remove(key.as_bytes())? { + let packet = SignedPacket::from_bytes(&row.value().to_vec().into())?; + tables.update_time.remove(&packet.timestamp().to_be_bytes(), key.as_bytes())?; + inc!(Metrics, store_packets_removed); + true + } else { + false + }; if updated { inc!(Metrics, store_packets_removed); } @@ -147,7 +158,8 @@ impl Actor { Message::Snapshot { res } => { res.send(Snapshot::new(&self.db)?).ok(); } - Message::CheckExpired { key } => { + Message::CheckExpired { key, time } => { + tables.update_time.remove_all(&time)?; if let Some(packet) = get_packet(&tables.signed_packets, &key)? { if packet.timestamp() < expired { let _ = tables.signed_packets.remove(key.as_bytes())?; @@ -169,18 +181,22 @@ impl Actor { /// signed packet store. pub(super) struct Tables<'a> { pub signed_packets: redb::Table<'a, &'static SignedPacketsKey, &'static [u8]>, + pub update_time: redb::MultimapTable<'a, [u8; 8], SignedPacketsKey>, } impl<'txn> Tables<'txn> { pub fn new(tx: &'txn redb::WriteTransaction) -> result::Result { Ok(Self { signed_packets: tx.open_table(SIGNED_PACKETS_TABLE)?, + update_time: tx.open_multimap_table(UPDATE_TIME_TABLE)?, }) } } pub(super) struct Snapshot { + #[allow(dead_code)] pub signed_packets: redb::ReadOnlyTable<&'static SignedPacketsKey, &'static [u8]>, + pub update_time: redb::ReadOnlyMultimapTable<[u8; 8], SignedPacketsKey>, } impl Snapshot { @@ -188,6 +204,7 @@ impl Snapshot { let tx = db.begin_read()?; Ok(Self { signed_packets: tx.open_table(SIGNED_PACKETS_TABLE)?, + update_time: tx.open_multimap_table(UPDATE_TIME_TABLE)?, }) } } @@ -303,16 +320,14 @@ async fn evict_task_inner(send: mpsc::Sender, options: Options) -> anyh }; debug!("got snapshot"); let expired = system_time() - expiry_ms; - for item in snapshot.signed_packets.iter()? { - let (_, value) = item?; - let value = Bytes::copy_from_slice(value.value()); - let packet = SignedPacket::from_bytes(&value)?; - if packet.timestamp() < expired { - debug!("evicting expired packet {}", packet.public_key()); - send.send(Message::CheckExpired { - key: PublicKeyBytes::from_signed_packet(&packet), - }) - .await?; + for item in snapshot.update_time.range(..expired.to_be_bytes())? { + let (time, keys) = item?; + for item in keys { + let key = item?; + let time = time.value(); + let key = PublicKeyBytes::new(key.value()); + debug!("evicting expired packet {:?} {}", time, key); + send.send(Message::CheckExpired { time, key }).await?; } } // sleep for the eviction interval so we don't constantly check diff --git a/iroh-dns-server/src/util.rs b/iroh-dns-server/src/util.rs index 6fc28b4d1f..b395b91d06 100644 --- a/iroh-dns-server/src/util.rs +++ b/iroh-dns-server/src/util.rs @@ -22,6 +22,10 @@ use pkarr::SignedPacket; pub struct PublicKeyBytes([u8; 32]); impl PublicKeyBytes { + pub fn new(bytes: [u8; 32]) -> Self { + Self(bytes) + } + pub fn from_z32(s: &str) -> Result { let bytes = z32::decode(s.as_bytes())?; let bytes: [u8; 32] = bytes.try_into().map_err(|_| anyhow!("invalid length"))?; From 9d503d253b3aae7a6d20f01cbcf437b2f2647698 Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Wed, 4 Dec 2024 17:32:05 +0200 Subject: [PATCH 16/21] give eviction more time --- iroh-dns-server/src/lib.rs | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/iroh-dns-server/src/lib.rs b/iroh-dns-server/src/lib.rs index 6afeb939bd..1c7bcc7492 100644 --- a/iroh-dns-server/src/lib.rs +++ b/iroh-dns-server/src/lib.rs @@ -212,9 +212,14 @@ mod tests { .await?; tokio::time::sleep(Duration::from_secs(1)).await; - let entry = store.get_signed_packet(&key).await?; - assert!(entry.is_none()); - Ok(()) + for _ in 0..10 { + let entry = store.get_signed_packet(&key).await?; + if entry.is_none() { + return Ok(()); + } + tokio::time::sleep(Duration::from_secs(1)).await; + } + panic!("store did not evict packet"); } #[tokio::test] From 7173d383ebf70b3849b1255c21f0819267d58452 Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Thu, 5 Dec 2024 12:14:25 +0200 Subject: [PATCH 17/21] more loggging for the dns server actor and eviction thread --- iroh-dns-server/src/lib.rs | 14 ++-- iroh-dns-server/src/store/signed_packets.rs | 71 ++++++++++++++++----- 2 files changed, 63 insertions(+), 22 deletions(-) diff --git a/iroh-dns-server/src/lib.rs b/iroh-dns-server/src/lib.rs index 1c7bcc7492..e663a17a9e 100644 --- a/iroh-dns-server/src/lib.rs +++ b/iroh-dns-server/src/lib.rs @@ -200,11 +200,7 @@ mod tests { let store = ZoneStore::in_memory(options)?; // create a signed packet - let secret_key = SecretKey::generate(); - let node_id = secret_key.public(); - let relay_url: Url = "https://relay.example.".parse()?; - let node_info = NodeInfo::new(node_id, Some(relay_url.clone()), Default::default()); - let signed_packet = node_info.to_pkarr_signed_packet(&secret_key, 30)?; + let signed_packet = random_signed_packet()?; let key = PublicKeyBytes::from_signed_packet(&signed_packet); store @@ -273,4 +269,12 @@ mod tests { config.add_name_server(nameserver_config); AsyncResolver::tokio(config, Default::default()) } + + fn random_signed_packet() -> Result { + let secret_key = SecretKey::generate(); + let node_id = secret_key.public(); + let relay_url: Url = "https://relay.example.".parse()?; + let node_info = NodeInfo::new(node_id, Some(relay_url.clone()), Default::default()); + Ok(node_info.to_pkarr_signed_packet(&secret_key, 30)?) + } } diff --git a/iroh-dns-server/src/store/signed_packets.rs b/iroh-dns-server/src/store/signed_packets.rs index d96fe06f9f..c199d4c3ac 100644 --- a/iroh-dns-server/src/store/signed_packets.rs +++ b/iroh-dns-server/src/store/signed_packets.rs @@ -1,6 +1,7 @@ use std::{future::Future, path::Path, result, time::Duration}; use anyhow::{Context, Result}; +use bytes::Bytes; use iroh_metrics::inc; use pkarr::{system_time, SignedPacket}; use redb::{ @@ -8,7 +9,7 @@ use redb::{ }; use tokio::sync::{mpsc, oneshot}; use tokio_util::sync::CancellationToken; -use tracing::{debug, error, info}; +use tracing::{debug, error, info, trace}; use crate::{metrics::Metrics, util::PublicKeyBytes}; @@ -78,10 +79,14 @@ pub struct Options { impl Default for Options { fn default() -> Self { Self { + // 64k packets max_batch_size: 1024 * 64, + // this means we lose at most 1 second of data in case of a crash max_batch_time: Duration::from_secs(1), + // 7 days eviction: Duration::from_secs(3600 * 24 * 7), - eviction_interval: Duration::from_secs(3600 * 24), + // eviction can run frequently since it does not do a full scan + eviction_interval: Duration::from_secs(10), } } } @@ -91,19 +96,20 @@ impl Actor { match self.run0().await { Ok(()) => {} Err(e) => { - self.cancel.cancel(); tracing::error!("packet store actor failed: {:?}", e); + self.cancel.cancel(); } } } async fn run0(&mut self) -> anyhow::Result<()> { - let expired_us = self.options.eviction.as_micros() as u64; + let expiry_us = self.options.eviction.as_micros() as u64; loop { + trace!("batch"); let transaction = self.db.begin_write()?; let mut tables = Tables::new(&transaction)?; let timeout = tokio::time::sleep(self.options.max_batch_time); - let expired = system_time() - expired_us; + let expired = system_time() - expiry_us; tokio::pin!(timeout); for _ in 0..self.options.max_batch_size { tokio::select! { @@ -116,21 +122,25 @@ impl Actor { Some(msg) = self.recv.recv() => { match msg { Message::Get { key, res } => { + trace!("get {}", key); let packet = get_packet(&tables.signed_packets, &key)?; res.send(packet).ok(); } Message::Upsert { packet, res } => { let key = PublicKeyBytes::from_signed_packet(&packet); - let mut replaced = false; - if let Some(existing) = get_packet(&tables.signed_packets, &key)? { + trace!("upsert {}", key); + let replaced = if let Some(existing) = get_packet(&tables.signed_packets, &key)? { if existing.more_recent_than(&packet) { res.send(false).ok(); continue; } else { + // remove the packet from the update time index tables.update_time.remove(&packet.timestamp().to_be_bytes(), key.as_bytes())?; - replaced = true; + true } - } + } else { + false + }; let value = packet.as_bytes(); tables.signed_packets.insert(key.as_bytes(), &value[..])?; tables.update_time.insert(&packet.timestamp().to_be_bytes(), key.as_bytes())?; @@ -142,8 +152,9 @@ impl Actor { res.send(true).ok(); } Message::Remove { key, res } => { + trace!("remove {}", key); let updated = if let Some(row) = tables.signed_packets.remove(key.as_bytes())? { - let packet = SignedPacket::from_bytes(&row.value().to_vec().into())?; + let packet = SignedPacket::from_bytes(&Bytes::copy_from_slice(row.value()))?; tables.update_time.remove(&packet.timestamp().to_be_bytes(), key.as_bytes())?; inc!(Metrics, store_packets_removed); true @@ -156,9 +167,11 @@ impl Actor { res.send(updated).ok(); } Message::Snapshot { res } => { + trace!("snapshot"); res.send(Snapshot::new(&self.db)?).ok(); } Message::CheckExpired { key, time } => { + trace!("check expired {} at {}", key, u64::from_be_bytes(time)); tables.update_time.remove_all(&time)?; if let Some(packet) = get_packet(&tables.signed_packets, &key)? { if packet.timestamp() < expired { @@ -311,22 +324,46 @@ async fn evict_task(send: mpsc::Sender, options: Options, cancel: Cance /// Periodically check for expired packets and remove them. async fn evict_task_inner(send: mpsc::Sender, options: Options) -> anyhow::Result<()> { - let expiry_ms = options.eviction.as_micros() as u64; + let expiry_us = options.eviction.as_micros() as u64; loop { let (tx, rx) = oneshot::channel(); let _ = send.send(Message::Snapshot { res: tx }).await.ok(); + // if we can't get the snapshot we exit the loop, main actor dead let Ok(snapshot) = rx.await else { anyhow::bail!("failed to get snapshot"); }; - debug!("got snapshot"); - let expired = system_time() - expiry_ms; + let expired = system_time() - expiry_us; + trace!("evicting packets older than {}", expired); + // if getting the range fails we exit the loop and shut down + // if individual reads fail we log the error and limp on for item in snapshot.update_time.range(..expired.to_be_bytes())? { - let (time, keys) = item?; + let (time, keys) = match item { + Ok(v) => v, + Err(e) => { + error!("failed to read update_time row {:?}", e); + continue; + } + }; + let time = time.value(); + trace!("evicting expired packets at {}", u64::from_be_bytes(time)); for item in keys { - let key = item?; - let time = time.value(); + let key = match item { + Ok(v) => v, + Err(e) => { + error!( + "failed to read update_time item at {}: {:?}", + u64::from_be_bytes(time), + e + ); + continue; + } + }; let key = PublicKeyBytes::new(key.value()); - debug!("evicting expired packet {:?} {}", time, key); + debug!( + "evicting expired packet {} {}", + u64::from_be_bytes(time), + key + ); send.send(Message::CheckExpired { time, key }).await?; } } From 13f17cb0a12f801d95715d2501324e2592b91f55 Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Thu, 5 Dec 2024 13:43:45 +0200 Subject: [PATCH 18/21] Add peekable receiver and make sure we don't start transactions for nothing --- iroh-dns-server/src/store/signed_packets.rs | 46 +++++++++++++++++++-- 1 file changed, 43 insertions(+), 3 deletions(-) diff --git a/iroh-dns-server/src/store/signed_packets.rs b/iroh-dns-server/src/store/signed_packets.rs index c199d4c3ac..768004922f 100644 --- a/iroh-dns-server/src/store/signed_packets.rs +++ b/iroh-dns-server/src/store/signed_packets.rs @@ -35,6 +35,7 @@ impl Drop for SignedPacketStore { } } +#[derive(derive_more::Debug)] enum Message { Upsert { packet: SignedPacket, @@ -49,6 +50,7 @@ enum Message { res: oneshot::Sender, }, Snapshot { + #[debug(skip)] res: oneshot::Sender, }, CheckExpired { @@ -59,7 +61,7 @@ enum Message { struct Actor { db: Database, - recv: mpsc::Receiver, + recv: PeekableReceiver, cancel: CancellationToken, options: Options, } @@ -104,8 +106,9 @@ impl Actor { async fn run0(&mut self) -> anyhow::Result<()> { let expiry_us = self.options.eviction.as_micros() as u64; - loop { + while let Some(msg) = self.recv.recv().await { trace!("batch"); + self.recv.push_back(msg).unwrap(); let transaction = self.db.begin_write()?; let mut tables = Tables::new(&transaction)?; let timeout = tokio::time::sleep(self.options.max_batch_time); @@ -187,6 +190,7 @@ impl Actor { drop(tables); transaction.commit()?; } + Ok(()) } } @@ -258,7 +262,7 @@ impl SignedPacketStore { let cancel3 = cancel.clone(); let actor = Actor { db, - recv, + recv: PeekableReceiver::new(recv), cancel: cancel2, options, }; @@ -410,3 +414,39 @@ impl Drop for IoThread { } } } + +/// A wrapper for a tokio mpsc receiver that allows peeking at the next message. +#[derive(Debug)] +pub(super) struct PeekableReceiver { + msg: Option, + recv: tokio::sync::mpsc::Receiver, +} + +#[allow(dead_code)] +impl PeekableReceiver { + pub fn new(recv: tokio::sync::mpsc::Receiver) -> Self { + Self { msg: None, recv } + } + + /// Receive the next message. + /// + /// Will block if there are no messages. + /// Returns None only if there are no more messages (sender is dropped). + pub async fn recv(&mut self) -> Option { + if let Some(msg) = self.msg.take() { + return Some(msg); + } + self.recv.recv().await + } + + /// Push back a message. This will only work if there is room for it. + /// Otherwise, it will fail and return the message. + pub fn push_back(&mut self, msg: T) -> std::result::Result<(), T> { + if self.msg.is_none() { + self.msg = Some(msg); + Ok(()) + } else { + Err(msg) + } + } +} From a68a2af05047600008f63d86bb0fa2b83c2d4628 Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Thu, 5 Dec 2024 14:04:53 +0200 Subject: [PATCH 19/21] don't open write txn if somebody wants a snapshot --- iroh-dns-server/src/store/signed_packets.rs | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/iroh-dns-server/src/store/signed_packets.rs b/iroh-dns-server/src/store/signed_packets.rs index 768004922f..4a726310df 100644 --- a/iroh-dns-server/src/store/signed_packets.rs +++ b/iroh-dns-server/src/store/signed_packets.rs @@ -107,6 +107,14 @@ impl Actor { async fn run0(&mut self) -> anyhow::Result<()> { let expiry_us = self.options.eviction.as_micros() as u64; while let Some(msg) = self.recv.recv().await { + // if we get a snapshot message here we don't need to do a write transaction + let msg = if let Message::Snapshot { res } = msg { + let snapshot = Snapshot::new(&self.db)?; + res.send(snapshot).ok(); + continue; + } else { + msg + }; trace!("batch"); self.recv.push_back(msg).unwrap(); let transaction = self.db.begin_write()?; From 548cbd4181bdcf8cfb27dd87ddcfc25c5618bef6 Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Thu, 5 Dec 2024 16:19:34 +0200 Subject: [PATCH 20/21] replace remove_all with individual remove calls since time is probably unique in 99.9% of all cases, it does not make a diff... --- iroh-dns-server/src/store/signed_packets.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/iroh-dns-server/src/store/signed_packets.rs b/iroh-dns-server/src/store/signed_packets.rs index 4a726310df..34de1abe94 100644 --- a/iroh-dns-server/src/store/signed_packets.rs +++ b/iroh-dns-server/src/store/signed_packets.rs @@ -183,9 +183,9 @@ impl Actor { } Message::CheckExpired { key, time } => { trace!("check expired {} at {}", key, u64::from_be_bytes(time)); - tables.update_time.remove_all(&time)?; if let Some(packet) = get_packet(&tables.signed_packets, &key)? { if packet.timestamp() < expired { + tables.update_time.remove(&time, key.as_bytes())?; let _ = tables.signed_packets.remove(key.as_bytes())?; inc!(Metrics, store_packets_expired); } From 191a0cb5cfce827f65a66762ac5c5f6027c78ac2 Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Thu, 5 Dec 2024 16:28:32 +0200 Subject: [PATCH 21/21] clippy --- iroh-dns-server/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/iroh-dns-server/src/lib.rs b/iroh-dns-server/src/lib.rs index e663a17a9e..9cea6bf51e 100644 --- a/iroh-dns-server/src/lib.rs +++ b/iroh-dns-server/src/lib.rs @@ -275,6 +275,6 @@ mod tests { let node_id = secret_key.public(); let relay_url: Url = "https://relay.example.".parse()?; let node_info = NodeInfo::new(node_id, Some(relay_url.clone()), Default::default()); - Ok(node_info.to_pkarr_signed_packet(&secret_key, 30)?) + node_info.to_pkarr_signed_packet(&secret_key, 30) } }