Skip to content

Commit

Permalink
pool: auto-close subscription after Duration of no new events
Browse files Browse the repository at this point in the history
Add `SubscribeAutoCloseOptions::relative_timeout`

Closes rust-nostr#691

Co-authored-by: dluvian <[email protected]>
Signed-off-by: Yuki Kishimoto <[email protected]>
  • Loading branch information
yukibtc and dluvian committed Dec 27, 2024
1 parent 08e1e74 commit 60a10b1
Show file tree
Hide file tree
Showing 5 changed files with 94 additions and 48 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@
* database: impl PartialEq and Eq for `Events` ([Yuki Kishimoto])
* database: add `SaveEventStatus` enum ([Yuki Kishimoto])
* pool: add `ReceiverStream` ([Yuki Kishimoto])
* Add `SubscribeAutoCloseOptions::idle_timeout` ([Yuki Kishimoto])
* sdk: automatically resend event after NIP-42 authentication ([Yuki Kishimoto])
* sdk: add `Connection::embedded_tor_with_path` ([Yuki Kishimoto])
* connect: add `NostrConnect::status` ([Yuki Kishimoto])
Expand Down
9 changes: 8 additions & 1 deletion bindings/nostr-sdk-ffi/src/relay/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,12 +212,19 @@ impl SubscribeAutoCloseOptions {
builder
}

/// Automatically close subscription after `Duration`
/// Automatically close subscription after duration.
pub fn timeout(&self, timeout: Option<Duration>) -> Self {
let mut builder = self.clone();
builder.inner = builder.inner.timeout(timeout);
builder
}

/// Automatically close subscription if no notifications/events are received within the duration.
pub fn idle_timeout(&self, timeout: Option<Duration>) -> Self {
let mut builder = self.clone();
builder.inner = builder.inner.idle_timeout(timeout);
builder
}
}

/// Subscribe options
Expand Down
6 changes: 6 additions & 0 deletions bindings/nostr-sdk-js/src/relay/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,12 @@ impl JsSubscribeAutoCloseOptions {
pub fn timeout(self, timeout: Option<JsDuration>) -> Self {
self.inner.timeout(timeout.map(|t| *t)).into()
}

/// Automatically close subscription if no notifications/events are received within the duration.
#[wasm_bindgen(js_name = idleTimeout)]
pub fn idle_timeout(self, timeout: Option<JsDuration>) -> Self {
self.inner.idle_timeout(timeout.map(|t| *t)).into()
}
}

/// Subscribe options
Expand Down
117 changes: 71 additions & 46 deletions crates/nostr-relay-pool/src/relay/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1361,40 +1361,7 @@ impl InnerRelay {

// Check if auto-close condition is set
match opts.auto_close {
Some(opts) => {
let relay = self.clone();
task::spawn(async move {
let res: Option<(bool, Option<SubscriptionAutoClosedReason>)> =
relay.handle_auto_closing(&id, filters, opts).await;

// Check if CLOSE needed
let to_close: bool = match res {
Some((to_close, reason)) => {
// Send subscription auto closed notification
if let Some(reason) = reason {
relay.send_notification(
RelayNotification::SubscriptionAutoClosed { reason },
false,
);
}

to_close
}
None => {
tracing::warn!(id = %id, "Timeout reached for subscription, auto-closing.");
true
}
};

// Close subscription
if to_close {
tracing::debug!(id = %id, "Auto-closing subscription.");
relay.send_msg(ClientMessage::close(id))?;
}

Ok::<(), Error>(())
});
}
Some(opts) => self.spawn_auto_closing_handler(id, filters, opts),
None => {
// No auto-close subscription: update subscription filters
self.update_subscription(id, filters, true).await;
Expand All @@ -1404,25 +1371,83 @@ impl InnerRelay {
Ok(())
}

fn spawn_auto_closing_handler(
&self,
id: SubscriptionId,
filters: Vec<Filter>,
opts: SubscribeAutoCloseOptions,
) {
let relay = self.clone();
task::spawn(async move {
// Check if CLOSE needed
let to_close: bool = match relay.handle_auto_closing(&id, filters, opts).await {
Some((to_close, reason)) => {
// Send subscription auto-closed notification
if let Some(reason) = reason {
relay.send_notification(
RelayNotification::SubscriptionAutoClosed { reason },
false,
);
}

to_close
}
// Timeout
None => {
tracing::warn!(id = %id, "Timeout reached for subscription, auto-closing.");
true
}
};

// Close subscription
if to_close {
tracing::debug!(id = %id, "Auto-closing subscription.");
relay.send_msg(ClientMessage::close(id))?;
}

Ok::<(), Error>(())
});
}

async fn handle_auto_closing(
&self,
id: &SubscriptionId,
filters: Vec<Filter>,
opts: SubscribeAutoCloseOptions,
) -> Option<(bool, Option<SubscriptionAutoClosedReason>)> {
time::timeout(opts.timeout, async move {
let mut counter = 0;
let mut counter: u16 = 0;
let mut received_eose: bool = false;
let mut require_resubscription: bool = false;
let mut last_event: Option<Instant> = None;

// Subscribe to notifications
let mut notifications = self.internal_notification_sender.subscribe();
while let Ok(notification) = notifications.recv().await {

// Listen to notifications with timeout
// If no notification is received within no-events timeout, `None` is returned.
while let Ok(notification) =
time::timeout(opts.idle_timeout, notifications.recv()).await?
{
// Check if no-events timeout is reached
if let (Some(idle_timeout), Some(last_event)) = (opts.idle_timeout, last_event) {
if last_event.elapsed() > idle_timeout {
// Close the subscription
return Some((true, None)); // TODO: use SubscriptionAutoClosedReason::Timeout?
}
}

match notification {
RelayNotification::Message { message, .. } => match message {
RelayMessage::Event {
subscription_id, ..
} => {
if &subscription_id == id {
// If no-events timeout is enabled, update instant of last event received
if opts.idle_timeout.is_some() {
last_event = Some(Instant::now());
}

if let ReqExitPolicy::WaitForEventsAfterEOSE(num) = opts.exit_policy
{
if received_eose {
Expand Down Expand Up @@ -1455,17 +1480,17 @@ impl InnerRelay {
if self.state.is_auto_authentication_enabled() {
require_resubscription = true;
} else {
return (
return Some((
false,
Some(SubscriptionAutoClosedReason::Closed(message)),
); // No need to send CLOSE msg
)); // No need to send CLOSE msg
}
}
_ => {
return (
return Some((
false,
Some(SubscriptionAutoClosedReason::Closed(message)),
); // No need to send CLOSE msg
)); // No need to send CLOSE msg
}
}
}
Expand All @@ -1482,18 +1507,18 @@ impl InnerRelay {
}
}
RelayNotification::AuthenticationFailed => {
return (
return Some((
false,
Some(SubscriptionAutoClosedReason::AuthenticationFailed),
); // No need to send CLOSE msg
)); // No need to send CLOSE msg
}
RelayNotification::RelayStatus { status } => {
if status.is_disconnected() {
return (false, None); // No need to send CLOSE msg
return Some((false, None)); // No need to send CLOSE msg
}
}
RelayNotification::Shutdown => {
return (false, None); // No need to send CLOSE msg
return Some((false, None)); // No need to send CLOSE msg
}
_ => (),
}
Expand All @@ -1520,9 +1545,9 @@ impl InnerRelay {
.await;
}

(true, Some(SubscriptionAutoClosedReason::Completed)) // Need to send CLOSE msg
Some((true, Some(SubscriptionAutoClosedReason::Completed))) // Need to send CLOSE msg
})
.await
.await?
}

pub async fn unsubscribe(&self, id: SubscriptionId) -> Result<(), Error> {
Expand Down
9 changes: 8 additions & 1 deletion crates/nostr-relay-pool/src/relay/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ impl RelayOptions {
pub struct SubscribeAutoCloseOptions {
pub(super) exit_policy: ReqExitPolicy,
pub(super) timeout: Option<Duration>,
pub(super) idle_timeout: Option<Duration>,
}

impl SubscribeAutoCloseOptions {
Expand All @@ -172,11 +173,17 @@ impl SubscribeAutoCloseOptions {
self
}

/// Automatically close subscription after [Duration]
/// Automatically close subscription after [`Duration`].
pub fn timeout(mut self, timeout: Option<Duration>) -> Self {
self.timeout = timeout;
self
}

/// Automatically close subscription if no notifications/events are received within the [`Duration`].
pub fn idle_timeout(mut self, timeout: Option<Duration>) -> Self {
self.idle_timeout = timeout;
self
}
}

/// Subscribe options
Expand Down

0 comments on commit 60a10b1

Please sign in to comment.