Skip to content

Commit

Permalink
worker: ensure we penalise slow hosts on upload
Browse files Browse the repository at this point in the history
  • Loading branch information
peterjan committed Apr 26, 2024
1 parent c78947c commit b2b978e
Show file tree
Hide file tree
Showing 7 changed files with 175 additions and 141 deletions.
18 changes: 14 additions & 4 deletions worker/host_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,16 +93,26 @@ func (h *testHost) DownloadSector(ctx context.Context, w io.Writer, root types.H
}

func (h *testHost) UploadSector(ctx context.Context, sectorRoot types.Hash256, sector *[rhpv2.SectorSize]byte, rev types.FileContractRevision) error {
h.AddSector(sectorRoot, sector)
if h.uploadErr != nil {
return h.uploadErr
} else if h.uploadDelay > 0 {
// sleep if necessary
if h.uploadDelay > 0 {
select {
case <-time.After(h.uploadDelay):
case <-ctx.Done():
return context.Cause(ctx)
}
}

// check for cancellation
select {
case <-ctx.Done():
return context.Cause(ctx)
default:
}

if h.uploadErr != nil {
return h.uploadErr
}
h.AddSector(sectorRoot, sector)
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion worker/mocks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ func newContractStoreMock() *contractStoreMock {
}

func (*contractStoreMock) RenewedContract(context.Context, types.FileContractID) (api.ContractMetadata, error) {
return api.ContractMetadata{}, nil
return api.ContractMetadata{}, api.ErrContractNotFound
}

func (*contractStoreMock) Contract(context.Context, types.FileContractID) (api.ContractMetadata, error) {
Expand Down
10 changes: 7 additions & 3 deletions worker/rhpv2.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
rhpv2 "go.sia.tech/core/rhp/v2"
"go.sia.tech/core/types"
"go.sia.tech/renterd/api"
"go.sia.tech/renterd/internal/utils"
"go.sia.tech/siad/build"
"go.sia.tech/siad/crypto"
"lukechampine.com/frand"
Expand Down Expand Up @@ -85,9 +86,12 @@ func (hes HostErrorSet) Error() string {
return "\n" + strings.Join(strs, "\n")
}

func wrapErr(err *error, fnName string) {
func wrapErr(ctx context.Context, fnName string, err *error) {
if *err != nil {
*err = fmt.Errorf("%s: %w", fnName, *err)
if cause := context.Cause(ctx); cause != nil && !utils.IsErr(*err, cause) {
*err = fmt.Errorf("%w; %w", cause, *err)
}
}
}

Expand Down Expand Up @@ -133,7 +137,7 @@ func updateRevisionOutputs(rev *types.FileContractRevision, cost, collateral typ

// RPCSettings calls the Settings RPC, returning the host's reported settings.
func RPCSettings(ctx context.Context, t *rhpv2.Transport) (settings rhpv2.HostSettings, err error) {
defer wrapErr(&err, "Settings")
defer wrapErr(ctx, "Settings", &err)

var resp rhpv2.RPCSettingsResponse
if err := t.Call(rhpv2.RPCSettingsID, nil, &resp); err != nil {
Expand All @@ -147,7 +151,7 @@ func RPCSettings(ctx context.Context, t *rhpv2.Transport) (settings rhpv2.HostSe

// RPCFormContract forms a contract with a host.
func RPCFormContract(ctx context.Context, t *rhpv2.Transport, renterKey types.PrivateKey, txnSet []types.Transaction) (_ rhpv2.ContractRevision, _ []types.Transaction, err error) {
defer wrapErr(&err, "FormContract")
defer wrapErr(ctx, "FormContract", &err)

// strip our signatures before sending
parents, txn := txnSet[:len(txnSet)-1], txnSet[len(txnSet)-1]
Expand Down
15 changes: 8 additions & 7 deletions worker/rhpv3.go
Original file line number Diff line number Diff line change
Expand Up @@ -623,7 +623,7 @@ type PriceTablePaymentFunc func(pt rhpv3.HostPriceTable) (rhpv3.PaymentMethod, e

// RPCPriceTable calls the UpdatePriceTable RPC.
func RPCPriceTable(ctx context.Context, t *transportV3, paymentFunc PriceTablePaymentFunc) (_ api.HostPriceTable, err error) {
defer wrapErr(&err, "PriceTable")
defer wrapErr(ctx, "PriceTable", &err)

s, err := t.DialStream(ctx)
if err != nil {
Expand Down Expand Up @@ -660,7 +660,7 @@ func RPCPriceTable(ctx context.Context, t *transportV3, paymentFunc PriceTablePa

// RPCAccountBalance calls the AccountBalance RPC.
func RPCAccountBalance(ctx context.Context, t *transportV3, payment rhpv3.PaymentMethod, account rhpv3.Account, settingsID rhpv3.SettingsID) (bal types.Currency, err error) {
defer wrapErr(&err, "AccountBalance")
defer wrapErr(ctx, "AccountBalance", &err)
s, err := t.DialStream(ctx)
if err != nil {
return types.ZeroCurrency, err
Expand All @@ -685,7 +685,7 @@ func RPCAccountBalance(ctx context.Context, t *transportV3, payment rhpv3.Paymen

// RPCFundAccount calls the FundAccount RPC.
func RPCFundAccount(ctx context.Context, t *transportV3, payment rhpv3.PaymentMethod, account rhpv3.Account, settingsID rhpv3.SettingsID) (err error) {
defer wrapErr(&err, "FundAccount")
defer wrapErr(ctx, "FundAccount", &err)
s, err := t.DialStream(ctx)
if err != nil {
return err
Expand All @@ -712,7 +712,7 @@ func RPCFundAccount(ctx context.Context, t *transportV3, payment rhpv3.PaymentMe
// fetching a pricetable using the fetched revision to pay for it. If
// paymentFunc returns 'nil' as payment, the host is not paid.
func RPCLatestRevision(ctx context.Context, t *transportV3, contractID types.FileContractID, paymentFunc func(rev *types.FileContractRevision) (rhpv3.HostPriceTable, rhpv3.PaymentMethod, error)) (_ types.FileContractRevision, err error) {
defer wrapErr(&err, "LatestRevision")
defer wrapErr(ctx, "LatestRevision", &err)
s, err := t.DialStream(ctx)
if err != nil {
return types.FileContractRevision{}, err
Expand All @@ -738,7 +738,7 @@ func RPCLatestRevision(ctx context.Context, t *transportV3, contractID types.Fil

// RPCReadSector calls the ExecuteProgram RPC with a ReadSector instruction.
func RPCReadSector(ctx context.Context, t *transportV3, w io.Writer, pt rhpv3.HostPriceTable, payment rhpv3.PaymentMethod, offset, length uint32, merkleRoot types.Hash256) (cost, refund types.Currency, err error) {
defer wrapErr(&err, "ReadSector")
defer wrapErr(ctx, "ReadSector", &err)
s, err := t.DialStream(ctx)
if err != nil {
return types.ZeroCurrency, types.ZeroCurrency, err
Expand Down Expand Up @@ -803,7 +803,7 @@ func RPCReadSector(ctx context.Context, t *transportV3, w io.Writer, pt rhpv3.Ho
}

func RPCAppendSector(ctx context.Context, t *transportV3, renterKey types.PrivateKey, pt rhpv3.HostPriceTable, rev *types.FileContractRevision, payment rhpv3.PaymentMethod, sectorRoot types.Hash256, sector *[rhpv2.SectorSize]byte) (cost types.Currency, err error) {
defer wrapErr(&err, "AppendSector")
defer wrapErr(ctx, "AppendSector", &err)

// sanity check revision first
if rev.RevisionNumber == math.MaxUint64 {
Expand Down Expand Up @@ -941,7 +941,8 @@ func RPCAppendSector(ctx context.Context, t *transportV3, renterKey types.Privat
}

func RPCRenew(ctx context.Context, rrr api.RHPRenewRequest, bus Bus, t *transportV3, pt *rhpv3.HostPriceTable, rev types.FileContractRevision, renterKey types.PrivateKey, l *zap.SugaredLogger) (_ rhpv2.ContractRevision, _ []types.Transaction, _ types.Currency, err error) {
defer wrapErr(&err, "RPCRenew")
defer wrapErr(ctx, "RPCRenew", &err)

s, err := t.DialStream(ctx)
if err != nil {
return rhpv2.ContractRevision{}, nil, types.ZeroCurrency, fmt.Errorf("failed to dial stream: %w", err)
Expand Down
15 changes: 8 additions & 7 deletions worker/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,11 @@ const (
)

var (
errContractExpired = errors.New("contract expired")
errNoCandidateUploader = errors.New("no candidate uploader found")
errNotEnoughContracts = errors.New("not enough contracts to support requested redundancy")
errUploadInterrupted = errors.New("upload was interrupted")
errContractExpired = errors.New("contract expired")
errNoCandidateUploader = errors.New("no candidate uploader found")
errNotEnoughContracts = errors.New("not enough contracts to support requested redundancy")
errUploadInterrupted = errors.New("upload was interrupted")
errSectorUploadFinished = errors.New("sector upload already finished")
)

type (
Expand Down Expand Up @@ -117,7 +118,7 @@ type (
root types.Hash256

ctx context.Context
cancel context.CancelFunc
cancel context.CancelCauseFunc

mu sync.Mutex
uploaded object.Sector
Expand Down Expand Up @@ -750,7 +751,7 @@ func (u *upload) newSlabUpload(ctx context.Context, shards [][]byte, uploaders [
wg.Add(1)
go func(idx int) {
// create the ctx
sCtx, sCancel := context.WithCancel(ctx)
sCtx, sCancel := context.WithCancelCause(ctx)

// create the sector
// NOTE: we are computing the sector root here and pass it all the
Expand Down Expand Up @@ -1087,7 +1088,7 @@ func (s *sectorUpload) finish(sector object.Sector) {
s.mu.Lock()
defer s.mu.Unlock()

s.cancel()
s.cancel(errSectorUploadFinished)
s.uploaded = sector
s.data = nil
}
Expand Down
95 changes: 61 additions & 34 deletions worker/uploader.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,22 +116,35 @@ outer:
}

// execute it
elapsed, err := u.execute(req)

// the uploader's contract got renewed, requeue the request
if errors.Is(err, errMaxRevisionReached) {
if u.tryRefresh(req.sector.ctx) {
start := time.Now()
duration, err := u.execute(req)
if err == nil {
// only track the time it took to upload the sector in the happy case
u.trackSectorUpload(true, duration)
u.trackConsecutiveFailures(true)
} else if errors.Is(err, errMaxRevisionReached) {
// the uploader's contract got renewed, requeue the request
if err := u.tryRefresh(); err == nil {
u.enqueue(req)
continue outer
} else if !utils.IsErr(err, context.Canceled) {
u.logger.Errorf("failed to refresh the uploader's contract %v, err: %v", u.ContractID(), err)
}
}

// track the error, ignore gracefully closed streams and canceled overdrives
canceledOverdrive := req.done() && req.overdrive && err != nil
if !canceledOverdrive && !isClosedStream(err) {
u.trackSectorUpload(err, elapsed)
u.logger.Debugw("skip tracking sector upload", "total", time.Since(start), "duration", duration, "overdrive", req.overdrive, "err", err)
} else if errors.Is(err, errSectorUploadFinished) && !req.overdrive {
// punish the slow host by tracking a multiple of the total time
// we lost on it, but we only do so if we weren't overdriving,
// also note we are not tracking consecutive failures here
// because we're not sure if we had a successful host
// interaction
u.trackSectorUpload(true, time.Since(start)*10)
} else if !errors.Is(err, errSectorUploadFinished) {
// punish the host for failing the upload
u.trackSectorUpload(false, time.Hour)
u.trackConsecutiveFailures(false)
u.logger.Debugw("penalising host for failing to upload sector", "hk", u.hk, "overdrive", req.overdrive, "err", err)
} else {
u.logger.Debugw("not tracking sector upload metric", zap.Error(err))
u.logger.Debugw("skip tracking sector upload", "total", time.Since(start), "duration", duration, "overdrive", req.overdrive, "err", err)
}

// send the response
Expand Down Expand Up @@ -198,6 +211,8 @@ func (u *uploader) estimate() float64 {
return numSectors * estimateP90
}

// execute executes the sector upload request, if the upload was successful it
// returns the time it took to upload the sector to the host
func (u *uploader) execute(req *sectorUploadReq) (time.Duration, error) {
// grab fields
u.mu.Lock()
Expand Down Expand Up @@ -233,19 +248,17 @@ func (u *uploader) execute(req *sectorUploadReq) (time.Duration, error) {

// update the bus
if err := u.os.AddUploadingSector(ctx, req.uploadID, fcid, req.sector.root); err != nil {
return 0, fmt.Errorf("failed to add uploading sector to contract %v, err: %v", fcid, err)
return 0, fmt.Errorf("failed to add uploading sector to contract %v; %w", fcid, err)
}

// upload the sector
start := time.Now()
err = host.UploadSector(ctx, req.sector.root, req.sector.sectorData(), rev)
if err != nil {
return 0, fmt.Errorf("failed to upload sector to contract %v, err: %v", fcid, err)
return 0, fmt.Errorf("failed to upload sector to contract %v; %w", fcid, err)
}

// calculate elapsed time
elapsed := time.Since(start)
return elapsed, nil
return time.Since(start), nil
}

func (u *uploader) pop() *sectorUploadReq {
Expand All @@ -268,21 +281,34 @@ func (u *uploader) signalWork() {
}
}

func (u *uploader) trackSectorUpload(err error, d time.Duration) {
func (u *uploader) trackConsecutiveFailures(success bool) {
u.mu.Lock()
defer u.mu.Unlock()
if err != nil {
u.consecutiveFailures++
u.statsSectorUploadEstimateInMS.Track(float64(time.Hour.Milliseconds()))
} else {
ms := d.Milliseconds()
if ms == 0 {
ms = 1 // avoid division by zero
}

// update consecutive failures
if success {
u.consecutiveFailures = 0
u.statsSectorUploadEstimateInMS.Track(float64(ms)) // duration in ms
} else {
u.consecutiveFailures++
}
}

func (u *uploader) trackSectorUpload(success bool, d time.Duration) {
u.mu.Lock()
defer u.mu.Unlock()

// sanitize input
ms := d.Milliseconds()
if ms == 0 {
ms = 1 // avoid division by zero
}

// update estimates
if success {
u.statsSectorUploadEstimateInMS.Track(float64(ms))
u.statsSectorUploadSpeedBytesPerMS.Track(float64(rhpv2.SectorSize / ms)) // bytes per ms
} else {
u.statsSectorUploadEstimateInMS.Track(float64(ms))
}
}

Expand All @@ -298,17 +324,18 @@ func (u *uploader) tryRecomputeStats() {
u.statsSectorUploadSpeedBytesPerMS.Recompute()
}

func (u *uploader) tryRefresh(ctx context.Context) bool {
func (u *uploader) tryRefresh() error {
// use a sane timeout
ctx, cancel := context.WithTimeout(u.shutdownCtx, 30*time.Second)
defer cancel()

// fetch the renewed contract
renewed, err := u.cs.RenewedContract(ctx, u.ContractID())
if utils.IsErr(err, api.ErrContractNotFound) || utils.IsErr(err, context.Canceled) {
return false
} else if err != nil {
u.logger.Errorf("failed to fetch renewed contract %v, err: %v", u.ContractID(), err)
return false
if err != nil {
return err
}

// renew the uploader with the renewed contract
u.Refresh(renewed)
return true
return nil
}
Loading

0 comments on commit b2b978e

Please sign in to comment.