Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BACK-2638] Store summary buckets in separate collection #706

Closed
wants to merge 36 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
27446b0
partial clinic client handling for summaries
Roukoswarf Feb 28, 2024
4669923
add clinic client to data service
Roukoswarf Feb 28, 2024
25a77b9
initial attempt of realtime user api
Roukoswarf Mar 6, 2024
10589a4
use user token for clinic patient listing, cleanup from review
Roukoswarf Mar 8, 2024
513f1ab
initial unit test modifications
Roukoswarf Mar 11, 2024
95ec7c7
add realtime and deferred record unit tests, fix related bugs, more g…
Roukoswarf Mar 12, 2024
dec7a4c
add unit tests for realtime users report, fix misc bugs
Roukoswarf Mar 14, 2024
b4c2f12
add deferred/realtime records to periods
Roukoswarf Mar 14, 2024
9f9611f
make continuous upload map sparse
Roukoswarf Mar 14, 2024
276db04
clean up some rebase errors and other missing code
Roukoswarf Mar 14, 2024
298ab09
remove redundant IsContinuous function on uploads
Roukoswarf Mar 14, 2024
69d6249
raise summary schema version
Roukoswarf Mar 14, 2024
3af18ee
handle user being schema migrated with no summarizable data
Roukoswarf Mar 25, 2024
994f178
initial split of continuous summaries into own type, incomplete unit …
Roukoswarf Mar 27, 2024
04f1d32
more unit tests, reorg summary store unit tests to separate types
Roukoswarf Mar 28, 2024
9063d9e
finish continuous type split, add more unit tests, move API code
Roukoswarf Apr 2, 2024
97500dc
add continuous type to task service
Roukoswarf Apr 2, 2024
2bcff03
fix build
Roukoswarf Apr 2, 2024
785c3c8
wip addressing review comments
Roukoswarf Apr 4, 2024
08f08b8
properly cast record generic in GetNextData
Roukoswarf Apr 4, 2024
88982fb
formatting
Roukoswarf Apr 4, 2024
d92803f
Simplify summary data fetchers and cursors (#705)
toddkazakov Apr 5, 2024
5b7fa57
fix infinite backfill loop on continuous summaries with non continuou…
Roukoswarf Apr 8, 2024
275b3ef
raise task timeout for backfill
Roukoswarf Apr 8, 2024
1abd563
correct accidental cgm hardcode in backfill task
Roukoswarf Apr 8, 2024
607058e
WIP restructure cgm stats, reduce duplication
Roukoswarf Apr 10, 2024
51d29f4
pass startDate and endDate from GetPatientsWithRealtimeData report re…
Roukoswarf Apr 10, 2024
a12e44a
remove IsService check from realtime report request
Roukoswarf Apr 10, 2024
13dcd66
fix null dependency on SummaryReporter
Roukoswarf Apr 10, 2024
7a3745c
create clinics client if missing
Roukoswarf Apr 10, 2024
3703fa3
handle placeholder continuous summaries without stats
Roukoswarf Apr 11, 2024
ea8e59a
handle patients with no summary/cgm/bgm data
Roukoswarf Apr 11, 2024
3b61a37
Merge remote-tracking branch 'origin/master' into realtimeRecords
Roukoswarf Apr 12, 2024
8bfca42
regenerate clinics mock
Roukoswarf Apr 12, 2024
9e2441a
Merge branch 'realtimeRecords' into alex/buckets-collection
Roukoswarf Apr 12, 2024
6cd8baf
update deps, replace shopify/sarama with ibm/sarama, remove old protobuf
Roukoswarf Apr 12, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
15 changes: 15 additions & 0 deletions clinics/mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

23 changes: 22 additions & 1 deletion clinics/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ type Client interface {
SharePatientAccount(ctx context.Context, clinicID, patientID string) (*clinic.Patient, error)
ListEHREnabledClinics(ctx context.Context) ([]clinic.Clinic, error)
SyncEHRData(ctx context.Context, clinicID string) error
GetPatients(ctx context.Context, clinicId string, userToken string, params *clinic.ListPatientsParams) ([]clinic.Patient, error)
}

type config struct {
Expand All @@ -52,7 +53,11 @@ func NewClient(authClient auth.ExternalAccessor) (Client, error) {
return err
}

req.Header.Add(auth.TidepoolSessionTokenHeaderKey, token)
// conditionally set token only if not already present
if req.Header.Get(auth.TidepoolSessionTokenHeaderKey) == "" {
req.Header.Add(auth.TidepoolSessionTokenHeaderKey, token)
}

return nil
})
httpClient, err := clinic.NewClientWithResponses(cfg.ServiceAddress, opts)
Expand Down Expand Up @@ -155,3 +160,19 @@ func (d *defaultClient) getPatient(ctx context.Context, clinicID, patientID stri
}
return response.JSON200, nil
}

func (d *defaultClient) GetPatients(ctx context.Context, clinicId string, userToken string, params *clinic.ListPatientsParams) ([]clinic.Patient, error) {
response, err := d.httpClient.ListPatientsWithResponse(ctx, clinicId, params, func(ctx context.Context, req *http.Request) error {
req.Header.Set(auth.TidepoolSessionTokenHeaderKey, userToken)
return nil
})

if err != nil {
return nil, err
}
if response.StatusCode() != http.StatusOK {
return nil, fmt.Errorf("unexpected response status code %v from %v", response.StatusCode(), response.HTTPResponse.Request.URL)
}

return *response.JSON200.Data, nil
}
66 changes: 54 additions & 12 deletions data/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,12 @@ type Client interface {

DestroyDataForUserByID(ctx context.Context, userID string) error

GetCGMSummary(ctx context.Context, id string) (*types.Summary[types.CGMStats, *types.CGMStats], error)
GetBGMSummary(ctx context.Context, id string) (*types.Summary[types.BGMStats, *types.BGMStats], error)
UpdateCGMSummary(ctx context.Context, id string) (*types.Summary[types.CGMStats, *types.CGMStats], error)
UpdateBGMSummary(ctx context.Context, id string) (*types.Summary[types.BGMStats, *types.BGMStats], error)
GetCGMSummary(ctx context.Context, id string) (*types.Summary[*types.CGMStats, types.CGMStats], error)
GetBGMSummary(ctx context.Context, id string) (*types.Summary[*types.BGMStats, types.BGMStats], error)
GetContinuousSummary(ctx context.Context, id string) (*types.Summary[*types.ContinuousStats, types.ContinuousStats], error)
UpdateCGMSummary(ctx context.Context, id string) (*types.Summary[*types.CGMStats, types.CGMStats], error)
UpdateBGMSummary(ctx context.Context, id string) (*types.Summary[*types.BGMStats, types.BGMStats], error)
UpdateContinuousSummary(ctx context.Context, id string) (*types.Summary[*types.ContinuousStats, types.ContinuousStats], error)
GetOutdatedUserIDs(ctx context.Context, t string, pagination *page.Pagination) (*types.OutdatedSummariesResponse, error)
GetMigratableUserIDs(ctx context.Context, t string, pagination *page.Pagination) ([]string, error)
BackfillSummaries(ctx context.Context, t string) (int, error)
Expand Down Expand Up @@ -129,7 +131,7 @@ func (c *ClientImpl) GetDataSet(ctx context.Context, id string) (*data.DataSet,
return dataSet, nil
}

func (c *ClientImpl) GetCGMSummary(ctx context.Context, userId string) (*types.Summary[types.CGMStats, *types.CGMStats], error) {
func (c *ClientImpl) GetCGMSummary(ctx context.Context, userId string) (*types.Summary[*types.CGMStats, types.CGMStats], error) {
if ctx == nil {
return nil, errors.New("context is missing")
}
Expand All @@ -138,7 +140,7 @@ func (c *ClientImpl) GetCGMSummary(ctx context.Context, userId string) (*types.S
}

url := c.client.ConstructURL("v1", "summaries", "cgm", userId)
summary := &types.Summary[types.CGMStats, *types.CGMStats]{}
summary := &types.Summary[*types.CGMStats, types.CGMStats]{}
if err := c.client.RequestData(ctx, http.MethodGet, url, nil, nil, summary); err != nil {
if request.IsErrorResourceNotFound(err) {
return nil, nil
Expand All @@ -149,7 +151,7 @@ func (c *ClientImpl) GetCGMSummary(ctx context.Context, userId string) (*types.S
return summary, nil
}

func (c *ClientImpl) GetBGMSummary(ctx context.Context, userId string) (*types.Summary[types.BGMStats, *types.BGMStats], error) {
func (c *ClientImpl) GetBGMSummary(ctx context.Context, userId string) (*types.Summary[*types.BGMStats, types.BGMStats], error) {
if ctx == nil {
return nil, errors.New("context is missing")
}
Expand All @@ -158,7 +160,7 @@ func (c *ClientImpl) GetBGMSummary(ctx context.Context, userId string) (*types.S
}

url := c.client.ConstructURL("v1", "summaries", "bgm", userId)
summary := &types.Summary[types.BGMStats, *types.BGMStats]{}
summary := &types.Summary[*types.BGMStats, types.BGMStats]{}
if err := c.client.RequestData(ctx, http.MethodGet, url, nil, nil, summary); err != nil {
if request.IsErrorResourceNotFound(err) {
return nil, nil
Expand All @@ -169,7 +171,27 @@ func (c *ClientImpl) GetBGMSummary(ctx context.Context, userId string) (*types.S
return summary, nil
}

func (c *ClientImpl) UpdateCGMSummary(ctx context.Context, userId string) (*types.Summary[types.CGMStats, *types.CGMStats], error) {
func (c *ClientImpl) GetContinuousSummary(ctx context.Context, userId string) (*types.Summary[*types.ContinuousStats, types.ContinuousStats], error) {
if ctx == nil {
return nil, errors.New("context is missing")
}
if userId == "" {
return nil, errors.New("id is missing")
}

url := c.client.ConstructURL("v1", "summaries", "continuous", userId)
summary := &types.Summary[*types.ContinuousStats, types.ContinuousStats]{}
if err := c.client.RequestData(ctx, http.MethodGet, url, nil, nil, summary); err != nil {
if request.IsErrorResourceNotFound(err) {
return nil, nil
}
return nil, err
}

return summary, nil
}

func (c *ClientImpl) UpdateCGMSummary(ctx context.Context, userId string) (*types.Summary[*types.CGMStats, types.CGMStats], error) {
if ctx == nil {
return nil, errors.New("context is missing")
}
Expand All @@ -178,7 +200,7 @@ func (c *ClientImpl) UpdateCGMSummary(ctx context.Context, userId string) (*type
}

url := c.client.ConstructURL("v1", "summaries", "cgm", userId)
summary := &types.Summary[types.CGMStats, *types.CGMStats]{}
summary := &types.Summary[*types.CGMStats, types.CGMStats]{}
if err := c.client.RequestData(ctx, http.MethodPost, url, nil, nil, summary); err != nil {
if request.IsErrorResourceNotFound(err) {
return nil, nil
Expand All @@ -189,7 +211,7 @@ func (c *ClientImpl) UpdateCGMSummary(ctx context.Context, userId string) (*type
return summary, nil
}

func (c *ClientImpl) UpdateBGMSummary(ctx context.Context, userId string) (*types.Summary[types.BGMStats, *types.BGMStats], error) {
func (c *ClientImpl) UpdateBGMSummary(ctx context.Context, userId string) (*types.Summary[*types.BGMStats, types.BGMStats], error) {
if ctx == nil {
return nil, errors.New("context is missing")
}
Expand All @@ -198,7 +220,27 @@ func (c *ClientImpl) UpdateBGMSummary(ctx context.Context, userId string) (*type
}

url := c.client.ConstructURL("v1", "summaries", "bgm", userId)
summary := &types.Summary[types.BGMStats, *types.BGMStats]{}
summary := &types.Summary[*types.BGMStats, types.BGMStats]{}
if err := c.client.RequestData(ctx, http.MethodPost, url, nil, nil, summary); err != nil {
if request.IsErrorResourceNotFound(err) {
return nil, nil
}
return nil, err
}

return summary, nil
}

func (c *ClientImpl) UpdateContinuousSummary(ctx context.Context, userId string) (*types.Summary[*types.ContinuousStats, types.ContinuousStats], error) {
if ctx == nil {
return nil, errors.New("context is missing")
}
if userId == "" {
return nil, errors.New("id is missing")
}

url := c.client.ConstructURL("v1", "summaries", "continuous", userId)
summary := &types.Summary[*types.ContinuousStats, types.ContinuousStats]{}
if err := c.client.RequestData(ctx, http.MethodPost, url, nil, nil, summary); err != nil {
if request.IsErrorResourceNotFound(err) {
return nil, nil
Expand Down
14 changes: 14 additions & 0 deletions data/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package data
import (
"regexp"
"strconv"
"time"

"github.com/tidepool-org/platform/errors"
"github.com/tidepool-org/platform/id"
Expand Down Expand Up @@ -132,3 +133,16 @@ func ErrorValueStringAsIDNotValid(value string) error {
}

var idExpression = regexp.MustCompile("^[0-9a-z]{32}$") // TODO: Want just "[0-9a-f]{32}" (Jellyfish uses [0-9a-z])

// UserDataStatus is used to track the state of the user's data at the start of a summary calculation
type UserDataStatus struct {
FirstData time.Time
LastData time.Time

EarliestModified time.Time

LastUpload time.Time

LastUpdated time.Time
NextLastUpdated time.Time
}
1 change: 1 addition & 0 deletions data/datum.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ type Datum interface {
GetType() string
IsActive() bool
GetTime() *time.Time
GetUploadID() *string

SetUserID(userID *string)
SetDataSetID(dataSetID *string)
Expand Down
6 changes: 5 additions & 1 deletion data/service/api/v1/datasets_update.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,12 @@ func DataSetsUpdate(dataServiceContext dataService.Context) {
return
}

// create map of all types, this will create redundant summaries, but will be cleaned up upon processing
updatesSummary := make(map[string]struct{})
summary.CheckDataSetUpdatesSummary(ctx, dataServiceContext.DataRepository(), updatesSummary, dataSetID)
for _, typ := range types.DeviceDataTypes {
updatesSummary[types.DeviceDataToSummaryTypes[typ]] = struct{}{}
}

summary.MaybeUpdateSummary(ctx, dataServiceContext.SummarizerRegistry(), updatesSummary, *dataSet.UserID, types.OutdatedReasonUploadCompleted)
}

Expand Down
46 changes: 41 additions & 5 deletions data/service/api/v1/summary.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"context"
"net/http"

"github.com/tidepool-org/platform/data/summary/reporters"

dataService "github.com/tidepool-org/platform/data/service"
"github.com/tidepool-org/platform/data/summary"
"github.com/tidepool-org/platform/data/summary/types"
Expand All @@ -19,18 +21,25 @@ func SummaryRoutes() []dataService.Route {
return []dataService.Route{
dataService.Get("/v1/summaries/cgm/:userId", GetSummary[types.CGMStats, *types.CGMStats], api.RequireAuth),
dataService.Get("/v1/summaries/bgm/:userId", GetSummary[types.BGMStats, *types.BGMStats], api.RequireAuth),
dataService.Get("/v1/summaries/continuous/:userId", GetSummary[types.ContinuousStats, *types.ContinuousStats], api.RequireAuth),

dataService.Post("/v1/summaries/cgm/:userId", UpdateSummary[types.CGMStats, *types.CGMStats], api.RequireAuth),
dataService.Post("/v1/summaries/bgm/:userId", UpdateSummary[types.BGMStats, *types.BGMStats], api.RequireAuth),
dataService.Post("/v1/summaries/continuous/:userId", UpdateSummary[types.ContinuousStats, *types.ContinuousStats], api.RequireAuth),

dataService.Post("/v1/summaries/backfill/cgm", BackfillSummaries[types.CGMStats, *types.CGMStats], api.RequireAuth),
dataService.Post("/v1/summaries/backfill/bgm", BackfillSummaries[types.BGMStats, *types.BGMStats], api.RequireAuth),
dataService.Post("/v1/summaries/backfill/continuous", BackfillSummaries[types.ContinuousStats, *types.ContinuousStats], api.RequireAuth),

dataService.Get("/v1/summaries/outdated/cgm", GetOutdatedUserIDs[types.CGMStats, *types.CGMStats], api.RequireAuth),
dataService.Get("/v1/summaries/outdated/bgm", GetOutdatedUserIDs[types.BGMStats, *types.BGMStats], api.RequireAuth),
dataService.Get("/v1/summaries/outdated/continuous", GetOutdatedUserIDs[types.ContinuousStats, *types.ContinuousStats], api.RequireAuth),

dataService.Get("/v1/summaries/migratable/cgm", GetMigratableUserIDs[types.CGMStats, *types.CGMStats], api.RequireAuth),
dataService.Get("/v1/summaries/migratable/bgm", GetMigratableUserIDs[types.BGMStats, *types.BGMStats], api.RequireAuth),
dataService.Get("/v1/summaries/migratable/continuous", GetMigratableUserIDs[types.ContinuousStats, *types.ContinuousStats], api.RequireAuth),

dataService.Get("/v1/clinics/:clinicId/reports/realtime", GetPatientsWithRealtimeData, api.RequireAuth),
}
}

Expand Down Expand Up @@ -68,7 +77,7 @@ func GetSummary[T types.Stats, A types.StatsPt[T]](dataServiceContext dataServic
return
}

summarizer := summary.GetSummarizer[T, A](dataServiceContext.SummarizerRegistry())
summarizer := summary.GetSummarizer[A](dataServiceContext.SummarizerRegistry())
userSummary, err := summarizer.GetSummary(ctx, id)
if err != nil {
responder.Error(http.StatusInternalServerError, err)
Expand All @@ -79,6 +88,33 @@ func GetSummary[T types.Stats, A types.StatsPt[T]](dataServiceContext dataServic
}
}

func GetPatientsWithRealtimeData(dataServiceContext dataService.Context) {
ctx := dataServiceContext.Request().Context()
res := dataServiceContext.Response()
req := dataServiceContext.Request()

responder := request.MustNewResponder(res, req)

clinicId := req.PathParam("clinicId")

filter := reporters.NewPatientRealtimeDaysFilter()
if err := request.DecodeRequestQuery(req.Request, filter); err != nil {
responder.Error(http.StatusBadRequest, err)
return
}

details := request.GetAuthDetails(ctx)

response, err := dataServiceContext.SummaryReporter().GetRealtimeDaysForPatients(
ctx, dataServiceContext.ClinicsClient(), clinicId, details.Token(), *filter.StartTime, *filter.EndTime)
if err != nil {
responder.Error(http.StatusInternalServerError, err)
return
}

responder.Data(http.StatusOK, response)
}

func UpdateSummary[T types.Stats, A types.StatsPt[T]](dataServiceContext dataService.Context) {
ctx := dataServiceContext.Request().Context()
res := dataServiceContext.Response()
Expand All @@ -92,7 +128,7 @@ func UpdateSummary[T types.Stats, A types.StatsPt[T]](dataServiceContext dataSer
return
}

summarizer := summary.GetSummarizer[T, A](dataServiceContext.SummarizerRegistry())
summarizer := summary.GetSummarizer[A](dataServiceContext.SummarizerRegistry())
userSummary, err := summarizer.UpdateSummary(ctx, id)
if err != nil {
responder.Error(http.StatusInternalServerError, err)
Expand All @@ -115,7 +151,7 @@ func BackfillSummaries[T types.Stats, A types.StatsPt[T]](dataServiceContext dat
return
}

summarizer := summary.GetSummarizer[T, A](dataServiceContext.SummarizerRegistry())
summarizer := summary.GetSummarizer[A](dataServiceContext.SummarizerRegistry())
status, err := summarizer.BackfillSummaries(ctx)
if err != nil {
responder.Error(http.StatusInternalServerError, err)
Expand Down Expand Up @@ -143,7 +179,7 @@ func GetOutdatedUserIDs[T types.Stats, A types.StatsPt[T]](dataServiceContext da
return
}

summarizer := summary.GetSummarizer[T, A](dataServiceContext.SummarizerRegistry())
summarizer := summary.GetSummarizer[A](dataServiceContext.SummarizerRegistry())
response, err := summarizer.GetOutdatedUserIDs(ctx, pagination)
if err != nil {
responder.Error(http.StatusInternalServerError, err)
Expand Down Expand Up @@ -171,7 +207,7 @@ func GetMigratableUserIDs[T types.Stats, A types.StatsPt[T]](dataServiceContext
return
}

summarizer := summary.GetSummarizer[T, A](dataServiceContext.SummarizerRegistry())
summarizer := summary.GetSummarizer[A](dataServiceContext.SummarizerRegistry())
userIDs, err := summarizer.GetMigratableUserIDs(ctx, pagination)
if err != nil {
responder.Error(http.StatusInternalServerError, err)
Expand Down
12 changes: 12 additions & 0 deletions data/service/api/v1/users_datasets_create_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@ import (
"net/http"
"strings"

"github.com/tidepool-org/platform/data/summary/reporters"

"github.com/tidepool-org/platform/clinics"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"

Expand Down Expand Up @@ -210,6 +214,14 @@ func (c *mockDataServiceContext) DataClient() dataClient.Client {
panic("not implemented") // TODO: Implement
}

func (c *mockDataServiceContext) ClinicsClient() clinics.Client {
panic("not implemented") // TODO: Implement
}

func (c *mockDataServiceContext) DataSourceClient() dataSource.Client {
panic("not implemented") // TODO: Implement
}

func (c *mockDataServiceContext) SummaryReporter() *reporters.PatientRealtimeDaysReporter {
panic("not implemented")
}
Loading