Skip to content

Commit

Permalink
Hint indexes where appropriate.
Browse files Browse the repository at this point in the history
  • Loading branch information
lostlevels committed Jun 23, 2024
1 parent de796e8 commit 67e7a7e
Show file tree
Hide file tree
Showing 8 changed files with 93 additions and 41 deletions.
2 changes: 1 addition & 1 deletion auth/service/service/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ var _ = Describe("Service", func() {
server = NewServer()
server.AppendHandlers(
CombineHandlers(
VerifyRequest("POST", "/auth/serverlogin"),
VerifyRequest("POST", "/serverlogin"), // by default the path prefix to the auth service is empty unless set via the env var TIDEPOOL_AUTH_CLIENT_EXTERNAL_PATH_PREFIX
VerifyHeaderKV("X-Tidepool-Server-Name", *provider.NameOutput),
VerifyHeaderKV("X-Tidepool-Server-Secret", serverSecret),
VerifyBody(nil),
Expand Down
2 changes: 1 addition & 1 deletion blob/service/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ var _ = Describe("Service", func() {
server = NewServer()
server.AppendHandlers(
CombineHandlers(
VerifyRequest(http.MethodPost, "/auth/serverlogin"),
VerifyRequest(http.MethodPost, "/serverlogin"),
VerifyHeaderKV("X-Tidepool-Server-Name", *provider.NameOutput),
VerifyHeaderKV("X-Tidepool-Server-Secret", serverSecret),
VerifyBody(nil),
Expand Down
29 changes: 17 additions & 12 deletions data/store/mongo/mongo_data_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,17 +34,14 @@ func (d *DataSetRepository) EnsureIndexes() error {
Keys: bson.D{
{Key: "_userId", Value: 1},
{Key: "_active", Value: 1},
{Key: "type", Value: 1},
{Key: "time", Value: -1},
},
Options: options.Index().
SetBackground(true).
SetName("UserIdTypeWeighted_v2"),
},
{
Keys: bson.D{
{Key: "origin.id", Value: 1},
{Key: "type", Value: 1},
{Key: "deletedTime", Value: -1},
{Key: "_active", Value: 1},
},
Expand All @@ -60,15 +57,14 @@ func (d *DataSetRepository) EnsureIndexes() error {
SetUnique(true).
SetName("UniqueUploadId"),
},
// TODO: uploadId is always unique in deviceDataSets, so the UploadId and UniqueUploadId indexes are redundant — remove one or the other.
{
Keys: bson.D{
{Key: "uploadId", Value: 1},
{Key: "type", Value: 1},
{Key: "deletedTime", Value: -1},
{Key: "_active", Value: 1},
},
Options: options.Index().
SetBackground(true).
SetName("UploadId"),
},
{
Expand All @@ -80,14 +76,25 @@ func (d *DataSetRepository) EnsureIndexes() error {
{Key: "_deduplicator.hash", Value: 1},
},
Options: options.Index().
SetBackground(true).
SetPartialFilterExpression(bson.D{
{Key: "_active", Value: true},
{Key: "_deduplicator.hash", Value: bson.D{{Key: "$exists", Value: true}}},
{Key: "deviceId", Value: bson.D{{Key: "$exists", Value: true}}},
}).
SetName("DeduplicatorHash"),
},
{
Keys: bson.D{
{Key: "_userId", Value: 1},
{Key: "_active", Value: 1},
{Key: "client.name", Value: 1},
},
Options: options.Index().
SetPartialFilterExpression(bson.D{
{Key: "client.name", Value: bson.D{{Key: "$exists", Value: true}}},
}).
SetName("ClientName"),
},
})
}

Expand Down Expand Up @@ -261,7 +268,6 @@ func (d *DataSetRepository) ListUserDataSets(ctx context.Context, userID string,
selector := bson.M{
"_active": true,
"_userId": userID,
"type": "upload",
}
if filter.ClientName != nil {
selector["client.name"] = *filter.ClientName
Expand Down Expand Up @@ -315,7 +321,6 @@ func (d *DataSetRepository) GetDataSetsForUserByID(ctx context.Context, userID s
selector := bson.M{
"_active": true,
"_userId": userID,
"type": "upload",
}
if !filter.Deleted {
selector["deletedTime"] = bson.M{"$exists": false}
Expand All @@ -324,14 +329,14 @@ func (d *DataSetRepository) GetDataSetsForUserByID(ctx context.Context, userID s
SetSort(bson.M{"createdTime": -1})
cursor, err := d.Find(ctx, selector, opts)

loggerFields := log.Fields{"userId": userID, "dataSetsCount": len(dataSets), "duration": time.Since(now) / time.Microsecond}
log.LoggerFromContext(ctx).WithFields(loggerFields).WithError(err).Debug("GetDataSetsForUserByID")

if err != nil {
return nil, errors.Wrap(err, "unable to get data sets for user by id")
}

if err = cursor.All(ctx, &dataSets); err != nil {
err = cursor.All(ctx, &dataSets)
loggerFields := log.Fields{"userId": userID, "dataSetsCount": len(dataSets), "duration": time.Since(now) / time.Microsecond}
log.LoggerFromContext(ctx).WithFields(loggerFields).WithError(err).Debug("GetDataSetsForUserByID")
if err != nil {
return nil, errors.Wrap(err, "unable to decode data sets for user by id")
}

Expand Down
88 changes: 64 additions & 24 deletions data/store/mongo/mongo_datum.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,26 @@ func (d *DatumRepository) EnsureIndexes() error {
},
}),
},
{
Keys: bson.D{
{Key: "_userId", Value: 1},
{Key: "_active", Value: 1},
{Key: "type", Value: 1},
{Key: "modifiedTime", Value: 1},
{Key: "time", Value: 1},
},
// Temporary name matching the one used during testing; rename before finalizing.
Options: options.Index().
SetName("TestUserIdActiveTypeModifiedTimeTime").
SetPartialFilterExpression(bson.D{
{
Key: "time",
Value: bson.D{
{Key: "$gt", Value: lowerTimeBound},
},
},
}),
},
{
Keys: bson.D{
{Key: "origin.id", Value: 1},
Expand Down Expand Up @@ -164,7 +184,7 @@ func (d *DatumRepository) ActivateDataSetData(ctx context.Context, dataSet *uplo
if err := validateDataSet(dataSet); err != nil {
return err
}
selector, err := validateAndTranslateSelectors(selectors)
selector, _, err := validateAndTranslateSelectors(selectors)
if err != nil {
return err
}
Expand Down Expand Up @@ -204,7 +224,7 @@ func (d *DatumRepository) ArchiveDataSetData(ctx context.Context, dataSet *uploa
if err := validateDataSet(dataSet); err != nil {
return err
}
selector, err := validateAndTranslateSelectors(selectors)
selector, _, err := validateAndTranslateSelectors(selectors)
if err != nil {
return err
}
Expand Down Expand Up @@ -244,7 +264,7 @@ func (d *DatumRepository) DeleteDataSetData(ctx context.Context, dataSet *upload
if err := validateDataSet(dataSet); err != nil {
return err
}
selector, err := validateAndTranslateSelectors(selectors)
selector, hasOriginID, err := validateAndTranslateSelectors(selectors)
if err != nil {
return err
}
Expand All @@ -255,7 +275,7 @@ func (d *DatumRepository) DeleteDataSetData(ctx context.Context, dataSet *upload

selector["_userId"] = dataSet.UserID
selector["uploadId"] = dataSet.UploadID
selector["type"] = bson.M{"$ne": "upload"}
// selector["type"] = bson.M{"$ne": "upload"}
selector["deletedTime"] = bson.M{"$exists": false}
set := bson.M{
"_active": false,
Expand All @@ -268,7 +288,11 @@ func (d *DatumRepository) DeleteDataSetData(ctx context.Context, dataSet *upload
"deletedUserId": 1,
"modifiedUserId": 1,
}
changeInfo, err := d.UpdateMany(ctx, selector, d.ConstructUpdate(set, unset))
opts := options.Update()
if hasOriginID {
opts.SetHint("OriginId")
}
changeInfo, err := d.UpdateMany(ctx, selector, d.ConstructUpdate(set, unset), opts)
if err != nil {
logger.WithError(err).Error("Unable to delete data set data")
return fmt.Errorf("unable to delete data set data: %w", err)
Expand All @@ -285,7 +309,7 @@ func (d *DatumRepository) DestroyDeletedDataSetData(ctx context.Context, dataSet
if err := validateDataSet(dataSet); err != nil {
return err
}
selector, err := validateAndTranslateSelectors(selectors)
selector, hasOriginID, err := validateAndTranslateSelectors(selectors)
if err != nil {
return err
}
Expand All @@ -295,9 +319,13 @@ func (d *DatumRepository) DestroyDeletedDataSetData(ctx context.Context, dataSet

selector["_userId"] = dataSet.UserID
selector["uploadId"] = dataSet.UploadID
selector["type"] = bson.M{"$ne": "upload"}
// selector["type"] = bson.M{"$ne": "upload"}
selector["deletedTime"] = bson.M{"$exists": true}
changeInfo, err := d.DeleteMany(ctx, selector)
opts := options.Delete()
if hasOriginID {
opts.SetHint("OriginId")
}
changeInfo, err := d.DeleteMany(ctx, selector, opts)
if err != nil {
logger.WithError(err).Error("Unable to destroy deleted data set data")
return fmt.Errorf("unable to destroy deleted data set data: %w", err)
Expand All @@ -314,7 +342,7 @@ func (d *DatumRepository) DestroyDataSetData(ctx context.Context, dataSet *uploa
if err := validateDataSet(dataSet); err != nil {
return err
}
selector, err := validateAndTranslateSelectors(selectors)
selector, _, err := validateAndTranslateSelectors(selectors)
if err != nil {
return err
}
Expand Down Expand Up @@ -360,9 +388,9 @@ func (d *DatumRepository) ArchiveDeviceDataUsingHashesFromDataSet(ctx context.Co
hashes, err := d.Distinct(ctx, "_deduplicator.hash", selector)
if err == nil && len(hashes) > 0 {
selector = bson.M{
"_userId": dataSet.UserID,
"deviceId": *dataSet.DeviceID,
"type": bson.M{"$ne": "upload"},
"_userId": dataSet.UserID,
"deviceId": *dataSet.DeviceID,
// "type": bson.M{"$ne": "upload"},
"_active": true,
"_deduplicator.hash": bson.M{"$in": hashes},
}
Expand All @@ -373,7 +401,8 @@ func (d *DatumRepository) ArchiveDeviceDataUsingHashesFromDataSet(ctx context.Co
"modifiedTime": timestamp,
}
unset := bson.M{}
updateInfo, err = d.UpdateMany(ctx, selector, d.ConstructUpdate(set, unset))
opts := options.Update().SetHint("DeduplicatorHash")
updateInfo, err = d.UpdateMany(ctx, selector, d.ConstructUpdate(set, unset), opts)
}

loggerFields := log.Fields{"userId": dataSet.UserID, "deviceId": *dataSet.DeviceID, "updateInfo": updateInfo, "duration": time.Since(now) / time.Microsecond}
Expand Down Expand Up @@ -487,11 +516,11 @@ func (d *DatumRepository) UnarchiveDeviceDataUsingHashesFromDataSet(ctx context.
return overallErr
}

func validateAndTranslateSelectors(selectors *data.Selectors) (bson.M, error) {
func validateAndTranslateSelectors(selectors *data.Selectors) (filter bson.M, hasOriginID bool, err error) {
if selectors == nil {
return bson.M{}, nil
return bson.M{}, false, nil
} else if err := structureValidator.New().Validate(selectors); err != nil {
return nil, errors.Join(ErrSelectorsInvalid, err)
return nil, false, errors.Join(ErrSelectorsInvalid, err)
}

var selectorIDs []string
Expand All @@ -501,6 +530,7 @@ func validateAndTranslateSelectors(selectors *data.Selectors) (bson.M, error) {
if selector.ID != nil {
selectorIDs = append(selectorIDs, *selector.ID)
} else if selector.Origin != nil && selector.Origin.ID != nil {
hasOriginID = true
selectorOriginIDs = append(selectorOriginIDs, *selector.Origin.ID)
}
}
Expand All @@ -519,10 +549,10 @@ func validateAndTranslateSelectors(selectors *data.Selectors) (bson.M, error) {
}

if len(selector) == 0 {
return nil, errors.New("selectors is invalid")
return nil, false, errors.New("selectors is invalid")
}

return selector, nil
return selector, hasOriginID, nil
}

func (d *DatumRepository) GetDataRange(ctx context.Context, userId string, typ []string, status *data.UserDataStatus) (*mongo.Cursor, error) {
Expand Down Expand Up @@ -693,11 +723,12 @@ func (d *DatumRepository) getTimeRange(ctx context.Context, userId string, typ [

func (d *DatumRepository) populateLastUpload(ctx context.Context, userId string, typ []string, status *data.UserDataStatus) (err error) {
// get latest modified record
timeMin := status.FirstData
selector := bson.M{
"_userId": userId,
"_active": bson.M{"$in": bson.A{true, false}},
"time": bson.M{
"$gte": status.FirstData,
"$gte": timeMin,
"$lte": status.LastData,
},
}
Expand All @@ -708,8 +739,11 @@ func (d *DatumRepository) populateLastUpload(ctx context.Context, userId string,
selector["type"] = bson.M{"$in": typ}
}

findOptions := options.Find()
findOptions.SetHint("UserIdActiveTypeTimeModifiedTime")
findOptions := options.Find().SetProjection(bson.M{"_id": 0, "modifiedTime": 1, "createdTime": 1})
if lowerTimeBound, err := time.Parse(time.RFC3339, LowerTimeIndexRaw); err == nil && timeMin.After(lowerTimeBound) {
// has blocking sort, but more selective so usually performs better.
findOptions.SetHint("UserIdActiveTypeTimeModifiedTime")
}
findOptions.SetLimit(1)
findOptions.SetSort(bson.D{{Key: "modifiedTime", Value: -1}})

Expand Down Expand Up @@ -740,11 +774,12 @@ func (d *DatumRepository) populateLastUpload(ctx context.Context, userId string,

func (d *DatumRepository) populateEarliestModified(ctx context.Context, userId string, typ []string, status *data.UserDataStatus) (err error) {
// get earliest modified record which is newer than LastUpdated
timeMin := status.FirstData
selector := bson.M{
"_userId": userId,
"_active": bson.M{"$in": bson.A{true, false}},
"time": bson.M{
"$gte": status.FirstData,
"$gte": timeMin,
"$lte": status.LastData,
},
}
Expand All @@ -757,7 +792,8 @@ func (d *DatumRepository) populateEarliestModified(ctx context.Context, userId s

findOptions := options.Find()
findOptions.SetLimit(1)
findOptions.SetSort(bson.D{{Key: "time", Value: 1}})
findOptions.SetSort(bson.D{{Key: "time", Value: 1}}).
SetProjection(bson.M{"_id": 0, "time": 1})

// this skips using modifiedTime on fresh calculations as it may cause trouble with initial calculation of summaries
// for users with only data old enough to not have a modifiedTime, which would be excluded by this.
Expand All @@ -766,7 +802,10 @@ func (d *DatumRepository) populateEarliestModified(ctx context.Context, userId s
selector["modifiedTime"] = bson.M{
"$gt": status.LastUpdated,
}
findOptions.SetHint("UserIdActiveTypeTimeModifiedTime")
if lowerTimeBound, err := time.Parse(time.RFC3339, LowerTimeIndexRaw); err == nil && timeMin.After(lowerTimeBound) {
// has blocking sort, but more selective so usually performs better.
findOptions.SetHint("TestUserIdActiveTypeModifiedTimeTime")
}
}

var cursor *mongo.Cursor
Expand Down Expand Up @@ -854,6 +893,7 @@ func (d *DatumRepository) DistinctUserIDs(ctx context.Context, typ []string) ([]
pastCutoff := time.Now().AddDate(0, -23, -20).UTC()
futureCutoff := time.Now().AddDate(0, 0, 1).UTC()

// TODO: maybe reevaluate, scatters / broadcasts on sharded cluster
selector := bson.M{
"_userId": bson.M{"$ne": -1111},
"_active": true,
Expand Down
7 changes: 7 additions & 0 deletions data/store/mongo/mongo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -558,6 +558,13 @@ var _ = Describe("Mongo", func() {
{Key: "time", Value: bson.D{{Key: "$gt", Value: primitive.NewDateTimeFromTime(lowerTimeIndex)}}},
}),
}),
MatchFields(IgnoreExtras, Fields{
"Key": Equal(storeStructuredMongoTest.MakeKeySlice("_userId", "_active", "type", "modifiedTime", "time")),
"Name": Equal("TestUserIdActiveTypeModifiedTimeTime"),
"PartialFilterExpression": Equal(bson.D{
{Key: "time", Value: bson.D{{Key: "$gt", Value: primitive.NewDateTimeFromTime(lowerTimeIndex)}}},
}),
}),
MatchFields(IgnoreExtras, Fields{
"Key": Equal(storeStructuredMongoTest.MakeKeySlice("origin.id", "type", "-deletedTime", "_active")),
"Background": Equal(true),
Expand Down
2 changes: 1 addition & 1 deletion prescription/application/application_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ var _ = Describe("Application", func() {
server = NewServer()
server.AppendHandlers(
CombineHandlers(
VerifyRequest(http.MethodPost, "/auth/serverlogin"),
VerifyRequest(http.MethodPost, "/serverlogin"),
VerifyHeaderKV("X-Tidepool-Server-Name", *prvdr.NameOutput),
VerifyHeaderKV("X-Tidepool-Server-Secret", serverSecret),
VerifyBody(nil),
Expand Down
2 changes: 1 addition & 1 deletion service/service/DEPRECATED_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ var _ = Describe("DEPRECATEDService", func() {
Expect(server).ToNot(BeNil())
server.AppendHandlers(
CombineHandlers(
VerifyRequest("POST", "/auth/serverlogin"),
VerifyRequest("POST", "/serverlogin"),
VerifyHeaderKV("X-Tidepool-Server-Name", *provider.NameOutput),
VerifyHeaderKV("X-Tidepool-Server-Secret", serverSecret),
VerifyBody(nil),
Expand Down
2 changes: 1 addition & 1 deletion task/service/service/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ var _ = Describe("Service", func() {
server = NewServer()
server.AppendHandlers(
CombineHandlers(
VerifyRequest("POST", "/auth/serverlogin"),
VerifyRequest("POST", "/serverlogin"),
VerifyHeaderKV("X-Tidepool-Server-Name", *provider.NameOutput),
VerifyHeaderKV("X-Tidepool-Server-Secret", serverSecret),
VerifyBody(nil),
Expand Down

0 comments on commit 67e7a7e

Please sign in to comment.