diff --git a/worker/host.go b/worker/host.go index 86e92ce27..43e0891af 100644 --- a/worker/host.go +++ b/worker/host.go @@ -52,7 +52,6 @@ type ( acc *account bus Bus contractSpendingRecorder ContractSpendingRecorder - interactionRecorder HostInteractionRecorder logger *zap.SugaredLogger transportPool *transportPoolV3 priceTables *priceTables @@ -70,7 +69,6 @@ func (w *worker) Host(hk types.PublicKey, fcid types.FileContractID, siamuxAddr acc: w.accounts.ForHost(hk), bus: w.bus, contractSpendingRecorder: w.contractSpendingRecorder, - interactionRecorder: w.hostInteractionRecorder, logger: w.logger.Named(hk.String()[:4]), fcid: fcid, siamuxAddr: siamuxAddr, @@ -198,11 +196,13 @@ func (h *host) FetchPriceTable(ctx context.Context, rev *types.FileContractRevis fetchPT := func(paymentFn PriceTablePaymentFunc) (hpt hostdb.HostPriceTable, err error) { err = h.transportPool.withTransportV3(ctx, h.hk, h.siamuxAddr, func(ctx context.Context, t *transportV3) (err error) { hpt, err = RPCPriceTable(ctx, t, paymentFn) - h.interactionRecorder.RecordPriceTableUpdate(hostdb.PriceTableUpdate{ - HostKey: h.hk, - Success: isSuccessfulInteraction(err), - Timestamp: time.Now(), - PriceTable: hpt, + h.bus.RecordPriceTables(ctx, []hostdb.PriceTableUpdate{ + { + HostKey: h.hk, + Success: isSuccessfulInteraction(err), + Timestamp: time.Now(), + PriceTable: hpt, + }, }) return }) diff --git a/worker/interactions.go b/worker/interactions.go index dfe8c4017..2107ae582 100644 --- a/worker/interactions.go +++ b/worker/interactions.go @@ -1,135 +1,16 @@ package worker import ( - "context" - "fmt" - "sync" - "time" - "go.sia.tech/renterd/hostdb" - "go.uber.org/zap" -) - -const ( - keyInteractionRecorder contextKey = "InteractionRecorder" ) type ( HostInteractionRecorder interface { RecordHostScan(...hostdb.HostScan) RecordPriceTableUpdate(...hostdb.PriceTableUpdate) - Stop(context.Context) - } - - hostInteractionRecorder struct { - flushInterval time.Duration - - bus Bus - logger *zap.SugaredLogger - - mu sync.Mutex - hostScans []hostdb.HostScan - priceTableUpdates []hostdb.PriceTableUpdate - - flushCtx context.Context - flushTimer *time.Timer } ) -var ( - _ HostInteractionRecorder = (*hostInteractionRecorder)(nil) -) - -func (w *worker) initHostInteractionRecorder(flushInterval time.Duration) { - if w.hostInteractionRecorder != nil { - panic("HostInteractionRecorder already initialized") // developer error - } - w.hostInteractionRecorder = &hostInteractionRecorder{ - bus: w.bus, - logger: w.logger, - - flushCtx: w.shutdownCtx, - flushInterval: flushInterval, - - hostScans: make([]hostdb.HostScan, 0), - priceTableUpdates: make([]hostdb.PriceTableUpdate, 0), - } -} - -func (r *hostInteractionRecorder) RecordHostScan(scans ...hostdb.HostScan) { - r.mu.Lock() - defer r.mu.Unlock() - r.hostScans = append(r.hostScans, scans...) - r.tryFlushInteractionsBuffer() -} - -func (r *hostInteractionRecorder) RecordPriceTableUpdate(ptUpdates ...hostdb.PriceTableUpdate) { - r.mu.Lock() - defer r.mu.Unlock() - r.priceTableUpdates = append(r.priceTableUpdates, ptUpdates...) - r.tryFlushInteractionsBuffer() -} - -func (r *hostInteractionRecorder) Stop(ctx context.Context) { - // stop the flush timer - r.mu.Lock() - if r.flushTimer != nil { - r.flushTimer.Stop() - } - r.flushCtx = ctx - r.mu.Unlock() - - // flush all interactions - r.flush() - - // log if we weren't able to flush them - r.mu.Lock() - if len(r.hostScans) > 0 { - r.logger.Errorw(fmt.Sprintf("failed to record %d host scans on worker shutdown", len(r.hostScans))) - } - if len(r.priceTableUpdates) > 0 { - r.logger.Errorw(fmt.Sprintf("failed to record %d price table updates on worker shutdown", len(r.priceTableUpdates))) - } - r.mu.Unlock() -} - -func (r *hostInteractionRecorder) flush() { - r.mu.Lock() - defer r.mu.Unlock() - - // NOTE: don't bother flushing if the context is cancelled, we can safely - // ignore the buffered scans and price tables since we'll flush on shutdown - // and log in case we weren't able to flush all interactions to the bus - select { - case <-r.flushCtx.Done(): - r.flushTimer = nil - return - default: - } - - if len(r.hostScans) > 0 { - if err := r.bus.RecordHostScans(r.flushCtx, r.hostScans); err != nil { - r.logger.Errorw(fmt.Sprintf("failed to record scans: %v", err)) - } else if err == nil { - r.hostScans = nil - } - } - if len(r.priceTableUpdates) > 0 { - if err := r.bus.RecordPriceTables(r.flushCtx, r.priceTableUpdates); err != nil { - r.logger.Errorw(fmt.Sprintf("failed to record price table updates: %v", err)) - } else if err == nil { - r.priceTableUpdates = nil - } - } - r.flushTimer = nil -} - -func (r *hostInteractionRecorder) tryFlushInteractionsBuffer() { - if r.flushTimer == nil { - r.flushTimer = time.AfterFunc(r.flushInterval, r.flush) - } -} - func isSuccessfulInteraction(err error) bool { // No error always means success. if err == nil { diff --git a/worker/worker.go b/worker/worker.go index 094722c00..79c149085 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -200,7 +200,6 @@ type worker struct { uploadsMu sync.Mutex uploadingPackedSlabs map[string]bool - hostInteractionRecorder HostInteractionRecorder contractSpendingRecorder ContractSpendingRecorder contractLockingDuration time.Duration @@ -342,11 +341,13 @@ func (w *worker) rhpPriceTableHandler(jc jape.Context) { var err error var hpt hostdb.HostPriceTable defer func() { - w.hostInteractionRecorder.RecordPriceTableUpdate(hostdb.PriceTableUpdate{ - HostKey: rptr.HostKey, - Success: isSuccessfulInteraction(err), - Timestamp: time.Now(), - PriceTable: hpt, + w.bus.RecordPriceTables(ctx, []hostdb.PriceTableUpdate{ + { + HostKey: rptr.HostKey, + Success: isSuccessfulInteraction(err), + Timestamp: time.Now(), + PriceTable: hpt, + }, }) }() @@ -1291,6 +1292,7 @@ func New(masterKey [32]byte, id string, b Bus, contractLockingDuration, busFlush return nil, errors.New("uploadMaxMemory cannot be 0") } + ctx, cancel := context.WithCancel(context.Background()) w := &worker{ alerts: alerts.WithOrigin(b, fmt.Sprintf("worker.%s", id)), allowPrivateIPs: allowPrivateIPs, @@ -1301,13 +1303,10 @@ func New(masterKey [32]byte, id string, b Bus, contractLockingDuration, busFlush logger: l.Sugar().Named("worker").Named(id), startTime: time.Now(), uploadingPackedSlabs: make(map[string]bool), + shutdownCtx: ctx, + shutdownCtxCancel: cancel, } - ctx, cancel := context.WithCancel(context.Background()) - ctx = context.WithValue(ctx, keyInteractionRecorder, w) - w.shutdownCtx = ctx - w.shutdownCtxCancel = cancel - w.initAccounts(b) w.initPriceTables() w.initTransportPool() @@ -1316,7 +1315,6 @@ func New(masterKey [32]byte, id string, b Bus, contractLockingDuration, busFlush w.initUploadManager(uploadMaxMemory, uploadMaxOverdrive, uploadOverdriveTimeout, l.Sugar().Named("uploadmanager")) w.initContractSpendingRecorder(busFlushInterval) - w.initHostInteractionRecorder(busFlushInterval) return w, nil } @@ -1363,7 +1361,6 @@ func (w *worker) Shutdown(ctx context.Context) error { w.uploadManager.Stop() // stop recorders - w.hostInteractionRecorder.Stop(ctx) w.contractSpendingRecorder.Stop(ctx) return nil } @@ -1440,14 +1437,23 @@ func (w *worker) scanHost(ctx context.Context, hostKey types.PublicKey, hostIP s default: } - // record host scan - w.hostInteractionRecorder.RecordHostScan(hostdb.HostScan{ - HostKey: hostKey, - Success: isSuccessfulInteraction(err), - Timestamp: time.Now(), - Settings: settings, - PriceTable: pt, + // record host scan - make sure this isn't interrupted by the same context + // used to time out the scan itself because otherwise we won't be able to + // record scans that timed out. + recordCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + scanErr := w.bus.RecordHostScans(recordCtx, []hostdb.HostScan{ + { + HostKey: hostKey, + Success: isSuccessfulInteraction(err), + Timestamp: time.Now(), + Settings: settings, + PriceTable: pt, + }, }) + if scanErr != nil { + w.logger.Errorf("failed to record host scan: %v", scanErr) + } return settings, pt, duration, err }