diff --git a/src/indexer_selection/decay.rs b/src/indexer_selection/decay.rs index 06a49c74..f2beae85 100644 --- a/src/indexer_selection/decay.rs +++ b/src/indexer_selection/decay.rs @@ -1,60 +1,97 @@ use crate::indexer_selection::utility::{concave_utility, SelectionFactor, UtilityAggregator}; pub trait Decay { - fn expected_utility(&self, u_a: f64) -> f64; - fn shift(&mut self, next: &Self, fraction: f64); + fn shift(&mut self, next: Option<&mut Self>, fraction: f64, keep: f64); fn clear(&mut self); +} + +pub trait DecayUtility { fn count(&self) -> f64; + fn expected_utility(&self, u_a: f64) -> f64; } /// The DecayBuffer accounts for selection factors over various time-frames. Currently, these time /// frames are 7 consecutive powers of 4 minute intervals, i.e. [1m, 4m, 16m, ... 4096m]. This /// assumes that `decay` is called once every minute. #[derive(Default)] -pub struct DecayBuffer { +pub struct DecayBuffer { frames: [T; 7], - decay_ticks: u64, } -impl DecayBuffer { - pub fn current_mut(&mut self) -> &mut T { - &mut self.frames[0] - } - +impl DecayBuffer { pub fn expected_utility(&self, u_a: f64) -> SelectionFactor { - // TODO: This weight seems to have no appreciable effect - const FRAME_INDEX_WIEGTH: f64 = 0.5; let mut aggregator = UtilityAggregator::new(); for (i, frame) in self.frames.iter().enumerate() { - let index_weight = FRAME_INDEX_WIEGTH * (i + 1) as f64; + // 1/10 query per minute = 85% confident. + let confidence = concave_utility(frame.count() * 10.0 / 4.0_f64.powf(i as f64), 0.19); + + // Buckets decrease in relevance rapidly, making + // the most recent buckets contribute the most to the + // final result. 
+ let importance = 1.0 / (1.0 + (i as f64)); + aggregator.add(SelectionFactor { utility: frame.expected_utility(u_a), - weight: concave_utility(index_weight * frame.count(), u_a), + weight: confidence * importance, }); } + let agg_utility = aggregator.crunch(); SelectionFactor { utility: agg_utility, weight: 1.0, } } +} +impl DecayBuffer { + pub fn current_mut(&mut self) -> &mut T { + &mut self.frames[0] + } +} + +impl DecayBuffer { + /* + The idea here is to pretend that we have a whole bunch of buckets of the minimum window size (1 min each) + A whole 5461 of them (1 + 4 + 16 + ... + 4096)! Queries contribute to the bucket they belong to, and each time we decay each bucket shifts down + and retains 99.5% of the information in the bucket. + + The above would be bad for performance, so we achieve almost the same result by creating "frames" + of increasing size, each of which can hold many buckets. When we decay we do the same operation as above, + assuming that each bucket within a frame holds the average value of all buckets within that frame. This results in some "smearing" + of the data, but reduces the size of our array from 5461 to just 7 at the expense of slight complexity and loss of accuracy. + */ pub fn decay(&mut self) { - // For each frame `frame[i]`, - // when `decay_ticks` is divisible by `pow(4, i-1)`, - // reduce the value of `frame[i]` by 1/4 and add the value of `frame[i-1]` to `frame[i]`. 
- self.decay_ticks += 1; - for i in (1..7).rev() { - let next_frame_ticks = 1 << ((i - 1) * 2); - if (self.decay_ticks % next_frame_ticks) != 0 { - continue; - } - let (l, r) = self.frames.split_at_mut(i); - let (prev, this) = (&l.last().unwrap(), &mut r[0]); - this.shift(prev, 0.25); + for i in (0..7).rev() { + // Select buckets [i], [i+1] + let (l, r) = self.frames.split_at_mut(i + 1); + let (this, next) = (l.last_mut().unwrap(), r.get_mut(0)); + + // Decay rate of 0.5% per smallest bucket resolution + // That is, each time we shift decay 0.5% per non-aggregated bucket + // and aggregate the results into frames. + let retain = 0.995_f64.powf(4_f64.powf(i as f64)); + + // Shift one bucket, aggregating the results. + this.shift(next, 0.25_f64.powf(i as f64), retain); } - // Clear the current frame. - self.frames[0].clear(); + } +} + +impl Decay for f64 { + fn shift(&mut self, next: Option<&mut Self>, fraction: f64, retain: f64) { + // Remove some amount of value from this frame + let take = *self * fraction; + *self -= take; + if let Some(next) = next { + // And add that value to the next frame, destroying some of the value + // as we go to forget over time. 
+ *next += take * retain; + } + } + + fn clear(&mut self) { + *self = 0.0; } } @@ -131,6 +168,7 @@ mod tests { } #[test] + #[ignore = "Writes output to disk"] fn reputation_response() { let config = ResponseConfig { title: "reputation-outage-response", @@ -179,6 +217,7 @@ mod tests { } #[test] + #[ignore = "Writes output to disk"] fn penalty_response() { let config = ResponseConfig { title: "reputation-penalty-response", diff --git a/src/indexer_selection/performance.rs b/src/indexer_selection/performance.rs index 417ca4ff..db0bd392 100644 --- a/src/indexer_selection/performance.rs +++ b/src/indexer_selection/performance.rs @@ -27,6 +27,8 @@ use crate::{ }; use ordered_float::NotNan; +use super::decay::DecayUtility; + #[derive(Clone, Debug, Default)] pub struct Performance { performance: Vec, @@ -34,6 +36,22 @@ pub struct Performance { } impl Decay for Performance { + fn shift(&mut self, mut next: Option<&mut Self>, fraction: f64, keep: f64) { + // For each quantized bucket, find the corresponding quantized bucket in + // the next frame, and shift information into it. 
+ for (count, performance) in self.count.iter_mut().zip(self.performance.iter().cloned()) { + let next_performance = next.as_deref_mut().map(|n| n.bucket_mut(performance)); + count.shift(next_performance, fraction, keep); + } + } + + fn clear(&mut self) { + self.performance.clear(); + self.count.clear(); + } +} + +impl DecayUtility for Performance { fn expected_utility(&self, u_a: f64) -> f64 { let mut agg_count = 0.0; let mut agg_utility = 0.0; @@ -46,21 +64,6 @@ impl Decay for Performance { } agg_utility / agg_count } - - fn shift(&mut self, next: &Self, fraction: f64) { - for count in &mut self.count { - *count = *count * fraction; - } - for (count, performance) in next.iter() { - self.add_successful_queries_inner(performance, count); - } - } - - fn clear(&mut self) { - self.performance.clear(); - self.count.clear(); - } - fn count(&self) -> f64 { self.count.iter().sum() } @@ -101,23 +104,25 @@ impl Performance { } pub fn add_successful_query(&mut self, duration: Duration) { - self.add_successful_queries_inner(Self::quantized_performance(duration), 1.0); + *self.bucket_mut(Self::quantized_performance(duration)) += 1.0; } - fn add_successful_queries_inner(&mut self, quantized_performance: f64, count: f64) { + fn bucket_mut(&mut self, quantized_performance: f64) -> &mut f64 { // Safety: Performance is NotNan. 
See also 47632ed6-4dcc-4b39-b064-c0ca01560626 - match unsafe { + let index = match unsafe { self.performance .binary_search_by_key(&NotNan::new_unchecked(quantized_performance), |a| { NotNan::new_unchecked(*a) }) } { - Ok(index) => self.count[index] += count, + Ok(index) => index, Err(index) => { self.performance.insert(index, quantized_performance); - self.count.insert(index, count) + self.count.insert(index, 0.0); + index } - } + }; + &mut self.count[index] } fn iter<'a>(&'a self) -> impl 'a + Iterator { diff --git a/src/indexer_selection/reputation.rs b/src/indexer_selection/reputation.rs index 7180e0a0..32467ac3 100644 --- a/src/indexer_selection/reputation.rs +++ b/src/indexer_selection/reputation.rs @@ -1,4 +1,4 @@ -use crate::indexer_selection::decay::Decay; +use crate::indexer_selection::decay::{Decay, DecayUtility}; // TODO: Other factors like how long the indexer has been in the network. // Because reliability (which is what is captured here now) is useful on it's own, it may be useful @@ -12,39 +12,58 @@ pub struct Reputation { penalties: f64, } -impl Decay for Reputation { +impl DecayUtility for Reputation { fn expected_utility(&self, _u_a: f64) -> f64 { - let total_queries = self.successful_queries + self.failed_queries; - if total_queries == 0.0 { - return 0.0; - } + // Need to add a free success so that no buckets have a utility of 0. + // Even with a non-1 weight a utility of 0 ends up bringing the result + // to 0 which we can't afford. + let successful_queries = self.successful_queries + 0.5; + let total_queries = successful_queries + self.failed_queries; + // Use the ratio directly as utility, rather than passing it through concave_utility. This // is because the likelihood a query will complete is a pretty straightforward conversion to // utility. Eg: If we send 3 queries to each indexer A and B, and A returns 1 success, and B // returns 3 successes - for some fixed value of a query the utility is number of returned // queries * value of query. 
- let ratio = self.successful_queries / total_queries; + let mut ratio = successful_queries / total_queries; + + // We want a non-linear relationship between success rate and utility. + // In a world which looks at count of nines, it's not acceptable to send + // a 50% success indexer 50% queries. + ratio *= ratio; + let penalty = ((self.penalties / total_queries) + 1.0).recip(); ratio * penalty } - fn shift(&mut self, next: &Self, fraction: f64) { - self.successful_queries *= fraction; - self.successful_queries += next.successful_queries; - self.failed_queries *= fraction; - self.failed_queries += next.failed_queries; - self.penalties *= fraction; - self.penalties += next.penalties; + fn count(&self) -> f64 { + self.successful_queries + self.failed_queries } +} - fn clear(&mut self) { - self.successful_queries = 0.0; - self.failed_queries = 0.0; - self.penalties = 0.0; +impl Decay for Reputation { + fn shift(&mut self, mut next: Option<&mut Self>, fraction: f64, keep: f64) { + self.successful_queries.shift( + next.as_deref_mut().map(|s| &mut s.successful_queries), + fraction, + keep, + ); + self.failed_queries.shift( + next.as_deref_mut().map(|s| &mut s.failed_queries), + fraction, + keep, + ); + self.penalties.shift( + next.as_deref_mut().map(|s| &mut s.penalties), + fraction, + keep, + ); } - fn count(&self) -> f64 { - self.successful_queries + self.failed_queries + fn clear(&mut self) { + self.successful_queries.clear(); + self.failed_queries.clear(); + self.penalties.clear(); } } @@ -58,9 +77,6 @@ impl Reputation { } pub fn penalize(&mut self, weight: u8) { - // Scale weight to a shift amount in range [0, 63]. 
- let shamt = (weight / 4).max(1) - 1; - let weight = 1u64 << shamt; - self.penalties += weight as f64; + self.penalties += 1.1_f64.powf(weight as f64); } } diff --git a/src/indexer_selection/selection_factors.rs b/src/indexer_selection/selection_factors.rs index ee53bf75..1cc02b03 100644 --- a/src/indexer_selection/selection_factors.rs +++ b/src/indexer_selection/selection_factors.rs @@ -102,7 +102,7 @@ impl SelectionFactors { }; lock.allocations.release(receipt, status); if error.is_timeout() { - lock.reputation.current_mut().penalize(30); + lock.reputation.current_mut().penalize(50); } }