Skip to content

Commit

Permalink
chore: change done queue to zset (#35)
Browse files Browse the repository at this point in the history
  • Loading branch information
didil authored May 15, 2023
1 parent 1d5c10b commit 0731eb5
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 10 deletions.
9 changes: 4 additions & 5 deletions pkg/services/processing_results_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,17 +29,16 @@ func NewProcessingResultsService(timeSvc TimeService, redisStore RedisStore, ret
}

func (s *processingResultsService) HandleFailed(ctx context.Context, sink *models.Sink, m *models.Message, processingErr error) (*models.QueuedInfo, error) {
now := s.timeSvc.Now()
m.DeliveryAttempts = append(m.DeliveryAttempts,
&models.DeliveryAttempt{
At: now,
At: s.timeSvc.Now(),
Status: models.DeliveryAttemptStatusFailed,
Error: processingErr.Error(),
},
)

nextAttemptInterval := s.retryCalculator.NextAttemptInterval(len(m.DeliveryAttempts), sink.RetryInterval, sink.RetryExpMultiplier)
m.DeliverAfter = now.Add(nextAttemptInterval)
m.DeliverAfter = s.timeSvc.Now().Add(nextAttemptInterval)

var maxAttempts int
if sink.MaxAttempts == nil {
Expand Down Expand Up @@ -67,7 +66,7 @@ func (s *processingResultsService) HandleFailed(ctx context.Context, sink *model
return &models.QueuedInfo{MessageID: m.ID, QueueStatus: models.QueueStatusDead, DeliverAfter: m.DeliverAfter}, nil
}

queueStatus := getQueueStatus(m, now)
queueStatus := getQueueStatus(m, s.timeSvc.Now())
destQueueKey := queueKey(m.FlowID, m.SinkID, queueStatus)

switch queueStatus {
Expand Down Expand Up @@ -106,7 +105,7 @@ func (s *processingResultsService) HandleOK(ctx context.Context, m *models.Messa
}

// update message and move to done
err = s.redisStore.SetAndMove(ctx, mKey, b, sourceQueueKey, destQueueKey, m.ID)
err = s.redisStore.SetLRemZAdd(ctx, mKey, b, sourceQueueKey, destQueueKey, m.ID, float64(s.timeSvc.Now().Unix()))
if err != nil {
return errors.Wrapf(err, "failed to set and move to done")
}
Expand Down
10 changes: 5 additions & 5 deletions pkg/services/processing_results_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func TestProcessingResultsServiceHandleOK(t *testing.T) {
retryCalculator := mocks.NewMockRetryCalculator(ctrl)

now := time.Date(2023, 05, 5, 8, 9, 12, 0, time.UTC)
timeSvc.EXPECT().Now().Return(now)
timeSvc.EXPECT().Now().AnyTimes().Return(now)

flowID := "flow-1"
sinkID := "sink-1"
Expand Down Expand Up @@ -56,7 +56,7 @@ func TestProcessingResultsServiceHandleOK(t *testing.T) {
b, err := json.Marshal(&mUpdated)
assert.NoError(t, err)

redisStore.EXPECT().SetAndMove(ctx, messageKey, b, sourceQueueKey, destQueueKey, mID).Return(nil)
redisStore.EXPECT().SetLRemZAdd(ctx, messageKey, b, sourceQueueKey, destQueueKey, mID, float64(now.Unix())).Return(nil)

s := NewProcessingResultsService(timeSvc, redisStore, retryCalculator)
err = s.HandleOK(ctx, m)
Expand All @@ -74,7 +74,7 @@ func TestProcessingResultsServiceHandleFailed_Dead(t *testing.T) {
retryCalculator := mocks.NewMockRetryCalculator(ctrl)

now := time.Date(2023, 05, 5, 8, 9, 12, 0, time.UTC)
timeSvc.EXPECT().Now().Return(now)
timeSvc.EXPECT().Now().AnyTimes().Return(now)

flowID := "flow-1"
sinkID := "sink-1"
Expand Down Expand Up @@ -146,7 +146,7 @@ func TestProcessingResultsServiceHandleFailed_Scheduled(t *testing.T) {
retryCalculator := mocks.NewMockRetryCalculator(ctrl)

now := time.Date(2023, 05, 5, 8, 9, 12, 0, time.UTC)
timeSvc.EXPECT().Now().Return(now)
timeSvc.EXPECT().Now().AnyTimes().Return(now)

flowID := "flow-1"
sinkID := "sink-1"
Expand Down Expand Up @@ -217,7 +217,7 @@ func TestProcessingResultsServiceHandleFailed_Ready(t *testing.T) {
retryCalculator := mocks.NewMockRetryCalculator(ctrl)

now := time.Date(2023, 05, 5, 8, 9, 12, 0, time.UTC)
timeSvc.EXPECT().Now().Return(now)
timeSvc.EXPECT().Now().AnyTimes().Return(now)

flowID := "flow-1"
sinkID := "sink-1"
Expand Down

0 comments on commit 0731eb5

Please sign in to comment.