diff --git a/src/domain/follow_change.rs b/src/domain/follow_change.rs index c3fd07e..1b78cfe 100644 --- a/src/domain/follow_change.rs +++ b/src/domain/follow_change.rs @@ -58,6 +58,10 @@ impl FollowChange { } } + pub fn is_notifiable(&self) -> bool { + matches!(self.change_type, ChangeType::Followed) + } + #[cfg(test)] pub fn with_friendly_follower(mut self, name: FriendlyId) -> Self { self.friendly_follower = name; diff --git a/src/domain/followee_notification_factory.rs b/src/domain/followee_notification_factory.rs index 70a74a5..5b1f0de 100644 --- a/src/domain/followee_notification_factory.rs +++ b/src/domain/followee_notification_factory.rs @@ -1,4 +1,4 @@ -use super::{ChangeType, FollowChange, NotificationMessage, MAX_FOLLOWERS_PER_BATCH}; +use super::{FollowChange, NotificationMessage, MAX_FOLLOWERS_PER_BATCH}; use nostr_sdk::PublicKey; use ordermap::OrderMap; use std::fmt::Debug; @@ -9,7 +9,7 @@ type Follower = PublicKey; type Followee = PublicKey; pub struct FolloweeNotificationFactory { - pub follow_changes: OrderMap, + pub follow_changes: OrderMap>, pub followee: Option, min_time_between_messages: Duration, emptied_at: Option, @@ -25,7 +25,7 @@ impl FolloweeNotificationFactory { } } - pub fn insert(&mut self, follow_change: FollowChange) { + pub fn insert(&mut self, follow_change: Box) { match &self.followee { Some(followee) => { assert_eq!( @@ -71,17 +71,14 @@ impl FolloweeNotificationFactory { self.follow_changes.is_empty() && self.should_flush() } - pub fn no_followers(&self) -> bool { - !self - .follow_changes - .iter() - .any(|(_, v)| matches!(v.change_type, ChangeType::Followed)) + pub fn no_notifiables(&self) -> bool { + !self.follow_changes.iter().any(|(_, v)| v.is_notifiable()) } // Only followers are accumulated into messages, unfollowers are not, but // all of them are drained pub fn flush(&mut self) -> Vec { - if self.no_followers() { + if self.no_notifiables() { return vec![]; } @@ -92,8 +89,8 @@ impl FolloweeNotificationFactory { .follow_changes .drain(..) .map(|(_, v)| v) - .filter(|v| matches!(v.change_type, ChangeType::Followed)) - .collect::>() + .filter(|v| v.is_notifiable()) + .collect::>>() .chunks(MAX_FOLLOWERS_PER_BATCH) .map(|batch| batch.to_vec().into()) .collect(); diff --git a/src/domain/follows_differ.rs b/src/domain/follows_differ.rs index eb1e67c..0bdd662 100644 --- a/src/domain/follows_differ.rs +++ b/src/domain/follows_differ.rs @@ -26,7 +26,7 @@ where { repo: Arc, nostr_client: Arc, - follow_change_sender: Sender, + follow_change_sender: Sender>, } #[async_trait] @@ -115,7 +115,7 @@ where pub fn new( repo: Arc, nostr_client: Arc, - follow_change_sender: Sender, + follow_change_sender: Sender>, ) -> Self { Self { repo, @@ -225,7 +225,7 @@ where } fn send_follow_change(&self, follow_change: FollowChange) -> Result<()> { - self.follow_change_sender.send(follow_change)?; + self.follow_change_sender.send(Box::new(follow_change))?; Ok(()) } } @@ -395,6 +395,7 @@ mod tests { use super::*; use crate::domain::contact_list_follow::ContactListFollow; use crate::repo::RepoError; + use assertables::*; use chrono::{Duration, Utc}; use nostr_sdk::PublicKey; use std::borrow::Cow; @@ -955,13 +956,15 @@ mod tests { assert!(!has_nos_agent(&event_with_no_tag)); } - async fn assert_follow_changes(contact_events: Vec, mut expected: Vec) { + async fn assert_follow_changes(contact_events: Vec, expected: Vec) { let follow_changes = get_follow_changes_from_contact_events(contact_events) .await - .unwrap(); + .unwrap() + .into_iter() + .map(|fc| *fc) + .collect::>(); - expected.sort(); // Sort the expected follow changes - assert_eq!(follow_changes, expected); + assert_bag_eq!(follow_changes, expected); } fn create_contact_event( @@ -1000,7 +1003,7 @@ mod tests { async fn get_follow_changes_from_contact_events( contact_events: Vec, - ) -> Result> { + ) -> Result>> { let (follow_change_sender, _) = channel(100); let repo = Arc::new(MockRepo::default()); let follows_differ = FollowsDiffer::new( @@ -1010,7 +1013,7 @@ mod tests { ); let mut follow_change_receiver = follow_change_sender.subscribe(); - let follow_changes: Arc>> = Arc::new(Mutex::new(Vec::new())); + let follow_changes: Arc>>> = Arc::new(Mutex::new(Vec::new())); let shared_follow_changes = follow_changes.clone(); let follow_change_task = tokio::spawn(async move { loop { diff --git a/src/domain/notification_factory.rs b/src/domain/notification_factory.rs index 7505563..9f04199 100644 --- a/src/domain/notification_factory.rs +++ b/src/domain/notification_factory.rs @@ -33,7 +33,7 @@ impl NotificationFactory { } } - pub fn insert(&mut self, follow_change: FollowChange) { + pub fn insert(&mut self, follow_change: Box) { let followee_info = self .followee_maps .entry(follow_change.followee) @@ -159,10 +159,10 @@ mod tests { let followee = Keys::generate().public_key(); let change1 = create_follow_change(follower, followee, seconds_to_datetime(1)); - notification_factory.insert(change1); + notification_factory.insert(change1.into()); let change2 = create_follow_change(follower, followee, seconds_to_datetime(1)); - notification_factory.insert(change2.clone()); + notification_factory.insert(change2.into()); // When they share the same time, the last change added should be kept let messages = notification_factory.flush(); @@ -179,10 +179,10 @@ mod tests { let followee = Keys::generate().public_key(); let newer_change = create_follow_change(follower, followee, seconds_to_datetime(2)); - notification_factory.insert(newer_change.clone()); + notification_factory.insert(newer_change.into()); let older_change = create_unfollow_change(follower, followee, seconds_to_datetime(1)); - notification_factory.insert(older_change); + notification_factory.insert(older_change.into()); let messages = notification_factory.flush(); assert_eq!(messages.len(), 1); @@ -200,16 +200,16 @@ mod tests { let change1 = create_follow_change(follower, followee1, seconds_to_datetime(2)); let change2 = create_follow_change(follower, followee2, seconds_to_datetime(1)); - notification_factory.insert(change1.clone()); - notification_factory.insert(change2.clone()); + notification_factory.insert(change1.clone().into()); + notification_factory.insert(change2.clone().into()); let mut messages = notification_factory.flush(); // Both changes should be kept since they have different followees assert_eq!( messages.sort(), [ - NotificationMessage::from(change1.clone()), - NotificationMessage::from(change2.clone()) + NotificationMessage::from(Box::new(change1)), + NotificationMessage::from(Box::new(change2)) ] .sort() ); @@ -225,8 +225,8 @@ mod tests { let follow_change = create_follow_change(follower, followee, seconds_to_datetime(1)); let unfollow_change = create_unfollow_change(follower, followee, seconds_to_datetime(2)); - notification_factory.insert(follow_change.clone()); - notification_factory.insert(unfollow_change.clone()); + notification_factory.insert(follow_change.into()); + notification_factory.insert(unfollow_change.into()); // The unfollow should cancel the follow assert_eq!(notification_factory.flush(), []); @@ -241,8 +241,8 @@ mod tests { let unfollow_change = create_unfollow_change(follower, followee, seconds_to_datetime(1)); let follow_change = create_follow_change(follower, followee, seconds_to_datetime(2)); - notification_factory.insert(unfollow_change.clone()); - notification_factory.insert(follow_change.clone()); + notification_factory.insert(unfollow_change.into()); + notification_factory.insert(follow_change.into()); // The follow should cancel the unfollow assert_eq!(notification_factory.flush(), []); @@ -458,7 +458,7 @@ mod tests { follower: PublicKey, ) -> FollowChange { let change = create_follow_change(follower, followee, seconds_to_datetime(1)); - notification_factory.insert(change.clone()); + notification_factory.insert(change.clone().into()); change } @@ -470,23 +470,13 @@ mod tests { insert_follower(notification_factory, followee, follower) } - fn insert_unfollower( - notification_factory: &mut NotificationFactory, - followee: PublicKey, - follower: PublicKey, - ) -> FollowChange { - let change = create_unfollow_change(follower, followee, seconds_to_datetime(1)); - notification_factory.insert(change.clone()); - change - } - fn insert_new_unfollower<'a>( notification_factory: &mut NotificationFactory, followee: PublicKey, ) -> FollowChange { let follower = Keys::generate().public_key(); let change = create_unfollow_change(follower, followee, seconds_to_datetime(1)); - notification_factory.insert(change.clone()); + notification_factory.insert(change.clone().into()); change } @@ -513,7 +503,8 @@ mod tests { let mut expected_batches = Vec::new(); for (_, changes) in expected { - let batch: NotificationMessage = (*changes).to_vec().into(); + let batch: NotificationMessage = + (*changes).to_vec().into_iter().map(|fc| fc.into()).into(); expected_batches.push(batch); } diff --git a/src/domain/notification_message.rs b/src/domain/notification_message.rs index b2aee40..53e6230 100644 --- a/src/domain/notification_message.rs +++ b/src/domain/notification_message.rs @@ -39,20 +39,22 @@ impl NotificationMessage { &self.followee } - pub fn add(&mut self, follow_change: FollowChange) { + pub fn add(&mut self, follow_change: Box) { assert!(self.followee == follow_change.followee, "Followee mismatch"); + assert!( self.len() < MAX_FOLLOWERS_PER_BATCH, "Too many followers in a single message, can't exceed {}", MAX_FOLLOWERS_PER_BATCH ); - if follow_change.change_type == ChangeType::Followed { - self.follows.insert(follow_change.follower); - } else { - //TODO typed instead of conditional - panic!("Only followed changes are allowed"); - } + assert_eq!( + follow_change.change_type, + ChangeType::Followed, + "Only followed changes can be messaged" + ); + + self.follows.insert(follow_change.follower); if self.len() == 1 { self.friendly_follower = Some(follow_change.friendly_follower); @@ -61,7 +63,7 @@ impl NotificationMessage { } } - pub fn add_all(&mut self, follow_changes: impl IntoIterator) { + pub fn add_all(&mut self, follow_changes: impl IntoIterator>) { for follow_change in follow_changes { self.add(follow_change); } @@ -131,8 +133,8 @@ impl Debug for NotificationMessage { } } -impl From for NotificationMessage { - fn from(change: FollowChange) -> Self { +impl From> for NotificationMessage { + fn from(change: Box) -> Self { let mut message = NotificationMessage::new(change.followee); message.add(change); message @@ -141,7 +143,7 @@ impl From for NotificationMessage { impl From for NotificationMessage where - T: IntoIterator, + T: IntoIterator>, { fn from(changes: T) -> Self { let mut changes = changes.into_iter(); @@ -171,7 +173,7 @@ mod tests { let mut message = NotificationMessage::new(followee1); - message.add(follower1_follow); + message.add(Box::new(follower1_follow)); assert_eq!( serde_json::to_string(&message).unwrap(), @@ -201,16 +203,16 @@ mod tests { let mut message = NotificationMessage::new(followee1); - message.add(follower1_follow); - message.add(follower2_follow); - message.add(follower2_follow2); - message.add(follower3_follow); + message.add(follower1_follow.into()); + message.add(follower2_follow.into()); + message.add(follower2_follow2.into()); + message.add(follower3_follow.into()); // TODO: This panics on github CI, but not locally. Investigate. #[cfg(not(feature = "ci"))] { let result = std::panic::catch_unwind(|| { - NotificationMessage::new(followee1).add(wrong_followee_change) + NotificationMessage::new(followee1).add(wrong_followee_change.into()) }); assert!(result.is_err()); } diff --git a/src/follow_change_handler.rs b/src/follow_change_handler.rs index cc743d1..c71e19e 100644 --- a/src/follow_change_handler.rs +++ b/src/follow_change_handler.rs @@ -52,8 +52,8 @@ where } #[async_trait] -impl WorkerTask for FollowChangeHandler { - async fn call(&self, mut follow_change: FollowChange) -> Result<(), Box> { +impl WorkerTask> for FollowChangeHandler { + async fn call(&self, mut follow_change: Box) -> Result<(), Box> { // Fetch friendly IDs for the pubkeys or fallback to the DB if it takes // more than timeout_secs. Whatever is found through the network is // cached. diff --git a/src/main.rs b/src/main.rs index 0d570a6..54e7f96 100644 --- a/src/main.rs +++ b/src/main.rs @@ -97,7 +97,7 @@ async fn start(settings: Settings) -> Result<()> { info!("Initializing workers for follower list diff calculation"); let shared_nostr_client = Arc::new(create_client()); let (follow_change_sender, _) = - broadcast::channel::(settings.follow_change_channel_size.get()); + broadcast::channel::>(settings.follow_change_channel_size.get()); let follows_differ_worker = FollowsDiffer::new( repo.clone(), shared_nostr_client.clone(), diff --git a/src/publisher.rs b/src/publisher.rs index 2789711..61e2106 100644 --- a/src/publisher.rs +++ b/src/publisher.rs @@ -33,7 +33,7 @@ pub trait PublishEvents { /// them to Google PubSub after certain time is elapsed or a size threshold is /// hit. pub struct Publisher { - sender: mpsc::Sender, + sender: mpsc::Sender>, } impl Publisher { @@ -43,7 +43,7 @@ impl Publisher { flush_period_seconds: NonZeroUsize, min_seconds_between_messages: NonZeroUsize, ) -> Result { - let (publication_sender, mut publication_receiver) = mpsc::channel::(1); + let (publication_sender, mut publication_receiver) = mpsc::channel::>(1); let mut buffer = NotificationFactory::new(min_seconds_between_messages); tokio::spawn(async move { @@ -104,8 +104,8 @@ impl Publisher { pub async fn queue_publication( &self, - follow_change: FollowChange, - ) -> Result<(), SendError> { + follow_change: Box, + ) -> Result<(), SendError>> { self.sender.send(follow_change).await } } @@ -176,38 +176,50 @@ mod tests { let followee2_pubkey = Keys::generate().public_key(); publisher - .queue_publication(FollowChange::new_followed( - seconds_to_datetime(1), - follower_pubkey, - followee1_pubkey, - )) + .queue_publication( + FollowChange::new_followed( + seconds_to_datetime(1), + follower_pubkey, + followee1_pubkey, + ) + .into(), + ) .await .unwrap(); publisher - .queue_publication(FollowChange::new_unfollowed( - seconds_to_datetime(1), - follower_pubkey, - followee2_pubkey, - )) + .queue_publication( + FollowChange::new_unfollowed( + seconds_to_datetime(1), + follower_pubkey, + followee2_pubkey, + ) + .into(), + ) .await .unwrap(); publisher - .queue_publication(FollowChange::new_followed( - seconds_to_datetime(2), - follower_pubkey, - followee2_pubkey, - )) + .queue_publication( + FollowChange::new_followed( + seconds_to_datetime(2), + follower_pubkey, + followee2_pubkey, + ) + .into(), + ) .await .unwrap(); publisher - .queue_publication(FollowChange::new_unfollowed( - seconds_to_datetime(3), - follower_pubkey, - followee2_pubkey, - )) + .queue_publication( + FollowChange::new_unfollowed( + seconds_to_datetime(3), + follower_pubkey, + followee2_pubkey, + ) + .into(), + ) .await .unwrap(); @@ -216,10 +228,12 @@ mod tests { let events = published_events.lock().await; assert_bag_eq!( events.clone(), - [NotificationMessage::from(FollowChange::new_followed( - seconds_to_datetime(1), - follower_pubkey, - followee1_pubkey + [NotificationMessage::from(Box::new( + FollowChange::new_followed( + seconds_to_datetime(1), + follower_pubkey, + followee1_pubkey + ) ))] ); } @@ -250,20 +264,26 @@ mod tests { let followee2_pubkey = Keys::generate().public_key(); publisher - .queue_publication(FollowChange::new_followed( - seconds_to_datetime(2), - follower_pubkey, - followee1_pubkey, - )) + .queue_publication( + FollowChange::new_followed( + seconds_to_datetime(2), + follower_pubkey, + followee1_pubkey, + ) + .into(), + ) .await .unwrap(); publisher - .queue_publication(FollowChange::new_unfollowed( - seconds_to_datetime(1), - follower_pubkey, - followee2_pubkey, - )) + .queue_publication( + FollowChange::new_unfollowed( + seconds_to_datetime(1), + follower_pubkey, + followee2_pubkey, + ) + .into(), + ) .await .unwrap(); @@ -272,10 +292,12 @@ mod tests { let events = published_events.lock().await; assert_bag_eq!( events.clone(), - [NotificationMessage::from(FollowChange::new_followed( - seconds_to_datetime(2), - follower_pubkey, - followee1_pubkey, + [NotificationMessage::from(Box::new( + FollowChange::new_followed( + seconds_to_datetime(2), + follower_pubkey, + followee1_pubkey, + ) )),], ); } @@ -306,11 +328,11 @@ mod tests { // Queue up follow changes publisher - .queue_publication(FollowChange::new_followed( + .queue_publication(Box::new(FollowChange::new_followed( seconds_to_datetime(1), follower_pubkey, followee_pubkey, - )) + ))) .await .unwrap();