From 1fc76d696a2135291980930f331068def4244297 Mon Sep 17 00:00:00 2001 From: Alvar Penning Date: Thu, 26 Oct 2023 17:22:21 +0200 Subject: [PATCH] eventstream: Event dispatcher The just introduced replay logic felt a bit clumsy. Thus, I introduced a middle layer - the eventDispatcher method within its own goroutine - to receive all Events and decide if buffering should be performed. --- internal/eventstream/client.go | 97 +++++++++++++++--------------- internal/eventstream/client_api.go | 4 +- internal/eventstream/client_es.go | 2 +- 3 files changed, 53 insertions(+), 50 deletions(-) diff --git a/internal/eventstream/client.go b/internal/eventstream/client.go index 3c0c68947..a1cc7f970 100644 --- a/internal/eventstream/client.go +++ b/internal/eventstream/client.go @@ -41,11 +41,12 @@ type Client struct { // Logger to log to. Logger *logging.Logger + // eventDispatch communicates Events to be processed between producer and consumer. + eventDispatch chan *event.Event + // replayTrigger signals the eventDispatcher method that the reconnection phase is finished. + replayTrigger chan struct{} // replayPhase indicates that Events will be cached as the Event Stream Client is in the reconnection phase. replayPhase atomic.Bool - // replayBuffer is the cache being populated during the reconnection phase and its mutex. - replayBuffer []*event.Event - replayBufferMutex sync.Mutex } // NewClientsFromConfig returns all Clients defined in the conf.ConfigFile. @@ -220,22 +221,44 @@ func (client *Client) buildAcknowledgementEvent(host, service, author, comment s return ev, nil } -// handleEvent checks and dispatches generated Events. -func (client *Client) handleEvent(ev *event.Event) { - if client.replayPhase.Load() { - client.replayBufferMutex.Lock() - client.replayBuffer = append(client.replayBuffer, ev) - client.replayBufferMutex.Unlock() - return - } +// eventDispatcher receives generated event.Events to be either buffered or directly delivered to the CallbackFn. 
+// +// When the Client is in the reconnection phase, indicated in the enterReconnectionPhase method, then all received Events +// from the eventDispatch channel will be buffered until the replayTrigger fires. +func (client *Client) eventDispatcher() { + var reconnectionBuffer []*event.Event + + for { + select { + case <-client.Ctx.Done(): + client.Logger.Warnw("Closing event dispatcher as context is done", zap.Error(client.Ctx.Err())) + return - client.CallbackFn(ev) + case <-client.replayTrigger: + for _, ev := range reconnectionBuffer { + client.CallbackFn(ev) + } + client.Logger.Debugf("Replayed %d events during reconnection phase", len(reconnectionBuffer)) + client.replayPhase.Store(false) + reconnectionBuffer = []*event.Event{} + client.Logger.Info("Finished reconnection phase and returning normal operation") + + case ev := <-client.eventDispatch: + if client.replayPhase.Load() { + reconnectionBuffer = append(reconnectionBuffer, ev) + } else { + client.CallbackFn(ev) + } + } + } } -func (client *Client) replayBufferedEvents() { - client.replayBufferMutex.Lock() - client.replayBuffer = make([]*event.Event, 0, 1024) - client.replayBufferMutex.Unlock() +// enterReconnectionPhase enters the reconnection phase. +// +// This method starts multiple goroutines. First, some workers to query the Icinga 2 Objects API will be launched. When +// all of those have finished, the replayTrigger will be used to indicate that the buffered Events should be replayed. +func (client *Client) enterReconnectionPhase() { + client.Logger.Info("Entering reconnection phase to replay events") client.replayPhase.Store(true) queryFns := []func(string){client.checkMissedAcknowledgements, client.checkMissedStateChanges} @@ -253,38 +276,9 @@ func (client *Client) replayBufferedEvents() { } } - // Fork off the synchronization in a background goroutine to wait for all producers to finish. As the producers - // check the Client's context, they should finish early and this should not deadlock. 
go func() { replayWg.Wait() - client.Logger.Debug("Querying the Objects API for replaying finished") - - if client.Ctx.Err() != nil { - client.Logger.Warn("Aborting Objects API replaying as the context is done") - return - } - - for { - // Here is a race between filling the buffer from incoming Event Stream events and processing the buffered - // events. Thus, the buffer will be reset to catch up what happened in between, as otherwise Events would be - // processed out of order. Only when the buffer is empty, the replay mode will be reset. - client.replayBufferMutex.Lock() - tmpReplayBuffer := client.replayBuffer - client.replayBuffer = make([]*event.Event, 0, 1024) - client.replayBufferMutex.Unlock() - - if len(tmpReplayBuffer) == 0 { - break - } - - for _, ev := range tmpReplayBuffer { - client.CallbackFn(ev) - } - client.Logger.Debugf("Replayed %d events", len(tmpReplayBuffer)) - } - - client.replayPhase.Store(false) - client.Logger.Debug("Finished replay") + client.replayTrigger <- struct{}{} }() } @@ -294,8 +288,17 @@ func (client *Client) replayBufferedEvents() { // loop takes care of reconnections, all those events will be logged while generated Events will be dispatched to the // callback function. func (client *Client) Process() { + // These two channels will be used to communicate the Events and are crucial. As there are multiple producers and + // only one consumer, eventDispatcher, there is no ideal closer. However, producers and the consumer will be + // finished by the Client's context. When this happens, the main application should either be stopped or the Client + // is restarted, and we can hope for the GC. To make sure that nothing gets stuck, make the event channel buffered. 
+ client.eventDispatch = make(chan *event.Event, 1024) + client.replayTrigger = make(chan struct{}) + defer client.Logger.Info("Event Stream Client has stopped") + go client.eventDispatcher() + for { client.Logger.Info("Start listening on Icinga 2 Event Stream..") err := client.listenEventStream() @@ -311,6 +314,6 @@ func (client *Client) Process() { return } - client.replayBufferedEvents() + client.enterReconnectionPhase() } } diff --git a/internal/eventstream/client_api.go b/internal/eventstream/client_api.go index 8ce3c380f..ae08e3205 100644 --- a/internal/eventstream/client_api.go +++ b/internal/eventstream/client_api.go @@ -232,7 +232,7 @@ func (client *Client) checkMissedStateChanges(objType string) { return } - client.handleEvent(ev) + client.eventDispatch <- ev }) } @@ -254,7 +254,7 @@ func (client *Client) checkMissedAcknowledgements(objType string) { return } - client.handleEvent(ev) + client.eventDispatch <- ev }) } diff --git a/internal/eventstream/client_es.go b/internal/eventstream/client_es.go index 2f58c9ba7..3aaf31fec 100644 --- a/internal/eventstream/client_es.go +++ b/internal/eventstream/client_es.go @@ -88,7 +88,7 @@ func (client *Client) listenEventStream() error { return err } - client.handleEvent(ev) + client.eventDispatch <- ev } return lineScanner.Err() }