Any advice on testing worker code that calls: JobCompleteTx #605

Open
advdv opened this issue Sep 20, 2024 · 2 comments

advdv commented Sep 20, 2024

I'm very excited about this project, thank you and hats off!

I have a work function that calls JobCompleteTx to atomically complete the job as well as perform other work on the tx. This seems pretty hard to unit test right now, because JobCompleteTx can only complete the job if it actually exists in the database.

Of course, JobCompleteTx can be mocked or replaced in the tests. Or I could insert actual jobs, but that is a bit harder to do in my specific case because it's a periodic job. Or maybe I'm overlooking something?
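The "mocked/replaced" option mentioned above could look roughly like the following. This is only a minimal sketch that uses the standard library alone: `Tx`, `Job`, `CompleteFunc`, `workWith`, `fakeTx`, and `runOnce` are hypothetical names introduced for illustration, not part of River or pgx. In real code the completion function would presumably wrap `river.JobCompleteTx`, and a real `pgx.Tx` could be passed in place of the fake.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Tx is a hypothetical stand-in for pgx.Tx, reduced to the two methods this sketch needs.
type Tx interface {
	Commit(ctx context.Context) error
	Rollback(ctx context.Context) error
}

// Job is a hypothetical stand-in for *river.Job[TArgs].
type Job struct {
	ID int64
}

// CompleteFunc marks a job as completed inside tx. Production code would
// (by assumption) wrap river.JobCompleteTx here; tests inject a stub instead.
type CompleteFunc func(ctx context.Context, tx Tx, job *Job) error

// workWith runs fn and then the injected completion step on the same tx,
// committing only if both succeed.
func workWith(ctx context.Context, tx Tx, job *Job, complete CompleteFunc, fn func(Tx, *Job) error) error {
	defer tx.Rollback(ctx) // the fake ignores this once Commit has succeeded

	if err := fn(tx, job); err != nil {
		return err
	}
	if err := complete(ctx, tx, job); err != nil {
		return fmt.Errorf("failed to complete job: %w", err)
	}
	if err := tx.Commit(ctx); err != nil {
		return fmt.Errorf("failed to commit: %w", err)
	}
	return nil
}

// fakeTx records whether the transaction was committed or rolled back.
type fakeTx struct {
	committed  bool
	rolledBack bool
}

func (t *fakeTx) Commit(context.Context) error {
	t.committed = true
	return nil
}

func (t *fakeTx) Rollback(context.Context) error {
	if !t.committed {
		t.rolledBack = true
	}
	return nil
}

// errWork is a sentinel error used to simulate failing work.
var errWork = errors.New("work failed")

// runOnce exercises workWith with a fake tx and a stub completer and reports
// whether the tx was committed and whether the completer was called.
func runOnce(workErr error) (committed bool, completed bool, err error) {
	tx := &fakeTx{}
	stub := func(context.Context, Tx, *Job) error {
		completed = true
		return nil
	}
	err = workWith(context.Background(), tx, &Job{ID: 1}, stub, func(Tx, *Job) error {
		return workErr
	})
	return tx.committed, completed, err
}

func main() {
	fmt.Println(runOnce(nil))     // successful work
	fmt.Println(runOnce(errWork)) // failing work
}
```

Injecting the completion step keeps test-only logic out of the production path, at the cost of one extra parameter on the helper.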

brandur (Contributor) commented Sep 21, 2024

@advdv Presumably you've tried inserting a job, but aren't able to target it because JobCompleteTx requires that it be running. Is that right?

Hmm, we might be missing a helper that allows you to transition a job from available to running for testing purposes.

advdv (Author) commented Sep 23, 2024

I did not try to insert the job; I ended up going down a unit-testing route. I would imagine that for more of an integration-testing setup it would definitely be useful to have a "PrepareJobForTesting" helper function. I now use a special tag that I can inspect to skip calling "JobCompleteTx" during testing. Something like this (note that the tx is real, though):

// SkipAtomicCompleteTag can be added to a job's tags to indicate that it should not be completed in the same
// tx as the work. This is mainly useful for testing.
const SkipAtomicCompleteTag = "__no_atomic_complete"

// work runs fn within a transaction that is committed together with the job completion, or rolled back
// when an error occurs.
func work[TArgs river.JobArgs](
	ctx context.Context,
	rw *pgxpool.Pool,
	job *river.Job[TArgs],
	fn func(pgx.Tx, *river.Job[TArgs]) error,
) error {
	tx, err := rw.Begin(ctx)
	if err != nil {
		return fmt.Errorf("failed to start tx: %w", err)
	}
	defer tx.Rollback(ctx)

	if err := fn(tx, job); err != nil {
		return err
	}

	if !slices.Contains(job.Tags, SkipAtomicCompleteTag) {
		// in tests, we don't usually have an actual job in the database, so we need to be able to skip this:
		// https://github.com/riverqueue/river/issues/605
		if _, err = river.JobCompleteTx[*riverpgxv5.Driver](ctx, tx, job); err != nil {
			return fmt.Errorf("failed to complete job: %w", err)
		}
	}

	if err = tx.Commit(ctx); err != nil {
		return fmt.Errorf("failed to commit: %w", err)
	}

	return nil
}

For my tests, I set up jobs with a helper like this:

func NewJobT[TArg river.JobArgs](a TArg) *river.Job[TArg] {
	return &river.Job[TArg]{Args: a, JobRow: &rivertype.JobRow{
		Tags: []string{worker.SkipAtomicCompleteTag},
	}}
}
