Skip to content

Commit

Permalink
moved dicedb subscription reset logic to cleanup method and added syn…
Browse files Browse the repository at this point in the history
…chronization for running LivePrefix after subscription state is reset
  • Loading branch information
vpsinghg committed Nov 22, 2024
1 parent c984a58 commit b6c0e14
Showing 1 changed file with 10 additions and 5 deletions.
15 changes: 10 additions & 5 deletions cli/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"os"
"os/exec"
"strings"
"sync"

"github.com/c-bata/go-prompt"
"github.com/dicedb/dicedb-go"
Expand All @@ -21,6 +22,7 @@ type DiceDBClient struct {
subCancel context.CancelFunc
addr string
password string
wg sync.WaitGroup
}

func Run(host string, port int) {
Expand Down Expand Up @@ -57,8 +59,6 @@ func Run(host string, port int) {
Key: prompt.ControlC,
Fn: func(buf *prompt.Buffer) {
if dicedbClient.subscribed {
fmt.Println("Exiting watch mode.")

dicedbClient.handleWatchModeExit()
} else {
handleExit()
Expand Down Expand Up @@ -175,9 +175,10 @@ func (c *DiceDBClient) handleWatchCommand(cmd string, args []string) {
}

func (c *DiceDBClient) handleWatchModeExit() {
fmt.Println("Exiting watch mode.")

c.subCancel()
c.subscribed = false
c.subType = ""
c.wg.Wait()
}

func (c *DiceDBClient) handleUnsubscribe() {
Expand Down Expand Up @@ -300,13 +301,17 @@ func (c *DiceDBClient) subscribe(channels []string) {
}

func (c *DiceDBClient) watchCommand(cmd string, args ...interface{}) {
c.wg.Add(1)
defer func() {
c.subscribed = false
c.subType = ""
if c.watchConn != nil {
c.watchConn.Close()
}
c.wg.Done()
}()

c.watchConn = c.client.WatchConn(c.subCtx)
defer c.watchConn.Close()

// Send the WATCH command
firstMsg, err := c.watchConn.Watch(c.subCtx, cmd, args...)
Expand Down

0 comments on commit b6c0e14

Please sign in to comment.