Skip to content

Commit

Permalink
AddData batching and serialization changes
Browse files Browse the repository at this point in the history
  • Loading branch information
Roukoswarf committed Dec 12, 2023
1 parent 6c557d4 commit 9c53b93
Show file tree
Hide file tree
Showing 10 changed files with 106 additions and 161 deletions.
73 changes: 2 additions & 71 deletions data/store/mongo/mongo_datum.go
Original file line number Diff line number Diff line change
Expand Up @@ -597,6 +597,7 @@ func (d *DatumRepository) GetDataRange(ctx context.Context, userId string, typ s
return nil, fmt.Errorf("unexpected type: %v", upload.Type)
}

// TODO remove?
//switch v := dataRecords.(type) {
//case *[]*glucose.Glucose:
// if typ != continuous.Type && typ != selfmonitored.Type {
Expand Down Expand Up @@ -641,6 +642,7 @@ func (d *DatumRepository) GetDataRange(ctx context.Context, userId string, typ s

opts := options.Find()
opts.SetSort(bson.D{{Key: "time", Value: 1}})
opts.SetBatchSize(300)

cursor, err := d.Find(ctx, selector, opts)
if err != nil {
Expand All @@ -650,77 +652,6 @@ func (d *DatumRepository) GetDataRange(ctx context.Context, userId string, typ s
return cursor, nil
}

//func (d *DatumRepository) GetModifiedBucketsInRange(ctx context.Context, userId string, typ string, startTime time.Time, endTime time.Time, fromModified time.Time) (modifiedPeriods []types.ModifiedPeriod, err error) {
// if ctx == nil {
// return nil, errors.New("context is missing")
// }
//
// if userId == "" {
// return nil, errors.New("userId is empty")
// }
//
// if typ == "" {
// return nil, errors.New("typ is empty")
// }
//
// // This is never expected to be an upload.
// if isTypeUpload(typ) {
// return nil, fmt.Errorf("unexpected type: %v", upload.Type)
// }
//
// // quit early if range is 0
// if startTime.Equal(endTime) {
// return nil, nil
// }
//
// // return error if ranges are inverted, as this can produce unexpected results
// if startTime.After(endTime) {
// return nil, fmt.Errorf("startTime (%s) after endTime (%s) for user %s", startTime, endTime, userId)
// }
//
// if fromModified.IsZero() {
// return nil, fmt.Errorf("fromModified is zero")
// }
//
// if fromModified.Before(startTime) {
// return nil, fmt.Errorf("fromModified is before startTime")
// }
//
// pipeline := mongo.Pipeline{
// bson.D{
// {"$match", bson.D{
// {"_active", true},
// {"_userId", userId},
// {"type", typ},
// {"time", bson.D{
// {"$gt", startTime},
// {"$lte", endTime},
// }},
// {"modifiedTime", bson.D{
// {"$gt", fromModified},
// }},
// }},
// },
// bson.D{
// {"$group", bson.D{
// {"_id", bson.M{"$dateTrunc": bson.M{"date": "$modifiedTime", "unit": "hour"}}},
// {"minTime", bson.M{"$min": "$time"}},
// }},
// },
// }
//
// var cursor *mongo.Cursor
// if cursor, err = d.Aggregate(ctx, pipeline); err != nil {
// return nil, fmt.Errorf("unable to get modified %s buckets in date range for user: %w", typ, err)
// }
//
// if err = cursor.All(ctx, modifiedPeriods); err != nil {
// return nil, fmt.Errorf("unable to decode modified ranges, %w", err)
// }
//
// return
//}

func (d *DatumRepository) GetLastUpdatedForUser(ctx context.Context, userId string, typ string, status *types.UserLastUpdated) error {
var err error

Expand Down
1 change: 0 additions & 1 deletion data/store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ type DatumRepository interface {
GetDataSet(ctx context.Context, id string) (*data.DataSet, error)

GetDataRange(ctx context.Context, userId string, typ string, status *types.UserLastUpdated) (*mongo.Cursor, error)
GetModifiedBucketsInRange(ctx context.Context, userId string, typ string, startTime time.Time, endTime time.Time, fromModified time.Time) (modifiedPeriods []types.ModifiedPeriod, err error)
GetLastUpdatedForUser(ctx context.Context, userId string, typ string, status *types.UserLastUpdated) error
DistinctUserIDs(ctx context.Context, typ string) ([]string, error)

Expand Down
20 changes: 10 additions & 10 deletions data/store/test/data_repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package test

import (
"context"
"go.mongodb.org/mongo-driver/mongo"
"time"

"github.com/tidepool-org/platform/data/summary/types"
Expand Down Expand Up @@ -148,16 +149,15 @@ type GetLastUpdatedForUserOutput struct {
}

type GetDataRangeInput struct {
Context context.Context
DataRecords interface{}
ID string
Type string
StartTime time.Time
EndTime time.Time
Context context.Context
UserId string
Typ string
Status *types.UserLastUpdated
}

type GetDataRangeOutput struct {
Error error
Error error
Cursor *mongo.Cursor
}

type GetUsersWithBGDataSinceInput struct {
Expand Down Expand Up @@ -495,16 +495,16 @@ func (d *DataRepository) GetLastUpdatedForUser(ctx context.Context, userId strin
return output.UserLastUpdated, output.Error
}

func (d *DataRepository) GetDataRange(ctx context.Context, dataRecords interface{}, id string, typ string, startTime time.Time, endTime time.Time) error {
func (d *DataRepository) GetDataRange(ctx context.Context, userId string, typ string, status *types.UserLastUpdated) (*mongo.Cursor, error) {
d.GetDataRangeInvocations++

d.GetDataRangeInputs = append(d.GetDataRangeInputs, GetDataRangeInput{Context: ctx, DataRecords: dataRecords, ID: id, Type: typ, StartTime: startTime, EndTime: endTime})
d.GetDataRangeInputs = append(d.GetDataRangeInputs, GetDataRangeInput{Context: ctx, UserId: userId, Typ: typ, Status: status})

gomega.Expect(d.GetDataRangeOutputs).ToNot(gomega.BeEmpty())

output := d.GetDataRangeOutputs[0]
d.GetDataRangeOutputs = d.GetDataRangeOutputs[1:]
return output.Error
return output.Cursor, output.Error
}

func (d *DataRepository) GetUsersWithBGDataSince(ctx context.Context, lastUpdated time.Time) ([]string, error) {
Expand Down
4 changes: 2 additions & 2 deletions data/summary/store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func (r *Repo[T, A]) GetSummary(ctx context.Context, userId string) (*types.Summ
return nil, errors.New("userId is missing")
}

summary := types.Create[T, A](userId)
summary := types.Create[A](userId)
selector := bson.M{
"userId": userId,
"type": summary.Type,
Expand Down Expand Up @@ -201,7 +201,7 @@ func (r *Repo[T, A]) SetOutdated(ctx context.Context, userId, reason string) (*t
}

if userSummary == nil {
userSummary = types.Create[T, A](userId)
userSummary = types.Create[A](userId)
}

userSummary.SetOutdated(reason)
Expand Down
4 changes: 2 additions & 2 deletions data/summary/summary.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ func (c *GlucoseSummarizer[T, A]) BackfillSummaries(ctx context.Context) (int, e

summaries := make([]*types.Summary[T, A], 0, len(userIDsReqBackfill))
for _, userID := range userIDsReqBackfill {
s := types.Create[T, A](userID)
s := types.Create[A](userID)
s.SetOutdated(types.OutdatedReasonBackfill)
summaries = append(summaries, s)

Expand Down Expand Up @@ -165,7 +165,7 @@ func (c *GlucoseSummarizer[T, A]) UpdateSummary(ctx context.Context, userId stri

// user has no usable summary for incremental update
if userSummary == nil {
userSummary = types.Create[T, A](userId)
userSummary = types.Create[A](userId)
}

if userSummary.Config.SchemaVersion != types.SchemaVersion {
Expand Down
40 changes: 25 additions & 15 deletions data/summary/types/bgm.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package types
import (
"context"
"errors"
"fmt"
"go.mongodb.org/mongo-driver/mongo"
"strconv"
"time"
Expand Down Expand Up @@ -128,21 +129,30 @@ func (s *BGMStats) GetBucketDate(i int) time.Time {
return s.Buckets[i].Date
}

func (s *BGMStats) Update(ctx context.Context, userData *mongo.Cursor) error {
//userDataTyped, ok := userData.([]*glucoseDatum.Glucose)
//if !ok {
// return errors.New("BGM records for calculation is not compatible with Glucose type")
//}
//
//for i, v := range userDataTyped {
// if v.Type != selfmonitored.Type {
// return fmt.Errorf("Non-BGM data provided for BGM summary update at position %d", i)
// }
//}

err := AddData[*glucoseDatum.Glucose](ctx, &s.Buckets, userData)
if err != nil {
return err
func (s *BGMStats) Update(ctx context.Context, cursor *mongo.Cursor) error {
var userData []*glucoseDatum.Glucose = nil
var r *glucoseDatum.Glucose
var currentBucket *Bucket[*BGMBucketData, BGMBucketData]
var err error

for cursor.Next(ctx) {
if userData == nil {
userData = make([]*glucoseDatum.Glucose, 0, cursor.RemainingBatchLength())
}

if err = cursor.Decode(r); err != nil {
return fmt.Errorf("unable to decode userData: %w", err)
}
userData = append(userData, r)

// we call AddData before each network call to the db to reduce thrashing
if cursor.RemainingBatchLength() != 0 {
currentBucket, err = AddData(&s.Buckets, userData, currentBucket)
if err != nil {
return err
}
userData = nil
}
}

s.CalculateSummary()
Expand Down
40 changes: 25 additions & 15 deletions data/summary/types/cgm.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package types
import (
"context"
"errors"
"fmt"
"go.mongodb.org/mongo-driver/mongo"
"strconv"
"time"
Expand Down Expand Up @@ -183,21 +184,30 @@ func (s *CGMStats) GetBucketDate(i int) time.Time {
return s.Buckets[i].Date
}

func (s *CGMStats) Update(ctx context.Context, userData *mongo.Cursor) error {
//userDataTyped, ok := userData.([]*glucoseDatum.Glucose)
//if !ok {
// return errors.New("CGM records for calculation is not compatible with Glucose type")
//}
//
//for i, v := range userDataTyped {
// if v.Type != continuous.Type {
// return fmt.Errorf("Non-CGM data provided for CGM summary update at position %d", i)
// }
//}

err := AddData[*glucoseDatum.Glucose](ctx, &s.Buckets, userData)
if err != nil {
return err
func (s *CGMStats) Update(ctx context.Context, cursor *mongo.Cursor) error {
var userData []*glucoseDatum.Glucose = nil
var r *glucoseDatum.Glucose
var currentBucket *Bucket[*CGMBucketData, CGMBucketData]
var err error

for cursor.Next(ctx) {
if userData == nil {
userData = make([]*glucoseDatum.Glucose, 0, cursor.RemainingBatchLength())
}

if err = cursor.Decode(r); err != nil {
return fmt.Errorf("unable to decode userData: %w", err)
}
userData = append(userData, r)

// we call AddData before each network call to the db to reduce thrashing
if cursor.RemainingBatchLength() != 0 {
currentBucket, err = AddData(&s.Buckets, userData, currentBucket)
if err != nil {
return err
}
userData = nil
}
}

s.CalculateSummary()
Expand Down
Loading

0 comments on commit 9c53b93

Please sign in to comment.