Skip to content

Commit

Permalink
added an error channel between io thread and command handler to signa…
Browse files Browse the repository at this point in the history
…l exit
  • Loading branch information
psrvere committed Dec 5, 2024
1 parent 690430d commit 3e15eca
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 5 deletions.
6 changes: 5 additions & 1 deletion internal/commandhandler/commandhandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ type BaseCommandHandler struct {
globalErrorChan chan error
ioThreadReadChan chan []byte // Channel to receive data from io-thread
ioThreadWriteChan chan interface{} // Channel to send data to io-thread
ioThreadErrChan chan error // Channel to receive errors from io-thread
responseChan chan *ops.StoreResponse // Channel to communicate with shard
preprocessingChan chan *ops.StoreResponse // Channel to communicate with shard
cmdWatchSubscriptionChan chan watchmanager.WatchSubscription
Expand All @@ -54,7 +55,7 @@ type BaseCommandHandler struct {
func NewCommandHandler(id string, responseChan, preprocessingChan chan *ops.StoreResponse,
cmdWatchSubscriptionChan chan watchmanager.WatchSubscription,
parser requestparser.Parser, shardManager *shard.ShardManager, gec chan error,
ioThreadReadChan chan []byte, ioThreadWriteChan chan interface{},
ioThreadReadChan chan []byte, ioThreadWriteChan chan interface{}, ioThreadErrChan chan error,
wl wal.AbstractWAL) *BaseCommandHandler {
return &BaseCommandHandler{
id: id,
Expand All @@ -65,6 +66,7 @@ func NewCommandHandler(id string, responseChan, preprocessingChan chan *ops.Stor
globalErrorChan: gec,
ioThreadReadChan: ioThreadReadChan,
ioThreadWriteChan: ioThreadWriteChan,
ioThreadErrChan: ioThreadErrChan,
responseChan: responseChan,
preprocessingChan: preprocessingChan,
cmdWatchSubscriptionChan: cmdWatchSubscriptionChan,
Expand All @@ -83,6 +85,8 @@ func (h *BaseCommandHandler) Start(ctx context.Context) error {
select {
case <-ctx.Done():
return ctx.Err()
case err := <-h.ioThreadErrChan:
return err
case cmdReq := <-h.adhocReqChan:
resp, err := h.handleCmdRequestWithTimeout(ctx, errChan, []*cmd.DiceDBCmd{cmdReq}, true, defaultRequestTimeout)
h.sendResponseToIOThread(resp, err)
Expand Down
4 changes: 3 additions & 1 deletion internal/iothread/iothread.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,18 @@ type BaseIOThread struct {
Session *auth.Session
ioThreadReadChan chan []byte // Channel to send data to the command handler
ioThreadWriteChan chan interface{} // Channel to receive data from the command handler
ioThreadErrChan chan error // Channel to receive errors from the ioHandler
}

func NewIOThread(id string, ioHandler iohandler.IOHandler,
ioThreadReadChan chan []byte, ioThreadWriteChan chan interface{}) *BaseIOThread {
ioThreadReadChan chan []byte, ioThreadWriteChan chan interface{}, ioThreadErrChan chan error) *BaseIOThread {
return &BaseIOThread{
id: id,
ioHandler: ioHandler,
Session: auth.NewSession(),
ioThreadReadChan: ioThreadReadChan,
ioThreadWriteChan: ioThreadWriteChan,
ioThreadErrChan: ioThreadErrChan,
}
}

Expand Down
7 changes: 4 additions & 3 deletions internal/server/resp/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,8 @@ func (s *Server) AcceptConnectionRequests(ctx context.Context, wg *sync.WaitGrou
ioThreadID := GenerateUniqueIOThreadID()
ioThreadReadChan := make(chan []byte) // for sending data to the command handler from the io-thread
ioThreadWriteChan := make(chan interface{}) // for sending data to the io-thread from the command handler
thread := iothread.NewIOThread(ioThreadID, ioHandler, ioThreadReadChan, ioThreadWriteChan)
ioThreadErrChan := make(chan error, 1) // for receiving errors from the io-thread
thread := iothread.NewIOThread(ioThreadID, ioHandler, ioThreadReadChan, ioThreadWriteChan, ioThreadErrChan)

// For each io-thread, we create a dedicated command handler - 1:1 mapping
cmdHandlerID := GenerateUniqueCommandHandlerID()
Expand All @@ -209,8 +210,8 @@ func (s *Server) AcceptConnectionRequests(ctx context.Context, wg *sync.WaitGrou
preprocessingChan := make(chan *ops.StoreResponse) // preprocessingChan is specifically for handling responses from shards for commands that require preprocessing

handler := commandhandler.NewCommandHandler(cmdHandlerID, responseChan, preprocessingChan,
s.cmdWatchSubscriptionChan, parser, s.shardManager,
s.globalErrorChan, ioThreadReadChan, ioThreadWriteChan, s.wl)
s.cmdWatchSubscriptionChan, parser, s.shardManager, s.globalErrorChan,
ioThreadReadChan, ioThreadWriteChan, ioThreadErrChan, s.wl)

// Register the io-thread with the manager
err = s.ioThreadManager.RegisterIOThread(thread)
Expand Down

0 comments on commit 3e15eca

Please sign in to comment.