From e59e4b264df3743e86e47ef5225d561597a81e1b Mon Sep 17 00:00:00 2001
From: Daniel Cadenas
Date: Wed, 25 Sep 2024 11:44:44 -0300
Subject: [PATCH] Adaptable rate of messages

---
 config/settings.yml                         |   2 +-
 src/domain/follow_change.rs                 |   2 +-
 src/domain/followee_notification_factory.rs |  86 ++++++++--
 src/domain/notification_message.rs          |   2 +-
 src/lib.rs                                  |   1 +
 src/rate_limiter.rs                         | 178 ++++++++++++++++++++
 6 files changed, 251 insertions(+), 20 deletions(-)
 create mode 100644 src/rate_limiter.rs

diff --git a/config/settings.yml b/config/settings.yml
index 3f1dc08..eff53d4 100644
--- a/config/settings.yml
+++ b/config/settings.yml
@@ -13,6 +13,6 @@ followers:
   google_project_id: "pub-verse-app"
   google_topic: "follow-changes"
   flush_period_seconds: 60 # how often to flush the buffer to generate messages
-  min_seconds_between_messages: 43200 # Half a day, so 2 messages per day
+  min_seconds_between_messages: 900 # 15 minutes
   pagerank_cron_expression: "0 0 0 * * *" # Daily at midnight
   http_cache_seconds: 86400 # 24 hours
\ No newline at end of file
diff --git a/src/domain/follow_change.rs b/src/domain/follow_change.rs
index cac6416..76c0387 100644
--- a/src/domain/follow_change.rs
+++ b/src/domain/follow_change.rs
@@ -83,7 +83,7 @@ impl FollowChange {
         self.friendly_followee = name;
     }
 
-    pub fn is_notifiable(&self) -> bool {
+    pub fn is_follower(&self) -> bool {
         matches!(self.change_type, ChangeType::Followed)
     }
 
diff --git a/src/domain/followee_notification_factory.rs b/src/domain/followee_notification_factory.rs
index b7c4f26..7919dd0 100644
--- a/src/domain/followee_notification_factory.rs
+++ b/src/domain/followee_notification_factory.rs
@@ -1,4 +1,5 @@
 use super::{FollowChange, NotificationMessage, MAX_FOLLOWERS_PER_BATCH};
+use crate::rate_limiter::RateLimiter;
 use nostr_sdk::PublicKey;
 use ordermap::OrderMap;
 use std::fmt::Debug;
@@ -8,19 +9,26 @@ use tokio::time::Instant;
 type Follower = PublicKey;
 type Followee = PublicKey;
 
+/// Accumulates messages for a followee and flushes them in batches
 pub struct FolloweeNotificationFactory {
     pub follow_changes: OrderMap<Follower, Box<FollowChange>>,
     pub followee: Option<Followee>,
     min_time_between_messages: Duration,
+    rate_limiter: RateLimiter,
     emptied_at: Option<Instant>,
 }
 
 impl FolloweeNotificationFactory {
     pub fn new(min_time_between_messages: Duration) -> Self {
+        // Rate limiter refilling 10 tokens per 12 hours: bursts of up to 10
+        // messages, then roughly one message every 72 minutes.
+        let capacity = 10.0;
+        let rate_limiter = RateLimiter::new(capacity, Duration::from_secs(12 * 60 * 60));
+
         Self {
             follow_changes: OrderMap::with_capacity(100),
             followee: None,
             min_time_between_messages,
+            rate_limiter,
             emptied_at: None,
         }
     }
@@ -55,46 +63,90 @@ impl FolloweeNotificationFactory {
         self.follow_changes.insert(*follower, follow_change);
     }
 
-    // This is basically a sliding window log rate limiter
-    // No flushes if the time since the last flush is less than min_time_between_messages
-    pub fn should_flush(&self) -> bool {
-        match self.emptied_at {
-            Some(emptied_at) => {
-                let now = Instant::now();
-                assert!(emptied_at <= now);
-                now.duration_since(emptied_at) > self.min_time_between_messages
-            }
+    // Flushes if the minimum time between messages has elapsed and the rate limit is not exceeded.
+    // If a day has elapsed since the last flush, it will flush regardless of the rate limit.
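+    //
+    // The checks run in order: the min-time gate first, then the 24-hour
+    // override, and finally a non-consuming token check; tokens are only
+    // actually spent later, in flush().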
+    pub fn should_flush(&mut self) -> bool {
+        let now = Instant::now();
+
+        let min_time_elapsed = match self.emptied_at {
+            Some(emptied_at) => now.duration_since(emptied_at) >= self.min_time_between_messages,
             None => true,
+        };
+
+        if !min_time_elapsed {
+            return false;
         }
+
+        let one_day_elapsed = match self.emptied_at {
+            Some(emptied_at) => now.duration_since(emptied_at) >= Duration::from_secs(24 * 60 * 60),
+            None => true,
+        };
+
+        if one_day_elapsed {
+            return true;
+        }
+
+        // Check if tokens are available without consuming them
+        self.rate_limiter.can_consume(1.0)
     }
 
-    pub fn should_delete(&self) -> bool {
+    pub fn followers_len(&self) -> usize {
+        self.follow_changes
+            .iter()
+            .filter(|(_, v)| v.is_follower())
+            .count()
+    }
+
+    pub fn should_delete(&mut self) -> bool {
         self.follow_changes.is_empty() && self.should_flush()
     }
 
-    pub fn no_notifiables(&self) -> bool {
-        !self.follow_changes.iter().any(|(_, v)| v.is_notifiable())
+    pub fn no_followers(&self) -> bool {
+        !self.follow_changes.iter().any(|(_, v)| v.is_follower())
     }
 
     // Only followers are accumulated into messages, unfollowers are not, but
     // all of them are drained
     pub fn flush(&mut self) -> Vec<NotificationMessage> {
-        if self.no_notifiables() {
+        if self.no_followers() {
             return vec![];
         }
 
         if self.should_flush() {
-            self.emptied_at = Some(Instant::now());
+            let now = Instant::now();
+            let one_day_elapsed = match self.emptied_at {
+                Some(emptied_at) => {
+                    now.duration_since(emptied_at) >= Duration::from_secs(24 * 60 * 60)
+                }
+                None => true,
+            };
 
-            return self
+            self.emptied_at = Some(now);
+
+            let followers = self
                 .follow_changes
                 .drain(..)
                 .map(|(_, v)| v)
-                .filter(|v| v.is_notifiable())
-                .collect::<Vec<Box<FollowChange>>>()
+                .filter(|v| v.is_follower())
+                .collect::<Vec<Box<FollowChange>>>();
+
+            let messages: Vec<NotificationMessage> = followers
                 .chunks(MAX_FOLLOWERS_PER_BATCH)
                 .map(|batch| batch.to_vec().into())
                 .collect();
+
+            let tokens_needed = messages.len() as f64;
+
+            if one_day_elapsed {
+                // Overcharge the rate limiter to consume tokens regardless of availability
+                self.rate_limiter.overcharge(tokens_needed);
+            } else if !self.rate_limiter.consume(tokens_needed) {
+                // Not enough tokens: the changes were already drained, so this
+                // batch is dropped rather than re-queued.
+                return vec![];
+            }
+
+            return messages;
         }
 
         vec![]
     }
diff --git a/src/domain/notification_message.rs b/src/domain/notification_message.rs
index 21ab458..c455a85 100644
--- a/src/domain/notification_message.rs
+++ b/src/domain/notification_message.rs
@@ -52,7 +52,7 @@ impl NotificationMessage {
         );
 
         assert!(
-            follow_change.is_notifiable(),
+            follow_change.is_follower(),
             "Only followed changes can be messaged"
         );
 
diff --git a/src/lib.rs b/src/lib.rs
index 6378e53..496011b 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -6,6 +6,7 @@ pub mod http_server;
 pub mod metrics;
 pub mod migrations;
 pub mod publisher;
+pub mod rate_limiter;
 pub mod relay_subscriber;
 pub mod repo;
 pub mod scheduler;
diff --git a/src/rate_limiter.rs b/src/rate_limiter.rs
new file mode 100644
index 0000000..8cf35dd
--- /dev/null
+++ b/src/rate_limiter.rs
@@ -0,0 +1,178 @@
+use tokio::time::{Duration, Instant};
+
+/// Token bucket rate limiter.
+pub struct RateLimiter {
+    burst_capacity: f64,      // Maximum number of tokens
+    tokens: f64,              // Current number of tokens (can be negative)
+    refill_rate_per_sec: f64, // Tokens added per second
+    last_refill: Instant,     // Last time tokens were refilled
+    max_negative_tokens: f64, // Maximum allowed negative tokens (deficit)
+}
+
+impl RateLimiter {
+    /// Creates a new RateLimiter.
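+    ///
+    /// `burst_capacity` is both the bucket size and the number of tokens
+    /// refilled over `refill_duration`, so the steady-state rate is
+    /// `burst_capacity / refill_duration` tokens per second.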
+    pub fn new(burst_capacity: f64, refill_duration: Duration) -> Self {
+        let refill_rate_per_sec = burst_capacity / refill_duration.as_secs_f64();
+        let tokens = burst_capacity;
+        let max_negative_tokens = burst_capacity * 1000.0;
+
+        Self {
+            burst_capacity,
+            tokens,
+            refill_rate_per_sec,
+            last_refill: Instant::now(),
+            max_negative_tokens,
+        }
+    }
+
+    /// Refills tokens based on the elapsed time since the last refill.
+    fn refill_tokens(&mut self) {
+        let now = Instant::now();
+        let elapsed = now.duration_since(self.last_refill).as_secs_f64();
+        self.last_refill = now;
+
+        let tokens_to_add = elapsed * self.refill_rate_per_sec;
+        self.tokens = (self.tokens + tokens_to_add).min(self.burst_capacity);
+    }
+
+    /// Checks if the specified number of tokens are available without consuming them.
+    pub fn can_consume(&mut self, tokens_needed: f64) -> bool {
+        self.refill_tokens();
+        self.tokens >= tokens_needed
+    }
+
+    /// Attempts to consume the specified number of tokens.
+    pub fn consume(&mut self, tokens_needed: f64) -> bool {
+        self.refill_tokens();
+        if self.tokens >= tokens_needed {
+            self.tokens -= tokens_needed;
+            true
+        } else {
+            false
+        }
+    }
+
+    /// Consumes tokens regardless of availability (can result in a negative token count).
+    pub fn overcharge(&mut self, tokens_needed: f64) {
+        self.refill_tokens();
+        self.tokens -= tokens_needed;
+
+        if self.tokens < -self.max_negative_tokens {
+            self.tokens = -self.max_negative_tokens;
+        }
+    }
+
+    pub fn get_available_tokens(&mut self) -> f64 {
+        self.refill_tokens();
+        self.tokens
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use tokio::time::{self, Duration};
+
+    #[tokio::test]
+    async fn test_initial_tokens() {
+        let capacity = 10.0;
+        let refill_duration = Duration::from_secs(86400); // 1 day
+        let rate_limiter = RateLimiter::new(capacity, refill_duration);
+
+        assert_eq!(rate_limiter.tokens, capacity);
+    }
+
+    #[tokio::test]
+    async fn test_consume_tokens_success() {
+        let capacity = 10.0;
+        let refill_duration = Duration::from_secs(86400); // 1 day
+        let mut rate_limiter = RateLimiter::new(capacity, refill_duration);
+
+        // Consume 5 tokens
+        let result = rate_limiter.consume(5.0);
+        assert!(result);
+        assert_eq!(rate_limiter.tokens, 5.0);
+    }
+
+    #[tokio::test]
+    async fn test_consume_tokens_failure() {
+        let capacity = 10.0;
+        let refill_duration = Duration::from_secs(86400); // 1 day
+        let mut rate_limiter = RateLimiter::new(capacity, refill_duration);
+
+        // Attempt to consume more tokens than available
+        let result = rate_limiter.consume(15.0);
+        assert!(!result);
+        // Tokens should remain unchanged since consume failed
+        assert_eq!(rate_limiter.tokens, capacity);
+    }
+
+    #[tokio::test]
+    async fn test_overcharge() {
+        let capacity = 10.0;
+        let refill_duration = Duration::from_secs(86400); // 1 day
+        let max_negative_tokens = capacity * 1000.0;
+        let mut rate_limiter = RateLimiter::new(capacity, refill_duration);
+
+        // Overcharge by 15 tokens
+        rate_limiter.overcharge(15.0);
+        assert_eq!(rate_limiter.tokens, -5.0);
+
+        // Overcharge beyond max_negative_tokens
+        rate_limiter.overcharge(max_negative_tokens * 2.0);
+        assert_eq!(rate_limiter.tokens, -max_negative_tokens);
+    }
+
+    #[tokio::test(start_paused = true)]
+    async fn test_refill_tokens() {
+        let capacity = 10.0;
+        let refill_duration = Duration::from_secs(10); // Short duration for testing
+        let mut rate_limiter = RateLimiter::new(capacity, refill_duration);
+
+        // Consume all tokens
+        let result = rate_limiter.consume(capacity);
+        assert!(result);
+        assert_eq!(rate_limiter.tokens, 0.0);
+
+        // Advance time by half of the refill duration
+        time::advance(Duration::from_secs(5)).await;
+        rate_limiter.refill_tokens();
+        // Should have refilled half the tokens
+        assert_eq!(rate_limiter.tokens, 5.0);
+
+        // Advance time to complete the refill duration
+        time::advance(Duration::from_secs(5)).await;
+        rate_limiter.refill_tokens();
+        assert_eq!(rate_limiter.tokens, capacity);
+    }
+
+    #[tokio::test(start_paused = true)]
+    async fn test_overcharge_and_refill() {
+        let capacity = 10.0;
+        let refill_duration = Duration::from_secs(10); // Short duration for testing
+        let mut rate_limiter = RateLimiter::new(capacity, refill_duration);
+
+        // Overcharge by 15 tokens
+        rate_limiter.overcharge(15.0);
+        assert_eq!(rate_limiter.tokens, -5.0);
+
+        // Advance time long enough to repay the deficit and refill to capacity
+        time::advance(Duration::from_secs(20)).await;
+        rate_limiter.refill_tokens();
+        // The deficit has been repaid and the balance is capped at capacity
+        assert_eq!(rate_limiter.tokens, capacity);
+    }
+
+    #[tokio::test]
+    async fn test_max_negative_tokens() {
+        let capacity = 10.0;
+        let refill_duration = Duration::from_secs(86400); // 1 day
+        let max_negative_tokens = capacity * 1000.0;
+        let mut rate_limiter = RateLimiter::new(capacity, refill_duration);
+
+        // Overcharge far beyond max_negative_tokens; the deficit is clamped
+        rate_limiter.overcharge(max_negative_tokens + 50.0);
+        assert_eq!(rate_limiter.tokens, -max_negative_tokens);
+    }
+}
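
Reviewer note, not part of the patch: below is a minimal sketch of how the
bucket paces notifications with the exact parameters that
FolloweeNotificationFactory::new passes in (a burst capacity of 10.0 refilled
over 12 hours, i.e. roughly one token every 72 minutes). It is written in the
style of the tests above and would slot into the same `mod tests`; the test
name and the 75-minute advance are illustrative choices, not values taken from
the patch.

    #[tokio::test(start_paused = true)]
    async fn example_message_pacing() {
        // Same parameters as FolloweeNotificationFactory::new
        let mut rate_limiter = RateLimiter::new(10.0, Duration::from_secs(12 * 60 * 60));

        // The full burst is available up front...
        assert!(rate_limiter.consume(10.0));
        // ...after which the bucket is empty.
        assert!(!rate_limiter.can_consume(1.0));

        // Tokens drip back at burst_capacity / refill_duration, about one
        // every 72 minutes; advance slightly past that to stay clear of
        // float rounding.
        time::advance(Duration::from_secs(75 * 60)).await;
        assert!(rate_limiter.consume(1.0));

        // A forced daily flush can overcharge the bucket into deficit, which
        // the same refill must repay before consume() succeeds again.
        rate_limiter.overcharge(5.0);
        assert!(rate_limiter.get_available_tokens() < 0.0);
    }

The overcharge path is what lets the daily flush in
FolloweeNotificationFactory::flush bypass the limiter without losing
accounting: tokens spent on a forced flush still count against future quota,
because consume() keeps failing until the refill brings the balance back above
the requested amount.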