Skip to content

Commit

Permalink
Removing old batches shouldn't reset rate limit
Browse files Browse the repository at this point in the history
  • Loading branch information
dcadenas committed Sep 3, 2024
1 parent f6e7a8d commit 2be6f17
Showing 1 changed file with 12 additions and 18 deletions.
30 changes: 12 additions & 18 deletions src/domain/follow_change_aggregator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,23 +84,25 @@ impl<T: Clock> FollowChangeAggregator<T> {
OrderMap::with_capacity(self.followee_maps.len() / MAX_FOLLOWERS_PER_MESSAGE);

self.followee_maps.retain(|_followee, follow_change_map| {
let mut rate_limited = false;
let mut rate_limited_followee = false;

// TODO: extract_if would have been great here, keep an eye on nightly
let max_retention = &self.max_retention;
let rate_limiter = &mut self.rate_limiter;
follow_change_map.retain(|_follower, (inserted_at, follow_change)| {
rate_limited = collect_follow_change(
let (should_retain, is_followee_rate_limited) = collect_follow_change(
max_retention,
rate_limiter,
inserted_at,
&mut messages_map,
rate_limited,
rate_limited_followee,
follow_change,
&self.clock,
);

rate_limited
rate_limited_followee = is_followee_rate_limited;

should_retain
});

!follow_change_map.is_empty()
Expand Down Expand Up @@ -169,10 +171,10 @@ fn collect_follow_change<T: Clock>(
rate_limiter: &mut FollowChangeRateLimiter<T>,
inserted_at: &mut T::Instant,
messages_map: &mut OrderMap<PublicKey, Vec<FollowChangeBatch>>,
rate_limited: bool,
is_followee_rate_limited: bool,
follow_change: &mut FollowChange,
clock: &T,
) -> bool {
) -> (bool, bool) {
let followee = follow_change.followee;
//let retained_for_too_long = inserted_at.elapsed() > *max_retention;
let retained_for_too_long =
Expand All @@ -186,15 +188,8 @@ fn collect_follow_change<T: Clock>(
.expect("Expected a non-empty batch for the followee");

let current_message_has_room = latest_batch_for_followee.len() < MAX_FOLLOWERS_PER_MESSAGE;
let rate_limited = rate_limited || rate_limiter.check_key(&followee).is_err();

println!(
"rate_limited: {}, retained_for_too_long: {}, inserted_at_elapsed: {}, max retention: {}",
rate_limited,
retained_for_too_long,
Duration::from_nanos(clock.now().duration_since(*inserted_at).into()).as_secs(),
max_retention.as_secs()
);
let rate_limited = is_followee_rate_limited || rate_limiter.check_key(&followee).is_err();

if !rate_limited || retained_for_too_long {
let batch = if latest_batch_for_followee.is_empty()
|| (current_message_has_room && retained_for_too_long)
Expand All @@ -207,12 +202,11 @@ fn collect_follow_change<T: Clock>(
.expect("New batch should be available")
};

println!("adding {} to batch", follow_change.follower);
batch.add(follow_change.clone());
return false;
return (false, rate_limited);
}

rate_limited
(true, rate_limited)
}

fn record_metrics(messages: &[FollowChangeBatch], retained_follow_changes: usize) {
Expand Down

0 comments on commit 2be6f17

Please sign in to comment.