From 1d5c10b5769e1247a3568e4677f88fa7c998dbc5 Mon Sep 17 00:00:00 2001 From: Adil H <1284255+didil@users.noreply.github.com> Date: Mon, 15 May 2023 14:48:40 +0100 Subject: [PATCH] Move stuck processing messages to ready periodically (#33) * feat: move stuck processing to ready fix: fix shutdown signal handling * fix: fix ready queue error handling * test: fix move processing to ready tests * test: add ingest no flow test --- README.md | 2 +- cmd/api/main.go | 31 ++++- go.mod | 3 + go.sum | 7 + pkg/lib/config.go | 2 + pkg/server/handlers/app.go | 8 -- pkg/server/handlers/ingest_test.go | 43 +++++- pkg/services/processing_recovery_service.go | 81 ++++++++++++ .../processing_recovery_service_test.go | 122 ++++++++++++++++++ pkg/services/redis_store.go | 32 +++++ pkg/services/redis_store_test.go | 71 +++++++++- pkg/supervisor/processing.go | 45 +++++++ pkg/supervisor/processing_test.go | 54 ++++++++ pkg/supervisor/ready.go | 24 +++- pkg/supervisor/scheduled.go | 19 +-- pkg/supervisor/supervisor.go | 38 ++++-- pkg/testsupport/mocks/gen_mocks.sh | 1 + .../mocks/mock_processing_recovery_service.go | 64 +++++++++ pkg/testsupport/mocks/mock_redis_store.go | 29 +++++ 19 files changed, 633 insertions(+), 43 deletions(-) create mode 100644 pkg/services/processing_recovery_service.go create mode 100644 pkg/services/processing_recovery_service_test.go create mode 100644 pkg/supervisor/processing.go create mode 100644 pkg/supervisor/processing_test.go create mode 100644 pkg/testsupport/mocks/mock_processing_recovery_service.go diff --git a/README.md b/README.md index cf197cf..0894b15 100644 --- a/README.md +++ b/README.md @@ -38,7 +38,7 @@ flows: url: https://example.com/othertarget retryInterval: 5m # on error, retry after 5 minutes # retryExpMultiplier: 2 # exponential backoff - maxAttemps: 10 # maximum number of attempts + maxAttempts: 10 # maximum number of attempts ``` With this config, inhooks will listen to http POST requests to `/api/v1/ingest/source-1-slug`. diff --git a/cmd/api/main.go b/cmd/api/main.go index 6f3da0b..ab406aa 100644 --- a/cmd/api/main.go +++ b/cmd/api/main.go @@ -7,6 +7,7 @@ import ( "net/http" "os" "os/signal" + "sync" "syscall" "github.com/didil/inhooks/pkg/lib" @@ -66,7 +67,6 @@ func main() { app := handlers.NewApp( handlers.WithLogger(logger), - handlers.WithAppConfig(appConf), handlers.WithInhooksConfigService(inhooksConfigSvc), handlers.WithMessageBuilder(messageBuilder), handlers.WithMessageEnqueuer(messageEnqueuer), @@ -80,20 +80,30 @@ func main() { Handler: r, } + wg := &sync.WaitGroup{} + + wg.Add(1) go func() { logger.Info("listening ...", zap.String("addr", addr)) err = httpServer.ListenAndServe() - if err != nil { + if err != nil && err != http.ErrServerClosed { logger.Fatal("listener failure", zap.Error(err)) } + logger.Info("http server shutdown") + wg.Done() }() httpClient := lib.NewHttpClient(appConf) messageProcessor := services.NewMessageProcessor(httpClient) retryCalculator := services.NewRetryCalculator() - processingResultsService := services.NewProcessingResultsService(timeSvc, redisStore, retryCalculator) - schedulerService := services.NewSchedulerService(redisStore, timeSvc) + processingResultsSvc := services.NewProcessingResultsService(timeSvc, redisStore, retryCalculator) + schedulerSvc := services.NewSchedulerService(redisStore, timeSvc) + + processingRecoverySvc, err := services.NewProcessingRecoveryService(redisStore) + if err != nil { + logger.Fatal("failed to init ProcessingRecoveryService", zap.Error(err)) + } svisor := supervisor.NewSupervisor( supervisor.WithLogger(logger), @@ -101,19 +111,24 @@ func main() { supervisor.WithAppConfig(appConf), supervisor.WithInhooksConfigService(inhooksConfigSvc), supervisor.WithMessageProcessor(messageProcessor), - supervisor.WithProcessingResultsService(processingResultsService), - supervisor.WithSchedulerService(schedulerService), + supervisor.WithProcessingResultsService(processingResultsSvc), + supervisor.WithSchedulerService(schedulerSvc), + supervisor.WithProcessingRecoveryService(processingRecoverySvc), ) + wg.Add(1) go func() { logger.Info("starting supervisor ...") svisor.Start() + logger.Info("supervisor shutdown") + wg.Done() }() sigs := make(chan os.Signal, 1) signal.Notify(sigs, os.Interrupt, syscall.SIGTERM) - <-sigs + sig := <-sigs + logger.Info("received shutdown signal, shutting down process", zap.String("signal", sig.String())) svisor.Shutdown() @@ -123,4 +138,6 @@ func main() { if err != nil { logger.Fatal("http server shutdown failed", zap.Error(err)) } + + wg.Wait() } diff --git a/go.mod b/go.mod index 4710b7b..68394ad 100644 --- a/go.mod +++ b/go.mod @@ -46,7 +46,9 @@ require ( github.com/daixiang0/gci v0.10.1 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/denis-tingaikin/go-header v0.4.3 // indirect + github.com/dgraph-io/ristretto v0.1.1 // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect + github.com/dustin/go-humanize v1.0.0 // indirect github.com/esimonov/ifshort v1.0.4 // indirect github.com/ettle/strcase v0.1.1 // indirect github.com/fatih/color v1.15.0 // indirect @@ -65,6 +67,7 @@ require ( github.com/go-xmlfmt/xmlfmt v1.1.2 // indirect github.com/gobwas/glob v0.2.3 // indirect github.com/gofrs/flock v0.8.1 // indirect + github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b // indirect github.com/golang/protobuf v1.5.2 // indirect github.com/golangci/check v0.0.0-20180506172741-cfe4005ccda2 // indirect github.com/golangci/dupl v0.0.0-20180902072040-3e9179ac440a // indirect diff --git a/go.sum b/go.sum index 376688b..b1f7207 100644 --- a/go.sum +++ b/go.sum @@ -116,8 +116,13 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/denis-tingaikin/go-header v0.4.3 h1:tEaZKAlqql6SKCY++utLmkPLd6K8IBM20Ha7UVm+mtU= github.com/denis-tingaikin/go-header v0.4.3/go.mod h1:0wOCWuN71D5qIgE2nz9KrKmuYBAC2Mra5RassOIQ2/c= +github.com/dgraph-io/ristretto v0.1.1 h1:6CWw5tJNgpegArSHpNHJKldNeq03FQCwYvfMVWajOK8= +github.com/dgraph-io/ristretto v0.1.1/go.mod h1:S1GPSBCYCIhmVNfcth17y2zZtQT6wzkzgwUve0VDWWA= +github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= +github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo= +github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98= @@ -178,6 +183,7 @@ github.com/gobwas/glob v0.2.3/go.mod h1:d3Ez4x06l9bZtSvzIay5+Yzi0fmZzPgnTbPcKjJA github.com/gofrs/flock v0.8.1 h1:+gYjHKf32LDeiEEFhQaotPbLuUXjY5ZqxKgXy7n59aw= github.com/gofrs/flock v0.8.1/go.mod h1:F1TvTiK9OcQqauNUHlbJvyl9Qa1QvF/gOUDKA14jxHU= github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= +github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b h1:VKtxabqXZkF25pY9ekfRL6a582T4P37/31XEstQ5p58= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/groupcache v0.0.0-20191227052852-215e87163ea7/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= @@ -745,6 +751,7 @@ golang.org/x/sys v0.0.0-20220702020025-31831981b65f/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20221010170243-090e33056c14/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.2.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.3.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= diff --git a/pkg/lib/config.go b/pkg/lib/config.go index 1e73314..1a28d49 100644 --- a/pkg/lib/config.go +++ b/pkg/lib/config.go @@ -33,6 +33,8 @@ type SupervisorConfig struct { ReadyWaitTime time.Duration `env:"SUPERVISOR_READY_WAIT_TIME,default=5s"` ErrSleepTime time.Duration `env:"SUPERVISOR_ERR_SLEEP_TIME,default=5s"` SchedulerInterval time.Duration `env:"SUPERVISOR_SCHEDULER_INTERVAL,default=30s"` + // interval to move back stuck messages from processing to ready queue + ProcessingRecoveryInterval time.Duration `env:"SUPERVISOR_PROCESSING_RECOVERY_INTERVAL,default=5m"` } type HTTPClientConfig struct { diff --git a/pkg/server/handlers/app.go b/pkg/server/handlers/app.go index b450abb..003183b 100644 --- a/pkg/server/handlers/app.go +++ b/pkg/server/handlers/app.go @@ -4,14 +4,12 @@ import ( "encoding/json" "net/http" - "github.com/didil/inhooks/pkg/lib" "github.com/didil/inhooks/pkg/services" "go.uber.org/zap" ) type App struct { logger *zap.Logger - appConf *lib.AppConfig inhooksConfigSvc services.InhooksConfigService messageBuilder services.MessageBuilder messageEnqueuer services.MessageEnqueuer @@ -35,12 +33,6 @@ func WithLogger(logger *zap.Logger) AppOpt { } } -func WithAppConfig(appConf *lib.AppConfig) AppOpt { - return func(app *App) { - app.appConf = appConf - } -} - func WithInhooksConfigService(inhooksConfigSvc services.InhooksConfigService) AppOpt { return func(app *App) { app.inhooksConfigSvc = inhooksConfigSvc diff --git a/pkg/server/handlers/ingest_test.go b/pkg/server/handlers/ingest_test.go index 15ec164..f13327a 100644 --- a/pkg/server/handlers/ingest_test.go +++ b/pkg/server/handlers/ingest_test.go @@ -16,7 +16,7 @@ import ( "go.uber.org/zap" ) -func TestUpdateLB(t *testing.T) { +func TestIngest_OK(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() @@ -75,3 +75,44 @@ func TestUpdateLB(t *testing.T) { err = json.NewDecoder(resp.Body).Decode(jsonOK) assert.NoError(t, err) } + +func TestIngest_FlowNotFound(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + inhooksConfigSvc := mocks.NewMockInhooksConfigService(ctrl) + messageBuilder := mocks.NewMockMessageBuilder(ctrl) + messageEnqueuer := mocks.NewMockMessageEnqueuer(ctrl) + logger, err := zap.NewDevelopment() + assert.NoError(t, err) + + app := handlers.NewApp( + handlers.WithLogger(logger), + handlers.WithInhooksConfigService(inhooksConfigSvc), + handlers.WithMessageBuilder(messageBuilder), + handlers.WithMessageEnqueuer(messageEnqueuer), + ) + r := server.NewRouter(app) + s := httptest.NewServer(r) + defer s.Close() + + inhooksConfigSvc.EXPECT().FindFlowForSource("my-source").Return(nil) + + buf := bytes.NewBufferString(`{"id": "abc"}`) + + req, err := http.NewRequest(http.MethodPost, s.URL+"/api/v1/ingest/my-source", buf) + assert.NoError(t, err) + + cl := &http.Client{} + resp, err := cl.Do(req) + assert.NoError(t, err) + defer resp.Body.Close() + + assert.Equal(t, http.StatusNotFound, resp.StatusCode) + + jsonErr := &handlers.JSONErr{} + err = json.NewDecoder(resp.Body).Decode(jsonErr) + assert.NoError(t, err) + + assert.Equal(t, "unknown source slug my-source", jsonErr.Error) +} diff --git a/pkg/services/processing_recovery_service.go b/pkg/services/processing_recovery_service.go new file mode 100644 index 0000000..d3a53ac --- /dev/null +++ b/pkg/services/processing_recovery_service.go @@ -0,0 +1,81 @@ +package services + +import ( + "context" + "time" + + "github.com/dgraph-io/ristretto" + "github.com/didil/inhooks/pkg/models" + "github.com/pkg/errors" +) + +type ProcessingRecoveryService interface { + MoveProcessingToReady(ctx context.Context, flow *models.Flow, sink *models.Sink, processingRecoveryInterval time.Duration) ([]string, error) + AddToCache(mID string, ttl time.Duration) +} + +type processingRecoveryService struct { + redisStore RedisStore + cache *ristretto.Cache +} + +func NewProcessingRecoveryService(redisStore RedisStore) (ProcessingRecoveryService, error) { + cache, err := ristretto.NewCache(&ristretto.Config{ + NumCounters: 2e5, // number of keys to track frequency of (200K). + MaxCost: 2 * (1 << 20), // maximum cost of cache (1MB). + BufferItems: 64, // number of keys per Get buffer. + }) + if err != nil { + return nil, errors.Wrapf(err, "failed to init cache") + } + + svc := &processingRecoveryService{ + redisStore: redisStore, + cache: cache, + } + + return svc, nil +} + +const recoveryCacheDummyValue = "1" + +func (s *processingRecoveryService) MoveProcessingToReady(ctx context.Context, flow *models.Flow, sink *models.Sink, ttl time.Duration) ([]string, error) { + sourceQueueKey := queueKey(flow.ID, sink.ID, models.QueueStatusProcessing) + destQueueKey := queueKey(flow.ID, sink.ID, models.QueueStatusReady) + messageIDs, err := s.redisStore.LRangeAll(ctx, sourceQueueKey) + if err != nil { + return nil, errors.Wrapf(err, "failed to lrangeall") + } + if len(messageIDs) == 0 { + // nothing in processing + return nil, nil + } + + messageIDsToMove := []string{} + for _, mID := range messageIDs { + if _, found := s.cache.Get(mID); found { + // stuck message found + messageIDsToMove = append(messageIDsToMove, mID) + } else { + s.cache.SetWithTTL(mID, recoveryCacheDummyValue, 1, ttl) + } + } + + if len(messageIDsToMove) == 0 { + // no messages to move + return messageIDsToMove, nil + } + + err = s.redisStore.LRemRPush(ctx, sourceQueueKey, destQueueKey, messageIDsToMove) + if err != nil { + return nil, errors.Wrapf(err, "failed to lremrpush") + } + + return messageIDsToMove, nil +} + +// only used for testing +func (s *processingRecoveryService) AddToCache(mID string, ttl time.Duration) { + s.cache.SetWithTTL(mID, recoveryCacheDummyValue, 1, ttl) + s.cache.Wait() +} diff --git a/pkg/services/processing_recovery_service_test.go b/pkg/services/processing_recovery_service_test.go new file mode 100644 index 0000000..250874f --- /dev/null +++ b/pkg/services/processing_recovery_service_test.go @@ -0,0 +1,122 @@ +package services + +import ( + "context" + "testing" + "time" + + "github.com/didil/inhooks/pkg/models" + "github.com/didil/inhooks/pkg/testsupport/mocks" + "github.com/golang/mock/gomock" + "github.com/stretchr/testify/assert" +) + +func TestProcessingRecoveryService_Empty_Cache(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + ctx := context.Background() + + redisStore := mocks.NewMockRedisStore(ctrl) + + s, err := NewProcessingRecoveryService(redisStore) + assert.NoError(t, err) + + flowID := "flow-id" + flow := &models.Flow{ + ID: flowID, + } + + sinkID := "sink-id" + sink := &models.Sink{ + ID: sinkID, + } + + sourceQueueKey := "f:flow-id:s:sink-id:q:processing" + // destQueueKey := "f:flow-id:s:sink-id:q:ready" + + processingMessagesId := []string{"message-1", "message-2", "message-3"} + redisStore.EXPECT().LRangeAll(ctx, sourceQueueKey).Return(processingMessagesId, nil) + ttl := 50 * time.Millisecond + movedMessageIDs, err := s.MoveProcessingToReady(ctx, flow, sink, ttl) + assert.NoError(t, err) + assert.Equal(t, []string{}, movedMessageIDs) + + //redisStore.EXPECT().LRemRPush(ctx, sourceQueueKey, destQueueKey) +} + +func TestProcessingRecoveryService_Entry_In_Cache(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + ctx := context.Background() + + redisStore := mocks.NewMockRedisStore(ctrl) + + s, err := NewProcessingRecoveryService(redisStore) + assert.NoError(t, err) + + flowID := "flow-id" + flow := &models.Flow{ + ID: flowID, + } + + sinkID := "sink-id" + sink := &models.Sink{ + ID: sinkID, + } + + sourceQueueKey := "f:flow-id:s:sink-id:q:processing" + destQueueKey := "f:flow-id:s:sink-id:q:ready" + + processingMessagesId := []string{"message-1", "message-2", "message-3"} + movedMessageID := "message-2" + redisStore.EXPECT().LRangeAll(ctx, sourceQueueKey).Return(processingMessagesId, nil) + redisStore.EXPECT().LRemRPush(ctx, sourceQueueKey, destQueueKey, []string{"message-2"}) + + ttl := 1 * time.Second + + s.AddToCache(movedMessageID, ttl) + + movedMessageIDs, err := s.MoveProcessingToReady(ctx, flow, sink, ttl) + assert.NoError(t, err) + assert.Equal(t, []string{movedMessageID}, movedMessageIDs) +} + +func TestProcessingRecoveryService_Entry_In_Cache_Expired(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + ctx := context.Background() + + redisStore := mocks.NewMockRedisStore(ctrl) + + s, err := NewProcessingRecoveryService(redisStore) + assert.NoError(t, err) + + flowID := "flow-id" + flow := &models.Flow{ + ID: flowID, + } + + sinkID := "sink-id" + sink := &models.Sink{ + ID: sinkID, + } + + sourceQueueKey := "f:flow-id:s:sink-id:q:processing" + + processingMessagesId := []string{"message-1", "message-2", "message-3"} + movedMessageID := "message-2" + redisStore.EXPECT().LRangeAll(ctx, sourceQueueKey).Return(processingMessagesId, nil) + + ttl := 10 * time.Millisecond + + s.AddToCache(movedMessageID, ttl) + + time.Sleep(ttl) + + movedMessageIDs, err := s.MoveProcessingToReady(ctx, flow, sink, ttl) + assert.NoError(t, err) + assert.Equal(t, []string{}, movedMessageIDs) +} diff --git a/pkg/services/redis_store.go b/pkg/services/redis_store.go index 20d79e6..73dda2c 100644 --- a/pkg/services/redis_store.go +++ b/pkg/services/redis_store.go @@ -20,6 +20,8 @@ type RedisStore interface { BLMove(ctx context.Context, timeout time.Duration, sourceQueueKey string, destQueueKey string) ([]byte, error) ZRangeBelowScore(ctx context.Context, queueKey string, score float64) ([]string, error) ZRemRpush(ctx context.Context, messageIDs []string, sourceQueueKey string, destQueueKey string) error + LRangeAll(ctx context.Context, queueKey string) ([]string, error) + LRemRPush(ctx context.Context, sourceQueueKey, destQueueKey string, messageIDs []string) error } type redisStore struct { @@ -220,3 +222,33 @@ func (s *redisStore) ZRemRpush(ctx context.Context, messageIDs []string, sourceQ return nil } + +func (s *redisStore) LRangeAll(ctx context.Context, queueKey string) ([]string, error) { + queueKeyWithPrefix := s.keyWithPrefix(queueKey) + + vals, err := s.client.LRange(ctx, queueKeyWithPrefix, 0, -1).Result() + if err != nil { + return nil, errors.Wrapf(err, "failed to lrange. queueKey: %s", queueKeyWithPrefix) + } + + return vals, nil +} + +func (s *redisStore) LRemRPush(ctx context.Context, sourceQueueKey, destQueueKey string, messageIDs []string) error { + pipe := s.client.TxPipeline() + + sourceKeyWithPrefix := s.keyWithPrefix(sourceQueueKey) + destKeyWithPrefix := s.keyWithPrefix(destQueueKey) + + for _, messageID := range messageIDs { + pipe.LRem(ctx, sourceKeyWithPrefix, 0, messageID) + pipe.RPush(ctx, destKeyWithPrefix, messageID) + } + + _, err := pipe.Exec(ctx) + if err != nil { + return err + } + + return nil +} diff --git a/pkg/services/redis_store_test.go b/pkg/services/redis_store_test.go index 18b657a..2e8c086 100644 --- a/pkg/services/redis_store_test.go +++ b/pkg/services/redis_store_test.go @@ -111,7 +111,7 @@ func (s *RedisStoreSuite) TestBLMove() { s.NoError(err) s.Equal([]string{`{"id": 789}`}, destResults) - timeOut := 1 * time.Second + timeOut := 1 * time.Second // minimum value accepted by redis is 1 second val1, err := s.redisStore.BLMove(ctx, timeOut, sourceQueueKey, destQueueKey) s.NoError(err) @@ -372,3 +372,72 @@ func (s *RedisStoreSuite) TestZRemRpush() { s.Equal([]string{"message-3", "message-1"}, queueResults) } + +func (s *RedisStoreSuite) TestLRangeAll() { + ctx := context.Background() + prefix := fmt.Sprintf("inhooks:%s", s.appConf.Redis.InhooksDBName) + defer func() { + err := testsupport.DeleteAllRedisKeys(ctx, s.client, prefix) + s.NoError(err) + }() + + value1 := []byte(`message-1`) + value2 := []byte(`message-2`) + + queueKey := "q:processing" + + err := s.redisStore.Enqueue(ctx, queueKey, value1) + s.NoError(err) + err = s.redisStore.Enqueue(ctx, queueKey, value2) + s.NoError(err) + + results, err := s.redisStore.LRangeAll(ctx, queueKey) + s.NoError(err) + + s.Equal([]string{`message-1`, `message-2`}, results) +} + +func (s *RedisStoreSuite) TestLRemRPush() { + ctx := context.Background() + prefix := fmt.Sprintf("inhooks:%s", s.appConf.Redis.InhooksDBName) + defer func() { + err := testsupport.DeleteAllRedisKeys(ctx, s.client, prefix) + s.NoError(err) + }() + + value1 := []byte(`message-1`) + value2 := []byte(`message-2`) + value3 := []byte(`message-3`) + value4 := []byte(`message-4`) + + sourceQueueKey := "q:processing" + destQueueKey := "q:ready" + + err := s.redisStore.Enqueue(ctx, sourceQueueKey, value1) + s.NoError(err) + err = s.redisStore.Enqueue(ctx, sourceQueueKey, value2) + s.NoError(err) + err = s.redisStore.Enqueue(ctx, sourceQueueKey, value3) + s.NoError(err) + err = s.redisStore.Enqueue(ctx, destQueueKey, value4) + s.NoError(err) + + results, err := s.redisStore.LRangeAll(ctx, sourceQueueKey) + s.NoError(err) + s.Equal([]string{`message-1`, `message-2`, `message-3`}, results) + + results, err = s.redisStore.LRangeAll(ctx, destQueueKey) + s.NoError(err) + s.Equal([]string{`message-4`}, results) + + err = s.redisStore.LRemRPush(ctx, sourceQueueKey, destQueueKey, []string{"message-1", "message-3"}) + s.NoError(err) + + results, err = s.redisStore.LRangeAll(ctx, sourceQueueKey) + s.NoError(err) + s.Equal([]string{`message-2`}, results) + + results, err = s.redisStore.LRangeAll(ctx, destQueueKey) + s.NoError(err) + s.Equal([]string{`message-4`, "message-1", "message-3"}, results) +} diff --git a/pkg/supervisor/processing.go b/pkg/supervisor/processing.go new file mode 100644 index 0000000..f169cac --- /dev/null +++ b/pkg/supervisor/processing.go @@ -0,0 +1,45 @@ +package supervisor + +import ( + "context" + "time" + + "github.com/didil/inhooks/pkg/models" + "go.uber.org/zap" +) + +// move stuck messages from processing to ready queue periodically +func (s *Supervisor) HandleProcessingQueue(ctx context.Context, f *models.Flow, sink *models.Sink) { + logger := s.logger.With(zap.String("flowID", f.ID), zap.String("sinkID", sink.ID)) + for { + movedMessageIds, err := s.MoveProcessingToReady(ctx, f, sink) + if err != nil { + logger.Error("failed to move processing to ready", zap.Error(err)) + } + if len(movedMessageIds) > 0 { + logger.Info("moved stuck messages from processing to ready", zap.Strings("messageIDs", movedMessageIds)) + } + + // wait before next check + timer := time.NewTimer(s.appConf.Supervisor.ProcessingRecoveryInterval) + + select { + case <-s.ctx.Done(): + return + case <-timer.C: + continue + } + } +} + +func (s *Supervisor) MoveProcessingToReady(ctx context.Context, f *models.Flow, sink *models.Sink) ([]string, error) { + // cache keys for twice the processing recovery interval + // this avoids the recovery process from interfering with legitimate retry attempts + ttl := 2 * s.appConf.Supervisor.ProcessingRecoveryInterval + movedMessageIds, err := s.processingRecoverySvc.MoveProcessingToReady(ctx, f, sink, ttl) + if err != nil { + return nil, err + } + + return movedMessageIds, nil +} diff --git a/pkg/supervisor/processing_test.go b/pkg/supervisor/processing_test.go new file mode 100644 index 0000000..34e7c43 --- /dev/null +++ b/pkg/supervisor/processing_test.go @@ -0,0 +1,54 @@ +package supervisor + +import ( + "context" + "testing" + + "github.com/didil/inhooks/pkg/models" + "github.com/didil/inhooks/pkg/testsupport" + "github.com/didil/inhooks/pkg/testsupport/mocks" + "github.com/golang/mock/gomock" + "github.com/stretchr/testify/assert" + "go.uber.org/zap" +) + +func TestSupervisorMoveProcessingToReady(t *testing.T) { + ctx := context.Background() + + appConf, err := testsupport.InitAppConfig(ctx) + assert.NoError(t, err) + + appConf.Supervisor.ErrSleepTime = 0 + + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + flowId1 := "flow-1" + sinkID1 := "sink-1" + + sink1 := &models.Sink{ + ID: sinkID1, + } + + flow1 := &models.Flow{ + ID: flowId1, + Sinks: []*models.Sink{sink1}, + } + + processingRecoverySvc := mocks.NewMockProcessingRecoveryService(ctrl) + movedMessageIds := []string{"message-1", "message-2"} + processingRecoverySvc.EXPECT().MoveProcessingToReady(ctx, flow1, sink1, 2*appConf.Supervisor.ProcessingRecoveryInterval).Return(movedMessageIds, nil) + + logger, err := zap.NewDevelopment() + assert.NoError(t, err) + + s := NewSupervisor( + WithProcessingRecoveryService(processingRecoverySvc), + WithAppConfig(appConf), + WithLogger(logger), + ) + + messageIds, err := s.MoveProcessingToReady(ctx, flow1, sink1) + assert.NoError(t, err) + assert.Equal(t, movedMessageIds, messageIds) +} diff --git a/pkg/supervisor/ready.go b/pkg/supervisor/ready.go index 0789c28..4327f10 100644 --- a/pkg/supervisor/ready.go +++ b/pkg/supervisor/ready.go @@ -10,17 +10,29 @@ import ( ) func (s *Supervisor) HandleReadyQueue(ctx context.Context, f *models.Flow, sink *models.Sink) { + logger := s.logger.With(zap.String("flowID", f.ID), zap.String("sinkID", sink.ID)) + for { + err := s.FetchAndProcess(ctx, f, sink) + if err != nil && !errors.Is(err, context.Canceled) { + logger.Error("failed to fetch and processed", zap.Error(err)) + // wait before retrying + timer := time.NewTimer(s.appConf.Supervisor.ErrSleepTime) + + select { + case <-s.ctx.Done(): + return + case <-timer.C: + continue + } + } + + // check if channel closed select { case <-s.ctx.Done(): return default: - err := s.FetchAndProcess(ctx, f, sink) - if err != nil { - s.logger.Error("failed to fetch and processed", zap.Error(err)) - // wait before retrying - time.Sleep(s.appConf.Supervisor.ErrSleepTime) - } + continue } } } diff --git a/pkg/supervisor/scheduled.go b/pkg/supervisor/scheduled.go index 99246f0..da9d1d5 100644 --- a/pkg/supervisor/scheduled.go +++ b/pkg/supervisor/scheduled.go @@ -9,18 +9,21 @@ import ( ) func (s *Supervisor) HandleScheduledQueue(ctx context.Context, f *models.Flow, sink *models.Sink) { + logger := s.logger.With(zap.String("flowID", f.ID), zap.String("sinkID", sink.ID)) for { + err := s.MoveDueScheduled(ctx, f, sink) + if err != nil { + logger.Error("failed to move due scheduled", zap.Error(err)) + } + + // wait before next check + timer := time.NewTimer(s.appConf.Supervisor.SchedulerInterval) + select { case <-s.ctx.Done(): return - default: - logger := s.logger.With(zap.String("flowID", f.ID), zap.String("sinkID", sink.ID)) - err := s.MoveDueScheduled(ctx, f, sink) - if err != nil { - logger.Error("failed to move due scheduled", zap.Error(err)) - } - // wait before next check - time.Sleep(s.appConf.Supervisor.SchedulerInterval) + case <-timer.C: + continue } } } diff --git a/pkg/supervisor/supervisor.go b/pkg/supervisor/supervisor.go index 62486ee..5bd583e 100644 --- a/pkg/supervisor/supervisor.go +++ b/pkg/supervisor/supervisor.go @@ -10,15 +10,16 @@ import ( ) type Supervisor struct { - logger *zap.Logger - messageFetcher services.MessageFetcher - ctx context.Context - cancel context.CancelFunc - appConf *lib.AppConfig - inhooksConfigSvc services.InhooksConfigService - messageProcessor services.MessageProcessor - processingResultsSvc services.ProcessingResultsService - schedulerSvc services.SchedulerService + logger *zap.Logger + messageFetcher services.MessageFetcher + ctx context.Context + cancel context.CancelFunc + appConf *lib.AppConfig + inhooksConfigSvc services.InhooksConfigService + messageProcessor services.MessageProcessor + processingResultsSvc services.ProcessingResultsService + schedulerSvc services.SchedulerService + processingRecoverySvc services.ProcessingRecoveryService } type SupervisorOpt func(s *Supervisor) @@ -79,6 +80,12 @@ func WithSchedulerService(schedulerService services.SchedulerService) Supervisor } } +func WithProcessingRecoveryService(processingRecoverySvc services.ProcessingRecoveryService) SupervisorOpt { + return func(s *Supervisor) { + s.processingRecoverySvc = processingRecoverySvc + } +} + func (s *Supervisor) Start() { wg := &sync.WaitGroup{} flows := s.inhooksConfigSvc.GetFlows() @@ -87,16 +94,25 @@ func (s *Supervisor) Start() { for j := 0; j < len(f.Sinks); j++ { sink := f.Sinks[j] - wg.Add(2) + logger := s.logger.With(zap.String("flowID", f.ID), zap.String("sinkID", sink.ID)) + + wg.Add(3) + + go func() { + s.HandleProcessingQueue(s.ctx, f, sink) + logger.Info("processing queue handler shutdown") + wg.Done() + }() go func() { - //TODO: move all from processing to ready (in case of previous crash) s.HandleReadyQueue(s.ctx, f, sink) + logger.Info("ready queue handler shutdown") wg.Done() }() go func() { s.HandleScheduledQueue(s.ctx, f, sink) + logger.Info("scheduled queue handler shutdown") wg.Done() }() } diff --git a/pkg/testsupport/mocks/gen_mocks.sh b/pkg/testsupport/mocks/gen_mocks.sh index 5a085cf..2acf4b8 100755 --- a/pkg/testsupport/mocks/gen_mocks.sh +++ b/pkg/testsupport/mocks/gen_mocks.sh @@ -11,6 +11,7 @@ services=( "processing_results_service" "scheduler_service" "retry_calculator" + "processing_recovery_service" ) for service in ${services[@]} diff --git a/pkg/testsupport/mocks/mock_processing_recovery_service.go b/pkg/testsupport/mocks/mock_processing_recovery_service.go new file mode 100644 index 0000000..9b1549b --- /dev/null +++ b/pkg/testsupport/mocks/mock_processing_recovery_service.go @@ -0,0 +1,64 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: pkg/services/processing_recovery_service.go + +// Package mocks is a generated GoMock package. +package mocks + +import ( + context "context" + reflect "reflect" + time "time" + + models "github.com/didil/inhooks/pkg/models" + gomock "github.com/golang/mock/gomock" +) + +// MockProcessingRecoveryService is a mock of ProcessingRecoveryService interface. +type MockProcessingRecoveryService struct { + ctrl *gomock.Controller + recorder *MockProcessingRecoveryServiceMockRecorder +} + +// MockProcessingRecoveryServiceMockRecorder is the mock recorder for MockProcessingRecoveryService. +type MockProcessingRecoveryServiceMockRecorder struct { + mock *MockProcessingRecoveryService +} + +// NewMockProcessingRecoveryService creates a new mock instance. +func NewMockProcessingRecoveryService(ctrl *gomock.Controller) *MockProcessingRecoveryService { + mock := &MockProcessingRecoveryService{ctrl: ctrl} + mock.recorder = &MockProcessingRecoveryServiceMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockProcessingRecoveryService) EXPECT() *MockProcessingRecoveryServiceMockRecorder { + return m.recorder +} + +// AddToCache mocks base method. +func (m *MockProcessingRecoveryService) AddToCache(mID string, ttl time.Duration) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "AddToCache", mID, ttl) +} + +// AddToCache indicates an expected call of AddToCache. +func (mr *MockProcessingRecoveryServiceMockRecorder) AddToCache(mID, ttl interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddToCache", reflect.TypeOf((*MockProcessingRecoveryService)(nil).AddToCache), mID, ttl) +} + +// MoveProcessingToReady mocks base method. +func (m *MockProcessingRecoveryService) MoveProcessingToReady(ctx context.Context, flow *models.Flow, sink *models.Sink, processingRecoveryInterval time.Duration) ([]string, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "MoveProcessingToReady", ctx, flow, sink, processingRecoveryInterval) + ret0, _ := ret[0].([]string) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// MoveProcessingToReady indicates an expected call of MoveProcessingToReady. +func (mr *MockProcessingRecoveryServiceMockRecorder) MoveProcessingToReady(ctx, flow, sink, processingRecoveryInterval interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "MoveProcessingToReady", reflect.TypeOf((*MockProcessingRecoveryService)(nil).MoveProcessingToReady), ctx, flow, sink, processingRecoveryInterval) +} diff --git a/pkg/testsupport/mocks/mock_redis_store.go b/pkg/testsupport/mocks/mock_redis_store.go index 26dfd8a..2d9181f 100644 --- a/pkg/testsupport/mocks/mock_redis_store.go +++ b/pkg/testsupport/mocks/mock_redis_store.go @@ -94,6 +94,35 @@ func (mr *MockRedisStoreMockRecorder) Get(ctx, messageKey interface{}) *gomock.C return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Get", reflect.TypeOf((*MockRedisStore)(nil).Get), ctx, messageKey) } +// LRangeAll mocks base method. +func (m *MockRedisStore) LRangeAll(ctx context.Context, queueKey string) ([]string, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "LRangeAll", ctx, queueKey) + ret0, _ := ret[0].([]string) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// LRangeAll indicates an expected call of LRangeAll. +func (mr *MockRedisStoreMockRecorder) LRangeAll(ctx, queueKey interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "LRangeAll", reflect.TypeOf((*MockRedisStore)(nil).LRangeAll), ctx, queueKey) +} + +// LRemRPush mocks base method. +func (m *MockRedisStore) LRemRPush(ctx context.Context, sourceQueueKey, destQueueKey string, messageIDs []string) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "LRemRPush", ctx, sourceQueueKey, destQueueKey, messageIDs) + ret0, _ := ret[0].(error) + return ret0 +} + +// LRemRPush indicates an expected call of LRemRPush. +func (mr *MockRedisStoreMockRecorder) LRemRPush(ctx, sourceQueueKey, destQueueKey, messageIDs interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "LRemRPush", reflect.TypeOf((*MockRedisStore)(nil).LRemRPush), ctx, sourceQueueKey, destQueueKey, messageIDs) +} + // SetAndEnqueue mocks base method. func (m *MockRedisStore) SetAndEnqueue(ctx context.Context, messageKey string, value []byte, queueKey, messageID string) error { m.ctrl.T.Helper()