Skip to content

Commit

Permalink
bus: do not prevent startup if price updates fail temporarily
Browse files Browse the repository at this point in the history
  • Loading branch information
peterjan authored and ChrisSchinnerl committed Jul 29, 2024
1 parent d163099 commit 1365adf
Show file tree
Hide file tree
Showing 7 changed files with 125 additions and 29 deletions.
6 changes: 3 additions & 3 deletions bus/bus.go
Original file line number Diff line number Diff line change
Expand Up @@ -402,8 +402,8 @@ func (b *bus) Handler() http.Handler {
}

// Setup starts the pin manager.
func (b *bus) Setup(ctx context.Context) error {
return b.pinMgr.Run(ctx)
func (b *bus) Setup() {
b.pinMgr.Run()
}

// Shutdown shuts down the bus.
Expand Down Expand Up @@ -2602,7 +2602,7 @@ func New(am *alerts.Manager, whm *webhooks.Manager, cm ChainManager, cs ChainSto
startTime: time.Now(),
}

b.pinMgr = ibus.NewPinManager(whm, as, ss, defaultPinUpdateInterval, defaultPinRateWindow, b.logger.Desugar())
b.pinMgr = ibus.NewPinManager(b.alerts, whm, as, ss, defaultPinUpdateInterval, defaultPinRateWindow, b.logger.Desugar())

// ensure we don't hang indefinitely
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
Expand Down
6 changes: 5 additions & 1 deletion cmd/renterd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -520,7 +520,11 @@ func main() {
if err != nil {
logger.Fatal("failed to create bus, err: " + err.Error())
}
setupBusFn = setupFn
setupBusFn = func(_ context.Context) error {
setupFn()
return nil
}

shutdownFns = append(shutdownFns, shutdownFnEntry{
name: "Bus",
fn: shutdownFn,
Expand Down
25 changes: 25 additions & 0 deletions internal/bus/alerts.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package bus

import (
"fmt"
"time"

"go.sia.tech/renterd/alerts"
)

var (
alertPricePinningID = alerts.RandomAlertID() // constant until restarted
)

func newPricePinningFailedAlert(err error) alerts.Alert {
return alerts.Alert{
ID: alertPricePinningID,
Severity: alerts.SeverityWarning,
Message: "Price pinning failed",
Data: map[string]any{
"error": err.Error(),
"hint": fmt.Sprintf("This might happen when the forex API is temporarily unreachable. This alert will disappear the next time prices were updated successfully"),
},
Timestamp: time.Now(),
}
}
23 changes: 12 additions & 11 deletions internal/bus/pinmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/montanaflynn/stats"
"github.com/shopspring/decimal"
"go.sia.tech/core/types"
"go.sia.tech/renterd/alerts"
"go.sia.tech/renterd/api"
"go.sia.tech/renterd/webhooks"
"go.uber.org/zap"
Expand All @@ -26,7 +27,7 @@ type (
// PinManager is a service that manages price pinning.
PinManager interface {
Close(context.Context) error
Run(context.Context) error
Run()
TriggerUpdate()
}

Expand All @@ -39,6 +40,7 @@ type (

type (
pinManager struct {
a alerts.Alerter
as AutopilotStore
ss SettingStore
broadcaster webhooks.Broadcaster
Expand All @@ -58,8 +60,12 @@ type (
}
)

func NewPinManager(broadcaster webhooks.Broadcaster, as AutopilotStore, ss SettingStore, updateInterval, rateWindow time.Duration, l *zap.Logger) *pinManager {
// NewPinManager returns a new PinManager, responsible for pinning prices to a
// fixed value in an underlying currency. Note that the manager that is being
// returned is not running, this can be done by calling Run().
func NewPinManager(alerts alerts.Alerter, broadcaster webhooks.Broadcaster, as AutopilotStore, ss SettingStore, updateInterval, rateWindow time.Duration, l *zap.Logger) *pinManager {
return &pinManager{
a: alerts,
as: as,
ss: ss,
broadcaster: broadcaster,
Expand Down Expand Up @@ -91,13 +97,7 @@ func (pm *pinManager) Close(ctx context.Context) error {
}
}

func (pm *pinManager) Run(ctx context.Context) error {
// try to update prices
if err := pm.updatePrices(ctx, true); err != nil {
return err
}

// start the update loop
func (pm *pinManager) Run() {
pm.wg.Add(1)
go func() {
defer pm.wg.Done()
Expand All @@ -111,6 +111,9 @@ func (pm *pinManager) Run(ctx context.Context) error {
err := pm.updatePrices(ctx, forced)
if err != nil {
pm.logger.Warn("failed to update prices", zap.Error(err))
pm.a.RegisterAlert(ctx, newPricePinningFailedAlert(err))
} else {
pm.a.DismissAlerts(ctx, alertPricePinningID)
}
cancel()

Expand All @@ -124,8 +127,6 @@ func (pm *pinManager) Run(ctx context.Context) error {
}
}
}()

return nil
}

func (pm *pinManager) TriggerUpdate() {
Expand Down
88 changes: 78 additions & 10 deletions internal/bus/pinmanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/shopspring/decimal"
"go.sia.tech/core/types"
"go.sia.tech/hostd/host/settings/pin"
"go.sia.tech/renterd/alerts"
"go.sia.tech/renterd/api"
"go.sia.tech/renterd/build"
"go.sia.tech/renterd/webhooks"
Expand All @@ -25,6 +26,43 @@ const (
testUpdateInterval = 100 * time.Millisecond
)

type mockAlerter struct {
mu sync.Mutex
alerts []alerts.Alert
}

func (ma *mockAlerter) Alerts(ctx context.Context, opts alerts.AlertsOpts) (resp alerts.AlertsResponse, err error) {
ma.mu.Lock()
defer ma.mu.Unlock()
return alerts.AlertsResponse{Alerts: ma.alerts}, nil
}

func (ma *mockAlerter) RegisterAlert(_ context.Context, a alerts.Alert) error {
ma.mu.Lock()
defer ma.mu.Unlock()
for _, alert := range ma.alerts {
if alert.ID == a.ID {
return nil
}
}
ma.alerts = append(ma.alerts, a)
return nil
}

func (ma *mockAlerter) DismissAlerts(_ context.Context, ids ...types.Hash256) error {
ma.mu.Lock()
defer ma.mu.Unlock()
for _, id := range ids {
for i, a := range ma.alerts {
if a.ID == id {
ma.alerts = append(ma.alerts[:i], ma.alerts[i+1:]...)
break
}
}
}
return nil
}

type mockBroadcaster struct {
events []webhooks.Event
}
Expand All @@ -37,15 +75,20 @@ func (meb *mockBroadcaster) BroadcastAction(ctx context.Context, e webhooks.Even
type mockForexAPI struct {
s *httptest.Server

mu sync.Mutex
rate float64
mu sync.Mutex
rate float64
unreachable bool
}

func newTestForexAPI() *mockForexAPI {
api := &mockForexAPI{rate: 1}
api.s = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
api.mu.Lock()
defer api.mu.Unlock()
if api.unreachable {
w.WriteHeader(http.StatusInternalServerError)
return
}
json.NewEncoder(w).Encode(api.rate)
}))
return api
Expand All @@ -55,12 +98,18 @@ func (api *mockForexAPI) Close() {
api.s.Close()
}

func (api *mockForexAPI) updateRate(rate float64) {
func (api *mockForexAPI) setRate(rate float64) {
api.mu.Lock()
defer api.mu.Unlock()
api.rate = rate
}

func (api *mockForexAPI) setUnreachable(unreachable bool) {
api.mu.Lock()
defer api.mu.Unlock()
api.unreachable = unreachable
}

type mockStore struct {
mu sync.Mutex
settings map[string]string
Expand Down Expand Up @@ -140,16 +189,15 @@ func TestPinManager(t *testing.T) {
// mock dependencies
ms := newTestStore()
eb := &mockBroadcaster{}
a := &mockAlerter{}

// mock forex api
forex := newTestForexAPI()
defer forex.Close()

// start a pinmanager
pm := NewPinManager(eb, ms, ms, testUpdateInterval, time.Minute, zap.NewNop())
if err := pm.Run(context.Background()); err != nil {
t.Fatal(err)
}
// create a pinmanager
pm := NewPinManager(a, eb, ms, ms, testUpdateInterval, time.Minute, zap.NewNop())
pm.Run()
defer func() {
if err := pm.Close(context.Background()); err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -183,7 +231,7 @@ func TestPinManager(t *testing.T) {
}

// update exchange rate and fetch current gouging settings
forex.updateRate(2.5)
forex.setRate(2.5)
gs := ms.gougingSettings()

// configure all pins but disable them for now
Expand Down Expand Up @@ -228,7 +276,7 @@ func TestPinManager(t *testing.T) {
}

// increase rate so average isn't catching up to us
forex.updateRate(3)
forex.setRate(3)

// fetch autopilot
ap, _ := ms.Autopilot(context.Background(), testAutopilotID)
Expand Down Expand Up @@ -257,6 +305,26 @@ func TestPinManager(t *testing.T) {
if app, _ := ms.Autopilot(context.Background(), testAutopilotID); app.Config.Contracts.Allowance.Equals(ap.Config.Contracts.Allowance) {
t.Fatalf("expected autopilot to be updated, got %v = %v", app.Config.Contracts.Allowance, ap.Config.Contracts.Allowance)
}

// make forex API return an error
forex.setUnreachable(true)

// assert alert was registered
ms.updatPinnedSettings(pps)
res, _ := a.Alerts(context.Background(), alerts.AlertsOpts{})
if len(res.Alerts) == 0 {
t.Fatalf("expected 1 alert, got %d", len(a.alerts))
}

// make forex API return a valid response
forex.setUnreachable(false)

// assert alert was dismissed
ms.updatPinnedSettings(pps)
res, _ = a.Alerts(context.Background(), alerts.AlertsOpts{})
if len(res.Alerts) != 0 {
t.Fatalf("expected 0 alerts, got %d", len(a.alerts))
}
}

// TestConvertConvertCurrencyToSC tests the conversion of a currency to Siacoins.
Expand Down
2 changes: 1 addition & 1 deletion internal/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ type AutopilotConfig struct {

type (
RunFn = func() error
BusSetupFn = func(context.Context) error
BusSetupFn = func()
WorkerSetupFn = func(context.Context, string, string) error
ShutdownFn = func(context.Context) error
)
Expand Down
4 changes: 1 addition & 3 deletions internal/test/e2e/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -414,9 +414,7 @@ func newTestCluster(t *testing.T, opts testClusterOptions) *TestCluster {
}

// Finish bus setup.
if err := bSetupFn(ctx); err != nil {
tt.Fatalf("failed to setup bus, err: %v", err)
}
bSetupFn()

// Finish worker setup.
if err := wSetupFn(ctx, workerAddr, workerPassword); err != nil {
Expand Down

0 comments on commit 1365adf

Please sign in to comment.