Skip to content

Commit

Permalink
distributor: Increase test polling duration to allow the cache suffic… (
Browse files Browse the repository at this point in the history
  • Loading branch information
NickAnge authored Jan 9, 2025
1 parent ce6871c commit 33ca7b5
Showing 1 changed file with 22 additions and 21 deletions.
43 changes: 22 additions & 21 deletions pkg/distributor/ha_tracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,8 @@ func TestHaTrackerWithMemberList(t *testing.T) {
// Update KVStore - this should elect replica 2.
tracker.updateKVStoreAll(context.Background(), now)

checkReplicaTimestamp(t, time.Second, tracker, "user", cluster, replica2, now, now)
// Evaluate up to 2 seconds to verify whether the tracker’s cache replica has been updated to r2.
checkReplicaTimestamp(t, 2*time.Second, tracker, "user", cluster, replica2, now, now)

// Now we should accept from replica 2.
err = tracker.checkReplica(context.Background(), "user", cluster, replica2, now)
Expand Down Expand Up @@ -477,7 +478,7 @@ func TestHATrackerWatchPrefixAssignment(t *testing.T) {
assert.NoError(t, err)

// Check to see if the value in the trackers cache is correct.
checkReplicaTimestamp(t, time.Second, c, "user", cluster, replica, now, now)
checkReplicaTimestamp(t, 2*time.Second, c, "user", cluster, replica, now, now)
}

func TestHATrackerCheckReplicaOverwriteTimeout(t *testing.T) {
Expand Down Expand Up @@ -515,7 +516,7 @@ func TestHATrackerCheckReplicaOverwriteTimeout(t *testing.T) {
// Update KVStore - this should elect replica 2.
c.updateKVStoreAll(context.Background(), now)

checkReplicaTimestamp(t, time.Second, c, "user", "test", replica2, now, now)
checkReplicaTimestamp(t, 2*time.Second, c, "user", "test", replica2, now, now)

// Now we should accept from replica 2.
err = c.checkReplica(context.Background(), "user", "test", replica2, now)
Expand Down Expand Up @@ -624,7 +625,7 @@ func TestHATrackerCheckReplicaMultiClusterTimeout(t *testing.T) {
err = c.checkReplica(context.Background(), "user", "c1", replica2, now)
assert.Error(t, err)
c.updateKVStoreAll(context.Background(), now)
checkReplicaTimestamp(t, time.Second, c, "user", "c1", replica2, now, now)
checkReplicaTimestamp(t, 2*time.Second, c, "user", "c1", replica2, now, now)

// Accept a sample from c1/replica2.
err = c.checkReplica(context.Background(), "user", "c1", replica2, now)
Expand Down Expand Up @@ -677,21 +678,21 @@ func TestHATrackerCheckReplicaUpdateTimeout(t *testing.T) {
err = c.checkReplica(context.Background(), user, cluster, replica, startTime)
assert.NoError(t, err)

checkReplicaTimestamp(t, time.Second, c, user, cluster, replica, startTime, startTime)
checkReplicaTimestamp(t, 2*time.Second, c, user, cluster, replica, startTime, startTime)

// Timestamp should not update here, since time has not advanced.
err = c.checkReplica(context.Background(), user, cluster, replica, startTime)
assert.NoError(t, err)

checkReplicaTimestamp(t, time.Second, c, user, cluster, replica, startTime, startTime)
checkReplicaTimestamp(t, 2*time.Second, c, user, cluster, replica, startTime, startTime)

// Wait 500ms and the timestamp should still not update.
updateTime := time.Unix(0, startTime.UnixNano()).Add(500 * time.Millisecond)
c.updateKVStoreAll(context.Background(), updateTime)

err = c.checkReplica(context.Background(), user, cluster, replica, updateTime)
assert.NoError(t, err)
checkReplicaTimestamp(t, time.Second, c, user, cluster, replica, startTime, startTime)
checkReplicaTimestamp(t, 2*time.Second, c, user, cluster, replica, startTime, startTime)

receivedAt := updateTime

Expand All @@ -700,7 +701,7 @@ func TestHATrackerCheckReplicaUpdateTimeout(t *testing.T) {
c.updateKVStoreAll(context.Background(), updateTime)

// Timestamp stored in KV should be time when we have received a request (called "checkReplica"), not current time (updateTime).
checkReplicaTimestamp(t, time.Second, c, user, cluster, replica, receivedAt, receivedAt)
checkReplicaTimestamp(t, 2*time.Second, c, user, cluster, replica, receivedAt, receivedAt)

err = c.checkReplica(context.Background(), user, cluster, replica, updateTime)
assert.NoError(t, err)
Expand Down Expand Up @@ -732,22 +733,22 @@ func TestHATrackerCheckReplicaMultiUser(t *testing.T) {
// Write the first time for user 1.
err = c.checkReplica(context.Background(), "user1", cluster, replica, now)
assert.NoError(t, err)
checkReplicaTimestamp(t, time.Second, c, "user1", cluster, replica, now, now)
checkReplicaTimestamp(t, 2*time.Second, c, "user1", cluster, replica, now, now)

// Write the first time for user 2.
err = c.checkReplica(context.Background(), "user2", cluster, replica, now)
assert.NoError(t, err)
checkReplicaTimestamp(t, time.Second, c, "user2", cluster, replica, now, now)
checkReplicaTimestamp(t, 2*time.Second, c, "user2", cluster, replica, now, now)

// Now we've waited > 1s, so the timestamp should update.
updated := now.Add(1100 * time.Millisecond)
err = c.checkReplica(context.Background(), "user1", cluster, replica, updated)
assert.NoError(t, err)
c.updateKVStoreAll(context.Background(), updated)

checkReplicaTimestamp(t, time.Second, c, "user1", cluster, replica, updated, updated)
checkReplicaTimestamp(t, 2*time.Second, c, "user1", cluster, replica, updated, updated)
// No update for user2.
checkReplicaTimestamp(t, time.Second, c, "user2", cluster, replica, now, now)
checkReplicaTimestamp(t, 2*time.Second, c, "user2", cluster, replica, now, now)
}

func TestHATrackerCheckReplicaUpdateTimeoutJitter(t *testing.T) {
Expand Down Expand Up @@ -820,15 +821,15 @@ func TestHATrackerCheckReplicaUpdateTimeoutJitter(t *testing.T) {
// Init the replica in the KV Store
err = c.checkReplica(ctx, "user1", "cluster", "replica-1", testData.startTime)
require.NoError(t, err)
checkReplicaTimestamp(t, time.Second, c, "user1", "cluster", "replica-1", testData.startTime, testData.startTime)
checkReplicaTimestamp(t, 2*time.Second, c, "user1", "cluster", "replica-1", testData.startTime, testData.startTime)

// Refresh the replica in the KV Store
err = c.checkReplica(ctx, "user1", "cluster", "replica-1", testData.updateTime)
require.NoError(t, err)
c.updateKVStoreAll(context.Background(), testData.updateTime)

// Assert on the received timestamp
checkReplicaTimestamp(t, time.Second, c, "user1", "cluster", "replica-1", testData.expectedTimestamp, testData.expectedTimestamp)
checkReplicaTimestamp(t, 2*time.Second, c, "user1", "cluster", "replica-1", testData.expectedTimestamp, testData.expectedTimestamp)
})
}
}
Expand Down Expand Up @@ -931,7 +932,7 @@ func TestHATrackerClustersLimit(t *testing.T) {
assert.Error(t, err)
// Update KVStore.
t1.updateKVStoreAll(context.Background(), now)
checkReplicaTimestamp(t, time.Second, t1, userID, "b", "b2", now, now)
checkReplicaTimestamp(t, 2*time.Second, t1, userID, "b", "b2", now, now)

assert.NoError(t, t1.checkReplica(context.Background(), userID, "b", "b2", now))
waitForClustersUpdate(t, 2, t1, userID)
Expand Down Expand Up @@ -1077,7 +1078,7 @@ func TestHATrackerCheckReplicaCleanup(t *testing.T) {

err = c.checkReplica(context.Background(), userID, cluster, replica, now)
assert.NoError(t, err)
checkReplicaTimestamp(t, time.Second, c, userID, cluster, replica, now, now)
checkReplicaTimestamp(t, 2*time.Second, c, userID, cluster, replica, now, now)

// Replica is not marked for deletion yet.
checkReplicaDeletionState(t, time.Second, c, userID, cluster, true, true, false)
Expand All @@ -1094,7 +1095,7 @@ func TestHATrackerCheckReplicaCleanup(t *testing.T) {
now = time.Now()
err = c.checkReplica(context.Background(), userID, cluster, replica, now)
assert.NoError(t, err)
checkReplicaTimestamp(t, time.Second, c, userID, cluster, replica, now, now) // This also checks that entry is not marked for deletion.
checkReplicaTimestamp(t, 2*time.Second, c, userID, cluster, replica, now, now) // This also checks that entry is not marked for deletion.
checkUserClusters(t, time.Second, c, userID, 1)

// This will mark replica for deletion again (with new time.Now())
Expand Down Expand Up @@ -1246,8 +1247,8 @@ func TestHATrackerChangeInElectedReplicaClearsLastSeenTimestamp(t *testing.T) {

assert.NoError(t, t1.checkReplica(context.Background(), userID, cluster, firstReplica, now))
// Both trackers will see "first" replica as current.
checkReplicaTimestamp(t, time.Second, t1, userID, cluster, firstReplica, now, now)
checkReplicaTimestamp(t, time.Second, t2, userID, cluster, firstReplica, now, now)
checkReplicaTimestamp(t, 2*time.Second, t1, userID, cluster, firstReplica, now, now)
checkReplicaTimestamp(t, 2*time.Second, t2, userID, cluster, firstReplica, now, now)

// Ten seconds later, t1 receives request from first replica again
now = now.Add(10 * time.Second)
Expand All @@ -1261,7 +1262,7 @@ func TestHATrackerChangeInElectedReplicaClearsLastSeenTimestamp(t *testing.T) {
t2.updateKVStoreAll(context.Background(), now)

// t1 is reading updates from KV store, and should see second replica being the elected one.
checkReplicaTimestamp(t, time.Second, t1, userID, cluster, secondReplica, secondReplicaReceivedAtT2, secondReplicaReceivedAtT2)
checkReplicaTimestamp(t, 2*time.Second, t1, userID, cluster, secondReplica, secondReplicaReceivedAtT2, secondReplicaReceivedAtT2)

// Furthermore, t1 has never seen "second" replica, so it should not have "electedLastSeenTimestamp" set.
{
Expand All @@ -1285,7 +1286,7 @@ func TestHATrackerChangeInElectedReplicaClearsLastSeenTimestamp(t *testing.T) {
t1.updateKVStoreAll(context.Background(), now)

// t2 is reading updates from KV store, and should see "second" replica being the elected one.
checkReplicaTimestamp(t, time.Second, t2, userID, cluster, firstReplica, firstReceivedAtT1, firstReceivedAtT1)
checkReplicaTimestamp(t, 2*time.Second, t2, userID, cluster, firstReplica, firstReceivedAtT1, firstReceivedAtT1)

// Since t2 has seen new elected replica too, we should have non-zero "electedLastSeenTimestamp".
{
Expand Down

0 comments on commit 33ca7b5

Please sign in to comment.