Skip to content

Commit

Permalink
eventstream: Event dispatcher
Browse files Browse the repository at this point in the history
The just introduced replay logic felt a bit clumsy. Thus, I introduced a
middle layer - the eventDispatcher method within its own goroutine - to
receive all Events and decide if buffering should be performed.
  • Loading branch information
oxzi committed Oct 26, 2023
1 parent cb377fe commit 1fc76d6
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 50 deletions.
97 changes: 50 additions & 47 deletions internal/eventstream/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,12 @@ type Client struct {
// Logger to log to.
Logger *logging.Logger

// eventDispatch communicates Events to be processed between producer and consumer.
eventDispatch chan *event.Event
// replayTrigger signals the eventDispatcher method that the reconnection phase is finished.
replayTrigger chan struct{}
// replayPhase indicates that Events will be cached as the Event Stream Client is in the reconnection phase.
replayPhase atomic.Bool
// replayBuffer is the cache being populated during the reconnection phase and its mutex.
replayBuffer []*event.Event
replayBufferMutex sync.Mutex
}

// NewClientsFromConfig returns all Clients defined in the conf.ConfigFile.
Expand Down Expand Up @@ -220,22 +221,44 @@ func (client *Client) buildAcknowledgementEvent(host, service, author, comment s
return ev, nil
}

// handleEvent checks and dispatches generated Events.
func (client *Client) handleEvent(ev *event.Event) {
if client.replayPhase.Load() {
client.replayBufferMutex.Lock()
client.replayBuffer = append(client.replayBuffer, ev)
client.replayBufferMutex.Unlock()
return
}
// eventDispatcher receives generated event.Events to be either buffered or directly delivered to the CallbackFn.
//
// When the Client is in the reconnection phase, indicated in the enterReconnectionPhase method, than all received Events
// from the eventDispatch channel will be buffered until the replayTrigger fires.
func (client *Client) eventDispatcher() {
var reconnectionBuffer []*event.Event

for {
select {
case <-client.Ctx.Done():
client.Logger.Warnw("Closing event dispatcher as context is done", zap.Error(client.Ctx.Err()))
return

client.CallbackFn(ev)
case <-client.replayTrigger:
for _, ev := range reconnectionBuffer {
client.CallbackFn(ev)
}
client.Logger.Debugf("Replayed %d events during reconnection phase", len(reconnectionBuffer))
client.replayPhase.Store(false)
reconnectionBuffer = []*event.Event{}
client.Logger.Info("Finished reconnection phase and returning normal operation")

case ev := <-client.eventDispatch:
if client.replayPhase.Load() {
reconnectionBuffer = append(reconnectionBuffer, ev)
} else {
client.CallbackFn(ev)
}
}
}
}

func (client *Client) replayBufferedEvents() {
client.replayBufferMutex.Lock()
client.replayBuffer = make([]*event.Event, 0, 1024)
client.replayBufferMutex.Unlock()
// enterReconnectionPhase enters the reconnection phase.
//
// This method starts multiple goroutines. First, some workers to query the Icinga 2 Objects API will be launched. When
// all of those have finished, the replayTrigger will be used to indicate that the buffered Events should be replayed.
func (client *Client) enterReconnectionPhase() {
client.Logger.Info("Entering reconnection phase to replay events")
client.replayPhase.Store(true)

queryFns := []func(string){client.checkMissedAcknowledgements, client.checkMissedStateChanges}
Expand All @@ -253,38 +276,9 @@ func (client *Client) replayBufferedEvents() {
}
}

// Fork off the synchronization in a background goroutine to wait for all producers to finish. As the producers
// check the Client's context, they should finish early and this should not deadlock.
go func() {
replayWg.Wait()
client.Logger.Debug("Querying the Objects API for replaying finished")

if client.Ctx.Err() != nil {
client.Logger.Warn("Aborting Objects API replaying as the context is done")
return
}

for {
// Here is a race between filling the buffer from incoming Event Stream events and processing the buffered
// events. Thus, the buffer will be reset to catch up what happened in between, as otherwise Events would be
// processed out of order. Only when the buffer is empty, the replay mode will be reset.
client.replayBufferMutex.Lock()
tmpReplayBuffer := client.replayBuffer
client.replayBuffer = make([]*event.Event, 0, 1024)
client.replayBufferMutex.Unlock()

if len(tmpReplayBuffer) == 0 {
break
}

for _, ev := range tmpReplayBuffer {
client.CallbackFn(ev)
}
client.Logger.Debugf("Replayed %d events", len(tmpReplayBuffer))
}

client.replayPhase.Store(false)
client.Logger.Debug("Finished replay")
client.replayTrigger <- struct{}{}
}()
}

Expand All @@ -294,8 +288,17 @@ func (client *Client) replayBufferedEvents() {
// loop takes care of reconnections, all those events will be logged while generated Events will be dispatched to the
// callback function.
func (client *Client) Process() {
// These two channels will be used to communicate the Events and are crucial. As there are multiple producers and
// only one consumer, eventDispatcher, there is no ideal closer. However, producers and the consumer will be
// finished by the Client's context. When this happens, the main application should either be stopped or the Client
// is restarted, and we can hope for the GC. To make sure that nothing gets stuck, make the event channel buffered.
client.eventDispatch = make(chan *event.Event, 1024)
client.replayTrigger = make(chan struct{})

defer client.Logger.Info("Event Stream Client has stopped")

go client.eventDispatcher()

for {
client.Logger.Info("Start listening on Icinga 2 Event Stream..")
err := client.listenEventStream()
Expand All @@ -311,6 +314,6 @@ func (client *Client) Process() {
return
}

client.replayBufferedEvents()
client.enterReconnectionPhase()
}
}
4 changes: 2 additions & 2 deletions internal/eventstream/client_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ func (client *Client) checkMissedStateChanges(objType string) {
return
}

client.handleEvent(ev)
client.eventDispatch <- ev
})
}

Expand All @@ -254,7 +254,7 @@ func (client *Client) checkMissedAcknowledgements(objType string) {
return
}

client.handleEvent(ev)
client.eventDispatch <- ev
})
}

Expand Down
2 changes: 1 addition & 1 deletion internal/eventstream/client_es.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func (client *Client) listenEventStream() error {
return err
}

client.handleEvent(ev)
client.eventDispatch <- ev
}
return lineScanner.Err()
}

0 comments on commit 1fc76d6

Please sign in to comment.