Skip to content

Commit

Permalink
Merge pull request #1029 from SiaFoundation/chris/scan-logging
Browse files Browse the repository at this point in the history
Improve logging for failed host scans
  • Loading branch information
ChrisSchinnerl authored Mar 7, 2024
2 parents 7a89292 + e4ab18a commit 79f6d66
Show file tree
Hide file tree
Showing 6 changed files with 168 additions and 34 deletions.
2 changes: 1 addition & 1 deletion api/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,6 @@ func (opts HostsForScanningOptions) Apply(values url.Values) {
values.Set("limit", fmt.Sprint(opts.Limit))
}
if !opts.MaxLastScan.IsZero() {
values.Set("maxLastScan", fmt.Sprint(TimeRFC3339(opts.MaxLastScan)))
values.Set("lastScan", fmt.Sprint(TimeRFC3339(opts.MaxLastScan)))
}
}
4 changes: 4 additions & 0 deletions api/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ var (
// ErrContractSetNotSpecified is returned by the worker API by endpoints that
// need a contract set to be able to upload data.
ErrContractSetNotSpecified = errors.New("contract set is not specified")

// ErrHostOnPrivateNetwork is returned by the worker API when a host can't
// be scanned since it is on a private network.
ErrHostOnPrivateNetwork = errors.New("host is on a private network")
)

type (
Expand Down
13 changes: 8 additions & 5 deletions autopilot/contractor.go
Original file line number Diff line number Diff line change
Expand Up @@ -752,11 +752,14 @@ func (c *contractor) runContractChecks(ctx context.Context, w Worker, contracts
continue
}

// if the host doesn't have a valid pricetable, update it
var invalidPT bool
if err := refreshPriceTable(ctx, w, &host.Host); err != nil {
c.logger.Errorf("could not fetch price table for host %v: %v", host.PublicKey, err)
invalidPT = true
// if the host doesn't have a valid pricetable, update it if we were
// able to obtain a revision
invalidPT := contract.Revision == nil
if contract.Revision != nil {
if err := refreshPriceTable(ctx, w, &host.Host); err != nil {
c.logger.Errorf("could not fetch price table for host %v: %v", host.PublicKey, err)
invalidPT = true
}
}

// refresh the consensus state
Expand Down
13 changes: 8 additions & 5 deletions internal/test/e2e/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ type testClusterOptions struct {
logger *zap.Logger
uploadPacking bool
skipSettingAutopilot bool
skipRunningAutopilot bool
walletKey *types.PrivateKey

autopilotCfg *node.AutopilotConfig
Expand Down Expand Up @@ -393,11 +394,13 @@ func newTestCluster(t *testing.T, opts testClusterOptions) *TestCluster {
_ = autopilotServer.Serve(autopilotListener)
cluster.wg.Done()
}()
cluster.wg.Add(1)
go func() {
_ = aStartFn()
cluster.wg.Done()
}()
if !opts.skipRunningAutopilot {
cluster.wg.Add(1)
go func() {
_ = aStartFn()
cluster.wg.Done()
}()
}

// Set the test contract set to make sure we can add objects at the
// beginning of a test right away.
Expand Down
104 changes: 104 additions & 0 deletions internal/test/e2e/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2448,3 +2448,107 @@ func TestMultipartUploadWrappedByPartialSlabs(t *testing.T) {
t.Fatal("unexpected data")
}
}

// TestHostScan scans a single host through the worker API and verifies that
// the bus records the outcome of every scan: the last scan timestamp, the
// success flags of the last and second to last scan, and the total number of
// scans. It finishes by checking which hosts the bus hands out for scanning
// for two different MaxLastScan cutoffs.
func TestHostScan(t *testing.T) {
	// create a cluster for which the autopilot is never started
	opts := clusterOptsDefault
	opts.skipRunningAutopilot = true
	cluster := newTestCluster(t, opts)
	defer cluster.Shutdown()

	busAPI, workerAPI, tt := cluster.Bus, cluster.Worker, cluster.tt
	ctx := context.Background()

	// two hosts are added: the first one is the host we scan, the second one
	// only exists so we always have 1 peer and consider ourselves connected
	// to the internet
	target := cluster.AddHosts(2)[0]

	settings, err := target.RHPv2Settings()
	tt.OK(err)

	hk := target.PublicKey()
	hostIP := settings.NetAddress

	// checkHost fetches the host from the bus and compares its recorded scan
	// interactions against the expectations. A zero 'after' means the host
	// must not have been scanned at all, otherwise the last scan has to be
	// strictly after it.
	checkHost := func(after time.Time, lastOK, prevOK bool, total uint64) {
		t.Helper()

		info, err := busAPI.Host(ctx, target.PublicKey())
		tt.OK(err)

		scans := info.Interactions
		switch {
		case after.IsZero() && !scans.LastScan.IsZero():
			t.Fatal("expected last scan to be zero")
		case !after.IsZero() && !scans.LastScan.After(after):
			t.Fatal("expected last scan to be after", after)
		case scans.LastScanSuccess != lastOK:
			t.Fatalf("expected last scan success to be %v, got %v", lastOK, scans.LastScanSuccess)
		case scans.SecondToLastScanSuccess != prevOK:
			t.Fatalf("expected second to last scan success to be %v, got %v", prevOK, scans.SecondToLastScanSuccess)
		case scans.TotalScans != total:
			t.Fatalf("expected total scans to be %v, got %v", total, scans.TotalScans)
		}
	}

	// runScan performs one scan through the worker. A failing request aborts
	// the test, an error reported by the scan itself is returned.
	runScan := func() error {
		// sleep briefly so time is guaranteed to pass between scans, timing
		// on the CI can be weird
		time.Sleep(time.Millisecond)

		resp, err := workerAPI.RHPScan(ctx, hk, hostIP, 10*time.Second)
		tt.OK(err)
		if resp.ScanError == "" {
			return nil
		}
		return errors.New(resp.ScanError)
	}

	// nothing has been recorded before the first scan
	checkHost(time.Time{}, false, false, 0)

	// first scan: succeeds
	before := time.Now()
	if err := runScan(); err != nil {
		t.Fatal(err)
	}
	checkHost(before, true, false, 1)

	// second scan: succeeds, the previous success becomes second to last
	before = time.Now()
	if err := runScan(); err != nil {
		t.Fatal(err)
	}
	checkHost(before, true, true, 2)

	// take the host down so the next scan fails
	tt.OK(target.Close())

	// third scan: has to fail and be recorded as a failure
	before = time.Now()
	if err := runScan(); err == nil {
		t.Fatal("expected scan error")
	}
	checkHost(before, false, true, 3)

	// with maxLastScan set to now, all hosts should be returned for scanning
	tt.Retry(100, 100*time.Millisecond, func() error {
		candidates, err := busAPI.HostsForScanning(ctx, api.HostsForScanningOptions{
			MaxLastScan: api.TimeRFC3339(time.Now()),
		})
		tt.OK(err)
		if n := len(candidates); n != 2 {
			return fmt.Errorf("expected 2 hosts, got %v", n)
		}
		return nil
	})

	// with a cutoff of 1ns past the unix epoch only 1 host should be
	// returned, the one that hasn't been scanned yet
	unscanned, err := busAPI.HostsForScanning(ctx, api.HostsForScanningOptions{
		MaxLastScan: api.TimeRFC3339(time.Unix(0, 1)),
	})
	tt.OK(err)
	if len(unscanned) != 1 {
		t.Fatalf("expected 1 hosts, got %v", len(unscanned))
	}
}
66 changes: 43 additions & 23 deletions worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"math/big"
"net"
"net/http"
"os"
"runtime"
"sort"
"strings"
Expand Down Expand Up @@ -259,13 +260,6 @@ func (w *worker) rhpScanHandler(jc jape.Context) {
return
}

// apply the timeout
if rsr.Timeout > 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, time.Duration(rsr.Timeout))
defer cancel()
}

// only scan hosts if we are online
peers, err := w.bus.SyncerPeers(ctx)
if jc.Check("failed to fetch peers from bus", err) != nil {
Expand All @@ -278,7 +272,7 @@ func (w *worker) rhpScanHandler(jc jape.Context) {

// scan host
var errStr string
settings, priceTable, elapsed, err := w.scanHost(ctx, rsr.HostKey, rsr.HostIP)
settings, priceTable, elapsed, err := w.scanHost(ctx, time.Duration(rsr.Timeout), rsr.HostKey, rsr.HostIP)
if err != nil {
errStr = err.Error()
}
Expand Down Expand Up @@ -1390,36 +1384,46 @@ func (w *worker) Shutdown(ctx context.Context) error {
return nil
}

func (w *worker) scanHost(ctx context.Context, hostKey types.PublicKey, hostIP string) (rhpv2.HostSettings, rhpv3.HostPriceTable, time.Duration, error) {
func (w *worker) scanHost(ctx context.Context, timeout time.Duration, hostKey types.PublicKey, hostIP string) (rhpv2.HostSettings, rhpv3.HostPriceTable, time.Duration, error) {
logger := w.logger.With("host", hostKey).With("hostIP", hostIP).With("timeout", timeout)
// prepare a helper for scanning
scan := func() (rhpv2.HostSettings, rhpv3.HostPriceTable, time.Duration, error) {
// apply timeout
scanCtx := ctx
var cancel context.CancelFunc
if timeout > 0 {
scanCtx, cancel = context.WithTimeout(scanCtx, timeout)
defer cancel()
}
// resolve hostIP. We don't want to scan hosts on private networks.
if !w.allowPrivateIPs {
host, _, err := net.SplitHostPort(hostIP)
if err != nil {
return rhpv2.HostSettings{}, rhpv3.HostPriceTable{}, 0, err
}
addrs, err := (&net.Resolver{}).LookupIPAddr(ctx, host)
addrs, err := (&net.Resolver{}).LookupIPAddr(scanCtx, host)
if err != nil {
return rhpv2.HostSettings{}, rhpv3.HostPriceTable{}, 0, err
}
for _, addr := range addrs {
if isPrivateIP(addr.IP) {
return rhpv2.HostSettings{}, rhpv3.HostPriceTable{}, 0, errors.New("host is on a private network")
return rhpv2.HostSettings{}, rhpv3.HostPriceTable{}, 0, api.ErrHostOnPrivateNetwork
}
}
}

// fetch the host settings
start := time.Now()
var settings rhpv2.HostSettings
err := w.withTransportV2(ctx, hostKey, hostIP, func(t *rhpv2.Transport) (err error) {
if settings, err = RPCSettings(ctx, t); err == nil {
// NOTE: we overwrite the NetAddress with the host address here since we
// just used it to dial the host we know it's valid
settings.NetAddress = hostIP
err := w.withTransportV2(scanCtx, hostKey, hostIP, func(t *rhpv2.Transport) error {
var err error
if settings, err = RPCSettings(scanCtx, t); err != nil {
return fmt.Errorf("failed to fetch host settings: %w", err)
}
return err
// NOTE: we overwrite the NetAddress with the host address here
// since we just used it to dial the host we know it's valid
settings.NetAddress = hostIP
return nil
})
elapsed := time.Since(start)
if err != nil {
Expand All @@ -1428,9 +1432,9 @@ func (w *worker) scanHost(ctx context.Context, hostKey types.PublicKey, hostIP s

// fetch the host pricetable
var pt rhpv3.HostPriceTable
err = w.transportPoolV3.withTransportV3(ctx, hostKey, settings.SiamuxAddr(), func(ctx context.Context, t *transportV3) error {
err = w.transportPoolV3.withTransportV3(scanCtx, hostKey, settings.SiamuxAddr(), func(ctx context.Context, t *transportV3) error {
if hpt, err := RPCPriceTable(ctx, t, func(pt rhpv3.HostPriceTable) (rhpv3.PaymentMethod, error) { return nil, nil }); err != nil {
return err
return fmt.Errorf("failed to fetch host price table: %w", err)
} else {
pt = hpt.HostPriceTable
return nil
Expand All @@ -1445,20 +1449,25 @@ func (w *worker) scanHost(ctx context.Context, hostKey types.PublicKey, hostIP s
// scan: second try
select {
case <-ctx.Done():
return rhpv2.HostSettings{}, rhpv3.HostPriceTable{}, 0, ctx.Err()
case <-time.After(time.Second):
}
settings, pt, duration, err = scan()

logger = logger.With("elapsed", duration)
if err == nil {
w.logger.Debugf("successfully scanned host %v after retry", hostKey)
logger.Debug("successfully scanned host on second try")
} else if !isErrHostUnreachable(err) {
logger.Debugw("failed to scan host", zap.Error(err))
}
}

// check if the scan failed due to a shutdown - shouldn't be necessary but
// just in case since recording a failed scan might have serious
// repercussions
select {
case <-w.shutdownCtx.Done():
return rhpv2.HostSettings{}, rhpv3.HostPriceTable{}, 0, w.shutdownCtx.Err()
case <-ctx.Done():
return rhpv2.HostSettings{}, rhpv3.HostPriceTable{}, 0, ctx.Err()
default:
}

Expand All @@ -1477,7 +1486,7 @@ func (w *worker) scanHost(ctx context.Context, hostKey types.PublicKey, hostIP s
},
})
if scanErr != nil {
w.logger.Errorf("failed to record host scan: %v", scanErr)
logger.Errorw("failed to record host scan", zap.Error(scanErr))
}
return settings, pt, duration, err
}
Expand All @@ -1494,6 +1503,17 @@ func discardTxnOnErr(ctx context.Context, bus Bus, l *zap.SugaredLogger, txn typ
cancel()
}

func isErrHostUnreachable(err error) bool {
return isError(err, os.ErrDeadlineExceeded) ||
isError(err, context.DeadlineExceeded) ||
isError(err, api.ErrHostOnPrivateNetwork) ||
isError(err, errors.New("no route to host")) ||
isError(err, errors.New("no such host")) ||
isError(err, errors.New("connection refused")) ||
isError(err, errors.New("unknown port")) ||
isError(err, errors.New("cannot assign requested address"))
}

// isErrDuplicateTransactionSet reports whether err is non-nil and its message
// contains the message of modules.ErrDuplicateTransactionSet.
func isErrDuplicateTransactionSet(err error) bool {
	if err == nil {
		return false
	}
	// match on the message text rather than on the error value
	return strings.Contains(err.Error(), modules.ErrDuplicateTransactionSet.Error())
}

0 comments on commit 79f6d66

Please sign in to comment.