From 0a15a19c9d1a93bba2c576b9fd10a655aea2da15 Mon Sep 17 00:00:00 2001 From: Prashant Shubham Date: Wed, 27 Nov 2024 04:53:57 +0530 Subject: [PATCH] Fix: keep fingerprint until all clients unsubscribe (#1335) --- .../commands/resp/getwatch_test.go | 62 +++++++++++++++++++ internal/watchmanager/watch_manager.go | 3 + 2 files changed, 65 insertions(+) diff --git a/integration_tests/commands/resp/getwatch_test.go b/integration_tests/commands/resp/getwatch_test.go index 82cbdc416..177007e12 100644 --- a/integration_tests/commands/resp/getwatch_test.go +++ b/integration_tests/commands/resp/getwatch_test.go @@ -281,3 +281,65 @@ func TestGETWATCHWithLabelWithSDK(t *testing.T) { unsubscribeFromWatchUpdatesSDK(t, subscribers, "GET", getWatchWithLabelTestCases[0].fingerprint) unsubscribeFromWatchUpdatesSDK(t, subscribers, "GET", getWatchWithLabelTestCases[1].fingerprint) } + +func TestGETWATCHWhenClientUnsubscribe(t *testing.T) { + publisher := getLocalSdk() + subscribers := []WatchSubscriber{{client: getLocalSdk()}, {client: getLocalSdk()}, {client: getLocalSdk()}} + + defer func() { + err := ClosePublisherSubscribersSDK(publisher, subscribers) + assert.Nil(t, err) + }() + + publisher.Del(context.Background(), getWatchKey) + + channels := make([]<-chan *dicedb.WatchResult, len(subscribers)) + for i, subscriber := range subscribers { + // subscribe to updates + watch := subscriber.client.WatchConn(context.Background()) + subscribers[i].watch = watch + assert.True(t, watch != nil) + + firstMsg, err := watch.Watch(context.Background(), "GET", getWatchKey) + assert.Nil(t, err) + assert.Equal(t, "2714318480", firstMsg.Fingerprint) + + channels[i] = watch.Channel() + } + + for _, tc := range getWatchTestCases { + err := publisher.Set(context.Background(), tc.key, tc.val, 0).Err() + assert.Nil(t, err) + + for _, channel := range channels { + v := <-channel + assert.Equal(t, "GET", v.Command) // command + assert.Equal(t, "2714318480", v.Fingerprint) // Fingerprint + assert.Equal(t, tc.val, v.Data.(string)) // data + } + } + + // unsubscribe first 2 clients from updates + unsubscribeFromWatchUpdatesSDK(t, subscribers[:2], "GET", "2714318480") + // Trigger an update and verify only the third subscriber receives it + err := publisher.Set(context.Background(), getWatchKey, "new-updated-val", 0).Err() + assert.Nil(t, err) + + // Assert other unsubscribed clients don't get any updates + for _, channel := range channels[:2] { + select { + case v := <-channel: + assert.Fail(t, fmt.Sprintf("Unexpected update received: %v", v)) + case <-time.After(100 * time.Millisecond): + // This is the expected behavior - no response within the timeout + } + } + + lastMsg := <-channels[2] + assert.Equal(t, "GET", lastMsg.Command) + assert.Equal(t, "2714318480", lastMsg.Fingerprint) + assert.Equal(t, "new-updated-val", lastMsg.Data.(string)) + + // Unsubscribe all clients + unsubscribeFromWatchUpdatesSDK(t, subscribers, "GET", getWatchKey) +} diff --git a/internal/watchmanager/watch_manager.go b/internal/watchmanager/watch_manager.go index f0d765316..577f171c1 100644 --- a/internal/watchmanager/watch_manager.go +++ b/internal/watchmanager/watch_manager.go @@ -111,6 +111,9 @@ func (m *Manager) handleUnsubscription(sub WatchSubscription) { if len(clients) == 0 { // Remove the fingerprint from tcpSubscriptionMap delete(m.tcpSubscriptionMap, fingerprint) + } else { + // Other clients still subscribed, no need to remove the fingerprint altogether + return } }