Skip to content

Commit

Permalink
Merge pull request #10 from ianmcorvidae/periodic-warnings
Browse files Browse the repository at this point in the history
Periodic VICE notifications
  • Loading branch information
ianmcorvidae authored Sep 4, 2024
2 parents 5a715a2 + 168d68e commit 9198033
Show file tree
Hide file tree
Showing 6 changed files with 238 additions and 9 deletions.
87 changes: 87 additions & 0 deletions analyses.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,93 @@ func JobsToKill(ctx context.Context, dedb *sql.DB) ([]Job, error) {
return jobs, nil
}

const periodicWarningsQuery = `
SELECT jobs.id,
jobs.app_id,
jobs.user_id,
jobs.status,
jobs.job_description,
jobs.job_name,
jobs.result_folder_path,
jobs.planned_end_date,
jobs.start_date,
job_types.system_id,
users.username
FROM jobs
JOIN job_types on jobs.job_type_id = job_types.id
JOIN users on jobs.user_id = users.id
LEFT join notif_statuses ON jobs.id = notif_statuses.analysis_id
WHERE jobs.status = $1
AND (notif_statuses.last_periodic_warning is null
OR notif_statuses.last_periodic_warning < now() - coalesce(notif_statuses.periodic_warning_period, '4 hours'::interval))
`

// JobPeriodicWarnings returns a list of running jobs that may need periodic notifications to be sent
func JobPeriodicWarnings(ctx context.Context, dedb *sql.DB) ([]Job, error) {
var (
err error
rows *sql.Rows
)

if rows, err = dedb.QueryContext(
ctx,
periodicWarningsQuery,
"Running",
); err != nil {
return nil, err
}
defer rows.Close()

jobs := []Job{}

for rows.Next() {
var (
job Job
startDate pq.NullTime
plannedEndDate pq.NullTime
)

job = Job{}

if err = rows.Scan(
&job.ID,
&job.AppID,
&job.UserID,
&job.Status,
&job.Description,
&job.Name,
&job.ResultFolder,
&plannedEndDate,
&startDate,
&job.Type,
&job.User,
); err != nil {
return nil, err
}

if plannedEndDate.Valid {
job.PlannedEndDate = plannedEndDate.Time.Format(TimestampFromDBFormat)
}

if startDate.Valid {
job.StartDate = startDate.Time.Format(TimestampFromDBFormat)
}

job.ExternalID, err = getExternalID(ctx, dedb, job.ID)
if err != nil {
return nil, err
}

jobs = append(jobs, job)
}

if err = rows.Err(); err != nil {
return nil, err
}

return jobs, nil
}

const jobWarningsQuery = `
select jobs.id,
jobs.app_id,
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ require (
github.com/cyverse-de/messaging/v9 v9.1.5
github.com/lib/pq v1.10.4
github.com/pkg/errors v0.9.1
github.com/sanyokbig/pqinterval v1.1.2
github.com/sirupsen/logrus v1.4.2
github.com/spf13/viper v1.4.0
github.com/streadway/amqp v1.0.1-0.20200716223359-e6b33f460591
Expand Down
3 changes: 3 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,8 @@ github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R
github.com/prometheus/procfs v0.0.0-20190507164030-5867b95ac084/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA=
github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU=
github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg=
github.com/sanyokbig/pqinterval v1.1.2 h1:RzHMPdRMNvSZSDE+Qr20fFWSfBkKPFrLdFhzqmF0VnY=
github.com/sanyokbig/pqinterval v1.1.2/go.mod h1:jJvMjZaZFVqNTNVCd90zcFOkmbJgjxlWWkpu9/VeUFs=
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
github.com/sirupsen/logrus v1.4.2 h1:SPIRibHv4MatM3XXNO2BJeFLZwZ2LvZgfQ5+UNI2im4=
github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
Expand All @@ -126,6 +128,7 @@ github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+
github.com/stretchr/objx v0.1.1 h1:2vfRuCMp5sSVIDSqO8oNnWJq7mPa6KVP3iPIwFBuy8A=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.1 h1:5TQK59W5E3v0r2duFAb7P95B6hEeOyEnHRa8MjYSMTY=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
Expand Down
113 changes: 106 additions & 7 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,40 @@ func SendWarningNotification(ctx context.Context, j *Job) error {
return sendNotif(ctx, j, j.Status, subject, msg)
}

func SendPeriodicNotification(ctx context.Context, j *Job) error {
starttime, err := time.Parse(TimestampFromDBFormat, j.StartDate)
if err != nil {
return errors.Wrapf(err, "failed to parse start date %s", j.StartDate)
}
dur := time.Since(starttime)

subject := fmt.Sprintf(PeriodicSubjectFormat, j.Name, starttime, dur)

msg := fmt.Sprintf(
PeriodicMessageFormat,
j.Name,
j.ID,
starttime,
dur,
)

return sendNotif(ctx, j, j.Status, subject, msg)
}

func ensureNotifRecord(ctx context.Context, vicedb *VICEDatabaser, job Job) error {
analysisRecordExists := vicedb.AnalysisRecordExists(ctx, job.ID)

if !analysisRecordExists {
notifId, err := vicedb.AddNotifRecord(ctx, &job)
if err != nil {
return err
}
log.Debugf("notif_statuses ID inserted: %s", notifId)
}

return nil
}

const maxAttempts = 3

func sendWarning(ctx context.Context, db *sql.DB, vicedb *VICEDatabaser, warningInterval int64, warningKey string) {
Expand All @@ -180,13 +214,9 @@ func sendWarning(ctx context.Context, db *sql.DB, vicedb *VICEDatabaser, warning
updateFailureCount func(context.Context, *Job, int) error
)

analysisRecordExists := vicedb.AnalysisRecordExists(ctx, j.ID)

if !analysisRecordExists {
if _, err = vicedb.AddNotifRecord(ctx, &j); err != nil {
log.Error(err)
continue
}
if err = ensureNotifRecord(ctx, vicedb, j); err != nil {
log.Error(err)
continue
}

notifStatuses, err = vicedb.NotifStatuses(ctx, &j)
Expand Down Expand Up @@ -239,6 +269,72 @@ func sendWarning(ctx context.Context, db *sql.DB, vicedb *VICEDatabaser, warning
}
}

func sendPeriodic(ctx context.Context, db *sql.DB, vicedb *VICEDatabaser) {
// fetch jobs which periodic updates might apply to
jobs, err := JobPeriodicWarnings(ctx, db)

// loop over them and check if they have notif_statuses info
if err != nil {
log.Error(err)
} else {
for _, j := range jobs {
var (
notifStatuses *NotifStatuses
now time.Time
comparisonTimestamp time.Time
periodDuration time.Duration
)

// fetch preferences and update in the DB if needed
if err = ensureNotifRecord(ctx, vicedb, j); err != nil {
log.Error(err)
continue
}

notifStatuses, err = vicedb.NotifStatuses(ctx, &j)
if err != nil {
log.Error(err)
continue
}

periodDuration = 14400 * time.Second
if notifStatuses.PeriodicWarningPeriod > 0 {
periodDuration = time.Duration(notifStatuses.PeriodicWarningPeriod) * time.Second
}

sd, err := time.Parse(TimestampFromDBFormat, j.StartDate)
if err != nil {
log.Error(errors.Wrapf(err, "Error parsing start date %s", j.StartDate))
continue
}
comparisonTimestamp = sd
if notifStatuses.LastPeriodicWarning.After(sd) {
comparisonTimestamp = notifStatuses.LastPeriodicWarning
}

log.Infof("Comparing last-warning timestamp %s with period %s s", comparisonTimestamp, periodDuration)

now = time.Now()

// timeframe is met if: more recent of (last warning, job start date) + periodic warning period is before now
if comparisonTimestamp.Add(periodDuration).Before(now) {
// if so,
err = SendPeriodicNotification(ctx, &j)
if err != nil {
log.Error(errors.Wrap(err, "Error sending periodic notification"))
continue
}
// update timestamp:
err = vicedb.UpdateLastPeriodicWarning(ctx, &j, now)
if err != nil {
log.Error(errors.Wrap(err, "Error updating periodic notification timestamp"))
continue
}
}
}
}
}

func main() {
log.SetReportCaller(true)

Expand Down Expand Up @@ -364,6 +460,9 @@ func main() {
// 1 day warning
sendWarning(ctx, db, vicedb, 1440, oneDayWarningKey)

// periodic warnings
sendPeriodic(ctx, db, vicedb)

jl, err = JobsToKill(ctx, db)
if err != nil {
log.Error(errors.Wrap(err, "error getting list of jobs to kill"))
Expand Down
14 changes: 14 additions & 0 deletions notifications.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,20 @@ Please finish any work that is in progress. Output files will be transferred to
// to users when their job is going to terminate in the near future.
const WarningSubjectFormat = "Analysis %s will terminate on %s (%s)."

// PeriodicMessageFormat is the parameterized message that gets sent to users
// when it's time to send a regular reminder the job is still running
// parameters: analysis name & ID, start date, duration
const PeriodicMessageFormat = `Analysis "%s" (%s) is still running.
This is a regularly scheduled reminder message to ensure you don't use up your quota.
This job has been running since %s (%s).`

// PeriodicSubjectFormat is the parameterized subject for the email that is sent
// to users as a regular reminder of a running job
// parameters: analysis name, start date, duration
const PeriodicSubjectFormat = `Analysis %s is running since %s (%s)`

// Notification is a message intended as a notification to some upstream service
// or the DE UI.
type Notification struct {
Expand Down
29 changes: 27 additions & 2 deletions vicedb.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ package main
import (
"context"
"database/sql"
"time"

pqinterval "github.com/sanyokbig/pqinterval"
log "github.com/sirupsen/logrus"
)

Expand All @@ -22,6 +24,8 @@ type NotifStatuses struct {
DayWarningFailureCount int
KillWarningSent bool
KillWarningFailureCount int
LastPeriodicWarning time.Time
PeriodicWarningPeriod time.Duration
}

const notifStatusQuery = `
Expand All @@ -32,7 +36,9 @@ const notifStatusQuery = `
day_warning_sent,
day_warning_failure_count,
kill_warning_sent,
kill_warning_failure_count
kill_warning_failure_count,
coalesce(last_periodic_warning, '1970-01-01 00:00:00') as last_periodic_warning,
coalesce(periodic_warning_period, '0 seconds'::interval) as periodic_warning_period
from notif_statuses
where analysis_id = $1
`
Expand All @@ -59,6 +65,8 @@ func (v *VICEDatabaser) NotifStatuses(ctx context.Context, job *Job) (*NotifStat
&notifStatuses.DayWarningFailureCount,
&notifStatuses.KillWarningSent,
&notifStatuses.KillWarningFailureCount,
&notifStatuses.LastPeriodicWarning,
(*pqinterval.Duration)(&notifStatuses.PeriodicWarningPeriod),
); err != nil {
return nil, err
}
Expand Down Expand Up @@ -93,7 +101,7 @@ func (v *VICEDatabaser) AnalysisRecordExists(ctx context.Context, analysisID str
}

const addNotifRecordQuery = `
insert into notif_statuses (analysis_id, external_id) values ($1, $2) returning id
insert into notif_statuses (analysis_id, external_id, periodic_warning_period) values ($1, $2, cast($3 as interval)) returning id
`

// AddNotifRecord adds a new record to the notif_statuses table for the provided analysis.
Expand All @@ -109,6 +117,7 @@ func (v *VICEDatabaser) AddNotifRecord(ctx context.Context, job *Job) (string, e
addNotifRecordQuery,
job.ID,
job.ExternalID,
"4 hours", // hardcoded for now
).Scan(&notifID); err != nil {
return "", err
}
Expand Down Expand Up @@ -290,3 +299,19 @@ func (v *VICEDatabaser) SetKillWarningFailureCount(ctx context.Context, job *Job
)
return err
}

const updateLastPeriodicWarningQuery = `
update notif_statuses set last_periodic_warning = $1 where analysis_id = $2
`

// UpdateLastPeriodicWarning updates the timestamp for a job's last periodic warning
func (v *VICEDatabaser) UpdateLastPeriodicWarning(ctx context.Context, job *Job, ts time.Time) error {
var err error
_, err = v.db.ExecContext(
ctx,
updateLastPeriodicWarningQuery,
ts,
job.ID,
)
return err
}

0 comments on commit 9198033

Please sign in to comment.