Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Debug bootstrap for intermittent test in periodic job scheduler #271

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 16 additions & 16 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -69,27 +69,27 @@ jobs:

- name: Test
working-directory: .
run: go test -p 1 -race ./... -timeout 2m
run: go test ./internal/maintenance -run TestPeriodicJobEnqueuer/EnqueuesPeriodicJobs -race -count 100

- name: Test cmd/river
working-directory: ./cmd/river
run: go test -race ./... -timeout 2m
# - name: Test cmd/river
# working-directory: ./cmd/river
# run: go test -race ./... -timeout 2m

- name: Test riverdriver
working-directory: ./riverdriver
run: go test -race ./... -timeout 2m
# - name: Test riverdriver
# working-directory: ./riverdriver
# run: go test -race ./... -timeout 2m

- name: Test riverdriver/riverdatabasesql
working-directory: ./riverdriver/riverdatabasesql
run: go test -race ./... -timeout 2m
# - name: Test riverdriver/riverdatabasesql
# working-directory: ./riverdriver/riverdatabasesql
# run: go test -race ./... -timeout 2m

- name: Test riverdriver/riverpgxv5
working-directory: ./riverdriver/riverpgxv5
run: go test -race ./... -timeout 2m
# - name: Test riverdriver/riverpgxv5
# working-directory: ./riverdriver/riverpgxv5
# run: go test -race ./... -timeout 2m

- name: Test rivertype
working-directory: ./rivertype
run: go test -race ./... -timeout 2m
# - name: Test rivertype
# working-directory: ./rivertype
# run: go test -race ./... -timeout 2m

cli:
runs-on: ubuntu-latest
Expand Down
21 changes: 17 additions & 4 deletions internal/maintenance/periodic_job_enqueuer.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,15 +122,12 @@ func (s *PeriodicJobEnqueuer) Start(ctx context.Context) error {
insertParamsMany []*riverdriver.JobInsertFastParams
insertParamsUnique []*insertParamsAndUniqueOpts
)
now := s.TimeNowUTC()

for _, periodicJob := range s.periodicJobs {
// Expect client to have validated any user input in a safer way
// already, but do a second pass for internal uses.
periodicJob.mustValidate()

periodicJob.nextRunAt = periodicJob.ScheduleFunc(now)

if periodicJob.RunOnStart {
if insertParams, uniqueOpts, ok := s.insertParamsFromConstructor(ctx, periodicJob.ConstructorFunc); ok {
if !uniqueOpts.IsEmpty() {
Expand All @@ -143,6 +140,11 @@ func (s *PeriodicJobEnqueuer) Start(ctx context.Context) error {
}

s.insertBatch(ctx, insertParamsMany, insertParamsUnique)

now := s.TimeNowUTC()
for _, periodicJob := range s.periodicJobs {
periodicJob.nextRunAt = periodicJob.ScheduleFunc(now)
}
}

s.TestSignals.EnteredLoop.Signal(struct{}{})
Expand All @@ -160,16 +162,24 @@ func (s *PeriodicJobEnqueuer) Start(ctx context.Context) error {

now := s.TimeNowUTC()

s.Logger.InfoContext(ctx, "--- Timer elapsed; scheduling jobs", "now", now)

// Add a small margin to the current time so we're not only
// running jobs that are already ready, but also ones ready at
// this exact moment or ready in the very near future.
nowWithMargin := now.Add(10 * time.Millisecond)
nowWithMargin := now.Add(100 * time.Millisecond)
// nowWithMargin := now

for _, periodicJob := range s.periodicJobs {
insertParams, _, _ := s.insertParamsFromConstructor(ctx, periodicJob.ConstructorFunc)

if !periodicJob.nextRunAt.Before(nowWithMargin) {
s.Logger.InfoContext(ctx, "Skipping job because not ready", "kind", insertParams.Kind, "next_run_at", periodicJob.nextRunAt)
continue
}

s.Logger.InfoContext(ctx, "Scheduling job because ready to go", "kind", insertParams.Kind, "next_run_at", periodicJob.nextRunAt)

periodicJob.nextRunAt = periodicJob.ScheduleFunc(now)

if insertParams, uniqueOpts, ok := s.insertParamsFromConstructor(ctx, periodicJob.ConstructorFunc); ok {
Expand All @@ -187,6 +197,9 @@ func (s *PeriodicJobEnqueuer) Start(ctx context.Context) error {
// paused during work. Makes its firing more deterministic.
timerUntilNextRun.Reset(s.timeUntilNextRun())

untilNextRun := s.timeUntilNextRun()
s.Logger.InfoContext(ctx, "Timer until next run", "time_until_next_run", untilNextRun, "next_run_at", time.Now().Add(untilNextRun))

case <-ctx.Done():
// Clean up timer resources. We know it has _not_ received from the
// timer since its last reset because that would have led us to the case
Expand Down
Loading