diff --git a/process/schedule.go b/process/schedule.go index e06fac1..0673c28 100644 --- a/process/schedule.go +++ b/process/schedule.go @@ -7,6 +7,7 @@ import ( "time" "github.com/luno/jettison/errors" + "github.com/luno/jettison/j" "github.com/luno/jettison/log" "github.com/robfig/cron/v3" @@ -123,26 +124,10 @@ func (r intervalSchedule) Next(t time.Time) time.Time { return next } -// FixedInterval unlike Every will execute on a specific interval only...regardless if the cursor has -// fallen behind. For example, if you specify a duration of 5 min...but your process stops running for 2 hours, the -// process will only execute at the next 5-min interval once, where process.Every will execute for all the missed 5-min intervals -// during the 2-hour outage. -func FixedInterval(period time.Duration, opts ...EveryOption) Schedule { - s := fixedIntervalSchedule{ - intervalSchedule: newIntervalSchedule(period, opts...), - } - - return s -} - -type fixedIntervalSchedule struct { - intervalSchedule -} - // Previous this method returns the expected last run time. It uses this to compare with the // actual last run time and ensure that the process only runs once for all the intervals in between the // last run time and "now". -func (r fixedIntervalSchedule) Previous(now time.Time) time.Time { +func (r intervalSchedule) Previous(now time.Time) time.Time { prev := now.Truncate(r.Period).Add(r.Offset) if prev.After(now) { prev = prev.Add(-1 * r.Period) @@ -151,6 +136,10 @@ func (r fixedIntervalSchedule) Previous(now time.Time) time.Time { return prev } +// FixedInterval is deprecated. +// Deprecated: Use Every. +var FixedInterval = Every + // TimeOfDay returns a Schedule that will trigger at the same time every day // hour is based on the 24-hour clock. func TimeOfDay(hour, minute int) Schedule { @@ -282,8 +271,14 @@ func (r scheduleRunner) doNext(ctx context.Context) error { if err != nil { return err } + next := nextExecution(r.o.clock.Now(), lastDone, r.when, r.o.name) + ctx = log.ContextWith(ctx, j.MKV{ + "schedule_last": lastDone, + "schedule_next": next, + }) + if r.o.maxErrors > 0 && r.ErrCount >= r.o.maxErrors { return setRunDone(ctx, next, r.cursor, r.o.name) } @@ -294,6 +289,8 @@ func (r scheduleRunner) doNext(ctx context.Context) error { runID := fmt.Sprintf("%s_%d", r.o.name, next.Unix()) + ctx = log.ContextWith(ctx, j.MKV{"schedule_run_id": runID}) + if err := r.f(ctx, lastDone, next, runID); err != nil { return err } diff --git a/process/schedule_test.go b/process/schedule_test.go index 6dd9f48..3383f3f 100644 --- a/process/schedule_test.go +++ b/process/schedule_test.go @@ -218,11 +218,11 @@ func TestNextExecution(t *testing.T) { expNext: must(time.Parse(time.RFC3339, "2022-01-22T14:00:00Z")), }, { - name: "last in the future still returns next", + name: "last in the future returns previous", now: must(time.Parse(time.RFC3339, "2022-01-22T13:24:01Z")), last: must(time.Parse(time.RFC3339, "2022-01-22T13:44:00Z")), spec: Every(time.Hour), - expNext: must(time.Parse(time.RFC3339, "2022-01-22T14:00:00Z")), + expNext: must(time.Parse(time.RFC3339, "2022-01-22T13:00:00Z")), }, { name: "offset handled", @@ -259,41 +259,6 @@ func TestNextExecution(t *testing.T) { spec: TimeOfDay(15, 0), expNext: must(time.Parse(time.RFC3339, "2022-01-22T15:00:00Z")), }, - { - name: "fixed interval with cursor far in the past", - now: must(time.Parse(time.RFC3339, "2022-01-21T12:15:00Z")), - last: must(time.Parse(time.RFC3339, "2022-01-21T10:00:00Z")), - spec: FixedInterval(time.Hour), - expNext: must(time.Parse(time.RFC3339, "2022-01-21T12:00:00Z")), - }, - { - name: "fixed interval with cursor in the past but now the same as expected run time", - now: must(time.Parse(time.RFC3339, "2022-01-21T12:00:00Z")), - last: must(time.Parse(time.RFC3339, "2022-01-21T10:00:00Z")), - spec: FixedInterval(time.Hour), - expNext: must(time.Parse(time.RFC3339, "2022-01-21T12:00:00Z")), - }, - { - name: "fixed interval with cursor updated", - now: must(time.Parse(time.RFC3339, "2022-01-21T12:00:00Z")), - last: must(time.Parse(time.RFC3339, "2022-01-21T12:00:00Z")), - spec: FixedInterval(time.Hour), - expNext: must(time.Parse(time.RFC3339, "2022-01-21T13:00:00Z")), - }, - { - name: "fixed interval with offset", - now: must(time.Parse(time.RFC3339, "2022-01-21T12:20:00Z")), - last: must(time.Parse(time.RFC3339, "2022-01-21T08:00:00Z")), - spec: FixedInterval(time.Hour, WithOffset(time.Minute)), - expNext: must(time.Parse(time.RFC3339, "2022-01-21T12:01:00Z")), - }, - { - name: "fixed interval with historic cursor and offset run time and now value", - now: must(time.Parse(time.RFC3339, "2022-01-21T12:15:00Z")), - last: must(time.Parse(time.RFC3339, "2022-01-21T08:00:00Z")), - spec: FixedInterval(time.Hour, WithOffset(20*time.Minute)), - expNext: must(time.Parse(time.RFC3339, "2022-01-21T11:20:00Z")), - }, } for _, tc := range testCases {