diff --git a/AUTHORS b/AUTHORS index aea2144..f00e248 100644 --- a/AUTHORS +++ b/AUTHORS @@ -26,3 +26,4 @@ Peter Kurfer Sam Roberts Moritz Wanzenböck Jenni Griesmann +Nicholas Kwan \ No newline at end of file diff --git a/internal/event/stream.go b/internal/event/stream.go index 98ae269..f7f080c 100644 --- a/internal/event/stream.go +++ b/internal/event/stream.go @@ -41,6 +41,7 @@ type Stream struct { in, out chan Event // terminates processing + ctx context.Context shutdown context.CancelFunc } @@ -59,7 +60,7 @@ func NewStream(program uint32, cbID int32) *Stream { // Start the processing loop, which will return a routine we can use to // shut the queue down later. - s.shutdown = s.start() + s.start() return s } @@ -83,7 +84,10 @@ func (s *Stream) Recv() chan Event { // Push appends a new event to the queue. func (s *Stream) Push(e Event) { - s.in <- e + select { + case s.in <- e: + case <-s.ctx.Done(): + } } // Shutdown gracefully terminates Stream processing, releasing all internal @@ -97,12 +101,10 @@ func (s *Stream) Shutdown() { // start starts the event processing loop, which will continue to run until // terminated by the returned context.CancelFunc. -func (s *Stream) start() context.CancelFunc { - ctx, cancel := context.WithCancel(context.Background()) - - go s.process(ctx) +func (s *Stream) start() { + s.ctx, s.shutdown = context.WithCancel(context.Background()) - return cancel + go s.process(s.ctx) } // process manages an Stream's lifecycle until canceled by the provided context.