diff --git a/internal/commandhandler/commandhandler.go b/internal/commandhandler/commandhandler.go index 050f2059f..e86eb0154 100644 --- a/internal/commandhandler/commandhandler.go +++ b/internal/commandhandler/commandhandler.go @@ -45,6 +45,7 @@ type BaseCommandHandler struct { globalErrorChan chan error ioThreadReadChan chan []byte // Channel to receive data from io-thread ioThreadWriteChan chan interface{} // Channel to send data to io-thread + ioThreadErrChan chan error // Channel to receive errors from io-thread responseChan chan *ops.StoreResponse // Channel to communicate with shard preprocessingChan chan *ops.StoreResponse // Channel to communicate with shard cmdWatchSubscriptionChan chan watchmanager.WatchSubscription @@ -54,7 +55,7 @@ type BaseCommandHandler struct { func NewCommandHandler(id string, responseChan, preprocessingChan chan *ops.StoreResponse, cmdWatchSubscriptionChan chan watchmanager.WatchSubscription, parser requestparser.Parser, shardManager *shard.ShardManager, gec chan error, - ioThreadReadChan chan []byte, ioThreadWriteChan chan interface{}, + ioThreadReadChan chan []byte, ioThreadWriteChan chan interface{}, ioThreadErrChan chan error, wl wal.AbstractWAL) *BaseCommandHandler { return &BaseCommandHandler{ id: id, @@ -65,6 +66,7 @@ func NewCommandHandler(id string, responseChan, preprocessingChan chan *ops.Stor globalErrorChan: gec, ioThreadReadChan: ioThreadReadChan, ioThreadWriteChan: ioThreadWriteChan, + ioThreadErrChan: ioThreadErrChan, responseChan: responseChan, preprocessingChan: preprocessingChan, cmdWatchSubscriptionChan: cmdWatchSubscriptionChan, @@ -83,6 +85,8 @@ func (h *BaseCommandHandler) Start(ctx context.Context) error { select { case <-ctx.Done(): return ctx.Err() + case err := <-h.ioThreadErrChan: + return err case cmdReq := <-h.adhocReqChan: resp, err := h.handleCmdRequestWithTimeout(ctx, errChan, []*cmd.DiceDBCmd{cmdReq}, true, defaultRequestTimeout) h.sendResponseToIOThread(resp, err) diff --git a/internal/iothread/iothread.go b/internal/iothread/iothread.go index f01023523..4149ab1e0 100644 --- a/internal/iothread/iothread.go +++ b/internal/iothread/iothread.go @@ -22,16 +22,18 @@ type BaseIOThread struct { Session *auth.Session ioThreadReadChan chan []byte // Channel to send data to the command handler ioThreadWriteChan chan interface{} // Channel to receive data from the command handler + ioThreadErrChan chan error // Channel to receive errors from the ioHandler } func NewIOThread(id string, ioHandler iohandler.IOHandler, - ioThreadReadChan chan []byte, ioThreadWriteChan chan interface{}) *BaseIOThread { + ioThreadReadChan chan []byte, ioThreadWriteChan chan interface{}, ioThreadErrChan chan error) *BaseIOThread { return &BaseIOThread{ id: id, ioHandler: ioHandler, Session: auth.NewSession(), ioThreadReadChan: ioThreadReadChan, ioThreadWriteChan: ioThreadWriteChan, + ioThreadErrChan: ioThreadErrChan, } } diff --git a/internal/server/resp/server.go b/internal/server/resp/server.go index 949e578ef..579d4bf62 100644 --- a/internal/server/resp/server.go +++ b/internal/server/resp/server.go @@ -200,7 +200,8 @@ func (s *Server) AcceptConnectionRequests(ctx context.Context, wg *sync.WaitGrou ioThreadID := GenerateUniqueIOThreadID() ioThreadReadChan := make(chan []byte) // for sending data to the command handler from the io-thread ioThreadWriteChan := make(chan interface{}) // for sending data to the io-thread from the command handler - thread := iothread.NewIOThread(ioThreadID, ioHandler, ioThreadReadChan, ioThreadWriteChan) + ioThreadErrChan := make(chan error, 1) // for receiving errors from the io-thread + thread := iothread.NewIOThread(ioThreadID, ioHandler, ioThreadReadChan, ioThreadWriteChan, ioThreadErrChan) // For each io-thread, we create a dedicated command handler - 1:1 mapping cmdHandlerID := GenerateUniqueCommandHandlerID() @@ -209,8 +210,8 @@ func (s *Server) AcceptConnectionRequests(ctx context.Context, wg *sync.WaitGrou preprocessingChan := make(chan *ops.StoreResponse) // preprocessingChan is specifically for handling responses from shards for commands that require preprocessing handler := commandhandler.NewCommandHandler(cmdHandlerID, responseChan, preprocessingChan, - s.cmdWatchSubscriptionChan, parser, s.shardManager, - s.globalErrorChan, ioThreadReadChan, ioThreadWriteChan, s.wl) + s.cmdWatchSubscriptionChan, parser, s.shardManager, s.globalErrorChan, + ioThreadReadChan, ioThreadWriteChan, ioThreadErrChan, s.wl) // Register the io-thread with the manager err = s.ioThreadManager.RegisterIOThread(thread)