Skip to content

Commit

Permalink
Move stuck processing messages to ready periodically (#33)
Browse files Browse the repository at this point in the history
* feat: move stuck processing to ready

fix: fix shutdown signal handling

* fix: fix ready queue error handling

* test: fix move processing to ready tests

* test: add ingest no flow test
  • Loading branch information
didil authored May 15, 2023
1 parent 2f8ff7f commit 1d5c10b
Show file tree
Hide file tree
Showing 19 changed files with 633 additions and 43 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ flows:
url: https://example.com/othertarget
retryInterval: 5m # on error, retry after 5 minutes
# retryExpMultiplier: 2 # exponential backoff
maxAttemps: 10 # maximum number of attempts
maxAttempts: 10 # maximum number of attempts
```
With this config, inhooks will listen for HTTP POST requests at `/api/v1/ingest/source-1-slug`.
Expand Down
31 changes: 24 additions & 7 deletions cmd/api/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"net/http"
"os"
"os/signal"
"sync"
"syscall"

"github.com/didil/inhooks/pkg/lib"
Expand Down Expand Up @@ -66,7 +67,6 @@ func main() {

app := handlers.NewApp(
handlers.WithLogger(logger),
handlers.WithAppConfig(appConf),
handlers.WithInhooksConfigService(inhooksConfigSvc),
handlers.WithMessageBuilder(messageBuilder),
handlers.WithMessageEnqueuer(messageEnqueuer),
Expand All @@ -80,40 +80,55 @@ func main() {
Handler: r,
}

wg := &sync.WaitGroup{}

wg.Add(1)
go func() {
logger.Info("listening ...", zap.String("addr", addr))
err = httpServer.ListenAndServe()
if err != nil {
if err != nil && err != http.ErrServerClosed {
logger.Fatal("listener failure", zap.Error(err))
}
logger.Info("http server shutdown")
wg.Done()
}()

httpClient := lib.NewHttpClient(appConf)

messageProcessor := services.NewMessageProcessor(httpClient)
retryCalculator := services.NewRetryCalculator()
processingResultsService := services.NewProcessingResultsService(timeSvc, redisStore, retryCalculator)
schedulerService := services.NewSchedulerService(redisStore, timeSvc)
processingResultsSvc := services.NewProcessingResultsService(timeSvc, redisStore, retryCalculator)
schedulerSvc := services.NewSchedulerService(redisStore, timeSvc)

processingRecoverySvc, err := services.NewProcessingRecoveryService(redisStore)
if err != nil {
logger.Fatal("failed to init ProcessingRecoveryService", zap.Error(err))
}

svisor := supervisor.NewSupervisor(
supervisor.WithLogger(logger),
supervisor.WithMessageFetcher(messageFetcher),
supervisor.WithAppConfig(appConf),
supervisor.WithInhooksConfigService(inhooksConfigSvc),
supervisor.WithMessageProcessor(messageProcessor),
supervisor.WithProcessingResultsService(processingResultsService),
supervisor.WithSchedulerService(schedulerService),
supervisor.WithProcessingResultsService(processingResultsSvc),
supervisor.WithSchedulerService(schedulerSvc),
supervisor.WithProcessingRecoveryService(processingRecoverySvc),
)

wg.Add(1)
go func() {
logger.Info("starting supervisor ...")
svisor.Start()
logger.Info("supervisor shutdown")
wg.Done()
}()

sigs := make(chan os.Signal, 1)
signal.Notify(sigs, os.Interrupt, syscall.SIGTERM)

<-sigs
sig := <-sigs
logger.Info("received shutdown signal, shutting down process", zap.String("signal", sig.String()))

svisor.Shutdown()

Expand All @@ -123,4 +138,6 @@ func main() {
if err != nil {
logger.Fatal("http server shutdown failed", zap.Error(err))
}

wg.Wait()
}
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,9 @@ require (
github.com/daixiang0/gci v0.10.1 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/denis-tingaikin/go-header v0.4.3 // indirect
github.com/dgraph-io/ristretto v0.1.1 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/dustin/go-humanize v1.0.0 // indirect
github.com/esimonov/ifshort v1.0.4 // indirect
github.com/ettle/strcase v0.1.1 // indirect
github.com/fatih/color v1.15.0 // indirect
Expand All @@ -65,6 +67,7 @@ require (
github.com/go-xmlfmt/xmlfmt v1.1.2 // indirect
github.com/gobwas/glob v0.2.3 // indirect
github.com/gofrs/flock v0.8.1 // indirect
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/golangci/check v0.0.0-20180506172741-cfe4005ccda2 // indirect
github.com/golangci/dupl v0.0.0-20180902072040-3e9179ac440a // indirect
Expand Down
7 changes: 7 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,13 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/denis-tingaikin/go-header v0.4.3 h1:tEaZKAlqql6SKCY++utLmkPLd6K8IBM20Ha7UVm+mtU=
github.com/denis-tingaikin/go-header v0.4.3/go.mod h1:0wOCWuN71D5qIgE2nz9KrKmuYBAC2Mra5RassOIQ2/c=
github.com/dgraph-io/ristretto v0.1.1 h1:6CWw5tJNgpegArSHpNHJKldNeq03FQCwYvfMVWajOK8=
github.com/dgraph-io/ristretto v0.1.1/go.mod h1:S1GPSBCYCIhmVNfcth17y2zZtQT6wzkzgwUve0VDWWA=
github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo=
github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98=
Expand Down Expand Up @@ -178,6 +183,7 @@ github.com/gobwas/glob v0.2.3/go.mod h1:d3Ez4x06l9bZtSvzIay5+Yzi0fmZzPgnTbPcKjJA
github.com/gofrs/flock v0.8.1 h1:+gYjHKf32LDeiEEFhQaotPbLuUXjY5ZqxKgXy7n59aw=
github.com/gofrs/flock v0.8.1/go.mod h1:F1TvTiK9OcQqauNUHlbJvyl9Qa1QvF/gOUDKA14jxHU=
github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b h1:VKtxabqXZkF25pY9ekfRL6a582T4P37/31XEstQ5p58=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
github.com/golang/groupcache v0.0.0-20191227052852-215e87163ea7/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
Expand Down Expand Up @@ -745,6 +751,7 @@ golang.org/x/sys v0.0.0-20220702020025-31831981b65f/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20221010170243-090e33056c14/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.2.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.3.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
Expand Down
2 changes: 2 additions & 0 deletions pkg/lib/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ type SupervisorConfig struct {
ReadyWaitTime time.Duration `env:"SUPERVISOR_READY_WAIT_TIME,default=5s"`
ErrSleepTime time.Duration `env:"SUPERVISOR_ERR_SLEEP_TIME,default=5s"`
SchedulerInterval time.Duration `env:"SUPERVISOR_SCHEDULER_INTERVAL,default=30s"`
// interval to move back stuck messages from processing to ready queue
ProcessingRecoveryInterval time.Duration `env:"SUPERVISOR_PROCESSING_RECOVERY_INTERVAL,default=5m"`
}

type HTTPClientConfig struct {
Expand Down
8 changes: 0 additions & 8 deletions pkg/server/handlers/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,12 @@ import (
"encoding/json"
"net/http"

"github.com/didil/inhooks/pkg/lib"
"github.com/didil/inhooks/pkg/services"
"go.uber.org/zap"
)

type App struct {
logger *zap.Logger
appConf *lib.AppConfig
inhooksConfigSvc services.InhooksConfigService
messageBuilder services.MessageBuilder
messageEnqueuer services.MessageEnqueuer
Expand All @@ -35,12 +33,6 @@ func WithLogger(logger *zap.Logger) AppOpt {
}
}

// WithAppConfig returns an AppOpt that stores appConf on the App being built.
func WithAppConfig(appConf *lib.AppConfig) AppOpt {
	return func(a *App) {
		a.appConf = appConf
	}
}

func WithInhooksConfigService(inhooksConfigSvc services.InhooksConfigService) AppOpt {
return func(app *App) {
app.inhooksConfigSvc = inhooksConfigSvc
Expand Down
43 changes: 42 additions & 1 deletion pkg/server/handlers/ingest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
"go.uber.org/zap"
)

func TestUpdateLB(t *testing.T) {
func TestIngest_OK(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

Expand Down Expand Up @@ -75,3 +75,44 @@ func TestUpdateLB(t *testing.T) {
err = json.NewDecoder(resp.Body).Decode(jsonOK)
assert.NoError(t, err)
}

// TestIngest_FlowNotFound posts to the ingest endpoint for a source slug for
// which the config service mock returns no flow, and expects a 404 response
// carrying a JSON error that names the unknown slug.
func TestIngest_FlowNotFound(t *testing.T) {
	mockCtrl := gomock.NewController(t)
	defer mockCtrl.Finish()

	configSvc := mocks.NewMockInhooksConfigService(mockCtrl)
	builder := mocks.NewMockMessageBuilder(mockCtrl)
	enqueuer := mocks.NewMockMessageEnqueuer(mockCtrl)

	logger, err := zap.NewDevelopment()
	assert.NoError(t, err)

	// The flow lookup is the only expected call: the builder and enqueuer
	// mocks have no expectations registered.
	configSvc.EXPECT().FindFlowForSource("my-source").Return(nil)

	router := server.NewRouter(handlers.NewApp(
		handlers.WithLogger(logger),
		handlers.WithInhooksConfigService(configSvc),
		handlers.WithMessageBuilder(builder),
		handlers.WithMessageEnqueuer(enqueuer),
	))
	testServer := httptest.NewServer(router)
	defer testServer.Close()

	payload := bytes.NewBufferString(`{"id": "abc"}`)
	req, err := http.NewRequest(http.MethodPost, testServer.URL+"/api/v1/ingest/my-source", payload)
	assert.NoError(t, err)

	client := &http.Client{}
	resp, err := client.Do(req)
	assert.NoError(t, err)
	defer resp.Body.Close()

	assert.Equal(t, http.StatusNotFound, resp.StatusCode)

	// The error body must decode cleanly and mention the requested slug.
	decoded := &handlers.JSONErr{}
	err = json.NewDecoder(resp.Body).Decode(decoded)
	assert.NoError(t, err)

	assert.Equal(t, "unknown source slug my-source", decoded.Error)
}
81 changes: 81 additions & 0 deletions pkg/services/processing_recovery_service.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package services

import (
"context"
"time"

"github.com/dgraph-io/ristretto"
"github.com/didil/inhooks/pkg/models"
"github.com/pkg/errors"
)

// ProcessingRecoveryService moves messages that stay in a sink's processing
// queue across successive checks back to that sink's ready queue.
type ProcessingRecoveryService interface {
// MoveProcessingToReady inspects the processing queue of the given flow/sink
// and moves the message IDs it considers stuck to the ready queue. It returns
// the IDs that were moved. In the implementation in this file,
// processingRecoveryInterval is used as the TTL of the cache entries that
// record which message IDs have already been seen in processing.
MoveProcessingToReady(ctx context.Context, flow *models.Flow, sink *models.Sink, processingRecoveryInterval time.Duration) ([]string, error)
// AddToCache records mID in the recovery cache with the given ttl.
// Only used for testing.
AddToCache(mID string, ttl time.Duration)
}

// processingRecoveryService is the ProcessingRecoveryService implementation
// returned by NewProcessingRecoveryService.
type processingRecoveryService struct {
// redisStore is used to list the processing queue and to move message IDs
// from the processing queue to the ready queue.
redisStore RedisStore
// cache remembers message IDs already observed in a processing queue; an ID
// found here on a later check is treated as stuck.
cache *ristretto.Cache
}

// NewProcessingRecoveryService builds a ProcessingRecoveryService that reads
// and updates queues through redisStore and tracks previously seen message IDs
// in an in-memory ristretto cache. It returns an error if the cache cannot be
// created.
func NewProcessingRecoveryService(redisStore RedisStore) (ProcessingRecoveryService, error) {
	cacheConf := &ristretto.Config{
		// number of keys to track frequency of (200K).
		NumCounters: 2e5,
		// maximum total cost held by the cache: 2 * 2^20 = 2,097,152.
		// Entries in this file are stored with a cost of 1 each, so this
		// bounds the number of cached message IDs rather than a byte size.
		MaxCost: 2 * (1 << 20),
		// number of keys per Get buffer.
		BufferItems: 64,
	}

	cache, err := ristretto.NewCache(cacheConf)
	if err != nil {
		return nil, errors.Wrapf(err, "failed to init cache")
	}

	return &processingRecoveryService{
		redisStore: redisStore,
		cache:      cache,
	}, nil
}

// recoveryCacheDummyValue is the placeholder stored for every cached message
// ID; lookups only check whether the key is present and ignore the value.
const recoveryCacheDummyValue = "1"

// MoveProcessingToReady lists every message ID in the processing queue of the
// given flow/sink and compares it against the in-memory cache:
//
//   - an ID already present in the cache was seen on an earlier call and is
//     treated as stuck;
//   - an ID not present is recorded in the cache with the given ttl so that a
//     later call can detect it.
//
// All stuck IDs are then moved from the processing queue to the ready queue in
// a single store call. The moved IDs are returned. The result is nil when the
// processing queue is empty and an empty slice when nothing was stuck.
//
// NOTE(review): moved IDs are not removed from the cache here, so an ID that
// is picked up again before its entry expires would be reported as stuck on
// the next call — confirm this is intended given the ttl the caller passes.
func (s *processingRecoveryService) MoveProcessingToReady(ctx context.Context, flow *models.Flow, sink *models.Sink, ttl time.Duration) ([]string, error) {
	processingKey := queueKey(flow.ID, sink.ID, models.QueueStatusProcessing)
	readyKey := queueKey(flow.ID, sink.ID, models.QueueStatusReady)

	inProcessing, err := s.redisStore.LRangeAll(ctx, processingKey)
	if err != nil {
		return nil, errors.Wrap(err, "failed to lrangeall")
	}
	if len(inProcessing) == 0 {
		// nothing in processing
		return nil, nil
	}

	stuck := make([]string, 0)
	for _, id := range inProcessing {
		if _, seen := s.cache.Get(id); !seen {
			// first sighting: remember it for the next check
			s.cache.SetWithTTL(id, recoveryCacheDummyValue, 1, ttl)
			continue
		}
		// seen before and still in processing: stuck message found
		stuck = append(stuck, id)
	}

	if len(stuck) == 0 {
		// no messages to move
		return stuck, nil
	}

	if err := s.redisStore.LRemRPush(ctx, processingKey, readyKey, stuck); err != nil {
		return nil, errors.Wrap(err, "failed to lremrpush")
	}

	return stuck, nil
}

// AddToCache stores mID in the recovery cache with a cost of 1 and the given
// ttl, then calls cache.Wait() before returning (per ristretto's documentation
// Wait blocks until buffered writes have been applied, so the entry should be
// visible to an immediately following lookup).
// Only used for testing.
func (s *processingRecoveryService) AddToCache(mID string, ttl time.Duration) {
s.cache.SetWithTTL(mID, recoveryCacheDummyValue, 1, ttl)
s.cache.Wait()
}
Loading

0 comments on commit 1d5c10b

Please sign in to comment.