Skip to content

Commit

Permalink
health: Fix yet another NRE and improve panic handling (#172)
Browse files Browse the repository at this point in the history
* pkg/event: Include panicked message in recv log

* health: Recover panics from single-evt processing

* health: Fix the actual NRE
  • Loading branch information
victorges authored Oct 4, 2023
1 parent b938ac1 commit 31e37fc
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 11 deletions.
33 changes: 29 additions & 4 deletions health/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"math/rand"
"runtime/debug"
"sort"
"time"

Expand Down Expand Up @@ -137,7 +138,11 @@ func (c *Core) HandleMessage(msg event.StreamMessage) {
}

start := time.Now()
c.handleSingleEvent(evt)
err = c.handleSingleEvent(evt)
if err != nil {
glog.Errorf("Health core failed to process event. err=%q, event=%q", err, rawEvt)
continue
}
dur := time.Since(start)

eventsProcessedCount.WithLabelValues(string(evt.Type())).
Expand All @@ -150,16 +155,20 @@ func (c *Core) HandleMessage(msg event.StreamMessage) {
}
}

func (c *Core) handleSingleEvent(evt data.Event) {
func (c *Core) handleSingleEvent(evt data.Event) (err error) {
streamID, ts := evt.StreamID(), evt.Timestamp()
c.lastEventTs = ts
record := c.storage.GetOrCreate(streamID, c.conditionTypes)

record.RLock()
status, state := record.LastStatus, record.ReducerState
record.RUnlock()
// Only 1 go-routine processing events rn, so no need for locking here.
status, state = c.reducer.Reduce(status, state, evt)

// Only 1 go-routine processing events at a time, so no need for locking here.
status, state, err = reduceRecv(c.reducer, status, state, evt)
if err != nil {
return err
}

record.Lock()
defer record.Unlock()
Expand All @@ -182,6 +191,22 @@ func (c *Core) handleSingleEvent(evt data.Event) {
glog.Warningf("Buffer full for health event subscription, skipping message. streamId=%q, eventTs=%q", streamID, ts)
}
}
return nil
}

// reduceRecv is a wrapper around the reducer's Reduce method that recovers from
// panics. Reducers are stateless so its safe to recover from panics and
// continue processing of other events.
func reduceRecv(reducer Reducer, currStatus *data.HealthStatus, currState interface{}, evt data.Event) (newStatus *data.HealthStatus, newState interface{}, err error) {
defer func() {
if rec := recover(); rec != nil {
glog.Errorf("Panic in health reducer. panicValue=%q event=%+v stack=%q", rec, evt, debug.Stack())
err = fmt.Errorf("panic: %v", rec)
}
}()

newStatus, newState = reducer.Reduce(currStatus, currState, evt)
return
}

func (c *Core) GetStatus(manifestID string) (*data.HealthStatus, error) {
Expand Down
2 changes: 1 addition & 1 deletion health/reducers/media_server_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func (t MediaServerMetrics) Reduce(current *data.HealthStatus, _ interface{}, ev
}

func multistreamMetrics(current *data.HealthStatus, ts time.Time, nodeID, region string, ms *data.MultistreamTargetMetrics) []*data.Metric {
if ms.Metrics == nil {
if ms == nil || ms.Metrics == nil {
return nil
}
msDims := map[string]string{
Expand Down
17 changes: 11 additions & 6 deletions pkg/event/stream_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,16 +119,21 @@ func (c *strmConsumer) Consume(ctx context.Context, opts ConsumeOptions, handler
}

go func() {
defer func() {
if rec := recover(); rec != nil {
glog.Fatalf("Panic in stream message handler. panicValue=%v, stack=%q", rec, debug.Stack())
}
}()
defer cancel()

for msg := range msgs {
handleMsgRecv := func(msg StreamMessage) {
defer func() {
if rec := recover(); rec != nil {
glog.Fatalf("Panic in stream message handler. panicValue=%q message=%q stack=%q", rec, msg.Data, debug.Stack())
}
}()

handler.HandleMessage(msg)
}

for msg := range msgs {
handleMsgRecv(msg)
}
}()
return nil
}
Expand Down

0 comments on commit 31e37fc

Please sign in to comment.