From 3bd0880c6d3352a457dac6723016f911201746b7 Mon Sep 17 00:00:00 2001 From: Daniel Cadenas Date: Fri, 6 Sep 2024 11:40:06 -0300 Subject: [PATCH] Ensure retention is smaller than rate limit --- config/settings.yml | 6 +- src/config.rs | 5 +- src/domain/followee_notification_factory.rs | 31 ++-- src/domain/notification_factory.rs | 151 ++++++++++++-------- src/follow_change_handler.rs | 3 +- src/publisher.rs | 29 ++-- 6 files changed, 138 insertions(+), 87 deletions(-) diff --git a/config/settings.yml b/config/settings.yml index b2b0425..35fb0b4 100644 --- a/config/settings.yml +++ b/config/settings.yml @@ -11,5 +11,7 @@ followers: google_project_id: "pub-verse-app" google_topic: "follow-changes" seconds_threshold: 60 - max_messages_per_hour: 2 - max_retention_minutes: 60 \ No newline at end of file + max_retention_minutes: 60 + max_messages_per_rate_period: 2 + # Must be bigger than max_retention_minutes + rate_period_minutes: 61 \ No newline at end of file diff --git a/src/config.rs b/src/config.rs index 5fb7316..4b806cb 100644 --- a/src/config.rs +++ b/src/config.rs @@ -18,8 +18,9 @@ pub struct Settings { pub google_project_id: String, pub google_topic: String, pub seconds_threshold: u64, - pub max_messages_per_hour: usize, - pub max_retention_minutes: i64, + pub max_retention_minutes: usize, + pub max_messages_per_rate_period: usize, + pub rate_period_minutes: usize, } impl Configurable for Settings { diff --git a/src/domain/followee_notification_factory.rs b/src/domain/followee_notification_factory.rs index 5b6d1c7..de8382b 100644 --- a/src/domain/followee_notification_factory.rs +++ b/src/domain/followee_notification_factory.rs @@ -9,8 +9,6 @@ use std::time::Duration; type Follower = PublicKey; type Followee = PublicKey; -pub const ONE_HOUR: Duration = Duration::from_secs(60 * 60); - pub enum SendableFollowChange { Single(RetainedFollowChange), Batchable(RetainedFollowChange), @@ -65,8 +63,17 @@ pub struct FolloweeNotificationFactory { } impl FolloweeNotificationFactory { - pub fn new(max_messages_per_hour: usize, max_retention: Duration, clock: T) -> Self { - let rate_counter = RateCounter::new(max_messages_per_hour, ONE_HOUR, clock.clone()); + pub fn new( + max_messages_per_rate_period: usize, + rate_period_minutes: Duration, + max_retention: Duration, + clock: T, + ) -> Self { + let rate_counter = RateCounter::new( + max_messages_per_rate_period, + rate_period_minutes, + clock.clone(), + ); Self { rate_counter, @@ -167,14 +174,6 @@ impl FolloweeNotificationFactory { } } -impl Debug for FolloweeNotificationFactory { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("FolloweeNotificationFactory") - .field("follow_changes", &self.follow_changes) - .finish() - } -} - fn collect_sendables( max_retention: &Duration, follow_changes_to_publish: &mut Vec>, @@ -220,3 +219,11 @@ fn send( follow_changes_to_publish.push(collected_follow_change); rate_counter.bump(); } + +impl Debug for FolloweeNotificationFactory { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("FolloweeNotificationFactory") + .field("follow_changes", &self.follow_changes) + .finish() + } +} diff --git a/src/domain/notification_factory.rs b/src/domain/notification_factory.rs index 37056b1..cc29ffb 100644 --- a/src/domain/notification_factory.rs +++ b/src/domain/notification_factory.rs @@ -18,23 +18,40 @@ type Followee = PublicKey; /// the results into `NotificationMessage` instances per followee. pub struct NotificationFactory { followee_maps: OrderMap>, + max_messages_per_rate_period: usize, + rate_period: Duration, max_retention: Duration, - max_messages_per_hour: usize, clock: T, } impl NotificationFactory { - pub fn new(max_messages_per_hour: usize, max_retention_minutes: i64, clock: T) -> Result { - assert!(max_messages_per_hour > 0); + pub fn new( + max_messages_per_rate_period: usize, + rate_period_minutes: usize, + max_retention_minutes: usize, + clock: T, + ) -> Result { + if max_messages_per_rate_period == 0 + || rate_period_minutes == 0 + || max_retention_minutes == 0 + { + anyhow::bail!("Rate limit, rate period, and max retention must be greater than 0"); + } + + if rate_period_minutes <= max_retention_minutes { + anyhow::bail!("Rate period must be greater than max retention"); + } + info!( - "After {} messages per hour, additional messages will be retained for {} minutes.", - max_messages_per_hour, max_retention_minutes + "After {} messages in {} minutes, additional messages will be retained for {} minutes.", + max_messages_per_rate_period, rate_period_minutes, max_retention_minutes ); Ok(Self { followee_maps: OrderMap::with_capacity(1_000), max_retention: Duration::from_secs(max_retention_minutes as u64 * 60), - max_messages_per_hour, + max_messages_per_rate_period, + rate_period: Duration::from_secs(rate_period_minutes as u64 * 60), clock, }) } @@ -45,7 +62,8 @@ impl NotificationFactory { .entry(follow_change.followee) .or_insert_with_key(|_| { FolloweeNotificationFactory::new( - self.max_messages_per_hour, + self.max_messages_per_rate_period, + self.rate_period, self.max_retention, self.clock.clone(), ) @@ -266,7 +284,6 @@ fn record_metrics(messages: &[NotificationMessage], retained_follow_changes: usi #[cfg(test)] mod tests { use super::*; - use crate::domain::followee_notification_factory::ONE_HOUR; use assertables::*; use chrono::{DateTime, Utc}; use governor::clock::FakeRelativeClock; @@ -290,9 +307,9 @@ mod tests { } #[test] - fn test_insert_unique_follow_change() { + fn test_insert_follow_change() { let mut notification_factory = - NotificationFactory::new(10, 10, FakeRelativeClock::default()).unwrap(); + NotificationFactory::new(10, 10, 9, FakeRelativeClock::default()).unwrap(); let follower = Keys::generate().public_key(); let followee = Keys::generate().public_key(); @@ -312,27 +329,27 @@ mod tests { #[test] fn test_does_not_replace_with_older_change() { - let mut unique_changes = - NotificationFactory::new(10, 10, FakeRelativeClock::default()).unwrap(); + let mut notification_factory = + NotificationFactory::new(10, 10, 9, FakeRelativeClock::default()).unwrap(); let follower = Keys::generate().public_key(); let followee = Keys::generate().public_key(); let newer_change = create_follow_change(follower, followee, seconds_to_datetime(2)); - unique_changes.insert(newer_change.clone()); + notification_factory.insert(newer_change.clone()); let older_change = create_unfollow_change(follower, followee, seconds_to_datetime(1)); - unique_changes.insert(older_change); + notification_factory.insert(older_change); - let messages = unique_changes.drain_into_messages(); + let messages = notification_factory.drain_into_messages(); assert_eq!(messages.len(), 1); assert_message_eq(&messages[0], &followee, [follower], &[]); } #[test] fn test_insert_same_follower_different_followee() { - let mut unique_changes = - NotificationFactory::new(10, 10, FakeRelativeClock::default()).unwrap(); + let mut notification_factory = + NotificationFactory::new(10, 10, 9, FakeRelativeClock::default()).unwrap(); let follower = Keys::generate().public_key(); let followee1 = Keys::generate().public_key(); @@ -341,10 +358,10 @@ mod tests { let change1 = create_follow_change(follower, followee1, seconds_to_datetime(2)); let change2 = create_follow_change(follower, followee2, seconds_to_datetime(1)); - unique_changes.insert(change1.clone()); - unique_changes.insert(change2.clone()); + notification_factory.insert(change1.clone()); + notification_factory.insert(change2.clone()); - let mut messages = unique_changes.drain_into_messages(); + let mut messages = notification_factory.drain_into_messages(); // Both changes should be kept since they have different followees assert_eq!( messages.sort(), @@ -358,8 +375,8 @@ mod tests { #[test] fn test_an_unfollow_cancels_a_follow() { - let mut unique_changes = - NotificationFactory::new(10, 10, FakeRelativeClock::default()).unwrap(); + let mut notification_factory = + NotificationFactory::new(10, 10, 9, FakeRelativeClock::default()).unwrap(); let follower = Keys::generate().public_key(); let followee = Keys::generate().public_key(); @@ -367,17 +384,17 @@ mod tests { let follow_change = create_follow_change(follower, followee, seconds_to_datetime(1)); let unfollow_change = create_unfollow_change(follower, followee, seconds_to_datetime(2)); - unique_changes.insert(follow_change.clone()); - unique_changes.insert(unfollow_change.clone()); + notification_factory.insert(follow_change.clone()); + notification_factory.insert(unfollow_change.clone()); // The unfollow should cancel the follow - assert_eq!(unique_changes.drain_into_messages(), []); + assert_eq!(notification_factory.drain_into_messages(), []); } #[test] fn test_a_follow_cancels_an_unfollow() { - let mut unique_changes = - NotificationFactory::new(10, 10, FakeRelativeClock::default()).unwrap(); + let mut notification_factory = + NotificationFactory::new(10, 10, 9, FakeRelativeClock::default()).unwrap(); let follower = Keys::generate().public_key(); let followee = Keys::generate().public_key(); @@ -385,20 +402,22 @@ mod tests { let unfollow_change = create_unfollow_change(follower, followee, seconds_to_datetime(1)); let follow_change = create_follow_change(follower, followee, seconds_to_datetime(2)); - unique_changes.insert(unfollow_change.clone()); - unique_changes.insert(follow_change.clone()); + notification_factory.insert(unfollow_change.clone()); + notification_factory.insert(follow_change.clone()); // The follow should cancel the unfollow - assert_eq!(unique_changes.drain_into_messages(), []); + assert_eq!(notification_factory.drain_into_messages(), []); } #[test] fn test_single_item_batch_before_rate_limit_is_hit() { - let max_follows_per_hour = 2; - let max_retention_minutes = 10; + let max_messages_per_rate_period = 2; + let max_retention_minutes = 9; + let rate_period_minutes = 10; let mut notification_factory = NotificationFactory::new( - max_follows_per_hour, + max_messages_per_rate_period, + rate_period_minutes, max_retention_minutes, FakeRelativeClock::default(), ) @@ -431,13 +450,18 @@ mod tests { #[test] fn test_no_message_after_rate_limit_is_hit_but_retention_not_elapsed() { // After one single follow change the rate limit will be hit - let max_messages_per_hour = 1; - let max_retention_minutes = 10; + let max_messages_per_rate_period = 1; + let max_retention_minutes = 9; + let rate_period_minutes = 10; let clock = FakeRelativeClock::default(); - let mut notification_factory = - NotificationFactory::new(max_messages_per_hour, max_retention_minutes, clock.clone()) - .unwrap(); + let mut notification_factory = NotificationFactory::new( + max_messages_per_rate_period, + rate_period_minutes, + max_retention_minutes, + clock.clone(), + ) + .unwrap(); let follower1 = Keys::generate().public_key(); let follower2 = Keys::generate().public_key(); @@ -448,7 +472,7 @@ mod tests { notification_factory.insert(change1.clone()); // We hit the rate limit, but the retention time hasn't elapsed yet. - // The rate is one follow per hour, so we only get one message, the + // The rate is one follow per 10 minutes, so we only get one message, the // other one is retained. let messages = notification_factory.drain_into_messages(); assert_batches_eq(&messages, &[(followee, &[change1])]); @@ -470,21 +494,26 @@ mod tests { assert_batches_eq(&messages, &[]); // We clear the rate limit - clock.advance(Duration::from_secs(50 * 60)); + clock.advance(Duration::from_secs(rate_period_minutes as u64 * 60)); let messages = notification_factory.drain_into_messages(); assert_batches_eq(&messages, &[(followee, &[change2, change3])]); } #[test] fn test_batch_sizes_after_rate_limit_and_retention_period() { - let max_messages_per_hour = 1; // After one single follow change, the rate limit will be hit + let max_messages_per_rate_period = 1; // After one single follow change, the rate limit will be hit let max_retention_minutes = 10; + let rate_period_minutes = 20; const MAX_FOLLOWERS_TRIPLED: usize = 3 * MAX_FOLLOWERS_PER_BATCH as usize; // The number of messages we will send for testing let clock = FakeRelativeClock::default(); - let mut notification_factory = - NotificationFactory::new(max_messages_per_hour, max_retention_minutes, clock.clone()) - .unwrap(); + let mut notification_factory = NotificationFactory::new( + max_messages_per_rate_period, + rate_period_minutes, + max_retention_minutes, + clock.clone(), + ) + .unwrap(); let followee = Keys::generate().public_key(); @@ -586,7 +615,7 @@ mod tests { assert_eq!(notification_factory.followees_len(), 2); assert_eq!(notification_factory.follow_changes_len(), 0); - clock.advance(ONE_HOUR + Duration::from_secs(1)); + clock.advance(Duration::from_secs(rate_period_minutes as u64 * 60 + 1)); let messages = notification_factory.drain_into_messages(); // Now all is cleared @@ -597,46 +626,46 @@ mod tests { #[test] fn test_is_empty_and_len() { - let mut unique_changes = - NotificationFactory::new(10, 10, FakeRelativeClock::default()).unwrap(); + let mut notification_factory = + NotificationFactory::new(10, 10, 9, FakeRelativeClock::default()).unwrap(); let follower1 = Keys::generate().public_key(); let follower2 = Keys::generate().public_key(); let followee1 = Keys::generate().public_key(); let followee2 = Keys::generate().public_key(); - assert!(unique_changes.is_empty()); - assert_eq!(unique_changes.follow_changes_len(), 0); - assert_eq!(unique_changes.followees_len(), 0); + assert!(notification_factory.is_empty()); + assert_eq!(notification_factory.follow_changes_len(), 0); + assert_eq!(notification_factory.followees_len(), 0); let change1 = create_follow_change(follower1, followee1, seconds_to_datetime(1)); let change2 = create_follow_change(follower1, followee2, seconds_to_datetime(1)); let change3 = create_follow_change(follower2, followee2, seconds_to_datetime(1)); - unique_changes.insert(change1); - unique_changes.insert(change2); - unique_changes.insert(change3); + notification_factory.insert(change1); + notification_factory.insert(change2); + notification_factory.insert(change3); - assert!(!unique_changes.is_empty()); - assert_eq!(unique_changes.follow_changes_len(), 3); - assert_eq!(unique_changes.followees_len(), 2); + assert!(!notification_factory.is_empty()); + assert_eq!(notification_factory.follow_changes_len(), 3); + assert_eq!(notification_factory.followees_len(), 2); } #[test] fn test_drain_clears_map() { - let mut unique_changes = - NotificationFactory::new(10, 10, FakeRelativeClock::default()).unwrap(); + let mut notification_factory = + NotificationFactory::new(10, 10, 9, FakeRelativeClock::default()).unwrap(); let follower = Keys::generate().public_key(); let followee = Keys::generate().public_key(); let change1 = create_follow_change(follower, followee, seconds_to_datetime(2)); - unique_changes.insert(change1); + notification_factory.insert(change1); let change2 = create_follow_change(follower, followee, seconds_to_datetime(1)); - unique_changes.insert(change2); + notification_factory.insert(change2); - let changes = unique_changes.drain_into_messages(); + let changes = notification_factory.drain_into_messages(); assert_eq!(changes.len(), 1); assert_message_eq(&changes[0], &followee, [follower], &[]) } diff --git a/src/follow_change_handler.rs b/src/follow_change_handler.rs index 414a6ec..55d7e38 100644 --- a/src/follow_change_handler.rs +++ b/src/follow_change_handler.rs @@ -38,7 +38,8 @@ where cancellation_token.clone(), google_publisher_client, settings.seconds_threshold, - settings.max_messages_per_hour, + settings.max_messages_per_rate_period, + settings.rate_period_minutes, settings.max_retention_minutes, DefaultClock::default(), ) diff --git a/src/publisher.rs b/src/publisher.rs index bfc2777..2560a4e 100644 --- a/src/publisher.rs +++ b/src/publisher.rs @@ -20,15 +20,20 @@ impl Publisher { cancellation_token: CancellationToken, mut client: impl PublishEvents + Send + Sync + 'static, seconds_threshold: u64, - max_messages_per_hour: usize, - max_retention_minutes: i64, + max_messages_per_rate_period: usize, + rate_period_minutes: usize, + max_retention_minutes: usize, clock: T, ) -> Result { let (publication_sender, mut publication_receiver) = mpsc::channel::(1); - let mut buffer = - NotificationFactory::new(max_messages_per_hour, max_retention_minutes, clock) - .map_err(|_| PublisherError::BufferInit)?; + let mut buffer = NotificationFactory::new( + max_messages_per_rate_period, + rate_period_minutes, + max_retention_minutes, + clock, + ) + .map_err(|_| PublisherError::BufferInit)?; tokio::spawn(async move { info!("Publishing messages every {} seconds", seconds_threshold); @@ -147,6 +152,7 @@ mod tests { let cancellation_token = CancellationToken::new(); let seconds_threshold = 1; let max_follows_per_hour = 10; + let rate_period_minutes = 10; let max_retention_minutes = 1; let publisher = Publisher::create( @@ -154,6 +160,7 @@ mod tests { mock_client, seconds_threshold, max_follows_per_hour, + rate_period_minutes, max_retention_minutes, FakeRelativeClock::default(), ) @@ -232,14 +239,16 @@ mod tests { let cancellation_token = CancellationToken::new(); let seconds_threshold = 1; - let max_follows_per_hour = 10; + let max_messages_per_rate_period = 10; + let rate_period_minutes = 10; let max_retention_minutes = 1; let publisher = Publisher::create( cancellation_token.clone(), mock_client, seconds_threshold, - max_follows_per_hour, + max_messages_per_rate_period, + rate_period_minutes, max_retention_minutes, FakeRelativeClock::default(), ) @@ -298,14 +307,16 @@ mod tests { let cancellation_token = CancellationToken::new(); let seconds_threshold = 1; - let max_follows_per_hour = 10; + let max_messages_per_rate_period = 10; + let rate_period_minutes = 10; let max_retention_minutes = 1; let publisher = Publisher::create( cancellation_token.clone(), mock_client, seconds_threshold, - max_follows_per_hour, + max_messages_per_rate_period, + rate_period_minutes, max_retention_minutes, FakeRelativeClock::default(), )