diff --git a/pkg/distributor/ha_tracker_test.go b/pkg/distributor/ha_tracker_test.go index 68ef61e417b..fac8c01a5d9 100644 --- a/pkg/distributor/ha_tracker_test.go +++ b/pkg/distributor/ha_tracker_test.go @@ -307,7 +307,8 @@ func TestHaTrackerWithMemberList(t *testing.T) { // Update KVStore - this should elect replica 2. tracker.updateKVStoreAll(context.Background(), now) - checkReplicaTimestamp(t, time.Second, tracker, "user", cluster, replica2, now, now) + // Evaluate up to 2 seconds to verify whether the tracker’s cache replica has been updated to r2. + checkReplicaTimestamp(t, 2*time.Second, tracker, "user", cluster, replica2, now, now) // Now we should accept from replica 2. err = tracker.checkReplica(context.Background(), "user", cluster, replica2, now) @@ -477,7 +478,7 @@ func TestHATrackerWatchPrefixAssignment(t *testing.T) { assert.NoError(t, err) // Check to see if the value in the trackers cache is correct. - checkReplicaTimestamp(t, time.Second, c, "user", cluster, replica, now, now) + checkReplicaTimestamp(t, 2*time.Second, c, "user", cluster, replica, now, now) } func TestHATrackerCheckReplicaOverwriteTimeout(t *testing.T) { @@ -515,7 +516,7 @@ func TestHATrackerCheckReplicaOverwriteTimeout(t *testing.T) { // Update KVStore - this should elect replica 2. c.updateKVStoreAll(context.Background(), now) - checkReplicaTimestamp(t, time.Second, c, "user", "test", replica2, now, now) + checkReplicaTimestamp(t, 2*time.Second, c, "user", "test", replica2, now, now) // Now we should accept from replica 2. err = c.checkReplica(context.Background(), "user", "test", replica2, now) @@ -624,7 +625,7 @@ func TestHATrackerCheckReplicaMultiClusterTimeout(t *testing.T) { err = c.checkReplica(context.Background(), "user", "c1", replica2, now) assert.Error(t, err) c.updateKVStoreAll(context.Background(), now) - checkReplicaTimestamp(t, time.Second, c, "user", "c1", replica2, now, now) + checkReplicaTimestamp(t, 2*time.Second, c, "user", "c1", replica2, now, now) // Accept a sample from c1/replica2. err = c.checkReplica(context.Background(), "user", "c1", replica2, now) @@ -677,13 +678,13 @@ func TestHATrackerCheckReplicaUpdateTimeout(t *testing.T) { err = c.checkReplica(context.Background(), user, cluster, replica, startTime) assert.NoError(t, err) - checkReplicaTimestamp(t, time.Second, c, user, cluster, replica, startTime, startTime) + checkReplicaTimestamp(t, 2*time.Second, c, user, cluster, replica, startTime, startTime) // Timestamp should not update here, since time has not advanced. err = c.checkReplica(context.Background(), user, cluster, replica, startTime) assert.NoError(t, err) - checkReplicaTimestamp(t, time.Second, c, user, cluster, replica, startTime, startTime) + checkReplicaTimestamp(t, 2*time.Second, c, user, cluster, replica, startTime, startTime) // Wait 500ms and the timestamp should still not update. updateTime := time.Unix(0, startTime.UnixNano()).Add(500 * time.Millisecond) @@ -691,7 +692,7 @@ func TestHATrackerCheckReplicaUpdateTimeout(t *testing.T) { err = c.checkReplica(context.Background(), user, cluster, replica, updateTime) assert.NoError(t, err) - checkReplicaTimestamp(t, time.Second, c, user, cluster, replica, startTime, startTime) + checkReplicaTimestamp(t, 2*time.Second, c, user, cluster, replica, startTime, startTime) receivedAt := updateTime @@ -700,7 +701,7 @@ func TestHATrackerCheckReplicaUpdateTimeout(t *testing.T) { c.updateKVStoreAll(context.Background(), updateTime) // Timestamp stored in KV should be time when we have received a request (called "checkReplica"), not current time (updateTime). - checkReplicaTimestamp(t, time.Second, c, user, cluster, replica, receivedAt, receivedAt) + checkReplicaTimestamp(t, 2*time.Second, c, user, cluster, replica, receivedAt, receivedAt) err = c.checkReplica(context.Background(), user, cluster, replica, updateTime) assert.NoError(t, err) @@ -732,12 +733,12 @@ func TestHATrackerCheckReplicaMultiUser(t *testing.T) { // Write the first time for user 1. err = c.checkReplica(context.Background(), "user1", cluster, replica, now) assert.NoError(t, err) - checkReplicaTimestamp(t, time.Second, c, "user1", cluster, replica, now, now) + checkReplicaTimestamp(t, 2*time.Second, c, "user1", cluster, replica, now, now) // Write the first time for user 2. err = c.checkReplica(context.Background(), "user2", cluster, replica, now) assert.NoError(t, err) - checkReplicaTimestamp(t, time.Second, c, "user2", cluster, replica, now, now) + checkReplicaTimestamp(t, 2*time.Second, c, "user2", cluster, replica, now, now) // Now we've waited > 1s, so the timestamp should update. updated := now.Add(1100 * time.Millisecond) @@ -745,9 +746,9 @@ func TestHATrackerCheckReplicaMultiUser(t *testing.T) { assert.NoError(t, err) c.updateKVStoreAll(context.Background(), updated) - checkReplicaTimestamp(t, time.Second, c, "user1", cluster, replica, updated, updated) + checkReplicaTimestamp(t, 2*time.Second, c, "user1", cluster, replica, updated, updated) // No update for user2. - checkReplicaTimestamp(t, time.Second, c, "user2", cluster, replica, now, now) + checkReplicaTimestamp(t, 2*time.Second, c, "user2", cluster, replica, now, now) } func TestHATrackerCheckReplicaUpdateTimeoutJitter(t *testing.T) { @@ -820,7 +821,7 @@ func TestHATrackerCheckReplicaUpdateTimeoutJitter(t *testing.T) { // Init the replica in the KV Store err = c.checkReplica(ctx, "user1", "cluster", "replica-1", testData.startTime) require.NoError(t, err) - checkReplicaTimestamp(t, time.Second, c, "user1", "cluster", "replica-1", testData.startTime, testData.startTime) + checkReplicaTimestamp(t, 2*time.Second, c, "user1", "cluster", "replica-1", testData.startTime, testData.startTime) // Refresh the replica in the KV Store err = c.checkReplica(ctx, "user1", "cluster", "replica-1", testData.updateTime) @@ -828,7 +829,7 @@ func TestHATrackerCheckReplicaUpdateTimeoutJitter(t *testing.T) { c.updateKVStoreAll(context.Background(), testData.updateTime) // Assert on the received timestamp - checkReplicaTimestamp(t, time.Second, c, "user1", "cluster", "replica-1", testData.expectedTimestamp, testData.expectedTimestamp) + checkReplicaTimestamp(t, 2*time.Second, c, "user1", "cluster", "replica-1", testData.expectedTimestamp, testData.expectedTimestamp) }) } } @@ -931,7 +932,7 @@ func TestHATrackerClustersLimit(t *testing.T) { assert.Error(t, err) // Update KVStore. t1.updateKVStoreAll(context.Background(), now) - checkReplicaTimestamp(t, time.Second, t1, userID, "b", "b2", now, now) + checkReplicaTimestamp(t, 2*time.Second, t1, userID, "b", "b2", now, now) assert.NoError(t, t1.checkReplica(context.Background(), userID, "b", "b2", now)) waitForClustersUpdate(t, 2, t1, userID) @@ -1077,7 +1078,7 @@ func TestHATrackerCheckReplicaCleanup(t *testing.T) { err = c.checkReplica(context.Background(), userID, cluster, replica, now) assert.NoError(t, err) - checkReplicaTimestamp(t, time.Second, c, userID, cluster, replica, now, now) + checkReplicaTimestamp(t, 2*time.Second, c, userID, cluster, replica, now, now) // Replica is not marked for deletion yet. checkReplicaDeletionState(t, time.Second, c, userID, cluster, true, true, false) @@ -1094,7 +1095,7 @@ func TestHATrackerCheckReplicaCleanup(t *testing.T) { now = time.Now() err = c.checkReplica(context.Background(), userID, cluster, replica, now) assert.NoError(t, err) - checkReplicaTimestamp(t, time.Second, c, userID, cluster, replica, now, now) // This also checks that entry is not marked for deletion. + checkReplicaTimestamp(t, 2*time.Second, c, userID, cluster, replica, now, now) // This also checks that entry is not marked for deletion. checkUserClusters(t, time.Second, c, userID, 1) // This will mark replica for deletion again (with new time.Now()) @@ -1246,8 +1247,8 @@ func TestHATrackerChangeInElectedReplicaClearsLastSeenTimestamp(t *testing.T) { assert.NoError(t, t1.checkReplica(context.Background(), userID, cluster, firstReplica, now)) // Both trackers will see "first" replica as current. - checkReplicaTimestamp(t, time.Second, t1, userID, cluster, firstReplica, now, now) - checkReplicaTimestamp(t, time.Second, t2, userID, cluster, firstReplica, now, now) + checkReplicaTimestamp(t, 2*time.Second, t1, userID, cluster, firstReplica, now, now) + checkReplicaTimestamp(t, 2*time.Second, t2, userID, cluster, firstReplica, now, now) // Ten seconds later, t1 receives request from first replica again now = now.Add(10 * time.Second) @@ -1261,7 +1262,7 @@ func TestHATrackerChangeInElectedReplicaClearsLastSeenTimestamp(t *testing.T) { t2.updateKVStoreAll(context.Background(), now) // t1 is reading updates from KV store, and should see second replica being the elected one. - checkReplicaTimestamp(t, time.Second, t1, userID, cluster, secondReplica, secondReplicaReceivedAtT2, secondReplicaReceivedAtT2) + checkReplicaTimestamp(t, 2*time.Second, t1, userID, cluster, secondReplica, secondReplicaReceivedAtT2, secondReplicaReceivedAtT2) // Furthermore, t1 has never seen "second" replica, so it should not have "electedLastSeenTimestamp" set. { @@ -1285,7 +1286,7 @@ func TestHATrackerChangeInElectedReplicaClearsLastSeenTimestamp(t *testing.T) { t1.updateKVStoreAll(context.Background(), now) // t2 is reading updates from KV store, and should see "second" replica being the elected one. - checkReplicaTimestamp(t, time.Second, t2, userID, cluster, firstReplica, firstReceivedAtT1, firstReceivedAtT1) + checkReplicaTimestamp(t, 2*time.Second, t2, userID, cluster, firstReplica, firstReceivedAtT1, firstReceivedAtT1) // Since t2 has seen new elected replica too, we should have non-zero "electedLastSeenTimestamp". {