Skip to content

Commit

Permalink
Add EHR data sync tasks
Browse files Browse the repository at this point in the history
  • Loading branch information
toddkazakov committed Aug 15, 2023
1 parent cf3755b commit ed5c76b
Show file tree
Hide file tree
Showing 19 changed files with 8,045 additions and 1,745 deletions.
29 changes: 29 additions & 0 deletions clinics/mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

47 changes: 47 additions & 0 deletions clinics/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"fmt"
"net/http"

"github.com/tidepool-org/platform/pointer"

"github.com/kelseyhightower/envconfig"
clinic "github.com/tidepool-org/clinic/client"
"go.uber.org/fx"
Expand All @@ -22,6 +24,8 @@ var ClientModule = fx.Provide(NewClient)
type Client interface {
GetClinician(ctx context.Context, clinicID, clinicianID string) (*clinic.Clinician, error)
SharePatientAccount(ctx context.Context, clinicID, patientID string) (*clinic.Patient, error)
ListEHREnabledClinics(ctx context.Context) ([]clinic.Clinic, error)
SyncEHRData(ctx context.Context, clinicID string) error
}

type config struct {
Expand Down Expand Up @@ -76,6 +80,38 @@ func (d *defaultClient) GetClinician(ctx context.Context, clinicID, clinicianID
return response.JSON200, nil
}

func (d *defaultClient) ListEHREnabledClinics(ctx context.Context) ([]clinic.Clinic, error) {
offset := 0
batchSize := 1000

clinics := make([]clinic.Clinic, 0)
for {
response, err := d.httpClient.ListClinicsWithResponse(ctx, &clinic.ListClinicsParams{
EhrEnabled: pointer.FromBool(true),
Offset: &offset,
Limit: &batchSize,
})
if err != nil {
return nil, err
}
if response.StatusCode() != http.StatusOK {
return nil, fmt.Errorf("unexpected response status code %v from %v", response.StatusCode(), response.HTTPResponse.Request.URL)
}
if response.JSON200 == nil {
break
}

clinics = append(clinics, *response.JSON200...)
offset = offset + batchSize

if len(*response.JSON200) < batchSize {
break
}
}

return clinics, nil
}

func (d *defaultClient) SharePatientAccount(ctx context.Context, clinicID, patientID string) (*clinic.Patient, error) {
permission := make(map[string]interface{}, 0)
body := clinic.CreatePatientFromUserJSONRequestBody{
Expand All @@ -98,6 +134,17 @@ func (d *defaultClient) SharePatientAccount(ctx context.Context, clinicID, patie
return response.JSON200, nil
}

func (d *defaultClient) SyncEHRData(ctx context.Context, clinicID string) error {
response, err := d.httpClient.SyncEHRDataWithResponse(ctx, clinicID)
if err != nil {
return err
}
if response.StatusCode() != http.StatusAccepted {
return fmt.Errorf("unexpected response status code %v from %v", response.StatusCode(), response.HTTPResponse.Request.URL)
}
return nil
}

func (d *defaultClient) getPatient(ctx context.Context, clinicID, patientID string) (*clinic.Patient, error) {
response, err := d.httpClient.GetPatientWithResponse(ctx, clinic.ClinicId(clinicID), clinic.PatientId(patientID))
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion dexcom/fetch/fetch_suite_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package fetch_test
package reconcile_test

import (
"testing"
Expand Down
11 changes: 11 additions & 0 deletions ehr/reconcile/reconcile_suite_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package reconcile_test

import (
"testing"

"github.com/tidepool-org/platform/test"
)

func TestSuite(t *testing.T) {
test.Test(t)
}
167 changes: 167 additions & 0 deletions ehr/reconcile/runner.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
package reconcile

import (
"context"
"math/rand"
"time"

api "github.com/tidepool-org/clinic/client"

"github.com/tidepool-org/platform/auth"
"github.com/tidepool-org/platform/clinics"
"github.com/tidepool-org/platform/ehr/sync"
"github.com/tidepool-org/platform/errors"
"github.com/tidepool-org/platform/log"
"github.com/tidepool-org/platform/page"
"github.com/tidepool-org/platform/pointer"
"github.com/tidepool-org/platform/task"
)

const (
AvailableAfterDurationMaximum = AvailableAfterDurationMinimum + 1*time.Hour
AvailableAfterDurationMinimum = 14*24*time.Hour - 30*time.Minute
TaskDurationMaximum = 5 * time.Minute
)

type Runner struct {
authClient auth.Client
clinicsClient clinics.Client
taskClient task.Client
logger log.Logger
}

func NewRunner(authClient auth.Client, clinicsClient clinics.Client, taskClient task.Client, logger log.Logger) (*Runner, error) {
return &Runner{
authClient: authClient,
clinicsClient: clinicsClient,
taskClient: taskClient,
logger: logger,
}, nil
}

func (r *Runner) GetRunnerType() string {
return Type
}

func (r *Runner) GetRunnerDeadline() time.Time {
return time.Now().Add(TaskDurationMaximum * 3)
}

func (r *Runner) GetRunnerMaximumDuration() time.Duration {
return TaskDurationMaximum
}

func (r *Runner) Run(ctx context.Context, tsk *task.Task) bool {
now := time.Now()
tsk.ClearError()

serverSessionToken, err := r.authClient.ServerSessionToken()
if err != nil {
tsk.AppendError(errors.Wrap(err, "unable to get server session token"))
}

ctx = auth.NewContextWithServerSessionToken(ctx, serverSessionToken)

// Get the list of all existing EHR sync tasks
syncTasks, err := r.getSyncTasks(ctx)
if err != nil {
tsk.AppendError(errors.Wrap(err, "unable to get sync tasks"))
}

// Get the list of all EHR enabled clinics
clinicsList, err := r.clinicsClient.ListEHREnabledClinics(ctx)
if err != nil {
tsk.AppendError(errors.Wrap(err, "unable to list clinics"))
}

plan := GetReconciliationPlan(syncTasks, clinicsList)
r.reconcileTasks(ctx, tsk, plan)

if !tsk.IsFailed() {
tsk.RepeatAvailableAfter(AvailableAfterDurationMinimum + time.Duration(rand.Int63n(int64(AvailableAfterDurationMaximum-AvailableAfterDurationMinimum+1))))
}

if taskDuration := time.Since(now); taskDuration > TaskDurationMaximum {
r.logger.WithField("taskDuration", taskDuration.Truncate(time.Millisecond).Seconds()).Warn("Task duration exceeds maximum")
}

return true
}

func (r *Runner) getSyncTasks(ctx context.Context) (map[string]task.Task, error) {
filter := task.TaskFilter{
Type: pointer.FromString(sync.Type),
}
pagination := page.Pagination{
Page: 0,
Size: 1000,
}

tasksByClinicId := map[string]task.Task{}
for {
tasks, err := r.taskClient.ListTasks(ctx, &filter, &pagination)
if err != nil {
return nil, errors.Wrap(err, "unable to list tasks")
}

for _, tsk := range tasks {
clinicId, err := sync.GetClinicId(tsk.Data)
if err != nil {
r.logger.Errorf("unable to get clinicId from task data (taskId %v): %v", tsk.ID, err)
continue
}
tasksByClinicId[clinicId] = *tsk
}
if len(tasks) < pagination.Size {
break
} else {
pagination.Page++
}
}

return tasksByClinicId, nil
}

func (r *Runner) reconcileTasks(ctx context.Context, task *task.Task, plan ReconciliationPlan) {
for _, t := range plan.ToDelete {
if err := r.taskClient.DeleteTask(ctx, t.ID); err != nil {
task.AppendError(errors.Wrap(err, "unable to delete task"))
}
}
for _, t := range plan.ToCreate {
if _, err := r.taskClient.CreateTask(ctx, &t); err != nil {
task.AppendError(errors.Wrap(err, "unable to create task"))
}
}
}

type ReconciliationPlan struct {
ToCreate []task.TaskCreate
ToDelete []task.Task
}

func GetReconciliationPlan(syncTasks map[string]task.Task, clinics []api.Clinic) ReconciliationPlan {
toDelete := make([]task.Task, 0)
toCreate := make([]task.TaskCreate, 0)

// At the end of the loop syncTasks will contain only the tasks that need to be deleted,
// and toCreate will contain tasks for new clinics that need to be synced.
for _, clinic := range clinics {
clinicId := *clinic.Id

_, exists := syncTasks[clinicId]
if exists {
delete(syncTasks, clinicId)
} else {
create := sync.NewTaskCreate(clinicId)
toCreate = append(toCreate, *create)
}
}
for _, tsk := range syncTasks {
toDelete = append(toDelete, tsk)
}
return ReconciliationPlan{
ToCreate: toCreate,
ToDelete: toDelete,
}
}
21 changes: 21 additions & 0 deletions ehr/reconcile/runner_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package reconcile_test

import (
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
. "github.com/onsi/gomega/gstruct"
"github.com/tidepool-org/platform/ehr/reconcile"
"time"
)

var _ = Describe("Task", func() {
Describe("NewTaskCreate", func() {
It("returns a task create", func() {
create := reconcile.NewTaskCreate()
Expect(create).ToNot(BeNil())
Expect(create.Name).To(PointTo(Equal(reconcile.Type)))
Expect(create.Type).To(Equal(reconcile.Type))
Expect(create.AvailableTime).To(PointTo(BeTemporally("~", time.Now(), 3*time.Second)))
})
})
})
20 changes: 20 additions & 0 deletions ehr/reconcile/task.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package reconcile

import (
"time"

"github.com/tidepool-org/platform/pointer"
"github.com/tidepool-org/platform/task"
)

const (
Type = "org.tidepool.ehr.reconcile"
)

func NewTaskCreate() *task.TaskCreate {
return &task.TaskCreate{
Name: pointer.FromString(Type),
Type: Type,
AvailableTime: pointer.FromAny(time.Now().UTC()),
}
}
21 changes: 21 additions & 0 deletions ehr/reconcile/task_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package reconcile_test

import (
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
. "github.com/onsi/gomega/gstruct"
"github.com/tidepool-org/platform/ehr/reconcile"
"time"
)

var _ = Describe("Task", func() {
Describe("NewTaskCreate", func() {
It("returns a task create", func() {
create := reconcile.NewTaskCreate()
Expect(create).ToNot(BeNil())
Expect(create.Name).To(PointTo(Equal(reconcile.Type)))
Expect(create.Type).To(Equal(reconcile.Type))
Expect(create.AvailableTime).To(PointTo(BeTemporally("~", time.Now(), 3*time.Second)))
})
})
})
Loading

0 comments on commit ed5c76b

Please sign in to comment.