-
Notifications
You must be signed in to change notification settings - Fork 1.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[DO NOT MERGE] Refactor - Move command execution responsibility to CommandHandler from IOThread #1358
base: master
Are you sure you want to change the base?
[DO NOT MERGE] Refactor - Move command execution responsibility to CommandHandler from IOThread #1358
Changes from all commits
16ab8bd
0ffb45d
d54d4bb
9b86beb
690430d
3e15eca
07ca2b7
c89b688
4aab95c
05ec942
cfaff73
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -29,6 +29,7 @@ import ( | |
"testing" | ||
"time" | ||
|
||
"github.com/dicedb/dice/internal/commandhandler" | ||
"github.com/dicedb/dice/internal/iothread" | ||
"github.com/dicedb/dice/internal/server/resp" | ||
"github.com/dicedb/dice/internal/wal" | ||
|
@@ -211,10 +212,12 @@ func RunTestServer(wg *sync.WaitGroup, opt TestServerOptions) { | |
cmdWatchSubscriptionChan := make(chan watchmanager.WatchSubscription) | ||
gec := make(chan error) | ||
shardManager := shard.NewShardManager(1, cmdWatchChan, gec) | ||
ioThreadManager := iothread.NewManager(20000, shardManager) | ||
ioThreadManager := iothread.NewManager(20000) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let's use a config for this number. |
||
cmdHandlerManager := commandhandler.NewManager(20000, shardManager) | ||
|
||
// Initialize the RESP Server | ||
wl, _ := wal.NewNullWAL() | ||
testServer := resp.NewServer(shardManager, ioThreadManager, cmdWatchSubscriptionChan, cmdWatchChan, gec, wl) | ||
testServer := resp.NewServer(shardManager, ioThreadManager, cmdHandlerManager, cmdWatchSubscriptionChan, cmdWatchChan, gec, wl) | ||
|
||
ctx, cancel := context.WithCancel(context.Background()) | ||
fmt.Println("Starting the test server on port", config.DiceConfig.RespServer.Port) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -14,7 +14,7 @@ | |
// You should have received a copy of the GNU Affero General Public License | ||
// along with this program. If not, see <https://www.gnu.org/licenses/>. | ||
|
||
package iothread | ||
package commandhandler | ||
|
||
import ( | ||
"fmt" | ||
|
@@ -28,7 +28,7 @@ import ( | |
|
||
// RespAuth returns with an encoded "OK" if the user is authenticated | ||
// If the user is not authenticated, it returns with an encoded error message | ||
func (t *BaseIOThread) RespAuth(args []string) interface{} { | ||
func (h *BaseCommandHandler) RespAuth(args []string) interface{} { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I would suggest we move this to the commandhandler.go file. |
||
// Check for incorrect number of arguments (arity error). | ||
if len(args) < 1 || len(args) > 2 { | ||
return diceerrors.ErrWrongArgumentCount("AUTH") | ||
|
@@ -47,7 +47,7 @@ func (t *BaseIOThread) RespAuth(args []string) interface{} { | |
username, password = args[0], args[1] | ||
} | ||
|
||
if err := t.Session.Validate(username, password); err != nil { | ||
if err := h.Session.Validate(username, password); err != nil { | ||
return err | ||
} | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -14,7 +14,7 @@ | |
// You should have received a copy of the GNU Affero General Public License | ||
// along with this program. If not, see <https://www.gnu.org/licenses/>. | ||
|
||
package iothread | ||
package commandhandler | ||
|
||
import ( | ||
"context" | ||
|
@@ -213,12 +213,12 @@ const ( | |
|
||
type CmdMeta struct { | ||
CmdType | ||
Cmd string | ||
IOThreadHandler func([]string) []byte | ||
Cmd string | ||
CmdHandlerFunction func([]string) []byte | ||
|
||
// decomposeCommand is a function that takes a DiceDB command and breaks it down into smaller, | ||
// manageable DiceDB commands for each shard processing. It returns a slice of DiceDB commands. | ||
decomposeCommand func(ctx context.Context, thread *BaseIOThread, DiceDBCmd *cmd.DiceDBCmd) ([]*cmd.DiceDBCmd, error) | ||
decomposeCommand func(ctx context.Context, h *BaseCommandHandler, DiceDBCmd *cmd.DiceDBCmd) ([]*cmd.DiceDBCmd, error) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do you think these methods could just be defined on the BaseCommandHandler? We're passing a pointer to the command handler in each of these methods and I'm wondering if we could avoid that. |
||
|
||
// composeResponse is a function that combines multiple responses from the execution of commands | ||
// into a single response object. It accepts a variadic parameter of EvalResponse objects | ||
|
@@ -233,10 +233,10 @@ type CmdMeta struct { | |
|
||
// preProcessResponse is a function that handles the preprocessing of a DiceDB command by | ||
// preparing the necessary operations (e.g., fetching values from shards) before the command | ||
// is executed. It takes the io-thread and the original DiceDB command as parameters and | ||
// is executed. It takes the CommandHandler and the original DiceDB command as parameters and | ||
// ensures that any required information is retrieved and processed in advance. Use this when set | ||
// preProcessingReq = true. | ||
preProcessResponse func(thread *BaseIOThread, DiceDBCmd *cmd.DiceDBCmd) error | ||
preProcessResponse func(h *BaseCommandHandler, DiceDBCmd *cmd.DiceDBCmd) error | ||
} | ||
|
||
var CommandsMeta = map[string]CmdMeta{ | ||
|
@@ -691,8 +691,8 @@ func init() { | |
func validateCmdMeta(c string, meta CmdMeta) error { | ||
switch meta.CmdType { | ||
case Global: | ||
if meta.IOThreadHandler == nil { | ||
return fmt.Errorf("global command %s must have IOThreadHandler function", c) | ||
if meta.CmdHandlerFunction == nil { | ||
return fmt.Errorf("global command %s must have CmdHandlerFunction function", c) | ||
} | ||
case MultiShard, AllShard: | ||
if meta.decomposeCommand == nil || meta.composeResponse == nil { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -14,7 +14,7 @@ | |
// You should have received a copy of the GNU Affero General Public License | ||
// along with this program. If not, see <https://www.gnu.org/licenses/>. | ||
|
||
package iothread | ||
package commandhandler | ||
|
||
import ( | ||
"github.com/dicedb/dice/internal/cmd" | ||
|
@@ -25,13 +25,13 @@ import ( | |
// preProcessRename prepares the RENAME command for preprocessing by sending a GET command | ||
// to retrieve the value of the original key. The retrieved value is used later in the | ||
// decomposeRename function to delete the old key and set the new key. | ||
func preProcessRename(thread *BaseIOThread, diceDBCmd *cmd.DiceDBCmd) error { | ||
func preProcessRename(h *BaseCommandHandler, diceDBCmd *cmd.DiceDBCmd) error { | ||
if len(diceDBCmd.Args) < 2 { | ||
return diceerrors.ErrWrongArgumentCount("RENAME") | ||
} | ||
|
||
key := diceDBCmd.Args[0] | ||
sid, rc := thread.shardManager.GetShardInfo(key) | ||
sid, rc := h.shardManager.GetShardInfo(key) | ||
|
||
preCmd := cmd.DiceDBCmd{ | ||
Cmd: "RENAME", | ||
|
@@ -42,7 +42,7 @@ func preProcessRename(thread *BaseIOThread, diceDBCmd *cmd.DiceDBCmd) error { | |
SeqID: 0, | ||
RequestID: GenerateUniqueRequestID(), | ||
Cmd: &preCmd, | ||
IOThreadID: thread.id, | ||
CmdHandlerID: h.id, | ||
ShardID: sid, | ||
Client: nil, | ||
PreProcessing: true, | ||
|
@@ -54,12 +54,12 @@ func preProcessRename(thread *BaseIOThread, diceDBCmd *cmd.DiceDBCmd) error { | |
// preProcessCopy prepares the COPY command for preprocessing by sending a GET command | ||
// to retrieve the value of the original key. The retrieved value is used later in the | ||
// decomposeCopy function to copy the value to the destination key. | ||
func customProcessCopy(thread *BaseIOThread, diceDBCmd *cmd.DiceDBCmd) error { | ||
func customProcessCopy(h *BaseCommandHandler, diceDBCmd *cmd.DiceDBCmd) error { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same as the other comment, is it possible to define these methods on the command handler struct? |
||
if len(diceDBCmd.Args) < 2 { | ||
return diceerrors.ErrWrongArgumentCount("COPY") | ||
} | ||
|
||
sid, rc := thread.shardManager.GetShardInfo(diceDBCmd.Args[0]) | ||
sid, rc := h.shardManager.GetShardInfo(diceDBCmd.Args[0]) | ||
|
||
preCmd := cmd.DiceDBCmd{ | ||
Cmd: "COPY", | ||
|
@@ -71,7 +71,7 @@ func customProcessCopy(thread *BaseIOThread, diceDBCmd *cmd.DiceDBCmd) error { | |
SeqID: 0, | ||
RequestID: GenerateUniqueRequestID(), | ||
Cmd: &preCmd, | ||
IOThreadID: thread.id, | ||
CmdHandlerID: h.id, | ||
ShardID: sid, | ||
Client: nil, | ||
PreProcessing: true, | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Isn't this same as MaxClients?
Also since we do not plan to have more than 65536 client's can we have the type as
int16
instead?