Skip to content

Commit

Permalink
Add EHR data sync tasks
Browse files Browse the repository at this point in the history
  • Loading branch information
toddkazakov committed Aug 15, 2023
1 parent cf3755b commit 6aee6ca
Show file tree
Hide file tree
Showing 20 changed files with 8,131 additions and 1,744 deletions.
29 changes: 29 additions & 0 deletions clinics/mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

47 changes: 47 additions & 0 deletions clinics/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"fmt"
"net/http"

"github.com/tidepool-org/platform/pointer"

"github.com/kelseyhightower/envconfig"
clinic "github.com/tidepool-org/clinic/client"
"go.uber.org/fx"
Expand All @@ -22,6 +24,8 @@ var ClientModule = fx.Provide(NewClient)
type Client interface {
GetClinician(ctx context.Context, clinicID, clinicianID string) (*clinic.Clinician, error)
SharePatientAccount(ctx context.Context, clinicID, patientID string) (*clinic.Patient, error)
ListEHREnabledClinics(ctx context.Context) ([]clinic.Clinic, error)
SyncEHRData(ctx context.Context, clinicID string) error
}

type config struct {
Expand Down Expand Up @@ -76,6 +80,38 @@ func (d *defaultClient) GetClinician(ctx context.Context, clinicID, clinicianID
return response.JSON200, nil
}

func (d *defaultClient) ListEHREnabledClinics(ctx context.Context) ([]clinic.Clinic, error) {
offset := 0
batchSize := 1000

clinics := make([]clinic.Clinic, 0)
for {
response, err := d.httpClient.ListClinicsWithResponse(ctx, &clinic.ListClinicsParams{
EhrEnabled: pointer.FromBool(true),
Offset: &offset,
Limit: &batchSize,
})
if err != nil {
return nil, err
}
if response.StatusCode() != http.StatusOK {
return nil, fmt.Errorf("unexpected response status code %v from %v", response.StatusCode(), response.HTTPResponse.Request.URL)
}
if response.JSON200 == nil {
break
}

clinics = append(clinics, *response.JSON200...)
offset = offset + batchSize

if len(*response.JSON200) < batchSize {
break
}
}

return clinics, nil
}

func (d *defaultClient) SharePatientAccount(ctx context.Context, clinicID, patientID string) (*clinic.Patient, error) {
permission := make(map[string]interface{}, 0)
body := clinic.CreatePatientFromUserJSONRequestBody{
Expand All @@ -98,6 +134,17 @@ func (d *defaultClient) SharePatientAccount(ctx context.Context, clinicID, patie
return response.JSON200, nil
}

func (d *defaultClient) SyncEHRData(ctx context.Context, clinicID string) error {
response, err := d.httpClient.SyncEHRDataWithResponse(ctx, clinicID)
if err != nil {
return err
}
if response.StatusCode() != http.StatusAccepted {
return fmt.Errorf("unexpected response status code %v from %v", response.StatusCode(), response.HTTPResponse.Request.URL)
}
return nil
}

func (d *defaultClient) getPatient(ctx context.Context, clinicID, patientID string) (*clinic.Patient, error) {
response, err := d.httpClient.GetPatientWithResponse(ctx, clinic.ClinicId(clinicID), clinic.PatientId(patientID))
if err != nil {
Expand Down
32 changes: 32 additions & 0 deletions clinics/test/clinics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package test

import (
api "github.com/tidepool-org/clinic/client"
"go.mongodb.org/mongo-driver/bson/primitive"
"syreclabs.com/go/faker"

"github.com/tidepool-org/platform/pointer"
"github.com/tidepool-org/platform/test"
)

func NewRandomClinic() api.Clinic {
return api.Clinic{
Address: pointer.FromAny(faker.Address().StreetAddress()),
CanMigrate: pointer.FromAny(test.RandomBool()),
City: pointer.FromAny(faker.Address().City()),
ClinicType: pointer.FromAny(test.RandomChoice([]api.ClinicClinicType{api.HealthcareSystem, api.VeterinaryClinic, api.Other})),
Country: pointer.FromAny(faker.Address().Country()),
CreatedTime: pointer.FromAny(test.RandomTimeFromRange(test.RandomTimeMinimum(), test.RandomTimeMaximum())),
Id: pointer.FromAny(primitive.NewObjectID().Hex()),
Name: faker.Company().Name(),
PhoneNumbers: pointer.FromAny([]api.PhoneNumber{{Number: faker.PhoneNumber().PhoneNumber()}}),
PostalCode: pointer.FromAny(faker.Address().ZipCode()),
PreferredBgUnits: test.RandomChoice([]api.ClinicPreferredBgUnits{api.ClinicPreferredBgUnitsMgdL, api.ClinicPreferredBgUnitsMmolL}),
ShareCode: pointer.FromAny(faker.RandomString(15)),
State: pointer.FromAny(faker.Address().State()),
Tier: pointer.FromAny(test.RandomChoice([]string{"tier1000", "tier2000"})),
TierDescription: pointer.FromAny(faker.Lorem().Sentence(5)),
UpdatedTime: pointer.FromAny(test.RandomTimeFromRange(test.RandomTimeMinimum(), test.RandomTimeMaximum())),
Website: pointer.FromAny(faker.Internet().Url()),
}
}
11 changes: 11 additions & 0 deletions ehr/reconcile/reconcile_suite_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package reconcile_test

import (
"testing"

"github.com/tidepool-org/platform/test"
)

func TestSuite(t *testing.T) {
test.Test(t)
}
167 changes: 167 additions & 0 deletions ehr/reconcile/runner.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
package reconcile

import (
"context"
"math/rand"
"time"

api "github.com/tidepool-org/clinic/client"

"github.com/tidepool-org/platform/auth"
"github.com/tidepool-org/platform/clinics"
"github.com/tidepool-org/platform/ehr/sync"
"github.com/tidepool-org/platform/errors"
"github.com/tidepool-org/platform/log"
"github.com/tidepool-org/platform/page"
"github.com/tidepool-org/platform/pointer"
"github.com/tidepool-org/platform/task"
)

const (
AvailableAfterDurationMaximum = AvailableAfterDurationMinimum + 1*time.Hour
AvailableAfterDurationMinimum = 14*24*time.Hour - 30*time.Minute
TaskDurationMaximum = 5 * time.Minute
)

type Runner struct {
authClient auth.Client
clinicsClient clinics.Client
taskClient task.Client
logger log.Logger
}

func NewRunner(authClient auth.Client, clinicsClient clinics.Client, taskClient task.Client, logger log.Logger) (*Runner, error) {
return &Runner{
authClient: authClient,
clinicsClient: clinicsClient,
taskClient: taskClient,
logger: logger,
}, nil
}

func (r *Runner) GetRunnerType() string {
return Type
}

func (r *Runner) GetRunnerDeadline() time.Time {
return time.Now().Add(TaskDurationMaximum * 3)
}

func (r *Runner) GetRunnerMaximumDuration() time.Duration {
return TaskDurationMaximum
}

func (r *Runner) Run(ctx context.Context, tsk *task.Task) bool {
now := time.Now()
tsk.ClearError()

serverSessionToken, err := r.authClient.ServerSessionToken()
if err != nil {
tsk.AppendError(errors.Wrap(err, "unable to get server session token"))
}

ctx = auth.NewContextWithServerSessionToken(ctx, serverSessionToken)

// Get the list of all existing EHR sync tasks
syncTasks, err := r.getSyncTasks(ctx)
if err != nil {
tsk.AppendError(errors.Wrap(err, "unable to get sync tasks"))
}

// Get the list of all EHR enabled clinics
clinicsList, err := r.clinicsClient.ListEHREnabledClinics(ctx)
if err != nil {
tsk.AppendError(errors.Wrap(err, "unable to list clinics"))
}

plan := GetReconciliationPlan(syncTasks, clinicsList)
r.reconcileTasks(ctx, tsk, plan)

if !tsk.IsFailed() {
tsk.RepeatAvailableAfter(AvailableAfterDurationMinimum + time.Duration(rand.Int63n(int64(AvailableAfterDurationMaximum-AvailableAfterDurationMinimum+1))))
}

if taskDuration := time.Since(now); taskDuration > TaskDurationMaximum {
r.logger.WithField("taskDuration", taskDuration.Truncate(time.Millisecond).Seconds()).Warn("Task duration exceeds maximum")
}

return true
}

func (r *Runner) getSyncTasks(ctx context.Context) (map[string]task.Task, error) {
filter := task.TaskFilter{
Type: pointer.FromString(sync.Type),
}
pagination := page.Pagination{
Page: 0,
Size: 1000,
}

tasksByClinicId := map[string]task.Task{}
for {
tasks, err := r.taskClient.ListTasks(ctx, &filter, &pagination)
if err != nil {
return nil, errors.Wrap(err, "unable to list tasks")
}

for _, tsk := range tasks {
clinicId, err := sync.GetClinicId(tsk.Data)
if err != nil {
r.logger.Errorf("unable to get clinicId from task data (taskId %v): %v", tsk.ID, err)
continue
}
tasksByClinicId[clinicId] = *tsk
}
if len(tasks) < pagination.Size {
break
} else {
pagination.Page++
}
}

return tasksByClinicId, nil
}

func (r *Runner) reconcileTasks(ctx context.Context, task *task.Task, plan ReconciliationPlan) {
for _, t := range plan.ToDelete {
if err := r.taskClient.DeleteTask(ctx, t.ID); err != nil {
task.AppendError(errors.Wrap(err, "unable to delete task"))
}
}
for _, t := range plan.ToCreate {
if _, err := r.taskClient.CreateTask(ctx, &t); err != nil {
task.AppendError(errors.Wrap(err, "unable to create task"))
}
}
}

type ReconciliationPlan struct {
ToCreate []task.TaskCreate
ToDelete []task.Task
}

func GetReconciliationPlan(syncTasks map[string]task.Task, clinics []api.Clinic) ReconciliationPlan {
toDelete := make([]task.Task, 0)
toCreate := make([]task.TaskCreate, 0)

// At the end of the loop syncTasks will contain only the tasks that need to be deleted,
// and toCreate will contain tasks for new clinics that need to be synced.
for _, clinic := range clinics {
clinicId := *clinic.Id

_, exists := syncTasks[clinicId]
if exists {
delete(syncTasks, clinicId)
} else {
create := sync.NewTaskCreate(clinicId)
toCreate = append(toCreate, *create)
}
}
for _, tsk := range syncTasks {
toDelete = append(toDelete, tsk)
}
return ReconciliationPlan{
ToCreate: toCreate,
ToDelete: toDelete,
}
}
59 changes: 59 additions & 0 deletions ehr/reconcile/runner_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package reconcile_test

import (
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
. "github.com/onsi/gomega/gstruct"
api "github.com/tidepool-org/clinic/client"

clinicsTest "github.com/tidepool-org/platform/clinics/test"
"github.com/tidepool-org/platform/ehr/reconcile"
"github.com/tidepool-org/platform/ehr/sync"
"github.com/tidepool-org/platform/task"
"github.com/tidepool-org/platform/test"
)

var _ = Describe("Runner", func() {
Describe("GetReconciliationPlan", func() {
var clinics []api.Clinic
var tasks map[string]task.Task

BeforeEach(func() {
clinics = test.RandomArrayWithLength(3, clinicsTest.NewRandomClinic)
tasks = make(map[string]task.Task)
for _, clinic := range clinics {
tsk, err := task.NewTask(sync.NewTaskCreate(*clinic.Id))
Expect(err).ToNot(HaveOccurred())
Expect(tsk).ToNot(BeNil())
tasks[*clinic.Id] = *tsk
}
})

It("returns an empty plan when each clinic has a corresponding task", func() {
plan := reconcile.GetReconciliationPlan(tasks, clinics)
Expect(plan).ToNot(BeNil())
Expect(plan.ToCreate).To(BeEmpty())
Expect(plan.ToDelete).To(BeEmpty())
})

It("returns a clinic creation task when a task for the clinic doesn't exist", func() {
delete(tasks, *clinics[0].Id)
plan := reconcile.GetReconciliationPlan(tasks, clinics)
Expect(plan).ToNot(BeNil())
Expect(plan.ToCreate).To(HaveLen(1))
Expect(plan.ToCreate[0].Name).To(PointTo(Equal(sync.TaskName(*clinics[0].Id))))
Expect(plan.ToDelete).To(BeEmpty())
})

It("returns a multiple clinic creation tasks when multiple clinics don't exist", func() {
delete(tasks, *clinics[0].Id)
delete(tasks, *clinics[1].Id)
plan := reconcile.GetReconciliationPlan(tasks, clinics)
Expect(plan).ToNot(BeNil())
Expect(plan.ToCreate).To(HaveLen(2))
Expect(plan.ToCreate[0].Name).To(PointTo(Equal(sync.TaskName(*clinics[0].Id))))
Expect(plan.ToCreate[1].Name).To(PointTo(Equal(sync.TaskName(*clinics[1].Id))))
Expect(plan.ToDelete).To(BeEmpty())
})
})
})
Loading

0 comments on commit 6aee6ca

Please sign in to comment.