Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DO NOT MERGE] Refactor - Move command execution responsibility to CommandHandler from IOThread #1358

Open
wants to merge 11 commits into
base: master
Choose a base branch
from
1 change: 1 addition & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ type performance struct {
ShardCronFrequency time.Duration `config:"shard_cron_frequency" default:"1s"`
MultiplexerPollTimeout time.Duration `config:"multiplexer_poll_timeout" default:"100ms"`
MaxClients int32 `config:"max_clients" default:"20000" validate:"min=0"`
MaxCmdHandlers int32 `config:"max_cmd_handlers" default:"20000" validate:"min=0"`
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't this same as MaxClients?
Also since we do not plan to have more than 65536 client's can we have the type as int16 instead?

StoreMapInitSize int `config:"store_map_init_size" default:"1024000"`
AdhocReqChanBufSize int `config:"adhoc_req_chan_buf_size" default:"20"`
EnableProfiling bool `config:"profiling" default:"false"`
Expand Down
7 changes: 5 additions & 2 deletions integration_tests/commands/resp/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"testing"
"time"

"github.com/dicedb/dice/internal/commandhandler"
"github.com/dicedb/dice/internal/iothread"
"github.com/dicedb/dice/internal/server/resp"
"github.com/dicedb/dice/internal/wal"
Expand Down Expand Up @@ -211,10 +212,12 @@ func RunTestServer(wg *sync.WaitGroup, opt TestServerOptions) {
cmdWatchSubscriptionChan := make(chan watchmanager.WatchSubscription)
gec := make(chan error)
shardManager := shard.NewShardManager(1, cmdWatchChan, gec)
ioThreadManager := iothread.NewManager(20000, shardManager)
ioThreadManager := iothread.NewManager(20000)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's use a config for this number.

cmdHandlerManager := commandhandler.NewManager(20000, shardManager)

// Initialize the RESP Server
wl, _ := wal.NewNullWAL()
testServer := resp.NewServer(shardManager, ioThreadManager, cmdWatchSubscriptionChan, cmdWatchChan, gec, wl)
testServer := resp.NewServer(shardManager, ioThreadManager, cmdHandlerManager, cmdWatchSubscriptionChan, cmdWatchChan, gec, wl)

ctx, cancel := context.WithCancel(context.Background())
fmt.Println("Starting the test server on port", config.DiceConfig.RespServer.Port)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

package iothread
package commandhandler

import (
"math"
Expand All @@ -24,7 +24,7 @@ import (
"github.com/dicedb/dice/internal/ops"
)

// This file contains functions used by the IOThread to handle and process responses
// This file contains functions used by the CommandHandler to handle and process responses
// from multiple shards during distributed operations. For commands that are executed
// across several shards, such as MultiShard commands, dedicated functions are responsible
// for aggregating and managing the results.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

package iothread
package commandhandler

import (
"fmt"
Expand All @@ -28,7 +28,7 @@ import (

// RespAuth returns with an encoded "OK" if the user is authenticated
// If the user is not authenticated, it returns with an encoded error message
func (t *BaseIOThread) RespAuth(args []string) interface{} {
func (h *BaseCommandHandler) RespAuth(args []string) interface{} {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would suggest we move this to the commandhandler.go file.

// Check for incorrect number of arguments (arity error).
if len(args) < 1 || len(args) > 2 {
return diceerrors.ErrWrongArgumentCount("AUTH")
Expand All @@ -47,7 +47,7 @@ func (t *BaseIOThread) RespAuth(args []string) interface{} {
username, password = args[0], args[1]
}

if err := t.Session.Validate(username, password); err != nil {
if err := h.Session.Validate(username, password); err != nil {
return err
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

package iothread
package commandhandler

import (
"context"
Expand All @@ -28,7 +28,7 @@ import (
"github.com/dicedb/dice/internal/store"
)

// This file is utilized by the IOThread to decompose commands that need to be executed
// This file is utilized by the CommandHandler to decompose commands that need to be executed
// across multiple shards. For commands that operate on multiple keys or necessitate
// distribution across shards (e.g., MultiShard commands), a Breakup function is invoked
// to transform the original command into multiple smaller commands, each directed at
Expand All @@ -41,13 +41,13 @@ import (
// decomposeRename breaks down the RENAME command into separate DELETE and SET commands.
// It first waits for the result of a GET command from shards. If successful, it removes
// the old key using a DEL command and sets the new key with the retrieved value using a SET command.
func decomposeRename(ctx context.Context, thread *BaseIOThread, cd *cmd.DiceDBCmd) ([]*cmd.DiceDBCmd, error) {
func decomposeRename(ctx context.Context, h *BaseCommandHandler, cd *cmd.DiceDBCmd) ([]*cmd.DiceDBCmd, error) {
// Waiting for GET command response
var val string
select {
case <-ctx.Done():
slog.Error("IOThread timed out waiting for response from shards", slog.String("id", thread.id), slog.Any("error", ctx.Err()))
case preProcessedResp, ok := <-thread.preprocessingChan:
slog.Error("CommandHandler timed out waiting for response from shards", slog.String("id", h.id), slog.Any("error", ctx.Err()))
case preProcessedResp, ok := <-h.preprocessingChan:
if ok {
evalResp := preProcessedResp.EvalResponse
if evalResp.Error != nil {
Expand Down Expand Up @@ -85,13 +85,13 @@ func decomposeRename(ctx context.Context, thread *BaseIOThread, cd *cmd.DiceDBCm
// decomposeCopy breaks down the COPY command into a SET command that copies a value from
// one key to another. It first retrieves the value of the original key from shards, then
// sets the value to the destination key using a SET command.
func decomposeCopy(ctx context.Context, thread *BaseIOThread, cd *cmd.DiceDBCmd) ([]*cmd.DiceDBCmd, error) {
func decomposeCopy(ctx context.Context, h *BaseCommandHandler, cd *cmd.DiceDBCmd) ([]*cmd.DiceDBCmd, error) {
// Waiting for GET command response
var resp *ops.StoreResponse
select {
case <-ctx.Done():
slog.Error("IOThread timed out waiting for response from shards", slog.String("id", thread.id), slog.Any("error", ctx.Err()))
case preProcessedResp, ok := <-thread.preprocessingChan:
slog.Error("CommandHandler timed out waiting for response from shards", slog.String("id", h.id), slog.Any("error", ctx.Err()))
case preProcessedResp, ok := <-h.preprocessingChan:
if ok {
resp = preProcessedResp
}
Expand Down Expand Up @@ -124,7 +124,7 @@ func decomposeCopy(ctx context.Context, thread *BaseIOThread, cd *cmd.DiceDBCmd)
// decomposeMSet decomposes the MSET (Multi-set) command into individual SET commands.
// It expects an even number of arguments (key-value pairs). For each pair, it creates
// a separate SET command to store the value at the given key.
func decomposeMSet(_ context.Context, _ *BaseIOThread, cd *cmd.DiceDBCmd) ([]*cmd.DiceDBCmd, error) {
func decomposeMSet(_ context.Context, _ *BaseCommandHandler, cd *cmd.DiceDBCmd) ([]*cmd.DiceDBCmd, error) {
if len(cd.Args)%2 != 0 {
return nil, diceerrors.ErrWrongArgumentCount("MSET")
}
Expand All @@ -148,7 +148,7 @@ func decomposeMSet(_ context.Context, _ *BaseIOThread, cd *cmd.DiceDBCmd) ([]*cm
// decomposeMGet decomposes the MGET (Multi-get) command into individual GET commands.
// It expects a list of keys, and for each key, it creates a separate GET command to
// retrieve the value associated with that key.
func decomposeMGet(_ context.Context, _ *BaseIOThread, cd *cmd.DiceDBCmd) ([]*cmd.DiceDBCmd, error) {
func decomposeMGet(_ context.Context, _ *BaseCommandHandler, cd *cmd.DiceDBCmd) ([]*cmd.DiceDBCmd, error) {
if len(cd.Args) < 1 {
return nil, diceerrors.ErrWrongArgumentCount("MGET")
}
Expand All @@ -164,7 +164,7 @@ func decomposeMGet(_ context.Context, _ *BaseIOThread, cd *cmd.DiceDBCmd) ([]*cm
return decomposedCmds, nil
}

func decomposeSInter(_ context.Context, _ *BaseIOThread, cd *cmd.DiceDBCmd) ([]*cmd.DiceDBCmd, error) {
func decomposeSInter(_ context.Context, _ *BaseCommandHandler, cd *cmd.DiceDBCmd) ([]*cmd.DiceDBCmd, error) {
if len(cd.Args) < 1 {
return nil, diceerrors.ErrWrongArgumentCount("SINTER")
}
Expand All @@ -180,7 +180,7 @@ func decomposeSInter(_ context.Context, _ *BaseIOThread, cd *cmd.DiceDBCmd) ([]*
return decomposedCmds, nil
}

func decomposeSDiff(_ context.Context, _ *BaseIOThread, cd *cmd.DiceDBCmd) ([]*cmd.DiceDBCmd, error) {
func decomposeSDiff(_ context.Context, _ *BaseCommandHandler, cd *cmd.DiceDBCmd) ([]*cmd.DiceDBCmd, error) {
if len(cd.Args) < 1 {
return nil, diceerrors.ErrWrongArgumentCount("SDIFF")
}
Expand All @@ -196,7 +196,7 @@ func decomposeSDiff(_ context.Context, _ *BaseIOThread, cd *cmd.DiceDBCmd) ([]*c
return decomposedCmds, nil
}

func decomposeJSONMget(_ context.Context, _ *BaseIOThread, cd *cmd.DiceDBCmd) ([]*cmd.DiceDBCmd, error) {
func decomposeJSONMget(_ context.Context, _ *BaseCommandHandler, cd *cmd.DiceDBCmd) ([]*cmd.DiceDBCmd, error) {
if len(cd.Args) < 2 {
return nil, diceerrors.ErrWrongArgumentCount("JSON.MGET")
}
Expand All @@ -215,7 +215,7 @@ func decomposeJSONMget(_ context.Context, _ *BaseIOThread, cd *cmd.DiceDBCmd) ([
return decomposedCmds, nil
}

func decomposeTouch(_ context.Context, _ *BaseIOThread, cd *cmd.DiceDBCmd) ([]*cmd.DiceDBCmd, error) {
func decomposeTouch(_ context.Context, _ *BaseCommandHandler, cd *cmd.DiceDBCmd) ([]*cmd.DiceDBCmd, error) {
if len(cd.Args) == 0 {
return nil, diceerrors.ErrWrongArgumentCount("TOUCH")
}
Expand All @@ -232,13 +232,13 @@ func decomposeTouch(_ context.Context, _ *BaseIOThread, cd *cmd.DiceDBCmd) ([]*c
return decomposedCmds, nil
}

func decomposeDBSize(_ context.Context, thread *BaseIOThread, cd *cmd.DiceDBCmd) ([]*cmd.DiceDBCmd, error) {
func decomposeDBSize(_ context.Context, h *BaseCommandHandler, cd *cmd.DiceDBCmd) ([]*cmd.DiceDBCmd, error) {
if len(cd.Args) > 0 {
return nil, diceerrors.ErrWrongArgumentCount("DBSIZE")
}

decomposedCmds := make([]*cmd.DiceDBCmd, 0, len(cd.Args))
for i := uint8(0); i < uint8(thread.shardManager.GetShardCount()); i++ {
for i := uint8(0); i < uint8(h.shardManager.GetShardCount()); i++ {
decomposedCmds = append(decomposedCmds,
&cmd.DiceDBCmd{
Cmd: store.SingleShardSize,
Expand All @@ -249,13 +249,13 @@ func decomposeDBSize(_ context.Context, thread *BaseIOThread, cd *cmd.DiceDBCmd)
return decomposedCmds, nil
}

func decomposeKeys(_ context.Context, thread *BaseIOThread, cd *cmd.DiceDBCmd) ([]*cmd.DiceDBCmd, error) {
func decomposeKeys(_ context.Context, h *BaseCommandHandler, cd *cmd.DiceDBCmd) ([]*cmd.DiceDBCmd, error) {
if len(cd.Args) != 1 {
return nil, diceerrors.ErrWrongArgumentCount("KEYS")
}

decomposedCmds := make([]*cmd.DiceDBCmd, 0, len(cd.Args))
for i := uint8(0); i < uint8(thread.shardManager.GetShardCount()); i++ {
for i := uint8(0); i < uint8(h.shardManager.GetShardCount()); i++ {
decomposedCmds = append(decomposedCmds,
&cmd.DiceDBCmd{
Cmd: store.SingleShardKeys,
Expand All @@ -266,13 +266,13 @@ func decomposeKeys(_ context.Context, thread *BaseIOThread, cd *cmd.DiceDBCmd) (
return decomposedCmds, nil
}

func decomposeFlushDB(_ context.Context, thread *BaseIOThread, cd *cmd.DiceDBCmd) ([]*cmd.DiceDBCmd, error) {
func decomposeFlushDB(_ context.Context, h *BaseCommandHandler, cd *cmd.DiceDBCmd) ([]*cmd.DiceDBCmd, error) {
if len(cd.Args) > 1 {
return nil, diceerrors.ErrWrongArgumentCount("FLUSHDB")
}

decomposedCmds := make([]*cmd.DiceDBCmd, 0, len(cd.Args))
for i := uint8(0); i < uint8(thread.shardManager.GetShardCount()); i++ {
for i := uint8(0); i < uint8(h.shardManager.GetShardCount()); i++ {
decomposedCmds = append(decomposedCmds,
&cmd.DiceDBCmd{
Cmd: store.FlushDB,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

package iothread
package commandhandler

import (
"context"
Expand Down Expand Up @@ -213,12 +213,12 @@ const (

type CmdMeta struct {
CmdType
Cmd string
IOThreadHandler func([]string) []byte
Cmd string
CmdHandlerFunction func([]string) []byte

// decomposeCommand is a function that takes a DiceDB command and breaks it down into smaller,
// manageable DiceDB commands for each shard processing. It returns a slice of DiceDB commands.
decomposeCommand func(ctx context.Context, thread *BaseIOThread, DiceDBCmd *cmd.DiceDBCmd) ([]*cmd.DiceDBCmd, error)
decomposeCommand func(ctx context.Context, h *BaseCommandHandler, DiceDBCmd *cmd.DiceDBCmd) ([]*cmd.DiceDBCmd, error)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you think these methods could just be defined on the BaseCommandHandler? We're passing a pointer to the command handler in each of these methods and I'm wondering if we could avoid that.


// composeResponse is a function that combines multiple responses from the execution of commands
// into a single response object. It accepts a variadic parameter of EvalResponse objects
Expand All @@ -233,10 +233,10 @@ type CmdMeta struct {

// preProcessResponse is a function that handles the preprocessing of a DiceDB command by
// preparing the necessary operations (e.g., fetching values from shards) before the command
// is executed. It takes the io-thread and the original DiceDB command as parameters and
// is executed. It takes the CommandHandler and the original DiceDB command as parameters and
// ensures that any required information is retrieved and processed in advance. Use this when set
// preProcessingReq = true.
preProcessResponse func(thread *BaseIOThread, DiceDBCmd *cmd.DiceDBCmd) error
preProcessResponse func(h *BaseCommandHandler, DiceDBCmd *cmd.DiceDBCmd) error
}

var CommandsMeta = map[string]CmdMeta{
Expand Down Expand Up @@ -691,8 +691,8 @@ func init() {
func validateCmdMeta(c string, meta CmdMeta) error {
switch meta.CmdType {
case Global:
if meta.IOThreadHandler == nil {
return fmt.Errorf("global command %s must have IOThreadHandler function", c)
if meta.CmdHandlerFunction == nil {
return fmt.Errorf("global command %s must have CmdHandlerFunction function", c)
}
case MultiShard, AllShard:
if meta.decomposeCommand == nil || meta.composeResponse == nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

package iothread
package commandhandler

import (
"github.com/dicedb/dice/internal/cmd"
Expand All @@ -25,13 +25,13 @@ import (
// preProcessRename prepares the RENAME command for preprocessing by sending a GET command
// to retrieve the value of the original key. The retrieved value is used later in the
// decomposeRename function to delete the old key and set the new key.
func preProcessRename(thread *BaseIOThread, diceDBCmd *cmd.DiceDBCmd) error {
func preProcessRename(h *BaseCommandHandler, diceDBCmd *cmd.DiceDBCmd) error {
if len(diceDBCmd.Args) < 2 {
return diceerrors.ErrWrongArgumentCount("RENAME")
}

key := diceDBCmd.Args[0]
sid, rc := thread.shardManager.GetShardInfo(key)
sid, rc := h.shardManager.GetShardInfo(key)

preCmd := cmd.DiceDBCmd{
Cmd: "RENAME",
Expand All @@ -42,7 +42,7 @@ func preProcessRename(thread *BaseIOThread, diceDBCmd *cmd.DiceDBCmd) error {
SeqID: 0,
RequestID: GenerateUniqueRequestID(),
Cmd: &preCmd,
IOThreadID: thread.id,
CmdHandlerID: h.id,
ShardID: sid,
Client: nil,
PreProcessing: true,
Expand All @@ -54,12 +54,12 @@ func preProcessRename(thread *BaseIOThread, diceDBCmd *cmd.DiceDBCmd) error {
// preProcessCopy prepares the COPY command for preprocessing by sending a GET command
// to retrieve the value of the original key. The retrieved value is used later in the
// decomposeCopy function to copy the value to the destination key.
func customProcessCopy(thread *BaseIOThread, diceDBCmd *cmd.DiceDBCmd) error {
func customProcessCopy(h *BaseCommandHandler, diceDBCmd *cmd.DiceDBCmd) error {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as the other comment, is it possible to define these methods on the command handler struct?

if len(diceDBCmd.Args) < 2 {
return diceerrors.ErrWrongArgumentCount("COPY")
}

sid, rc := thread.shardManager.GetShardInfo(diceDBCmd.Args[0])
sid, rc := h.shardManager.GetShardInfo(diceDBCmd.Args[0])

preCmd := cmd.DiceDBCmd{
Cmd: "COPY",
Expand All @@ -71,7 +71,7 @@ func customProcessCopy(thread *BaseIOThread, diceDBCmd *cmd.DiceDBCmd) error {
SeqID: 0,
RequestID: GenerateUniqueRequestID(),
Cmd: &preCmd,
IOThreadID: thread.id,
CmdHandlerID: h.id,
ShardID: sid,
Client: nil,
PreProcessing: true,
Expand Down
Loading
Loading