diff --git a/src/domain/follow_change_aggregator.rs b/src/domain/follow_change_aggregator.rs index 5be9720..7f8b071 100644 --- a/src/domain/follow_change_aggregator.rs +++ b/src/domain/follow_change_aggregator.rs @@ -84,23 +84,25 @@ impl FollowChangeAggregator { OrderMap::with_capacity(self.followee_maps.len() / MAX_FOLLOWERS_PER_MESSAGE); self.followee_maps.retain(|_followee, follow_change_map| { - let mut rate_limited = false; + let mut rate_limited_followee = false; // TODO: extract_if would have been great here, keep an eye on nightly let max_retention = &self.max_retention; let rate_limiter = &mut self.rate_limiter; follow_change_map.retain(|_follower, (inserted_at, follow_change)| { - rate_limited = collect_follow_change( + let (should_retain, is_followee_rate_limited) = collect_follow_change( max_retention, rate_limiter, inserted_at, &mut messages_map, - rate_limited, + rate_limited_followee, follow_change, &self.clock, ); - rate_limited + rate_limited_followee = is_followee_rate_limited; + + should_retain }); !follow_change_map.is_empty() @@ -169,10 +171,10 @@ fn collect_follow_change( rate_limiter: &mut FollowChangeRateLimiter, inserted_at: &mut T::Instant, messages_map: &mut OrderMap>, - rate_limited: bool, + is_followee_rate_limited: bool, follow_change: &mut FollowChange, clock: &T, -) -> bool { +) -> (bool, bool) { let followee = follow_change.followee; //let retained_for_too_long = inserted_at.elapsed() > *max_retention; let retained_for_too_long = @@ -186,15 +188,8 @@ fn collect_follow_change( .expect("Expected a non-empty batch for the followee"); let current_message_has_room = latest_batch_for_followee.len() < MAX_FOLLOWERS_PER_MESSAGE; - let rate_limited = rate_limited || rate_limiter.check_key(&followee).is_err(); - - println!( - "rate_limited: {}, retained_for_too_long: {}, inserted_at_elapsed: {}, max retention: {}", - rate_limited, - retained_for_too_long, - Duration::from_nanos(clock.now().duration_since(*inserted_at).into()).as_secs(), - max_retention.as_secs() - ); + let rate_limited = is_followee_rate_limited || rate_limiter.check_key(&followee).is_err(); + if !rate_limited || retained_for_too_long { let batch = if latest_batch_for_followee.is_empty() || (current_message_has_room && retained_for_too_long) @@ -207,12 +202,11 @@ fn collect_follow_change( .expect("New batch should be available") }; - println!("adding {} to batch", follow_change.follower); batch.add(follow_change.clone()); - return false; + return (false, rate_limited); } - rate_limited + (true, rate_limited) } fn record_metrics(messages: &[FollowChangeBatch], retained_follow_changes: usize) {