Skip to content

Commit

Permalink
worker: update logging in scanHost and apply timeout to each step of …
Browse files Browse the repository at this point in the history
…scanning
  • Loading branch information
ChrisSchinnerl committed Mar 25, 2024
1 parent 9fe12d9 commit 80eca75
Showing 1 changed file with 58 additions and 41 deletions.
99 changes: 58 additions & 41 deletions worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -1436,64 +1436,81 @@ func (w *worker) scanHost(ctx context.Context, timeout time.Duration, hostKey ty
logger := w.logger.With("host", hostKey).With("hostIP", hostIP).With("timeout", timeout)
// prepare a helper for scanning
scan := func() (rhpv2.HostSettings, rhpv3.HostPriceTable, time.Duration, error) {
// apply timeout
scanCtx := ctx
var cancel context.CancelFunc
if timeout > 0 {
scanCtx, cancel = context.WithTimeout(scanCtx, timeout)
defer cancel()
}
// resolve hostIP. We don't want to scan hosts on private networks.
if !w.allowPrivateIPs {
host, _, err := net.SplitHostPort(hostIP)
if err != nil {
return rhpv2.HostSettings{}, rhpv3.HostPriceTable{}, 0, err
// helper to prepare a context for scanning
withTimeoutCtx := func() (context.Context, context.CancelFunc) {
if timeout > 0 {
return context.WithTimeout(ctx, timeout)
}
addrs, err := (&net.Resolver{}).LookupIPAddr(scanCtx, host)
if err != nil {
return rhpv2.HostSettings{}, rhpv3.HostPriceTable{}, 0, err
}
for _, addr := range addrs {
if isPrivateIP(addr.IP) {
return rhpv2.HostSettings{}, rhpv3.HostPriceTable{}, 0, api.ErrHostOnPrivateNetwork
return ctx, func() {}
}
// resolve the address
{
scanCtx, cancel := withTimeoutCtx()
defer cancel()
// resolve hostIP. We don't want to scan hosts on private networks.
if !w.allowPrivateIPs {
host, _, err := net.SplitHostPort(hostIP)
if err != nil {
return rhpv2.HostSettings{}, rhpv3.HostPriceTable{}, 0, err
}
addrs, err := (&net.Resolver{}).LookupIPAddr(scanCtx, host)
if err != nil {
return rhpv2.HostSettings{}, rhpv3.HostPriceTable{}, 0, err
}
for _, addr := range addrs {
if isPrivateIP(addr.IP) {
return rhpv2.HostSettings{}, rhpv3.HostPriceTable{}, 0, api.ErrHostOnPrivateNetwork
}
}
}
}

// fetch the host settings
start := time.Now()
var settings rhpv2.HostSettings
err := w.withTransportV2(scanCtx, hostKey, hostIP, func(t *rhpv2.Transport) error {
var err error
if settings, err = RPCSettings(scanCtx, t); err != nil {
return fmt.Errorf("failed to fetch host settings: %w", err)
{
scanCtx, cancel := withTimeoutCtx()
defer cancel()
err := w.withTransportV2(scanCtx, hostKey, hostIP, func(t *rhpv2.Transport) error {
var err error
if settings, err = RPCSettings(scanCtx, t); err != nil {
return fmt.Errorf("failed to fetch host settings: %w", err)
}
// NOTE: we overwrite the NetAddress with the host address here
// since we just used it to dial the host we know it's valid
settings.NetAddress = hostIP
return nil
})
if err != nil {
return settings, rhpv3.HostPriceTable{}, time.Since(start), err
}
// NOTE: we overwrite the NetAddress with the host address here
// since we just used it to dial the host we know it's valid
settings.NetAddress = hostIP
return nil
})
elapsed := time.Since(start)
if err != nil {
return settings, rhpv3.HostPriceTable{}, elapsed, err
}

// fetch the host pricetable
var pt rhpv3.HostPriceTable
err = w.transportPoolV3.withTransportV3(scanCtx, hostKey, settings.SiamuxAddr(), func(ctx context.Context, t *transportV3) error {
if hpt, err := RPCPriceTable(ctx, t, func(pt rhpv3.HostPriceTable) (rhpv3.PaymentMethod, error) { return nil, nil }); err != nil {
return fmt.Errorf("failed to fetch host price table: %w", err)
} else {
pt = hpt.HostPriceTable
return nil
{
scanCtx, cancel := withTimeoutCtx()
defer cancel()
err := w.transportPoolV3.withTransportV3(scanCtx, hostKey, settings.SiamuxAddr(), func(ctx context.Context, t *transportV3) error {
if hpt, err := RPCPriceTable(ctx, t, func(pt rhpv3.HostPriceTable) (rhpv3.PaymentMethod, error) { return nil, nil }); err != nil {
return fmt.Errorf("failed to fetch host price table: %w", err)
} else {
pt = hpt.HostPriceTable
return nil
}
})
if err != nil {
return settings, rhpv3.HostPriceTable{}, time.Since(start), err
}
})
return settings, pt, elapsed, err
}
return settings, pt, time.Since(start), nil
}

// scan: first try
settings, pt, duration, err := scan()
if err != nil {
logger = logger.With(zap.Error(err))

// scan: second try
select {
case <-ctx.Done():
Expand All @@ -1502,11 +1519,11 @@ func (w *worker) scanHost(ctx context.Context, timeout time.Duration, hostKey ty
}
settings, pt, duration, err = scan()

logger = logger.With("elapsed", duration)
logger = logger.With("elapsed", duration).With(zap.Error(err))
if err == nil {
logger.Info("successfully scanned host on second try")
} else if !isErrHostUnreachable(err) {
logger.Infow("failed to scan host", zap.Error(err))
logger.Infow("failed to scan host")
}
}

Expand Down

0 comments on commit 80eca75

Please sign in to comment.