Skip to content

Commit

Permalink
[REFACTOR] remove sql and query_manager
Browse files Browse the repository at this point in the history
  • Loading branch information
apoorvyadav1111 committed Dec 16, 2024
1 parent e8f2573 commit 86e2782
Show file tree
Hide file tree
Showing 15 changed files with 1 addition and 3,641 deletions.
9 changes: 1 addition & 8 deletions integration_tests/commands/http/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ import (

"github.com/dicedb/dice/config"
derrors "github.com/dicedb/dice/internal/errors"
"github.com/dicedb/dice/internal/querymanager"
"github.com/dicedb/dice/internal/shard"
dstore "github.com/dicedb/dice/internal/store"
)
Expand Down Expand Up @@ -131,7 +130,7 @@ func RunHTTPServer(ctx context.Context, wg *sync.WaitGroup, opt TestServerOption
globalErrChannel := make(chan error)
watchChan := make(chan dstore.QueryWatchEvent, config.DiceConfig.Performance.WatchChanBufSize)
shardManager := shard.NewShardManager(1, watchChan, nil, globalErrChannel)
queryWatcherLocal := querymanager.NewQueryManager()

config.DiceConfig.HTTP.Port = opt.Port
// Initialize the HTTPServer
testServer := httpws.NewHTTPServer(shardManager, nil)
Expand All @@ -144,12 +143,6 @@ func RunHTTPServer(ctx context.Context, wg *sync.WaitGroup, opt TestServerOption
shardManager.Run(shardManagerCtx)
}()

wg.Add(1)
go func() {
defer wg.Done()
queryWatcherLocal.Run(ctx, watchChan)
}()

// Start the server in a goroutine
wg.Add(1)
go func() {
Expand Down
9 changes: 0 additions & 9 deletions integration_tests/commands/websocket/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import (

"github.com/dicedb/dice/config"
derrors "github.com/dicedb/dice/internal/errors"
"github.com/dicedb/dice/internal/querymanager"
"github.com/dicedb/dice/internal/shard"
dstore "github.com/dicedb/dice/internal/store"
"github.com/gorilla/websocket"
Expand Down Expand Up @@ -132,7 +131,6 @@ func RunWebsocketServer(ctx context.Context, wg *sync.WaitGroup, opt TestServerO
globalErrChannel := make(chan error)
watchChan := make(chan dstore.QueryWatchEvent, config.DiceConfig.Performance.WatchChanBufSize)
shardManager := shard.NewShardManager(1, watchChan, nil, globalErrChannel)
queryWatcherLocal := querymanager.NewQueryManager()
config.DiceConfig.WebSocket.Port = opt.Port
testServer := httpws.NewWebSocketServer(shardManager, testPort1, nil)
shardManagerCtx, cancelShardManager := context.WithCancel(ctx)
Expand All @@ -144,13 +142,6 @@ func RunWebsocketServer(ctx context.Context, wg *sync.WaitGroup, opt TestServerO
shardManager.Run(shardManagerCtx)
}()

// run query manager
wg.Add(1)
go func() {
defer wg.Done()
queryWatcherLocal.Run(ctx, watchChan)
}()

// start websocket server
wg.Add(1)
go func() {
Expand Down
11 changes: 0 additions & 11 deletions internal/clientio/resp.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@ import (

"github.com/dicedb/dice/internal/object"

"github.com/dicedb/dice/internal/sql"

"github.com/dicedb/dice/internal/server/utils"
dstore "github.com/dicedb/dice/internal/store"
)
Expand Down Expand Up @@ -298,15 +296,6 @@ func Encode(value interface{}, isSimple bool) []byte {
buf.Write(Encode(fmt.Sprintf("key:%s", we.Key), false))
buf.Write(Encode(fmt.Sprintf("op:%s", we.Operation), false))
return []byte(fmt.Sprintf("*2\r\n%s", buf.Bytes()))
case []sql.QueryResultRow:
var b []byte
buf := bytes.NewBuffer(b) // Create a buffer for accumulating encoded rows.
for _, row := range value.([]sql.QueryResultRow) {
buf.WriteString("*2\r\n") // Start a new array for each row.
buf.Write(Encode(row.Key, false)) // Encode the row key.
buf.Write(Encode(row.Value.Value, false)) // Encode the row value.
}
return []byte(fmt.Sprintf("*%d\r\n%s", len(v), buf.Bytes())) // Return the encoded response.

// Handle map[string]bool and return a nil response indicating unsupported types.
case map[string]bool:
Expand Down
92 changes: 0 additions & 92 deletions internal/eval/eval.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,10 @@ import (
"strconv"
"time"

"github.com/dicedb/dice/internal/object"

"github.com/dicedb/dice/internal/sql"

"github.com/dicedb/dice/config"
"github.com/dicedb/dice/internal/clientio"
"github.com/dicedb/dice/internal/comm"
diceerrors "github.com/dicedb/dice/internal/errors"
"github.com/dicedb/dice/internal/querymanager"
dstore "github.com/dicedb/dice/internal/store"
)

Expand Down Expand Up @@ -189,90 +184,3 @@ func evalSLEEP(args []string, store *dstore.Store) []byte {
time.Sleep(time.Duration(durationSec) * time.Second)
return clientio.RespOK
}

// EvalQWATCH adds the specified key to the watch list for the caller client.
// Every time a key in the watch list is modified, the client will be sent a response
// containing the new value of the key along with the operation that was performed on it.
// Contains only one argument, the query to be watched.
func EvalQWATCH(args []string, httpOp, websocketOp bool, client *comm.Client, store *dstore.Store) []byte {
if len(args) != 1 {
return diceerrors.NewErrArity("Q.WATCH")
}

// Parse and get the selection from the query.
query, e := sql.ParseQuery( /*sql=*/ args[0])

if e != nil {
return clientio.Encode(e, false)
}

// use an unbuffered channel to ensure that we only proceed to query execution once the query watcher has built the cache
cacheChannel := make(chan *[]struct {
Key string
Value *object.Obj
})
var watchSubscription querymanager.QuerySubscription

if httpOp || websocketOp {
watchSubscription = querymanager.QuerySubscription{
Subscribe: true,
Query: query,
CacheChan: cacheChannel,
QwatchClientChan: client.HTTPQwatchResponseChan,
ClientIdentifierID: client.ClientIdentifierID,
}
} else {
watchSubscription = querymanager.QuerySubscription{
Subscribe: true,
Query: query,
ClientFD: client.Fd,
CacheChan: cacheChannel,
}
}

querymanager.QuerySubscriptionChan <- watchSubscription
store.CacheKeysForQuery(query.Where, cacheChannel)

// Return the result of the query.
responseChan := make(chan querymanager.AdhocQueryResult)
querymanager.AdhocQueryChan <- querymanager.AdhocQuery{
Query: query,
ResponseChan: responseChan,
}

queryResult := <-responseChan
if queryResult.Err != nil {
return clientio.Encode(queryResult.Err, false)
}

// TODO: We should return the list of all queries being watched by the client.
return clientio.Encode(querymanager.GenericWatchResponse(sql.Qwatch, query.String(), *queryResult.Result), false)
}

// EvalQUNWATCH removes the specified key from the watch list for the caller client.
func EvalQUNWATCH(args []string, httpOp bool, client *comm.Client) []byte {
if len(args) != 1 {
return diceerrors.NewErrArity("Q.UNWATCH")
}
query, e := sql.ParseQuery( /*sql=*/ args[0])
if e != nil {
return clientio.Encode(e, false)
}

if httpOp {
querymanager.QuerySubscriptionChan <- querymanager.QuerySubscription{
Subscribe: false,
Query: query,
QwatchClientChan: client.HTTPQwatchResponseChan,
ClientIdentifierID: client.ClientIdentifierID,
}
} else {
querymanager.QuerySubscriptionChan <- querymanager.QuerySubscription{
Subscribe: false,
Query: query,
ClientFD: client.Fd,
}
}

return clientio.RespOK
}
4 changes: 0 additions & 4 deletions internal/eval/execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,6 @@ func (e *Eval) ExecuteCommand() *EvalResponse {
switch diceCmd.Name {
// Old implementation kept as it is, but we will be moving
// to the new implementation soon for all commands
case "SUBSCRIBE", "Q.WATCH":
return &EvalResponse{Result: EvalQWATCH(e.cmd.Args, e.isHTTPOperation, e.isWebSocketOperation, e.client, e.store), Error: nil}
case "UNSUBSCRIBE", "Q.UNWATCH":
return &EvalResponse{Result: EvalQUNWATCH(e.cmd.Args, e.isHTTPOperation, e.client), Error: nil}
case auth.Cmd:
return &EvalResponse{Result: EvalAUTH(e.cmd.Args, e.client), Error: nil}
case "ABORT":
Expand Down
Loading

0 comments on commit 86e2782

Please sign in to comment.