From 9a44212f03d9545d359d551c38c3c00db9e70cbb Mon Sep 17 00:00:00 2001 From: PJ Date: Mon, 22 Jul 2024 13:26:52 +0200 Subject: [PATCH 1/5] worker: register event webhooks asynchronously --- bus/client/webhooks.go | 12 +- internal/worker/cache.go | 42 +++---- internal/worker/cache_test.go | 34 +++++- internal/worker/events.go | 186 +++++++++++++++++++++++++++++++ internal/worker/events_test.go | 198 +++++++++++++++++++++++++++++++++ worker/mocks_test.go | 4 + worker/worker.go | 46 ++++---- 7 files changed, 458 insertions(+), 64 deletions(-) create mode 100644 internal/worker/events.go create mode 100644 internal/worker/events_test.go diff --git a/bus/client/webhooks.go b/bus/client/webhooks.go index 769d1cf57..833c7c7e1 100644 --- a/bus/client/webhooks.go +++ b/bus/client/webhooks.go @@ -13,16 +13,12 @@ func (c *Client) BroadcastAction(ctx context.Context, action webhooks.Event) err return err } -// DeleteWebhook deletes the webhook with the given ID. -func (c *Client) DeleteWebhook(ctx context.Context, url, module, event string) error { - return c.c.POST("/webhook/delete", webhooks.Webhook{ - URL: url, - Module: module, - Event: event, - }, nil) +// UnregisterWebhook unregisters the given webhook. +func (c *Client) UnregisterWebhook(ctx context.Context, webhook webhooks.Webhook) error { + return c.c.POST("/webhook/delete", webhook, nil) } -// RegisterWebhook registers a new webhook for the given URL. +// RegisterWebhook registers the given webhook. func (c *Client) RegisterWebhook(ctx context.Context, webhook webhooks.Webhook) error { err := c.c.WithContext(ctx).POST("/webhooks", webhook, nil) return err diff --git a/internal/worker/cache.go b/internal/worker/cache.go index e223c82fe..c9abed3e8 100644 --- a/internal/worker/cache.go +++ b/internal/worker/cache.go @@ -75,14 +75,13 @@ type ( Bus interface { Contracts(ctx context.Context, opts api.ContractsOpts) ([]api.ContractMetadata, error) GougingParams(ctx context.Context) (api.GougingParams, error) - RegisterWebhook(ctx context.Context, wh webhooks.Webhook) error } WorkerCache interface { DownloadContracts(ctx context.Context) ([]api.ContractMetadata, error) GougingParams(ctx context.Context) (api.GougingParams, error) HandleEvent(event webhooks.Event) error - Initialize(ctx context.Context, workerAPI string, opts ...webhooks.HeaderOption) error + Subscribe(e EventManager) error } ) @@ -92,8 +91,8 @@ type cache struct { cache *memoryCache logger *zap.SugaredLogger - mu sync.Mutex - ready bool + mu sync.Mutex + readyChan chan struct{} } func NewCache(b Bus, logger *zap.Logger) WorkerCache { @@ -194,32 +193,27 @@ func (c *cache) HandleEvent(event webhooks.Event) (err error) { return } -func (c *cache) Initialize(ctx context.Context, workerAPI string, webhookOpts ...webhooks.HeaderOption) error { - eventsURL := fmt.Sprintf("%s/events", workerAPI) - headers := make(map[string]string) - for _, opt := range webhookOpts { - opt(headers) +func (c *cache) Subscribe(e EventManager) (err error) { + c.mu.Lock() + defer c.mu.Unlock() + if c.readyChan != nil { + return fmt.Errorf("already subscribed") } - for _, wh := range []webhooks.Webhook{ - api.WebhookConsensusUpdate(eventsURL, headers), - api.WebhookContractArchive(eventsURL, headers), - api.WebhookContractRenew(eventsURL, headers), - api.WebhookSettingUpdate(eventsURL, headers), - } { - if err := c.b.RegisterWebhook(ctx, wh); err != nil { - return fmt.Errorf("failed to register webhook '%s', err: %v", wh, err) - } + + c.readyChan, err = e.AddSubscriber(c.logger.Desugar().Name(), c) + if err != nil { + return fmt.Errorf("failed to subscribe to event manager, error: %v", err) } - c.mu.Lock() - c.ready = true - c.mu.Unlock() return nil } func (c *cache) isReady() bool { - c.mu.Lock() - defer c.mu.Unlock() - return c.ready + select { + case <-c.readyChan: + return true + default: + } + return false } func (c *cache) handleConsensusUpdate(event api.EventConsensusUpdate) { diff --git a/internal/worker/cache_test.go b/internal/worker/cache_test.go index e696ed02c..c65e31331 100644 --- a/internal/worker/cache_test.go +++ b/internal/worker/cache_test.go @@ -7,6 +7,7 @@ import ( "time" "go.sia.tech/core/types" + "go.sia.tech/jape" "go.sia.tech/renterd/api" "go.sia.tech/renterd/internal/test" "go.sia.tech/renterd/webhooks" @@ -26,7 +27,24 @@ func (m *mockBus) Contracts(ctx context.Context, opts api.ContractsOpts) ([]api. func (m *mockBus) GougingParams(ctx context.Context) (api.GougingParams, error) { return m.gougingParams, nil } -func (m *mockBus) RegisterWebhook(ctx context.Context, wh webhooks.Webhook) error { + +type mockEventManager struct { + readyChan chan struct{} +} + +func (m *mockEventManager) AddSubscriber(id string, s EventSubscriber) (chan struct{}, error) { + return m.readyChan, nil +} + +func (m *mockEventManager) Handler() jape.Handler { + return nil +} + +func (m *mockEventManager) Run(ctx context.Context, eventURL string, opts ...webhooks.HeaderOption) error { + return nil +} + +func (m *mockEventManager) Shutdown(ctx context.Context) error { return nil } @@ -57,7 +75,13 @@ func TestWorkerCache(t *testing.T) { // create mock bus and cache c, b, mc := newTestCache(zap.New(observedZapCore)) - // assert using cache before it's initialized prints a warning + // create mock event manager + m := &mockEventManager{readyChan: make(chan struct{})} + + // subscribe cache to event manager + c.Subscribe(m) + + // assert using cache before it's ready prints a warning contracts, err := c.DownloadContracts(context.Background()) if err != nil { t.Fatal(err) @@ -84,10 +108,8 @@ func TestWorkerCache(t *testing.T) { t.Fatal("expected error message to contain 'cache is not ready yet', got", lines[0].Message) } - // initialize the cache - if err := c.Initialize(context.Background(), ""); err != nil { - t.Fatal(err) - } + // close the ready channel + close(m.readyChan) // fetch contracts & gouging params so they're cached _, err = c.DownloadContracts(context.Background()) diff --git a/internal/worker/events.go b/internal/worker/events.go new file mode 100644 index 000000000..2776ed367 --- /dev/null +++ b/internal/worker/events.go @@ -0,0 +1,186 @@ +package worker + +import ( + "context" + "errors" + "fmt" + "net/http" + "sync" + "time" + + "go.sia.tech/jape" + "go.sia.tech/renterd/api" + "go.sia.tech/renterd/webhooks" + "go.uber.org/zap" +) + +type ( + EventManager interface { + AddSubscriber(id string, s EventSubscriber) (chan struct{}, error) + Handler() jape.Handler + Run(ctx context.Context, eventURL string, opts ...webhooks.HeaderOption) error + Shutdown(context.Context) error + } + + EventSubscriber interface { + HandleEvent(event webhooks.Event) error + Subscribe(e EventManager) error + } + + WebhookManager interface { + RegisterWebhook(ctx context.Context, wh webhooks.Webhook) error + UnregisterWebhook(ctx context.Context, wh webhooks.Webhook) error + } +) + +type ( + eventManager struct { + webhooks WebhookManager + logger *zap.SugaredLogger + + registerInterval time.Duration + + mu sync.Mutex + subs map[string]eventSubscriber + registered []webhooks.Webhook + } + + eventSubscriber struct { + EventSubscriber + readyChan chan struct{} + } +) + +func NewEventManager(w WebhookManager, l *zap.Logger, registerInterval time.Duration) EventManager { + return &eventManager{ + webhooks: w, + logger: l.Sugar().Named("events"), + + registerInterval: registerInterval, + + subs: make(map[string]eventSubscriber), + } +} + +func (e *eventManager) AddSubscriber(id string, s EventSubscriber) (chan struct{}, error) { + e.mu.Lock() + defer e.mu.Unlock() + _, ok := e.subs[id] + if ok { + return nil, fmt.Errorf("subscriber with id %v already exists", id) + } + + readyChan := make(chan struct{}) + if len(e.registered) > 0 { + close(readyChan) + } + + e.subs[id] = eventSubscriber{s, readyChan} + return readyChan, nil +} + +func (e *eventManager) Handler() jape.Handler { + return func(jc jape.Context) { + // decode the event + var event webhooks.Event + if jc.Decode(&event) != nil { + return + } else if event.Event == webhooks.WebhookEventPing { + jc.ResponseWriter.WriteHeader(http.StatusOK) + return + } + + // handle the event + for id, s := range e.subs { + if err := s.HandleEvent(event); err != nil { + e.logger.Errorw("failed to handle event", + zap.Error(err), + zap.String("subscriber", id), + zap.String("module", event.Module), + zap.String("event", event.Event), + ) + } else { + e.logger.Debugw("handled event", + zap.String("subscriber", id), + zap.String("module", event.Module), + zap.String("event", event.Event), + ) + } + } + } +} + +func (e *eventManager) Run(ctx context.Context, eventsURL string, opts ...webhooks.HeaderOption) error { + // prepare headers + headers := make(map[string]string) + for _, opt := range opts { + opt(headers) + } + + // prepare webhooks + webhooks := []webhooks.Webhook{ + api.WebhookConsensusUpdate(eventsURL, headers), + api.WebhookContractArchive(eventsURL, headers), + api.WebhookContractRenew(eventsURL, headers), + api.WebhookSettingUpdate(eventsURL, headers), + } + + // try and register the webhooks in a loop + for { + if e.registerWebhooks(ctx, webhooks) { + e.mu.Lock() + for _, s := range e.subs { + close(s.readyChan) + } + e.mu.Unlock() + break + } + + // sleep for a bit before trying again + e.logger.Warnf("failed to register webhooks, retrying in %v", e.registerInterval) + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(e.registerInterval): + } + } + + return nil +} + +func (e *eventManager) Shutdown(ctx context.Context) error { + e.mu.Lock() + defer e.mu.Unlock() + + // unregister webhooks + var errs []error + for _, wh := range e.registered { + if err := e.webhooks.UnregisterWebhook(ctx, wh); err != nil { + e.logger.Errorw("failed to unregister webhook", + zap.Error(err), + zap.Stringer("webhook", wh), + ) + errs = append(errs, err) + } + } + + return errors.Join(errs...) +} + +func (e *eventManager) registerWebhooks(ctx context.Context, webhooks []webhooks.Webhook) bool { + for _, wh := range webhooks { + if err := e.webhooks.RegisterWebhook(ctx, wh); err != nil { + e.logger.Errorw("failed to register webhook", + zap.Error(err), + zap.Stringer("webhook", wh), + ) + return false + } + } + + // save webhooks so we can unregister them on shutdown + e.mu.Lock() + e.registered = webhooks + e.mu.Unlock() + return true +} diff --git a/internal/worker/events_test.go b/internal/worker/events_test.go new file mode 100644 index 000000000..832de7775 --- /dev/null +++ b/internal/worker/events_test.go @@ -0,0 +1,198 @@ +package worker + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "net/http/httptest" + "sync" + "testing" + "time" + + "go.sia.tech/jape" + "go.sia.tech/renterd/api" + "go.sia.tech/renterd/webhooks" + "go.uber.org/zap" + "go.uber.org/zap/zaptest/observer" +) + +const testRegisterInterval = 100 * time.Millisecond + +type mockSubscriber struct { + id string + readyChan chan struct{} + + mu sync.Mutex + events []webhooks.Event +} + +func (s *mockSubscriber) Events() []webhooks.Event { + s.mu.Lock() + defer s.mu.Unlock() + return s.events +} + +func (s *mockSubscriber) HandleEvent(event webhooks.Event) error { + s.mu.Lock() + defer s.mu.Unlock() + + select { + case <-s.readyChan: + default: + return fmt.Errorf("subscriber not ready") + } + + s.events = append(s.events, event) + return nil +} + +func (s *mockSubscriber) Subscribe(e EventManager) error { + s.readyChan, _ = e.AddSubscriber(s.id, s) + return nil +} + +type mockWebhookManager struct { + blockChan chan struct{} + + mu sync.Mutex + registered []webhooks.Webhook +} + +func (m *mockWebhookManager) RegisterWebhook(ctx context.Context, webhook webhooks.Webhook) error { + <-m.blockChan + + m.mu.Lock() + defer m.mu.Unlock() + m.registered = append(m.registered, webhook) + return nil +} + +func (m *mockWebhookManager) UnregisterWebhook(ctx context.Context, webhook webhooks.Webhook) error { + m.mu.Lock() + defer m.mu.Unlock() + + for i, wh := range m.registered { + if wh.String() == webhook.String() { + m.registered = append(m.registered[:i], m.registered[i+1:]...) + return nil + } + } + return nil +} + +func (m *mockWebhookManager) Webhooks() []webhooks.Webhook { + m.mu.Lock() + defer m.mu.Unlock() + return m.registered +} + +func TestEventManager(t *testing.T) { + // observe logs + observedZapCore, observedLogs := observer.New(zap.DebugLevel) + + // create mocks + w := &mockWebhookManager{blockChan: make(chan struct{})} + s := &mockSubscriber{id: t.Name()} + + // create event manager + e := NewEventManager(w, zap.New(observedZapCore), testRegisterInterval) + + // subscribe to event manager + if err := s.Subscribe(e); err != nil { + t.Fatal(err) + } + + // setup a server + mux := jape.Mux(map[string]jape.Handler{"POST /events": e.Handler()}) + srv := httptest.NewServer(mux) + defer srv.Close() + + // run event manager + eventsURL := fmt.Sprintf("http://%v/events", srv.Listener.Addr().String()) + go func() { + if err := e.Run(context.Background(), eventsURL); err != nil { + t.Error(err) + } + }() + + // send an event before unblocking webhooks registration + err := sendEvent(eventsURL, webhooks.Event{Module: api.ModuleConsensus, Event: api.EventUpdate}) + if err != nil { + t.Fatal(err) + } + logs := observedLogs.TakeAll() + if len(logs) != 1 { + t.Fatal("expected 1 log, got", len(logs)) + } else if entry := logs[0]; entry.Message != "failed to handle event" || entry.ContextMap()["error"] != "subscriber not ready" { + t.Fatal("expected different log entry, got", entry) + } + + // unblock the webhooks registration + close(w.blockChan) + time.Sleep(testRegisterInterval) + + // assert webhook was registered + if webhooks := w.Webhooks(); len(webhooks) != 4 { + t.Fatal("expected 4 webhooks, got", len(webhooks)) + } + + // send the same event again + err = sendEvent(eventsURL, webhooks.Event{Module: api.ModuleConsensus, Event: api.EventUpdate}) + if err != nil { + t.Fatal(err) + } + logs = observedLogs.TakeAll() + if len(logs) != 1 { + t.Fatal("expected 1 log, got", len(logs)) + } else if entry := logs[0]; entry.Message != "handled event" || entry.ContextMap()["subscriber"] != t.Name() { + t.Fatal("expected different log entry, got", entry) + } + + // assert the subscriber handled the event + if events := s.Events(); len(events) != 1 { + t.Fatal("expected 1 event, got", len(events)) + } + + // shutdown event manager + err = e.Shutdown(context.Background()) + if err != nil { + t.Fatal(err) + } + + // assert webhook was unregistered + if webhooks := w.Webhooks(); len(webhooks) != 0 { + t.Fatal("expected 0 webhooks, got", len(webhooks)) + } +} + +func sendEvent(url string, event webhooks.Event) error { + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + + body, err := json.Marshal(event) + if err != nil { + return err + } + + req, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewReader(body)) + if err != nil { + return err + } + defer io.ReadAll(req.Body) // always drain body + + resp, err := http.DefaultClient.Do(req) + if err != nil { + return err + } + if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusNoContent { + errStr, err := io.ReadAll(req.Body) + if err != nil { + return fmt.Errorf("failed to read response body: %w", err) + } + return fmt.Errorf("Webhook returned unexpected status %v: %v", resp.StatusCode, string(errStr)) + } + return nil +} diff --git a/worker/mocks_test.go b/worker/mocks_test.go index 192f4c169..4c3929205 100644 --- a/worker/mocks_test.go +++ b/worker/mocks_test.go @@ -697,3 +697,7 @@ type webhookStoreMock struct{} func (*webhookStoreMock) RegisterWebhook(ctx context.Context, webhook webhooks.Webhook) error { return nil } + +func (*webhookStoreMock) UnregisterWebhook(ctx context.Context, webhook webhooks.Webhook) error { + return nil +} diff --git a/worker/worker.go b/worker/worker.go index 9202d0c04..c05b3ffb5 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -157,6 +157,7 @@ type ( WebhookStore interface { RegisterWebhook(ctx context.Context, webhook webhooks.Webhook) error + UnregisterWebhook(ctx context.Context, webhook webhooks.Webhook) error } ConsensusState interface { @@ -208,6 +209,7 @@ type worker struct { masterKey [32]byte startTime time.Time + eventManager iworker.EventManager downloadManager *downloadManager uploadManager *uploadManager @@ -1230,25 +1232,6 @@ func (w *worker) idHandlerGET(jc jape.Context) { jc.Encode(w.id) } -func (w *worker) eventsHandler(jc jape.Context) { - var event webhooks.Event - if jc.Decode(&event) != nil { - return - } else if event.Event == webhooks.WebhookEventPing { - jc.ResponseWriter.WriteHeader(http.StatusOK) - return - } - - err := w.cache.HandleEvent(event) - if errors.Is(err, api.ErrUnknownEvent) { - jc.ResponseWriter.WriteHeader(http.StatusAccepted) - return - } else if err != nil { - jc.Error(err, http.StatusBadRequest) - return - } -} - func (w *worker) memoryGET(jc jape.Context) { jc.Encode(api.MemoryResponse{ Download: w.downloadManager.mm.Status(), @@ -1301,13 +1284,13 @@ func New(masterKey [32]byte, id string, b Bus, contractLockingDuration, busFlush } l = l.Named("worker").Named(id) - cache := iworker.NewCache(b, l) shutdownCtx, shutdownCancel := context.WithCancel(context.Background()) w := &worker{ alerts: alerts.WithOrigin(b, fmt.Sprintf("worker.%s", id)), allowPrivateIPs: allowPrivateIPs, contractLockingDuration: contractLockingDuration, - cache: cache, + cache: iworker.NewCache(b, l), + eventManager: iworker.NewEventManager(b, l, 10*time.Second), id: id, bus: b, masterKey: masterKey, @@ -1335,7 +1318,7 @@ func (w *worker) Handler() http.Handler { "GET /account/:hostkey": w.accountHandlerGET, "GET /id": w.idHandlerGET, - "POST /events": w.eventsHandler, + "POST /events": w.eventManager.Handler(), "GET /memory": w.memoryGET, @@ -1365,10 +1348,19 @@ func (w *worker) Handler() http.Handler { }) } -// Setup initializes the worker cache. +// Setup register event webhooks that enable the worker cache. func (w *worker) Setup(ctx context.Context, apiURL, apiPassword string) error { - webhookOpts := []webhooks.HeaderOption{webhooks.WithBasicAuth("", apiPassword)} - return w.cache.Initialize(ctx, apiURL, webhookOpts...) + // run event manager in a goroutine + go func() { + eventsURL := fmt.Sprintf("%s/events", apiURL) + webhookOpts := []webhooks.HeaderOption{webhooks.WithBasicAuth("", apiPassword)} + if err := w.eventManager.Run(w.shutdownCtx, eventsURL, webhookOpts...); err != nil { + w.logger.Errorw("failed to run event manager", zap.Error(err)) + } + }() + + // subscribe cache to the event manager + return w.cache.Subscribe(w.eventManager) } // Shutdown shuts down the worker. @@ -1382,7 +1374,9 @@ func (w *worker) Shutdown(ctx context.Context) error { // stop recorders w.contractSpendingRecorder.Stop(ctx) - return nil + + // shutdown event manager + return w.eventManager.Shutdown(ctx) } func (w *worker) scanHost(ctx context.Context, timeout time.Duration, hostKey types.PublicKey, hostIP string) (rhpv2.HostSettings, rhpv3.HostPriceTable, time.Duration, error) { From 65c0711a6c7819f4d8982ec3e9a630ffbdbe9601 Mon Sep 17 00:00:00 2001 From: PJ Date: Tue, 23 Jul 2024 12:11:38 +0200 Subject: [PATCH 2/5] worker: update route --- alerts/alerts_test.go | 4 ++-- internal/worker/events_test.go | 4 ++-- worker/client/client.go | 6 +++--- worker/worker.go | 8 ++++++-- 4 files changed, 13 insertions(+), 9 deletions(-) diff --git a/alerts/alerts_test.go b/alerts/alerts_test.go index 24b299e1b..99b698f7e 100644 --- a/alerts/alerts_test.go +++ b/alerts/alerts_test.go @@ -57,7 +57,7 @@ func TestWebhooks(t *testing.T) { mux := http.NewServeMux() var events []webhooks.Event var mu sync.Mutex - mux.HandleFunc("/events", func(w http.ResponseWriter, r *http.Request) { + mux.HandleFunc("/event", func(w http.ResponseWriter, r *http.Request) { var event webhooks.Event if err := json.NewDecoder(r.Body).Decode(&event); err != nil { t.Fatal(err) @@ -72,7 +72,7 @@ func TestWebhooks(t *testing.T) { // register a hook wh := webhooks.Webhook{ Module: webhookModule, - URL: fmt.Sprintf("http://%v/events", srv.Listener.Addr().String()), + URL: fmt.Sprintf("http://%v/event", srv.Listener.Addr().String()), } if hookID := wh.String(); hookID != fmt.Sprintf("%v.%v.%v", wh.URL, wh.Module, "") { t.Fatalf("wrong result for wh.String(): %v != %v", wh.String(), hookID) diff --git a/internal/worker/events_test.go b/internal/worker/events_test.go index 832de7775..eb6507ef6 100644 --- a/internal/worker/events_test.go +++ b/internal/worker/events_test.go @@ -106,12 +106,12 @@ func TestEventManager(t *testing.T) { } // setup a server - mux := jape.Mux(map[string]jape.Handler{"POST /events": e.Handler()}) + mux := jape.Mux(map[string]jape.Handler{"POST /event": e.Handler()}) srv := httptest.NewServer(mux) defer srv.Close() // run event manager - eventsURL := fmt.Sprintf("http://%v/events", srv.Listener.Addr().String()) + eventsURL := fmt.Sprintf("http://%v/event", srv.Listener.Addr().String()) go func() { if err := e.Run(context.Background(), eventsURL); err != nil { t.Error(err) diff --git a/worker/client/client.go b/worker/client/client.go index c1ab8a70e..8996f66ae 100644 --- a/worker/client/client.go +++ b/worker/client/client.go @@ -269,9 +269,9 @@ func (c *Client) UploadStats() (resp api.UploadStatsResponse, err error) { return } -// RegisterEvent register an event. -func (c *Client) RegisterEvent(ctx context.Context, e webhooks.Event) (err error) { - err = c.c.WithContext(ctx).POST("/events", e, nil) +// NotifyEvent notifies the worker of an event. +func (c *Client) NotifyEvent(ctx context.Context, e webhooks.Event) (err error) { + err = c.c.WithContext(ctx).POST("/event", e, nil) return } diff --git a/worker/worker.go b/worker/worker.go index c05b3ffb5..fbcfa144b 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -1248,6 +1248,10 @@ func (w *worker) accountHandlerGET(jc jape.Context) { jc.Encode(account) } +func (w *worker) eventHandlerPOST(jc jape.Context) { + w.eventManager.Handler()(jc) +} + func (w *worker) stateHandlerGET(jc jape.Context) { jc.Encode(api.WorkerStateResponse{ ID: w.id, @@ -1318,7 +1322,7 @@ func (w *worker) Handler() http.Handler { "GET /account/:hostkey": w.accountHandlerGET, "GET /id": w.idHandlerGET, - "POST /events": w.eventManager.Handler(), + "POST /event": w.eventHandlerPOST, "GET /memory": w.memoryGET, @@ -1352,7 +1356,7 @@ func (w *worker) Handler() http.Handler { func (w *worker) Setup(ctx context.Context, apiURL, apiPassword string) error { // run event manager in a goroutine go func() { - eventsURL := fmt.Sprintf("%s/events", apiURL) + eventsURL := fmt.Sprintf("%s/event", apiURL) webhookOpts := []webhooks.HeaderOption{webhooks.WithBasicAuth("", apiPassword)} if err := w.eventManager.Run(w.shutdownCtx, eventsURL, webhookOpts...); err != nil { w.logger.Errorw("failed to run event manager", zap.Error(err)) From 0ff0cdb61b269ea40d7e0b1ba6be4055dfbb26ad Mon Sep 17 00:00:00 2001 From: PJ Date: Tue, 23 Jul 2024 12:32:58 +0200 Subject: [PATCH 3/5] worker: remove Handler() from interface --- internal/worker/cache_test.go | 5 +--- internal/worker/events.go | 46 ++++++++++++---------------------- internal/worker/events_test.go | 12 ++++++++- worker/worker.go | 9 ++++++- 4 files changed, 36 insertions(+), 36 deletions(-) diff --git a/internal/worker/cache_test.go b/internal/worker/cache_test.go index c65e31331..4779949fd 100644 --- a/internal/worker/cache_test.go +++ b/internal/worker/cache_test.go @@ -7,7 +7,6 @@ import ( "time" "go.sia.tech/core/types" - "go.sia.tech/jape" "go.sia.tech/renterd/api" "go.sia.tech/renterd/internal/test" "go.sia.tech/renterd/webhooks" @@ -36,9 +35,7 @@ func (m *mockEventManager) AddSubscriber(id string, s EventSubscriber) (chan str return m.readyChan, nil } -func (m *mockEventManager) Handler() jape.Handler { - return nil -} +func (m *mockEventManager) HandleEvent(event webhooks.Event) {} func (m *mockEventManager) Run(ctx context.Context, eventURL string, opts ...webhooks.HeaderOption) error { return nil diff --git a/internal/worker/events.go b/internal/worker/events.go index 2776ed367..9bba07711 100644 --- a/internal/worker/events.go +++ b/internal/worker/events.go @@ -4,11 +4,9 @@ import ( "context" "errors" "fmt" - "net/http" "sync" "time" - "go.sia.tech/jape" "go.sia.tech/renterd/api" "go.sia.tech/renterd/webhooks" "go.uber.org/zap" @@ -17,7 +15,7 @@ import ( type ( EventManager interface { AddSubscriber(id string, s EventSubscriber) (chan struct{}, error) - Handler() jape.Handler + HandleEvent(event webhooks.Event) Run(ctx context.Context, eventURL string, opts ...webhooks.HeaderOption) error Shutdown(context.Context) error } @@ -79,33 +77,21 @@ func (e *eventManager) AddSubscriber(id string, s EventSubscriber) (chan struct{ return readyChan, nil } -func (e *eventManager) Handler() jape.Handler { - return func(jc jape.Context) { - // decode the event - var event webhooks.Event - if jc.Decode(&event) != nil { - return - } else if event.Event == webhooks.WebhookEventPing { - jc.ResponseWriter.WriteHeader(http.StatusOK) - return - } - - // handle the event - for id, s := range e.subs { - if err := s.HandleEvent(event); err != nil { - e.logger.Errorw("failed to handle event", - zap.Error(err), - zap.String("subscriber", id), - zap.String("module", event.Module), - zap.String("event", event.Event), - ) - } else { - e.logger.Debugw("handled event", - zap.String("subscriber", id), - zap.String("module", event.Module), - zap.String("event", event.Event), - ) - } +func (e *eventManager) HandleEvent(event webhooks.Event) { + for id, s := range e.subs { + if err := s.HandleEvent(event); err != nil { + e.logger.Errorw("failed to handle event", + zap.Error(err), + zap.String("subscriber", id), + zap.String("module", event.Module), + zap.String("event", event.Event), + ) + } else { + e.logger.Debugw("handled event", + zap.String("subscriber", id), + zap.String("module", event.Module), + zap.String("event", event.Event), + ) } } } diff --git a/internal/worker/events_test.go b/internal/worker/events_test.go index eb6507ef6..c9dd32c4c 100644 --- a/internal/worker/events_test.go +++ b/internal/worker/events_test.go @@ -106,7 +106,17 @@ func TestEventManager(t *testing.T) { } // setup a server - mux := jape.Mux(map[string]jape.Handler{"POST /event": e.Handler()}) + mux := jape.Mux(map[string]jape.Handler{"POST /event": func(jc jape.Context) { + var event webhooks.Event + if jc.Decode(&event) != nil { + return + } else if event.Event == webhooks.WebhookEventPing { + jc.ResponseWriter.WriteHeader(http.StatusOK) + return + } else { + e.HandleEvent(event) + } + }}) srv := httptest.NewServer(mux) defer srv.Close() diff --git a/worker/worker.go b/worker/worker.go index fbcfa144b..08bee7ee2 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -1249,7 +1249,14 @@ func (w *worker) accountHandlerGET(jc jape.Context) { } func (w *worker) eventHandlerPOST(jc jape.Context) { - w.eventManager.Handler()(jc) + var event webhooks.Event + if jc.Decode(&event) != nil { + return + } else if event.Event == webhooks.WebhookEventPing { + jc.ResponseWriter.WriteHeader(http.StatusOK) + } else { + w.eventManager.HandleEvent(event) + } } func (w *worker) stateHandlerGET(jc jape.Context) { From 44310ec66b61ef52d8bedf5b748deeee9c722e36 Mon Sep 17 00:00:00 2001 From: PJ Date: Tue, 30 Jul 2024 15:10:07 +0200 Subject: [PATCH 4/5] worker: address review comments --- alerts/alerts_test.go | 4 +- internal/worker/cache.go | 8 ++-- internal/worker/cache_test.go | 16 +++---- internal/worker/events.go | 83 ++++++++++++++++------------------ internal/worker/events_test.go | 42 ++++++++--------- worker/client/client.go | 2 +- worker/worker.go | 24 +++++----- 7 files changed, 86 insertions(+), 93 deletions(-) diff --git a/alerts/alerts_test.go b/alerts/alerts_test.go index 99b698f7e..24b299e1b 100644 --- a/alerts/alerts_test.go +++ b/alerts/alerts_test.go @@ -57,7 +57,7 @@ func TestWebhooks(t *testing.T) { mux := http.NewServeMux() var events []webhooks.Event var mu sync.Mutex - mux.HandleFunc("/event", func(w http.ResponseWriter, r *http.Request) { + mux.HandleFunc("/events", func(w http.ResponseWriter, r *http.Request) { var event webhooks.Event if err := json.NewDecoder(r.Body).Decode(&event); err != nil { t.Fatal(err) @@ -72,7 +72,7 @@ func TestWebhooks(t *testing.T) { // register a hook wh := webhooks.Webhook{ Module: webhookModule, - URL: fmt.Sprintf("http://%v/event", srv.Listener.Addr().String()), + URL: fmt.Sprintf("http://%v/events", srv.Listener.Addr().String()), } if hookID := wh.String(); hookID != fmt.Sprintf("%v.%v.%v", wh.URL, wh.Module, "") { t.Fatalf("wrong result for wh.String(): %v != %v", wh.String(), hookID) diff --git a/internal/worker/cache.go b/internal/worker/cache.go index 1bd2f5002..dc8ae6357 100644 --- a/internal/worker/cache.go +++ b/internal/worker/cache.go @@ -81,7 +81,7 @@ type ( DownloadContracts(ctx context.Context) ([]api.ContractMetadata, error) GougingParams(ctx context.Context) (api.GougingParams, error) HandleEvent(event webhooks.Event) error - Subscribe(e EventManager) error + Subscribe(e EventSubscriber) error } ) @@ -196,16 +196,16 @@ func (c *cache) HandleEvent(event webhooks.Event) (err error) { return } -func (c *cache) Subscribe(e EventManager) (err error) { +func (c *cache) Subscribe(e EventSubscriber) (err error) { c.mu.Lock() defer c.mu.Unlock() if c.readyChan != nil { return fmt.Errorf("already subscribed") } - c.readyChan, err = e.AddSubscriber(c.logger.Desugar().Name(), c) + c.readyChan, err = e.AddEventHandler(c.logger.Desugar().Name(), c) if err != nil { - return fmt.Errorf("failed to subscribe to event manager, error: %v", err) + return fmt.Errorf("failed to subscribe the worker cache, error: %v", err) } return nil } diff --git a/internal/worker/cache_test.go b/internal/worker/cache_test.go index cbeadb21f..9bc8d682d 100644 --- a/internal/worker/cache_test.go +++ b/internal/worker/cache_test.go @@ -27,21 +27,21 @@ func (m *mockBus) GougingParams(ctx context.Context) (api.GougingParams, error) return m.gougingParams, nil } -type mockEventManager struct { +type mockEventSubscriber struct { readyChan chan struct{} } -func (m *mockEventManager) AddSubscriber(id string, s EventSubscriber) (chan struct{}, error) { +func (m *mockEventSubscriber) AddEventHandler(id string, h EventHandler) (chan struct{}, error) { return m.readyChan, nil } -func (m *mockEventManager) HandleEvent(event webhooks.Event) {} +func (m *mockEventSubscriber) ProcessEvent(event webhooks.Event) {} -func (m *mockEventManager) Run(ctx context.Context, eventURL string, opts ...webhooks.HeaderOption) error { +func (m *mockEventSubscriber) Register(ctx context.Context, eventURL string, opts ...webhooks.HeaderOption) error { return nil } -func (m *mockEventManager) Shutdown(ctx context.Context) error { +func (m *mockEventSubscriber) Shutdown(ctx context.Context) error { return nil } @@ -72,10 +72,10 @@ func TestWorkerCache(t *testing.T) { // create mock bus and cache c, b, mc := newTestCache(zap.New(observedZapCore)) - // create mock event manager - m := &mockEventManager{readyChan: make(chan struct{})} + // create mock event subscriber + m := &mockEventSubscriber{readyChan: make(chan struct{})} - // subscribe cache to event manager + // subscribe cache to event subscriber c.Subscribe(m) // assert using cache before it's ready prints a warning diff --git a/internal/worker/events.go b/internal/worker/events.go index 266e6eb46..44b9d3a39 100644 --- a/internal/worker/events.go +++ b/internal/worker/events.go @@ -13,16 +13,16 @@ import ( ) type ( - EventManager interface { - AddSubscriber(id string, s EventSubscriber) (chan struct{}, error) - HandleEvent(event webhooks.Event) - Run(ctx context.Context, eventURL string, opts ...webhooks.HeaderOption) error + EventSubscriber interface { + AddEventHandler(id string, h EventHandler) (chan struct{}, error) + ProcessEvent(event webhooks.Event) + Register(ctx context.Context, eventURL string, opts ...webhooks.HeaderOption) error Shutdown(context.Context) error } - EventSubscriber interface { + EventHandler interface { HandleEvent(event webhooks.Event) error - Subscribe(e EventManager) error + Subscribe(e EventSubscriber) error } WebhookManager interface { @@ -32,71 +32,70 @@ type ( ) type ( - eventManager struct { + eventSubscriber struct { webhooks WebhookManager logger *zap.SugaredLogger registerInterval time.Duration - mu sync.Mutex - subs map[string]eventSubscriber - registered []webhooks.Webhook - } - - eventSubscriber struct { - EventSubscriber - readyChan chan struct{} + mu sync.Mutex + handlers map[string]EventHandler + registered []webhooks.Webhook + registeredChan chan struct{} } ) -func NewEventManager(w WebhookManager, l *zap.Logger, registerInterval time.Duration) EventManager { - return &eventManager{ +func NewEventSubscriber(w WebhookManager, l *zap.Logger, registerInterval time.Duration) EventSubscriber { + return &eventSubscriber{ webhooks: w, logger: l.Sugar().Named("events"), - registerInterval: registerInterval, + registeredChan: make(chan struct{}), - subs: make(map[string]eventSubscriber), + handlers: make(map[string]EventHandler), + registerInterval: registerInterval, } } -func (e *eventManager) AddSubscriber(id string, s EventSubscriber) (chan struct{}, error) { +func (e *eventSubscriber) AddEventHandler(id string, h EventHandler) (chan struct{}, error) { e.mu.Lock() defer e.mu.Unlock() - _, ok := e.subs[id] + _, ok := e.handlers[id] if ok { return nil, fmt.Errorf("subscriber with id %v already exists", id) } + e.handlers[id] = h - readyChan := make(chan struct{}) - if len(e.registered) > 0 { - close(readyChan) - } - - e.subs[id] = eventSubscriber{s, readyChan} - return readyChan, nil + return e.registeredChan, nil } -func (e *eventManager) HandleEvent(event webhooks.Event) { - for id, s := range e.subs { +func (e *eventSubscriber) ProcessEvent(event webhooks.Event) { + log := e.logger.With( + zap.String("module", event.Module), + zap.String("event", event.Event), + ) + + for id, s := range e.handlers { if err := s.HandleEvent(event); err != nil { - e.logger.Errorw("failed to handle event", + log.Errorw("failed to handle event", zap.Error(err), zap.String("subscriber", id), - zap.String("module", event.Module), - zap.String("event", event.Event), ) } else { - e.logger.Debugw("handled event", + log.Debugw("handled event", zap.String("subscriber", id), - zap.String("module", event.Module), - zap.String("event", event.Event), ) } } } -func (e *eventManager) Run(ctx context.Context, eventsURL string, opts ...webhooks.HeaderOption) error { +func (e *eventSubscriber) Register(ctx context.Context, eventsURL string, opts ...webhooks.HeaderOption) error { + select { + case <-e.registeredChan: + return fmt.Errorf("already registered") // developer error + default: + } + // prepare headers headers := make(map[string]string) for _, opt := range opts { @@ -115,11 +114,7 @@ func (e *eventManager) Run(ctx context.Context, eventsURL string, opts ...webhoo // try and register the webhooks in a loop for { if e.registerWebhooks(ctx, webhooks) { - e.mu.Lock() - for _, s := range e.subs { - close(s.readyChan) - } - e.mu.Unlock() + close(e.registeredChan) break } @@ -135,7 +130,7 @@ func (e *eventManager) Run(ctx context.Context, eventsURL string, opts ...webhoo return nil } -func (e *eventManager) Shutdown(ctx context.Context) error { +func (e *eventSubscriber) Shutdown(ctx context.Context) error { e.mu.Lock() defer e.mu.Unlock() @@ -154,7 +149,7 @@ func (e *eventManager) Shutdown(ctx context.Context) error { return errors.Join(errs...) } -func (e *eventManager) registerWebhooks(ctx context.Context, webhooks []webhooks.Webhook) bool { +func (e *eventSubscriber) registerWebhooks(ctx context.Context, webhooks []webhooks.Webhook) bool { for _, wh := range webhooks { if err := e.webhooks.RegisterWebhook(ctx, wh); err != nil { e.logger.Errorw("failed to register webhook", diff --git a/internal/worker/events_test.go b/internal/worker/events_test.go index c9dd32c4c..2dd946fca 100644 --- a/internal/worker/events_test.go +++ b/internal/worker/events_test.go @@ -21,7 +21,7 @@ import ( const testRegisterInterval = 100 * time.Millisecond -type mockSubscriber struct { +type mockEventHandler struct { id string readyChan chan struct{} @@ -29,13 +29,13 @@ type mockSubscriber struct { events []webhooks.Event } -func (s *mockSubscriber) Events() []webhooks.Event { +func (s *mockEventHandler) Events() []webhooks.Event { s.mu.Lock() defer s.mu.Unlock() return s.events } -func (s *mockSubscriber) HandleEvent(event webhooks.Event) error { +func (s *mockEventHandler) HandleEvent(event webhooks.Event) error { s.mu.Lock() defer s.mu.Unlock() @@ -49,8 +49,8 @@ func (s *mockSubscriber) HandleEvent(event webhooks.Event) error { return nil } -func (s *mockSubscriber) Subscribe(e EventManager) error { - s.readyChan, _ = e.AddSubscriber(s.id, s) +func (s *mockEventHandler) Subscribe(e EventSubscriber) error { + s.readyChan, _ = e.AddEventHandler(s.id, s) return nil } @@ -89,24 +89,24 @@ func (m *mockWebhookManager) Webhooks() []webhooks.Webhook { return m.registered } -func TestEventManager(t *testing.T) { +func TestEventSubscriber(t *testing.T) { // observe logs observedZapCore, observedLogs := observer.New(zap.DebugLevel) // create mocks w := &mockWebhookManager{blockChan: make(chan struct{})} - s := &mockSubscriber{id: t.Name()} + h := &mockEventHandler{id: t.Name()} - // create event manager - e := NewEventManager(w, zap.New(observedZapCore), testRegisterInterval) + // create event subscriber + s := NewEventSubscriber(w, zap.New(observedZapCore), testRegisterInterval) - // subscribe to event manager - if err := s.Subscribe(e); err != nil { + // subscribe the event handler + if err := h.Subscribe(s); err != nil { t.Fatal(err) } // setup a server - mux := jape.Mux(map[string]jape.Handler{"POST /event": func(jc jape.Context) { + mux := jape.Mux(map[string]jape.Handler{"POST /events": func(jc jape.Context) { var event webhooks.Event if jc.Decode(&event) != nil { return @@ -114,16 +114,16 @@ func TestEventManager(t *testing.T) { jc.ResponseWriter.WriteHeader(http.StatusOK) return } else { - e.HandleEvent(event) + s.ProcessEvent(event) } }}) srv := httptest.NewServer(mux) defer srv.Close() - // run event manager - eventsURL := fmt.Sprintf("http://%v/event", srv.Listener.Addr().String()) + // register the subscriber + eventsURL := fmt.Sprintf("http://%v/events", srv.Listener.Addr().String()) go func() { - if err := e.Run(context.Background(), eventsURL); err != nil { + if err := s.Register(context.Background(), eventsURL); err != nil { t.Error(err) } }() @@ -145,8 +145,8 @@ func TestEventManager(t *testing.T) { time.Sleep(testRegisterInterval) // assert webhook was registered - if webhooks := w.Webhooks(); len(webhooks) != 4 { - t.Fatal("expected 4 webhooks, got", len(webhooks)) + if webhooks := w.Webhooks(); len(webhooks) != 5 { + t.Fatal("expected 5 webhooks, got", len(webhooks)) } // send the same event again @@ -162,12 +162,12 @@ func TestEventManager(t *testing.T) { } // assert the subscriber handled the event - if events := s.Events(); len(events) != 1 { + if events := h.Events(); len(events) != 1 { t.Fatal("expected 1 event, got", len(events)) } - // shutdown event manager - err = e.Shutdown(context.Background()) + // shutdown event subscriber + err = s.Shutdown(context.Background()) if err != nil { t.Fatal(err) } diff --git a/worker/client/client.go b/worker/client/client.go index 8996f66ae..9abac4d0e 100644 --- a/worker/client/client.go +++ b/worker/client/client.go @@ -271,7 +271,7 @@ func (c *Client) UploadStats() (resp api.UploadStatsResponse, err error) { // NotifyEvent notifies the worker of an event. func (c *Client) NotifyEvent(ctx context.Context, e webhooks.Event) (err error) { - err = c.c.WithContext(ctx).POST("/event", e, nil) + err = c.c.WithContext(ctx).POST("/events", e, nil) return } diff --git a/worker/worker.go b/worker/worker.go index 304134ed8..7da7d0ceb 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -209,7 +209,7 @@ type worker struct { masterKey [32]byte startTime time.Time - eventManager iworker.EventManager + eventSubscriber iworker.EventSubscriber downloadManager *downloadManager uploadManager *uploadManager @@ -1252,14 +1252,14 @@ func (w *worker) accountHandlerGET(jc jape.Context) { jc.Encode(account) } -func (w *worker) eventHandlerPOST(jc jape.Context) { +func (w *worker) eventsHandlerPOST(jc jape.Context) { var event webhooks.Event if jc.Decode(&event) != nil { return } else if event.Event == webhooks.WebhookEventPing { jc.ResponseWriter.WriteHeader(http.StatusOK) } else { - w.eventManager.HandleEvent(event) + w.eventSubscriber.ProcessEvent(event) } } @@ -1305,7 +1305,7 @@ func New(masterKey [32]byte, id string, b Bus, contractLockingDuration, busFlush allowPrivateIPs: allowPrivateIPs, contractLockingDuration: contractLockingDuration, cache: iworker.NewCache(b, l), - eventManager: iworker.NewEventManager(b, l, 10*time.Second), + eventSubscriber: iworker.NewEventSubscriber(b, l, 10*time.Second), id: id, bus: b, masterKey: masterKey, @@ -1333,7 +1333,7 @@ func (w *worker) Handler() http.Handler { "GET /account/:hostkey": w.accountHandlerGET, "GET /id": w.idHandlerGET, - "POST /event": w.eventHandlerPOST, + "POST /events": w.eventsHandlerPOST, "GET /memory": w.memoryGET, @@ -1365,17 +1365,15 @@ func (w *worker) Handler() http.Handler { // Setup register event webhooks that enable the worker cache. func (w *worker) Setup(ctx context.Context, apiURL, apiPassword string) error { - // run event manager in a goroutine go func() { - eventsURL := fmt.Sprintf("%s/event", apiURL) + eventsURL := fmt.Sprintf("%s/events", apiURL) webhookOpts := []webhooks.HeaderOption{webhooks.WithBasicAuth("", apiPassword)} - if err := w.eventManager.Run(w.shutdownCtx, eventsURL, webhookOpts...); err != nil { - w.logger.Errorw("failed to run event manager", zap.Error(err)) + if err := w.eventSubscriber.Register(w.shutdownCtx, eventsURL, webhookOpts...); err != nil { + w.logger.Errorw("failed to register webhooks", zap.Error(err)) } }() - // subscribe cache to the event manager - return w.cache.Subscribe(w.eventManager) + return w.cache.Subscribe(w.eventSubscriber) } // Shutdown shuts down the worker. @@ -1390,8 +1388,8 @@ func (w *worker) Shutdown(ctx context.Context) error { // stop recorders w.contractSpendingRecorder.Stop(ctx) - // shutdown event manager - return w.eventManager.Shutdown(ctx) + // shutdown the subscriber + return w.eventSubscriber.Shutdown(ctx) } func (w *worker) scanHost(ctx context.Context, timeout time.Duration, hostKey types.PublicKey, hostIP string) (rhpv2.HostSettings, rhpv3.HostPriceTable, time.Duration, error) { From 3e380c87f1239ffe646bae523ab85c44a5f9ee7f Mon Sep 17 00:00:00 2001 From: PJ Date: Wed, 31 Jul 2024 12:19:44 +0200 Subject: [PATCH 5/5] worker: add webhook registration failed alert --- internal/worker/events.go | 40 ++++++++++++++++++++++++++++------ internal/worker/events_test.go | 13 ++++++++++- worker/worker.go | 5 +++-- 3 files changed, 48 insertions(+), 10 deletions(-) diff --git a/internal/worker/events.go b/internal/worker/events.go index 44b9d3a39..eb98018d3 100644 --- a/internal/worker/events.go +++ b/internal/worker/events.go @@ -7,11 +7,16 @@ import ( "sync" "time" + "go.sia.tech/renterd/alerts" "go.sia.tech/renterd/api" "go.sia.tech/renterd/webhooks" "go.uber.org/zap" ) +var ( + alertWebhookRegistrationFailedID = alerts.RandomAlertID() // constant until restarted +) + type ( EventSubscriber interface { AddEventHandler(id string, h EventHandler) (chan struct{}, error) @@ -33,6 +38,7 @@ type ( type ( eventSubscriber struct { + alerts alerts.Alerter webhooks WebhookManager logger *zap.SugaredLogger @@ -45,8 +51,9 @@ type ( } ) -func NewEventSubscriber(w WebhookManager, l *zap.Logger, registerInterval time.Duration) EventSubscriber { +func NewEventSubscriber(a alerts.Alerter, w WebhookManager, l *zap.Logger, registerInterval time.Duration) EventSubscriber { return &eventSubscriber{ + alerts: a, webhooks: w, logger: l.Sugar().Named("events"), @@ -113,13 +120,17 @@ func (e *eventSubscriber) Register(ctx context.Context, eventsURL string, opts . // try and register the webhooks in a loop for { - if e.registerWebhooks(ctx, webhooks) { - close(e.registeredChan) + err := e.registerWebhooks(ctx, webhooks) + if err == nil { + e.alerts.DismissAlerts(ctx, alertWebhookRegistrationFailedID) break } - // sleep for a bit before trying again + // alert on failure + e.alerts.RegisterAlert(ctx, newWebhookRegistrationFailedAlert(err)) e.logger.Warnf("failed to register webhooks, retrying in %v", e.registerInterval) + + // sleep for a bit before trying again select { case <-ctx.Done(): return ctx.Err() @@ -149,14 +160,14 @@ func (e *eventSubscriber) Shutdown(ctx context.Context) error { return errors.Join(errs...) } -func (e *eventSubscriber) registerWebhooks(ctx context.Context, webhooks []webhooks.Webhook) bool { +func (e *eventSubscriber) registerWebhooks(ctx context.Context, webhooks []webhooks.Webhook) error { for _, wh := range webhooks { if err := e.webhooks.RegisterWebhook(ctx, wh); err != nil { e.logger.Errorw("failed to register webhook", zap.Error(err), zap.Stringer("webhook", wh), ) - return false + return err } } @@ -164,5 +175,20 @@ func (e *eventSubscriber) registerWebhooks(ctx context.Context, webhooks []webho e.mu.Lock() e.registered = webhooks e.mu.Unlock() - return true + + // signal that we're registered + close(e.registeredChan) + return nil +} + +func newWebhookRegistrationFailedAlert(err error) alerts.Alert { + return alerts.Alert{ + ID: alertWebhookRegistrationFailedID, + Severity: alerts.SeverityCritical, + Message: "Worker failed to register webhooks", + Data: map[string]any{ + "error": err.Error(), + }, + Timestamp: time.Now(), + } } diff --git a/internal/worker/events_test.go b/internal/worker/events_test.go index 2dd946fca..3de028acd 100644 --- a/internal/worker/events_test.go +++ b/internal/worker/events_test.go @@ -12,7 +12,9 @@ import ( "testing" "time" + "go.sia.tech/core/types" "go.sia.tech/jape" + "go.sia.tech/renterd/alerts" "go.sia.tech/renterd/api" "go.sia.tech/renterd/webhooks" "go.uber.org/zap" @@ -21,6 +23,14 @@ import ( const testRegisterInterval = 100 * time.Millisecond +type mockAlerter struct{} + +func (a *mockAlerter) Alerts(ctx context.Context, opts alerts.AlertsOpts) (alerts.AlertsResponse, error) { + return alerts.AlertsResponse{}, nil +} +func (a *mockAlerter) RegisterAlert(ctx context.Context, alert alerts.Alert) error { return nil } +func (a *mockAlerter) DismissAlerts(ctx context.Context, ids ...types.Hash256) error { return nil } + type mockEventHandler struct { id string readyChan chan struct{} @@ -94,11 +104,12 @@ func TestEventSubscriber(t *testing.T) { observedZapCore, observedLogs := observer.New(zap.DebugLevel) // create mocks + a := &mockAlerter{} w := &mockWebhookManager{blockChan: make(chan struct{})} h := &mockEventHandler{id: t.Name()} // create event subscriber - s := NewEventSubscriber(w, zap.New(observedZapCore), testRegisterInterval) + s := NewEventSubscriber(a, w, zap.New(observedZapCore), testRegisterInterval) // subscribe the event handler if err := h.Subscribe(s); err != nil { diff --git a/worker/worker.go b/worker/worker.go index 7da7d0ceb..a6bfefbd4 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -1298,14 +1298,15 @@ func New(masterKey [32]byte, id string, b Bus, contractLockingDuration, busFlush return nil, errors.New("uploadMaxMemory cannot be 0") } + a := alerts.WithOrigin(b, fmt.Sprintf("worker.%s", id)) l = l.Named("worker").Named(id) shutdownCtx, shutdownCancel := context.WithCancel(context.Background()) w := &worker{ - alerts: alerts.WithOrigin(b, fmt.Sprintf("worker.%s", id)), + alerts: a, allowPrivateIPs: allowPrivateIPs, contractLockingDuration: contractLockingDuration, cache: iworker.NewCache(b, l), - eventSubscriber: iworker.NewEventSubscriber(b, l, 10*time.Second), + eventSubscriber: iworker.NewEventSubscriber(a, b, l, 10*time.Second), id: id, bus: b, masterKey: masterKey,