Skip to content

Commit

Permalink
worker: address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
ChrisSchinnerl committed Mar 6, 2024
1 parent 64a2788 commit 7f0cf5c
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 37 deletions.
4 changes: 4 additions & 0 deletions api/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ var (
// ErrContractSetNotSpecified is returned by the worker API by endpoints that
// need a contract set to be able to upload data.
ErrContractSetNotSpecified = errors.New("contract set is not specified")

// ErrHostOnPrivateNetwork is returned by the worker API when a host can't
// be scanned since it is on a private network.
ErrHostOnPrivateNetwork = errors.New("host is on a private network")
)

type (
Expand Down
62 changes: 25 additions & 37 deletions worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,6 @@ const (
)

var (
errHostOnPrivateNetwork = errors.New("host is on a private network")

ErrShuttingDown = errors.New("worker is shutting down")
)

Expand Down Expand Up @@ -274,7 +272,7 @@ func (w *worker) rhpScanHandler(jc jape.Context) {

// scan host
var errStr string
settings, priceTable, elapsed, err := w.scanHost(time.Duration(rsr.Timeout), rsr.HostKey, rsr.HostIP)
settings, priceTable, elapsed, err := w.scanHost(ctx, time.Duration(rsr.Timeout), rsr.HostKey, rsr.HostIP)
if err != nil {
errStr = err.Error()
}
Expand Down Expand Up @@ -1386,16 +1384,15 @@ func (w *worker) Shutdown(ctx context.Context) error {
return nil
}

func (w *worker) scanHost(timeout time.Duration, hostKey types.PublicKey, hostIP string) (rhpv2.HostSettings, rhpv3.HostPriceTable, time.Duration, error) {
func (w *worker) scanHost(ctx context.Context, timeout time.Duration, hostKey types.PublicKey, hostIP string) (rhpv2.HostSettings, rhpv3.HostPriceTable, time.Duration, error) {
logger := w.logger.With("host", hostKey).With("hostIP", hostIP).With("timeout", timeout)

// prepare a helper for scanning
scan := func() (rhpv2.HostSettings, rhpv3.HostPriceTable, time.Duration, error) {
// apply timeout
ctx := w.shutdownCtx
var cancel context.CancelFunc
if timeout > 0 {
ctx, cancel = context.WithTimeout(w.shutdownCtx, timeout)
ctx, cancel = context.WithTimeout(ctx, timeout)
defer cancel()
}
// resolve hostIP. We don't want to scan hosts on private networks.
Expand All @@ -1410,22 +1407,23 @@ func (w *worker) scanHost(timeout time.Duration, hostKey types.PublicKey, hostIP
}
for _, addr := range addrs {
if isPrivateIP(addr.IP) {
return rhpv2.HostSettings{}, rhpv3.HostPriceTable{}, 0, errHostOnPrivateNetwork
return rhpv2.HostSettings{}, rhpv3.HostPriceTable{}, 0, api.ErrHostOnPrivateNetwork
}
}
}

// fetch the host settings
start := time.Now()
var settings rhpv2.HostSettings
err := w.withTransportV2(ctx, hostKey, hostIP, func(t *rhpv2.Transport) (err error) {
if settings, err = RPCSettings(ctx, t); err == nil {
// NOTE: we overwrite the NetAddress with the host address here since we
// just used it to dial the host we know it's valid
settings.NetAddress = hostIP
return nil
err := w.withTransportV2(ctx, hostKey, hostIP, func(t *rhpv2.Transport) error {
var err error
if settings, err = RPCSettings(ctx, t); err != nil {
return fmt.Errorf("failed to fetch host settings: %w", err)
}
return fmt.Errorf("failed to fetch host settings: %w", err)
// NOTE: we overwrite the NetAddress with the host address here
// since we just used it to dial the host we know it's valid
settings.NetAddress = hostIP
return nil
})
elapsed := time.Since(start)
if err != nil {
Expand All @@ -1450,15 +1448,15 @@ func (w *worker) scanHost(timeout time.Duration, hostKey types.PublicKey, hostIP
if err != nil {
// scan: second try
select {
case <-w.shutdownCtx.Done():
return rhpv2.HostSettings{}, rhpv3.HostPriceTable{}, 0, w.shutdownCtx.Err()
case <-ctx.Done():
return rhpv2.HostSettings{}, rhpv3.HostPriceTable{}, 0, ctx.Err()
case <-time.After(time.Second):
}
settings, pt, duration, err = scan()

logger = logger.With("elapsed", duration)
if err == nil {
logger.Debugf("successfully scanned host on second try")
logger.Debug("successfully scanned host on second try")
} else if !isErrHostUnreachable(err) {
logger.Debugw("failed to scan host", zap.Error(err))
}
Expand All @@ -1468,8 +1466,8 @@ func (w *worker) scanHost(timeout time.Duration, hostKey types.PublicKey, hostIP
// just in case since recording a failed scan might have serious
// repercussions
select {
case <-w.shutdownCtx.Done():
return rhpv2.HostSettings{}, rhpv3.HostPriceTable{}, 0, w.shutdownCtx.Err()
case <-ctx.Done():
return rhpv2.HostSettings{}, rhpv3.HostPriceTable{}, 0, ctx.Err()
default:
}

Expand Down Expand Up @@ -1506,24 +1504,14 @@ func discardTxnOnErr(ctx context.Context, bus Bus, l *zap.SugaredLogger, txn typ
}

func isErrHostUnreachable(err error) bool {
if isError(err, os.ErrDeadlineExceeded) {
return true
} else if isError(err, context.DeadlineExceeded) {
return true
} else if isError(err, errHostOnPrivateNetwork) {
return true
} else if isError(err, errors.New("no route to host")) {
return true
} else if isError(err, errors.New("no such host")) {
return true
} else if isError(err, errors.New("connection refused")) {
return true
} else if isError(err, errors.New("unknown port")) {
return true
} else if isError(err, errors.New("cannot assign requested address")) {
return true
}
return false
return isError(err, os.ErrDeadlineExceeded) ||
isError(err, context.DeadlineExceeded) ||
isError(err, api.ErrHostOnPrivateNetwork) ||
isError(err, errors.New("no route to host")) ||
isError(err, errors.New("no such host")) ||
isError(err, errors.New("connection refused")) ||
isError(err, errors.New("unknown port")) ||
isError(err, errors.New("cannot assign requested address"))
}

func isErrDuplicateTransactionSet(err error) bool {
Expand Down

0 comments on commit 7f0cf5c

Please sign in to comment.