Skip to content

Commit

Permalink
Drop terminating atomic bool
Browse files Browse the repository at this point in the history
  • Loading branch information
anodar committed May 23, 2024
1 parent 16c95d7 commit 56fc8d4
Showing 1 changed file with 0 additions and 10 deletions.
10 changes: 0 additions & 10 deletions pubsub/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"encoding/json"
"errors"
"fmt"
"sync/atomic"
"time"

"github.com/ethereum/go-ethereum/log"
Expand Down Expand Up @@ -47,9 +46,6 @@ type Consumer[Request any, Response any] struct {
redisStream string
redisGroup string
cfg *ConsumerConfig
// terminating indicates whether interrupt was received, in which case
// consumer should clean up for graceful shutdown.
terminating atomic.Bool
}

type Message[Request any] struct {
Expand All @@ -67,7 +63,6 @@ func NewConsumer[Request any, Response any](client redis.UniversalClient, stream
redisStream: streamName,
redisGroup: streamName, // There is 1-1 mapping of redis stream and consumer group.
cfg: cfg,
terminating: atomic.Bool{},
}, nil
}

Expand All @@ -76,10 +71,6 @@ func (c *Consumer[Request, Response]) Start(ctx context.Context) {
c.StopWaiter.Start(ctx, c)
c.StopWaiter.CallIteratively(
func(ctx context.Context) time.Duration {
if !c.terminating.Load() {
log.Trace("Consumer is terminating, stopping heartbeat update")
return time.Hour
}
c.heartBeat(ctx)
return c.cfg.KeepAliveTimeout / 10
},
Expand All @@ -101,7 +92,6 @@ func (c *Consumer[Request, Response]) heartBeatKey() string {

// deleteHeartBeat deletes the heartbeat to indicate it is being shut down.
func (c *Consumer[Request, Response]) deleteHeartBeat(ctx context.Context) {
c.terminating.Store(true)
if err := c.client.Del(ctx, c.heartBeatKey()).Err(); err != nil {
l := log.Info
if ctx.Err() != nil {
Expand Down

0 comments on commit 56fc8d4

Please sign in to comment.