From 23b1ace798d91137b78907f34d00ca5a349e5a35 Mon Sep 17 00:00:00 2001 From: PJ Date: Mon, 22 Jan 2024 10:34:47 +0100 Subject: [PATCH] worker: improve contract and host interaction recorders --- worker/download.go | 3 +- worker/host.go | 4 +- worker/interactions.go | 144 +++++++++++++++++++++++++++++------------ worker/spending.go | 120 ++++++++++++++++++++-------------- worker/upload.go | 3 +- worker/worker.go | 85 +++++++++++------------- 6 files changed, 218 insertions(+), 141 deletions(-) diff --git a/worker/download.go b/worker/download.go index bfeb272ac..128b7637c 100644 --- a/worker/download.go +++ b/worker/download.go @@ -25,7 +25,6 @@ const ( ) var ( - errDownloadManagerStopped = errors.New("download manager stopped") errDownloadNotEnoughHosts = errors.New("not enough hosts available to download the slab") ) @@ -291,7 +290,7 @@ outer: var resp *slabDownloadResponse select { case <-mgr.shutdownCtx.Done(): - return errDownloadManagerStopped + return ErrShuttingDown case <-ctx.Done(): return errors.New("download timed out") case resp = <-responseChan: diff --git a/worker/host.go b/worker/host.go index 2e13af353..f7ae6cb1f 100644 --- a/worker/host.go +++ b/worker/host.go @@ -49,7 +49,7 @@ type ( acc *account bus Bus - contractSpendingRecorder *contractSpendingRecorder + contractSpendingRecorder ContractSpendingRecorder logger *zap.SugaredLogger transportPool *transportPoolV3 priceTables *priceTables @@ -192,7 +192,7 @@ func (h *host) FetchPriceTable(ctx context.Context, rev *types.FileContractRevis fetchPT := func(paymentFn PriceTablePaymentFunc) (hpt hostdb.HostPriceTable, err error) { err = h.transportPool.withTransportV3(ctx, h.hk, h.siamuxAddr, func(ctx context.Context, t *transportV3) (err error) { hpt, err = RPCPriceTable(ctx, t, paymentFn) - InteractionRecorderFromContext(ctx).RecordPriceTableUpdate(hostdb.PriceTableUpdate{ + HostInteractionRecorderFromContext(ctx).RecordPriceTableUpdate(hostdb.PriceTableUpdate{ HostKey: h.hk, Success: isSuccessfulInteraction(err), Timestamp: time.Now(), diff --git a/worker/interactions.go b/worker/interactions.go index 2321ebb30..70629c1f0 100644 --- a/worker/interactions.go +++ b/worker/interactions.go @@ -4,10 +4,12 @@ import ( "context" "fmt" "net/http" + "sync" "time" "go.sia.tech/jape" "go.sia.tech/renterd/hostdb" + "go.uber.org/zap" ) const ( @@ -15,15 +17,40 @@ const ( ) type ( - InteractionRecorder interface { + HostInteractionRecorder interface { RecordHostScan(...hostdb.HostScan) RecordPriceTableUpdate(...hostdb.PriceTableUpdate) + Stop(context.Context) + } + + hostInteractionRecorder struct { + flushInterval time.Duration + + bus Bus + logger *zap.SugaredLogger + + mu sync.Mutex + hostScans []hostdb.HostScan + priceTableUpdates []hostdb.PriceTableUpdate + + flushCtx context.Context + flushTimer *time.Timer } ) -var _ InteractionRecorder = &worker{} +var ( + _ HostInteractionRecorder = (*hostInteractionRecorder)(nil) +) -func interactionMiddleware(ir InteractionRecorder, routes map[string]jape.Handler) map[string]jape.Handler { +func HostInteractionRecorderFromContext(ctx context.Context) HostInteractionRecorder { + ir, ok := ctx.Value(keyInteractionRecorder).(HostInteractionRecorder) + if !ok { + panic("no interaction recorder attached to the context") // developer error + } + return ir +} + +func interactionMiddleware(ir HostInteractionRecorder, routes map[string]jape.Handler) map[string]jape.Handler { for route, handler := range routes { routes[route] = jape.Adapt(func(h http.Handler) http.Handler { return http.HandlerFunc(func(w 
http.ResponseWriter, r *http.Request) { @@ -35,61 +62,94 @@ func interactionMiddleware(ir InteractionRecorder, routes map[string]jape.Handle return routes } -func InteractionRecorderFromContext(ctx context.Context) InteractionRecorder { - ir, ok := ctx.Value(keyInteractionRecorder).(InteractionRecorder) - if !ok { - panic("no interaction recorder attached to the context") // developer error +func (w *worker) initHostInteractionRecorder(flushInterval time.Duration) { + if w.hostInteractionRecorder != nil { + panic("HostInteractionRecorder already initialized") // developer error } - return ir -} + w.hostInteractionRecorder = &hostInteractionRecorder{ + bus: w.bus, + logger: w.logger, -func (w *worker) RecordHostScan(scans ...hostdb.HostScan) { - w.interactionsMu.Lock() - defer w.interactionsMu.Unlock() + flushCtx: w.shutdownCtx, + flushInterval: flushInterval, - w.interactionsScans = append(w.interactionsScans, scans...) - w.tryFlushInteractionsBuffer() + hostScans: make([]hostdb.HostScan, 0), + priceTableUpdates: make([]hostdb.PriceTableUpdate, 0), + } } -func (w *worker) RecordPriceTableUpdate(ptUpdates ...hostdb.PriceTableUpdate) { - w.interactionsMu.Lock() - defer w.interactionsMu.Unlock() +func (r *hostInteractionRecorder) RecordHostScan(scans ...hostdb.HostScan) { + r.mu.Lock() + defer r.mu.Unlock() + r.hostScans = append(r.hostScans, scans...) + r.tryFlushInteractionsBuffer() +} - w.interactionsPriceTableUpdates = append(w.interactionsPriceTableUpdates, ptUpdates...) - w.tryFlushInteractionsBuffer() +func (r *hostInteractionRecorder) RecordPriceTableUpdate(ptUpdates ...hostdb.PriceTableUpdate) { + r.mu.Lock() + defer r.mu.Unlock() + r.priceTableUpdates = append(r.priceTableUpdates, ptUpdates...) + r.tryFlushInteractionsBuffer() } -func (w *worker) tryFlushInteractionsBuffer() { - // If a thread was scheduled to flush the buffer we are done. - if w.interactionsFlushTimer != nil { - return +func (r *hostInteractionRecorder) Stop(ctx context.Context) { + // stop the flush timer + r.mu.Lock() + if r.flushTimer != nil { + r.flushTimer.Stop() } + r.flushCtx = ctx + r.mu.Unlock() - // Otherwise we schedule a flush. - w.interactionsFlushTimer = time.AfterFunc(w.busFlushInterval, func() { - w.interactionsMu.Lock() - w.flushInteractions() - w.interactionsMu.Unlock() - }) + // flush all interactions + r.flush() + + // log if we weren't able to flush them + r.mu.Lock() + if len(r.hostScans) > 0 { + r.logger.Errorw(fmt.Sprintf("failed to record %d host scans on worker shutdown", len(r.hostScans))) + } + if len(r.priceTableUpdates) > 0 { + r.logger.Errorw(fmt.Sprintf("failed to record %d price table updates on worker shutdown", len(r.priceTableUpdates))) + } + r.mu.Unlock() } -// flushInteractions flushes the worker's interaction buffer to the bus. 
-func (w *worker) flushInteractions() {
-	if len(w.interactionsScans) > 0 {
-		if err := w.bus.RecordHostScans(w.shutdownCtx, w.interactionsScans); err != nil {
-			w.logger.Errorw(fmt.Sprintf("failed to record scans: %v", err))
-		} else {
-			w.interactionsScans = nil
+func (r *hostInteractionRecorder) flush() {
+	r.mu.Lock()
+	defer r.mu.Unlock()
+
+	// NOTE: don't bother flushing if the context is cancelled; we can safely
+	// ignore the buffered scans and price tables since we'll flush on shutdown
+	// and log in case we weren't able to flush all interactions to the bus
+	select {
+	case <-r.flushCtx.Done():
+		r.flushTimer = nil
+		return
+	default:
+	}
+
+	if len(r.hostScans) > 0 {
+		if err := r.bus.RecordHostScans(r.flushCtx, r.hostScans); err != nil {
+			r.logger.Errorw(fmt.Sprintf("failed to record scans: %v", err))
+		} else {
+			r.hostScans = nil
 		}
 	}
-	if len(w.interactionsPriceTableUpdates) > 0 {
-		if err := w.bus.RecordPriceTables(w.shutdownCtx, w.interactionsPriceTableUpdates); err != nil {
-			w.logger.Errorw(fmt.Sprintf("failed to record price table updates: %v", err))
-		} else {
-			w.interactionsPriceTableUpdates = nil
+	if len(r.priceTableUpdates) > 0 {
+		if err := r.bus.RecordPriceTables(r.flushCtx, r.priceTableUpdates); err != nil {
+			r.logger.Errorw(fmt.Sprintf("failed to record price table updates: %v", err))
+		} else {
+			r.priceTableUpdates = nil
 		}
 	}
-	w.interactionsFlushTimer = nil
+	r.flushTimer = nil
+}
+
+func (r *hostInteractionRecorder) tryFlushInteractionsBuffer() {
+	if r.flushTimer == nil {
+		r.flushTimer = time.AfterFunc(r.flushInterval, r.flush)
+	}
 }
 
 func isSuccessfulInteraction(err error) bool {
diff --git a/worker/spending.go b/worker/spending.go
index ec09e803a..87d2ec17d 100644
--- a/worker/spending.go
+++ b/worker/spending.go
@@ -12,43 +12,51 @@ import (
 )
 
 type (
-	// A ContractSpendingRecorder records the spending of a contract.
 	ContractSpendingRecorder interface {
 		Record(rev types.FileContractRevision, cs api.ContractSpending)
+		Stop(context.Context)
 	}
 
 	contractSpendingRecorder struct {
-		bus           Bus
 		flushInterval time.Duration
 
-		shutdownCtx context.Context
-		logger      *zap.SugaredLogger
-		mu                          sync.Mutex
-		contractSpendings           map[types.FileContractID]api.ContractSpendingRecord
-		contractSpendingsFlushTimer *time.Timer
+		bus    Bus
+		logger *zap.SugaredLogger
+
+		mu                sync.Mutex
+		contractSpendings map[types.FileContractID]api.ContractSpendingRecord
+
+		flushCtx   context.Context
+		flushTimer *time.Timer
 	}
 )
 
-func (w *worker) initContractSpendingRecorder() {
+var (
+	_ ContractSpendingRecorder = (*contractSpendingRecorder)(nil)
+)
+
+func (w *worker) initContractSpendingRecorder(flushInterval time.Duration) {
 	if w.contractSpendingRecorder != nil {
-		panic("contractSpendingRecorder already initialized") // developer error
+		panic("ContractSpendingRecorder already initialized") // developer error
 	}
 	w.contractSpendingRecorder = &contractSpendingRecorder{
-		bus:               w.bus,
+		bus:    w.bus,
+		logger: w.logger,
+
+		flushCtx:      w.shutdownCtx,
+		flushInterval: flushInterval,
+
 		contractSpendings: make(map[types.FileContractID]api.ContractSpendingRecord),
-		flushInterval:     w.busFlushInterval,
-		shutdownCtx:       w.shutdownCtx,
-		logger:            w.logger,
 	}
 }
 
-// Record sends contract spending records to the bus.
-func (sr *contractSpendingRecorder) Record(rev types.FileContractRevision, cs api.ContractSpending) {
-	sr.mu.Lock()
-	defer sr.mu.Unlock()
+// Record stores the given contract spending record until it gets flushed to the bus.
+func (r *contractSpendingRecorder) Record(rev types.FileContractRevision, cs api.ContractSpending) {
+	r.mu.Lock()
+	defer r.mu.Unlock()
 
-	// Update buffer.
-	csr, found := sr.contractSpendings[rev.ParentID]
+	// record the spending
+	csr, found := r.contractSpendings[rev.ParentID]
 	if !found {
 		csr = api.ContractSpendingRecord{
 			ContractID: rev.ParentID,
@@ -61,41 +69,59 @@ func (sr *contractSpendingRecorder) Record(rev types.FileContractRevision, cs ap
 		csr.ValidRenterPayout = rev.ValidRenterPayout()
 		csr.MissedHostPayout = rev.MissedHostPayout()
 	}
-	sr.contractSpendings[rev.ParentID] = csr
+	r.contractSpendings[rev.ParentID] = csr
 
-	// If a thread was scheduled to flush the buffer we are done.
-	if sr.contractSpendingsFlushTimer != nil {
-		return
+	// schedule flush
+	if r.flushTimer == nil {
+		r.flushTimer = time.AfterFunc(r.flushInterval, r.flush)
+	}
+}
+
+// Stop stops the flush timer and flushes one last time.
+func (r *contractSpendingRecorder) Stop(ctx context.Context) {
+	// stop the flush timer
+	r.mu.Lock()
+	if r.flushTimer != nil {
+		r.flushTimer.Stop()
 	}
-	// Otherwise we schedule a flush.
-	sr.contractSpendingsFlushTimer = time.AfterFunc(sr.flushInterval, func() {
-		sr.mu.Lock()
-		sr.flush()
-		sr.mu.Unlock()
-	})
+	r.flushCtx = ctx
+	r.mu.Unlock()
+
+	// flush all spending records
+	r.flush()
+
+	// log if we weren't able to flush them
+	r.mu.Lock()
+	if len(r.contractSpendings) > 0 {
+		r.logger.Errorw(fmt.Sprintf("failed to record %d contract spendings on worker shutdown", len(r.contractSpendings)))
+	}
+	r.mu.Unlock()
 }
 
-func (sr *contractSpendingRecorder) flush() {
-	if len(sr.contractSpendings) > 0 {
-		records := make([]api.ContractSpendingRecord, 0, len(sr.contractSpendings))
-		for _, cs := range sr.contractSpendings {
+func (r *contractSpendingRecorder) flush() {
+	r.mu.Lock()
+	defer r.mu.Unlock()
+
+	// NOTE: don't bother flushing if the context is cancelled; we can safely
+	// ignore the buffered records since we'll flush on shutdown and log in case
+	// we weren't able to flush all spendings to the bus
+	select {
+	case <-r.flushCtx.Done():
+		r.flushTimer = nil
+		return
+	default:
+	}
+
+	if len(r.contractSpendings) > 0 {
+		records := make([]api.ContractSpendingRecord, 0, len(r.contractSpendings))
+		for _, cs := range r.contractSpendings {
 			records = append(records, cs)
 		}
-		if err := sr.bus.RecordContractSpending(sr.shutdownCtx, records); err != nil {
-			sr.logger.Errorw(fmt.Sprintf("failed to record contract spending: %v", err))
+		if err := r.bus.RecordContractSpending(r.flushCtx, records); err != nil {
+			r.logger.Errorw(fmt.Sprintf("failed to record contract spending: %v", err))
 		} else {
-			sr.contractSpendings = make(map[types.FileContractID]api.ContractSpendingRecord)
+			r.contractSpendings = make(map[types.FileContractID]api.ContractSpendingRecord)
 		}
 	}
-	sr.contractSpendingsFlushTimer = nil
-}
-
-// Stop stops the flush timer.
-func (sr *contractSpendingRecorder) Stop() { - sr.mu.Lock() - defer sr.mu.Unlock() - if sr.contractSpendingsFlushTimer != nil { - sr.contractSpendingsFlushTimer.Stop() - sr.flush() - } + r.flushTimer = nil } diff --git a/worker/upload.go b/worker/upload.go index aebd341a4..52b28fc2d 100644 --- a/worker/upload.go +++ b/worker/upload.go @@ -30,7 +30,6 @@ const ( var ( errNoCandidateUploader = errors.New("no candidate uploader found") errNotEnoughContracts = errors.New("not enough contracts to support requested redundancy") - errWorkerShutDown = errors.New("worker was shut down") errUploadInterrupted = errors.New("upload was interrupted") ) @@ -573,7 +572,7 @@ func (mgr *uploadManager) Upload(ctx context.Context, r io.Reader, contracts []a for len(responses) < numSlabs { select { case <-mgr.shutdownCtx.Done(): - return false, "", errWorkerShutDown + return false, "", ErrShuttingDown case <-ctx.Done(): return false, "", errUploadInterrupted case numSlabs = <-numSlabsChan: diff --git a/worker/worker.go b/worker/worker.go index f8ce65d2b..013df1a3e 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -51,6 +51,10 @@ const ( lockingPriorityBackgroundUpload = 5 ) +var ( + ErrShuttingDown = errors.New("worker is shutting down") +) + // re-export the client type Client struct { *client.Client @@ -189,26 +193,21 @@ type worker struct { downloadManager *downloadManager uploadManager *uploadManager - accounts *accounts - priceTables *priceTables - - busFlushInterval time.Duration + accounts *accounts + priceTables *priceTables + transportPoolV3 *transportPoolV3 uploadsMu sync.Mutex uploadingPackedSlabs map[string]bool - interactionsMu sync.Mutex - interactionsScans []hostdb.HostScan - interactionsPriceTableUpdates []hostdb.PriceTableUpdate - interactionsFlushTimer *time.Timer - - contractSpendingRecorder *contractSpendingRecorder + hostInteractionRecorder HostInteractionRecorder + contractSpendingRecorder ContractSpendingRecorder contractLockingDuration time.Duration - transportPoolV3 *transportPoolV3 - logger *zap.SugaredLogger shutdownCtx context.Context shutdownCtxCancel context.CancelFunc + + logger *zap.SugaredLogger } func (w *worker) withRevision(ctx context.Context, fetchTimeout time.Duration, fcid types.FileContractID, hk types.PublicKey, siamuxAddr string, lockPriority int, blockHeight uint64, fn func(rev types.FileContractRevision) error) error { @@ -225,7 +224,7 @@ func (w *worker) withRevision(ctx context.Context, fetchTimeout time.Duration, f func (w *worker) registerAlert(a alerts.Alert) { ctx, cancel := context.WithTimeout(w.shutdownCtx, time.Minute) if err := w.alerts.RegisterAlert(ctx, a); err != nil { - w.logger.Error("failed to register alert", err) + w.logger.Errorf("failed to register alert, err: %v", err) } cancel() } @@ -343,7 +342,7 @@ func (w *worker) rhpPriceTableHandler(jc jape.Context) { var err error var hpt hostdb.HostPriceTable defer func() { - InteractionRecorderFromContext(ctx).RecordPriceTableUpdate(hostdb.PriceTableUpdate{ + HostInteractionRecorderFromContext(ctx).RecordPriceTableUpdate(hostdb.PriceTableUpdate{ HostKey: rptr.HostKey, Success: isSuccessfulInteraction(err), Timestamp: time.Now(), @@ -927,11 +926,11 @@ func (w *worker) objectsHandlerGET(jc jape.Context) { downloadFn := func(wr io.Writer, offset, length int64) (err error) { ctx = WithGougingChecker(ctx, w.bus, gp) err = w.downloadManager.DownloadObject(ctx, wr, res.Object.Object, uint64(offset), uint64(length), contracts) - if err != nil && !(errors.Is(err, errDownloadManagerStopped) || - 
errors.Is(err, errNotEnoughContracts) || - errors.Is(err, context.Canceled)) { + if err != nil { w.logger.Error(err) - w.registerAlert(newDownloadFailedAlert(bucket, path, prefix, marker, offset, length, int64(len(contracts)), err)) + if !errors.Is(err, ErrShuttingDown) { + w.registerAlert(newDownloadFailedAlert(bucket, path, prefix, marker, offset, length, int64(len(contracts)), err)) + } } return } @@ -1043,11 +1042,11 @@ func (w *worker) objectsHandlerPUT(jc jape.Context) { params := defaultParameters(bucket, path) eTag, err := w.upload(ctx, jc.Request.Body, contracts, params, opts...) if err := jc.Check("couldn't upload object", err); err != nil { - if err != nil && !(errors.Is(err, errWorkerShutDown) || - errors.Is(err, errNotEnoughContracts) || - errors.Is(err, context.Canceled)) { + if err != nil { w.logger.Error(err) - w.registerAlert(newUploadFailedAlert(bucket, path, up.ContractSet, mimeType, rs.MinShards, rs.TotalShards, len(contracts), up.UploadPacking, false, err)) + if !errors.Is(err, ErrShuttingDown) { + w.registerAlert(newUploadFailedAlert(bucket, path, up.ContractSet, mimeType, rs.MinShards, rs.TotalShards, len(contracts), up.UploadPacking, false, err)) + } } return } @@ -1182,11 +1181,11 @@ func (w *worker) multipartUploadHandlerPUT(jc jape.Context) { params := multipartParameters(bucket, path, uploadID, partNumber) eTag, err := w.upload(ctx, jc.Request.Body, contracts, params, opts...) if jc.Check("couldn't upload object", err) != nil { - if err != nil && !(errors.Is(err, errWorkerShutDown) || - errors.Is(err, errNotEnoughContracts) || - errors.Is(err, context.Canceled)) { + if err != nil { w.logger.Error(err) - w.registerAlert(newUploadFailedAlert(bucket, path, up.ContractSet, "", rs.MinShards, rs.TotalShards, len(contracts), up.UploadPacking, true, err)) + if !errors.Is(err, ErrShuttingDown) { + w.registerAlert(newUploadFailedAlert(bucket, path, up.ContractSet, "", rs.MinShards, rs.TotalShards, len(contracts), up.UploadPacking, true, err)) + } } return } @@ -1307,7 +1306,6 @@ func New(masterKey [32]byte, id string, b Bus, contractLockingDuration, busFlush id: id, bus: b, masterKey: masterKey, - busFlushInterval: busFlushInterval, logger: l.Sugar().Named("worker").Named(id), startTime: time.Now(), uploadingPackedSlabs: make(map[string]bool), @@ -1315,17 +1313,20 @@ func New(masterKey [32]byte, id string, b Bus, contractLockingDuration, busFlush shutdownCtxCancel: shutdownCtxCancel, } w.initAccounts(b) - w.initContractSpendingRecorder() - w.initDownloadManager(downloadMaxMemory, downloadMaxOverdrive, downloadOverdriveTimeout, l.Sugar().Named("downloadmanager")) w.initPriceTables() w.initTransportPool() + + w.initDownloadManager(downloadMaxMemory, downloadMaxOverdrive, downloadOverdriveTimeout, l.Sugar().Named("downloadmanager")) w.initUploadManager(uploadMaxMemory, uploadMaxOverdrive, uploadOverdriveTimeout, l.Sugar().Named("uploadmanager")) + + w.initContractSpendingRecorder(busFlushInterval) + w.initHostInteractionRecorder(busFlushInterval) return w, nil } // Handler returns an HTTP handler that serves the worker API. func (w *worker) Handler() http.Handler { - return jape.Mux(interactionMiddleware(w, map[string]jape.Handler{ + return jape.Mux(interactionMiddleware(w.hostInteractionRecorder, map[string]jape.Handler{ "GET /account/:hostkey": w.accountHandlerGET, "GET /id": w.idHandlerGET, @@ -1357,32 +1358,24 @@ func (w *worker) Handler() http.Handler { } // Shutdown shuts down the worker. 
-func (w *worker) Shutdown(_ context.Context) error { - w.interactionsMu.Lock() - if w.interactionsFlushTimer != nil { - w.interactionsFlushTimer.Stop() - w.flushInteractions() - } - w.interactionsMu.Unlock() - - // Cancel shutdown context. +func (w *worker) Shutdown(ctx context.Context) error { + // cancel shutdown context w.shutdownCtxCancel() - // Stop contract spending recorder. - w.contractSpendingRecorder.Stop() - - // Stop the downloader. + // stop uploads and downloads w.downloadManager.Stop() - - // Stop the uploader. w.uploadManager.Stop() + + // stop recorders + w.hostInteractionRecorder.Stop(ctx) + w.contractSpendingRecorder.Stop(ctx) return nil } func (w *worker) scanHost(ctx context.Context, hostKey types.PublicKey, hostIP string) (settings rhpv2.HostSettings, pt rhpv3.HostPriceTable, elapsed time.Duration, err error) { // record host scan defer func() { - InteractionRecorderFromContext(ctx).RecordHostScan(hostdb.HostScan{ + HostInteractionRecorderFromContext(ctx).RecordHostScan(hostdb.HostScan{ HostKey: hostKey, Success: isSuccessfulInteraction(err), Timestamp: time.Now(),
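
Notes on the patterns introduced above.

Both recorders now share the same buffered-flush shape: a mutex-guarded buffer, a one-shot time.AfterFunc timer armed on the first record and disarmed after each flush, and a Stop(ctx) that swaps the flush context for the caller's, flushes once more, and logs whatever could not be persisted. The following standalone sketch shows that shape in isolation; all names (recorder, flushFn, entries) are illustrative and not taken from the codebase.

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

type recorder struct {
	flushInterval time.Duration
	flushFn       func(ctx context.Context, entries []string) error

	mu         sync.Mutex
	entries    []string
	flushCtx   context.Context
	flushTimer *time.Timer
}

// Record buffers an entry and arms the flush timer if none is pending.
func (r *recorder) Record(entry string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.entries = append(r.entries, entry)
	if r.flushTimer == nil {
		r.flushTimer = time.AfterFunc(r.flushInterval, r.flush)
	}
}

func (r *recorder) flush() {
	r.mu.Lock()
	defer r.mu.Unlock()

	// skip the flush once the context is done; Stop retries with its own ctx
	select {
	case <-r.flushCtx.Done():
		r.flushTimer = nil
		return
	default:
	}

	if len(r.entries) > 0 {
		if err := r.flushFn(r.flushCtx, r.entries); err == nil {
			r.entries = nil // only drop entries that were persisted
		}
	}
	r.flushTimer = nil
}

// Stop mirrors the recorders above: disarm the timer, swap in the caller's
// context, flush one last time, then report anything left behind.
func (r *recorder) Stop(ctx context.Context) {
	r.mu.Lock()
	if r.flushTimer != nil {
		r.flushTimer.Stop()
	}
	r.flushCtx = ctx
	r.mu.Unlock()

	r.flush()

	r.mu.Lock()
	if len(r.entries) > 0 {
		fmt.Printf("failed to record %d entries on shutdown\n", len(r.entries))
	}
	r.mu.Unlock()
}

func main() {
	r := &recorder{
		flushInterval: 10 * time.Millisecond,
		flushCtx:      context.Background(),
		flushFn: func(_ context.Context, entries []string) error {
			fmt.Println("flushed:", entries)
			return nil
		},
	}
	r.Record("host scan")
	time.Sleep(50 * time.Millisecond) // let the timer fire
	r.Record("price table update")
	r.Stop(context.Background())
}

Keeping the one-shot timer check inside the mutex guarantees at most one flush is ever scheduled, which is what both tryFlushInteractionsBuffer and the inlined check in Record rely on.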
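HostInteractionRecorderFromContext only works because the middleware attaches the recorder to every request context before a handler runs. Below is a minimal sketch of that wiring using plain net/http; the context key, recorder interface, and handler are stand-ins for keyInteractionRecorder, HostInteractionRecorder, and the jape handlers in the patch.

package main

import (
	"context"
	"fmt"
	"net/http"
	"net/http/httptest"
)

type contextKey string

const keyRecorder contextKey = "recorder"

type recorder interface{ Record(event string) }

type printRecorder struct{}

func (printRecorder) Record(event string) { fmt.Println("recorded:", event) }

// withRecorder plays the role of interactionMiddleware: wrap the handler so
// the recorder rides along on every request context.
func withRecorder(rec recorder, h http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		h.ServeHTTP(w, r.WithContext(context.WithValue(r.Context(), keyRecorder, rec)))
	})
}

// recorderFromContext panics on a missing recorder, as in the patch, since
// that can only happen through a developer error in the route setup.
func recorderFromContext(ctx context.Context) recorder {
	rec, ok := ctx.Value(keyRecorder).(recorder)
	if !ok {
		panic("no recorder attached to the context")
	}
	return rec
}

func main() {
	h := withRecorder(printRecorder{}, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		// deep in the call stack, no recorder parameter needs threading through
		recorderFromContext(r.Context()).Record("host scan")
	}))
	h.ServeHTTP(httptest.NewRecorder(), httptest.NewRequest("GET", "/", nil))
}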
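The patch also folds errDownloadManagerStopped and errWorkerShutDown into a single exported ErrShuttingDown, which lets the object handlers log every failure but skip alert registration for expected shutdown errors with one errors.Is check. A runnable sketch of that branch follows; upload is a hypothetical stand-in for the worker's upload path.

package main

import (
	"errors"
	"fmt"
)

// ErrShuttingDown mirrors the sentinel exported from worker.go.
var ErrShuttingDown = errors.New("worker is shutting down")

// upload stands in for w.upload; on shutdown its error wraps the sentinel,
// so errors.Is matches it whether it is returned directly or wrapped.
func upload() error {
	return fmt.Errorf("upload failed: %w", ErrShuttingDown)
}

func main() {
	if err := upload(); err != nil {
		fmt.Println(err) // always logged
		if !errors.Is(err, ErrShuttingDown) {
			fmt.Println("would register alert:", err) // skipped during shutdown
		}
	}
}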