Skip to content

Commit

Permalink
Initialize worker in a non-blocking fashion to facilitate clustered s…
Browse files Browse the repository at this point in the history
…etups (#1391)

The `worker` currently uses `events` to invalidate the worker cache.
This complicates cluster setups because the worker has to register
webhooks with the bus in order for it to get notified of certain events.
This is especially annoying when dealing with Kubernetes setups, where a
pod might not be aware of its location in the network, or its DNS name
might not yet be published and so on.

This PR addresses those issues by making the setup more asynchronous. To
do so I added a small abstraction of the event manager since it felt
weird extending the worker cache that way. Instead, we now have an event
manager that tries to register (**and unregister on shutdown**) a set of
webhooks. If and when that succeeds, subscribers (like the worker cache,
but in the future more systems might subscribe to events) will be
forwarded the events and can handle them as they see fit.

I also changed the route to `POST /event`, which is why this is a `DRAFT` for
now. I'm not sure whether we can still change it, but since it takes a
single event we should probably rename it to be singular.

Fixes #1368
  • Loading branch information
ChrisSchinnerl authored Jul 31, 2024
2 parents 6363ba7 + 751798d commit 5815d9e
Show file tree
Hide file tree
Showing 8 changed files with 497 additions and 68 deletions.
12 changes: 4 additions & 8 deletions bus/client/webhooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,12 @@ func (c *Client) BroadcastAction(ctx context.Context, action webhooks.Event) err
return err
}

// DeleteWebhook deletes the webhook with the given ID.
func (c *Client) DeleteWebhook(ctx context.Context, url, module, event string) error {
return c.c.POST("/webhook/delete", webhooks.Webhook{
URL: url,
Module: module,
Event: event,
}, nil)
// UnregisterWebhook unregisters the given webhook.
//
// The request is sent through a ctx-bound client, the same way RegisterWebhook
// does it, so the caller's context is no longer silently ignored.
func (c *Client) UnregisterWebhook(ctx context.Context, webhook webhooks.Webhook) error {
	return c.c.WithContext(ctx).POST("/webhook/delete", webhook, nil)
}

// RegisterWebhook registers a new webhook for the given URL.
// RegisterWebhook registers the given webhook.
func (c *Client) RegisterWebhook(ctx context.Context, webhook webhooks.Webhook) error {
err := c.c.WithContext(ctx).POST("/webhooks", webhook, nil)
return err
Expand Down
43 changes: 18 additions & 25 deletions internal/worker/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,14 +75,13 @@ type (
Bus interface {
Contracts(ctx context.Context, opts api.ContractsOpts) ([]api.ContractMetadata, error)
GougingParams(ctx context.Context) (api.GougingParams, error)
RegisterWebhook(ctx context.Context, wh webhooks.Webhook) error
}

WorkerCache interface {
DownloadContracts(ctx context.Context) ([]api.ContractMetadata, error)
GougingParams(ctx context.Context) (api.GougingParams, error)
HandleEvent(event webhooks.Event) error
Initialize(ctx context.Context, workerAPI string, opts ...webhooks.HeaderOption) error
Subscribe(e EventSubscriber) error
}
)

Expand All @@ -92,8 +91,8 @@ type cache struct {
cache *memoryCache
logger *zap.SugaredLogger

mu sync.Mutex
ready bool
mu sync.Mutex
readyChan chan struct{}
}

func NewCache(b Bus, logger *zap.Logger) WorkerCache {
Expand Down Expand Up @@ -197,33 +196,27 @@ func (c *cache) HandleEvent(event webhooks.Event) (err error) {
return
}

func (c *cache) Initialize(ctx context.Context, workerAPI string, webhookOpts ...webhooks.HeaderOption) error {
eventsURL := fmt.Sprintf("%s/events", workerAPI)
headers := make(map[string]string)
for _, opt := range webhookOpts {
opt(headers)
func (c *cache) Subscribe(e EventSubscriber) (err error) {
c.mu.Lock()
defer c.mu.Unlock()
if c.readyChan != nil {
return fmt.Errorf("already subscribed")
}
for _, wh := range []webhooks.Webhook{
api.WebhookConsensusUpdate(eventsURL, headers),
api.WebhookContractArchive(eventsURL, headers),
api.WebhookContractRenew(eventsURL, headers),
api.WebhookHostUpdate(eventsURL, headers),
api.WebhookSettingUpdate(eventsURL, headers),
} {
if err := c.b.RegisterWebhook(ctx, wh); err != nil {
return fmt.Errorf("failed to register webhook '%s', err: %v", wh, err)
}

c.readyChan, err = e.AddEventHandler(c.logger.Desugar().Name(), c)
if err != nil {
return fmt.Errorf("failed to subscribe the worker cache, error: %v", err)
}
c.mu.Lock()
c.ready = true
c.mu.Unlock()
return nil
}

func (c *cache) isReady() bool {
c.mu.Lock()
defer c.mu.Unlock()
return c.ready
select {
case <-c.readyChan:
return true
default:
}
return false
}

func (c *cache) handleConsensusUpdate(event api.EventConsensusUpdate) {
Expand Down
31 changes: 25 additions & 6 deletions internal/worker/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,22 @@ func (m *mockBus) Contracts(ctx context.Context, opts api.ContractsOpts) ([]api.
func (m *mockBus) GougingParams(ctx context.Context) (api.GougingParams, error) {
return m.gougingParams, nil
}
func (m *mockBus) RegisterWebhook(ctx context.Context, wh webhooks.Webhook) error {

// mockEventSubscriber is a test double for an event subscriber. Its
// AddEventHandler hands out readyChan, so a test controls when that channel
// is closed.
type mockEventSubscriber struct {
// readyChan is the channel returned by AddEventHandler.
readyChan chan struct{}
}

// AddEventHandler ignores id and h and returns the mock's readyChan together
// with a nil error.
func (m *mockEventSubscriber) AddEventHandler(id string, h EventHandler) (chan struct{}, error) {
return m.readyChan, nil
}

// ProcessEvent is a no-op; the mock discards every event.
func (m *mockEventSubscriber) ProcessEvent(event webhooks.Event) {}

// Register does nothing and always returns nil.
func (m *mockEventSubscriber) Register(ctx context.Context, eventURL string, opts ...webhooks.HeaderOption) error {
return nil
}

// Shutdown does nothing and always returns nil.
func (m *mockEventSubscriber) Shutdown(ctx context.Context) error {
return nil
}

Expand Down Expand Up @@ -57,7 +72,13 @@ func TestWorkerCache(t *testing.T) {
// create mock bus and cache
c, b, mc := newTestCache(zap.New(observedZapCore))

// assert using cache before it's initialized prints a warning
// create mock event subscriber
m := &mockEventSubscriber{readyChan: make(chan struct{})}

// subscribe cache to event subscriber
c.Subscribe(m)

// assert using cache before it's ready prints a warning
contracts, err := c.DownloadContracts(context.Background())
if err != nil {
t.Fatal(err)
Expand All @@ -84,10 +105,8 @@ func TestWorkerCache(t *testing.T) {
t.Fatal("expected error message to contain 'cache is not ready yet', got", lines[0].Message)
}

// initialize the cache
if err := c.Initialize(context.Background(), ""); err != nil {
t.Fatal(err)
}
// close the ready channel
close(m.readyChan)

// fetch contracts & gouging params so they're cached
_, err = c.DownloadContracts(context.Background())
Expand Down
194 changes: 194 additions & 0 deletions internal/worker/events.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
package worker

import (
"context"
"errors"
"fmt"
"sync"
"time"

"go.sia.tech/renterd/alerts"
"go.sia.tech/renterd/api"
"go.sia.tech/renterd/webhooks"
"go.uber.org/zap"
)

// alertWebhookRegistrationFailedID is the ID used for the "webhook
// registration failed" alert. It is generated once at package initialisation,
// so registering and dismissing the alert always refer to the same ID.
var alertWebhookRegistrationFailedID = alerts.RandomAlertID() // constant until restarted

type (
// EventSubscriber registers webhooks and fans incoming events out to the
// handlers that were added to it.
EventSubscriber interface {
// AddEventHandler adds h under the given id. In the implementation in
// this file the returned channel is closed once the webhooks have been
// registered, and an error is returned if id is already taken.
AddEventHandler(id string, h EventHandler) (chan struct{}, error)
// ProcessEvent passes the event to the added handlers.
ProcessEvent(event webhooks.Event)
// Register registers the webhooks for eventURL. The implementation in
// this file retries until it succeeds or ctx is done.
Register(ctx context.Context, eventURL string, opts ...webhooks.HeaderOption) error
// Shutdown unregisters the webhooks that were registered.
Shutdown(context.Context) error
}

// EventHandler is implemented by components that want to receive events
// from an EventSubscriber.
EventHandler interface {
// HandleEvent is invoked for every processed event; a returned error is
// logged by the subscriber in this file.
HandleEvent(event webhooks.Event) error
// Subscribe attaches the handler to the given subscriber.
Subscribe(e EventSubscriber) error
}

// WebhookManager is the subset of webhook operations the subscriber needs
// in order to register and unregister its webhooks.
WebhookManager interface {
RegisterWebhook(ctx context.Context, wh webhooks.Webhook) error
UnregisterWebhook(ctx context.Context, wh webhooks.Webhook) error
}
)

type (
// eventSubscriber is the EventSubscriber implementation returned by
// NewEventSubscriber.
eventSubscriber struct {
alerts alerts.Alerter // used to raise/dismiss the registration-failed alert
webhooks WebhookManager // used to register and unregister webhooks
logger *zap.SugaredLogger

// registerInterval is the pause between two registration attempts.
registerInterval time.Duration

// mu is taken when handlers is written (AddEventHandler) and when
// registered is read or written (Shutdown, registerWebhooks).
mu sync.Mutex
handlers map[string]EventHandler // added handlers, keyed by id
registered []webhooks.Webhook // webhooks to unregister on Shutdown
registeredChan chan struct{} // closed once registration succeeded
}
)

// NewEventSubscriber builds an event subscriber that raises alerts through a,
// manages its webhooks through w and logs under the "events" name of l.
// registerInterval is the pause between two webhook registration attempts.
func NewEventSubscriber(a alerts.Alerter, w WebhookManager, l *zap.Logger, registerInterval time.Duration) EventSubscriber {
	sub := new(eventSubscriber)

	// dependencies
	sub.alerts = a
	sub.webhooks = w
	sub.logger = l.Sugar().Named("events")

	// configuration and state
	sub.registerInterval = registerInterval
	sub.handlers = make(map[string]EventHandler)
	sub.registeredChan = make(chan struct{})

	return sub
}

// AddEventHandler stores h under id and returns the channel that is closed
// once the webhooks have been registered. It fails if a handler with the same
// id was added before.
func (e *eventSubscriber) AddEventHandler(id string, h EventHandler) (chan struct{}, error) {
	e.mu.Lock()
	defer e.mu.Unlock()

	// ids must be unique
	if _, exists := e.handlers[id]; exists {
		return nil, fmt.Errorf("subscriber with id %v already exists", id)
	}

	e.handlers[id] = h
	return e.registeredChan, nil
}

// ProcessEvent forwards the event to every added handler and logs, per
// handler, whether handling it succeeded.
//
// The handlers map is written by AddEventHandler under e.mu, so it is copied
// under the same lock before iterating; ranging over it unlocked would be a
// data race. The handlers are invoked on the copy, without the lock held, so a
// handler may safely call back into the subscriber.
func (e *eventSubscriber) ProcessEvent(event webhooks.Event) {
	log := e.logger.With(
		zap.String("module", event.Module),
		zap.String("event", event.Event),
	)

	// snapshot the handlers under the lock
	e.mu.Lock()
	handlers := make(map[string]EventHandler, len(e.handlers))
	for id, h := range e.handlers {
		handlers[id] = h
	}
	e.mu.Unlock()

	for id, h := range handlers {
		if err := h.HandleEvent(event); err != nil {
			log.Errorw("failed to handle event",
				zap.Error(err),
				zap.String("subscriber", id),
			)
			continue
		}
		log.Debugw("handled event",
			zap.String("subscriber", id),
		)
	}
}

// Register registers the worker's webhooks for eventsURL, applying opts to the
// headers sent with each webhook. It keeps retrying every registerInterval
// until registration succeeds or ctx is done, raising an alert while it fails
// and dismissing it on success. It returns an error right away if the
// subscriber is already registered.
func (e *eventSubscriber) Register(ctx context.Context, eventsURL string, opts ...webhooks.HeaderOption) error {
	// a closed registeredChan means a previous call already succeeded
	select {
	case <-e.registeredChan:
		return fmt.Errorf("already registered") // developer error
	default:
	}

	// collect the headers from the options
	headers := make(map[string]string)
	for i := range opts {
		opts[i](headers)
	}

	// the set of webhooks to register
	hooks := []webhooks.Webhook{
		api.WebhookConsensusUpdate(eventsURL, headers),
		api.WebhookContractArchive(eventsURL, headers),
		api.WebhookContractRenew(eventsURL, headers),
		api.WebhookHostUpdate(eventsURL, headers),
		api.WebhookSettingUpdate(eventsURL, headers),
	}

	// keep trying until it works or the context is done
	for {
		regErr := e.registerWebhooks(ctx, hooks)
		if regErr == nil {
			e.alerts.DismissAlerts(ctx, alertWebhookRegistrationFailedID)
			return nil
		}

		// surface the failure
		e.alerts.RegisterAlert(ctx, newWebhookRegistrationFailedAlert(regErr))
		e.logger.Warnf("failed to register webhooks, retrying in %v", e.registerInterval)

		// wait before the next attempt
		retry := time.After(e.registerInterval)
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-retry:
		}
	}
}

// Shutdown unregisters every webhook that was saved by a successful
// registration. A failure to unregister one webhook is logged and does not
// stop the others from being unregistered; all failures are returned joined
// into a single error.
func (e *eventSubscriber) Shutdown(ctx context.Context) error {
	e.mu.Lock()
	defer e.mu.Unlock()

	var failures []error
	for i := range e.registered {
		hook := e.registered[i]

		unregErr := e.webhooks.UnregisterWebhook(ctx, hook)
		if unregErr == nil {
			continue
		}

		e.logger.Errorw("failed to unregister webhook",
			zap.Error(unregErr),
			zap.Stringer("webhook", hook),
		)
		failures = append(failures, unregErr)
	}

	return errors.Join(failures...)
}

// registerWebhooks registers the given webhooks one by one and stops at the
// first failure, which is logged and returned. When all of them succeed, the
// webhooks are saved for Shutdown and registeredChan is closed to signal that
// the subscriber is registered.
func (e *eventSubscriber) registerWebhooks(ctx context.Context, webhooks []webhooks.Webhook) error {
	for i := range webhooks {
		regErr := e.webhooks.RegisterWebhook(ctx, webhooks[i])
		if regErr == nil {
			continue
		}

		e.logger.Errorw("failed to register webhook",
			zap.Error(regErr),
			zap.Stringer("webhook", webhooks[i]),
		)
		return regErr
	}

	// remember what to unregister on shutdown
	e.mu.Lock()
	e.registered = webhooks
	e.mu.Unlock()

	// let waiting subscribers know we're registered
	close(e.registeredChan)
	return nil
}

// newWebhookRegistrationFailedAlert builds the critical alert raised when the
// worker could not register its webhooks. The alert carries the error text and
// is stamped with the current time.
func newWebhookRegistrationFailedAlert(err error) alerts.Alert {
	details := map[string]any{
		"error": err.Error(),
	}

	return alerts.Alert{
		ID:        alertWebhookRegistrationFailedID,
		Severity:  alerts.SeverityCritical,
		Message:   "Worker failed to register webhooks",
		Data:      details,
		Timestamp: time.Now(),
	}
}
Loading

0 comments on commit 5815d9e

Please sign in to comment.