Skip to content

Commit

Permalink
feat: return expired SignedPacket as fallback, closes #67
Browse files Browse the repository at this point in the history
  • Loading branch information
Nuhvi committed Oct 8, 2024
1 parent a4eca9d commit 4ae72cc
Show file tree
Hide file tree
Showing 10 changed files with 115 additions and 46 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ All notable changes to pkarr client and server will be documented in this file.
### Added

- Add strict monotonic unix `Timestamp`.
- Impl `PartialEq, Eq` for `SignedPacket`.
- Impl `From<PublicKey>` for `CacheKey`.

### Changed

Expand All @@ -16,6 +18,7 @@ All notable changes to pkarr client and server will be documented in this file.
- replace `ureq` with `reqwest` to work with HTTP/2 relays, and Wasm.
- update `mainline` to v3.0.0
- `Client::shutdown` and `Client::shutdown_sync` are now idempotent and return `()`.
- `Client::resolve`, `Client::resolve_sync` and `relay::Client::resolve` return expired cached `SignedPacket` if nothing else was found from the network.

### Removed

Expand Down
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 4 additions & 2 deletions pkarr/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ rand = "0.8.5"
# optional dependencies
serde = { version = "1.0.209", features = ["derive"], optional = true }
futures = { version = "0.3.30", optional = true, default-features = false }
sha1_smol = { version = "1.0.1", optional = true }

[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
# Dht client dependencies:
Expand All @@ -38,6 +39,7 @@ js-sys = "0.3.69"
futures = "0.3.30"
getrandom = { version = "0.2", features = ["js"] }
reqwest = "0.12.7"
sha1_smol = "1.0.1"

[dev-dependencies]
postcard = { version = "1.0.10", features = ["alloc"] }
Expand All @@ -56,7 +58,7 @@ serde = ["dep:serde"]
## Use [dht::Client]
dht = ["dep:mainline", "flume"]
## Use [relay::Client]
relay = ["dep:reqwest", "dep:tokio"]
relay = ["dep:reqwest", "dep:tokio", "dep:sha1_smol"]

# Extra
## Use [Client::resolve_endpoints] and [Client::resolve_https_endpoints]
Expand All @@ -65,7 +67,7 @@ endpoints = ["dep:futures"]
## Use all features
full = ["dht", "relay", "serde", "endpoints"]

default = ["dht", "relay", "serde"]
default = ["dht", "serde"]

[package.metadata.docs.rs]
all-features = true
Expand Down
40 changes: 27 additions & 13 deletions pkarr/src/base/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,24 +6,38 @@ use std::fmt::Debug;
use std::num::NonZeroUsize;
use std::sync::{Arc, Mutex};

use crate::{PublicKey, SignedPacket};
#[cfg(all(not(target_arch = "wasm32"), feature = "dht"))]
use mainline::MutableItem;

use crate::SignedPacket;

/// The sha1 hash of the [PublicKey] used as the key in [Cache].
pub type CacheKey = [u8; 20];

impl From<&PublicKey> for CacheKey {
/// Take the first 20 bytes from a [PublicKey] instead of hashing with sha1.
/// Useful when you don't want to import Sha1 hasher, or mainline crate.
///
/// Risks evicting SignedPackets from the cache if there ar two packets whose
/// PublicKey share the first 20 bytes.
///
/// Don't use both PublicKey and SignedPacket::target() to store packets,
/// otherwise you will end up duplicating and wasting cache space.
fn from(public_key: &PublicKey) -> CacheKey {
let cache_key: [u8; 20] = public_key.as_bytes()[0..20].try_into().unwrap();
#[cfg(all(not(target_arch = "wasm32"), feature = "dht"))]
impl From<&crate::PublicKey> for CacheKey {
fn from(public_key: &crate::PublicKey) -> CacheKey {
MutableItem::target_from_key(public_key.as_bytes(), &None).into()
}
}

#[cfg(any(target_arch = "wasm32", all(not(feature = "dht"), feature = "relay")))]
impl From<&crate::PublicKey> for CacheKey {
fn from(public_key: &crate::PublicKey) -> CacheKey {
let mut encoded = vec![];

encoded.extend(public_key);

let mut hasher = Sha1::new();
hasher.update(&encoded);
hasher.digest().bytes()
}
}

cache_key
#[cfg(any(target_arch = "wasm32", feature = "dht", feature = "relay"))]
impl From<crate::PublicKey> for CacheKey {
fn from(value: crate::PublicKey) -> Self {
(&value).into()
}
}

Expand Down
5 changes: 3 additions & 2 deletions pkarr/src/base/signed_packet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ self_cell!(
dependent: Packet,
}

impl{Debug}
impl{Debug, PartialEq, Eq}
);

impl Inner {
Expand Down Expand Up @@ -56,7 +56,7 @@ impl Inner {
}
}

#[derive(Debug)]
#[derive(Debug, PartialEq, Eq)]
/// Signed DNS packet
pub struct SignedPacket {
inner: Inner,
Expand Down Expand Up @@ -569,6 +569,7 @@ mod tests {
}
}

#[cfg(feature = "dht")]
#[test]
fn to_mutable() {
let keypair = Keypair::random();
Expand Down
1 change: 1 addition & 0 deletions pkarr/src/base/timestamp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,7 @@ mod tests {
assert_eq!(decoded, timestamp)
}

#[cfg(feature = "serde")]
#[test]
fn serde() {
let timestamp = Timestamp::now();
Expand Down
33 changes: 29 additions & 4 deletions pkarr/src/client/dht.rs
Original file line number Diff line number Diff line change
Expand Up @@ -412,7 +412,15 @@ fn run(mut rpc: Rpc, cache: Box<dyn Cache>, settings: Settings, receiver: Receiv

// === Drop senders to done queries ===
for id in &report.done_get_queries {
senders.remove(id);
if let Some(senders) = senders.remove(id) {
if let Some(cached) = cache.get(id.as_bytes()) {
debug!(public_key = ?cached.public_key(), "Returning expired cache as a fallback");
// Send cached packets if available
for sender in senders {
let _ = sender.send(cached.clone());
}
}
};
}

// === Receive and handle incoming messages ===
Expand Down Expand Up @@ -502,6 +510,8 @@ pub enum ActorMessage {

#[cfg(test)]
mod tests {
use std::time::Duration;

use mainline::Testnet;

use super::*;
Expand Down Expand Up @@ -663,14 +673,29 @@ mod tests {

#[tokio::test]
async fn return_expired_packet_fallback() {
let client = Client::builder().build().unwrap();
let testnet = Testnet::new(10).unwrap();

let client = Client::builder()
.testnet(&testnet)
.dht_settings(DhtSettings {
request_timeout: Duration::from_millis(10).into(),
..Default::default()
})
// Everything is expired
.maximum_ttl(0)
.build()
.unwrap();

let keypair = Keypair::random();
let mut packet = dns::Packet::new_reply(0);
let packet = dns::Packet::new_reply(0);
let signed_packet = SignedPacket::from_packet(&keypair, &packet).unwrap();

client
.cache()
.put(keypair.public_key().into(), &signed_packet);
.put(&keypair.public_key().into(), &signed_packet);

let resolved = client.resolve(&keypair.public_key()).await.unwrap();

assert_eq!(resolved, Some(signed_packet));
}
}
69 changes: 45 additions & 24 deletions pkarr/src/client/relay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,12 +178,24 @@ impl Client {
/// with the transport, transparent from [reqwest::Error].
/// - Returns [Error::IO] if something went wrong while reading the payload.
pub async fn resolve(&self, public_key: &PublicKey) -> Result<Option<SignedPacket>> {
let cached_packet = match self.get_from_cache(public_key) {
None => None,
Some(signed_packet) => return Ok(Some(signed_packet)),
if let Some(cached_packet) = self.cache.get(&public_key.as_ref().into()) {
let expires_in = cached_packet.expires_in(self.minimum_ttl, self.maximum_ttl);

if expires_in > 0 {
debug!(expires_in, "Have fresh signed_packet in cache.");
return Ok(Some(cached_packet));
}

debug!(expires_in, "Have expired signed_packet in cache.");

return Ok(self
.race_resolve(public_key, Some(cached_packet.clone()))
.await?
.or(Some(cached_packet)));
};
debug!("Cache miss");

self.race_resolve(public_key, cached_packet).await
self.race_resolve(public_key, None).await
}

// === Native Race implementation ===
Expand Down Expand Up @@ -315,26 +327,6 @@ impl Client {
})
}

fn get_from_cache(&self, public_key: &PublicKey) -> Option<SignedPacket> {
let cached_packet = self.cache.get(&public_key.as_ref().into());

if let Some(cached) = cached_packet {
let expires_in = cached.expires_in(self.minimum_ttl, self.maximum_ttl);

if expires_in > 0 {
debug!(expires_in, "Have fresh signed_packet in cache.");

return Some(cached.clone());
}

debug!(expires_in, "Have expired signed_packet in cache.");
} else {
debug!("Cache miss");
};

None
}

async fn resolve_from_relay(
&self,
relay: &str,
Expand Down Expand Up @@ -459,4 +451,33 @@ mod tests {

assert!(resolved.is_none());
}

#[tokio::test]
async fn return_expired_packet_fallback() {
let keypair = Keypair::random();

let mut server = mockito::Server::new_async().await;

let path = format!("/{}", keypair.public_key());

server.mock("GET", path.as_str()).with_status(404).create();

let relays = vec![server.url()];
let client = Client::builder()
.relays(relays)
.maximum_ttl(0)
.build()
.unwrap();

let packet = dns::Packet::new_reply(0);
let signed_packet = SignedPacket::from_packet(&keypair, &packet).unwrap();

client
.cache()
.put(&keypair.public_key().into(), &signed_packet);

let resolved = client.resolve(&keypair.public_key()).await.unwrap();

assert_eq!(resolved, Some(signed_packet));
}
}
2 changes: 1 addition & 1 deletion pkarr/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ pub enum Error {
PacketTooLarge(usize),

// === Flume errors ===
#[cfg(not(target_arch = "wasm32"))]
#[cfg(all(not(target_arch = "wasm32"), feature = "dht"))]
#[error(transparent)]
/// Transparent [flume::RecvError]
Receive(#[from] flume::RecvError),
Expand Down
1 change: 1 addition & 0 deletions pkarr/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ pub const DEFAULT_RELAYS: [&str; 2] = ["https://relay.pkarr.org", "https://pkarr
/// Default [resolver](https://pkarr.org/resolvers)s
pub const DEFAULT_RESOLVERS: [&str; 2] = ["resolver.pkarr.org:6881", "pkarr.pubky.app:6881"];

#[cfg(any(target_arch = "wasm32", feature = "dht"))]
pub use client::{Client, ClientBuilder, Settings};

// Rexports
Expand Down

0 comments on commit 4ae72cc

Please sign in to comment.