diff --git a/Cargo.toml b/Cargo.toml index 6837774b..487e9a5d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -26,6 +26,7 @@ lazy_static = "1.4" num-traits = "0.2" openssl = "0.10" ordered-float = "2.5" +parking_lot = "0.12.0" postgres-openssl = "0.5" primitive-types = "0.8" prometheus = "0.13" diff --git a/src/indexer_selection/decay.rs b/src/indexer_selection/decay.rs index 764bba47..ca4da6a2 100644 --- a/src/indexer_selection/decay.rs +++ b/src/indexer_selection/decay.rs @@ -1,4 +1,37 @@ use crate::indexer_selection::utility::*; +use std::time::Duration; + +// This could have been done more automatically by using a proc-macro, but this is simpler. +macro_rules! impl_struct_decay { + ($name:ty {$($field:ident),*}) => { + impl Decay for $name { + fn shift(&mut self, mut next: Option<&mut Self>, fraction: f64, keep: f64) { + $( + self.$field.shift( + next.as_deref_mut().map(|n| &mut n.$field), + fraction, + keep, + ); + )* + } + + fn clear(&mut self) { + // Doing a destructure ensures that we don't miss any fields, + // should they be added in the future. I tried it and the compiler + // even gives you a nice error message... + // + // missing structure fields: + // -{name} + let Self { $($field),* } = self; + + $( + $field.clear(); + )* + } + } + }; +} +pub(crate) use impl_struct_decay; pub trait Decay { fn shift(&mut self, next: Option<&mut Self>, fraction: f64, keep: f64); @@ -11,14 +44,28 @@ pub trait DecayUtility { } /// The DecayBuffer accounts for selection factors over various time-frames. Currently, these time -/// frames are 7 consecutive powers of 4 minute intervals, i.e. [1m, 4m, 16m, ... 4096m]. This -/// assumes that `decay` is called once every minute. -#[derive(Default)] -pub struct DecayBuffer { - frames: [T; 7], +/// frames are LEN consecutive powers of 4 intervals, i.e. [1m, 4m, 16m, ... 4096m] if LEN is 7 and +/// `decay` is called once every minute. +#[derive(Debug)] +pub struct DecayBufferUnconfigured { + frames: [T; LEN], } -impl DecayBuffer { +impl Default for DecayBufferUnconfigured +where + [T; L]: Default, +{ + fn default() -> Self { + Self { + frames: Default::default(), + } + } +} + +pub type DecayBuffer = DecayBufferUnconfigured; +pub type FastDecayBuffer = DecayBufferUnconfigured; + +impl DecayBufferUnconfigured { pub fn expected_utility(&self, utility_parameters: UtilityParameters) -> SelectionFactor { let mut aggregator = UtilityAggregator::new(); for (i, frame) in self.frames.iter().enumerate() { @@ -41,13 +88,26 @@ impl DecayBuffer { } } -impl DecayBuffer { +impl DecayBufferUnconfigured +where + Self: Default, +{ + pub fn new() -> Self { + Default::default() + } +} + +impl DecayBufferUnconfigured { pub fn current_mut(&mut self) -> &mut T { &mut self.frames[0] } + + pub fn frames(&self) -> &[T] { + &self.frames + } } -impl DecayBuffer { +impl DecayBufferUnconfigured { /* The idea here is to pretend that we have a whole bunch of buckets of the minimum window size (1 min each) A whole 8191 of them! Queries contribute to the bucket they belong, and each time we decay each bucket shifts down @@ -56,18 +116,20 @@ impl DecayBuffer { The above would be bad for performance, so we achieve almost the same result by creating "frames" of increasing size, each of which can hold many buckets. When we decay we do the same operation as above, assuming that each bucket within a frame holds the average value of all buckets within that frame. This results in some "smearing" - of the data, but reduces the size of our array from 8191 to just 7 at the expense of slight complexity and loss of accuracy. + of the data, but reduces the size of our array from 8191 to just 7 (assuming a LEN of 7) at the expense of + slight complexity and loss of accuracy. */ pub fn decay(&mut self) { - for i in (0..7).rev() { + for i in (0..L).rev() { // Select buckets [i], [i+] let (l, r) = self.frames.split_at_mut(i + 1); let (this, next) = (l.last_mut().unwrap(), r.get_mut(0)); - // Decay rate of 0.5% per smallest bucket resolution - // That is, each time we shift decay 0.5% per non-aggregated bucket + // Decay rate of 0.1% per smallest bucket resolution per point. + // That is, each time we shift decay 0.1% * points per non-aggregated bucket // and aggregate the results into frames. - let retain = 0.995_f64.powf(4_f64.powf(i as f64)); + let retain = 1.0 - ((D as f64) * 0.001f64); + let retain = retain.powf(4_f64.powf(i as f64)); // Shift one bucket, aggregating the results. this.shift(next, 0.25_f64.powf(i as f64), retain); @@ -92,6 +154,21 @@ impl Decay for f64 { } } +impl Decay for Duration { + fn shift(&mut self, next: Option<&mut Self>, fraction: f64, keep: f64) { + let secs = self.as_secs_f64(); + let take = secs * fraction; + *self = Duration::from_secs_f64(secs - take); + if let Some(next) = next { + *next += Duration::from_secs_f64(take * keep); + } + } + + fn clear(&mut self) { + *self = Duration::ZERO; + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/src/indexer_selection/mod.rs b/src/indexer_selection/mod.rs index de69ff39..35daa5e0 100644 --- a/src/indexer_selection/mod.rs +++ b/src/indexer_selection/mod.rs @@ -1,7 +1,7 @@ mod allocations; mod block_requirements; mod data_freshness; -mod decay; +pub mod decay; mod economic_security; mod indexers; mod performance; @@ -180,7 +180,7 @@ pub struct InputWriters { } pub struct Indexers { - network_params: NetworkParameters, + pub network_params: NetworkParameters, indexers: SharedLookup, indexings: SharedLookup, special_indexers: Eventual>>, @@ -301,8 +301,6 @@ impl Indexers { freshness_requirements(&mut context.operations, block_resolver).await } - // TODO: Specify budget in terms of a cost model - - // the budget should be different per query pub async fn select_indexer( &self, config: &UtilityConfig, @@ -312,13 +310,8 @@ impl Indexers { context: &mut Context<'_>, block_resolver: &BlockResolver, freshness_requirements: &BlockRequirements, - budget: USD, + budget: GRT, ) -> Result, SelectionError> { - let budget: GRT = self - .network_params - .usd_to_grt(budget) - .ok_or(SelectionError::MissingNetworkParams)?; - let selection = match self .make_selection( config, diff --git a/src/indexer_selection/price_efficiency.rs b/src/indexer_selection/price_efficiency.rs index fe37d3cf..c0b4bd59 100644 --- a/src/indexer_selection/price_efficiency.rs +++ b/src/indexer_selection/price_efficiency.rs @@ -50,13 +50,51 @@ impl PriceEfficiency { } Ok(cost) => cost, }; - let fee: GRT = cost.to_string().parse::().unwrap().shift(); + let mut fee: GRT = cost.to_string().parse::().unwrap().shift(); // Anything overpriced is refused. if fee > *max_budget { return Err(BadIndexerReason::FeeTooHigh.into()); } + // Set the minimum fee to a value which is lower than the + // revenue maximizing price. This is sort of a hack because + // indexers can't be bothered to optimize their cost models. + // We avoid query fees going too low, but also make it so that + // indexers can maximize their revenue by raising their prices + // to reward those who are putting more effort in than "default => 0.0000001;" + // + // This is somewhat hard, so we'll make some assumptions: + // First assumption: queries are proportional to utility. This is wrong, + // but it would simplify our revenue function to queries * utility + // Given: + // s = (-1.0 + 5.0f64.sqrt()) / 2.0 + // x = queries + // w = weight + // p = normalized price + // u = utility = ((1.0 / (p + s)) - s) ^ w + // Then the derivative of x * u is: + // ((1 / (p + s) - s)^w - (w*p / (((p + s)^2) * ((1/(p+s)) - s)^(1.0-w)) + // And, given a w, we want to solve for p such that the derivative is 0. + // This gives us the lower bound for the revenue maximizing price. + // + // I think this approximation is of a lower bound is reasonable + // (better not to overestimate and hurt the indexer's revenue) + // Solving for the correct revenue-maximizing value is complex and recursive (since the revenue + // maximizing price depends on the utility of all other indexers which itself depends + // on their revenue maximizing price... ad infinitum) + // + // TODO: I didn't solve for this generally over w yet. But, I know that if w is 0.5, + // then the correct value is ~ 0.601. Since price utility weights + // are currently hardcoded, then the closed solution over w can go in on the next iteration. + let min_rate = GRT::try_from(0.6).unwrap(); + let min_optimal_fee = *max_budget * min_rate; + // If their fee is less than the min optimal, lerp between them so that + // indexers are rewarded for being closer. + if fee < min_optimal_fee { + fee = (min_optimal_fee + fee) * GRT::try_from(0.5).unwrap(); + } + // I went over a bunch of options and am not happy with any of them. // Also this is hard to summarize but I'll take a shot. // It may help before explaining what this code does to understand @@ -148,7 +186,12 @@ mod test { eventuals::idle().await; let mut context = Context::new(BASIC_QUERY, "").unwrap(); // Expected values based on https://www.desmos.com/calculator/kxd4kpjxi5 - let tests = [(0.01, 0.0), (0.02, 0.2763), (0.1, 0.7746), (1.0, 0.9742)]; + let tests = [ + (0.01, 0.0), + (0.02, 0.487960), + (0.1, 0.64419), + (1.0, 0.68216), + ]; for (budget, expected_utility) in tests { let (fee, utility) = efficiency .get_price( @@ -158,9 +201,10 @@ mod test { ) .await .unwrap(); + let utility = utility.utility.powf(utility.weight); println!("fee: {}, {:?}", fee, utility); - assert_eq!(fee, "0.01".parse::().unwrap()); - assert_within(utility.utility, expected_utility, 0.0001); + assert!(fee >= "0.01".parse::().unwrap()); + assert_within(utility, expected_utility, 0.0001); } } } diff --git a/src/indexer_selection/reputation.rs b/src/indexer_selection/reputation.rs index 32467ac3..60f47e93 100644 --- a/src/indexer_selection/reputation.rs +++ b/src/indexer_selection/reputation.rs @@ -1,17 +1,23 @@ -use crate::indexer_selection::decay::{Decay, DecayUtility}; +use crate::indexer_selection::decay::{impl_struct_decay, Decay, DecayUtility}; // TODO: Other factors like how long the indexer has been in the network. // Because reliability (which is what is captured here now) is useful on it's own, it may be useful // to separate this into it's own utility rather than figure out how to combine that with other // "reputation" factors which are much more subjective. -#[derive(Debug, Default, Clone)] +#[derive(Debug, Default, Clone, Copy)] pub struct Reputation { successful_queries: f64, failed_queries: f64, penalties: f64, } +impl_struct_decay!(Reputation { + successful_queries, + failed_queries, + penalties +}); + impl DecayUtility for Reputation { fn expected_utility(&self, _u_a: f64) -> f64 { // Need to add a free success so that no buckets have a utility of 0. @@ -41,32 +47,6 @@ impl DecayUtility for Reputation { } } -impl Decay for Reputation { - fn shift(&mut self, mut next: Option<&mut Self>, fraction: f64, keep: f64) { - self.successful_queries.shift( - next.as_deref_mut().map(|s| &mut s.successful_queries), - fraction, - keep, - ); - self.failed_queries.shift( - next.as_deref_mut().map(|s| &mut s.failed_queries), - fraction, - keep, - ); - self.penalties.shift( - next.as_deref_mut().map(|s| &mut s.penalties), - fraction, - keep, - ); - } - - fn clear(&mut self) { - self.successful_queries.clear(); - self.failed_queries.clear(); - self.penalties.clear(); - } -} - impl Reputation { pub fn add_successful_query(&mut self) { self.successful_queries += 1.0; diff --git a/src/indexer_selection/test_utils.rs b/src/indexer_selection/test_utils.rs index 0b9871c3..d5393fb8 100644 --- a/src/indexer_selection/test_utils.rs +++ b/src/indexer_selection/test_utils.rs @@ -14,6 +14,7 @@ pub fn default_cost_model(price: GRT) -> CostModelSource { } } +#[track_caller] pub fn assert_within(value: f64, expected: f64, tolerance: f64) { let diff = (value - expected).abs(); assert!( diff --git a/src/main.rs b/src/main.rs index f9fd1b1b..d0247cee 100644 --- a/src/main.rs +++ b/src/main.rs @@ -129,7 +129,11 @@ async fn main() { config: query_engine::Config { indexer_selection_retry_limit: opt.indexer_selection_retry_limit, utility: UtilityConfig::default(), - query_budget: opt.query_budget, + budget_factors: QueryBudgetFactors { + scale: opt.query_budget_scale, + discount: opt.query_budget_discount, + processes: (opt.replica_count * opt.location_count) as f64, + }, }, indexer_client: IndexerClient { client: http_client.clone(), diff --git a/src/opt.rs b/src/opt.rs index 54a21723..604bd839 100644 --- a/src/opt.rs +++ b/src/opt.rs @@ -76,11 +76,29 @@ pub struct Opt { )] pub block_cache_size: usize, #[structopt( - long = "--query-budget", - env = "QUERY_BUDGET", - default_value = "0.0005" + long = "--query-budget-scale", + env = "QUERY_BUDGET_SCALE", + default_value = "3.1" )] - pub query_budget: USD, + pub query_budget_scale: f64, + #[structopt( + long = "--query-budget-discount", + env = "QUERY_BUDGET_DISCOUNT", + default_value = "0.595" + )] + pub query_budget_discount: f64, + #[structopt( + help = "The number of processes per Gateway location. This is used when approximating worldwide query volume.", + long = "--replica-count", + env = "REPLICA_COUNT" + )] + pub replica_count: u64, + #[structopt( + help = "The number of geographic Gateway locations. This is used when approximating worldwide query volume.", + long = "--location-count", + env = "LOCATION_COUNT" + )] + pub location_count: u64, #[structopt(long = "--port", env = "PORT", default_value = "6700")] pub port: u16, #[structopt(long = "--metrics-port", env = "METRICS_PORT", default_value = "7300")] diff --git a/src/query_engine/clock.rs b/src/query_engine/clock.rs new file mode 100644 index 00000000..ebd9acd0 --- /dev/null +++ b/src/query_engine/clock.rs @@ -0,0 +1,44 @@ +use std::time::Instant; +#[cfg(test)] +use std::{cell::RefCell, rc::Rc, time::Duration}; + +pub trait Clock { + fn now(&self) -> Instant; +} + +#[derive(Debug)] +pub struct SystemClock; + +impl Clock for SystemClock { + #[inline(always)] + fn now(&self) -> Instant { + Instant::now() + } +} + +#[cfg(test)] +#[derive(Clone, Debug)] +pub struct MockClock { + current_time: Rc>, +} + +#[cfg(test)] +impl MockClock { + pub fn new() -> Self { + Self { + current_time: Rc::new(RefCell::new(Instant::now())), + } + } + + pub fn advance_time(&mut self, amount: Duration) { + let mut l = self.current_time.borrow_mut(); + *l += amount; + } +} + +#[cfg(test)] +impl Clock for MockClock { + fn now(&self) -> Instant { + self.current_time.borrow().clone() + } +} diff --git a/src/query_engine/mod.rs b/src/query_engine/mod.rs index c4d08580..aae4109f 100644 --- a/src/query_engine/mod.rs +++ b/src/query_engine/mod.rs @@ -1,3 +1,5 @@ +mod clock; +mod price_automation; #[cfg(test)] mod tests; @@ -17,6 +19,7 @@ pub use crate::{ }; pub use graphql_client::Response; use lazy_static::lazy_static; +pub use price_automation::{QueryBudgetFactors, VolumeEstimator}; use prometheus; use serde_json::value::RawValue; use std::{ @@ -27,6 +30,7 @@ use std::{ Arc, }, }; +use tokio::sync::Mutex; use uuid::Uuid; #[derive(Debug)] @@ -104,6 +108,7 @@ pub struct APIKey { pub deployments: Vec, pub subgraphs: Vec<(String, i32)>, pub domains: Vec<(String, i32)>, + pub usage: Arc>, } #[derive(Debug)] @@ -139,7 +144,7 @@ impl From for QueryEngineError { pub struct Config { pub indexer_selection_retry_limit: usize, pub utility: UtilityConfig, - pub query_budget: GRT, + pub budget_factors: QueryBudgetFactors, } #[derive(Clone)] @@ -274,6 +279,22 @@ where _ => (), }; + let query_count = context.operations.len().max(1) as u64; + let budget = query + .api_key + .as_ref() + .unwrap() + .usage + .lock() + .await + .budget_for_queries(query_count, &self.config.budget_factors); + + let budget: GRT = self + .indexers + .network_params + .usd_to_grt(USD::try_from(budget).unwrap()) + .ok_or(SelectionError::MissingNetworkParams)?; + for retry_count in 0..self.config.indexer_selection_retry_limit { let selection_timer = with_metric( &METRICS.indexer_selection_duration, @@ -308,7 +329,7 @@ where &mut context, &block_resolver, &freshness_requirements, - self.config.query_budget, + budget, ) .await; selection_timer.map(|t| t.observe_duration()); diff --git a/src/query_engine/price_automation.rs b/src/query_engine/price_automation.rs new file mode 100644 index 00000000..5cf0b3f3 --- /dev/null +++ b/src/query_engine/price_automation.rs @@ -0,0 +1,219 @@ +// This module uses "automated volume discounting" which gives each API key user a different +// query budget based on their estimated global query volume. The generalized equation for the +// price is: +// +// scale / ((volume + offset) ^ discount) +// +// Where: +// * volume: The estimated count of queries over a 30 day period +// * offset: A query volume increase so that the differences are less extreme toward 0 +// * discount: How much discount to apply. As this approaches 0.0, all queries are priced +// the same regardless of volume. As this approaches 1.0, all API keys will pay the same +// amount regardless of volume. +// * scale: Moves the price up or down linearly. +// +// The magic values chosen were based off of 30 days hosted service volume taken on Feb 17, 2022 +// then tweaking until it looked like a fair distribution. + +use crate::indexer_selection::decay::*; +use std::time::{Duration, Instant}; + +use super::clock::*; + +#[derive(Clone)] +pub struct QueryBudgetFactors { + pub scale: f64, + pub discount: f64, + pub processes: f64, +} + +fn budget(volume: f64, factors: &QueryBudgetFactors) -> f64 { + const OFFSET: f64 = 500.0; + factors.scale / (volume * factors.processes + OFFSET).powf(factors.discount) +} + +// For each bucket: +// Know the time-in-bucket +// Know the number of queries +// Move time and queries over to new buckets +#[derive(Default, Debug)] +struct QueryVolume { + time_elapsed: Duration, + num_queries: f64, +} + +impl_struct_decay!(QueryVolume { + time_elapsed, + num_queries +}); + +#[derive(Debug)] +pub struct VolumeEstimator { + history: FastDecayBuffer, + last_time: Instant, + clock: C, +} + +impl Default for VolumeEstimator { + fn default() -> Self { + Self::new(SystemClock) + } +} + +impl VolumeEstimator +where + C: Clock, +{ + pub fn new(clock: C) -> Self { + Self { + last_time: clock.now(), + history: FastDecayBuffer::new(), + clock, + } + } + + // This must be called on a regular interval. The unit tests are assuming + // 2 minutes. + pub fn decay(&mut self) { + let next = self.clock.now(); + let prev = self.last_time; + self.last_time = next; + self.history.current_mut().time_elapsed += next - prev; + self.history.decay(); + } + + // Adds the queries and gives a budget for them. + pub fn budget_for_queries(&mut self, count: u64, factors: &QueryBudgetFactors) -> f64 { + let count = count as f64; + self.history.current_mut().num_queries += count; + budget(self.monthly_volume_estimate(), factors) * count + } + + fn monthly_volume_estimate(&self) -> f64 { + let mut elapsed_time = self.clock.now() - self.last_time; + let mut queries = 0.0; + for frame in self.history.frames() { + elapsed_time += frame.time_elapsed; + queries += frame.num_queries; + } + + // Scale to 30 days + let scale = 60.0 * 60.0 * 24.0 * 30.0; + let elapsed_time = elapsed_time.as_secs_f64(); + + queries * scale / elapsed_time + } +} + +#[cfg(test)] +mod tests { + use super::*; + + const FACTORS: QueryBudgetFactors = QueryBudgetFactors { + scale: 3.1, + discount: 0.6, + processes: 1.0, + }; + + fn budget(volume: f64) -> f64 { + super::budget(volume, &FACTORS) + } + + #[track_caller] + fn assert_approx(expected: f64, actual: f64, within: f64) { + assert!((actual - expected).abs() <= within); + } + + #[test] + fn stable_volume() { + let mut clock = MockClock::new(); + let mut estimate = VolumeEstimator::new(clock.clone()); + + let factors = QueryBudgetFactors { + processes: 10.0, + ..FACTORS + }; + + // Over a long period, do 2 queries per second and verify + // that the 30 day estimate is 5184000 across multiple delays + const COUNT: f64 = 2.0 * 21600.0 * 120.0; + for _ in 0..50 { + for _ in 0..120 { + clock.advance_time(Duration::from_secs(1)); + // Very precise, correct within < 1 query. + assert_approx( + COUNT * super::budget(COUNT, &factors), + COUNT * estimate.budget_for_queries(2, &factors) / 2.0, + super::budget(COUNT, &factors), + ); + } + estimate.decay(); + } + } + + #[test] + fn sine_volume() { + let mut clock = MockClock::new(); + let mut estimate = VolumeEstimator::new(clock.clone()); + + // Show that a stable oscillating traffic has low variance + // when looking at the estimate. + let mut elapsed = 0.0f64; + for _ in 0..100 { + for _ in 0..1000 { + for _ in 0..120 { + elapsed += 1.0; + clock.advance_time(Duration::from_secs(1)); + // sin is -1 .. 1, so the range here is 100.0 .. 200.0 + let queries = ((elapsed / 1000.0).sin() + 3.0) * 50.0; + estimate.budget_for_queries(queries as u64, &FACTORS); + } + estimate.decay(); + } + let daily_estimate = estimate.monthly_volume_estimate() / 30.0; + // The center of the range is 12,960,000. + // The QPS oscillates at +- 33% + // But, the estimate is within 2% on each iteration, + // and is sometimes much closer. Of course, that means the + // total error is less than 2% as well. + assert_approx(12960000.0, daily_estimate, 250000.0); + } + } + + #[test] + fn volume_increase() { + let mut clock = MockClock::new(); + let mut estimate = VolumeEstimator::new(clock.clone()); + + // Over a month, do 1 queries per minute. This is "testing" + for _ in 0..21600 { + clock.advance_time(Duration::from_secs(120)); + estimate.budget_for_queries(2, &FACTORS); + estimate.decay(); + } + + // Now in "prod", do 20 queries per second. An increase of 1200x + let frames = 30u64 * 24 * 30; // 30 days, 24 hours per day, 30 2 minute intervals per hour. + let per_frame = 2400u64; // 2400 queries in 2 minutes is 20 per second + let mut spend = 0.0; + + for _ in 0..frames { + for _ in 0..per_frame { + clock.advance_time(Duration::from_secs_f64(0.05)); + spend += estimate.budget_for_queries(1, &FACTORS); + } + estimate.decay(); + } + + let queries = (frames * per_frame) as f64; + // If we knew a-priori what the volume would be, we may have set this budget. + let should_spend = budget(queries) * queries; + + // Show that over 30 days this large increase of query volume was priced + // more or less appropriately (within 3%). By the 8th day, the + // budget has shifted from 0.0050870 to 0.0000729 (70x decrease), where it + // remains stable. So, on the next month the billing would be perfect. + assert!(spend > should_spend); + assert!(spend < (should_spend * 1.03)); + } +} diff --git a/src/query_engine/tests.rs b/src/query_engine/tests.rs index d658e129..d10adaf2 100644 --- a/src/query_engine/tests.rs +++ b/src/query_engine/tests.rs @@ -670,7 +670,11 @@ async fn test() { Config { indexer_selection_retry_limit: 3, utility: UtilityConfig::default(), - query_budget: 1u64.try_into().unwrap(), + budget_factors: QueryBudgetFactors { + scale: 1.0, + discount: 0.0, + processes: 1.0, + }, }, TopologyIndexer { topology: topology.clone(), diff --git a/src/sync_client.rs b/src/sync_client.rs index 82f3807e..32028f88 100644 --- a/src/sync_client.rs +++ b/src/sync_client.rs @@ -5,7 +5,7 @@ use crate::{ IndexingStatus, SecretKey, SelectionFactors, }, prelude::{shared_lookup::SharedLookupWriter, *}, - query_engine::{APIKey, InputWriters}, + query_engine::{APIKey, InputWriters, VolumeEstimator}, }; use eventuals::EventualExt as _; use graphql_client::{GraphQLQuery, Response}; @@ -44,15 +44,16 @@ pub fn create( } = inputs; let indexings = Arc::new(Mutex::new(indexings)); - create_sync_client::( + let mut api_key_usage = VolumeEstimations::new(); + create_sync_client::( agent_url.clone(), poll_interval, api_keys::OPERATION_NAME, api_keys::QUERY, - parse_api_keys, + move |v| parse_api_keys(v, &mut api_key_usage), api_keys, ); - create_sync_client::( + create_sync_client::( agent_url.clone(), poll_interval, conversion_rates::OPERATION_NAME, @@ -60,7 +61,7 @@ pub fn create( parse_conversion_rates, usd_to_grt_conversion, ); - create_sync_client::( + create_sync_client::( agent_url.clone(), poll_interval, network_parameters::OPERATION_NAME, @@ -78,7 +79,7 @@ pub fn create( parse_cost_models, ), ); - create_sync_client::( + create_sync_client::( agent_url.clone(), poll_interval, current_deployments::OPERATION_NAME, @@ -135,7 +136,7 @@ where Q::ResponseData: 'static, { let (writer, reader) = Eventual::new(); - create_sync_client::( + create_sync_client::( agent_url, poll_interval, operation, @@ -146,14 +147,15 @@ where reader } -fn create_sync_client( +fn create_sync_client( agent_url: String, poll_interval: Duration, operation: &'static str, query: &'static str, - parse_data: fn(Q::ResponseData) -> Option, + mut parse_data: F, mut writer: EventualWriter, ) where + F: 'static + FnMut(Q::ResponseData) -> Option + Send, T: 'static + Clone + Eq + Send, Q: GraphQLQuery, Q::ResponseData: 'static, @@ -165,11 +167,11 @@ fn create_sync_client( loop { let _timer = with_metric(&METRICS.queries.duration, &[operation], |h| h.start_timer()); - let result = execute_query::( + let result = execute_query::( &agent_url, operation, query, - parse_data, + &mut parse_data, &client, &mut last_update_id, ) @@ -191,15 +193,16 @@ fn create_sync_client( ); } -async fn execute_query<'f, Q, T>( +async fn execute_query<'f, Q, T, F>( agent_url: &'f str, operation: &'static str, query: &'static str, - parse_data: fn(Q::ResponseData) -> Option, + parse_data: &'f mut F, client: &'f reqwest::Client, last_update_id: &'f mut String, ) -> Option where + F: 'static + FnMut(Q::ResponseData) -> Option, T: 'static + Clone + Eq + Send, Q: GraphQLQuery, Q::ResponseData: 'static, @@ -275,7 +278,71 @@ where )] struct APIKeys; -fn parse_api_keys(data: api_keys::ResponseData) -> Option>>> { +struct VolumeEstimations { + by_api_key: HashMap>>, + decay_list: Arc>>>>>, +} + +impl Drop for VolumeEstimations { + fn drop(&mut self) { + *self.decay_list.lock() = None; + } +} + +impl VolumeEstimations { + pub fn new() -> Self { + let decay_list = Arc::new(parking_lot::Mutex::new(Some(Vec::new()))); + let result = Self { + by_api_key: HashMap::new(), + decay_list: decay_list.clone(), + }; + + // Every 2 minutes, call decay on every VolumeEstimator in our collection. + // This task will finish when the VolumeEstimations is dropped, because + // drop sets the decay_list to None which breaks the loop. + tokio::spawn(async move { + loop { + let start = Instant::now(); + let len = if let Some(decay_list) = decay_list.lock().as_deref() { + decay_list.len() + } else { + return; + }; + for i in 0..len { + let item = if let Some(decay_list) = decay_list.lock().as_deref() { + decay_list[i].clone() + } else { + return; + }; + item.lock().await.decay(); + } + let next = start + Duration::from_secs(120); + tokio::time::sleep_until(next).await; + } + }); + result + } + pub fn get(&mut self, key: &str) -> Arc> { + match self.by_api_key.get(key) { + Some(exist) => exist.clone(), + None => { + let result = Arc::new(Mutex::new(VolumeEstimator::default())); + self.by_api_key.insert(key.to_owned(), result.clone()); + self.decay_list + .lock() + .as_mut() + .unwrap() + .push(result.clone()); + result + } + } + } +} + +fn parse_api_keys( + data: api_keys::ResponseData, + usage: &mut VolumeEstimations, +) -> Option>>> { match data { api_keys::ResponseData { data: Some(api_keys::ApiKeysData { value, .. }), @@ -284,6 +351,7 @@ fn parse_api_keys(data: api_keys::ResponseData) -> Option