Skip to content

Commit

Permalink
Price: Automated volume discounting and automated efficient markets (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
That3Percent authored Feb 28, 2022
1 parent fbc4bb2 commit 2d3e554
Show file tree
Hide file tree
Showing 13 changed files with 551 additions and 77 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ lazy_static = "1.4"
num-traits = "0.2"
openssl = "0.10"
ordered-float = "2.5"
parking_lot = "0.12.0"
postgres-openssl = "0.5"
primitive-types = "0.8"
prometheus = "0.13"
Expand Down
103 changes: 90 additions & 13 deletions src/indexer_selection/decay.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,37 @@
use crate::indexer_selection::utility::*;
use std::time::Duration;

// This could have been done more automatically by using a proc-macro, but this is simpler.
macro_rules! impl_struct_decay {
($name:ty {$($field:ident),*}) => {
impl Decay for $name {
fn shift(&mut self, mut next: Option<&mut Self>, fraction: f64, keep: f64) {
$(
self.$field.shift(
next.as_deref_mut().map(|n| &mut n.$field),
fraction,
keep,
);
)*
}

fn clear(&mut self) {
// Doing a destructure ensures that we don't miss any fields,
// should they be added in the future. I tried it and the compiler
// even gives you a nice error message...
//
// missing structure fields:
// -{name}
let Self { $($field),* } = self;

$(
$field.clear();
)*
}
}
};
}
pub(crate) use impl_struct_decay;

pub trait Decay {
fn shift(&mut self, next: Option<&mut Self>, fraction: f64, keep: f64);
Expand All @@ -11,14 +44,28 @@ pub trait DecayUtility {
}

/// The DecayBuffer accounts for selection factors over various time-frames. Currently, these time
/// frames are 7 consecutive powers of 4 minute intervals, i.e. [1m, 4m, 16m, ... 4096m]. This
/// assumes that `decay` is called once every minute.
#[derive(Default)]
pub struct DecayBuffer<T> {
frames: [T; 7],
/// frames are LEN consecutive powers of 4 intervals, i.e. [1m, 4m, 16m, ... 4096m] if LEN is 7 and
/// `decay` is called once every minute.
#[derive(Debug)]
pub struct DecayBufferUnconfigured<T, const LOSS_POINTS: u16, const LEN: usize> {
frames: [T; LEN],
}

impl<T: DecayUtility> DecayBuffer<T> {
impl<T, const D: u16, const L: usize> Default for DecayBufferUnconfigured<T, D, L>
where
[T; L]: Default,
{
fn default() -> Self {
Self {
frames: Default::default(),
}
}
}

pub type DecayBuffer<T> = DecayBufferUnconfigured<T, 5, 7>;
pub type FastDecayBuffer<T> = DecayBufferUnconfigured<T, 10, 6>;

impl<T: DecayUtility, const D: u16, const L: usize> DecayBufferUnconfigured<T, D, L> {
pub fn expected_utility(&self, utility_parameters: UtilityParameters) -> SelectionFactor {
let mut aggregator = UtilityAggregator::new();
for (i, frame) in self.frames.iter().enumerate() {
Expand All @@ -41,13 +88,26 @@ impl<T: DecayUtility> DecayBuffer<T> {
}
}

impl<T> DecayBuffer<T> {
impl<T, const D: u16, const L: usize> DecayBufferUnconfigured<T, D, L>
where
Self: Default,
{
pub fn new() -> Self {
Default::default()
}
}

impl<T, const D: u16, const L: usize> DecayBufferUnconfigured<T, D, L> {
pub fn current_mut(&mut self) -> &mut T {
&mut self.frames[0]
}

pub fn frames(&self) -> &[T] {
&self.frames
}
}

impl<T: Decay> DecayBuffer<T> {
impl<T: Decay, const D: u16, const L: usize> DecayBufferUnconfigured<T, D, L> {
/*
The idea here is to pretend that we have a whole bunch of buckets of the minimum window size (1 min each)
A whole 8191 of them! Queries contribute to the bucket they belong, and each time we decay each bucket shifts down
Expand All @@ -56,18 +116,20 @@ impl<T: Decay> DecayBuffer<T> {
The above would be bad for performance, so we achieve almost the same result by creating "frames"
of increasing size, each of which can hold many buckets. When we decay we do the same operation as above,
assuming that each bucket within a frame holds the average value of all buckets within that frame. This results in some "smearing"
of the data, but reduces the size of our array from 8191 to just 7 at the expense of slight complexity and loss of accuracy.
of the data, but reduces the size of our array from 8191 to just 7 (assuming a LEN of 7) at the expense of
slight complexity and loss of accuracy.
*/
pub fn decay(&mut self) {
for i in (0..7).rev() {
for i in (0..L).rev() {
// Select buckets [i], [i+]
let (l, r) = self.frames.split_at_mut(i + 1);
let (this, next) = (l.last_mut().unwrap(), r.get_mut(0));

// Decay rate of 0.5% per smallest bucket resolution
// That is, each time we shift decay 0.5% per non-aggregated bucket
// Decay rate of 0.1% per smallest bucket resolution per point.
// That is, each time we shift decay 0.1% * points per non-aggregated bucket
// and aggregate the results into frames.
let retain = 0.995_f64.powf(4_f64.powf(i as f64));
let retain = 1.0 - ((D as f64) * 0.001f64);
let retain = retain.powf(4_f64.powf(i as f64));

// Shift one bucket, aggregating the results.
this.shift(next, 0.25_f64.powf(i as f64), retain);
Expand All @@ -92,6 +154,21 @@ impl Decay for f64 {
}
}

impl Decay for Duration {
fn shift(&mut self, next: Option<&mut Self>, fraction: f64, keep: f64) {
let secs = self.as_secs_f64();
let take = secs * fraction;
*self = Duration::from_secs_f64(secs - take);
if let Some(next) = next {
*next += Duration::from_secs_f64(take * keep);
}
}

fn clear(&mut self) {
*self = Duration::ZERO;
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
13 changes: 3 additions & 10 deletions src/indexer_selection/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
mod allocations;
mod block_requirements;
mod data_freshness;
mod decay;
pub mod decay;
mod economic_security;
mod indexers;
mod performance;
Expand Down Expand Up @@ -180,7 +180,7 @@ pub struct InputWriters {
}

pub struct Indexers {
network_params: NetworkParameters,
pub network_params: NetworkParameters,
indexers: SharedLookup<Address, IndexerDataReader>,
indexings: SharedLookup<Indexing, SelectionFactors>,
special_indexers: Eventual<HashMap<Address, NotNan<f64>>>,
Expand Down Expand Up @@ -301,8 +301,6 @@ impl Indexers {
freshness_requirements(&mut context.operations, block_resolver).await
}

// TODO: Specify budget in terms of a cost model -
// the budget should be different per query
pub async fn select_indexer(
&self,
config: &UtilityConfig,
Expand All @@ -312,13 +310,8 @@ impl Indexers {
context: &mut Context<'_>,
block_resolver: &BlockResolver,
freshness_requirements: &BlockRequirements,
budget: USD,
budget: GRT,
) -> Result<Option<(IndexerQuery, ScoringSample)>, SelectionError> {
let budget: GRT = self
.network_params
.usd_to_grt(budget)
.ok_or(SelectionError::MissingNetworkParams)?;

let selection = match self
.make_selection(
config,
Expand Down
52 changes: 48 additions & 4 deletions src/indexer_selection/price_efficiency.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,51 @@ impl PriceEfficiency {
}
Ok(cost) => cost,
};
let fee: GRT = cost.to_string().parse::<GRTWei>().unwrap().shift();
let mut fee: GRT = cost.to_string().parse::<GRTWei>().unwrap().shift();

// Anything overpriced is refused.
if fee > *max_budget {
return Err(BadIndexerReason::FeeTooHigh.into());
}

// Set the minimum fee to a value which is lower than the
// revenue maximizing price. This is sort of a hack because
// indexers can't be bothered to optimize their cost models.
// We avoid query fees going too low, but also make it so that
// indexers can maximize their revenue by raising their prices
// to reward those who are putting more effort in than "default => 0.0000001;"
//
// This is somewhat hard, so we'll make some assumptions:
// First assumption: queries are proportional to utility. This is wrong,
// but it would simplify our revenue function to queries * utility
// Given:
// s = (-1.0 + 5.0f64.sqrt()) / 2.0
// x = queries
// w = weight
// p = normalized price
// u = utility = ((1.0 / (p + s)) - s) ^ w
// Then the derivative of x * u is:
// ((1 / (p + s) - s)^w - (w*p / (((p + s)^2) * ((1/(p+s)) - s)^(1.0-w))
// And, given a w, we want to solve for p such that the derivative is 0.
// This gives us the lower bound for the revenue maximizing price.
//
// I think this approximation is of a lower bound is reasonable
// (better not to overestimate and hurt the indexer's revenue)
// Solving for the correct revenue-maximizing value is complex and recursive (since the revenue
// maximizing price depends on the utility of all other indexers which itself depends
// on their revenue maximizing price... ad infinitum)
//
// TODO: I didn't solve for this generally over w yet. But, I know that if w is 0.5,
// then the correct value is ~ 0.601. Since price utility weights
// are currently hardcoded, then the closed solution over w can go in on the next iteration.
let min_rate = GRT::try_from(0.6).unwrap();
let min_optimal_fee = *max_budget * min_rate;
// If their fee is less than the min optimal, lerp between them so that
// indexers are rewarded for being closer.
if fee < min_optimal_fee {
fee = (min_optimal_fee + fee) * GRT::try_from(0.5).unwrap();
}

// I went over a bunch of options and am not happy with any of them.
// Also this is hard to summarize but I'll take a shot.
// It may help before explaining what this code does to understand
Expand Down Expand Up @@ -148,7 +186,12 @@ mod test {
eventuals::idle().await;
let mut context = Context::new(BASIC_QUERY, "").unwrap();
// Expected values based on https://www.desmos.com/calculator/kxd4kpjxi5
let tests = [(0.01, 0.0), (0.02, 0.2763), (0.1, 0.7746), (1.0, 0.9742)];
let tests = [
(0.01, 0.0),
(0.02, 0.487960),
(0.1, 0.64419),
(1.0, 0.68216),
];
for (budget, expected_utility) in tests {
let (fee, utility) = efficiency
.get_price(
Expand All @@ -158,9 +201,10 @@ mod test {
)
.await
.unwrap();
let utility = utility.utility.powf(utility.weight);
println!("fee: {}, {:?}", fee, utility);
assert_eq!(fee, "0.01".parse::<GRT>().unwrap());
assert_within(utility.utility, expected_utility, 0.0001);
assert!(fee >= "0.01".parse::<GRT>().unwrap());
assert_within(utility, expected_utility, 0.0001);
}
}
}
36 changes: 8 additions & 28 deletions src/indexer_selection/reputation.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,23 @@
use crate::indexer_selection::decay::{Decay, DecayUtility};
use crate::indexer_selection::decay::{impl_struct_decay, Decay, DecayUtility};

// TODO: Other factors like how long the indexer has been in the network.
// Because reliability (which is what is captured here now) is useful on it's own, it may be useful
// to separate this into it's own utility rather than figure out how to combine that with other
// "reputation" factors which are much more subjective.

#[derive(Debug, Default, Clone)]
#[derive(Debug, Default, Clone, Copy)]
pub struct Reputation {
successful_queries: f64,
failed_queries: f64,
penalties: f64,
}

impl_struct_decay!(Reputation {
successful_queries,
failed_queries,
penalties
});

impl DecayUtility for Reputation {
fn expected_utility(&self, _u_a: f64) -> f64 {
// Need to add a free success so that no buckets have a utility of 0.
Expand Down Expand Up @@ -41,32 +47,6 @@ impl DecayUtility for Reputation {
}
}

impl Decay for Reputation {
fn shift(&mut self, mut next: Option<&mut Self>, fraction: f64, keep: f64) {
self.successful_queries.shift(
next.as_deref_mut().map(|s| &mut s.successful_queries),
fraction,
keep,
);
self.failed_queries.shift(
next.as_deref_mut().map(|s| &mut s.failed_queries),
fraction,
keep,
);
self.penalties.shift(
next.as_deref_mut().map(|s| &mut s.penalties),
fraction,
keep,
);
}

fn clear(&mut self) {
self.successful_queries.clear();
self.failed_queries.clear();
self.penalties.clear();
}
}

impl Reputation {
pub fn add_successful_query(&mut self) {
self.successful_queries += 1.0;
Expand Down
1 change: 1 addition & 0 deletions src/indexer_selection/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ pub fn default_cost_model(price: GRT) -> CostModelSource {
}
}

#[track_caller]
pub fn assert_within(value: f64, expected: f64, tolerance: f64) {
let diff = (value - expected).abs();
assert!(
Expand Down
6 changes: 5 additions & 1 deletion src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,11 @@ async fn main() {
config: query_engine::Config {
indexer_selection_retry_limit: opt.indexer_selection_retry_limit,
utility: UtilityConfig::default(),
query_budget: opt.query_budget,
budget_factors: QueryBudgetFactors {
scale: opt.query_budget_scale,
discount: opt.query_budget_discount,
processes: (opt.replica_count * opt.location_count) as f64,
},
},
indexer_client: IndexerClient {
client: http_client.clone(),
Expand Down
26 changes: 22 additions & 4 deletions src/opt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,11 +76,29 @@ pub struct Opt {
)]
pub block_cache_size: usize,
#[structopt(
long = "--query-budget",
env = "QUERY_BUDGET",
default_value = "0.0005"
long = "--query-budget-scale",
env = "QUERY_BUDGET_SCALE",
default_value = "3.1"
)]
pub query_budget: USD,
pub query_budget_scale: f64,
#[structopt(
long = "--query-budget-discount",
env = "QUERY_BUDGET_DISCOUNT",
default_value = "0.595"
)]
pub query_budget_discount: f64,
#[structopt(
help = "The number of processes per Gateway location. This is used when approximating worldwide query volume.",
long = "--replica-count",
env = "REPLICA_COUNT"
)]
pub replica_count: u64,
#[structopt(
help = "The number of geographic Gateway locations. This is used when approximating worldwide query volume.",
long = "--location-count",
env = "LOCATION_COUNT"
)]
pub location_count: u64,
#[structopt(long = "--port", env = "PORT", default_value = "6700")]
pub port: u16,
#[structopt(long = "--metrics-port", env = "METRICS_PORT", default_value = "7300")]
Expand Down
Loading

0 comments on commit 2d3e554

Please sign in to comment.