Skip to content

Commit

Permalink
Try out the other way to do eviction
Browse files Browse the repository at this point in the history
a second table that has time -> Set<Key>
  • Loading branch information
rklaehn committed Dec 4, 2024
1 parent 453dffb commit 691f758
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 16 deletions.
47 changes: 31 additions & 16 deletions iroh-dns-server/src/store/signed_packets.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
use std::{future::Future, path::Path, result, time::Duration};

use anyhow::{Context, Result};
use bytes::Bytes;
use iroh_metrics::inc;
use pkarr::{system_time, SignedPacket};
use redb::{backends::InMemoryBackend, Database, ReadableTable, TableDefinition};
use redb::{
backends::InMemoryBackend, Database, MultimapTableDefinition, ReadableTable, TableDefinition,
};
use tokio::sync::{mpsc, oneshot};
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info};
Expand All @@ -14,6 +15,8 @@ use crate::{metrics::Metrics, util::PublicKeyBytes};
pub type SignedPacketsKey = [u8; 32];
const SIGNED_PACKETS_TABLE: TableDefinition<&SignedPacketsKey, &[u8]> =
TableDefinition::new("signed-packets-1");
const UPDATE_TIME_TABLE: MultimapTableDefinition<[u8; 8], SignedPacketsKey> =
MultimapTableDefinition::new("update-time-1");

#[derive(Debug)]
pub struct SignedPacketStore {
Expand Down Expand Up @@ -48,6 +51,7 @@ enum Message {
res: oneshot::Sender<Snapshot>,
},
CheckExpired {
time: [u8; 8],
key: PublicKeyBytes,
},
}
Expand Down Expand Up @@ -123,11 +127,13 @@ impl Actor {
res.send(false).ok();
continue;
} else {
tables.update_time.remove(&packet.timestamp().to_be_bytes(), key.as_bytes())?;
replaced = true;
}
}
let value = packet.as_bytes();
tables.signed_packets.insert(key.as_bytes(), &value[..])?;
tables.update_time.insert(&packet.timestamp().to_be_bytes(), key.as_bytes())?;
if replaced {
inc!(Metrics, store_packets_updated);
} else {
Expand All @@ -136,9 +142,14 @@ impl Actor {
res.send(true).ok();
}
Message::Remove { key, res } => {
let updated =
tables.signed_packets.remove(key.as_bytes())?.is_some()
;
let updated = if let Some(row) = tables.signed_packets.remove(key.as_bytes())? {
let packet = SignedPacket::from_bytes(&row.value().to_vec().into())?;
tables.update_time.remove(&packet.timestamp().to_be_bytes(), key.as_bytes())?;
inc!(Metrics, store_packets_removed);
true
} else {
false
};
if updated {
inc!(Metrics, store_packets_removed);
}
Expand All @@ -147,7 +158,8 @@ impl Actor {
Message::Snapshot { res } => {
res.send(Snapshot::new(&self.db)?).ok();
}
Message::CheckExpired { key } => {
Message::CheckExpired { key, time } => {
tables.update_time.remove_all(&time)?;
if let Some(packet) = get_packet(&tables.signed_packets, &key)? {
if packet.timestamp() < expired {
let _ = tables.signed_packets.remove(key.as_bytes())?;
Expand All @@ -169,25 +181,30 @@ impl Actor {
/// signed packet store.
pub(super) struct Tables<'a> {
pub signed_packets: redb::Table<'a, &'static SignedPacketsKey, &'static [u8]>,
pub update_time: redb::MultimapTable<'a, [u8; 8], SignedPacketsKey>,
}

impl<'txn> Tables<'txn> {
pub fn new(tx: &'txn redb::WriteTransaction) -> result::Result<Self, redb::TableError> {
Ok(Self {
signed_packets: tx.open_table(SIGNED_PACKETS_TABLE)?,
update_time: tx.open_multimap_table(UPDATE_TIME_TABLE)?,
})
}
}

pub(super) struct Snapshot {
#[allow(dead_code)]
pub signed_packets: redb::ReadOnlyTable<&'static SignedPacketsKey, &'static [u8]>,
pub update_time: redb::ReadOnlyMultimapTable<[u8; 8], SignedPacketsKey>,
}

impl Snapshot {
pub fn new(db: &Database) -> Result<Self> {
let tx = db.begin_read()?;
Ok(Self {
signed_packets: tx.open_table(SIGNED_PACKETS_TABLE)?,
update_time: tx.open_multimap_table(UPDATE_TIME_TABLE)?,
})
}
}
Expand Down Expand Up @@ -303,16 +320,14 @@ async fn evict_task_inner(send: mpsc::Sender<Message>, options: Options) -> anyh
};
debug!("got snapshot");
let expired = system_time() - expiry_ms;
for item in snapshot.signed_packets.iter()? {
let (_, value) = item?;
let value = Bytes::copy_from_slice(value.value());
let packet = SignedPacket::from_bytes(&value)?;
if packet.timestamp() < expired {
debug!("evicting expired packet {}", packet.public_key());
send.send(Message::CheckExpired {
key: PublicKeyBytes::from_signed_packet(&packet),
})
.await?;
for item in snapshot.update_time.range(..expired.to_be_bytes())? {
let (time, keys) = item?;
for item in keys {
let key = item?;
let time = time.value();
let key = PublicKeyBytes::new(key.value());
debug!("evicting expired packet {:?} {}", time, key);
send.send(Message::CheckExpired { time, key }).await?;
}
}
// sleep for the eviction interval so we don't constantly check
Expand Down
4 changes: 4 additions & 0 deletions iroh-dns-server/src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ use pkarr::SignedPacket;
pub struct PublicKeyBytes([u8; 32]);

impl PublicKeyBytes {
pub fn new(bytes: [u8; 32]) -> Self {
Self(bytes)
}

pub fn from_z32(s: &str) -> Result<Self> {
let bytes = z32::decode(s.as_bytes())?;
let bytes: [u8; 32] = bytes.try_into().map_err(|_| anyhow!("invalid length"))?;
Expand Down

0 comments on commit 691f758

Please sign in to comment.