-
Notifications
You must be signed in to change notification settings - Fork 1.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[DO NOT MERGE] Refactor - Move command execution responsibility to CommandHandler from IOThread #1358
base: master
Are you sure you want to change the base?
Conversation
@soumya-codes @lucifercr07 - please review. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The code changes look great Prateek. I have done a lot of nit picking around naming convention. Please feel free to open a separate PR to address them.
@@ -136,6 +136,7 @@ type performance struct { | |||
ShardCronFrequency time.Duration `config:"shard_cron_frequency" default:"1s"` | |||
MultiplexerPollTimeout time.Duration `config:"multiplexer_poll_timeout" default:"100ms"` | |||
MaxClients int32 `config:"max_clients" default:"20000" validate:"min=0"` | |||
MaxCmdHandlers int32 `config:"max_cmd_handlers" default:"20000" validate:"min=0"` |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Isn't this same as MaxClients?
Also since we do not plan to have more than 65536 client's can we have the type as int16
instead?
"github.com/dicedb/dice/internal/shard" | ||
) | ||
|
||
type Manager struct { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Given the behaviour, I think we got naming convention for Manager wrong in DiceDB. What do you think about Registrar/Registry?
func (m *Manager) UnregisterCommandHandler(id string) error { | ||
m.ShardManager.UnregisterCommandHandler(id) | ||
if cmdHandler, loaded := m.activeCmdHandlers.LoadAndDelete(id); loaded { | ||
ch := cmdHandler.(*BaseCommandHandler) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can use a comma, ok idiom here to be safe. Not needed here, but it will be failproof for future code changes.
id string | ||
parser requestparser.Parser | ||
shardManager *shard.ShardManager | ||
adhocReqChan chan *cmd.DiceDBCmd |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since you are refactoring this I would request you to change the name of adhocReqChan
field to something more meaningful like watchReqChan
.
} | ||
|
||
type BaseCommandHandler struct { | ||
CommandHandler |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please make sure the structures are aligned in a way that they add lowest memory footprint..
@@ -12,7 +12,7 @@ import ( | |||
|
|||
// RespAuth returns with an encoded "OK" if the user is authenticated | |||
// If the user is not authenticated, it returns with an encoded error message | |||
func (t *BaseIOThread) RespAuth(args []string) interface{} { | |||
func (h *BaseCommandHandler) RespAuth(args []string) interface{} { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would suggest we move this to the commandhandler.go file.
const defaultRequestTimeout = 6 * time.Second | ||
|
||
var requestCounter uint32 | ||
|
||
// IOThread interface | ||
type IOThread interface { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I like the Handler
naming convention. It follows Go naming convention, where the data-structure names are encouraged to be verbs name. Based on this should we rename IOThread to IOHandler?
@@ -21,10 +18,9 @@ var ( | |||
ErrIOThreadNotFound = errors.New("io-thread not found") | |||
) | |||
|
|||
func NewManager(maxClients int32, sm *shard.ShardManager) *Manager { | |||
func NewManager(maxClients int32) *Manager { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same as earlier, what do you think of naming Manager to Registar/Registry? This name provides the precise nature of the work done by this type.
@@ -104,15 +104,16 @@ func (manager *ShardManager) GetShard(id ShardID) *ShardThread { | |||
return nil | |||
} | |||
|
|||
// RegisterIOThread registers a io-thread with all Shards present in the ShardManager. | |||
func (manager *ShardManager) RegisterIOThread(id string, request, processing chan *ops.StoreResponse) { | |||
// RegisterCommandHandler registers a command handler with all Shards present in the ShardManager. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same as earlier, what do you think of naming Manager to Registar/Registry? This name provides the precise nature of the work done by this type.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, minor comments.
func GenerateUniqueIOThreadID() string { | ||
count := atomic.AddUint64(&ioThreadCounter, 1) | ||
timestamp := time.Now().UnixNano()/int64(time.Millisecond) - startTime | ||
return fmt.Sprintf("W-%d-%d", timestamp, count) | ||
} | ||
|
||
func GenerateUniqueCommandHandlerID() string { | ||
count := atomic.AddUint64(&cmdHandlerCounter, 1) | ||
timestamp := time.Now().UnixNano()/int64(time.Millisecond) - startTime | ||
return fmt.Sprintf("W-%d-%d", timestamp, count) | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shall we optimise this as below? Also any reasons why we're keeping ID format for cmdHandler and IOThread same?
func GenerateUniqueID(prefix string, counter *uint64) string {
count := atomic.AddUint64(counter, 1)
timestamp := time.Now().UnixMilli() - startTime
return fmt.Sprintf("%s-%d-%d", prefix, timestamp, count)
}
func GenerateUniqueIOThreadID() string {
return GenerateUniqueID("W", &ioThreadCounter)
}
func GenerateUniqueCommandHandlerID() string {
return GenerateUniqueID("W", &cmdHandlerCounter)
}
m.mu.Lock() | ||
defer m.mu.Unlock() | ||
|
||
if m.CommandHandlerCount() >= m.maxCmdHandlers { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When this returns error we've already registered an iothread, may be we can move this check earlier?
Also this'd initiate a server shutdown I believe would that be okay? Shouldn't we just drop the new client gracefully and continue server ops?
|
||
type Manager struct { | ||
activeCmdHandlers sync.Map | ||
numCmdHandlers atomic.Int32 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shall we use atomic.Uint32
instead?
type Manager struct { | ||
activeCmdHandlers sync.Map | ||
numCmdHandlers atomic.Int32 | ||
maxCmdHandlers int32 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same as above uint32
?
if responseChan != nil && preprocessingChan != nil { | ||
m.ShardManager.RegisterCommandHandler(cmdHandler.ID(), responseChan, preprocessingChan) // TODO: Change responseChan type to ShardResponse | ||
} else if responseChan != nil && preprocessingChan == nil { | ||
m.ShardManager.RegisterCommandHandler(cmdHandler.ID(), responseChan, nil) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shall we change it to?
m.ShardManager.RegisterCommandHandler(cmdHandler.ID(),cmdHandler.responseChan,cmdHandler.preprocessingChan)
responseChan := cmdHandler.responseChan | ||
preprocessingChan := cmdHandler.preprocessingChan |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
May be we can have checks in handler registration to mandate responseChan
presence?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the changes, minor comments.
TODO
[ ] Investigate CopyTest failure
[ ] Investigate SDK tests failure
This PR is a preparatory step towards solving nearly 50-60% CPU utilisation due to context shift between user space and kernel space for frequent read and write calls.
As first step, this PR moves command execution logic out of IOThread to a new entity
CommandHandler
. IOThread is now responsible for reading from and writing to the client connection, and CommandHandler is responsible for execution of command by interacting with ShardManager and WatchManager.Currently, there is 1:1 mapping between IOThread and CommandHandler, and both of them are spawned by Resp Server. IOThread and CommandHandler communicate using 3 channels
ioThreadReadChan
- to send command from IOThread to CommandHandlerioThreadWriteChan
- to send response from CommandHandler to IOThreadioThreadErrChan
- to send connection error signal from IOThread to CommandHandler, so that CommandHandler shuts down when IOHandler does