Skip to content

Commit

Permalink
refactored to separate commandHandler from ioThread
Browse files Browse the repository at this point in the history
  • Loading branch information
psrvere committed Dec 4, 2024
1 parent 41d4918 commit 16ab8bd
Show file tree
Hide file tree
Showing 19 changed files with 782 additions and 647 deletions.
1 change: 1 addition & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ type performance struct {
ShardCronFrequency time.Duration `config:"shard_cron_frequency" default:"1s"`
MultiplexerPollTimeout time.Duration `config:"multiplexer_poll_timeout" default:"100ms"`
MaxClients int32 `config:"max_clients" default:"20000" validate:"min=0"`
MaxCmdHandlers int32 `config:"max_cmd_handlers" default:"20000" validate:"min=0"`
StoreMapInitSize int `config:"store_map_init_size" default:"1024000"`
AdhocReqChanBufSize int `config:"adhoc_req_chan_buf_size" default:"20"`
EnableProfiling bool `config:"profiling" default:"false"`
Expand Down
7 changes: 5 additions & 2 deletions integration_tests/commands/resp/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"testing"
"time"

"github.com/dicedb/dice/internal/commandhandler"
"github.com/dicedb/dice/internal/iothread"
"github.com/dicedb/dice/internal/server/resp"
"github.com/dicedb/dice/internal/wal"
Expand Down Expand Up @@ -196,10 +197,12 @@ func RunTestServer(wg *sync.WaitGroup, opt TestServerOptions) {
cmdWatchSubscriptionChan := make(chan watchmanager.WatchSubscription)
gec := make(chan error)
shardManager := shard.NewShardManager(1, queryWatchChan, cmdWatchChan, gec)
ioThreadManager := iothread.NewManager(20000, shardManager)
ioThreadManager := iothread.NewManager(20000)
cmdHandlerManager := commandhandler.NewManager(20000, shardManager)

// Initialize the RESP Server
wl, _ := wal.NewNullWAL()
testServer := resp.NewServer(shardManager, ioThreadManager, cmdWatchSubscriptionChan, cmdWatchChan, gec, wl)
testServer := resp.NewServer(shardManager, ioThreadManager, cmdHandlerManager, cmdWatchSubscriptionChan, cmdWatchChan, gec, wl)

ctx, cancel := context.WithCancel(context.Background())
fmt.Println("Starting the test server on port", config.DiceConfig.RespServer.Port)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package iothread
package commandhandler

import (
"math"
Expand All @@ -8,7 +8,7 @@ import (
"github.com/dicedb/dice/internal/ops"
)

// This file contains functions used by the IOThread to handle and process responses
// This file contains functions used by the CommandHandler to handle and process responses
// from multiple shards during distributed operations. For commands that are executed
// across several shards, such as MultiShard commands, dedicated functions are responsible
// for aggregating and managing the results.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package iothread
package commandhandler

import (
"fmt"
Expand All @@ -12,7 +12,7 @@ import (

// RespAuth returns with an encoded "OK" if the user is authenticated
// If the user is not authenticated, it returns with an encoded error message
func (t *BaseIOThread) RespAuth(args []string) interface{} {
func (h *BaseCommandHandler) RespAuth(args []string) interface{} {
// Check for incorrect number of arguments (arity error).
if len(args) < 1 || len(args) > 2 {
return diceerrors.ErrWrongArgumentCount("AUTH")
Expand All @@ -31,7 +31,7 @@ func (t *BaseIOThread) RespAuth(args []string) interface{} {
username, password = args[0], args[1]
}

if err := t.Session.Validate(username, password); err != nil {
if err := h.Session.Validate(username, password); err != nil {
return err
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package iothread
package commandhandler

import (
"context"
Expand All @@ -12,7 +12,7 @@ import (
"github.com/dicedb/dice/internal/store"
)

// This file is utilized by the IOThread to decompose commands that need to be executed
// This file is utilized by the CommandHandler to decompose commands that need to be executed
// across multiple shards. For commands that operate on multiple keys or necessitate
// distribution across shards (e.g., MultiShard commands), a Breakup function is invoked
// to transform the original command into multiple smaller commands, each directed at
Expand All @@ -25,13 +25,13 @@ import (
// decomposeRename breaks down the RENAME command into separate DELETE and SET commands.
// It first waits for the result of a GET command from shards. If successful, it removes
// the old key using a DEL command and sets the new key with the retrieved value using a SET command.
func decomposeRename(ctx context.Context, thread *BaseIOThread, cd *cmd.DiceDBCmd) ([]*cmd.DiceDBCmd, error) {
func decomposeRename(ctx context.Context, h *BaseCommandHandler, cd *cmd.DiceDBCmd) ([]*cmd.DiceDBCmd, error) {
// Waiting for GET command response
var val string
select {
case <-ctx.Done():
slog.Error("IOThread timed out waiting for response from shards", slog.String("id", thread.id), slog.Any("error", ctx.Err()))
case preProcessedResp, ok := <-thread.preprocessingChan:
slog.Error("CommandHandler timed out waiting for response from shards", slog.String("id", h.id), slog.Any("error", ctx.Err()))
case preProcessedResp, ok := <-h.preprocessingChan:
if ok {
evalResp := preProcessedResp.EvalResponse
if evalResp.Error != nil {
Expand Down Expand Up @@ -69,13 +69,13 @@ func decomposeRename(ctx context.Context, thread *BaseIOThread, cd *cmd.DiceDBCm
// decomposeCopy breaks down the COPY command into a SET command that copies a value from
// one key to another. It first retrieves the value of the original key from shards, then
// sets the value to the destination key using a SET command.
func decomposeCopy(ctx context.Context, thread *BaseIOThread, cd *cmd.DiceDBCmd) ([]*cmd.DiceDBCmd, error) {
func decomposeCopy(ctx context.Context, h *BaseCommandHandler, cd *cmd.DiceDBCmd) ([]*cmd.DiceDBCmd, error) {
// Waiting for GET command response
var resp *ops.StoreResponse
select {
case <-ctx.Done():
slog.Error("IOThread timed out waiting for response from shards", slog.String("id", thread.id), slog.Any("error", ctx.Err()))
case preProcessedResp, ok := <-thread.preprocessingChan:
slog.Error("CommandHandler timed out waiting for response from shards", slog.String("id", h.id), slog.Any("error", ctx.Err()))
case preProcessedResp, ok := <-h.preprocessingChan:
if ok {
resp = preProcessedResp
}
Expand Down Expand Up @@ -108,7 +108,7 @@ func decomposeCopy(ctx context.Context, thread *BaseIOThread, cd *cmd.DiceDBCmd)
// decomposeMSet decomposes the MSET (Multi-set) command into individual SET commands.
// It expects an even number of arguments (key-value pairs). For each pair, it creates
// a separate SET command to store the value at the given key.
func decomposeMSet(_ context.Context, _ *BaseIOThread, cd *cmd.DiceDBCmd) ([]*cmd.DiceDBCmd, error) {
func decomposeMSet(_ context.Context, _ *BaseCommandHandler, cd *cmd.DiceDBCmd) ([]*cmd.DiceDBCmd, error) {
if len(cd.Args)%2 != 0 {
return nil, diceerrors.ErrWrongArgumentCount("MSET")
}
Expand All @@ -132,7 +132,7 @@ func decomposeMSet(_ context.Context, _ *BaseIOThread, cd *cmd.DiceDBCmd) ([]*cm
// decomposeMGet decomposes the MGET (Multi-get) command into individual GET commands.
// It expects a list of keys, and for each key, it creates a separate GET command to
// retrieve the value associated with that key.
func decomposeMGet(_ context.Context, _ *BaseIOThread, cd *cmd.DiceDBCmd) ([]*cmd.DiceDBCmd, error) {
func decomposeMGet(_ context.Context, _ *BaseCommandHandler, cd *cmd.DiceDBCmd) ([]*cmd.DiceDBCmd, error) {
if len(cd.Args) < 1 {
return nil, diceerrors.ErrWrongArgumentCount("MGET")
}
Expand All @@ -148,7 +148,7 @@ func decomposeMGet(_ context.Context, _ *BaseIOThread, cd *cmd.DiceDBCmd) ([]*cm
return decomposedCmds, nil
}

func decomposeSInter(_ context.Context, _ *BaseIOThread, cd *cmd.DiceDBCmd) ([]*cmd.DiceDBCmd, error) {
func decomposeSInter(_ context.Context, _ *BaseCommandHandler, cd *cmd.DiceDBCmd) ([]*cmd.DiceDBCmd, error) {
if len(cd.Args) < 1 {
return nil, diceerrors.ErrWrongArgumentCount("SINTER")
}
Expand All @@ -164,7 +164,7 @@ func decomposeSInter(_ context.Context, _ *BaseIOThread, cd *cmd.DiceDBCmd) ([]*
return decomposedCmds, nil
}

func decomposeSDiff(_ context.Context, _ *BaseIOThread, cd *cmd.DiceDBCmd) ([]*cmd.DiceDBCmd, error) {
func decomposeSDiff(_ context.Context, _ *BaseCommandHandler, cd *cmd.DiceDBCmd) ([]*cmd.DiceDBCmd, error) {
if len(cd.Args) < 1 {
return nil, diceerrors.ErrWrongArgumentCount("SDIFF")
}
Expand All @@ -180,7 +180,7 @@ func decomposeSDiff(_ context.Context, _ *BaseIOThread, cd *cmd.DiceDBCmd) ([]*c
return decomposedCmds, nil
}

func decomposeJSONMget(_ context.Context, _ *BaseIOThread, cd *cmd.DiceDBCmd) ([]*cmd.DiceDBCmd, error) {
func decomposeJSONMget(_ context.Context, _ *BaseCommandHandler, cd *cmd.DiceDBCmd) ([]*cmd.DiceDBCmd, error) {
if len(cd.Args) < 2 {
return nil, diceerrors.ErrWrongArgumentCount("JSON.MGET")
}
Expand All @@ -199,7 +199,7 @@ func decomposeJSONMget(_ context.Context, _ *BaseIOThread, cd *cmd.DiceDBCmd) ([
return decomposedCmds, nil
}

func decomposeTouch(_ context.Context, _ *BaseIOThread, cd *cmd.DiceDBCmd) ([]*cmd.DiceDBCmd, error) {
func decomposeTouch(_ context.Context, _ *BaseCommandHandler, cd *cmd.DiceDBCmd) ([]*cmd.DiceDBCmd, error) {
if len(cd.Args) == 0 {
return nil, diceerrors.ErrWrongArgumentCount("TOUCH")
}
Expand All @@ -216,13 +216,13 @@ func decomposeTouch(_ context.Context, _ *BaseIOThread, cd *cmd.DiceDBCmd) ([]*c
return decomposedCmds, nil
}

func decomposeDBSize(_ context.Context, thread *BaseIOThread, cd *cmd.DiceDBCmd) ([]*cmd.DiceDBCmd, error) {
func decomposeDBSize(_ context.Context, h *BaseCommandHandler, cd *cmd.DiceDBCmd) ([]*cmd.DiceDBCmd, error) {
if len(cd.Args) > 0 {
return nil, diceerrors.ErrWrongArgumentCount("DBSIZE")
}

decomposedCmds := make([]*cmd.DiceDBCmd, 0, len(cd.Args))
for i := uint8(0); i < uint8(thread.shardManager.GetShardCount()); i++ {
for i := uint8(0); i < uint8(h.shardManager.GetShardCount()); i++ {
decomposedCmds = append(decomposedCmds,
&cmd.DiceDBCmd{
Cmd: store.SingleShardSize,
Expand All @@ -233,13 +233,13 @@ func decomposeDBSize(_ context.Context, thread *BaseIOThread, cd *cmd.DiceDBCmd)
return decomposedCmds, nil
}

func decomposeKeys(_ context.Context, thread *BaseIOThread, cd *cmd.DiceDBCmd) ([]*cmd.DiceDBCmd, error) {
func decomposeKeys(_ context.Context, h *BaseCommandHandler, cd *cmd.DiceDBCmd) ([]*cmd.DiceDBCmd, error) {
if len(cd.Args) != 1 {
return nil, diceerrors.ErrWrongArgumentCount("KEYS")
}

decomposedCmds := make([]*cmd.DiceDBCmd, 0, len(cd.Args))
for i := uint8(0); i < uint8(thread.shardManager.GetShardCount()); i++ {
for i := uint8(0); i < uint8(h.shardManager.GetShardCount()); i++ {
decomposedCmds = append(decomposedCmds,
&cmd.DiceDBCmd{
Cmd: store.SingleShardKeys,
Expand All @@ -250,13 +250,13 @@ func decomposeKeys(_ context.Context, thread *BaseIOThread, cd *cmd.DiceDBCmd) (
return decomposedCmds, nil
}

func decomposeFlushDB(_ context.Context, thread *BaseIOThread, cd *cmd.DiceDBCmd) ([]*cmd.DiceDBCmd, error) {
func decomposeFlushDB(_ context.Context, h *BaseCommandHandler, cd *cmd.DiceDBCmd) ([]*cmd.DiceDBCmd, error) {
if len(cd.Args) > 1 {
return nil, diceerrors.ErrWrongArgumentCount("FLUSHDB")
}

decomposedCmds := make([]*cmd.DiceDBCmd, 0, len(cd.Args))
for i := uint8(0); i < uint8(thread.shardManager.GetShardCount()); i++ {
for i := uint8(0); i < uint8(h.shardManager.GetShardCount()); i++ {
decomposedCmds = append(decomposedCmds,
&cmd.DiceDBCmd{
Cmd: store.FlushDB,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package iothread
package commandhandler

import (
"context"
Expand Down Expand Up @@ -191,12 +191,12 @@ const (

type CmdMeta struct {
CmdType
Cmd string
IOThreadHandler func([]string) []byte
Cmd string
CmdHandlerFunction func([]string) []byte

// decomposeCommand is a function that takes a DiceDB command and breaks it down into smaller,
// manageable DiceDB commands for each shard processing. It returns a slice of DiceDB commands.
decomposeCommand func(ctx context.Context, thread *BaseIOThread, DiceDBCmd *cmd.DiceDBCmd) ([]*cmd.DiceDBCmd, error)
decomposeCommand func(ctx context.Context, h *BaseCommandHandler, DiceDBCmd *cmd.DiceDBCmd) ([]*cmd.DiceDBCmd, error)

// composeResponse is a function that combines multiple responses from the execution of commands
// into a single response object. It accepts a variadic parameter of EvalResponse objects
Expand All @@ -211,10 +211,10 @@ type CmdMeta struct {

// preProcessResponse is a function that handles the preprocessing of a DiceDB command by
// preparing the necessary operations (e.g., fetching values from shards) before the command
// is executed. It takes the io-thread and the original DiceDB command as parameters and
// is executed. It takes the CommandHandler and the original DiceDB command as parameters and
// ensures that any required information is retrieved and processed in advance. Use this when set
// preProcessingReq = true.
preProcessResponse func(thread *BaseIOThread, DiceDBCmd *cmd.DiceDBCmd) error
preProcessResponse func(h *BaseCommandHandler, DiceDBCmd *cmd.DiceDBCmd) error
}

var CommandsMeta = map[string]CmdMeta{
Expand Down Expand Up @@ -651,8 +651,8 @@ func init() {
func validateCmdMeta(c string, meta CmdMeta) error {
switch meta.CmdType {
case Global:
if meta.IOThreadHandler == nil {
return fmt.Errorf("global command %s must have IOThreadHandler function", c)
if meta.CmdHandlerFunction == nil {
return fmt.Errorf("global command %s must have CmdHandlerFunction function", c)
}
case MultiShard, AllShard:
if meta.decomposeCommand == nil || meta.composeResponse == nil {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package iothread
package commandhandler

import (
"github.com/dicedb/dice/internal/cmd"
Expand All @@ -9,13 +9,13 @@ import (
// preProcessRename prepares the RENAME command for preprocessing by sending a GET command
// to retrieve the value of the original key. The retrieved value is used later in the
// decomposeRename function to delete the old key and set the new key.
func preProcessRename(thread *BaseIOThread, diceDBCmd *cmd.DiceDBCmd) error {
func preProcessRename(h *BaseCommandHandler, diceDBCmd *cmd.DiceDBCmd) error {
if len(diceDBCmd.Args) < 2 {
return diceerrors.ErrWrongArgumentCount("RENAME")
}

key := diceDBCmd.Args[0]
sid, rc := thread.shardManager.GetShardInfo(key)
sid, rc := h.shardManager.GetShardInfo(key)

preCmd := cmd.DiceDBCmd{
Cmd: "RENAME",
Expand All @@ -26,7 +26,7 @@ func preProcessRename(thread *BaseIOThread, diceDBCmd *cmd.DiceDBCmd) error {
SeqID: 0,
RequestID: GenerateUniqueRequestID(),
Cmd: &preCmd,
IOThreadID: thread.id,
CmdHandlerID: h.id,
ShardID: sid,
Client: nil,
PreProcessing: true,
Expand All @@ -38,12 +38,12 @@ func preProcessRename(thread *BaseIOThread, diceDBCmd *cmd.DiceDBCmd) error {
// preProcessCopy prepares the COPY command for preprocessing by sending a GET command
// to retrieve the value of the original key. The retrieved value is used later in the
// decomposeCopy function to copy the value to the destination key.
func customProcessCopy(thread *BaseIOThread, diceDBCmd *cmd.DiceDBCmd) error {
func customProcessCopy(h *BaseCommandHandler, diceDBCmd *cmd.DiceDBCmd) error {
if len(diceDBCmd.Args) < 2 {
return diceerrors.ErrWrongArgumentCount("COPY")
}

sid, rc := thread.shardManager.GetShardInfo(diceDBCmd.Args[0])
sid, rc := h.shardManager.GetShardInfo(diceDBCmd.Args[0])

preCmd := cmd.DiceDBCmd{
Cmd: "COPY",
Expand All @@ -55,7 +55,7 @@ func customProcessCopy(thread *BaseIOThread, diceDBCmd *cmd.DiceDBCmd) error {
SeqID: 0,
RequestID: GenerateUniqueRequestID(),
Cmd: &preCmd,
IOThreadID: thread.id,
CmdHandlerID: h.id,
ShardID: sid,
Client: nil,
PreProcessing: true,
Expand Down
Loading

0 comments on commit 16ab8bd

Please sign in to comment.