diff --git a/worker/rhpv2.go b/worker/rhpv2.go
index 1a6bd3cfd..e0bc11abb 100644
--- a/worker/rhpv2.go
+++ b/worker/rhpv2.go
@@ -14,6 +14,7 @@ import (
 	rhpv2 "go.sia.tech/core/rhp/v2"
 	"go.sia.tech/core/types"
 	"go.sia.tech/renterd/api"
+	"go.sia.tech/renterd/internal/utils"
 	"go.sia.tech/siad/build"
 	"go.sia.tech/siad/crypto"
 	"lukechampine.com/frand"
 )
@@ -85,9 +86,12 @@ func (hes HostErrorSet) Error() string {
 	return "\n" + strings.Join(strs, "\n")
 }
 
-func wrapErr(err *error, fnName string) {
+func wrapErr(ctx context.Context, fnName string, err *error) {
 	if *err != nil {
 		*err = fmt.Errorf("%s: %w", fnName, *err)
+		if cause := context.Cause(ctx); cause != nil && !utils.IsErr(*err, cause) {
+			*err = fmt.Errorf("%w; %w", cause, *err)
+		}
 	}
 }
 
@@ -133,7 +137,7 @@ func updateRevisionOutputs(rev *types.FileContractRevision, cost, collateral typ
 
 // RPCSettings calls the Settings RPC, returning the host's reported settings.
 func RPCSettings(ctx context.Context, t *rhpv2.Transport) (settings rhpv2.HostSettings, err error) {
-	defer wrapErr(&err, "Settings")
+	defer wrapErr(ctx, "Settings", &err)
 
 	var resp rhpv2.RPCSettingsResponse
 	if err := t.Call(rhpv2.RPCSettingsID, nil, &resp); err != nil {
@@ -147,7 +151,7 @@ func RPCSettings(ctx context.Context, t *rhpv2.Transport) (settings rhpv2.HostSe
 
 // RPCFormContract forms a contract with a host.
 func RPCFormContract(ctx context.Context, t *rhpv2.Transport, renterKey types.PrivateKey, txnSet []types.Transaction) (_ rhpv2.ContractRevision, _ []types.Transaction, err error) {
-	defer wrapErr(&err, "FormContract")
+	defer wrapErr(ctx, "FormContract", &err)
 
 	// strip our signatures before sending
 	parents, txn := txnSet[:len(txnSet)-1], txnSet[len(txnSet)-1]
diff --git a/worker/rhpv3.go b/worker/rhpv3.go
index dc483c340..5f1956b02 100644
--- a/worker/rhpv3.go
+++ b/worker/rhpv3.go
@@ -52,6 +52,9 @@ var (
 	// errTransport is used to wrap rpc errors caused by the transport.
 	errTransport = errors.New("transport error")
 
+	// errDialTransport is returned when the worker could not dial the host.
+	errDialTransport = errors.New("could not dial transport")
+
 	// errBalanceInsufficient occurs when a withdrawal failed because the
 	// account balance was insufficient.
 	errBalanceInsufficient = errors.New("ephemeral account balance was insufficient")
@@ -175,7 +178,7 @@ func (t *transportV3) DialStream(ctx context.Context) (*streamV3, error) {
 		newTransport, err := dialTransport(ctx, t.siamuxAddr, t.hostKey)
 		if err != nil {
 			t.mu.Unlock()
-			return nil, fmt.Errorf("DialStream: could not dial transport: %w (%v)", err, time.Since(start))
+			return nil, fmt.Errorf("DialStream: %w: %w (%v)", errDialTransport, err, time.Since(start))
 		}
 		t.t = newTransport
 	}
@@ -623,7 +626,7 @@ type PriceTablePaymentFunc func(pt rhpv3.HostPriceTable) (rhpv3.PaymentMethod, e
 
 // RPCPriceTable calls the UpdatePriceTable RPC.
 func RPCPriceTable(ctx context.Context, t *transportV3, paymentFunc PriceTablePaymentFunc) (_ api.HostPriceTable, err error) {
-	defer wrapErr(&err, "PriceTable")
+	defer wrapErr(ctx, "PriceTable", &err)
 
 	s, err := t.DialStream(ctx)
 	if err != nil {
@@ -660,7 +663,7 @@ func RPCPriceTable(ctx context.Context, t *transportV3, paymentFunc PriceTablePa
 
 // RPCAccountBalance calls the AccountBalance RPC.
 func RPCAccountBalance(ctx context.Context, t *transportV3, payment rhpv3.PaymentMethod, account rhpv3.Account, settingsID rhpv3.SettingsID) (bal types.Currency, err error) {
-	defer wrapErr(&err, "AccountBalance")
+	defer wrapErr(ctx, "AccountBalance", &err)
 	s, err := t.DialStream(ctx)
 	if err != nil {
 		return types.ZeroCurrency, err
 	}
@@ -685,7 +688,7 @@ func RPCAccountBalance(ctx context.Context, t *transportV3, payment rhpv3.Paymen
 // RPCFundAccount calls the FundAccount RPC.
 func RPCFundAccount(ctx context.Context, t *transportV3, payment rhpv3.PaymentMethod, account rhpv3.Account, settingsID rhpv3.SettingsID) (err error) {
-	defer wrapErr(&err, "FundAccount")
+	defer wrapErr(ctx, "FundAccount", &err)
 	s, err := t.DialStream(ctx)
 	if err != nil {
 		return err
 	}
@@ -712,7 +715,7 @@ func RPCFundAccount(ctx context.Context, t *transportV3, payment rhpv3.PaymentMe
 // fetching a pricetable using the fetched revision to pay for it. If
 // paymentFunc returns 'nil' as payment, the host is not paid.
 func RPCLatestRevision(ctx context.Context, t *transportV3, contractID types.FileContractID, paymentFunc func(rev *types.FileContractRevision) (rhpv3.HostPriceTable, rhpv3.PaymentMethod, error)) (_ types.FileContractRevision, err error) {
-	defer wrapErr(&err, "LatestRevision")
+	defer wrapErr(ctx, "LatestRevision", &err)
 	s, err := t.DialStream(ctx)
 	if err != nil {
 		return types.FileContractRevision{}, err
 	}
@@ -738,7 +741,7 @@ func RPCLatestRevision(ctx context.Context, t *transportV3, contractID types.Fil
 // RPCReadSector calls the ExecuteProgram RPC with a ReadSector instruction.
 func RPCReadSector(ctx context.Context, t *transportV3, w io.Writer, pt rhpv3.HostPriceTable, payment rhpv3.PaymentMethod, offset, length uint32, merkleRoot types.Hash256) (cost, refund types.Currency, err error) {
-	defer wrapErr(&err, "ReadSector")
+	defer wrapErr(ctx, "ReadSector", &err)
 	s, err := t.DialStream(ctx)
 	if err != nil {
 		return types.ZeroCurrency, types.ZeroCurrency, err
 	}
@@ -803,7 +806,7 @@ func RPCReadSector(ctx context.Context, t *transportV3, w io.Writer, pt rhpv3.Ho
 }
 
 func RPCAppendSector(ctx context.Context, t *transportV3, renterKey types.PrivateKey, pt rhpv3.HostPriceTable, rev *types.FileContractRevision, payment rhpv3.PaymentMethod, sectorRoot types.Hash256, sector *[rhpv2.SectorSize]byte) (cost types.Currency, err error) {
-	defer wrapErr(&err, "AppendSector")
+	defer wrapErr(ctx, "AppendSector", &err)
 
 	// sanity check revision first
 	if rev.RevisionNumber == math.MaxUint64 {
@@ -941,7 +944,8 @@ func RPCAppendSector(ctx context.Context, t *transportV3, renterKey types.Privat
 }
 
 func RPCRenew(ctx context.Context, rrr api.RHPRenewRequest, bus Bus, t *transportV3, pt *rhpv3.HostPriceTable, rev types.FileContractRevision, renterKey types.PrivateKey, l *zap.SugaredLogger) (_ rhpv2.ContractRevision, _ []types.Transaction, _ types.Currency, err error) {
-	defer wrapErr(&err, "RPCRenew")
+	defer wrapErr(ctx, "RPCRenew", &err)
+
 	s, err := t.DialStream(ctx)
 	if err != nil {
 		return rhpv2.ContractRevision{}, nil, types.ZeroCurrency, fmt.Errorf("failed to dial stream: %w", err)
diff --git a/worker/upload.go b/worker/upload.go
index 4a97099bb..aa76964af 100644
--- a/worker/upload.go
+++ b/worker/upload.go
@@ -30,10 +30,11 @@
 )
 
 var (
-	errContractExpired      = errors.New("contract expired")
-	errNoCandidateUploader  = errors.New("no candidate uploader found")
-	errNotEnoughContracts   = errors.New("not enough contracts to support requested redundancy")
-	errUploadInterrupted    = errors.New("upload was interrupted")
+	errContractExpired       = errors.New("contract expired")
+	errNoCandidateUploader   = errors.New("no candidate uploader found")
+	errNotEnoughContracts    = errors.New("not enough contracts to support requested redundancy")
+	errUploadInterrupted     = errors.New("upload was interrupted")
+	errSectorUploadFinished  = errors.New("sector upload already finished")
 )
 
 type (
@@ -117,7 +118,7 @@
 		root types.Hash256
 
 		ctx    context.Context
-		cancel context.CancelFunc
+		cancel context.CancelCauseFunc
 
 		mu       sync.Mutex
 		uploaded object.Sector
@@ -750,7 +751,7 @@ func (u *upload) newSlabUpload(ctx context.Context, shards [][]byte, uploaders [
 		wg.Add(1)
 		go func(idx int) {
 			// create the ctx
-			sCtx, sCancel := context.WithCancel(ctx)
+			sCtx, sCancel := context.WithCancelCause(ctx)
 
 			// create the sector
 			// NOTE: we are computing the sector root here and pass it all the
@@ -1087,7 +1088,7 @@ func (s *sectorUpload) finish(sector object.Sector) {
 	s.mu.Lock()
 	defer s.mu.Unlock()
 
-	s.cancel()
+	s.cancel(errSectorUploadFinished)
 	s.uploaded = sector
 	s.data = nil
 }
diff --git a/worker/uploader.go b/worker/uploader.go
index 80bd2393b..49e6922f9 100644
--- a/worker/uploader.go
+++ b/worker/uploader.go
@@ -116,9 +116,8 @@ outer:
 			}
 
 			// execute it
-			elapsed, err := u.execute(req)
-
-			// the uploader's contract got renewed, requeue the request
+			start := time.Now()
+			duration, err := u.execute(req)
 			if errors.Is(err, errMaxRevisionReached) {
 				if u.tryRefresh(req.sector.ctx) {
 					u.enqueue(req)
@@ -126,6 +125,11 @@ outer:
 				}
 			}
 
+			// track stats
+			success, failure, uploadEstimateMS, uploadSpeedBytesPerMS := handleSectorUpload(err, duration, time.Since(start), req.overdrive, u.logger)
+			u.trackSectorUploadStats(uploadEstimateMS, uploadSpeedBytesPerMS)
+			u.trackConsecutiveFailures(success, failure)
+
 			// send the response
 			select {
 			case <-req.sector.ctx.Done():
@@ -134,16 +138,44 @@ outer:
 				err: err,
 			}:
 			}
+		}
+	}
+}
 
-			// track the error, ignore gracefully closed streams and canceled overdrives
-			canceledOverdrive := req.done() && req.overdrive && err != nil
-			if !canceledOverdrive && !isClosedStream(err) {
-				u.trackSectorUpload(err, elapsed)
-			} else {
-				u.logger.Debugw("not tracking sector upload metric", zap.Error(err))
-			}
-		}
-	}
-}
+func handleSectorUpload(uploadErr error, uploadDuration, totalDuration time.Duration, overdrive bool, logger *zap.SugaredLogger) (success bool, failure bool, uploadEstimateMS float64, uploadSpeedBytesPerMS float64) {
+	// special case, uploader will refresh and the request will be requeued
+	if errors.Is(uploadErr, errMaxRevisionReached) {
+		logger.Debugw("sector upload failure was ignored", "uploadError", uploadErr, "uploadDuration", uploadDuration, "totalDuration", totalDuration, "overdrive", overdrive)
+		return false, false, 0, 0
+	}
+
+	// happy case, upload was successful
+	if uploadErr == nil {
+		ms := uploadDuration.Milliseconds()
+		if ms == 0 {
+			ms = 1 // avoid division by zero
+		}
+		return true, false, float64(ms), float64(rhpv2.SectorSize / ms)
+	}
+
+	// upload failed because the sector was already uploaded by another host, in
+	// this case we want to punish the host for being too slow but only when we
+	// weren't overdriving or when it took too long to dial
+	if errors.Is(uploadErr, errSectorUploadFinished) {
+		slowDial := errors.Is(uploadErr, errDialTransport) && totalDuration > time.Second
+		if !overdrive || slowDial {
+			failure = overdrive
+			uploadEstimateMS = float64(totalDuration.Milliseconds() * 10)
+			logger.Debugw("sector upload failure was penalised", "uploadError", uploadErr, "uploadDuration", uploadDuration, "totalDuration", totalDuration, "overdrive", overdrive, "penalty", totalDuration.Milliseconds()*10)
uploadDuration, "totalDuration", totalDuration, "overdrive", overdrive, "penalty", totalDuration.Milliseconds()*10) + } else { + logger.Debugw("sector upload failure was ignored", "uploadError", uploadErr, "uploadDuration", uploadDuration, "totalDuration", totalDuration, "overdrive", overdrive) + } + return false, failure, uploadEstimateMS, 0 + } + + // in all other cases we want to punish the host for failing the upload + logger.Debugw("sector upload failure was penalised", "uploadError", uploadErr, "uploadDuration", uploadDuration, "totalDuration", totalDuration, "overdrive", overdrive, "penalty", time.Hour) + return false, true, float64(time.Hour.Milliseconds()), 0 } func (u *uploader) Stop(err error) { @@ -198,6 +230,8 @@ func (u *uploader) estimate() float64 { return numSectors * estimateP90 } +// execute executes the sector upload request, if the upload was successful it +// returns the time it took to upload the sector to the host func (u *uploader) execute(req *sectorUploadReq) (time.Duration, error) { // grab fields u.mu.Lock() @@ -233,19 +267,17 @@ func (u *uploader) execute(req *sectorUploadReq) (time.Duration, error) { // update the bus if err := u.os.AddUploadingSector(ctx, req.uploadID, fcid, req.sector.root); err != nil { - return 0, fmt.Errorf("failed to add uploading sector to contract %v, err: %v", fcid, err) + return 0, fmt.Errorf("failed to add uploading sector to contract %v; %w", fcid, err) } // upload the sector start := time.Now() err = host.UploadSector(ctx, req.sector.root, req.sector.sectorData(), rev) if err != nil { - return 0, fmt.Errorf("failed to upload sector to contract %v, err: %v", fcid, err) + return 0, fmt.Errorf("failed to upload sector to contract %v; %w", fcid, err) } - // calculate elapsed time - elapsed := time.Since(start) - return elapsed, nil + return time.Since(start), nil } func (u *uploader) pop() *sectorUploadReq { @@ -268,21 +300,26 @@ func (u *uploader) signalWork() { } } -func (u *uploader) trackSectorUpload(err error, d time.Duration) { +func (u *uploader) trackConsecutiveFailures(success, failure bool) { u.mu.Lock() defer u.mu.Unlock() - if err != nil { - u.consecutiveFailures++ - u.statsSectorUploadEstimateInMS.Track(float64(time.Hour.Milliseconds())) - } else { - ms := d.Milliseconds() - if ms == 0 { - ms = 1 // avoid division by zero - } + if success { u.consecutiveFailures = 0 - u.statsSectorUploadEstimateInMS.Track(float64(ms)) // duration in ms - u.statsSectorUploadSpeedBytesPerMS.Track(float64(rhpv2.SectorSize / ms)) // bytes per ms + } else if failure { + u.consecutiveFailures++ + } +} + +func (u *uploader) trackSectorUploadStats(uploadEstimateMS, uploadSpeedBytesPerMS float64) { + u.mu.Lock() + defer u.mu.Unlock() + + if uploadEstimateMS > 0 { + u.statsSectorUploadEstimateInMS.Track(uploadEstimateMS) + } + if uploadSpeedBytesPerMS > 0 { + u.statsSectorUploadSpeedBytesPerMS.Track(uploadSpeedBytesPerMS) } } diff --git a/worker/uploader_test.go b/worker/uploader_test.go index b203827a5..93c61f2d6 100644 --- a/worker/uploader_test.go +++ b/worker/uploader_test.go @@ -3,8 +3,12 @@ package worker import ( "context" "errors" + "fmt" "testing" "time" + + rhpv2 "go.sia.tech/core/rhp/v2" + "go.uber.org/zap" ) func TestUploaderStopped(t *testing.T) { @@ -32,3 +36,58 @@ func TestUploaderStopped(t *testing.T) { t.Fatal("no response") } } + +func TestHandleSectorUpload(t *testing.T) { + ms := time.Millisecond + ss := float64(rhpv2.SectorSize) + overdrive := true + regular := false + + errHostError := errors.New("some host error") + 
errSectorUploadFinishedAndDial := fmt.Errorf("%w;%w", errDialTransport, errSectorUploadFinished) + + cases := []struct { + // input + uploadErr error + uploadDur time.Duration + totalDur time.Duration + overdrive bool + + // expected output + success bool + failure bool + uploadEstimateMS float64 + uploadSpeedBytesPerMS float64 + }{ + // happy case + {nil, ms, ms, regular, true, false, 1, ss}, + {nil, ms, ms, overdrive, true, false, 1, ss}, + + // renewed contract case + {errMaxRevisionReached, 0, ms, regular, false, false, 0, 0}, + {errMaxRevisionReached, 0, ms, overdrive, false, false, 0, 0}, + + // sector already uploaded case + {errSectorUploadFinished, ms, ms, regular, false, false, 10, 0}, + {errSectorUploadFinished, ms, ms, overdrive, false, false, 0, 0}, + {errSectorUploadFinishedAndDial, ms, ms, overdrive, false, false, 0, 0}, + {errSectorUploadFinishedAndDial, ms, 1001 * ms, overdrive, false, true, 10010, 0}, + + // host failure + {errHostError, ms, ms, regular, false, true, 3600000, 0}, + {errHostError, ms, ms, overdrive, false, true, 3600000, 0}, + } + + for i, c := range cases { + success, failure, uploadEstimateMS, uploadSpeedBytesPerMS := handleSectorUpload(c.uploadErr, c.uploadDur, c.totalDur, c.overdrive, zap.NewNop().Sugar()) + if success != c.success { + t.Fatalf("case %d failed: expected success %v, got %v", i+1, c.success, success) + } else if failure != c.failure { + t.Fatalf("case %d failed: expected failure %v, got %v", i+1, c.failure, failure) + } else if uploadEstimateMS != c.uploadEstimateMS { + t.Fatalf("case %d failed: expected uploadEstimateMS %v, got %v", i+1, c.uploadEstimateMS, uploadEstimateMS) + } else if uploadSpeedBytesPerMS != c.uploadSpeedBytesPerMS { + t.Fatalf("case %d failed: expected uploadSpeedBytesPerMS %v, got %v", i+1, c.uploadSpeedBytesPerMS, uploadSpeedBytesPerMS) + } + } +}
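
Editor's note, not part of the patch: the changes above thread a cancellation cause through each sector upload's context (finish cancels siblings with errSectorUploadFinished, and wrapErr folds context.Cause into the RPC error), which is what lets handleSectorUpload classify "already finished" failures. The standalone Go sketch below illustrates that mechanism using only the standard library; wrapErrSketch is a hypothetical stand-in for the patched wrapErr, and plain errors.Is is used in place of the repo's utils.IsErr helper.

package main

import (
	"context"
	"errors"
	"fmt"
)

var errSectorUploadFinished = errors.New("sector upload already finished")

// wrapErrSketch mirrors the shape of the patched wrapErr: prefix the RPC name,
// then wrap in the context's cancellation cause if it isn't already in the chain.
func wrapErrSketch(ctx context.Context, fnName string, err *error) {
	if *err != nil {
		*err = fmt.Errorf("%s: %w", fnName, *err)
		if cause := context.Cause(ctx); cause != nil && !errors.Is(*err, cause) {
			*err = fmt.Errorf("%w; %w", cause, *err)
		}
	}
}

func main() {
	// a slab upload cancels its remaining sector uploads once one host finished
	ctx, cancel := context.WithCancelCause(context.Background())
	cancel(errSectorUploadFinished)

	// an in-flight RPC observes the cancellation as context.Canceled
	err := ctx.Err()
	wrapErrSketch(ctx, "AppendSector", &err)

	// the cause now travels with the error, so it can be classified upstream
	fmt.Println(errors.Is(err, errSectorUploadFinished)) // true
}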