Skip to content

Commit

Permalink
bus: remove migration surcharge multiplier
Browse files Browse the repository at this point in the history
  • Loading branch information
peterjan committed Dec 2, 2024
1 parent 6536815 commit efcc1a7
Show file tree
Hide file tree
Showing 19 changed files with 62 additions and 127 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
default: minor
---

# Remove MigrationSurchargeMultiplier from the gouging settings.
4 changes: 0 additions & 4 deletions api/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -490,10 +490,6 @@ func formatSettingsMetricName(gp GougingParams, name string) (metrics []promethe
Name: fmt.Sprintf("renterd_%s_settings_minmaxephemeralaccountbalance", name),
Value: gp.GougingSettings.MinMaxEphemeralAccountBalance.Siacoins(),
})
metrics = append(metrics, prometheus.Metric{
Name: fmt.Sprintf("renterd_%s_settings_migrationsurchargemultiplier", name),
Value: float64(gp.GougingSettings.MigrationSurchargeMultiplier),
})
metrics = append(metrics, prometheus.Metric{
Name: fmt.Sprintf("renterd_%s_redundancy_settings_minshards", name),
Value: float64(gp.RedundancySettings.MinShards),
Expand Down
12 changes: 0 additions & 12 deletions api/setting.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ var (
MinPriceTableValidity: 5 * time.Minute, // 5 minutes
MinAccountExpiry: 24 * time.Hour, // 1 day
MinMaxEphemeralAccountBalance: types.Siacoins(1), // 1 SC
MigrationSurchargeMultiplier: 10, // 10x
}

// DefaultPinnedSettings define the default price pin settings the bus is
Expand Down Expand Up @@ -114,12 +113,6 @@ type (
// MinMaxEphemeralAccountBalance is the minimum accepted value for
// `MaxEphemeralAccountBalance` in the host's price settings.
MinMaxEphemeralAccountBalance types.Currency `json:"minMaxEphemeralAccountBalance"`

// MigrationSurchargeMultiplier is the multiplier applied to the
// 'MaxDownloadPrice' when checking whether a host is too expensive,
// this multiplier is only applied for when trying to migrate critically
// low-health slabs.
MigrationSurchargeMultiplier uint64 `json:"migrationSurchargeMultiplier"`
}

// PinnedSettings holds the configuration for pinning certain settings to a
Expand Down Expand Up @@ -224,11 +217,6 @@ func (gs GougingSettings) Validate() error {
if gs.MinPriceTableValidity < 10*time.Second {
return errors.New("MinPriceTableValidity must be at least 10 seconds")
}
_, overflow := gs.MaxDownloadPrice.Mul64WithOverflow(gs.MigrationSurchargeMultiplier)
if overflow {
maxMultiplier := types.MaxCurrency.Div(gs.MaxDownloadPrice).Big().Uint64()
return fmt.Errorf("MigrationSurchargeMultiplier must be less than %v, otherwise applying it to MaxDownloadPrice overflows the currency type", maxMultiplier)
}
return nil
}

Expand Down
1 change: 0 additions & 1 deletion autopilot/contractor/evaluate.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,6 @@ func EvaluateConfig(cfg api.AutopilotConfig, cs api.ConsensusState, rs api.Redun
MinPriceTableValidity: gs.MinPriceTableValidity,
MinAccountExpiry: gs.MinAccountExpiry,
MinMaxEphemeralAccountBalance: gs.MinMaxEphemeralAccountBalance,
MigrationSurchargeMultiplier: gs.MigrationSurchargeMultiplier,
}
}

Expand Down
2 changes: 1 addition & 1 deletion internal/host/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (

type (
Downloader interface {
DownloadSector(ctx context.Context, w io.Writer, root types.Hash256, offset, length uint32, overpay bool) error
DownloadSector(ctx context.Context, w io.Writer, root types.Hash256, offset, length uint32) error
PublicKey() types.PublicKey
}

Expand Down
1 change: 0 additions & 1 deletion internal/rhp/v3/rhp.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,6 @@ func IsClosedStream(err error) bool {
}
func IsInsufficientFunds(err error) bool { return utils.IsErr(err, errInsufficientFunds) }
func IsPriceTableExpired(err error) bool { return utils.IsErr(err, errPriceTableExpired) }
func IsPriceTableGouging(err error) bool { return utils.IsErr(err, gouging.ErrPriceTableGouging) }
func IsPriceTableNotFound(err error) bool { return utils.IsErr(err, errPriceTableNotFound) }
func IsSectorNotFound(err error) bool {
return utils.IsErr(err, ErrSectorNotFound) || utils.IsErr(err, errSectorNotFoundOld)
Expand Down
6 changes: 6 additions & 0 deletions internal/sql/migrations.go
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,12 @@ var (
return performMigration(ctx, tx, migrationsFs, dbIdentifier, "00029_contract_elements", log)
},
},
{
ID: "00030_update_gouging_settings",
Migrate: func(tx Tx) error {
return performMigration(ctx, tx, migrationsFs, dbIdentifier, "00030_update_gouging_settings", log)
},
},
}
}
MetricsMigrations = func(ctx context.Context, migrationsFs embed.FS, log *zap.SugaredLogger) []Migration {
Expand Down
2 changes: 1 addition & 1 deletion internal/test/mocks/hoststore.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func (hm *HostManager) Host(hk types.PublicKey, fcid types.FileContractID, siamu
return NewHost(hk)
}

func (h *Host) DownloadSector(ctx context.Context, w io.Writer, root types.Hash256, offset, length uint32, overpay bool) error {
func (h *Host) DownloadSector(ctx context.Context, w io.Writer, root types.Hash256, offset, length uint32) error {
return errors.New("implement when needed")
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
-- Drop the obsolete migrationSurchargeMultiplier field from the stored gouging settings JSON.
UPDATE settings SET value=JSON_REMOVE(value, '$.migrationSurchargeMultiplier') WHERE `key`='gouging';
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
-- Drop the obsolete migrationSurchargeMultiplier field from the stored gouging settings JSON.
UPDATE settings SET value=JSON_REMOVE(value, '$.migrationSurchargeMultiplier') WHERE `key`='gouging';
2 changes: 1 addition & 1 deletion worker/alerts.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func newMigrationFailedAlert(slabKey object.EncryptionKey, health float64, objec
"error": err.Error(),
"health": health,
"slabKey": slabKey.String(),
"hint": "Migration failures can be temporary, but if they persist it can eventually lead to data loss and should therefor be taken very seriously. It might be necessary to increase the MigrationSurchargeMultiplier in the gouging settings to ensure it has every chance of succeeding.",
"hint": "Migration failures can be temporary, but if they persist it can eventually lead to data loss and should therefor be taken very seriously.",
}

if len(objects) > 0 {
Expand Down
105 changes: 28 additions & 77 deletions worker/download.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,7 @@ import (
)

const (
downloadMemoryLimitDenom = 6 // 1/6th of the available download memory can be used by a single download
downloadOverpayHealthThreshold = 0.25
downloadMemoryLimitDenom = 6 // 1/6th of the available download memory can be used by a single download
)

var (
Expand Down Expand Up @@ -67,27 +66,23 @@ type (
length uint32

created time.Time
overpay bool

mu sync.Mutex
lastOverdrive time.Time
numCompleted int
numInflight uint64
numLaunched uint64
numOverdriving uint64
numOverpaid uint64
numRelaunched uint64

sectors []*sectorInfo
errs utils.HostErrorSet
}

slabDownloadResponse struct {
mem memory.Memory
surchargeApplied bool
shards [][]byte
index int
err error
mem memory.Memory
shards [][]byte
index int
err error
}

sectorDownloadReq struct {
Expand All @@ -98,7 +93,6 @@ type (
root types.Hash256
host *downloader

overpay bool
overdrive bool
sectorIndex int
resps *sectorResponses
Expand Down Expand Up @@ -286,14 +280,13 @@ func (mgr *downloadManager) DownloadObject(ctx context.Context, w io.Writer, o o
wg.Add(1)
go func(index int) {
defer wg.Done()
shards, surchargeApplied, err := mgr.downloadSlab(ctx, next.SlabSlice, false)
shards, err := mgr.downloadSlab(ctx, next.SlabSlice)
select {
case responseChan <- &slabDownloadResponse{
mem: mem,
surchargeApplied: surchargeApplied,
shards: shards,
index: index,
err: err,
mem: mem,
shards: shards,
index: index,
err: err,
}:
case <-ctx.Done():
mem.Release() // release memory if we're interrupted
Expand Down Expand Up @@ -376,7 +369,7 @@ outer:
return nil
}

func (mgr *downloadManager) DownloadSlab(ctx context.Context, slab object.Slab, hosts []api.HostInfo) ([][]byte, bool, error) {
func (mgr *downloadManager) DownloadSlab(ctx context.Context, slab object.Slab, hosts []api.HostInfo) ([][]byte, error) {
// refresh the downloaders
mgr.refreshDownloaders(hosts)

Expand All @@ -396,7 +389,7 @@ func (mgr *downloadManager) DownloadSlab(ctx context.Context, slab object.Slab,

// check if we have enough shards
if availableShards < slab.MinShards {
return nil, false, fmt.Errorf("not enough hosts available to download the slab: %v/%v", availableShards, slab.MinShards)
return nil, fmt.Errorf("not enough hosts available to download the slab: %v/%v", availableShards, slab.MinShards)
}

// NOTE: we don't acquire memory here since DownloadSlab is only used for
Expand All @@ -408,19 +401,19 @@ func (mgr *downloadManager) DownloadSlab(ctx context.Context, slab object.Slab,
Offset: 0,
Length: uint32(slab.MinShards) * rhpv2.SectorSize,
}
shards, surchargeApplied, err := mgr.downloadSlab(ctx, slice, true)
shards, err := mgr.downloadSlab(ctx, slice)
if err != nil {
return nil, false, err
return nil, err
}

// decrypt and recover
slice.Decrypt(shards)
err = slice.Reconstruct(shards)
if err != nil {
return nil, false, err
return nil, err
}

return shards, surchargeApplied, err
return shards, err
}

func (mgr *downloadManager) Stats() downloadManagerStats {
Expand Down Expand Up @@ -518,7 +511,7 @@ func (mgr *downloadManager) refreshDownloaders(hosts []api.HostInfo) {
}
}

func (mgr *downloadManager) newSlabDownload(slice object.SlabSlice, migration bool) *slabDownload {
func (mgr *downloadManager) newSlabDownload(slice object.SlabSlice) *slabDownload {
// calculate the offset and length
offset, length := slice.SectorRegion()

Expand All @@ -545,16 +538,15 @@ func (mgr *downloadManager) newSlabDownload(slice object.SlabSlice, migration bo
length: length,

created: time.Now(),
overpay: migration && slice.Health <= downloadOverpayHealthThreshold,

sectors: sectors,
errs: make(utils.HostErrorSet),
}
}

func (mgr *downloadManager) downloadSlab(ctx context.Context, slice object.SlabSlice, migration bool) ([][]byte, bool, error) {
func (mgr *downloadManager) downloadSlab(ctx context.Context, slice object.SlabSlice) ([][]byte, error) {
// prepare new download
slab := mgr.newSlabDownload(slice, migration)
slab := mgr.newSlabDownload(slice)

// execute download
return slab.download(ctx)
Expand Down Expand Up @@ -698,13 +690,6 @@ func (s *slabDownload) nextRequest(ctx context.Context, resps *sectorResponses,
root: next.root,
host: fastest,

// overpay is set to 'true' when a request is retried after the slab
// download failed and we realise that it might have succeeded if we
// allowed overpaying for certain sectors, we only do this when trying
// to migrate a critically low-health slab that might otherwise be
// unrecoverable
overpay: false,

overdrive: overdrive,
sectorIndex: next.index,
resps: resps,
Expand All @@ -717,7 +702,7 @@ func (s *slabDownload) nextRequest(ctx context.Context, resps *sectorResponses,
return nil
}

func (s *slabDownload) download(ctx context.Context) ([][]byte, bool, error) {
func (s *slabDownload) download(ctx context.Context) ([][]byte, error) {
// cancel any sector downloads once the download is done
ctx, cancel := context.WithCancel(ctx)
defer cancel()
Expand All @@ -735,25 +720,20 @@ func (s *slabDownload) download(ctx context.Context) ([][]byte, bool, error) {
for i := 0; i < int(s.minShards); {
req := s.nextRequest(ctx, resps, false)
if req == nil {
return nil, false, fmt.Errorf("no host available for shard %d", i)
return nil, fmt.Errorf("no host available for shard %d", i)
}
s.launch(req)
i++
}

// collect requests that failed due to gouging
var gouging []*sectorDownloadReq

// collect responses
var done bool

loop:
for s.inflight() > 0 && !done {
select {
case <-s.mgr.shutdownCtx.Done():
return nil, false, errors.New("download stopped")
return nil, errors.New("download stopped")
case <-ctx.Done():
return nil, false, context.Cause(ctx)
return nil, context.Cause(ctx)
case <-resps.c:
resetOverdrive()
}
Expand Down Expand Up @@ -782,22 +762,11 @@ loop:
if err := s.mgr.os.DeleteHostSector(ctx, resp.req.host.PublicKey(), resp.req.root); err != nil {
s.mgr.logger.Errorw("failed to mark sector as lost", "hk", resp.req.host.PublicKey(), "root", resp.req.root, zap.Error(err))
}
} else if rhp3.IsPriceTableGouging(resp.err) && s.overpay && !resp.req.overpay {
resp.req.overpay = true // ensures we don't retry the same request over and over again
gouging = append(gouging, resp.req)
}
}
}
}

if !done && len(gouging) >= s.missing() {
for _, req := range gouging {
s.launch(req)
}
gouging = nil
goto loop
}

// track stats
s.mgr.statsOverdrivePct.Track(s.overdrivePct())
s.mgr.statsSlabDownloadSpeedBytesPerMS.Track(float64(s.downloadSpeed()))
Expand All @@ -808,7 +777,7 @@ func (s *slabDownload) overdrivePct() float64 {
s.mu.Lock()
defer s.mu.Unlock()

numOverdrive := (int(s.numLaunched) + int(s.numRelaunched)) - s.minShards
numOverdrive := int(s.numLaunched) - s.minShards
if numOverdrive < 0 {
numOverdrive = 0
}
Expand All @@ -827,27 +796,18 @@ func (s *slabDownload) downloadSpeed() int64 {
return int64(bytes) / ms
}

func (s *slabDownload) finish() ([][]byte, bool, error) {
func (s *slabDownload) finish() ([][]byte, error) {
s.mu.Lock()
defer s.mu.Unlock()
if s.numCompleted < s.minShards {
return nil, s.numOverpaid > 0, fmt.Errorf("failed to download slab: completed=%d inflight=%d launched=%d relaunched=%d overpaid=%d downloaders=%d errors=%d %v", s.numCompleted, s.numInflight, s.numLaunched, s.numRelaunched, s.numOverpaid, s.mgr.numDownloaders(), len(s.errs), s.errs)
return nil, fmt.Errorf("failed to download slab: completed=%d inflight=%d launched=%d downloaders=%d errors=%d %v", s.numCompleted, s.numInflight, s.numLaunched, s.mgr.numDownloaders(), len(s.errs), s.errs)
}

data := make([][]byte, len(s.sectors))
for i, sector := range s.sectors {
data[i] = sector.data
}
return data, s.numOverpaid > 0, nil
}

func (s *slabDownload) missing() int {
s.mu.Lock()
defer s.mu.Unlock()
if s.numCompleted < s.minShards {
return s.minShards - s.numCompleted
}
return 0
return data, nil
}

func (s *slabDownload) inflight() uint64 {
Expand All @@ -868,11 +828,7 @@ func (s *slabDownload) launch(req *sectorDownloadReq) {
if req.overdrive {
s.numOverdriving++
}
if req.overpay {
s.numRelaunched++
} else {
s.numLaunched++
}
s.numLaunched++
}

func (s *slabDownload) receive(resp sectorDownloadResp) (finished bool) {
Expand All @@ -891,11 +847,6 @@ func (s *slabDownload) receive(resp sectorDownloadResp) (finished bool) {
return false
}

// update num overpaid
if resp.req.overpay {
s.numOverpaid++
}

// store the sector
if len(s.sectors[resp.req.sectorIndex].data) == 0 {
s.sectors[resp.req.sectorIndex].data = resp.sector
Expand Down
Loading

0 comments on commit efcc1a7

Please sign in to comment.