Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BACK-3097] Final index and changes from sharded cluster in prep for sharding. #783

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 60 additions & 10 deletions data/store/mongo/mongo_datum.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,17 @@ func (d *DatumRepository) EnsureIndexes() error {
Options: options.Index().
SetName("UserIdTypeWeighted_v2"),
},
{
Keys: bson.D{
{Key: "_userId", Value: 1},
{Key: "type", Value: 1},
{Key: "time", Value: 1},
{Key: "_active", Value: 1},
{Key: "modifiedTime", Value: 1},
},
Options: options.Index().
SetName("ShardKeyIndex"),
},
{
Keys: bson.D{
{Key: "_userId", Value: 1},
Expand All @@ -71,6 +82,25 @@ func (d *DatumRepository) EnsureIndexes() error {
},
}),
},
{
Keys: bson.D{
{Key: "_userId", Value: 1},
{Key: "_active", Value: 1},
{Key: "type", Value: 1},
{Key: "modifiedTime", Value: 1},
{Key: "time", Value: 1},
},
Options: options.Index().
SetName("UserIdActiveTypeModifiedTimeTime").
SetPartialFilterExpression(bson.D{
{
Key: "time",
Value: bson.D{
{Key: "$gt", Value: lowerTimeBound},
},
},
}),
},
{
Keys: bson.D{
{Key: "_userId", Value: 1},
Expand All @@ -84,6 +114,10 @@ func (d *DatumRepository) EnsureIndexes() error {
}).
SetName("UserIdOriginId"),
},
// Future optimization after release.
// Rebuild index to move _active
// before type for better compression and more
// closely follow ESR
{
Keys: bson.D{
{Key: "uploadId", Value: 1},
Expand All @@ -94,6 +128,14 @@ func (d *DatumRepository) EnsureIndexes() error {
Options: options.Index().
SetName("UploadId"),
},

// Future optimization - remove the PFE on deviceId as the Base datum
// already makes sure it exists and prod DB has already been checked to
// ensure there are no datums w/ no deviceId. Other possible
// optimization remove the _active in the PFE to use this in the
// ArchiveDeviceDataUsingHashesFromDataSet > Distinct query. Can also
// remove "type" field w/ corresponding removal of "$ne": "upload" in
// queries where appropriate.
{
Keys: bson.D{
{Key: "_userId", Value: 1},
Expand Down Expand Up @@ -138,7 +180,6 @@ func (d *DatumRepository) CreateDataSetData(ctx context.Context, dataSet *upload
datum.SetDataSetID(dataSet.UploadID)
datum.SetCreatedTime(&timestamp)
datum.SetModifiedTime(&timestamp)
datum.SetModifiedTime(&timestamp)
insertData = append(insertData, mongo.NewInsertOneModel().SetDocument(datum))
}

Expand Down Expand Up @@ -173,7 +214,7 @@ func (d *DatumRepository) ActivateDataSetData(ctx context.Context, dataSet *uplo

selector["_userId"] = dataSet.UserID
selector["uploadId"] = dataSet.UploadID
selector["type"] = bson.M{"$ne": "upload"}
selector["type"] = bson.M{"$ne": "upload"} // Note we WILL keep the "type" field in the UploadId index as that's a query need in tide-whisperer
selector["_active"] = false
selector["deletedTime"] = bson.M{"$exists": false}
set := bson.M{
Expand Down Expand Up @@ -358,18 +399,20 @@ func (d *DatumRepository) ArchiveDeviceDataUsingHashesFromDataSet(ctx context.Co

var updateInfo *mongo.UpdateResult

// Note that the "DeduplicatorHash" index is NOT used here as the fields in the query don't match the index definition. On average an upload only has one device anyway (P90 ~ 1). However the "DeduplicatorHash" index is still useful for the UpdateMany operation that follows.
selector := bson.M{
"_userId": dataSet.UserID,
"uploadId": dataSet.UploadID,
"type": bson.M{"$ne": "upload"},
"_userId": dataSet.UserID,
"uploadId": dataSet.UploadID,
"type": bson.M{"$ne": "upload"},
"_deduplicator.hash": bson.M{"$ne": nil},
}

hashes, err := d.Distinct(ctx, "_deduplicator.hash", selector)
if err == nil && len(hashes) > 0 {
selector = bson.M{
"_userId": dataSet.UserID,
"deviceId": *dataSet.DeviceID,
"type": bson.M{"$ne": "upload"},
"type": bson.M{"$ne": "upload"}, // Until we update the indexes to NOT have type, the planner will sometimes not use the correct index w/o the type range so we are leaving $ne upload in some cases. The actual performance and size gains are minor (~5%) TODO: for a future update, create a version of the index WITHOUT the type
"_active": true,
"_deduplicator.hash": bson.M{"$in": hashes},
}
Expand Down Expand Up @@ -675,6 +718,7 @@ func (d *DatumRepository) getTimeRange(ctx context.Context, userId string, typ [
}

findOptions := options.Find()
findOptions.SetProjection(bson.M{"_id": 0, "time": 1})
findOptions.SetSort(bson.D{{Key: "time", Value: -1}})
findOptions.SetLimit(1)

Expand Down Expand Up @@ -715,8 +759,10 @@ func (d *DatumRepository) populateLastUpload(ctx context.Context, userId string,
selector["type"] = bson.M{"$in": typ}
}

findOptions := options.Find()
findOptions.SetHint("UserIdActiveTypeTimeModifiedTime")
findOptions := options.Find().SetProjection(bson.M{"_id": 0, "modifiedTime": 1, "createdTime": 1})
if lowerTimeBound, err := time.Parse(time.RFC3339, LowerTimeIndexRaw); err == nil && status.FirstData.After(lowerTimeBound) {
findOptions.SetHint("UserIdActiveTypeModifiedTimeTime")
}
findOptions.SetLimit(1)
findOptions.SetSort(bson.D{{Key: "modifiedTime", Value: -1}})

Expand Down Expand Up @@ -764,7 +810,8 @@ func (d *DatumRepository) populateEarliestModified(ctx context.Context, userId s

findOptions := options.Find()
findOptions.SetLimit(1)
findOptions.SetSort(bson.D{{Key: "time", Value: 1}})
findOptions.SetSort(bson.D{{Key: "time", Value: 1}}).
SetProjection(bson.M{"_id": 0, "time": 1})

// this skips using modifiedTime on fresh calculations as it may cause trouble with initial calculation of summaries
// for users with only data old enough to not have a modifiedTime, which would be excluded by this.
Expand All @@ -773,7 +820,10 @@ func (d *DatumRepository) populateEarliestModified(ctx context.Context, userId s
selector["modifiedTime"] = bson.M{
"$gt": status.LastUpdated,
}
findOptions.SetHint("UserIdActiveTypeTimeModifiedTime")
if lowerTimeBound, err := time.Parse(time.RFC3339, LowerTimeIndexRaw); err == nil && status.FirstData.After(lowerTimeBound) {
// has blocking sort, but more selective so usually performs better.
findOptions.SetHint("UserIdActiveTypeModifiedTimeTime")
}
}

var cursor *mongo.Cursor
Expand Down
11 changes: 11 additions & 0 deletions data/store/mongo/mongo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,13 +310,24 @@ var _ = Describe("Mongo", Label("mongodb", "slow", "integration"), func() {
"Key": Equal(storeStructuredMongoTest.MakeKeySlice("_userId", "_active", "type", "-time")),
"Name": Equal("UserIdTypeWeighted_v2"),
}),
MatchFields(IgnoreExtras, Fields{
"Key": Equal(storeStructuredMongoTest.MakeKeySlice("_userId", "type", "time", "_active", "modifiedTime")),
"Name": Equal("ShardKeyIndex"),
}),
MatchFields(IgnoreExtras, Fields{
"Key": Equal(storeStructuredMongoTest.MakeKeySlice("_userId", "_active", "type", "time", "modifiedTime")),
"Name": Equal("UserIdActiveTypeTimeModifiedTime"),
"PartialFilterExpression": Equal(bson.D{
{Key: "time", Value: bson.D{{Key: "$gt", Value: primitive.NewDateTimeFromTime(lowerTimeIndex)}}},
}),
}),
MatchFields(IgnoreExtras, Fields{
"Key": Equal(storeStructuredMongoTest.MakeKeySlice("_userId", "_active", "type", "modifiedTime", "time")),
"Name": Equal("UserIdActiveTypeModifiedTimeTime"),
"PartialFilterExpression": Equal(bson.D{
{Key: "time", Value: bson.D{{Key: "$gt", Value: primitive.NewDateTimeFromTime(lowerTimeIndex)}}},
}),
}),
MatchFields(IgnoreExtras, Fields{
"Key": Equal(storeStructuredMongoTest.MakeKeySlice("_userId", "origin.id", "-deletedTime", "_active")),
"Name": Equal("UserIdOriginId"),
Expand Down