diff --git a/pubsub/consumer.go b/pubsub/consumer.go index 97ab004764..0288c19e45 100644 --- a/pubsub/consumer.go +++ b/pubsub/consumer.go @@ -5,10 +5,7 @@ import ( "encoding/json" "errors" "fmt" - "os" - "os/signal" "sync/atomic" - "syscall" "time" "github.com/ethereum/go-ethereum/log" @@ -53,7 +50,6 @@ type Consumer[Request any, Response any] struct { // terminating indicates whether interrupt was received, in which case // consumer should clean up for graceful shutdown. terminating atomic.Bool - signals chan os.Signal } type Message[Request any] struct { @@ -72,14 +68,12 @@ func NewConsumer[Request any, Response any](client redis.UniversalClient, stream redisGroup: streamName, // There is 1-1 mapping of redis stream and consumer group. cfg: cfg, terminating: atomic.Bool{}, - signals: make(chan os.Signal, 1), }, nil } // Start starts the consumer to iteratively perform heartbeat in configured intervals. func (c *Consumer[Request, Response]) Start(ctx context.Context) { c.StopWaiter.Start(ctx, c) - c.listenForInterrupt() c.StopWaiter.CallIteratively( func(ctx context.Context) time.Duration { if !c.terminating.Load() { @@ -92,22 +86,8 @@ func (c *Consumer[Request, Response]) Start(ctx context.Context) { ) } -// listenForInterrupt launches a thread that notifies the channel when interrupt -// is received. -func (c *Consumer[Request, Response]) listenForInterrupt() { - signal.Notify(c.signals, syscall.SIGINT, syscall.SIGTERM) - c.StopWaiter.LaunchThread(func(ctx context.Context) { - select { - case sig := <-c.signals: - log.Info("Received interrup", "signal", sig.String()) - case <-ctx.Done(): - log.Info("Context is done", "error", ctx.Err()) - } - c.deleteHeartBeat(ctx) - }) -} - func (c *Consumer[Request, Response]) StopAndWait() { + c.deleteHeartBeat(c.GetParentContext()) c.StopWaiter.StopAndWait() } diff --git a/pubsub/pubsub_test.go b/pubsub/pubsub_test.go index 85314dc29a..cdf5fa1ef6 100644 --- a/pubsub/pubsub_test.go +++ b/pubsub/pubsub_test.go @@ -232,15 +232,11 @@ func TestRedisProduce(t *testing.T) { // Consumer messages in every third consumer but don't ack them to check // that other consumers will claim ownership on those messages. for i := 0; i < len(consumers); i += 3 { + consumers[i].Start(ctx) if _, err := consumers[i].Consume(ctx); err != nil { t.Errorf("Error consuming message: %v", err) } - // Terminate half of the consumers, send interrupt to others. - if i%2 == 0 { - consumers[i].StopAndWait() - } else { - consumers[i].signals <- os.Interrupt - } + consumers[i].StopAndWait() } } @@ -252,7 +248,7 @@ func TestRedisProduce(t *testing.T) { } producer.StopAndWait() for _, c := range consumers { - c.StopWaiter.StopAndWait() + c.StopAndWait() } got, err := mergeValues(gotMessages) if err != nil {