From 459106163d80806e556f2c23c896cc74acd0c1d3 Mon Sep 17 00:00:00 2001 From: Nodar Ambroladze Date: Thu, 16 May 2024 21:34:04 +0200 Subject: [PATCH 1/8] Gracefully shutdown consumer on interrupts --- pubsub/consumer.go | 48 ++++++++++++++++++++++++++++++++++++++++--- pubsub/pubsub_test.go | 8 +++++++- 2 files changed, 52 insertions(+), 4 deletions(-) diff --git a/pubsub/consumer.go b/pubsub/consumer.go index 7a5078ee00..af1345f059 100644 --- a/pubsub/consumer.go +++ b/pubsub/consumer.go @@ -5,6 +5,10 @@ import ( "encoding/json" "errors" "fmt" + "os" + "os/signal" + "sync/atomic" + "syscall" "time" "github.com/ethereum/go-ethereum/log" @@ -46,6 +50,10 @@ type Consumer[Request any, Response any] struct { redisStream string redisGroup string cfg *ConsumerConfig + // terminating indicates whether interrupt was received, in which case + // consumer should clean up for graceful shutdown. + terminating atomic.Bool + signals chan os.Signal } type Message[Request any] struct { @@ -57,29 +65,51 @@ func NewConsumer[Request any, Response any](client redis.UniversalClient, stream if streamName == "" { return nil, fmt.Errorf("redis stream name cannot be empty") } - consumer := &Consumer[Request, Response]{ + return &Consumer[Request, Response]{ id: uuid.NewString(), client: client, redisStream: streamName, redisGroup: streamName, // There is 1-1 mapping of redis stream and consumer group. cfg: cfg, - } - return consumer, nil + terminating: atomic.Bool{}, + signals: make(chan os.Signal, 1), + }, nil } // Start starts the consumer to iteratively perform heartbeat in configured intervals. func (c *Consumer[Request, Response]) Start(ctx context.Context) { c.StopWaiter.Start(ctx, c) + c.listenForInterrupt() c.StopWaiter.CallIteratively( func(ctx context.Context) time.Duration { + if !c.terminating.Load() { + log.Trace("Consumer is terminating, stopping heartbeat update") + return time.Hour + } c.heartBeat(ctx) return c.cfg.KeepAliveTimeout / 10 }, ) } +// listenForInterrupt launches a thread that notifies the channel when interrupt +// is received. +func (c *Consumer[Request, Response]) listenForInterrupt() { + signal.Notify(c.signals, syscall.SIGINT, syscall.SIGTERM) + c.StopWaiter.LaunchThread(func(ctx context.Context) { + select { + case sig := <-c.signals: + log.Info("Received interrup", "signal", sig.String()) + case <-ctx.Done(): + log.Info("Context is done", "error", ctx.Err()) + } + c.deleteHeartBeat(ctx) + }) +} + func (c *Consumer[Request, Response]) StopAndWait() { c.StopWaiter.StopAndWait() + c.deleteHeartBeat(c.GetContext()) } func heartBeatKey(id string) string { @@ -90,6 +120,18 @@ func (c *Consumer[Request, Response]) heartBeatKey() string { return heartBeatKey(c.id) } +// deleteHeartBeat deletes the heartbeat to indicate it is being shut down. +func (c *Consumer[Request, Response]) deleteHeartBeat(ctx context.Context) { + c.terminating.Store(true) + if err := c.client.Del(ctx, c.heartBeatKey()).Err(); err != nil { + l := log.Info + if ctx.Err() != nil { + l = log.Error + } + l("Deleting heardbeat", "consumer", c.id, "error", err) + } +} + // heartBeat updates the heartBeat key indicating aliveness. func (c *Consumer[Request, Response]) heartBeat(ctx context.Context) { if err := c.client.Set(ctx, c.heartBeatKey(), time.Now().UnixMilli(), 2*c.cfg.KeepAliveTimeout).Err(); err != nil { diff --git a/pubsub/pubsub_test.go b/pubsub/pubsub_test.go index 31f6d9e20a..11407e686f 100644 --- a/pubsub/pubsub_test.go +++ b/pubsub/pubsub_test.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "os" "sort" "testing" @@ -232,7 +233,12 @@ func TestRedisProduce(t *testing.T) { if _, err := consumers[i].Consume(ctx); err != nil { t.Errorf("Error consuming message: %v", err) } - consumers[i].StopAndWait() + // Terminate half of the consumers, send interrupt to others. + if i%2 == 0 { + consumers[i].StopAndWait() + } else { + consumers[i].signals <- os.Interrupt + } } } From f18419b0fb92bf523668d1272c1bd9764c5015ed Mon Sep 17 00:00:00 2001 From: Nodar Ambroladze Date: Thu, 16 May 2024 22:48:37 +0200 Subject: [PATCH 2/8] Delete heartbeat before stopAndWait --- pubsub/consumer.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pubsub/consumer.go b/pubsub/consumer.go index af1345f059..d74d4ef1b2 100644 --- a/pubsub/consumer.go +++ b/pubsub/consumer.go @@ -108,8 +108,8 @@ func (c *Consumer[Request, Response]) listenForInterrupt() { } func (c *Consumer[Request, Response]) StopAndWait() { - c.StopWaiter.StopAndWait() c.deleteHeartBeat(c.GetContext()) + c.StopWaiter.StopAndWait() } func heartBeatKey(id string) string { From 3373c4a5b84010a7cd5f3b47a7f43de5cf46f476 Mon Sep 17 00:00:00 2001 From: Nodar Ambroladze Date: Thu, 16 May 2024 23:18:47 +0200 Subject: [PATCH 3/8] Fix test --- pubsub/consumer.go | 1 - pubsub/pubsub_test.go | 7 +++++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/pubsub/consumer.go b/pubsub/consumer.go index d74d4ef1b2..97ab004764 100644 --- a/pubsub/consumer.go +++ b/pubsub/consumer.go @@ -108,7 +108,6 @@ func (c *Consumer[Request, Response]) listenForInterrupt() { } func (c *Consumer[Request, Response]) StopAndWait() { - c.deleteHeartBeat(c.GetContext()) c.StopWaiter.StopAndWait() } diff --git a/pubsub/pubsub_test.go b/pubsub/pubsub_test.go index 11407e686f..9111c5cf66 100644 --- a/pubsub/pubsub_test.go +++ b/pubsub/pubsub_test.go @@ -7,6 +7,7 @@ import ( "os" "sort" "testing" + "time" "github.com/ethereum/go-ethereum/log" "github.com/go-redis/redis/v8" @@ -202,6 +203,7 @@ func consume(ctx context.Context, t *testing.T, consumers []*Consumer[testReques } func TestRedisProduce(t *testing.T) { + log.SetDefault(log.NewLogger(log.NewTerminalHandlerWithLevel(os.Stderr, log.LevelTrace, true))) t.Parallel() for _, tc := range []struct { name string @@ -213,7 +215,7 @@ func TestRedisProduce(t *testing.T) { }, { name: "some consumers killed, others should take over their work", - killConsumers: false, + killConsumers: true, }, } { t.Run(tc.name, func(t *testing.T) { @@ -233,7 +235,7 @@ func TestRedisProduce(t *testing.T) { if _, err := consumers[i].Consume(ctx); err != nil { t.Errorf("Error consuming message: %v", err) } - // Terminate half of the consumers, send interrupt to others. + //Terminate half of the consumers, send interrupt to others. if i%2 == 0 { consumers[i].StopAndWait() } else { @@ -242,6 +244,7 @@ func TestRedisProduce(t *testing.T) { } } + time.Sleep(time.Second) gotMessages, wantResponses := consume(ctx, t, consumers) gotResponses, err := awaitResponses(ctx, promises) if err != nil { From 3fd412e78e725b0bc850e800741b16effab491b3 Mon Sep 17 00:00:00 2001 From: Nodar Ambroladze Date: Thu, 16 May 2024 23:46:56 +0200 Subject: [PATCH 4/8] Fix lint --- pubsub/pubsub_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pubsub/pubsub_test.go b/pubsub/pubsub_test.go index 9111c5cf66..85314dc29a 100644 --- a/pubsub/pubsub_test.go +++ b/pubsub/pubsub_test.go @@ -235,7 +235,7 @@ func TestRedisProduce(t *testing.T) { if _, err := consumers[i].Consume(ctx); err != nil { t.Errorf("Error consuming message: %v", err) } - //Terminate half of the consumers, send interrupt to others. + // Terminate half of the consumers, send interrupt to others. if i%2 == 0 { consumers[i].StopAndWait() } else { From 5570fb3e57c574b2004aca9ecd3ad1831bd5ee4c Mon Sep 17 00:00:00 2001 From: Nodar Ambroladze Date: Wed, 22 May 2024 14:18:09 +0200 Subject: [PATCH 5/8] Drop listenForInterrupt, since stopAndWait is already called on sigint --- pubsub/consumer.go | 22 +--------------------- pubsub/pubsub_test.go | 10 +++------- 2 files changed, 4 insertions(+), 28 deletions(-) diff --git a/pubsub/consumer.go b/pubsub/consumer.go index 97ab004764..0288c19e45 100644 --- a/pubsub/consumer.go +++ b/pubsub/consumer.go @@ -5,10 +5,7 @@ import ( "encoding/json" "errors" "fmt" - "os" - "os/signal" "sync/atomic" - "syscall" "time" "github.com/ethereum/go-ethereum/log" @@ -53,7 +50,6 @@ type Consumer[Request any, Response any] struct { // terminating indicates whether interrupt was received, in which case // consumer should clean up for graceful shutdown. terminating atomic.Bool - signals chan os.Signal } type Message[Request any] struct { @@ -72,14 +68,12 @@ func NewConsumer[Request any, Response any](client redis.UniversalClient, stream redisGroup: streamName, // There is 1-1 mapping of redis stream and consumer group. cfg: cfg, terminating: atomic.Bool{}, - signals: make(chan os.Signal, 1), }, nil } // Start starts the consumer to iteratively perform heartbeat in configured intervals. func (c *Consumer[Request, Response]) Start(ctx context.Context) { c.StopWaiter.Start(ctx, c) - c.listenForInterrupt() c.StopWaiter.CallIteratively( func(ctx context.Context) time.Duration { if !c.terminating.Load() { @@ -92,22 +86,8 @@ func (c *Consumer[Request, Response]) Start(ctx context.Context) { ) } -// listenForInterrupt launches a thread that notifies the channel when interrupt -// is received. -func (c *Consumer[Request, Response]) listenForInterrupt() { - signal.Notify(c.signals, syscall.SIGINT, syscall.SIGTERM) - c.StopWaiter.LaunchThread(func(ctx context.Context) { - select { - case sig := <-c.signals: - log.Info("Received interrup", "signal", sig.String()) - case <-ctx.Done(): - log.Info("Context is done", "error", ctx.Err()) - } - c.deleteHeartBeat(ctx) - }) -} - func (c *Consumer[Request, Response]) StopAndWait() { + c.deleteHeartBeat(c.GetParentContext()) c.StopWaiter.StopAndWait() } diff --git a/pubsub/pubsub_test.go b/pubsub/pubsub_test.go index 85314dc29a..cdf5fa1ef6 100644 --- a/pubsub/pubsub_test.go +++ b/pubsub/pubsub_test.go @@ -232,15 +232,11 @@ func TestRedisProduce(t *testing.T) { // Consumer messages in every third consumer but don't ack them to check // that other consumers will claim ownership on those messages. for i := 0; i < len(consumers); i += 3 { + consumers[i].Start(ctx) if _, err := consumers[i].Consume(ctx); err != nil { t.Errorf("Error consuming message: %v", err) } - // Terminate half of the consumers, send interrupt to others. - if i%2 == 0 { - consumers[i].StopAndWait() - } else { - consumers[i].signals <- os.Interrupt - } + consumers[i].StopAndWait() } } @@ -252,7 +248,7 @@ func TestRedisProduce(t *testing.T) { } producer.StopAndWait() for _, c := range consumers { - c.StopWaiter.StopAndWait() + c.StopAndWait() } got, err := mergeValues(gotMessages) if err != nil { From 9a866114c9ea15e6efd0bf4452dfa0ec67cdb3b8 Mon Sep 17 00:00:00 2001 From: Nodar Ambroladze Date: Wed, 22 May 2024 14:55:52 +0200 Subject: [PATCH 6/8] Fix test --- pubsub/pubsub_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pubsub/pubsub_test.go b/pubsub/pubsub_test.go index cdf5fa1ef6..72504602e3 100644 --- a/pubsub/pubsub_test.go +++ b/pubsub/pubsub_test.go @@ -285,6 +285,7 @@ func TestRedisReproduceDisabled(t *testing.T) { // Consumer messages in every third consumer but don't ack them to check // that other consumers will claim ownership on those messages. for i := 0; i < len(consumers); i += 3 { + consumers[i].Start(ctx) if _, err := consumers[i].Consume(ctx); err != nil { t.Errorf("Error consuming message: %v", err) } From 16c95d730f4614625c9d5a10b88984d06aaac645 Mon Sep 17 00:00:00 2001 From: Nodar Ambroladze Date: Thu, 23 May 2024 17:28:11 +0200 Subject: [PATCH 7/8] Delete heartbeat after stopAndWait --- pubsub/consumer.go | 2 +- system_tests/block_validator_test.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pubsub/consumer.go b/pubsub/consumer.go index 0288c19e45..4b51d24f2d 100644 --- a/pubsub/consumer.go +++ b/pubsub/consumer.go @@ -87,8 +87,8 @@ func (c *Consumer[Request, Response]) Start(ctx context.Context) { } func (c *Consumer[Request, Response]) StopAndWait() { - c.deleteHeartBeat(c.GetParentContext()) c.StopWaiter.StopAndWait() + c.deleteHeartBeat(c.GetParentContext()) } func heartBeatKey(id string) string { diff --git a/system_tests/block_validator_test.go b/system_tests/block_validator_test.go index dfd892a079..debd6d4c7c 100644 --- a/system_tests/block_validator_test.go +++ b/system_tests/block_validator_test.go @@ -72,7 +72,7 @@ func testBlockValidatorSimple(t *testing.T, dasModeString string, workloadLoops redisURL := "" if useRedisStreams { redisURL = redisutil.CreateTestRedis(ctx, t) - validatorConfig.BlockValidator.RedisValidationClientConfig = redis.DefaultValidationClientConfig + validatorConfig.BlockValidator.RedisValidationClientConfig = redis.TestValidationClientConfig validatorConfig.BlockValidator.RedisValidationClientConfig.RedisURL = redisURL } From 56fc8d4b867351b9d2ed7714360389dd5d5b76ee Mon Sep 17 00:00:00 2001 From: Nodar Ambroladze Date: Thu, 23 May 2024 20:50:19 +0200 Subject: [PATCH 8/8] Drop terminating atomic bool --- pubsub/consumer.go | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/pubsub/consumer.go b/pubsub/consumer.go index 4b51d24f2d..c9590de8e6 100644 --- a/pubsub/consumer.go +++ b/pubsub/consumer.go @@ -5,7 +5,6 @@ import ( "encoding/json" "errors" "fmt" - "sync/atomic" "time" "github.com/ethereum/go-ethereum/log" @@ -47,9 +46,6 @@ type Consumer[Request any, Response any] struct { redisStream string redisGroup string cfg *ConsumerConfig - // terminating indicates whether interrupt was received, in which case - // consumer should clean up for graceful shutdown. - terminating atomic.Bool } type Message[Request any] struct { @@ -67,7 +63,6 @@ func NewConsumer[Request any, Response any](client redis.UniversalClient, stream redisStream: streamName, redisGroup: streamName, // There is 1-1 mapping of redis stream and consumer group. cfg: cfg, - terminating: atomic.Bool{}, }, nil } @@ -76,10 +71,6 @@ func (c *Consumer[Request, Response]) Start(ctx context.Context) { c.StopWaiter.Start(ctx, c) c.StopWaiter.CallIteratively( func(ctx context.Context) time.Duration { - if !c.terminating.Load() { - log.Trace("Consumer is terminating, stopping heartbeat update") - return time.Hour - } c.heartBeat(ctx) return c.cfg.KeepAliveTimeout / 10 }, @@ -101,7 +92,6 @@ func (c *Consumer[Request, Response]) heartBeatKey() string { // deleteHeartBeat deletes the heartbeat to indicate it is being shut down. func (c *Consumer[Request, Response]) deleteHeartBeat(ctx context.Context) { - c.terminating.Store(true) if err := c.client.Del(ctx, c.heartBeatKey()).Err(); err != nil { l := log.Info if ctx.Err() != nil {