From 9c53b938eb50a36ecbe08561bd09d1e4a98941a5 Mon Sep 17 00:00:00 2001 From: Alexander Diana Date: Tue, 12 Dec 2023 19:43:42 +0000 Subject: [PATCH] AddData batching and serialization changes --- data/store/mongo/mongo_datum.go | 73 +------------------------ data/store/store.go | 1 - data/store/test/data_repository.go | 20 +++---- data/summary/store/store.go | 4 +- data/summary/summary.go | 4 +- data/summary/types/bgm.go | 40 +++++++++----- data/summary/types/cgm.go | 40 +++++++++----- data/summary/types/summary.go | 76 ++++++++++++-------------- data/summary/types/summary_cgm_test.go | 5 +- data/summary/types/test/summary.go | 4 +- 10 files changed, 106 insertions(+), 161 deletions(-) diff --git a/data/store/mongo/mongo_datum.go b/data/store/mongo/mongo_datum.go index 5992ebae7..1620b1793 100644 --- a/data/store/mongo/mongo_datum.go +++ b/data/store/mongo/mongo_datum.go @@ -597,6 +597,7 @@ func (d *DatumRepository) GetDataRange(ctx context.Context, userId string, typ s return nil, fmt.Errorf("unexpected type: %v", upload.Type) } + // TODO remove? //switch v := dataRecords.(type) { //case *[]*glucose.Glucose: // if typ != continuous.Type && typ != selfmonitored.Type { @@ -641,6 +642,7 @@ func (d *DatumRepository) GetDataRange(ctx context.Context, userId string, typ s opts := options.Find() opts.SetSort(bson.D{{Key: "time", Value: 1}}) + opts.SetBatchSize(300) cursor, err := d.Find(ctx, selector, opts) if err != nil { @@ -650,77 +652,6 @@ func (d *DatumRepository) GetDataRange(ctx context.Context, userId string, typ s return cursor, nil } -//func (d *DatumRepository) GetModifiedBucketsInRange(ctx context.Context, userId string, typ string, startTime time.Time, endTime time.Time, fromModified time.Time) (modifiedPeriods []types.ModifiedPeriod, err error) { -// if ctx == nil { -// return nil, errors.New("context is missing") -// } -// -// if userId == "" { -// return nil, errors.New("userId is empty") -// } -// -// if typ == "" { -// return nil, errors.New("typ is empty") -// } -// -// // This is never expected to be an upload. 
-// if isTypeUpload(typ) { -// return nil, fmt.Errorf("unexpected type: %v", upload.Type) -// } -// -// // quit early if range is 0 -// if startTime.Equal(endTime) { -// return nil, nil -// } -// -// // return error if ranges are inverted, as this can produce unexpected results -// if startTime.After(endTime) { -// return nil, fmt.Errorf("startTime (%s) after endTime (%s) for user %s", startTime, endTime, userId) -// } -// -// if fromModified.IsZero() { -// return nil, fmt.Errorf("fromModified is zero") -// } -// -// if fromModified.Before(startTime) { -// return nil, fmt.Errorf("fromModified is before startTime") -// } -// -// pipeline := mongo.Pipeline{ -// bson.D{ -// {"$match", bson.D{ -// {"_active", true}, -// {"_userId", userId}, -// {"type", typ}, -// {"time", bson.D{ -// {"$gt", startTime}, -// {"$lte", endTime}, -// }}, -// {"modifiedTime", bson.D{ -// {"$gt", fromModified}, -// }}, -// }}, -// }, -// bson.D{ -// {"$group", bson.D{ -// {"_id", bson.M{"$dateTrunc": bson.M{"date": "$modifiedTime", "unit": "hour"}}}, -// {"minTime", bson.M{"$min": "$time"}}, -// }}, -// }, -// } -// -// var cursor *mongo.Cursor -// if cursor, err = d.Aggregate(ctx, pipeline); err != nil { -// return nil, fmt.Errorf("unable to get modified %s buckets in date range for user: %w", typ, err) -// } -// -// if err = cursor.All(ctx, modifiedPeriods); err != nil { -// return nil, fmt.Errorf("unable to decode modified ranges, %w", err) -// } -// -// return -//} - func (d *DatumRepository) GetLastUpdatedForUser(ctx context.Context, userId string, typ string, status *types.UserLastUpdated) error { var err error diff --git a/data/store/store.go b/data/store/store.go index bc9fd29ce..ffe6844d0 100644 --- a/data/store/store.go +++ b/data/store/store.go @@ -63,7 +63,6 @@ type DatumRepository interface { GetDataSet(ctx context.Context, id string) (*data.DataSet, error) GetDataRange(ctx context.Context, userId string, typ string, status *types.UserLastUpdated) (*mongo.Cursor, error) - GetModifiedBucketsInRange(ctx context.Context, userId string, typ string, startTime time.Time, endTime time.Time, fromModified time.Time) (modifiedPeriods []types.ModifiedPeriod, err error) GetLastUpdatedForUser(ctx context.Context, userId string, typ string, status *types.UserLastUpdated) error DistinctUserIDs(ctx context.Context, typ string) ([]string, error) diff --git a/data/store/test/data_repository.go b/data/store/test/data_repository.go index 1d312427e..6bbbda44e 100644 --- a/data/store/test/data_repository.go +++ b/data/store/test/data_repository.go @@ -2,6 +2,7 @@ package test import ( "context" + "go.mongodb.org/mongo-driver/mongo" "time" "github.com/tidepool-org/platform/data/summary/types" @@ -148,16 +149,15 @@ type GetLastUpdatedForUserOutput struct { } type GetDataRangeInput struct { - Context context.Context - DataRecords interface{} - ID string - Type string - StartTime time.Time - EndTime time.Time + Context context.Context + UserId string + Typ string + Status *types.UserLastUpdated } type GetDataRangeOutput struct { - Error error + Error error + Cursor *mongo.Cursor } type GetUsersWithBGDataSinceInput struct { @@ -495,16 +495,16 @@ func (d *DataRepository) GetLastUpdatedForUser(ctx context.Context, userId strin return output.UserLastUpdated, output.Error } -func (d *DataRepository) GetDataRange(ctx context.Context, dataRecords interface{}, id string, typ string, startTime time.Time, endTime time.Time) error { +func (d *DataRepository) GetDataRange(ctx context.Context, userId string, typ string, status 
*types.UserLastUpdated) (*mongo.Cursor, error) { d.GetDataRangeInvocations++ - d.GetDataRangeInputs = append(d.GetDataRangeInputs, GetDataRangeInput{Context: ctx, DataRecords: dataRecords, ID: id, Type: typ, StartTime: startTime, EndTime: endTime}) + d.GetDataRangeInputs = append(d.GetDataRangeInputs, GetDataRangeInput{Context: ctx, UserId: userId, Typ: typ, Status: status}) gomega.Expect(d.GetDataRangeOutputs).ToNot(gomega.BeEmpty()) output := d.GetDataRangeOutputs[0] d.GetDataRangeOutputs = d.GetDataRangeOutputs[1:] - return output.Error + return output.Cursor, output.Error } func (d *DataRepository) GetUsersWithBGDataSince(ctx context.Context, lastUpdated time.Time) ([]string, error) { diff --git a/data/summary/store/store.go b/data/summary/store/store.go index aa958d833..c9413a351 100644 --- a/data/summary/store/store.go +++ b/data/summary/store/store.go @@ -44,7 +44,7 @@ func (r *Repo[T, A]) GetSummary(ctx context.Context, userId string) (*types.Summ return nil, errors.New("userId is missing") } - summary := types.Create[T, A](userId) + summary := types.Create[A](userId) selector := bson.M{ "userId": userId, "type": summary.Type, @@ -201,7 +201,7 @@ func (r *Repo[T, A]) SetOutdated(ctx context.Context, userId, reason string) (*t } if userSummary == nil { - userSummary = types.Create[T, A](userId) + userSummary = types.Create[A](userId) } userSummary.SetOutdated(reason) diff --git a/data/summary/summary.go b/data/summary/summary.go index 7acef50f1..cb40fff3a 100644 --- a/data/summary/summary.go +++ b/data/summary/summary.go @@ -121,7 +121,7 @@ func (c *GlucoseSummarizer[T, A]) BackfillSummaries(ctx context.Context) (int, e summaries := make([]*types.Summary[T, A], 0, len(userIDsReqBackfill)) for _, userID := range userIDsReqBackfill { - s := types.Create[T, A](userID) + s := types.Create[A](userID) s.SetOutdated(types.OutdatedReasonBackfill) summaries = append(summaries, s) @@ -165,7 +165,7 @@ func (c *GlucoseSummarizer[T, A]) UpdateSummary(ctx context.Context, userId stri // user has no usable summary for incremental update if userSummary == nil { - userSummary = types.Create[T, A](userId) + userSummary = types.Create[A](userId) } if userSummary.Config.SchemaVersion != types.SchemaVersion { diff --git a/data/summary/types/bgm.go b/data/summary/types/bgm.go index b13eef0f4..9cf1a4290 100644 --- a/data/summary/types/bgm.go +++ b/data/summary/types/bgm.go @@ -3,6 +3,7 @@ package types import ( "context" "errors" + "fmt" "go.mongodb.org/mongo-driver/mongo" "strconv" "time" @@ -128,21 +129,30 @@ func (s *BGMStats) GetBucketDate(i int) time.Time { return s.Buckets[i].Date } -func (s *BGMStats) Update(ctx context.Context, userData *mongo.Cursor) error { - //userDataTyped, ok := userData.([]*glucoseDatum.Glucose) - //if !ok { - // return errors.New("BGM records for calculation is not compatible with Glucose type") - //} - // - //for i, v := range userDataTyped { - // if v.Type != selfmonitored.Type { - // return fmt.Errorf("Non-BGM data provided for BGM summary update at position %d", i) - // } - //} - - err := AddData[*glucoseDatum.Glucose](ctx, &s.Buckets, userData) - if err != nil { - return err +func (s *BGMStats) Update(ctx context.Context, cursor *mongo.Cursor) error { + var userData []*glucoseDatum.Glucose = nil + var r *glucoseDatum.Glucose + var currentBucket *Bucket[*BGMBucketData, BGMBucketData] + var err error + + for cursor.Next(ctx) { + if userData == nil { + userData = make([]*glucoseDatum.Glucose, 0, cursor.RemainingBatchLength()) + } + + if err = cursor.Decode(r); err != nil 
{ + return fmt.Errorf("unable to decode userData: %w", err) + } + userData = append(userData, r) + + // we call AddData before each network call to the db to reduce thrashing + if cursor.RemainingBatchLength() != 0 { + currentBucket, err = AddData(&s.Buckets, userData, currentBucket) + if err != nil { + return err + } + userData = nil + } } s.CalculateSummary() diff --git a/data/summary/types/cgm.go b/data/summary/types/cgm.go index b22cfa0af..da7c8ecef 100644 --- a/data/summary/types/cgm.go +++ b/data/summary/types/cgm.go @@ -3,6 +3,7 @@ package types import ( "context" "errors" + "fmt" "go.mongodb.org/mongo-driver/mongo" "strconv" "time" @@ -183,21 +184,30 @@ func (s *CGMStats) GetBucketDate(i int) time.Time { return s.Buckets[i].Date } -func (s *CGMStats) Update(ctx context.Context, userData *mongo.Cursor) error { - //userDataTyped, ok := userData.([]*glucoseDatum.Glucose) - //if !ok { - // return errors.New("CGM records for calculation is not compatible with Glucose type") - //} - // - //for i, v := range userDataTyped { - // if v.Type != continuous.Type { - // return fmt.Errorf("Non-CGM data provided for CGM summary update at position %d", i) - // } - //} - - err := AddData[*glucoseDatum.Glucose](ctx, &s.Buckets, userData) - if err != nil { - return err +func (s *CGMStats) Update(ctx context.Context, cursor *mongo.Cursor) error { + var userData []*glucoseDatum.Glucose = nil + var r *glucoseDatum.Glucose + var currentBucket *Bucket[*CGMBucketData, CGMBucketData] + var err error + + for cursor.Next(ctx) { + if userData == nil { + userData = make([]*glucoseDatum.Glucose, 0, cursor.RemainingBatchLength()) + } + + if err = cursor.Decode(r); err != nil { + return fmt.Errorf("unable to decode userData: %w", err) + } + userData = append(userData, r) + + // we call AddData before each network call to the db to reduce thrashing + if cursor.RemainingBatchLength() != 0 { + currentBucket, err = AddData(&s.Buckets, userData, currentBucket) + if err != nil { + return err + } + userData = nil + } } s.CalculateSummary() diff --git a/data/summary/types/summary.go b/data/summary/types/summary.go index fce4b75ca..bd7df6c80 100644 --- a/data/summary/types/summary.go +++ b/data/summary/types/summary.go @@ -195,7 +195,7 @@ func (s *Summary[T, A]) SetOutdated(reason string) { } if reason == OutdatedReasonSchemaMigration { - *s = *Create[T, A](s.UserID) + *s = *Create[A](s.UserID) } s.Dates.OutdatedReason = set.ToSlice() @@ -234,7 +234,7 @@ func NewDates() Dates { } } -func Create[T Stats, A StatsPt[T]](userId string) *Summary[T, A] { +func Create[A StatsPt[T], T Stats](userId string) *Summary[T, A] { s := new(Summary[T, A]) s.UserID = userId s.Stats = new(T) @@ -260,16 +260,15 @@ type Period interface { BGMPeriod | CGMPeriod } -func AddBin[T BucketData, A BucketDataPt[T], S Buckets[T, A]](buckets *S, newStat *Bucket[A, T]) error { - // NOTE This is only partially able to handle editing the past, and will break if given a bucket which - // must be prepended +func AddBin[T BucketData, A BucketDataPt[T], S Buckets[T, A]](buckets *S, newBucket *Bucket[A, T]) error { if len(*buckets) == 0 { - *buckets = append(*buckets, newStat) + *buckets = append(*buckets, newBucket) + return nil } lastBucketPeriod := (*buckets)[len(*buckets)-1].Date firstBucketPeriod := (*buckets)[0].Date - newPeriod := newStat.Date + newPeriod := newBucket.Date statsGap := 0 var gapStart time.Time var gapEnd time.Time @@ -308,7 +307,7 @@ func AddBin[T BucketData, A BucketDataPt[T], S Buckets[T, A]](buckets *S, newSta if 
!(*buckets)[offset].Date.Equal(newPeriod) { return errors.New("Potentially damaged buckets, offset jump did not find intended record when replacing bucket.") } - (*buckets)[offset] = newStat + (*buckets)[offset] = newBucket return nil } @@ -319,9 +318,9 @@ func AddBin[T BucketData, A BucketDataPt[T], S Buckets[T, A]](buckets *S, newSta } if newPeriod.After(lastBucketPeriod) { - *buckets = append(*buckets, newStat) + *buckets = append(*buckets, newBucket) } else if newPeriod.Before(firstBucketPeriod) { - *buckets = append([]*Bucket[A, T]{newStat}, *buckets...) + *buckets = append(S{newBucket}, *buckets...) } else { return errors.New("eh? bucket not before or after, but not existing?") } @@ -329,15 +328,10 @@ func AddBin[T BucketData, A BucketDataPt[T], S Buckets[T, A]](buckets *S, newSta return nil } -func AddData[D RecordTypesPt[R], A BucketDataPt[T], T BucketData, R RecordTypes](ctx context.Context, buckets *Buckets[T, A], userData *mongo.Cursor) error { - var r D - var newBucket *Bucket[A, T] +func AddData[D RecordTypesPt[R], A BucketDataPt[T], T BucketData, R RecordTypes](buckets *Buckets[T, A], userData []D, newBucket *Bucket[A, T]) (*Bucket[A, T], error) { lastPeriod := time.Time{} - for userData.Next(ctx) { - if err := userData.Decode(r); err != nil { - return errors.New("Unable to decode userData") - } + for _, r := range userData { recordTime := r.GetTime() // truncate time is not timezone/DST safe here, even if we do expect UTC @@ -348,7 +342,7 @@ func AddData[D RecordTypesPt[R], A BucketDataPt[T], T BucketData, R RecordTypes] if !lastPeriod.IsZero() && currentPeriod.After(lastPeriod) { err := AddBin(buckets, newBucket) if err != nil { - return err + return nil, err } newBucket = nil } @@ -369,7 +363,7 @@ func AddData[D RecordTypesPt[R], A BucketDataPt[T], T BucketData, R RecordTypes] newBucket = (*buckets)[len(*buckets)-gap-1] fmt.Println(newBucket.Date, "!=", currentPeriod) if !newBucket.Date.Equal(currentPeriod) { - return errors.New("Potentially damaged buckets, offset jump did not find intended record when adding data.") + return nil, errors.New("Potentially damaged buckets, offset jump did not find intended record when adding data.") } } } @@ -390,7 +384,7 @@ func AddData[D RecordTypesPt[R], A BucketDataPt[T], T BucketData, R RecordTypes] skipped, err := newBucket.Data.CalculateStats(r, &newBucket.LastRecordTime) if err != nil { - return err + return nil, err } if !skipped { newBucket.LastRecordTime = *recordTime @@ -401,32 +395,32 @@ func AddData[D RecordTypesPt[R], A BucketDataPt[T], T BucketData, R RecordTypes] if newBucket != nil { err := AddBin(buckets, newBucket) if err != nil { - return err + return nil, err } } - return nil + return newBucket, nil } -func SetStartTime[T Stats, A StatsPt[T]](userSummary *Summary[T, A], status *UserLastUpdated) { - // remove HoursAgoToKeep/24 days for start time - status.FirstData = status.LastData.AddDate(0, 0, -HoursAgoToKeep/24) - status.LastUpdated = userSummary.Dates.LastUpdatedDate - - //if userSummary.Dates.LastData != nil { - // // if summary already exists with a last data checkpoint, start data pull there - // if startTime.Before(*userSummary.Dates.LastData) { - // startTime = *userSummary.Dates.LastData - // } - // - // // ensure LastData does not move backwards by capping it at summary LastData - // if status.LastData.Before(*userSummary.Dates.LastData) { - // status.LastData = *userSummary.Dates.LastData - // } - //} - // - //return startTime -} +//func SetStartTime[T Stats, A StatsPt[T]](userSummary *Summary[T, A], status 
*UserLastUpdated) { +// // remove HoursAgoToKeep/24 days for start time +// status.FirstData = status.LastData.AddDate(0, 0, -HoursAgoToKeep/24) +// status.LastUpdated = userSummary.Dates.LastUpdatedDate +// +// //if userSummary.Dates.LastData != nil { +// // // if summary already exists with a last data checkpoint, start data pull there +// // if startTime.Before(*userSummary.Dates.LastData) { +// // startTime = *userSummary.Dates.LastData +// // } +// // +// // // ensure LastData does not move backwards by capping it at summary LastData +// // if status.LastData.Before(*userSummary.Dates.LastData) { +// // status.LastData = *userSummary.Dates.LastData +// // } +// //} +// // +// //return startTime +//} func (d *Dates) Reset() { *d = Dates{ diff --git a/data/summary/types/summary_cgm_test.go b/data/summary/types/summary_cgm_test.go index 2d79c03f1..1425c1891 100644 --- a/data/summary/types/summary_cgm_test.go +++ b/data/summary/types/summary_cgm_test.go @@ -291,7 +291,8 @@ var _ = Describe("CGM Summary", func() { It("Returns correct daily stats for days with different averages", func() { var expectedTotalGlucose float64 var lastRecordTime time.Time - userCGMSummary = types.Create[types.CGMStats, *types.CGMStats](userId) + var currentBucket *types.Bucket[*types.CGMBucketData, types.CGMBucketData] + userCGMSummary = types.Create[*types.CGMStats](userId) // Datasets use +1 and +2 offset to allow for checking via iteration dataSetCGMDataOne := NewDataSetCGMDataAvg(deviceId, datumTime.AddDate(0, 0, -2), 24, inTargetBloodGlucose) @@ -300,7 +301,7 @@ var _ = Describe("CGM Summary", func() { dataSetCGMData = append(dataSetCGMDataOne, dataSetCGMDataTwo...) dataSetCGMData = append(dataSetCGMData, dataSetCGMDataThree...) - err = types.AddData[types.CGMBucketData, *types.CGMBucketData](&userCGMSummary.Stats.Buckets, dataSetCGMData) + currentBucket, err = types.AddData(&userCGMSummary.Stats.Buckets, dataSetCGMData, currentBucket) Expect(err).ToNot(HaveOccurred()) Expect(len(userCGMSummary.Stats.Buckets)).To(Equal(72)) diff --git a/data/summary/types/test/summary.go b/data/summary/types/test/summary.go index 76ed7811d..5ec6e8ee8 100644 --- a/data/summary/types/test/summary.go +++ b/data/summary/types/test/summary.go @@ -39,7 +39,7 @@ func RandomCGMSummary(userId string) *types.Summary[types.CGMStats, *types.CGMSt } for i := 0; i < len(datum.Stats.Buckets); i++ { - datum.Stats.Buckets[i] = &types.Bucket[types.CGMBucketData, *types.CGMBucketData]{ + datum.Stats.Buckets[i] = &types.Bucket[*types.CGMBucketData, types.CGMBucketData]{ Date: test.RandomTime(), LastRecordTime: test.RandomTime(), Data: &types.CGMBucketData{ @@ -279,7 +279,7 @@ func RandomBGMSummary(userId string) *types.Summary[types.BGMStats, *types.BGMSt } for i := 0; i < len(datum.Stats.Buckets); i++ { - datum.Stats.Buckets[i] = &types.Bucket[types.BGMBucketData, *types.BGMBucketData]{ + datum.Stats.Buckets[i] = &types.Bucket[*types.BGMBucketData, types.BGMBucketData]{ Date: test.RandomTime(), LastRecordTime: test.RandomTime(), Data: &types.BGMBucketData{
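
A minimal, self-contained sketch of the cursor-batching pattern the new Update methods in data/summary/types/bgm.go and cgm.go are built around: decode each document into a freshly allocated value and flush the accumulated slice once per driver batch, just before the next Next call would trigger a network round trip. The record type and the processBatch callback below are illustrative stand-ins, not part of this patch (the real code decodes into *glucoseDatum.Glucose and flushes through types.AddData while threading a currentBucket between calls); this is a sketch of the intent stated by the in-code comment, not a drop-in replacement.

package example

import (
	"context"
	"fmt"

	"go.mongodb.org/mongo-driver/mongo"
)

// record is a hypothetical stand-in for the glucose datum decoded in the patch.
type record struct {
	Time string `bson:"time"`
}

// consumeInBatches decodes documents from cursor and hands them to processBatch
// once per driver batch. RemainingBatchLength reports how many documents are
// still buffered client-side, so it reaches zero exactly when the next call to
// Next would have to go back to the server.
func consumeInBatches(ctx context.Context, cursor *mongo.Cursor, processBatch func([]*record) error) error {
	defer cursor.Close(ctx)

	var batch []*record
	for cursor.Next(ctx) {
		if batch == nil {
			// size the slice for the document just read plus what is already buffered
			batch = make([]*record, 0, cursor.RemainingBatchLength()+1)
		}

		r := &record{} // decode into a fresh value each iteration
		if err := cursor.Decode(r); err != nil {
			return fmt.Errorf("unable to decode record: %w", err)
		}
		batch = append(batch, r)

		// flush when the buffered batch is exhausted, i.e. before the next fetch
		if cursor.RemainingBatchLength() == 0 {
			if err := processBatch(batch); err != nil {
				return err
			}
			batch = nil
		}
	}
	return cursor.Err()
}

Flushing on RemainingBatchLength() == 0 also covers the final (possibly partial) batch, so nothing is left in the slice when the loop exits, and returning cursor.Err() surfaces any iteration error; with the 300-document batch size set in GetDataRange, each flush corresponds to at most one server round trip.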