Skip to content

Commit

Permalink
More edge cases
Browse files Browse the repository at this point in the history
  • Loading branch information
dcadenas committed Sep 5, 2024
1 parent 337c893 commit 3b3446b
Show file tree
Hide file tree
Showing 4 changed files with 129 additions and 80 deletions.
46 changes: 14 additions & 32 deletions src/domain/followee_notification_factory.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use super::FollowChange;
use crate::rate_counter::RateCounter;
use governor::{clock::Clock, clock::Reference, nanos::Nanos};
use governor::{clock::Clock, clock::Reference};
use nostr_sdk::PublicKey;
use ordermap::OrderMap;
use std::fmt::Debug;
Expand Down Expand Up @@ -63,19 +63,17 @@ impl<T: Clock> FolloweeNotificationFactory<T> {
follow_changes_to_publish: &mut Vec<CollectedFollowChange>,
) {
// TODO: extract_if would have been great here, keep an eye on nightly
let rate_counter = &mut self.rate_counter;
let follow_change_map = &mut self.follow_changes;

follow_change_map.retain(|_follower, (inserted_at, follow_change)| {
collect_follow_change(
max_retention,
inserted_at,
follow_changes_to_publish,
follow_change,
&self.clock,
rate_counter,
)
});
self.follow_changes
.retain(|_follower, (inserted_at, follow_change)| {
collect_follow_change(
max_retention,
inserted_at,
follow_changes_to_publish,
follow_change,
&self.clock,
&mut self.rate_counter,
)
});
}
}

Expand All @@ -87,21 +85,6 @@ impl<T: Clock> Debug for FolloweeNotificationFactory<T> {
}
}

/// Collects a follow change into a batched message and returns whether the change should
/// be retained for later due to rate limits.
///
/// - If the messages sent so far have been rate-limited, the change will be
/// retained for later processing but only within the max retention period.
///
/// - Once the retention period is elapsed, the retained changes are sent in batched messages.
/// Messages with only one item will include friendly ID information, the
/// notification service will show them as "[email protected] is a new
/// follower!"
/// Messages with multiple items will be shown as "You have 29 new followers and 29 unfollows!"
///
/// - The batching process ensures that no message contains more than
/// MAX_FOLLOWERS_PER_BATCH changes. If it didn't we'd hit the APNS max
/// payload limit.
fn collect_follow_change<T: Clock>(
max_retention: &Duration,
inserted_at: &mut T::Instant,
Expand All @@ -110,16 +93,15 @@ fn collect_follow_change<T: Clock>(
clock: &T,
rate_counter: &mut RateCounter<T>,
) -> bool {
let retained_for_too_long =
clock.now().duration_since(*inserted_at) > Nanos::new(max_retention.as_nanos() as u64);
let retained_for_too_long = clock.now().duration_since(*inserted_at) > (*max_retention).into();

if retained_for_too_long {
send_batchable(follow_change, follow_changes_to_publish, rate_counter);
return false;
}

let rate_limited = rate_counter.is_hit();
if rate_limited && !retained_for_too_long {
if rate_limited {
return true;
}

Expand Down
90 changes: 45 additions & 45 deletions src/domain/notification_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,13 @@ impl<T: Clock> NotificationFactory<T> {
followee_info.add_follower_change(follow_change)
}

/// Collects follow/unfollow changes per followee into NotificationMessage objects,
/// which essentially map to push notifications. Rate-limited changes are retained
/// for later so they can be included in a batch rather than sent individually and immediately.
/// Follow changes retained for over an hour, or those that fit within the current
/// batch, bypass the rate limit and are included immediately.
/// Collects follow/unfollow changes per followee into NotificationMessage
/// objects, which essentially map to push notifications. Rate-limited
/// changes are retained for later so they can be included in a batch rather
/// than sent individually and immediately. Those retained for
/// over an hour, or those that fit within the current batch, bypass the
/// rate limit and are included immediately. When there are not rate limits
/// and no retained messages, notifications are not batched.
pub fn drain_into_messages(&mut self) -> Vec<NotificationMessage> {
let initial_follow_changes_len = self.follow_changes_len();
let initial_followees_len = self.followees_len();
Expand All @@ -61,37 +63,41 @@ impl<T: Clock> NotificationFactory<T> {
!followee_changes.is_deletable()
});

let messages = messages_map
.into_iter()
.flat_map(|(followee, follow_changes)| {
let mut messages = Vec::new();
let mut batchables = Vec::new();

follow_changes
.into_iter()
.for_each(|collected_change| match collected_change {
CollectedFollowChange::Single(change) => {
let mut message = NotificationMessage::new(followee);
message.add(change);
messages.push(message);
}
CollectedFollowChange::Batchable(change) => {
batchables.push(change);
}
});

batchables
.chunks(MAX_FOLLOWERS_PER_BATCH)
.for_each(|batch| {
let mut message = NotificationMessage::new(followee);
for change in batch {
message.add(change.clone());
}
messages.push(message);
});
messages
})
.collect::<Vec<NotificationMessage>>();
let mut messages = Vec::new();
for (followee, follow_changes) in messages_map.into_iter() {
let mut followee_messages = Vec::new();
let mut singles = Vec::new();
let mut batchables = Vec::new();

for collected_change in follow_changes {
match collected_change {
CollectedFollowChange::Single(change) => {
singles.push(change);
}
CollectedFollowChange::Batchable(change) => {
batchables.push(change);
}
}
}

for batch in batchables.chunks(MAX_FOLLOWERS_PER_BATCH) {
let mut message = NotificationMessage::new(followee);
message.add_all(batch.to_owned());
followee_messages.push(message);
}

// If the batches created have room for more changes, we can add
// the singles to them, those that don't fit are sent as single
for message in followee_messages.iter_mut() {
message.drain_from(&mut singles);
}

for change in singles {
followee_messages.push(change.into());
}

messages.extend(followee_messages);
}

if messages.is_empty() {
debug!(
Expand Down Expand Up @@ -138,9 +144,7 @@ fn record_metrics(messages: &[NotificationMessage], retained_follow_changes: usi
let mut aggregated_follow_changes = 0;

for message in messages {
let message_len = message.len();

if message_len == 1 {
if message.is_single() {
individual_follow_changes += 1;
} else {
aggregated_follow_changes += 1;
Expand Down Expand Up @@ -397,9 +401,8 @@ mod tests {
messages,
messages.iter().map(|m| m.len()).sum::<usize>()
);
assert_eq!(
messages[0].len(),
1,
assert!(
messages[0].is_single(),
"Expected a single follow change in the message"
);

Expand Down Expand Up @@ -458,7 +461,6 @@ mod tests {
notification_factory.insert(change.clone());

let messages = notification_factory.drain_into_messages();

// Only the one for the new followee is sent as it's not rate limited, the other one hit the limit so it's retained.
assert_eq!(messages.len(), 1);
assert_eq!(notification_factory.follow_changes_len(), 1);
Expand All @@ -467,11 +469,9 @@ mod tests {
clock.advance(Duration::from_secs((max_retention_minutes as u64 + 1) * 60));

let messages = notification_factory.drain_into_messages();

// This one has a single item
assert_eq!(messages.len(), 1);
assert_eq!(messages[0].len(), 1);

// We keep the followee info for the time the rate limit counter can
// calculate the rate which is one hour. This is in case new changes
// arrive so we remember the rate limit for one more period
Expand Down
26 changes: 23 additions & 3 deletions src/domain/notification_message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,29 @@ impl NotificationMessage {
}
}

pub fn drain_from(&mut self, changes: &mut Vec<FollowChange>) {
let batch_len = self.len();
let remaining = MAX_FOLLOWERS_PER_BATCH - batch_len;
let end = remaining.min(changes.len());

let singles_to_add: Vec<FollowChange> = changes.drain(..end).collect();

self.add_all(singles_to_add);
}

pub fn add_all(&mut self, follow_changes: impl IntoIterator<Item = FollowChange>) {
for follow_change in follow_changes {
self.add(follow_change);
}
}

pub fn len(&self) -> usize {
self.follows.len() + self.unfollows.len()
}

pub fn is_single(&self) -> bool {
self.len() == 1
}
}

fn serialize_as_vec_of_npubs<S>(
Expand Down Expand Up @@ -132,9 +152,9 @@ impl Debug for NotificationMessage {
}

impl From<FollowChange> for NotificationMessage {
fn from(val: FollowChange) -> Self {
let mut message = NotificationMessage::new(val.followee);
message.add(val);
fn from(change: FollowChange) -> Self {
let mut message = NotificationMessage::new(change.followee);
message.add(change);
message
}
}
Expand Down
47 changes: 47 additions & 0 deletions src/rate_counter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,3 +37,50 @@ impl<T: Clock> RateCounter<T> {
self.items_sent.len() as u32 >= self.limit
}
}

#[cfg(test)]

mod tests {
use super::*;
use governor::clock::FakeRelativeClock;

#[test]
fn test_rate_counter() {
let clock = FakeRelativeClock::default();
// Configured for a rate of 2 items in 10 seconds before hitting the limit
let mut rate_counter = RateCounter::new(2, Duration::from_secs(10), clock.clone());

rate_counter.bump();
rate_counter.bump();
assert!(
rate_counter.is_hit(),
"2 items in 0 seconds should hit the limit"
);

clock.advance(Duration::from_secs(5));
assert!(
rate_counter.is_hit(),
"2 items in 5 seconds should hit the limit"
);

clock.advance(Duration::from_secs(5));
rate_counter.bump();
rate_counter.bump();
assert!(
rate_counter.is_hit(),
"4 items in 10 seconds should hit the limit"
);

clock.advance(Duration::from_secs(5));
assert!(
rate_counter.is_hit(),
"4 items in 15 seconds should hit the limit"
);

clock.advance(Duration::from_secs(5));
assert!(
!rate_counter.is_hit(),
"4 items in 20 seconds should not hit the limit"
);
}
}

0 comments on commit 3b3446b

Please sign in to comment.