Skip to content

Commit

Permalink
Allow retrying failed sync tasks
Browse files Browse the repository at this point in the history
  • Loading branch information
toddkazakov committed Aug 17, 2023
1 parent 62181cf commit 98c2034
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 25 deletions.
25 changes: 14 additions & 11 deletions ehr/reconcile/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,21 @@ func (r *Runner) Run(ctx context.Context, tsk *task.Task) bool {
now := time.Now()
tsk.ClearError()

r.doRun(ctx, tsk)
tsk.RepeatAvailableAfter(AvailableAfterDurationMinimum + time.Duration(rand.Int63n(int64(AvailableAfterDurationMaximum-AvailableAfterDurationMinimum+1))))

if taskDuration := time.Since(now); taskDuration > TaskDurationMaximum {
r.logger.WithField("taskDuration", taskDuration.Truncate(time.Millisecond).Seconds()).Warn("Task duration exceeds maximum")
}

return true
}

func (r *Runner) doRun(ctx context.Context, tsk *task.Task) {
serverSessionToken, err := r.authClient.ServerSessionToken()
if err != nil {
tsk.AppendError(errors.Wrap(err, "unable to get server session token"))
return true
return
}

ctx = auth.NewContextWithServerSessionToken(ctx, serverSessionToken)
Expand All @@ -67,26 +78,18 @@ func (r *Runner) Run(ctx context.Context, tsk *task.Task) bool {
syncTasks, err := r.getSyncTasks(ctx)
if err != nil {
tsk.AppendError(errors.Wrap(err, "unable to get sync tasks"))
return
}

// Get the list of all EHR enabled clinics
clinicsList, err := r.clinicsClient.ListEHREnabledClinics(ctx)
if err != nil {
tsk.AppendError(errors.Wrap(err, "unable to list clinics"))
return
}

plan := GetReconciliationPlan(syncTasks, clinicsList)
r.reconcileTasks(ctx, tsk, plan)

if !tsk.IsFailed() {
tsk.RepeatAvailableAfter(AvailableAfterDurationMinimum + time.Duration(rand.Int63n(int64(AvailableAfterDurationMaximum-AvailableAfterDurationMinimum+1))))
}

if taskDuration := time.Since(now); taskDuration > TaskDurationMaximum {
r.logger.WithField("taskDuration", taskDuration.Truncate(time.Millisecond).Seconds()).Warn("Task duration exceeds maximum")
}

return true
}

func (r *Runner) getSyncTasks(ctx context.Context) (map[string]task.Task, error) {
Expand Down
40 changes: 26 additions & 14 deletions ehr/sync/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,11 @@ import (
)

const (
AvailableAfterDurationMaximum = AvailableAfterDurationMinimum + 1*time.Hour
AvailableAfterDurationMinimum = 14*24*time.Hour - 30*time.Minute
TaskDurationMaximum = 5 * time.Minute
OnSuccessAvailableAfterDurationMaximum = OnSuccessAvailableAfterDurationMinimum + 1*time.Hour
OnSuccessAvailableAfterDurationMinimum = 14*24*time.Hour - 30*time.Minute
OnErrorAvailableAfterDurationMaximum = OnErrorAvailableAfterDurationMinimum + 5*time.Minute
OnErrorAvailableAfterDurationMinimum = 1*time.Hour - 5*time.Minute
TaskDurationMaximum = 5 * time.Minute
)

type Runner struct {
Expand Down Expand Up @@ -45,19 +47,14 @@ func (r *Runner) Run(ctx context.Context, tsk *task.Task) bool {
now := time.Now()
tsk.ClearError()

clinicId, err := GetClinicId(tsk.Data)
if err != nil {
tsk.AppendError(errors.Wrap(err, "unable to get clinicId from task data"))
return true
}

err = r.clinicsClient.SyncEHRData(ctx, clinicId)
if err != nil {
tsk.AppendError(errors.Wrap(err, "unable to sync ehr data"))
}
r.doRun(ctx, tsk)

if !tsk.IsFailed() {
tsk.RepeatAvailableAfter(AvailableAfterDurationMinimum + time.Duration(rand.Int63n(int64(AvailableAfterDurationMaximum-AvailableAfterDurationMinimum+1))))
if tsk.HasError() {
tsk.RepeatAvailableAfter(OnErrorAvailableAfterDurationMinimum + time.Duration(rand.Int63n(int64(OnErrorAvailableAfterDurationMaximum-OnErrorAvailableAfterDurationMinimum+1))))
} else {
tsk.RepeatAvailableAfter(OnSuccessAvailableAfterDurationMinimum + time.Duration(rand.Int63n(int64(OnSuccessAvailableAfterDurationMaximum-OnSuccessAvailableAfterDurationMinimum+1))))
}
}

if taskDuration := time.Since(now); taskDuration > TaskDurationMaximum {
Expand All @@ -66,3 +63,18 @@ func (r *Runner) Run(ctx context.Context, tsk *task.Task) bool {

return true
}

func (r *Runner) doRun(ctx context.Context, tsk *task.Task) {
clinicId, err := GetClinicId(tsk.Data)
if err != nil {
tsk.AppendError(errors.Wrap(err, "unable to get clinicId from task data"))
// Unrecoverable condition, move the task to failed state so it won't be retried
tsk.SetFailed()
return
}

err = r.clinicsClient.SyncEHRData(ctx, clinicId)
if err != nil {
tsk.AppendError(errors.Wrap(err, "unable to sync ehr data"))
}
}

0 comments on commit 98c2034

Please sign in to comment.