Skip to content

Commit

Permalink
worker: use different context in scanHost
Browse files Browse the repository at this point in the history
  • Loading branch information
ChrisSchinnerl committed Feb 20, 2024
1 parent df94b1b commit 2fbcc26
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 146 deletions.
14 changes: 7 additions & 7 deletions worker/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ type (
acc *account
bus Bus
contractSpendingRecorder ContractSpendingRecorder
interactionRecorder HostInteractionRecorder
logger *zap.SugaredLogger
transportPool *transportPoolV3
priceTables *priceTables
Expand All @@ -70,7 +69,6 @@ func (w *worker) Host(hk types.PublicKey, fcid types.FileContractID, siamuxAddr
acc: w.accounts.ForHost(hk),
bus: w.bus,
contractSpendingRecorder: w.contractSpendingRecorder,
interactionRecorder: w.hostInteractionRecorder,
logger: w.logger.Named(hk.String()[:4]),
fcid: fcid,
siamuxAddr: siamuxAddr,
Expand Down Expand Up @@ -198,11 +196,13 @@ func (h *host) FetchPriceTable(ctx context.Context, rev *types.FileContractRevis
fetchPT := func(paymentFn PriceTablePaymentFunc) (hpt hostdb.HostPriceTable, err error) {
err = h.transportPool.withTransportV3(ctx, h.hk, h.siamuxAddr, func(ctx context.Context, t *transportV3) (err error) {
hpt, err = RPCPriceTable(ctx, t, paymentFn)
h.interactionRecorder.RecordPriceTableUpdate(hostdb.PriceTableUpdate{
HostKey: h.hk,
Success: isSuccessfulInteraction(err),
Timestamp: time.Now(),
PriceTable: hpt,
h.bus.RecordPriceTables(ctx, []hostdb.PriceTableUpdate{
{
HostKey: h.hk,
Success: isSuccessfulInteraction(err),
Timestamp: time.Now(),
PriceTable: hpt,
},
})
return
})
Expand Down
119 changes: 0 additions & 119 deletions worker/interactions.go
Original file line number Diff line number Diff line change
@@ -1,135 +1,16 @@
package worker

import (
"context"
"fmt"
"sync"
"time"

"go.sia.tech/renterd/hostdb"
"go.uber.org/zap"
)

const (
keyInteractionRecorder contextKey = "InteractionRecorder"
)

type (
HostInteractionRecorder interface {
RecordHostScan(...hostdb.HostScan)
RecordPriceTableUpdate(...hostdb.PriceTableUpdate)
Stop(context.Context)
}

hostInteractionRecorder struct {
flushInterval time.Duration

bus Bus
logger *zap.SugaredLogger

mu sync.Mutex
hostScans []hostdb.HostScan
priceTableUpdates []hostdb.PriceTableUpdate

flushCtx context.Context
flushTimer *time.Timer
}
)

var (
_ HostInteractionRecorder = (*hostInteractionRecorder)(nil)
)

func (w *worker) initHostInteractionRecorder(flushInterval time.Duration) {
if w.hostInteractionRecorder != nil {
panic("HostInteractionRecorder already initialized") // developer error
}
w.hostInteractionRecorder = &hostInteractionRecorder{
bus: w.bus,
logger: w.logger,

flushCtx: w.shutdownCtx,
flushInterval: flushInterval,

hostScans: make([]hostdb.HostScan, 0),
priceTableUpdates: make([]hostdb.PriceTableUpdate, 0),
}
}

func (r *hostInteractionRecorder) RecordHostScan(scans ...hostdb.HostScan) {
r.mu.Lock()
defer r.mu.Unlock()
r.hostScans = append(r.hostScans, scans...)
r.tryFlushInteractionsBuffer()
}

func (r *hostInteractionRecorder) RecordPriceTableUpdate(ptUpdates ...hostdb.PriceTableUpdate) {
r.mu.Lock()
defer r.mu.Unlock()
r.priceTableUpdates = append(r.priceTableUpdates, ptUpdates...)
r.tryFlushInteractionsBuffer()
}

func (r *hostInteractionRecorder) Stop(ctx context.Context) {
// stop the flush timer
r.mu.Lock()
if r.flushTimer != nil {
r.flushTimer.Stop()
}
r.flushCtx = ctx
r.mu.Unlock()

// flush all interactions
r.flush()

// log if we weren't able to flush them
r.mu.Lock()
if len(r.hostScans) > 0 {
r.logger.Errorw(fmt.Sprintf("failed to record %d host scans on worker shutdown", len(r.hostScans)))
}
if len(r.priceTableUpdates) > 0 {
r.logger.Errorw(fmt.Sprintf("failed to record %d price table updates on worker shutdown", len(r.priceTableUpdates)))
}
r.mu.Unlock()
}

func (r *hostInteractionRecorder) flush() {
r.mu.Lock()
defer r.mu.Unlock()

// NOTE: don't bother flushing if the context is cancelled, we can safely
// ignore the buffered scans and price tables since we'll flush on shutdown
// and log in case we weren't able to flush all interactions to the bus
select {
case <-r.flushCtx.Done():
r.flushTimer = nil
return
default:
}

if len(r.hostScans) > 0 {
if err := r.bus.RecordHostScans(r.flushCtx, r.hostScans); err != nil {
r.logger.Errorw(fmt.Sprintf("failed to record scans: %v", err))
} else if err == nil {
r.hostScans = nil
}
}
if len(r.priceTableUpdates) > 0 {
if err := r.bus.RecordPriceTables(r.flushCtx, r.priceTableUpdates); err != nil {
r.logger.Errorw(fmt.Sprintf("failed to record price table updates: %v", err))
} else if err == nil {
r.priceTableUpdates = nil
}
}
r.flushTimer = nil
}

func (r *hostInteractionRecorder) tryFlushInteractionsBuffer() {
if r.flushTimer == nil {
r.flushTimer = time.AfterFunc(r.flushInterval, r.flush)
}
}

func isSuccessfulInteraction(err error) bool {
// No error always means success.
if err == nil {
Expand Down
46 changes: 26 additions & 20 deletions worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,6 @@ type worker struct {
uploadsMu sync.Mutex
uploadingPackedSlabs map[string]bool

hostInteractionRecorder HostInteractionRecorder
contractSpendingRecorder ContractSpendingRecorder
contractLockingDuration time.Duration

Expand Down Expand Up @@ -342,11 +341,13 @@ func (w *worker) rhpPriceTableHandler(jc jape.Context) {
var err error
var hpt hostdb.HostPriceTable
defer func() {
w.hostInteractionRecorder.RecordPriceTableUpdate(hostdb.PriceTableUpdate{
HostKey: rptr.HostKey,
Success: isSuccessfulInteraction(err),
Timestamp: time.Now(),
PriceTable: hpt,
w.bus.RecordPriceTables(ctx, []hostdb.PriceTableUpdate{
{
HostKey: rptr.HostKey,
Success: isSuccessfulInteraction(err),
Timestamp: time.Now(),
PriceTable: hpt,
},
})
}()

Expand Down Expand Up @@ -1292,6 +1293,7 @@ func New(masterKey [32]byte, id string, b Bus, contractLockingDuration, busFlush
return nil, errors.New("uploadMaxMemory cannot be 0")
}

ctx, cancel := context.WithCancel(context.Background())
w := &worker{
alerts: alerts.WithOrigin(b, fmt.Sprintf("worker.%s", id)),
allowPrivateIPs: allowPrivateIPs,
Expand All @@ -1302,13 +1304,10 @@ func New(masterKey [32]byte, id string, b Bus, contractLockingDuration, busFlush
logger: l.Sugar().Named("worker").Named(id),
startTime: time.Now(),
uploadingPackedSlabs: make(map[string]bool),
shutdownCtx: ctx,
shutdownCtxCancel: cancel,
}

ctx, cancel := context.WithCancel(context.Background())
ctx = context.WithValue(ctx, keyInteractionRecorder, w)
w.shutdownCtx = ctx
w.shutdownCtxCancel = cancel

w.initAccounts(b)
w.initPriceTables()
w.initTransportPool()
Expand All @@ -1317,7 +1316,6 @@ func New(masterKey [32]byte, id string, b Bus, contractLockingDuration, busFlush
w.initUploadManager(uploadMaxMemory, uploadMaxOverdrive, uploadOverdriveTimeout, l.Sugar().Named("uploadmanager"))

w.initContractSpendingRecorder(busFlushInterval)
w.initHostInteractionRecorder(busFlushInterval)
return w, nil
}

Expand Down Expand Up @@ -1364,7 +1362,6 @@ func (w *worker) Shutdown(ctx context.Context) error {
w.uploadManager.Stop()

// stop recorders
w.hostInteractionRecorder.Stop(ctx)
w.contractSpendingRecorder.Stop(ctx)
return nil
}
Expand Down Expand Up @@ -1441,14 +1438,23 @@ func (w *worker) scanHost(ctx context.Context, hostKey types.PublicKey, hostIP s
default:
}

// record host scan
w.hostInteractionRecorder.RecordHostScan(hostdb.HostScan{
HostKey: hostKey,
Success: isSuccessfulInteraction(err),
Timestamp: time.Now(),
Settings: settings,
PriceTable: pt,
// record host scan - make sure this isn't interrupted by the same context
// used to time out the scan itself because otherwise we won't be able to
// record scans that timed out.
recordCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
scanErr := w.bus.RecordHostScans(recordCtx, []hostdb.HostScan{
{
HostKey: hostKey,
Success: isSuccessfulInteraction(err),
Timestamp: time.Now(),
Settings: settings,
PriceTable: pt,
},
})
if scanErr != nil {
w.logger.Errorf("failed to record host scan: %v", scanErr)
}
return settings, pt, duration, err
}

Expand Down

0 comments on commit 2fbcc26

Please sign in to comment.