Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Decorate RPC errors with additional information about whether they are transmission errors or errors returned by the host #1039

Merged
merged 9 commits into from
Mar 13, 2024
11 changes: 6 additions & 5 deletions autopilot/autopilot.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"go.sia.tech/renterd/api"
"go.sia.tech/renterd/build"
"go.sia.tech/renterd/hostdb"
"go.sia.tech/renterd/internal/utils"
"go.sia.tech/renterd/object"
"go.sia.tech/renterd/wallet"
"go.sia.tech/renterd/webhooks"
Expand Down Expand Up @@ -299,7 +300,7 @@ func (ap *Autopilot) Run() error {

// perform maintenance
setChanged, err := ap.c.performContractMaintenance(ap.shutdownCtx, w)
if err != nil && isErr(err, context.Canceled) {
if err != nil && utils.IsErr(err, context.Canceled) {
return
} else if err != nil {
ap.logger.Errorf("contract maintenance failed, err: %v", err)
Expand Down Expand Up @@ -405,9 +406,9 @@ func (ap *Autopilot) blockUntilConfigured(interrupt <-chan time.Time) (configure
cancel()

// if the config was not found, or we were unable to fetch it, keep blocking
if isErr(err, context.Canceled) {
if utils.IsErr(err, context.Canceled) {
return
} else if isErr(err, api.ErrAutopilotNotFound) {
} else if utils.IsErr(err, api.ErrAutopilotNotFound) {
once.Do(func() { ap.logger.Info("autopilot is waiting to be configured...") })
} else if err != nil {
ap.logger.Errorf("autopilot is unable to fetch its configuration from the bus, err: %v", err)
Expand Down Expand Up @@ -438,7 +439,7 @@ func (ap *Autopilot) blockUntilOnline() (online bool) {
online = len(peers) > 0
cancel()

if isErr(err, context.Canceled) {
if utils.IsErr(err, context.Canceled) {
return
} else if err != nil {
ap.logger.Errorf("failed to get peers, err: %v", err)
Expand Down Expand Up @@ -472,7 +473,7 @@ func (ap *Autopilot) blockUntilSynced(interrupt <-chan time.Time) (synced, block
cancel()

// if an error occurred, or if we're not synced, we continue
if isErr(err, context.Canceled) {
if utils.IsErr(err, context.Canceled) {
return
} else if err != nil {
ap.logger.Errorf("failed to get consensus state, err: %v", err)
Expand Down
19 changes: 10 additions & 9 deletions autopilot/contract_pruning.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"go.sia.tech/core/types"
"go.sia.tech/renterd/alerts"
"go.sia.tech/renterd/api"
"go.sia.tech/renterd/internal/utils"
"go.sia.tech/siad/build"
)

Expand Down Expand Up @@ -65,14 +66,14 @@ func (pm pruneMetrics) String() string {
func (pr pruneResult) toAlert() (id types.Hash256, alert *alerts.Alert) {
id = alertIDForContract(alertPruningID, pr.fcid)

if shouldTrigger := pr.err != nil && !((isErr(pr.err, errInvalidMerkleProof) && build.VersionCmp(pr.version, "1.6.0") < 0) ||
isErr(pr.err, api.ErrContractNotFound) || // contract got archived
isErr(pr.err, errConnectionRefused) ||
isErr(pr.err, errConnectionTimedOut) ||
isErr(pr.err, errConnectionResetByPeer) ||
isErr(pr.err, errInvalidHandshakeSignature) ||
isErr(pr.err, errNoRouteToHost) ||
isErr(pr.err, errNoSuchHost)); shouldTrigger {
if shouldTrigger := pr.err != nil && !((utils.IsErr(pr.err, errInvalidMerkleProof) && build.VersionCmp(pr.version, "1.6.0") < 0) ||
utils.IsErr(pr.err, api.ErrContractNotFound) || // contract got archived
utils.IsErr(pr.err, errConnectionRefused) ||
utils.IsErr(pr.err, errConnectionTimedOut) ||
utils.IsErr(pr.err, errConnectionResetByPeer) ||
utils.IsErr(pr.err, errInvalidHandshakeSignature) ||
utils.IsErr(pr.err, errNoRouteToHost) ||
utils.IsErr(pr.err, errNoSuchHost)); shouldTrigger {
alert = newContractPruningFailedAlert(pr.hk, pr.version, pr.fcid, pr.err)
}
return
Expand Down Expand Up @@ -196,7 +197,7 @@ func (c *contractor) pruneContract(w Worker, fcid types.FileContractID) pruneRes
pruned, remaining, err := w.RHPPruneContract(ctx, fcid, timeoutPruneContract)
if err != nil && pruned == 0 {
return pruneResult{fcid: fcid, hk: host.PublicKey, version: host.Settings.Version, err: err}
} else if err != nil && isErr(err, context.DeadlineExceeded) {
} else if err != nil && utils.IsErr(err, context.DeadlineExceeded) {
err = nil
}

Expand Down
5 changes: 3 additions & 2 deletions autopilot/contractor.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"go.sia.tech/core/types"
"go.sia.tech/renterd/api"
"go.sia.tech/renterd/hostdb"
"go.sia.tech/renterd/internal/utils"
"go.sia.tech/renterd/wallet"
"go.sia.tech/renterd/worker"
"go.uber.org/zap"
Expand Down Expand Up @@ -1425,7 +1426,7 @@ func (c *contractor) renewContract(ctx context.Context, w Worker, ci contractInf
"renterFunds", renterFunds,
"expectedNewStorage", expectedNewStorage,
)
if strings.Contains(err.Error(), wallet.ErrInsufficientBalance.Error()) {
if utils.IsErr(err, wallet.ErrInsufficientBalance) && !worker.IsErrHost(err) {
return api.ContractMetadata{}, false, err
}
return api.ContractMetadata{}, true, err
Expand Down Expand Up @@ -1508,7 +1509,7 @@ func (c *contractor) refreshContract(ctx context.Context, w Worker, ci contractI
return api.ContractMetadata{}, true, err
}
c.logger.Errorw("refresh failed", zap.Error(err), "hk", hk, "fcid", fcid)
if strings.Contains(err.Error(), wallet.ErrInsufficientBalance.Error()) {
if utils.IsErr(err, wallet.ErrInsufficientBalance) && !worker.IsErrHost(err) {
return api.ContractMetadata{}, false, err
}
return api.ContractMetadata{}, true, err
Expand Down
10 changes: 2 additions & 8 deletions autopilot/ipfilter.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"time"

"go.sia.tech/core/types"
"go.sia.tech/renterd/internal/utils"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -137,7 +138,7 @@ func (r *ipResolver) lookup(hostIP string) ([]string, error) {
addrs, err := r.resolver.LookupIPAddr(ctx, host)
if err != nil {
// check the cache if it's an i/o timeout or server misbehaving error
if isErr(err, errIOTimeout) || isErr(err, errServerMisbehaving) {
if utils.IsErr(err, errIOTimeout) || utils.IsErr(err, errServerMisbehaving) {
if entry, found := r.cache[hostIP]; found && time.Since(entry.created) < ipCacheEntryValidity {
r.logger.Debugf("using cached IP addresses for %v, err: %v", hostIP, err)
return entry.subnets, nil
Expand Down Expand Up @@ -188,10 +189,3 @@ func parseSubnets(addresses []net.IPAddr) []string {

return subnets
}

func isErr(err error, target error) bool {
if errors.Is(err, target) {
return true
}
return err != nil && target != nil && strings.Contains(err.Error(), target.Error())
}
13 changes: 7 additions & 6 deletions autopilot/ipfilter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"time"

"go.sia.tech/core/types"
"go.sia.tech/renterd/internal/utils"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -61,20 +62,20 @@ func TestIPResolver(t *testing.T) {

// test lookup error
r.setNextErr(errors.New("unknown error"))
if _, err := ipr.lookup("example.com:1234"); !isErr(err, errors.New("unknown error")) {
if _, err := ipr.lookup("example.com:1234"); !utils.IsErr(err, errors.New("unknown error")) {
t.Fatal("unexpected error", err)
}

// test IO timeout - no cache entry
r.setNextErr(errIOTimeout)
if _, err := ipr.lookup("example.com:1234"); !isErr(err, errIOTimeout) {
if _, err := ipr.lookup("example.com:1234"); !utils.IsErr(err, errIOTimeout) {
t.Fatal("unexpected error", err)
}

// test IO timeout - expired cache entry
ipr.cache["example.com:1234"] = ipCacheEntry{subnets: []string{"a"}}
r.setNextErr(errIOTimeout)
if _, err := ipr.lookup("example.com:1234"); !isErr(err, errIOTimeout) {
if _, err := ipr.lookup("example.com:1234"); !utils.IsErr(err, errIOTimeout) {
t.Fatal("unexpected error", err)
}

Expand All @@ -89,19 +90,19 @@ func TestIPResolver(t *testing.T) {

// test too many addresses - more than two
r.setAddr("example.com", []net.IPAddr{{}, {}, {}})
if _, err := ipr.lookup("example.com:1234"); !isErr(err, errTooManyAddresses) {
if _, err := ipr.lookup("example.com:1234"); !utils.IsErr(err, errTooManyAddresses) {
t.Fatal("unexpected error", err)
}

// test too many addresses - two of the same type
r.setAddr("example.com", []net.IPAddr{{IP: net.IPv4(1, 2, 3, 4)}, {IP: net.IPv4(1, 2, 3, 4)}})
if _, err := ipr.lookup("example.com:1234"); !isErr(err, errTooManyAddresses) {
if _, err := ipr.lookup("example.com:1234"); !utils.IsErr(err, errTooManyAddresses) {
t.Fatal("unexpected error", err)
}

// test invalid addresses
r.setAddr("example.com", []net.IPAddr{{IP: ipv4Localhost}, {IP: net.IP{127, 0, 0, 2}}})
if _, err := ipr.lookup("example.com:1234"); !isErr(err, errTooManyAddresses) {
if _, err := ipr.lookup("example.com:1234"); !utils.IsErr(err, errTooManyAddresses) {
t.Fatal("unexpected error", err)
}

Expand Down
3 changes: 2 additions & 1 deletion autopilot/migrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"time"

"go.sia.tech/renterd/api"
"go.sia.tech/renterd/internal/utils"
"go.sia.tech/renterd/object"
"go.sia.tech/renterd/stats"
"go.uber.org/zap"
Expand Down Expand Up @@ -156,7 +157,7 @@ func (m *migrator) performMigrations(p *workerPool) {

if err != nil {
m.logger.Errorf("%v: migration %d/%d failed, key: %v, health: %v, overpaid: %v, err: %v", id, j.slabIdx+1, j.batchSize, j.Key, j.Health, res.SurchargeApplied, err)
skipAlert := isErr(err, api.ErrSlabNotFound)
skipAlert := utils.IsErr(err, api.ErrSlabNotFound)
if !skipAlert {
if res.SurchargeApplied {
m.ap.RegisterAlert(ctx, newCriticalMigrationFailedAlert(j.Key, j.Health, err))
Expand Down
3 changes: 2 additions & 1 deletion autopilot/scanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"go.sia.tech/core/types"
"go.sia.tech/renterd/api"
"go.sia.tech/renterd/hostdb"
"go.sia.tech/renterd/internal/utils"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -314,7 +315,7 @@ func (s *scanner) launchScanWorkers(ctx context.Context, w scanWorker, reqs chan
scan, err := w.RHPScan(ctx, req.hostKey, req.hostIP, s.currentTimeout())
if err != nil {
break // abort
} else if !isErr(errors.New(scan.ScanError), errIOTimeout) && scan.Ping > 0 {
} else if !utils.IsErr(errors.New(scan.ScanError), errIOTimeout) && scan.Ping > 0 {
s.tracker.addDataPoint(time.Duration(scan.Ping))
}

Expand Down
20 changes: 20 additions & 0 deletions internal/utils/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package utils

import (
"errors"
"strings"
)

// IsErr can be used to compare an error to a target and also works when used on
// errors that haven't been wrapped since it will fall back to a string
// comparison. Useful to check errors returned over the network.
func IsErr(err error, target error) bool {
if (err == nil) != (target == nil) {
return false
} else if errors.Is(err, target) {
return true
}
// TODO: we can get rid of the lower casing once siad is gone and
// renterd/hostd use the same error messages
return strings.Contains(strings.ToLower(err.Error()), strings.ToLower(target.Error()))
}
75 changes: 56 additions & 19 deletions worker/rhpv3.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"math"
"math/big"
"net"
"strings"
"sync"
"time"

Expand All @@ -20,6 +19,7 @@ import (
"go.sia.tech/mux/v1"
"go.sia.tech/renterd/api"
"go.sia.tech/renterd/hostdb"
"go.sia.tech/renterd/internal/utils"
"go.sia.tech/siad/crypto"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -47,6 +47,12 @@ const (
)

var (
// errHost is used to wrap rpc errors returned by the host.
errHost = errors.New("host responded with error")

// errTransport is used to wrap rpc errors caused by the transport.
errTransport = errors.New("transport error")

// errBalanceInsufficient occurs when a withdrawal failed because the
// account balance was insufficient.
errBalanceInsufficient = errors.New("ephemeral account balance was insufficient")
Expand Down Expand Up @@ -83,31 +89,42 @@ var (
errWithdrawalExpired = errors.New("withdrawal request expired")
)

func isBalanceInsufficient(err error) bool { return isError(err, errBalanceInsufficient) }
func isBalanceMaxExceeded(err error) bool { return isError(err, errBalanceMaxExceeded) }
// IsErrHost indicates whether an error was returned by a host as part of an RPC.
func IsErrHost(err error) bool {
ChrisSchinnerl marked this conversation as resolved.
Show resolved Hide resolved
return utils.IsErr(err, errHost)
}

func isBalanceInsufficient(err error) bool { return utils.IsErr(err, errBalanceInsufficient) }
func isBalanceMaxExceeded(err error) bool { return utils.IsErr(err, errBalanceMaxExceeded) }
func isClosedStream(err error) bool {
return isError(err, mux.ErrClosedStream) || isError(err, net.ErrClosed)
return utils.IsErr(err, mux.ErrClosedStream) || utils.IsErr(err, net.ErrClosed)
}
func isInsufficientFunds(err error) bool { return isError(err, ErrInsufficientFunds) }
func isPriceTableExpired(err error) bool { return isError(err, errPriceTableExpired) }
func isPriceTableGouging(err error) bool { return isError(err, errPriceTableGouging) }
func isPriceTableNotFound(err error) bool { return isError(err, errPriceTableNotFound) }
func isInsufficientFunds(err error) bool { return utils.IsErr(err, ErrInsufficientFunds) }
func isPriceTableExpired(err error) bool { return utils.IsErr(err, errPriceTableExpired) }
func isPriceTableGouging(err error) bool { return utils.IsErr(err, errPriceTableGouging) }
func isPriceTableNotFound(err error) bool { return utils.IsErr(err, errPriceTableNotFound) }
func isSectorNotFound(err error) bool {
return isError(err, errSectorNotFound) || isError(err, errSectorNotFoundOld)
return utils.IsErr(err, errSectorNotFound) || utils.IsErr(err, errSectorNotFoundOld)
}
func isWithdrawalsInactive(err error) bool { return isError(err, errWithdrawalsInactive) }
func isWithdrawalExpired(err error) bool { return isError(err, errWithdrawalExpired) }
func isWithdrawalsInactive(err error) bool { return utils.IsErr(err, errWithdrawalsInactive) }
func isWithdrawalExpired(err error) bool { return utils.IsErr(err, errWithdrawalExpired) }

func isError(err error, target error) bool {
if err == nil {
return err == target
// wrapRPCErr extracts the innermost error, wraps it in either a errHost or
// errTransport and finally wraps it using the provided fnName.
func wrapRPCErr(err *error, fnName string) {
if *err == nil {
return
}
innerErr := *err
for errors.Unwrap(innerErr) != nil {
innerErr = errors.Unwrap(innerErr)
}
// compare error first
if errors.Is(err, target) {
return true
if errors.As(*err, new(*rhpv3.RPCError)) {
*err = fmt.Errorf("%w: '%w'", errHost, innerErr)
} else {
*err = fmt.Errorf("%w: '%w'", errTransport, innerErr)
}
// then compare the string in case the error was returned by a host
return strings.Contains(strings.ToLower(err.Error()), strings.ToLower(target.Error()))
ChrisSchinnerl marked this conversation as resolved.
Show resolved Hide resolved
*err = fmt.Errorf("%s: %w", fnName, *err)
}

// transportV3 is a reference-counted wrapper for rhpv3.Transport.
Expand All @@ -125,6 +142,26 @@ type streamV3 struct {
*rhpv3.Stream
}

func (s *streamV3) ReadResponse(resp rhpv3.ProtocolObject, maxLen uint64) (err error) {
defer wrapRPCErr(&err, "ReadResponse")
return s.Stream.ReadResponse(resp, maxLen)
}

func (s *streamV3) WriteResponse(resp rhpv3.ProtocolObject) (err error) {
defer wrapRPCErr(&err, "WriteResponse")
return s.Stream.WriteResponse(resp)
}

func (s *streamV3) ReadRequest(req rhpv3.ProtocolObject, maxLen uint64) (err error) {
defer wrapRPCErr(&err, "ReadRequest")
return s.Stream.ReadRequest(req, maxLen)
}

func (s *streamV3) WriteRequest(rpcID types.Specifier, req rhpv3.ProtocolObject) (err error) {
defer wrapRPCErr(&err, "WriteRequest")
return s.Stream.WriteRequest(rpcID, req)
}

// Close closes the stream and cancels the goroutine launched by DialStream.
func (s *streamV3) Close() error {
s.cancel()
Expand Down
Loading
Loading