Skip to content

Commit

Permalink
Decay is more continuous and understandable (#96)
Browse files Browse the repository at this point in the history
  • Loading branch information
That3Percent authored and Theodus committed Dec 17, 2021
1 parent fb56576 commit 82502d6
Show file tree
Hide file tree
Showing 4 changed files with 133 additions and 73 deletions.
93 changes: 66 additions & 27 deletions src/indexer_selection/decay.rs
Original file line number Diff line number Diff line change
@@ -1,60 +1,97 @@
use crate::indexer_selection::utility::{concave_utility, SelectionFactor, UtilityAggregator};

pub trait Decay {
fn expected_utility(&self, u_a: f64) -> f64;
fn shift(&mut self, next: &Self, fraction: f64);
fn shift(&mut self, next: Option<&mut Self>, fraction: f64, keep: f64);
fn clear(&mut self);
}

/// Measurements a decay frame exposes so that `DecayBuffer` can combine its
/// per-frame data into a single utility value.
pub trait DecayUtility {
    /// Total number of data points recorded in this frame.
    fn count(&self) -> f64;
    /// Expected utility of this frame's data, parameterized by `u_a`.
    fn expected_utility(&self, u_a: f64) -> f64;
}

/// The DecayBuffer accounts for selection factors over various time-frames. Currently, these time
/// frames are 7 consecutive powers of 4 minute intervals, i.e. [1m, 4m, 16m, ... 4096m]. This
/// assumes that `decay` is called once every minute.
#[derive(Default)]
pub struct DecayBuffer<T: Default + Decay> {
pub struct DecayBuffer<T> {
frames: [T; 7],
decay_ticks: u64,
}

impl<T: Default + Decay> DecayBuffer<T> {
pub fn current_mut(&mut self) -> &mut T {
&mut self.frames[0]
}

impl<T: DecayUtility> DecayBuffer<T> {
pub fn expected_utility(&self, u_a: f64) -> SelectionFactor {
// TODO: This weight seems to have no appreciable effect
const FRAME_INDEX_WEIGHT: f64 = 0.5;
let mut aggregator = UtilityAggregator::new();
for (i, frame) in self.frames.iter().enumerate() {
let index_weight = FRAME_INDEX_WEIGHT * (i + 1) as f64;
// 1/10 query per minute = 85% confident.
let confidence = concave_utility(frame.count() * 10.0 / 4.0_f64.powf(i as f64), 0.19);

// Buckets decrease in relevance rapidly, making
// the most recent buckets contribute the most to the
// final result.
let importance = 1.0 / (1.0 + (i as f64));

aggregator.add(SelectionFactor {
utility: frame.expected_utility(u_a),
weight: concave_utility(index_weight * frame.count(), u_a),
weight: confidence * importance,
});
}

let agg_utility = aggregator.crunch();
SelectionFactor {
utility: agg_utility,
weight: 1.0,
}
}
}

impl<T> DecayBuffer<T> {
    /// Mutable handle to the newest frame — the one that absorbs incoming
    /// data before any decay has been applied to it.
    pub fn current_mut(&mut self) -> &mut T {
        // Destructure the frames array; the first element is the current frame.
        let [newest, ..] = &mut self.frames;
        newest
    }
}

impl<T: Decay> DecayBuffer<T> {
/*
The idea here is to pretend that we have a whole bunch of buckets of the minimum window size (1 min each)
A whole 8191 of them! Queries contribute to the bucket they belong to, and each time we decay each bucket shifts down
and retains 99.5% of the information in the bucket.
The above would be bad for performance, so we achieve almost the same result by creating "frames"
of increasing size, each of which can hold many buckets. When we decay we do the same operation as above,
assuming that each bucket within a frame holds the average value of all buckets within that frame. This results in some "smearing"
of the data, but reduces the size of our array from 8191 to just 7 at the expense of slight complexity and loss of accuracy.
*/
pub fn decay(&mut self) {
// For each frame `frame[i]`,
// when `decay_ticks` is divisible by `pow(4, i-1)`,
// reduce the value of `frame[i]` by 1/4 and add the value of `frame[i-1]` to `frame[i]`.
self.decay_ticks += 1;
for i in (1..7).rev() {
let next_frame_ticks = 1 << ((i - 1) * 2);
if (self.decay_ticks % next_frame_ticks) != 0 {
continue;
}
let (l, r) = self.frames.split_at_mut(i);
let (prev, this) = (&l.last().unwrap(), &mut r[0]);
this.shift(prev, 0.25);
for i in (0..7).rev() {
// Select buckets [i], [i+1]
let (l, r) = self.frames.split_at_mut(i + 1);
let (this, next) = (l.last_mut().unwrap(), r.get_mut(0));

// Decay rate of 0.5% per smallest bucket resolution
// That is, each time we shift decay 0.5% per non-aggregated bucket
// and aggregate the results into frames.
let retain = 0.995_f64.powf(4_f64.powf(i as f64));

// Shift one bucket, aggregating the results.
this.shift(next, 0.25_f64.powf(i as f64), retain);
}
// Clear the current frame.
self.frames[0].clear();
}
}

impl Decay for f64 {
    /// Drain `fraction` of this frame's value. When an older frame (`next`)
    /// exists, the drained amount is deposited there after being scaled by
    /// `retain`, so part of the information is forgotten in transit.
    fn shift(&mut self, next: Option<&mut Self>, fraction: f64, retain: f64) {
        let drained = fraction * *self;
        *self -= drained;
        match next {
            // Pass the drained value along, keeping only `retain` of it.
            Some(older) => *older += retain * drained,
            // Last frame: the drained value falls off the end of the buffer.
            None => (),
        }
    }

    /// Reset this frame to empty.
    fn clear(&mut self) {
        *self = 0.0;
    }
}

Expand Down Expand Up @@ -131,6 +168,7 @@ mod tests {
}

#[test]
#[ignore = "Writes output to disk"]
fn reputation_response() {
let config = ResponseConfig {
title: "reputation-outage-response",
Expand Down Expand Up @@ -179,6 +217,7 @@ mod tests {
}

#[test]
#[ignore = "Writes output to disk"]
fn penalty_response() {
let config = ResponseConfig {
title: "reputation-penalty-response",
Expand Down
47 changes: 26 additions & 21 deletions src/indexer_selection/performance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,31 @@ use crate::{
};
use ordered_float::NotNan;

use super::decay::DecayUtility;

/// Sparse histogram of observed query performance.
///
/// `performance` and `count` are parallel vectors: `count[i]` holds the
/// (decayed) number of queries whose quantized duration equals
/// `performance[i]`. Counts are `f64` rather than integers because they
/// decay fractionally over time.
#[derive(Clone, Debug, Default)]
pub struct Performance {
    // Sorted bucket keys (quantized durations); kept parallel to `count`.
    performance: Vec<f64>,
    // Per-bucket query counts; fractional due to decay.
    count: Vec<f64>,
}

impl Decay for Performance {
    /// Shift decayed information out of this frame bucket by bucket: each
    /// count is moved into the bucket of `next` (when present) that has the
    /// same quantized performance key.
    fn shift(&mut self, mut next: Option<&mut Self>, fraction: f64, keep: f64) {
        let buckets = self.count.iter_mut().zip(self.performance.iter().copied());
        for (count, key) in buckets {
            // Locate (or create) the matching bucket in the next frame.
            let destination = next.as_deref_mut().map(|frame| frame.bucket_mut(key));
            count.shift(destination, fraction, keep);
        }
    }

    /// Drop all buckets, leaving an empty histogram.
    fn clear(&mut self) {
        self.count.clear();
        self.performance.clear();
    }
}

impl DecayUtility for Performance {
fn expected_utility(&self, u_a: f64) -> f64 {
let mut agg_count = 0.0;
let mut agg_utility = 0.0;
Expand All @@ -46,21 +64,6 @@ impl Decay for Performance {
}
agg_utility / agg_count
}

fn shift(&mut self, next: &Self, fraction: f64) {
for count in &mut self.count {
*count = *count * fraction;
}
for (count, performance) in next.iter() {
self.add_successful_queries_inner(performance, count);
}
}

fn clear(&mut self) {
self.performance.clear();
self.count.clear();
}

fn count(&self) -> f64 {
self.count.iter().sum()
}
Expand Down Expand Up @@ -101,23 +104,25 @@ impl Performance {
}

pub fn add_successful_query(&mut self, duration: Duration) {
self.add_successful_queries_inner(Self::quantized_performance(duration), 1.0);
*self.bucket_mut(Self::quantized_performance(duration)) += 1.0;
}

fn add_successful_queries_inner(&mut self, quantized_performance: f64, count: f64) {
fn bucket_mut(&mut self, quantized_performance: f64) -> &mut f64 {
// Safety: Performance is NotNan. See also 47632ed6-4dcc-4b39-b064-c0ca01560626
match unsafe {
let index = match unsafe {
self.performance
.binary_search_by_key(&NotNan::new_unchecked(quantized_performance), |a| {
NotNan::new_unchecked(*a)
})
} {
Ok(index) => self.count[index] += count,
Ok(index) => index,
Err(index) => {
self.performance.insert(index, quantized_performance);
self.count.insert(index, count)
self.count.insert(index, 0.0);
index
}
}
};
&mut self.count[index]
}

fn iter<'a>(&'a self) -> impl 'a + Iterator<Item = (f64, f64)> {
Expand Down
64 changes: 40 additions & 24 deletions src/indexer_selection/reputation.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::indexer_selection::decay::Decay;
use crate::indexer_selection::decay::{Decay, DecayUtility};

// TODO: Other factors like how long the indexer has been in the network.
// Because reliability (which is what is captured here now) is useful on its own, it may be useful
Expand All @@ -12,39 +12,58 @@ pub struct Reputation {
penalties: f64,
}

impl Decay for Reputation {
impl DecayUtility for Reputation {
fn expected_utility(&self, _u_a: f64) -> f64 {
let total_queries = self.successful_queries + self.failed_queries;
if total_queries == 0.0 {
return 0.0;
}
// Need to add a free success so that no buckets have a utility of 0.
// Even with a non-1 weight a utility of 0 ends up bringing the result
// to 0 which we can't afford.
let successful_queries = self.successful_queries + 0.5;
let total_queries = successful_queries + self.failed_queries;

// Use the ratio directly as utility, rather than passing it through concave_utility. This
// is because the likelihood a query will complete is a pretty straightforward conversion to
// utility. Eg: If we send 3 queries to each indexer A and B, and A returns 1 success, and B
// returns 3 successes - for some fixed value of a query the utility is number of returned
// queries * value of query.
let ratio = self.successful_queries / total_queries;
let mut ratio = successful_queries / total_queries;

// We want a non-linear relationship between success rate and utility.
// In a world which looks at count of nines, it's not acceptable to send
// a 50% success indexer 50% queries.
ratio *= ratio;

let penalty = ((self.penalties / total_queries) + 1.0).recip();
ratio * penalty
}

fn shift(&mut self, next: &Self, fraction: f64) {
self.successful_queries *= fraction;
self.successful_queries += next.successful_queries;
self.failed_queries *= fraction;
self.failed_queries += next.failed_queries;
self.penalties *= fraction;
self.penalties += next.penalties;
fn count(&self) -> f64 {
self.successful_queries + self.failed_queries
}
}

fn clear(&mut self) {
self.successful_queries = 0.0;
self.failed_queries = 0.0;
self.penalties = 0.0;
impl Decay for Reputation {
fn shift(&mut self, mut next: Option<&mut Self>, fraction: f64, keep: f64) {
self.successful_queries.shift(
next.as_deref_mut().map(|s| &mut s.successful_queries),
fraction,
keep,
);
self.failed_queries.shift(
next.as_deref_mut().map(|s| &mut s.failed_queries),
fraction,
keep,
);
self.penalties.shift(
next.as_deref_mut().map(|s| &mut s.penalties),
fraction,
keep,
);
}

fn count(&self) -> f64 {
self.successful_queries + self.failed_queries
fn clear(&mut self) {
self.successful_queries.clear();
self.failed_queries.clear();
self.penalties.clear();
}
}

Expand All @@ -58,9 +77,6 @@ impl Reputation {
}

pub fn penalize(&mut self, weight: u8) {
// Scale weight to a shift amount in range [0, 63].
let shamt = (weight / 4).max(1) - 1;
let weight = 1u64 << shamt;
self.penalties += weight as f64;
self.penalties += 1.1_f64.powf(weight as f64);
}
}
2 changes: 1 addition & 1 deletion src/indexer_selection/selection_factors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ impl SelectionFactors {
};
lock.allocations.release(receipt, status);
if error.is_timeout() {
lock.reputation.current_mut().penalize(30);
lock.reputation.current_mut().penalize(50);
}
}

Expand Down

0 comments on commit 82502d6

Please sign in to comment.