Jobs in retryable state not retried #649

Open
petoc opened this issue Oct 17, 2024 · 0 comments

petoc commented Oct 17, 2024

Hi,

I am unable to reach MaxAttempts (default 25). After one or two retries, jobs are left in the retryable state and are never retried again as the RetryPolicy would dictate.
An example is below. As I understand the configuration, it should retry the job 24 times, roughly every 5 seconds.
Is there something I am missing?

package main

import (
	"context"
	"errors"
	"log/slog"
	"os"
	"time"

	"github.com/jackc/pgx/v5/pgxpool"
	"github.com/riverqueue/river"
	"github.com/riverqueue/river/riverdriver/riverpgxv5"
	"github.com/riverqueue/river/rivershared/util/slogutil"
	"github.com/riverqueue/river/rivertype"
)

type RetryableIssueJob struct{}

func (j RetryableIssueJob) Kind() string {
	return "retryable_issue"
}

type RetryableIssueWorker struct {
	river.WorkerDefaults[RetryableIssueJob]
}

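// LinearRetryPolicy schedules the next retry 5 seconds later for each error
// already recorded on the job.
type LinearRetryPolicy struct{}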

func (policy *LinearRetryPolicy) NextRetry(job *rivertype.JobRow) time.Time {
	return time.Now().Add(time.Duration(len(job.Errors)*5) * time.Second)
}

func (w *RetryableIssueWorker) Work(ctx context.Context, job *river.Job[RetryableIssueJob]) error {
	time.Sleep(1 * time.Second)
	return errors.New("retryable issue job failed")
}

func main() {
	ctx := context.Background()
	dbPool, err := pgxpool.New(ctx, os.Getenv("DATABASE_URL"))
	if err != nil {
		panic(err)
	}
	defer dbPool.Close()
	workers := river.NewWorkers()
	river.AddWorker(workers, &RetryableIssueWorker{})
	riverClient, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{
		Logger: slog.New(&slogutil.SlogMessageOnlyHandler{Level: slog.LevelWarn}),
		Queues: map[string]river.QueueConfig{
			river.QueueDefault: {MaxWorkers: 10},
		},
		Workers:     workers,
		RetryPolicy: &LinearRetryPolicy{},
	})
	if err != nil {
		panic(err)
	}
	if err := riverClient.Start(ctx); err != nil {
		panic(err)
	}
	if _, err = riverClient.Insert(ctx, RetryableIssueJob{}, nil); err != nil {
		panic(err)
	}
	// Block forever so the client keeps working jobs.
	w := make(chan struct{})
	<-w
}

Also, the LinearRetryPolicy example in the Job retries section of the documentation seems to use an old function name, NextAt. It should be NextRetry.
