From 127741fda4a7cf7b744f3b38a0179a556cbc1609 Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Wed, 4 Dec 2024 16:53:13 +0200 Subject: [PATCH] Try out the other way to do eviction a second table that has time -> Set --- iroh-dns-server/src/store/signed_packets.rs | 46 ++++++++++++++------- iroh-dns-server/src/util.rs | 4 ++ 2 files changed, 34 insertions(+), 16 deletions(-) diff --git a/iroh-dns-server/src/store/signed_packets.rs b/iroh-dns-server/src/store/signed_packets.rs index d8d827b7a21..294d9bd553b 100644 --- a/iroh-dns-server/src/store/signed_packets.rs +++ b/iroh-dns-server/src/store/signed_packets.rs @@ -1,10 +1,11 @@ use std::{future::Future, path::Path, result, time::Duration}; use anyhow::{Context, Result}; -use bytes::Bytes; use iroh_metrics::inc; use pkarr::{system_time, SignedPacket}; -use redb::{backends::InMemoryBackend, Database, ReadableTable, TableDefinition}; +use redb::{ + backends::InMemoryBackend, Database, MultimapTableDefinition, ReadableTable, TableDefinition, +}; use tokio::sync::{mpsc, oneshot}; use tokio_util::sync::CancellationToken; use tracing::{debug, error, info}; @@ -14,6 +15,8 @@ use crate::{metrics::Metrics, util::PublicKeyBytes}; pub type SignedPacketsKey = [u8; 32]; const SIGNED_PACKETS_TABLE: TableDefinition<&SignedPacketsKey, &[u8]> = TableDefinition::new("signed-packets-1"); +const UPDATE_TIME_TABLE: MultimapTableDefinition<[u8; 8], SignedPacketsKey> = + MultimapTableDefinition::new("update-time-1"); #[derive(Debug)] pub struct SignedPacketStore { @@ -48,6 +51,7 @@ enum Message { res: oneshot::Sender, }, CheckExpired { + time: [u8; 8], key: PublicKeyBytes, }, } @@ -123,11 +127,13 @@ impl Actor { res.send(false).ok(); continue; } else { + tables.update_time.remove(&packet.timestamp().to_be_bytes(), key.as_bytes())?; replaced = true; } } let value = packet.as_bytes(); tables.signed_packets.insert(key.as_bytes(), &value[..])?; + tables.update_time.insert(&packet.timestamp().to_be_bytes(), key.as_bytes())?; if replaced { inc!(Metrics, store_packets_updated); } else { @@ -136,9 +142,14 @@ impl Actor { res.send(true).ok(); } Message::Remove { key, res } => { - let updated = - tables.signed_packets.remove(key.as_bytes())?.is_some() - ; + let updated = if let Some(row) = tables.signed_packets.remove(key.as_bytes())? { + let packet = SignedPacket::from_bytes(&row.value().to_vec().into())?; + tables.update_time.remove(&packet.timestamp().to_be_bytes(), key.as_bytes())?; + inc!(Metrics, store_packets_removed); + true + } else { + false + }; if updated { inc!(Metrics, store_packets_removed); } @@ -147,7 +158,8 @@ impl Actor { Message::Snapshot { res } => { res.send(Snapshot::new(&self.db)?).ok(); } - Message::CheckExpired { key } => { + Message::CheckExpired { key, time } => { + tables.update_time.remove_all(&time)?; if let Some(packet) = get_packet(&tables.signed_packets, &key)? { if packet.timestamp() < expired { let _ = tables.signed_packets.remove(key.as_bytes())?; @@ -169,18 +181,21 @@ impl Actor { /// signed packet store. pub(super) struct Tables<'a> { pub signed_packets: redb::Table<'a, &'static SignedPacketsKey, &'static [u8]>, + pub update_time: redb::MultimapTable<'a, [u8; 8], SignedPacketsKey>, } impl<'txn> Tables<'txn> { pub fn new(tx: &'txn redb::WriteTransaction) -> result::Result { Ok(Self { signed_packets: tx.open_table(SIGNED_PACKETS_TABLE)?, + update_time: tx.open_multimap_table(UPDATE_TIME_TABLE)?, }) } } pub(super) struct Snapshot { pub signed_packets: redb::ReadOnlyTable<&'static SignedPacketsKey, &'static [u8]>, + pub update_time: redb::ReadOnlyMultimapTable<[u8; 8], SignedPacketsKey>, } impl Snapshot { @@ -188,6 +203,7 @@ impl Snapshot { let tx = db.begin_read()?; Ok(Self { signed_packets: tx.open_table(SIGNED_PACKETS_TABLE)?, + update_time: tx.open_multimap_table(UPDATE_TIME_TABLE)?, }) } } @@ -303,16 +319,14 @@ async fn evict_task_inner(send: mpsc::Sender, options: Options) -> anyh }; debug!("got snapshot"); let expired = system_time() - expiry_ms; - for item in snapshot.signed_packets.iter()? { - let (_, value) = item?; - let value = Bytes::copy_from_slice(value.value()); - let packet = SignedPacket::from_bytes(&value)?; - if packet.timestamp() < expired { - debug!("evicting expired packet {}", packet.public_key()); - send.send(Message::CheckExpired { - key: PublicKeyBytes::from_signed_packet(&packet), - }) - .await?; + for item in snapshot.update_time.range(..expired.to_be_bytes())? { + let (time, keys) = item?; + for item in keys { + let key = item?; + let time = time.value(); + let key = PublicKeyBytes::new(key.value()); + debug!("evicting expired packet {:?} {}", time, key); + send.send(Message::CheckExpired { time, key }).await?; } } // sleep for the eviction interval so we don't constantly check diff --git a/iroh-dns-server/src/util.rs b/iroh-dns-server/src/util.rs index 6fc28b4d1f9..b395b91d068 100644 --- a/iroh-dns-server/src/util.rs +++ b/iroh-dns-server/src/util.rs @@ -22,6 +22,10 @@ use pkarr::SignedPacket; pub struct PublicKeyBytes([u8; 32]); impl PublicKeyBytes { + pub fn new(bytes: [u8; 32]) -> Self { + Self(bytes) + } + pub fn from_z32(s: &str) -> Result { let bytes = z32::decode(s.as_bytes())?; let bytes: [u8; 32] = bytes.try_into().map_err(|_| anyhow!("invalid length"))?;