Skip to content

Commit

Permalink
worker: remove gouging checker from price table cache
Browse files Browse the repository at this point in the history
  • Loading branch information
ChrisSchinnerl committed Dec 2, 2024
1 parent c37dbdc commit 03266c9
Show file tree
Hide file tree
Showing 4 changed files with 103 additions and 148 deletions.
70 changes: 38 additions & 32 deletions worker/prices.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,14 @@ import (
"go.sia.tech/core/types"
)

const (
// priceRefreshInterval is the max interval at which we refresh price tables.
// It is set to 20 minutes since that equals 2 blocks on average which keeps
// prices reasonably up-to-date. If the price table has an expiry shorter
// than this, it will be refreshed sooner.
priceRefreshInterval = 20 * time.Minute
)

type (
PricesFetcher interface {
Prices(ctx context.Context) (rhpv4.HostPrices, error)
Expand All @@ -24,9 +32,10 @@ type (
cachedPrices struct {
hk types.PublicKey

mu sync.Mutex
prices rhpv4.HostPrices
update *pricesUpdate
mu sync.Mutex
prices rhpv4.HostPrices
renewTime time.Time
update *pricesUpdate
}

pricesUpdate struct {
Expand Down Expand Up @@ -71,34 +80,23 @@ func (p *cachedPrices) ongoingUpdate() (bool, *pricesUpdate) {
return ongoing, p.update
}

func (p *cachedPrices) fetch(ctx context.Context, h PricesFetcher) (prices rhpv4.HostPrices, err error) {
func (p *cachedPrices) fetch(ctx context.Context, h PricesFetcher) (rhpv4.HostPrices, error) {
// grab the current price table
p.mu.Lock()
prices = p.prices
prices := p.prices
p.mu.Unlock()

// get gouging checker to figure out how many blocks we have left before the
// current price table is considered to gouge on the block height
gc, err := GougingCheckerFromContext(ctx, false)
if err != nil {
return rhpv4.HostPrices{}, err
}

// figure out whether we should update the price table, if not we can return
if !prices.ValidUntil.IsZero() {
closeToGouging := gc.BlocksUntilBlockHeightGouging(prices.TipHeight) <= priceTableBlockHeightLeeway
closeToExpiring := time.Now().Add(priceTableValidityLeeway).After(prices.ValidUntil)
if !closeToExpiring && !closeToGouging {
return prices, nil
}
if !p.renewTime.IsZero() && time.Now().Before(p.renewTime) {
return prices, nil
}

// figure out whether an update is already ongoing, if there's one ongoing
// we can either wait or return early depending on whether the price table
// we have is still usable
ongoing, update := p.ongoingUpdate()
if ongoing && time.Now().Add(priceTableValidityLeeway).Before(prices.ValidUntil) {
return
return prices, nil
} else if ongoing {
select {
case <-ctx.Done():
Expand All @@ -109,19 +107,27 @@ func (p *cachedPrices) fetch(ctx context.Context, h PricesFetcher) (prices rhpv4
}

// this thread is updating the price table
defer func() {
update.prices = prices
update.err = err
close(update.done)

p.mu.Lock()
if err == nil {
p.prices = prices
prices, err := h.Prices(ctx)

// signal the other threads that the update is done
update.prices = prices
update.err = err
close(update.done)

p.mu.Lock()
defer p.mu.Unlock()

if err == nil {
p.prices = prices

// renew the prices 2 blocks after receiving them or 30 seconds
// before expiry, whatever comes first
p.renewTime = time.Now().Add(priceRefreshInterval)
if expiry := p.prices.ValidUntil.Add(-priceTableValidityLeeway); expiry.Before(p.renewTime) {
p.renewTime = expiry
}
p.update = nil
p.mu.Unlock()
}()
}
p.update = nil

// otherwise fetch it
return h.Prices(ctx)
return prices, err
}
59 changes: 21 additions & 38 deletions worker/prices_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,30 +13,17 @@ import (
)

func TestPricesCache(t *testing.T) {
cm := mocks.NewChain(api.ConsensusState{
BlockHeight: 1,
})
cState, _ := cm.ConsensusState(context.Background())

blockHeightLeeway := 10
gCtx := WithGougingChecker(context.Background(), cm, api.GougingParams{
ConsensusState: cState,
GougingSettings: api.GougingSettings{
HostBlockHeightLeeway: blockHeightLeeway,
},
})

cache := newPricesCache()
hk := types.PublicKey{1}
hostMock := mocks.NewHost(hk)
c := mocks.NewContract(hk, types.FileContractID{1})

// expire its price table
// expire its prices
expiredPT := newTestHostPriceTable()
expiredPT.Expiry = time.Now()
hostMock.UpdatePriceTable(expiredPT)

// manage the host, make sure fetching the price table blocks
// manage the host, make sure fetching the prices blocks
fetchPTBlockChan := make(chan struct{})
validPrices := newTestHostPrices()
h := newTestHostCustom(hostMock, c, func() api.HostPriceTable {
Expand All @@ -48,50 +35,46 @@ func TestPricesCache(t *testing.T) {
})

// trigger a fetch to make it block
go cache.fetch(gCtx, h)
go cache.fetch(context.Background(), h)
time.Sleep(50 * time.Millisecond)

// fetch it again but with a canceled context to avoid blocking
// indefinitely, the error will indicate we were blocking on a price table
// indefinitely, the error will indicate we were blocking on a prices
// update
ctx, cancel := context.WithCancel(gCtx)
ctx, cancel := context.WithCancel(context.Background())
cancel()
_, err := cache.fetch(ctx, h)
if !errors.Is(err, errPriceTableUpdateTimedOut) {
t.Fatal("expected errPriceTableUpdateTimedOut, got", err)
}

// unblock and assert we paid for the price table
// unblock and assert we paid for the prices
close(fetchPTBlockChan)
update, err := cache.fetch(gCtx, h)
update, err := cache.fetch(context.Background(), h)
if err != nil {
t.Fatal(err)
} else if update.Signature != validPrices.Signature {
t.Fatal("price table mismatch")
t.Fatal("prices mismatch")
}

// refresh the price table on the host, update again, assert we receive the
// same price table as it hasn't expired yet
h.UpdatePriceTable(newTestHostPriceTable())
update, err = cache.fetch(gCtx, h)
// refresh the prices on the host, update again, assert we receive the
// same prices as it hasn't expired yet
oldValidPrices := validPrices
validPrices = newTestHostPrices()
h.UpdatePrices(validPrices)
update, err = cache.fetch(context.Background(), h)
if err != nil {
t.Fatal(err)
} else if update.Signature != validPrices.Signature {
t.Fatal("price table mismatch")
} else if update.Signature != oldValidPrices.Signature {
t.Fatal("prices mismatch")
}

// increase the current block height to be exactly
// 'priceTableBlockHeightLeeway' blocks before the leeway of the gouging
// settings
h.UpdatePrices(newTestHostPrices())
validPrices = h.HostPrices()
cm.UpdateHeight(validPrices.TipHeight + uint64(blockHeightLeeway) - priceTableBlockHeightLeeway)

// fetch it again and assert we updated the price table
update, err = cache.fetch(gCtx, h)
// manually expire the prices
cache.cache[h.PublicKey()].renewTime = time.Now().Add(-time.Second)
update, err = cache.fetch(context.Background(), h)
if err != nil {
t.Fatal(err)
} else if update.Signature != h.HostPrices().Signature {
t.Fatal("price table mismatch")
} else if update.Signature != validPrices.Signature {
t.Fatal("prices mismatch")
}
}
70 changes: 29 additions & 41 deletions worker/pricetables.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,11 @@ import (
"context"
"errors"
"fmt"
"math"
"sync"
"time"

rhpv3 "go.sia.tech/core/rhp/v3"
"go.sia.tech/core/types"
"go.sia.tech/renterd/api"
"lukechampine.com/frand"
)

const (
Expand Down Expand Up @@ -46,9 +43,10 @@ type (
priceTable struct {
hk types.PublicKey

mu sync.Mutex
hpt api.HostPriceTable
update *priceTableUpdate
mu sync.Mutex
hpt api.HostPriceTable
renewTime time.Time
update *priceTableUpdate
}

priceTableUpdate struct {
Expand Down Expand Up @@ -93,35 +91,23 @@ func (pt *priceTable) ongoingUpdate() (bool, *priceTableUpdate) {
return ongoing, pt.update
}

func (p *priceTable) fetch(ctx context.Context, h priceTableFetcher, rev *types.FileContractRevision) (hpt api.HostPriceTable, cost types.Currency, err error) {
func (p *priceTable) fetch(ctx context.Context, h priceTableFetcher, rev *types.FileContractRevision) (api.HostPriceTable, types.Currency, error) {
// grab the current price table
p.mu.Lock()
hpt = p.hpt
hpt := p.hpt
p.mu.Unlock()

// get gouging checker to figure out how many blocks we have left before the
// current price table is considered to gouge on the block height
gc, err := GougingCheckerFromContext(ctx, false)
if err != nil {
return api.HostPriceTable{}, types.ZeroCurrency, err
}

// figure out whether we should update the price table, if not we can return
if hpt.UID != (rhpv3.SettingsID{}) {
randomUpdateLeeway := frand.Intn(int(math.Floor(hpt.HostPriceTable.Validity.Seconds() * 0.1)))
closeToGouging := gc.BlocksUntilBlockHeightGouging(hpt.HostBlockHeight) <= priceTableBlockHeightLeeway
closeToExpiring := time.Now().Add(priceTableValidityLeeway).Add(time.Duration(randomUpdateLeeway) * time.Second).After(hpt.Expiry)
if !closeToExpiring && !closeToGouging {
return
}
if !p.renewTime.IsZero() && time.Now().Before(p.renewTime) {
return hpt, types.ZeroCurrency, nil
}

// figure out whether an update is already ongoing, if there's one ongoing
// we can either wait or return early depending on whether the price table
// we have is still usable
ongoing, update := p.ongoingUpdate()
if ongoing && time.Now().Add(priceTableValidityLeeway).Before(hpt.Expiry) {
return
return hpt, types.ZeroCurrency, nil
} else if ongoing {
select {
case <-ctx.Done():
Expand All @@ -132,25 +118,27 @@ func (p *priceTable) fetch(ctx context.Context, h priceTableFetcher, rev *types.
}

// this thread is updating the price table
defer func() {
update.hpt = hpt
update.err = err
close(update.done)

p.mu.Lock()
if err == nil {
p.hpt = hpt
}
p.update = nil
p.mu.Unlock()
}()
hpt, cost, err := h.PriceTable(ctx, rev)

// otherwise fetch it
hpt, cost, err = h.PriceTable(ctx, rev)
// signal the other threads that the update is done
update.hpt = hpt
update.err = err
close(update.done)

// handle error after recording
if err != nil {
return api.HostPriceTable{}, types.ZeroCurrency, fmt.Errorf("failed to update pricetable, err %v", err)
p.mu.Lock()
defer p.mu.Unlock()

if err == nil {
p.hpt = hpt

// renew the prices 2 blocks after receiving them or 30 seconds
// before expiry, whatever comes first
p.renewTime = time.Now().Add(priceRefreshInterval) // 2 blocks
if expiry := p.hpt.Expiry.Add(-priceTableValidityLeeway); expiry.Before(p.renewTime) {
p.renewTime = expiry
}
}
return
p.update = nil

return hpt, cost, err
}
Loading

0 comments on commit 03266c9

Please sign in to comment.