Skip to content

Commit

Permalink
Fix: keep fingerprint until all clients unsubscribe (DiceDB#1335)
Browse files Browse the repository at this point in the history
  • Loading branch information
lucifercr07 authored Nov 26, 2024
1 parent d092ed0 commit 0a15a19
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 0 deletions.
62 changes: 62 additions & 0 deletions integration_tests/commands/resp/getwatch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,3 +281,65 @@ func TestGETWATCHWithLabelWithSDK(t *testing.T) {
unsubscribeFromWatchUpdatesSDK(t, subscribers, "GET", getWatchWithLabelTestCases[0].fingerprint)
unsubscribeFromWatchUpdatesSDK(t, subscribers, "GET", getWatchWithLabelTestCases[1].fingerprint)
}

func TestGETWATCHWhenClientUnsubscribe(t *testing.T) {
publisher := getLocalSdk()
subscribers := []WatchSubscriber{{client: getLocalSdk()}, {client: getLocalSdk()}, {client: getLocalSdk()}}

defer func() {
err := ClosePublisherSubscribersSDK(publisher, subscribers)
assert.Nil(t, err)
}()

publisher.Del(context.Background(), getWatchKey)

channels := make([]<-chan *dicedb.WatchResult, len(subscribers))
for i, subscriber := range subscribers {
// subscribe to updates
watch := subscriber.client.WatchConn(context.Background())
subscribers[i].watch = watch
assert.True(t, watch != nil)

firstMsg, err := watch.Watch(context.Background(), "GET", getWatchKey)
assert.Nil(t, err)
assert.Equal(t, "2714318480", firstMsg.Fingerprint)

channels[i] = watch.Channel()
}

for _, tc := range getWatchTestCases {
err := publisher.Set(context.Background(), tc.key, tc.val, 0).Err()
assert.Nil(t, err)

for _, channel := range channels {
v := <-channel
assert.Equal(t, "GET", v.Command) // command
assert.Equal(t, "2714318480", v.Fingerprint) // Fingerprint
assert.Equal(t, tc.val, v.Data.(string)) // data
}
}

// unsubscribe first 2 clients from updates
unsubscribeFromWatchUpdatesSDK(t, subscribers[:2], "GET", "2714318480")
// Trigger an update and verify only the third subscriber receives it
err := publisher.Set(context.Background(), getWatchKey, "new-updated-val", 0).Err()
assert.Nil(t, err)

// Assert other unsubscribed clients don't get any updates
for _, channel := range channels[:2] {
select {
case v := <-channel:
assert.Fail(t, fmt.Sprintf("Unexpected update received: %v", v))
case <-time.After(100 * time.Millisecond):
// This is the expected behavior - no response within the timeout
}
}

lastMsg := <-channels[2]
assert.Equal(t, "GET", lastMsg.Command)
assert.Equal(t, "2714318480", lastMsg.Fingerprint)
assert.Equal(t, "new-updated-val", lastMsg.Data.(string))

// Unsubscribe all clients
unsubscribeFromWatchUpdatesSDK(t, subscribers, "GET", getWatchKey)
}
3 changes: 3 additions & 0 deletions internal/watchmanager/watch_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,9 @@ func (m *Manager) handleUnsubscription(sub WatchSubscription) {
if len(clients) == 0 {
// Remove the fingerprint from tcpSubscriptionMap
delete(m.tcpSubscriptionMap, fingerprint)
} else {
// Other clients still subscribed, no need to remove the fingerprint altogether
return
}
}

Expand Down

0 comments on commit 0a15a19

Please sign in to comment.