From 1365adf6221d431d648fc732a4a9eb5c974669fc Mon Sep 17 00:00:00 2001 From: PJ Date: Tue, 16 Jul 2024 14:16:38 +0200 Subject: [PATCH] bus: do not prevent startup if price updates fail temporarily --- bus/bus.go | 6 +-- cmd/renterd/main.go | 6 ++- internal/bus/alerts.go | 25 ++++++++++ internal/bus/pinmanager.go | 23 ++++----- internal/bus/pinmanager_test.go | 88 +++++++++++++++++++++++++++++---- internal/node/node.go | 2 +- internal/test/e2e/cluster.go | 4 +- 7 files changed, 125 insertions(+), 29 deletions(-) create mode 100644 internal/bus/alerts.go diff --git a/bus/bus.go b/bus/bus.go index 1c33dc602..38bad6ff9 100644 --- a/bus/bus.go +++ b/bus/bus.go @@ -402,8 +402,8 @@ func (b *bus) Handler() http.Handler { } // Setup starts the pin manager. -func (b *bus) Setup(ctx context.Context) error { - return b.pinMgr.Run(ctx) +func (b *bus) Setup() { + b.pinMgr.Run() } // Shutdown shuts down the bus. @@ -2602,7 +2602,7 @@ func New(am *alerts.Manager, whm *webhooks.Manager, cm ChainManager, cs ChainSto startTime: time.Now(), } - b.pinMgr = ibus.NewPinManager(whm, as, ss, defaultPinUpdateInterval, defaultPinRateWindow, b.logger.Desugar()) + b.pinMgr = ibus.NewPinManager(b.alerts, whm, as, ss, defaultPinUpdateInterval, defaultPinRateWindow, b.logger.Desugar()) // ensure we don't hang indefinitely ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) diff --git a/cmd/renterd/main.go b/cmd/renterd/main.go index fd6776d48..632bccf9c 100644 --- a/cmd/renterd/main.go +++ b/cmd/renterd/main.go @@ -520,7 +520,11 @@ func main() { if err != nil { logger.Fatal("failed to create bus, err: " + err.Error()) } - setupBusFn = setupFn + setupBusFn = func(_ context.Context) error { + setupFn() + return nil + } + shutdownFns = append(shutdownFns, shutdownFnEntry{ name: "Bus", fn: shutdownFn, diff --git a/internal/bus/alerts.go b/internal/bus/alerts.go new file mode 100644 index 000000000..ef06d230c --- /dev/null +++ b/internal/bus/alerts.go @@ -0,0 +1,25 @@ +package bus + +import ( + "fmt" + "time" + + "go.sia.tech/renterd/alerts" +) + +var ( + alertPricePinningID = alerts.RandomAlertID() // constant until restarted +) + +func newPricePinningFailedAlert(err error) alerts.Alert { + return alerts.Alert{ + ID: alertPricePinningID, + Severity: alerts.SeverityWarning, + Message: "Price pinning failed", + Data: map[string]any{ + "error": err.Error(), + "hint": fmt.Sprintf("This might happen when the forex API is temporarily unreachable. This alert will disappear the next time prices were updated successfully"), + }, + Timestamp: time.Now(), + } +} diff --git a/internal/bus/pinmanager.go b/internal/bus/pinmanager.go index 21591b21c..8fbe33e4f 100644 --- a/internal/bus/pinmanager.go +++ b/internal/bus/pinmanager.go @@ -11,6 +11,7 @@ import ( "github.com/montanaflynn/stats" "github.com/shopspring/decimal" "go.sia.tech/core/types" + "go.sia.tech/renterd/alerts" "go.sia.tech/renterd/api" "go.sia.tech/renterd/webhooks" "go.uber.org/zap" @@ -26,7 +27,7 @@ type ( // PinManager is a service that manages price pinning. PinManager interface { Close(context.Context) error - Run(context.Context) error + Run() TriggerUpdate() } @@ -39,6 +40,7 @@ type ( type ( pinManager struct { + a alerts.Alerter as AutopilotStore ss SettingStore broadcaster webhooks.Broadcaster @@ -58,8 +60,12 @@ type ( } ) -func NewPinManager(broadcaster webhooks.Broadcaster, as AutopilotStore, ss SettingStore, updateInterval, rateWindow time.Duration, l *zap.Logger) *pinManager { +// NewPinManager returns a new PinManager, responsible for pinning prices to a +// fixed value in an underlying currency. Note that the manager that is being +// returned is not running, this can be done by calling Run(). +func NewPinManager(alerts alerts.Alerter, broadcaster webhooks.Broadcaster, as AutopilotStore, ss SettingStore, updateInterval, rateWindow time.Duration, l *zap.Logger) *pinManager { return &pinManager{ + a: alerts, as: as, ss: ss, broadcaster: broadcaster, @@ -91,13 +97,7 @@ func (pm *pinManager) Close(ctx context.Context) error { } } -func (pm *pinManager) Run(ctx context.Context) error { - // try to update prices - if err := pm.updatePrices(ctx, true); err != nil { - return err - } - - // start the update loop +func (pm *pinManager) Run() { pm.wg.Add(1) go func() { defer pm.wg.Done() @@ -111,6 +111,9 @@ func (pm *pinManager) Run(ctx context.Context) error { err := pm.updatePrices(ctx, forced) if err != nil { pm.logger.Warn("failed to update prices", zap.Error(err)) + pm.a.RegisterAlert(ctx, newPricePinningFailedAlert(err)) + } else { + pm.a.DismissAlerts(ctx, alertPricePinningID) } cancel() @@ -124,8 +127,6 @@ func (pm *pinManager) Run(ctx context.Context) error { } } }() - - return nil } func (pm *pinManager) TriggerUpdate() { diff --git a/internal/bus/pinmanager_test.go b/internal/bus/pinmanager_test.go index a2af6e137..66e1c4867 100644 --- a/internal/bus/pinmanager_test.go +++ b/internal/bus/pinmanager_test.go @@ -14,6 +14,7 @@ import ( "github.com/shopspring/decimal" "go.sia.tech/core/types" "go.sia.tech/hostd/host/settings/pin" + "go.sia.tech/renterd/alerts" "go.sia.tech/renterd/api" "go.sia.tech/renterd/build" "go.sia.tech/renterd/webhooks" @@ -25,6 +26,43 @@ const ( testUpdateInterval = 100 * time.Millisecond ) +type mockAlerter struct { + mu sync.Mutex + alerts []alerts.Alert +} + +func (ma *mockAlerter) Alerts(ctx context.Context, opts alerts.AlertsOpts) (resp alerts.AlertsResponse, err error) { + ma.mu.Lock() + defer ma.mu.Unlock() + return alerts.AlertsResponse{Alerts: ma.alerts}, nil +} + +func (ma *mockAlerter) RegisterAlert(_ context.Context, a alerts.Alert) error { + ma.mu.Lock() + defer ma.mu.Unlock() + for _, alert := range ma.alerts { + if alert.ID == a.ID { + return nil + } + } + ma.alerts = append(ma.alerts, a) + return nil +} + +func (ma *mockAlerter) DismissAlerts(_ context.Context, ids ...types.Hash256) error { + ma.mu.Lock() + defer ma.mu.Unlock() + for _, id := range ids { + for i, a := range ma.alerts { + if a.ID == id { + ma.alerts = append(ma.alerts[:i], ma.alerts[i+1:]...) + break + } + } + } + return nil +} + type mockBroadcaster struct { events []webhooks.Event } @@ -37,8 +75,9 @@ func (meb *mockBroadcaster) BroadcastAction(ctx context.Context, e webhooks.Even type mockForexAPI struct { s *httptest.Server - mu sync.Mutex - rate float64 + mu sync.Mutex + rate float64 + unreachable bool } func newTestForexAPI() *mockForexAPI { @@ -46,6 +85,10 @@ func newTestForexAPI() *mockForexAPI { api.s = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { api.mu.Lock() defer api.mu.Unlock() + if api.unreachable { + w.WriteHeader(http.StatusInternalServerError) + return + } json.NewEncoder(w).Encode(api.rate) })) return api @@ -55,12 +98,18 @@ func (api *mockForexAPI) Close() { api.s.Close() } -func (api *mockForexAPI) updateRate(rate float64) { +func (api *mockForexAPI) setRate(rate float64) { api.mu.Lock() defer api.mu.Unlock() api.rate = rate } +func (api *mockForexAPI) setUnreachable(unreachable bool) { + api.mu.Lock() + defer api.mu.Unlock() + api.unreachable = unreachable +} + type mockStore struct { mu sync.Mutex settings map[string]string @@ -140,16 +189,15 @@ func TestPinManager(t *testing.T) { // mock dependencies ms := newTestStore() eb := &mockBroadcaster{} + a := &mockAlerter{} // mock forex api forex := newTestForexAPI() defer forex.Close() - // start a pinmanager - pm := NewPinManager(eb, ms, ms, testUpdateInterval, time.Minute, zap.NewNop()) - if err := pm.Run(context.Background()); err != nil { - t.Fatal(err) - } + // create a pinmanager + pm := NewPinManager(a, eb, ms, ms, testUpdateInterval, time.Minute, zap.NewNop()) + pm.Run() defer func() { if err := pm.Close(context.Background()); err != nil { t.Fatal(err) @@ -183,7 +231,7 @@ func TestPinManager(t *testing.T) { } // update exchange rate and fetch current gouging settings - forex.updateRate(2.5) + forex.setRate(2.5) gs := ms.gougingSettings() // configure all pins but disable them for now @@ -228,7 +276,7 @@ func TestPinManager(t *testing.T) { } // increase rate so average isn't catching up to us - forex.updateRate(3) + forex.setRate(3) // fetch autopilot ap, _ := ms.Autopilot(context.Background(), testAutopilotID) @@ -257,6 +305,26 @@ func TestPinManager(t *testing.T) { if app, _ := ms.Autopilot(context.Background(), testAutopilotID); app.Config.Contracts.Allowance.Equals(ap.Config.Contracts.Allowance) { t.Fatalf("expected autopilot to be updated, got %v = %v", app.Config.Contracts.Allowance, ap.Config.Contracts.Allowance) } + + // make forex API return an error + forex.setUnreachable(true) + + // assert alert was registered + ms.updatPinnedSettings(pps) + res, _ := a.Alerts(context.Background(), alerts.AlertsOpts{}) + if len(res.Alerts) == 0 { + t.Fatalf("expected 1 alert, got %d", len(a.alerts)) + } + + // make forex API return a valid response + forex.setUnreachable(false) + + // assert alert was dismissed + ms.updatPinnedSettings(pps) + res, _ = a.Alerts(context.Background(), alerts.AlertsOpts{}) + if len(res.Alerts) != 0 { + t.Fatalf("expected 0 alerts, got %d", len(a.alerts)) + } } // TestConvertConvertCurrencyToSC tests the conversion of a currency to Siacoins. diff --git a/internal/node/node.go b/internal/node/node.go index 4a3575b3d..dc9416e92 100644 --- a/internal/node/node.go +++ b/internal/node/node.go @@ -63,7 +63,7 @@ type AutopilotConfig struct { type ( RunFn = func() error - BusSetupFn = func(context.Context) error + BusSetupFn = func() WorkerSetupFn = func(context.Context, string, string) error ShutdownFn = func(context.Context) error ) diff --git a/internal/test/e2e/cluster.go b/internal/test/e2e/cluster.go index c4856332f..88bf606ad 100644 --- a/internal/test/e2e/cluster.go +++ b/internal/test/e2e/cluster.go @@ -414,9 +414,7 @@ func newTestCluster(t *testing.T, opts testClusterOptions) *TestCluster { } // Finish bus setup. - if err := bSetupFn(ctx); err != nil { - tt.Fatalf("failed to setup bus, err: %v", err) - } + bSetupFn() // Finish worker setup. if err := wSetupFn(ctx, workerAddr, workerPassword); err != nil {