Skip to content

Commit

Permalink
alerts: add alert manager with an initial alert during migrations
Browse files Browse the repository at this point in the history
  • Loading branch information
ChrisSchinnerl committed Jul 6, 2023
1 parent 10fb1d0 commit 3e52ce7
Show file tree
Hide file tree
Showing 5 changed files with 229 additions and 18 deletions.
136 changes: 136 additions & 0 deletions alerts/alerts.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
package alerts

import (
"fmt"
"sort"
"strings"
"sync"
"time"

"go.sia.tech/core/types"
)

const (
// SeverityInfo indicates that the alert is informational.
SeverityInfo Severity = iota + 1
// SeverityWarning indicates that the alert is a warning.
SeverityWarning
// SeverityError indicates that the alert is an error.
SeverityError
// SeverityCritical indicates that the alert is critical.
SeverityCritical

severityInfoStr = "info"
severityWarningStr = "warning"
severityErrorStr = "error"
severityCriticalStr = "critical"
)

type (
// Severity indicates the severity of an alert.
Severity uint8

// An Alert is a dismissible message that is displayed to the user.
Alert struct {
// ID is a unique identifier for the alert.
ID types.Hash256 `json:"id"`
// Severity is the severity of the alert.
Severity Severity `json:"severity"`
// Message is a human-readable message describing the alert.
Message string `json:"message"`
// Data is a map of arbitrary data that can be used to provide
// additional context to the alert.
Data map[string]any `json:"data,omitempty"`
Timestamp time.Time `json:"timestamp"`
}

// A Manager manages the host's alerts.
Manager struct {
mu sync.Mutex
// alerts is a map of alert IDs to their current alert.
alerts map[types.Hash256]Alert
}
)

// String implements the fmt.Stringer interface.
func (s Severity) String() string {
switch s {
case SeverityInfo:
return severityInfoStr
case SeverityWarning:
return severityWarningStr
case SeverityError:
return severityErrorStr
case SeverityCritical:
return severityCriticalStr
default:
panic(fmt.Sprintf("unrecognized severity %d", s))
}
}

// MarshalJSON implements the json.Marshaler interface.
func (s Severity) MarshalJSON() ([]byte, error) {
return []byte(fmt.Sprintf(`%q`, s.String())), nil
}

// UnmarshalJSON implements the json.Unmarshaler interface.
func (s *Severity) UnmarshalJSON(b []byte) error {
status := strings.Trim(string(b), `"`)
switch status {
case severityInfoStr:
*s = SeverityInfo
case severityWarningStr:
*s = SeverityWarning
case severityErrorStr:
*s = SeverityError
case severityCriticalStr:
*s = SeverityCritical
default:
return fmt.Errorf("unrecognized severity: %v", status)
}
return nil
}

// Register registers a new alert with the manager
func (m *Manager) Register(a Alert) {
if a.ID == (types.Hash256{}) {
panic("cannot register alert with empty ID") // developer error
} else if a.Timestamp.IsZero() {
panic("cannot register alert with zero timestamp") // developer error
}

m.mu.Lock()
m.alerts[a.ID] = a
m.mu.Unlock()
}

// Dismiss removes the alerts with the given IDs.
func (m *Manager) Dismiss(ids ...types.Hash256) {
m.mu.Lock()
for _, id := range ids {
delete(m.alerts, id)
}
m.mu.Unlock()
}

// Active returns the host's active alerts.
func (m *Manager) Active() []Alert {
m.mu.Lock()
defer m.mu.Unlock()

alerts := make([]Alert, 0, len(m.alerts))
for _, a := range m.alerts {
alerts = append(alerts, a)
}
sort.Slice(alerts, func(i, j int) bool {
return alerts[i].Timestamp.After(alerts[j].Timestamp)
})
return alerts
}

// NewManager initializes a new alerts manager.
func NewManager() *Manager {
return &Manager{
alerts: make(map[types.Hash256]Alert),
}
}
40 changes: 30 additions & 10 deletions autopilot/autopilot.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
rhpv3 "go.sia.tech/core/rhp/v3"
"go.sia.tech/core/types"
"go.sia.tech/jape"
"go.sia.tech/renterd/alerts"
"go.sia.tech/renterd/api"
"go.sia.tech/renterd/hostdb"
"go.sia.tech/renterd/object"
Expand Down Expand Up @@ -101,10 +102,11 @@ type Autopilot struct {
synced bool
state state

a *accounts
c *contractor
m *migrator
s *scanner
alerts *alerts.Manager
a *accounts
c *contractor
m *migrator
s *scanner

tickerDuration time.Duration
wg sync.WaitGroup
Expand Down Expand Up @@ -162,12 +164,14 @@ func (wp *workerPool) withWorkers(workerFunc func([]Worker)) {
// Handler returns an HTTP handler that serves the autopilot api.
func (ap *Autopilot) Handler() http.Handler {
return jape.Mux(tracing.TracedRoutes("autopilot", map[string]jape.Handler{
"GET /config": ap.configHandlerGET,
"PUT /config": ap.configHandlerPUT,
"POST /debug/trigger": ap.triggerHandlerPOST,
"POST /hosts": ap.hostsHandlerPOST,
"GET /host/:hostKey": ap.hostHandlerGET,
"GET /status": ap.statusHandlerGET,
"GET /alerts": ap.handleGETAlerts,
"POST /alerts/dismiss": ap.handlePOSTAlertsDismiss,
"GET /config": ap.configHandlerGET,
"PUT /config": ap.configHandlerPUT,
"POST /debug/trigger": ap.triggerHandlerPOST,
"POST /hosts": ap.hostsHandlerPOST,
"GET /host/:hostKey": ap.hostHandlerGET,
"GET /status": ap.statusHandlerGET,
}))
}

Expand Down Expand Up @@ -481,6 +485,21 @@ func (ap *Autopilot) isStopped() bool {
}
}

func (ap *Autopilot) handleGETAlerts(c jape.Context) {
c.Encode(ap.alerts.Active())
}

func (ap *Autopilot) handlePOSTAlertsDismiss(c jape.Context) {
var ids []types.Hash256
if err := c.Decode(&ids); err != nil {
return
} else if len(ids) == 0 {
c.Error(errors.New("no alerts to dismiss"), http.StatusBadRequest)
return
}
ap.alerts.Dismiss(ids...)
}

func (ap *Autopilot) configHandlerGET(jc jape.Context) {
autopilot, err := ap.bus.Autopilot(jc.Request.Context(), ap.id)
if err != nil && strings.Contains(err.Error(), api.ErrAutopilotNotFound.Error()) {
Expand Down Expand Up @@ -527,6 +546,7 @@ func (ap *Autopilot) triggerHandlerPOST(jc jape.Context) {
// New initializes an Autopilot.
func New(id string, bus Bus, workers []Worker, logger *zap.Logger, heartbeat time.Duration, scannerScanInterval time.Duration, scannerBatchSize, scannerMinRecentFailures, scannerNumThreads uint64, migrationHealthCutoff float64, accountsRefillInterval time.Duration, revisionSubmissionBuffer uint64) (*Autopilot, error) {
ap := &Autopilot{
alerts: alerts.NewManager(),
id: id,
bus: bus,
logger: logger.Sugar().Named("autopilot"),
Expand Down
15 changes: 15 additions & 0 deletions autopilot/migrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,22 @@ package autopilot

import (
"context"
"fmt"
"math"
"sort"
"sync"
"time"

"go.sia.tech/renterd/alerts"
"go.sia.tech/renterd/api"
"go.sia.tech/renterd/object"
"go.sia.tech/renterd/tracing"
"go.uber.org/zap"
"lukechampine.com/frand"
)

var (
alertMigrationID = frand.Entropy256() // constnt across restarts
)

const (
Expand Down Expand Up @@ -173,6 +180,14 @@ OUTER:
// log the updated list of slabs to migrate
m.logger.Debugf("%d slabs to migrate", len(toMigrate))

// register an alert to notify users about ongoing migrations.
m.ap.alerts.Register(alerts.Alert{
ID: alertMigrationID,
Severity: alerts.SeverityInfo,
Message: fmt.Sprintf("Migrating %d slabs", len(toMigrate)),
Timestamp: time.Now(),
})

// return if there are no slabs to migrate
if len(toMigrate) == 0 {
return
Expand Down
36 changes: 28 additions & 8 deletions bus/bus.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
rhpv3 "go.sia.tech/core/rhp/v3"
"go.sia.tech/core/types"
"go.sia.tech/jape"
"go.sia.tech/renterd/alerts"
"go.sia.tech/renterd/api"
"go.sia.tech/renterd/build"
"go.sia.tech/renterd/hostdb"
Expand Down Expand Up @@ -131,14 +132,15 @@ type (
)

type bus struct {
s Syncer
cm ChainManager
tp TransactionPool
w Wallet
hdb HostDB
as AutopilotStore
ms MetadataStore
ss SettingStore
alerts *alerts.Manager
s Syncer
cm ChainManager
tp TransactionPool
w Wallet
hdb HostDB
as AutopilotStore
ms MetadataStore
ss SettingStore

eas EphemeralAccountStore

Expand Down Expand Up @@ -984,6 +986,21 @@ func (b *bus) gougingParams(ctx context.Context) (api.GougingParams, error) {
}, nil
}

func (b *bus) handleGETAlerts(c jape.Context) {
c.Encode(b.alerts.Active())
}

func (b *bus) handlePOSTAlertsDismiss(c jape.Context) {
var ids []types.Hash256
if err := c.Decode(&ids); err != nil {
return
} else if len(ids) == 0 {
c.Error(errors.New("no alerts to dismiss"), http.StatusBadRequest)
return
}
b.alerts.Dismiss(ids...)
}

func (b *bus) accountsHandlerGET(jc jape.Context) {
jc.Encode(b.accounts.Accounts())
}
Expand Down Expand Up @@ -1173,6 +1190,7 @@ func (b *bus) contractTaxHandlerGET(jc jape.Context) {
// New returns a new Bus.
func New(s Syncer, cm ChainManager, tp TransactionPool, w Wallet, hdb HostDB, as AutopilotStore, ms MetadataStore, ss SettingStore, eas EphemeralAccountStore, l *zap.Logger) (*bus, error) {
b := &bus{
alerts: alerts.NewManager(),
s: s,
cm: cm,
tp: tp,
Expand Down Expand Up @@ -1264,6 +1282,8 @@ func New(s Syncer, cm ChainManager, tp TransactionPool, w Wallet, hdb HostDB, as
// Handler returns an HTTP handler that serves the bus API.
func (b *bus) Handler() http.Handler {
return jape.Mux(tracing.TracedRoutes("bus", map[string]jape.Handler{
"GET /alerts": b.handleGETAlerts,
"POST /alerts/dismiss": b.handlePOSTAlertsDismiss,
"GET /accounts": b.accountsHandlerGET,
"POST /accounts/:id": b.accountHandlerGET,
"POST /accounts/:id/lock": b.accountsLockHandlerPOST,
Expand Down
20 changes: 20 additions & 0 deletions worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
rhpv3 "go.sia.tech/core/rhp/v3"
"go.sia.tech/core/types"
"go.sia.tech/jape"
"go.sia.tech/renterd/alerts"
"go.sia.tech/renterd/api"
"go.sia.tech/renterd/hostdb"
"go.sia.tech/renterd/metrics"
Expand Down Expand Up @@ -218,6 +219,7 @@ type hostProvider interface {
// A worker talks to Sia hosts to perform contract and storage operations within
// a renterd system.
type worker struct {
alerts *alerts.Manager
allowPrivateIPs bool
id string
bus Bus
Expand Down Expand Up @@ -989,6 +991,21 @@ func (w *worker) idHandlerGET(jc jape.Context) {
jc.Encode(w.id)
}

func (w *worker) handleGETAlerts(c jape.Context) {
c.Encode(w.alerts.Active())
}

func (w *worker) handlePOSTAlertsDismiss(c jape.Context) {
var ids []types.Hash256
if err := c.Decode(&ids); err != nil {
return
} else if len(ids) == 0 {
c.Error(errors.New("no alerts to dismiss"), http.StatusBadRequest)
return
}
w.alerts.Dismiss(ids...)
}

func (w *worker) accountHandlerGET(jc jape.Context) {
var hostKey types.PublicKey
if jc.DecodeParam("hostkey", &hostKey) != nil {
Expand All @@ -1014,6 +1031,7 @@ func New(masterKey [32]byte, id string, b Bus, contractLockingDuration, busFlush
}

w := &worker{
alerts: alerts.NewManager(),
allowPrivateIPs: allowPrivateIPs,
contractLockingDuration: contractLockingDuration,
id: id,
Expand All @@ -1034,6 +1052,8 @@ func New(masterKey [32]byte, id string, b Bus, contractLockingDuration, busFlush
// Handler returns an HTTP handler that serves the worker API.
func (w *worker) Handler() http.Handler {
return jape.Mux(tracing.TracedRoutes("worker", map[string]jape.Handler{
"GET /alerts": w.handleGETAlerts,
"POST /alerts/dismiss": w.handlePOSTAlertsDismiss,
"GET /account/:hostkey": w.accountHandlerGET,
"GET /id": w.idHandlerGET,

Expand Down

0 comments on commit 3e52ce7

Please sign in to comment.