From 6167b19bfdcf52b40e65d36f0686b4b466d30b3c Mon Sep 17 00:00:00 2001
From: Arpit Bhayani
Date: Tue, 22 Oct 2024 13:44:06 +0530
Subject: [PATCH 01/12] Removing the redundant passing of slog.Logger

---
 Dockerfile                                    |  4 +-
 Makefile                                      |  8 +--
 README.md                                     |  4 +-
 config/config.go                              | 13 +----
 integration_tests/commands/async/setup.go     | 12 ++--
 integration_tests/commands/http/setup.go      |  6 +-
 integration_tests/commands/resp/setup.go      |  9 ++-
 integration_tests/commands/websocket/setup.go |  9 ++-
 .../clientio/iohandler/netconn/netconn.go     | 22 ++++---
 .../clientio/requestparser/resp/respparser.go | 12 ++--
 internal/logger/logger.go                     | 10 ++--
 internal/observability/ping.go                |  4 +-
 internal/querymanager/query_manager.go        |  3 +-
 internal/server/httpServer.go                 | 43 +++++++-------
 internal/server/resp/server.go                | 43 +++++++-------
 internal/server/server.go                     | 35 ++++++------
 internal/server/websocketServer.go            | 37 ++++++------
 internal/shard/shard_manager.go               |  5 +-
 internal/shard/shard_thread.go                |  4 +-
 internal/watchmanager/watch_manager.go        |  8 +--
 internal/worker/cmd_decompose.go              |  4 +-
 internal/worker/worker.go                     | 57 +++++++++----------
 main.go                                       | 57 +++++++++----------
 23 files changed, 188 insertions(+), 221 deletions(-)

diff --git a/Dockerfile b/Dockerfile
index 74b818e17..ec4596a54 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -10,5 +10,5 @@ FROM gcr.io/distroless/static-debian12:nonroot
 WORKDIR /app
 COPY --from=builder /dicedb/dicedb ./
 EXPOSE 7379
-ENV DICE_ENV=prod
-CMD ["/app/dicedb"]
+
+ENTRYPOINT ["/app/dicedb"]
diff --git a/Makefile b/Makefile
index ddc84d85d..09d33edca 100644
--- a/Makefile
+++ b/Makefile
@@ -10,7 +10,7 @@ PORT ?= 7379 #Port for dicedb
 GOOS ?= $(shell go env GOOS)
 GOARCH ?= $(shell go env GOARCH)
 
-VERSION=$(shell bash -c 'grep -oP "const Version = \"\K[^\"]+" main.go')
+VERSION=$(shell bash -c 'grep -oP "DiceVersion string = \"\K[^\"]+" config/config.go')
 
 .PHONY: build test build-docker run test-one
 
@@ -58,10 +58,10 @@ unittest:
 unittest-one:
 	go test -v -race -count=1 --run $(TEST_FUNC) ./internal/...
 
-build-docker:
+release:
+	git tag v$(VERSION)
+	git push origin --tags
 	docker build --tag dicedb/dicedb:latest --tag dicedb/dicedb:$(VERSION) .
-
-push-docker:
 	docker push dicedb/dicedb:$(VERSION)
 
 GOLANGCI_LINT_VERSION := 1.60.1
diff --git a/README.md b/README.md
index 92d2ab681..b20f5a634 100644
--- a/README.md
+++ b/README.md
@@ -37,7 +37,7 @@ With this, you can build truly real-time applications like [Leaderboard](https:/
 The easiest way to get started with DiceDB is using [Docker](https://www.docker.com/) by running the following command.
 
 ```bash
-docker run -p 7379:7379 dicedb/dicedb
+docker run -p 7379:7379 dicedb/dicedb --enable-multithreading --enable-watch
 ```
 
 The above command will start the DiceDB server running locally on the port `7379` and you can connect
@@ -109,8 +109,6 @@ cd dice
 air
 ```
 
-> The `DICE_ENV` environment variable is used set the environment, by default it is treated as production. `dev` is used to get pretty printed logs and lower log level.
-
 ### Local Setup with Custom Config
 
 By default, DiceDB will look for the configuration file at `/etc/dice/config.toml`. (Linux, Darwin, and WSL)
diff --git a/config/config.go b/config/config.go
index 5b7be459b..ea56d67dc 100644
--- a/config/config.go
+++ b/config/config.go
@@ -224,16 +224,8 @@
 var defaultConfig Config
 
 func init() {
 	config := baseConfig
-	env := os.Getenv("DICE_ENV")
-	if env == "prod" {
-		config.Logging.LogLevel = "info"
-		config.Logging.PrettyPrintLogs = false
-	}
-
-	if logLevel := os.Getenv("DICE_LOG_LEVEL"); logLevel != "" {
-		config.Logging.LogLevel = logLevel
-	}
-
+	config.Logging.PrettyPrintLogs = false
+	config.Logging.LogLevel = "debug"
 	defaultConfig = config
 }
@@ -351,7 +343,6 @@ func setUpViperConfig(configFilePath string) {
 
 	// override default configurations with command line flags
 	mergeFlagsWithConfig()
-	slog.Info("configurations loaded successfully.")
 }
 
 func mergeFlagsWithConfig() {
diff --git a/integration_tests/commands/async/setup.go b/integration_tests/commands/async/setup.go
index e9b24887e..3b2664a7f 100644
--- a/integration_tests/commands/async/setup.go
+++ b/integration_tests/commands/async/setup.go
@@ -122,9 +122,9 @@ func RunTestServer(ctx context.Context, wg *sync.WaitGroup, opt TestServerOption
 	var err error
 	watchChan := make(chan dstore.QueryWatchEvent, config.DiceConfig.Memory.KeysLimit)
 	gec := make(chan error)
-	shardManager := shard.NewShardManager(1, watchChan, nil, gec, opt.Logger)
+	shardManager := shard.NewShardManager(1, watchChan, nil, gec)
 	// Initialize the AsyncServer
-	testServer := server.NewAsyncServer(shardManager, watchChan, opt.Logger)
+	testServer := server.NewAsyncServer(shardManager, watchChan)
 
 	// Try to bind to a port with a maximum of `totalRetries` retries.
 	for i := 0; i < totalRetries; i++ {
@@ -133,19 +133,19 @@ func RunTestServer(ctx context.Context, wg *sync.WaitGroup, opt TestServerOption
 		}
 
 		if err.Error() == "address already in use" {
-			opt.Logger.Info("Port already in use, trying port",
+			slog.Info("Port already in use, trying port",
 				slog.Int("port", config.DiceConfig.AsyncServer.Port),
 				slog.Int("new_port", config.DiceConfig.AsyncServer.Port+1),
 			)
 			config.DiceConfig.AsyncServer.Port++
 		} else {
-			opt.Logger.Error("Failed to bind port", slog.Any("error", err))
+			slog.Error("Failed to bind port", slog.Any("error", err))
 			return
 		}
 	}
 
 	if err != nil {
-		opt.Logger.Error("Failed to bind to a port after retries",
+		slog.Error("Failed to bind to a port after retries",
 			slog.Any("error", err),
 			slog.Int("retry_count", totalRetries),
 		)
@@ -172,7 +172,7 @@ func RunTestServer(ctx context.Context, wg *sync.WaitGroup, opt TestServerOption
 			cancelShardManager()
 			return
 		}
-		opt.Logger.Error("Test server encountered an error", slog.Any("error", err))
+		slog.Error("Test server encountered an error", slog.Any("error", err))
 		os.Exit(1)
 	}
 }()
diff --git a/integration_tests/commands/http/setup.go b/integration_tests/commands/http/setup.go
index 430c42052..72f0a7203 100644
--- a/integration_tests/commands/http/setup.go
+++ b/integration_tests/commands/http/setup.go
@@ -106,11 +106,11 @@ func RunHTTPServer(ctx context.Context, wg *sync.WaitGroup, opt TestServerOption
 
 	globalErrChannel := make(chan error)
 	watchChan := make(chan dstore.QueryWatchEvent, config.DiceConfig.Performance.WatchChanBufSize)
-	shardManager := shard.NewShardManager(1, watchChan, nil, globalErrChannel, opt.Logger)
-	queryWatcherLocal := querymanager.NewQueryManager(opt.Logger)
+	shardManager := shard.NewShardManager(1, watchChan, nil, globalErrChannel)
+	queryWatcherLocal := querymanager.NewQueryManager()
 	config.HTTPPort = opt.Port
 	// Initialize the HTTPServer
-	testServer := server.NewHTTPServer(shardManager, opt.Logger)
+	testServer := server.NewHTTPServer(shardManager)
 	// Inform the user that the server is starting
 	fmt.Println("Starting the test server on port", config.HTTPPort)
 	shardManagerCtx, cancelShardManager := context.WithCancel(ctx)
diff --git a/integration_tests/commands/resp/setup.go b/integration_tests/commands/resp/setup.go
index 617c19bb5..b4da6b0b2 100644
--- a/integration_tests/commands/resp/setup.go
+++ b/integration_tests/commands/resp/setup.go
@@ -112,8 +112,7 @@ func fireCommandAndGetRESPParser(conn net.Conn, cmd string) *clientio.RESPParser
 }
 
 func RunTestServer(wg *sync.WaitGroup, opt TestServerOptions) {
-	logr := logger.New(logger.Opts{WithTimestamp: true})
-	slog.SetDefault(logr)
+	slog.SetDefault(logger.New(logger.Opts{WithTimestamp: true}))
 	config.DiceConfig.Network.IOBufferLength = 16
 	config.DiceConfig.Persistence.WriteAOFOnCleanup = false
 	if opt.Port != 0 {
@@ -125,10 +124,10 @@ func RunTestServer(wg *sync.WaitGroup, opt TestServerOptions) {
 	queryWatchChan := make(chan dstore.QueryWatchEvent, config.DiceConfig.Memory.KeysLimit)
 	cmdWatchChan := make(chan dstore.CmdWatchEvent, config.DiceConfig.Memory.KeysLimit)
 	gec := make(chan error)
-	shardManager := shard.NewShardManager(1, queryWatchChan, cmdWatchChan, gec, logr)
+	shardManager := shard.NewShardManager(1, queryWatchChan, cmdWatchChan, gec)
 	workerManager := worker.NewWorkerManager(20000, shardManager)
 	// Initialize the RESP Server
-	testServer := resp.NewServer(shardManager, workerManager, cmdWatchChan, gec, logr)
+	testServer := resp.NewServer(shardManager, workerManager, cmdWatchChan, gec)
 
 	ctx, cancel := context.WithCancel(context.Background())
 	fmt.Println("Starting the test server on port", config.DiceConfig.AsyncServer.Port)
@@ -149,7 +148,7 @@ func RunTestServer(wg *sync.WaitGroup, opt TestServerOptions) {
 			cancelShardManager()
 			return
 		}
-		opt.Logger.Error("Test server encountered an error", slog.Any("error", err))
+		slog.Error("Test server encountered an error", slog.Any("error", err))
 		os.Exit(1)
 	}
 }()
diff --git a/integration_tests/commands/websocket/setup.go b/integration_tests/commands/websocket/setup.go
index 485ca5183..48a25fe95 100644
--- a/integration_tests/commands/websocket/setup.go
+++ b/integration_tests/commands/websocket/setup.go
@@ -101,17 +101,16 @@ func (e *WebsocketCommandExecutor) Name() string {
 }
 
 func RunWebsocketServer(ctx context.Context, wg *sync.WaitGroup, opt TestServerOptions) {
-	logger := opt.Logger
 	config.DiceConfig.Network.IOBufferLength = 16
 	config.DiceConfig.Persistence.WriteAOFOnCleanup = false
 
 	// Initialize WebsocketServer
 	globalErrChannel := make(chan error)
 	watchChan := make(chan dstore.QueryWatchEvent, config.DiceConfig.Performance.WatchChanBufSize)
-	shardManager := shard.NewShardManager(1, watchChan, nil, globalErrChannel, opt.Logger)
-	queryWatcherLocal := querymanager.NewQueryManager(opt.Logger)
+	shardManager := shard.NewShardManager(1, watchChan, nil, globalErrChannel)
+	queryWatcherLocal := querymanager.NewQueryManager()
 	config.WebsocketPort = opt.Port
-	testServer := server.NewWebSocketServer(shardManager, testPort1, opt.Logger)
+	testServer := server.NewWebSocketServer(shardManager, testPort1)
 	shardManagerCtx, cancelShardManager := context.WithCancel(ctx)
 
 	// run shard manager
@@ -138,7 +137,7 @@ func RunWebsocketServer(ctx context.Context, wg *sync.WaitGroup, opt TestServerO
 			if errors.Is(srverr, derrors.ErrAborted) {
 				return
 			}
-			logger.Debug("Websocket test server encountered an error: %v", slog.Any("error", srverr))
+			slog.Debug("Websocket test server encountered an error", slog.Any("error", srverr))
 		}
 	}()
 }
diff --git a/internal/clientio/iohandler/netconn/netconn.go b/internal/clientio/iohandler/netconn/netconn.go
index 02c6e957d..5b78590a8 100644
--- a/internal/clientio/iohandler/netconn/netconn.go
+++ b/internal/clientio/iohandler/netconn/netconn.go
@@ -35,13 +35,12 @@ type IOHandler struct {
 	conn   net.Conn
 	reader *bufio.Reader
 	writer *bufio.Writer
-	logger *slog.Logger
 }
 
 var _ iohandler.IOHandler = (*IOHandler)(nil)
 
 // NewIOHandler creates a new IOHandler from a file descriptor
-func NewIOHandler(clientFD int, logger *slog.Logger) (*IOHandler, error) {
+func NewIOHandler(clientFD int) (*IOHandler, error) {
 	file := os.NewFile(uintptr(clientFD), "client-connection")
 	if file == nil {
 		return nil, fmt.Errorf("failed to create file from file descriptor")
@@ -54,7 +53,7 @@
 			// Only close the file if we haven't successfully created a net.Conn
 			err := file.Close()
 			if err != nil {
-				logger.Warn("Error closing file in NewIOHandler:", slog.Any("error", err))
+				slog.Warn("Error closing file in NewIOHandler:", slog.Any("error", err))
 			}
 		}
 	}()
@@ -71,7 +70,6 @@
 		conn:   conn,
 		reader: bufio.NewReader(conn),
 		writer: bufio.NewWriter(conn),
-		logger: logger,
 	}, nil
 }
@@ -112,27 +110,27 @@ func (h *IOHandler) Read(ctx context.Context) ([]byte, error) {
 				// No more data to read at this time
 				return data, nil
 			case errors.Is(err, net.ErrClosed), errors.Is(err, syscall.EPIPE), errors.Is(err, syscall.ECONNRESET):
-				h.logger.Debug("Connection closed", slog.Any("error", err))
+				slog.Debug("Connection closed", slog.Any("error", err))
 				cerr := h.Close()
 				if cerr != nil {
-					h.logger.Warn("Error closing connection", slog.Any("error", errors.Join(err, cerr)))
+					slog.Warn("Error closing connection", slog.Any("error", errors.Join(err, cerr)))
 				}
 				return nil, ErrorClosed
 			case errors.Is(err, syscall.ETIMEDOUT):
-				h.logger.Info("Connection idle timeout", slog.Any("error", err))
+				slog.Info("Connection idle timeout", slog.Any("error", err))
 				cerr := h.Close()
 				if cerr != nil {
-					h.logger.Warn("Error closing connection", slog.Any("error", errors.Join(err, cerr)))
+					slog.Warn("Error closing connection", slog.Any("error", errors.Join(err, cerr)))
 				}
 				return nil, ErrIdleTimeout
 			default:
-				h.logger.Error("Error reading from connection", slog.Any("error", err))
+				slog.Error("Error reading from connection", slog.Any("error", err))
 				return nil, fmt.Errorf("error reading request: %w", err)
 			}
 		}
 		if len(data) > maxRequestSize {
-			h.logger.Warn("Request too large", slog.Any("size", len(data)))
+			slog.Warn("Request too large", slog.Any("size", len(data)))
 			return nil, ErrRequestTooLarge
 		}
@@ -186,7 +184,7 @@ func (h *IOHandler) Write(ctx context.Context, response interface{}) error {
 			err = errors.Join(err, cerr)
 		}
 
-		h.logger.Debug("Connection closed", slog.Any("error", err))
+		slog.Debug("Connection closed", slog.Any("error", err))
 		return err
 	}
 
@@ -199,7 +197,7 @@ func (h *IOHandler) Write(ctx context.Context, response interface{}) error {
 
 // Close underlying network connection
 func (h *IOHandler) Close() error {
-	h.logger.Info("Closing connection")
+	slog.Info("Closing connection")
 	return errors.Join(h.conn.Close(), h.file.Close())
 }
diff --git a/internal/clientio/requestparser/resp/respparser.go b/internal/clientio/requestparser/resp/respparser.go
index fcf237fa4..71ad93eaf 100644
--- a/internal/clientio/requestparser/resp/respparser.go
+++ b/internal/clientio/requestparser/resp/respparser.go
@@ -33,16 +33,14 @@ var CRLF = []byte{'\r', '\n'}
 
 // Parser is responsible for parsing RESP protocol data
 type Parser struct {
-	data   []byte
-	pos    int
-	logger *slog.Logger
+	data []byte
+	pos  int
 }
 
 // NewParser creates a new RESP parser
-func NewParser(l *slog.Logger) *Parser {
+func NewParser() *Parser {
 	return &Parser{
-		pos:    0,
-		logger: l,
+		pos: 0,
 	}
 }
@@ -76,7 +74,7 @@ func (p *Parser) parseCommand() (*cmd.DiceDBCmd, error) {
 	// A Dice command should always be an array as it follows RESP2 specifications
 	elements, err := p.parse()
 	if err != nil {
-		p.logger.Error("error while parsing command", slog.Any("cmd", string(p.data)), slog.Any("error", err))
+		slog.Error("error while parsing command", slog.Any("cmd", string(p.data)), slog.Any("error", err))
 		return nil, fmt.Errorf("error parsing command: %w", err)
 	}
diff --git a/internal/logger/logger.go b/internal/logger/logger.go
index dc09d68c3..d2f73e9e7 100644
--- a/internal/logger/logger.go
+++ b/internal/logger/logger.go
@@ -31,13 +31,11 @@ type Opts struct {
 func New(opts Opts) *slog.Logger {
 	var writer io.Writer = os.Stderr
 	if config.DiceConfig.Logging.PrettyPrintLogs {
-		writer = zerolog.ConsoleWriter{Out: os.Stderr}
-	}
-	zerologLogger := zerolog.New(writer).Level(mapLevel(getLogLevel().Level()))
-	if opts.WithTimestamp {
-		zerologLogger = zerologLogger.With().Timestamp().Logger()
+		writer = zerolog.ConsoleWriter{Out: os.Stderr, NoColor: true}
 	}
+	zerologLogger := zerolog.New(writer).
+		Level(mapLevel(getLogLevel().Level())).
+		With().Timestamp().Logger()
 	logger := slog.New(newZerologHandler(&zerologLogger))
-
 	return logger
 }
diff --git a/internal/observability/ping.go b/internal/observability/ping.go
index 1e4c0032f..7ba4e3f69 100644
--- a/internal/observability/ping.go
+++ b/internal/observability/ping.go
@@ -29,7 +29,7 @@ const (
 type DBConfig struct {
 }
 
-func Ping(logger *slog.Logger) {
+func Ping() {
 	hwConfig, err := GetHardwareMeta()
 	if err != nil {
 		return
@@ -55,7 +55,7 @@
 	client := &http.Client{Timeout: time.Second * 5}
 	resp, err := client.Do(req)
 	if err != nil {
-		logger.Error("Error reporting observability metrics.", slog.Any("error", err))
+		slog.Error("Error reporting observability metrics.", slog.Any("error", err))
 		return
 	}
diff --git a/internal/querymanager/query_manager.go b/internal/querymanager/query_manager.go
index c3a81c63a..594b16a81 100644
--- a/internal/querymanager/query_manager.go
+++ b/internal/querymanager/query_manager.go
@@ -107,13 +107,12 @@ func NewCacheStore() CacheStore {
 }
 
 // NewQueryManager initializes a new Manager.
-func NewQueryManager(logger *slog.Logger) *Manager {
+func NewQueryManager() *Manager {
 	QuerySubscriptionChan = make(chan QuerySubscription)
 	AdhocQueryChan = make(chan AdhocQuery, 1000)
 	return &Manager{
 		WatchList:  sync.Map{},
 		QueryCache: NewQueryCacheStore(),
-		logger:     logger,
 	}
 }
diff --git a/internal/server/httpServer.go b/internal/server/httpServer.go
index 746d8485c..52b4b6317 100644
--- a/internal/server/httpServer.go
+++ b/internal/server/httpServer.go
@@ -5,7 +5,6 @@ import (
 	"context"
 	"encoding/json"
 	"fmt"
-	"github.com/dicedb/dice/internal/server/abstractserver"
 	"hash/crc32"
 	"log/slog"
 	"net/http"
@@ -13,6 +12,8 @@ import (
 	"sync"
 	"time"
 
+	"github.com/dicedb/dice/internal/server/abstractserver"
+
 	"github.com/dicedb/dice/internal/eval"
 
 	"github.com/dicedb/dice/config"
@@ -39,7 +40,6 @@ type HTTPServer struct {
 	shardManager       *shard.ShardManager
 	ioChan             chan *ops.StoreResponse
 	httpServer         *http.Server
-	logger             *slog.Logger
 	qwatchResponseChan chan comm.QwatchResponse
 	shutdownChan       chan struct{}
 }
@@ -61,7 +61,7 @@ func (cim *CaseInsensitiveMux) ServeHTTP(w http.ResponseWriter, r *http.Request)
 	cim.mux.ServeHTTP(w, r)
 }
 
-func NewHTTPServer(shardManager *shard.ShardManager, logger *slog.Logger) *HTTPServer {
+func NewHTTPServer(shardManager *shard.ShardManager) *HTTPServer {
 	mux := http.NewServeMux()
 	caseInsensitiveMux := &CaseInsensitiveMux{mux: mux}
 	srv := &http.Server{
@@ -74,7 +74,6 @@ func NewHTTPServer(shardManager *shard.ShardManager) *HTTPServer {
 		shardManager:       shardManager,
 		ioChan:             make(chan *ops.StoreResponse, 1000),
 		httpServer:         srv,
-		logger:             logger,
 		qwatchResponseChan: make(chan comm.QwatchResponse),
 		shutdownChan:       make(chan struct{}),
 	}
@@ -107,12 +106,12 @@ func (s *HTTPServer) Run(ctx context.Context) error {
 		case <-ctx.Done():
 		case <-s.shutdownChan:
 			err = derrors.ErrAborted
-			s.logger.Debug("Shutting down HTTP Server")
+			slog.Debug("Shutting down HTTP Server")
 		}
 
 		shutdownErr := s.httpServer.Shutdown(httpCtx)
 		if shutdownErr != nil {
-			s.logger.Error("HTTP Server Shutdown Failed", slog.Any("error", err))
+			slog.Error("HTTP Server Shutdown Failed", slog.Any("error", err))
 			err = shutdownErr
 			return
 		}
@@ -121,7 +120,7 @@ func (s *HTTPServer) Run(ctx context.Context) error {
 	wg.Add(1)
 	go func() {
 		defer wg.Done()
-		s.logger.Info("HTTP Server running", slog.String("addr", s.httpServer.Addr))
+		slog.Info("HTTP Server running", slog.String("addr", s.httpServer.Addr))
 		err = s.httpServer.ListenAndServe()
 	}()
 
@@ -138,15 +137,15 @@ func (s *HTTPServer) DiceHTTPHandler(writer http.ResponseWriter, request *http.R
 		writer.WriteHeader(http.StatusBadRequest) // Set HTTP status code to 400
 		_, err = writer.Write(responseJSON)
 		if err != nil {
-			s.logger.Error("Error writing response", "error", err)
+			slog.Error("Error writing response", "error", err)
 		}
-		s.logger.Error("Error parsing HTTP request", slog.Any("error", err))
+		slog.Error("Error parsing HTTP request", slog.Any("error", err))
 		return
 	}
 
 	if diceDBCmd.Cmd == Abort {
-		s.logger.Debug("ABORT command received")
-		s.logger.Debug("Shutting down HTTP Server")
+		slog.Debug("ABORT command received")
+		slog.Debug("Shutting down HTTP Server")
 		close(s.shutdownChan)
 		return
 	}
@@ -157,9 +156,9 @@ func (s *HTTPServer) DiceHTTPHandler(writer http.ResponseWriter, request *http.R
 		writer.WriteHeader(http.StatusBadRequest) // Set HTTP status code to 400
 		_, err = writer.Write(responseJSON)
 		if err != nil {
-			s.logger.Error("Error writing response", "error", err)
+			slog.Error("Error writing response", "error", err)
 		}
-		s.logger.Error("Command %s is not implemented", slog.String("cmd", diceDBCmd.Cmd))
+		slog.Error("Command is not implemented", slog.String("cmd", diceDBCmd.Cmd))
 		return
 	}
 
@@ -182,12 +181,12 @@ func (s *HTTPServer) DiceHTTPQwatchHandler(writer http.ResponseWriter, request *
 	diceDBCmd, err := utils.ParseHTTPRequest(request)
 	if err != nil {
 		http.Error(writer, "Error parsing HTTP request", http.StatusBadRequest)
-		s.logger.Error("Error parsing HTTP request", slog.Any("error", err))
+		slog.Error("Error parsing HTTP request", slog.Any("error", err))
 		return
 	}
 
 	if len(diceDBCmd.Args) < 1 {
-		s.logger.Error("Invalid request for QWATCH")
+		slog.Error("Invalid request for QWATCH")
 		http.Error(writer, "Invalid request for QWATCH", http.StatusBadRequest)
 		return
 	}
@@ -223,7 +222,7 @@ func (s *HTTPServer) DiceHTTPQwatchHandler(writer http.ResponseWriter, request *
 		HTTPOp:     true,
 	}
 
-	s.logger.Info("Registered client for watching query", slog.Any("clientID", clientIdentifierID),
+	slog.Info("Registered client for watching query", slog.Any("clientID", clientIdentifierID),
 		slog.Any("query", qwatchQuery))
 	s.shardManager.GetShard(0).ReqChan <- storeOp
 
@@ -244,7 +243,7 @@ func (s *HTTPServer) DiceHTTPQwatchHandler(writer http.ResponseWriter, request *
 			return
 		case <-doneChan:
 			// Client disconnected or request finished
-			s.logger.Info("Client disconnected")
+			slog.Info("Client disconnected")
 			unWatchCmd := &cmd.DiceDBCmd{
 				Cmd:  "Q.UNWATCH",
 				Args: []string{qwatchQuery},
 			}
@@ -271,7 +270,7 @@ func (s *HTTPServer) writeQWatchResponse(writer http.ResponseWriter, response in
 		result = resp.EvalResponse.Result
 		err = resp.EvalResponse.Error
 	default:
-		s.logger.Error("Unsupported response type")
+		slog.Error("Unsupported response type")
 		http.Error(writer, "Internal Server Error", http.StatusInternalServerError)
 		return
 	}
@@ -285,7 +284,7 @@ func (s *HTTPServer) writeQWatchResponse(writer http.ResponseWriter, response in
 
 		val, err := rp.DecodeOne()
 		if err != nil {
-			s.logger.Error("Error decoding response: %v", slog.Any("error", err))
+			slog.Error("Error decoding response", slog.Any("error", err))
 			http.Error(writer, "Internal Server Error", http.StatusInternalServerError)
 			return
 		}
@@ -308,7 +307,7 @@ func (s *HTTPServer) writeQWatchResponse(writer http.ResponseWriter, response in
 	}
 
 	if err != nil {
-		s.logger.Error("Error marshaling QueryData to JSON: %v", slog.Any("error", err))
+		slog.Error("Error marshaling QueryData to JSON", slog.Any("error", err))
 		http.Error(writer, "Internal Server Error", http.StatusInternalServerError)
 		return
 	}
@@ -316,7 +315,7 @@ func (s *HTTPServer) writeQWatchResponse(writer http.ResponseWriter, response in
 	// Format the response as SSE event
 	_, err = writer.Write(responseJSON)
 	if err != nil {
-		s.logger.Error("Error writing SSE data: %v", slog.Any("error", err))
+		slog.Error("Error writing SSE data", slog.Any("error", err))
 		http.Error(writer, "Internal Server Error", http.StatusInternalServerError)
 		return
 	}
@@ -345,7 +344,7 @@ func (s *HTTPServer) writeResponse(writer http.ResponseWriter, result *ops.Store
 	if !ok {
 		responseValue, err = decodeEvalResponse(result.EvalResponse)
 		if err != nil {
-			s.logger.Error("Error decoding response", "error", err)
+			slog.Error("Error decoding response", "error", err)
 			httpResponse = utils.HTTPResponse{Status: utils.HTTPStatusError, Data: "Internal Server Error"}
 			writeJSONResponse(writer, httpResponse, http.StatusInternalServerError)
 			return
diff --git a/internal/server/resp/server.go b/internal/server/resp/server.go
index fd4ef2b93..77c848d59 100644
--- a/internal/server/resp/server.go
+++ b/internal/server/resp/server.go
@@ -4,7 +4,6 @@ import (
 	"context"
 	"errors"
 	"fmt"
-	"github.com/dicedb/dice/internal/server/abstractserver"
 	"log/slog"
 	"net"
 	"sync"
@@ -12,6 +11,8 @@ import (
 	"syscall"
 	"time"
 
+	"github.com/dicedb/dice/internal/server/abstractserver"
+
 	dstore "github.com/dicedb/dice/internal/store"
 
 	"github.com/dicedb/dice/internal/watchmanager"
@@ -47,27 +48,25 @@ type Server struct {
 	watchManager    *watchmanager.Manager
 	cmdWatchChan    chan dstore.CmdWatchEvent
 	globalErrorChan chan error
-	logger          *slog.Logger
 }
 
-func NewServer(shardManager *shard.ShardManager, workerManager *worker.WorkerManager, cmdWatchChan chan dstore.CmdWatchEvent, globalErrChan chan error, l *slog.Logger) *Server {
+func NewServer(shardManager *shard.ShardManager, workerManager *worker.WorkerManager, cmdWatchChan chan dstore.CmdWatchEvent, globalErrChan chan error) *Server {
 	return &Server{
 		Host:            config.DiceConfig.AsyncServer.Addr,
 		Port:            config.DiceConfig.AsyncServer.Port,
 		connBacklogSize: DefaultConnBacklogSize,
 		workerManager:   workerManager,
 		shardManager:    shardManager,
-		watchManager:    watchmanager.NewManager(l),
+		watchManager:    watchmanager.NewManager(),
 		cmdWatchChan:    cmdWatchChan,
 		globalErrorChan: globalErrChan,
-		logger:          l,
 	}
 }
 
 func (s *Server) Run(ctx context.Context) (err error) {
 	// BindAndListen the desired port to the server
 	if err = s.BindAndListen(); err != nil {
-		s.logger.Error("failed to bind server", slog.Any("error", err))
+		slog.Error("failed to bind server", slog.Any("error", err))
 		return err
 	}
@@ -93,19 +92,19 @@ func (s *Server) Run(ctx context.Context) (err error) {
 		}
 	}(wg)
 
-	s.logger.Info("DiceDB ready to accept connections on port", slog.Int("resp-port", config.Port))
+	slog.Info("DiceDB ready to accept connections on port", slog.Int("resp-port", config.Port))
 
 	select {
 	case <-ctx.Done():
-		s.logger.Info("Context canceled, initiating shutdown")
+		slog.Info("Context canceled, initiating shutdown")
 	case err = <-errChan:
-		s.logger.Error("Error while accepting connections, initiating shutdown", slog.Any("error", err))
+		slog.Error("Error while accepting connections, initiating shutdown", slog.Any("error", err))
 	}
 
 	s.Shutdown()
 
 	wg.Wait() // Wait for the go routines to finish
-	s.logger.Info("All connections are closed, RESP server exiting gracefully.")
+	slog.Info("All connections are closed, RESP server exiting gracefully.")
 
 	return err
 }
@@ -122,9 +121,9 @@ func (s *Server) BindAndListen() error {
 		if err != nil {
 			if closeErr := syscall.Close(serverFD); closeErr != nil {
 				// Wrap the close error with the original bind/listen error
-				s.logger.Error("Error occurred", slog.Any("error", err), "additionally, failed to close socket", slog.Any("close-err", closeErr))
+				slog.Error("Error occurred", slog.Any("error", err), "additionally, failed to close socket", slog.Any("close-err", closeErr))
 			} else {
-				s.logger.Error("Error occurred", slog.Any("error", err))
+				slog.Error("Error occurred", slog.Any("error", err))
 			}
 		}
 	}()
@@ -155,16 +154,16 @@ func (s *Server) BindAndListen() error {
 	}
 	s.serverFD = serverFD
 
-	s.logger.Info("RESP Server successfully bound", slog.String("Host", s.Host), slog.Int("Port", s.Port))
+	slog.Info("RESP Server successfully bound", slog.String("Host", s.Host), slog.Int("Port", s.Port))
 	return nil
 }
 
 // ReleasePort closes the server socket.
 func (s *Server) ReleasePort() {
 	if err := syscall.Close(s.serverFD); err != nil {
-		s.logger.Error("Failed to close server socket", slog.Any("error", err))
+		slog.Error("Failed to close server socket", slog.Any("error", err))
 	} else {
-		s.logger.Debug("Server socket closed successfully")
+		slog.Debug("Server socket closed successfully")
 	}
 }
@@ -173,7 +172,7 @@ func (s *Server) AcceptConnectionRequests(ctx context.Context, wg *sync.WaitGrou
 	for {
 		select {
 		case <-ctx.Done():
-			s.logger.Info("Context canceled, initiating RESP server shutdown")
+			slog.Info("Context canceled, initiating RESP server shutdown")
 			return ctx.Err()
 		default:
@@ -187,19 +186,19 @@ func (s *Server) AcceptConnectionRequests(ctx context.Context, wg *sync.WaitGrou
 			}
 
 			// Register a new worker for the client
-			ioHandler, err := netconn.NewIOHandler(clientFD, s.logger)
+			ioHandler, err := netconn.NewIOHandler(clientFD)
 			if err != nil {
-				s.logger.Error("Failed to create new IOHandler for clientFD", slog.Int("client-fd", clientFD), slog.Any("error", err))
+				slog.Error("Failed to create new IOHandler for clientFD", slog.Int("client-fd", clientFD), slog.Any("error", err))
 				return err
 			}
-			parser := respparser.NewParser(s.logger)
+			parser := respparser.NewParser()
 			responseChan := make(chan *ops.StoreResponse)      // responseChan is used for handling common responses from shards
 			preprocessingChan := make(chan *ops.StoreResponse) // preprocessingChan is specifically for handling responses from shards for commands that require preprocessing
 
 			wID := GenerateUniqueWorkerID()
-			w := worker.NewWorker(wID, responseChan, preprocessingChan, ioHandler, parser, s.shardManager, s.globalErrorChan, s.logger)
+			w := worker.NewWorker(wID, responseChan, preprocessingChan, ioHandler, parser, s.shardManager, s.globalErrorChan)
 
 			// Register the worker with the worker manager
 			err = s.workerManager.RegisterWorker(w)
@@ -213,14 +212,14 @@ func (s *Server) AcceptConnectionRequests(ctx context.Context, wg *sync.WaitGrou
 				defer func(wm *worker.WorkerManager, workerID string) {
 					err := wm.UnregisterWorker(workerID)
 					if err != nil {
-						s.logger.Warn("Failed to unregister worker", slog.String("worker-id", wID), slog.Any("error", err))
+						slog.Warn("Failed to unregister worker", slog.String("worker-id", wID), slog.Any("error", err))
 					}
 				}(s.workerManager, wID)
 				wctx, cwctx := context.WithCancel(ctx)
 				defer cwctx()
 				err := w.Start(wctx)
 				if err != nil {
-					s.logger.Debug("Worker stopped", slog.String("worker-id", wID), slog.Any("error", err))
+					slog.Debug("Worker stopped", slog.String("worker-id", wID), slog.Any("error", err))
 				}
 			}(wID)
 		}
diff --git a/internal/server/server.go b/internal/server/server.go
index aee7e61d3..e0b55b097 100644
--- a/internal/server/server.go
+++ b/internal/server/server.go
@@ -5,7 +5,6 @@ import (
 	"context"
 	"errors"
 	"fmt"
-	"github.com/dicedb/dice/internal/server/abstractserver"
 	"io"
 	"log/slog"
 	"net"
@@ -14,6 +13,8 @@ import (
 	"syscall"
 	"time"
 
+	"github.com/dicedb/dice/internal/server/abstractserver"
+
 	"github.com/dicedb/dice/config"
 	"github.com/dicedb/dice/internal/auth"
 	"github.com/dicedb/dice/internal/clientio"
@@ -40,20 +41,18 @@ type AsyncServer struct {
 	shardManager           *shard.ShardManager
 	ioChan                 chan *ops.StoreResponse     // The server acts like a worker today, this behavior will change once IOThreads are introduced and each client gets its own worker.
 	queryWatchChan         chan dstore.QueryWatchEvent // This is needed to co-ordinate between the store and the query watcher.
-	logger                 *slog.Logger                // logger is the logger for the server
 }
 
 // NewAsyncServer initializes a new AsyncServer
-func NewAsyncServer(shardManager *shard.ShardManager, queryWatchChan chan dstore.QueryWatchEvent, logger *slog.Logger) *AsyncServer {
+func NewAsyncServer(shardManager *shard.ShardManager, queryWatchChan chan dstore.QueryWatchEvent) *AsyncServer {
 	return &AsyncServer{
 		maxClients:             config.DiceConfig.Performance.MaxClients,
 		connectedClients:       make(map[int]*comm.Client),
 		shardManager:           shardManager,
-		queryWatcher:           querymanager.NewQueryManager(logger),
+		queryWatcher:           querymanager.NewQueryManager(),
 		multiplexerPollTimeout: config.DiceConfig.Performance.MultiplexerPollTimeout,
 		ioChan:                 make(chan *ops.StoreResponse, 1000),
 		queryWatchChan:         queryWatchChan,
-		logger:                 logger,
 	}
 }
@@ -78,7 +77,7 @@ func (s *AsyncServer) FindPortAndBind() (socketErr error) {
 	defer func() {
 		if socketErr != nil {
 			if err := syscall.Close(serverFD); err != nil {
-				s.logger.Warn("failed to close server socket", slog.Any("error", err))
+				slog.Warn("failed to close server socket", slog.Any("error", err))
 			}
 		}
 	}()
@@ -104,7 +103,7 @@ func (s *AsyncServer) FindPortAndBind() (socketErr error) {
 	}); err != nil {
 		return err
 	}
-	s.logger.Info(
+	slog.Info(
 		"DiceDB server is running in a single-threaded mode",
 		slog.String("version", config.DiceConfig.Version),
 		slog.Int("port", config.DiceConfig.AsyncServer.Port),
@@ -116,9 +115,9 @@ func (s *AsyncServer) ClosePort() {
 	if s.serverFD != 0 {
 		if err := syscall.Close(s.serverFD); err != nil {
-			s.logger.Warn("failed to close server socket", slog.Any("error", err))
+			slog.Warn("failed to close server socket", slog.Any("error", err))
 		} else {
-			s.logger.Debug("Server socket closed successfully")
+			slog.Debug("Server socket closed successfully")
 		}
 		s.serverFD = 0
 	}
@@ -134,7 +133,7 @@ func (s *AsyncServer) InitiateShutdown() {
 	// Close all client connections
 	for fd := range s.connectedClients {
 		if err := syscall.Close(fd); err != nil {
-			s.logger.Warn("failed to close client connection", slog.Any("error", err))
+			slog.Warn("failed to close client connection", slog.Any("error", err))
 		}
 		delete(s.connectedClients, fd)
 	}
@@ -170,7 +169,7 @@ func (s *AsyncServer) Run(ctx context.Context) error {
 
 	defer func() {
 		if err := s.multiplexer.Close(); err != nil {
-			s.logger.Warn("failed to close multiplexer", slog.Any("error", err))
+			slog.Warn("failed to close multiplexer", slog.Any("error", err))
 		}
 	}()
@@ -218,15 +217,15 @@ func (s *AsyncServer) eventLoop(ctx context.Context) error {
 			for _, event := range events {
 				if event.Fd == s.serverFD {
 					if err := s.acceptConnection(); err != nil {
-						s.logger.Warn(err.Error())
+						slog.Warn(err.Error())
 					}
 				} else {
 					if err := s.handleClientEvent(event); err != nil {
 						if errors.Is(err, diceerrors.ErrAborted) {
-							s.logger.Debug("Received abort command, initiating graceful shutdown")
+							slog.Debug("Received abort command, initiating graceful shutdown")
 							return err
 						} else if !errors.Is(err, syscall.ECONNRESET) && !errors.Is(err, net.ErrClosed) {
-							s.logger.Warn(err.Error())
+							slog.Warn(err.Error())
 						}
 					}
 				}
@@ -263,7 +262,7 @@ func (s *AsyncServer) handleClientEvent(event iomultiplexer.Event) error {
 	commands, hasAbort, err := readCommands(client)
 	if err != nil {
 		if err := syscall.Close(event.Fd); err != nil {
-			s.logger.Error("error closing client connection", slog.Any("error", err))
+			slog.Error("error closing client connection", slog.Any("error", err))
 		}
 		delete(s.connectedClients, event.Fd)
 		return err
@@ -417,7 +416,7 @@ func (s *AsyncServer) handleTransactionCommand(diceDBCmd *cmd.DiceDBCmd, c *comm
 	case eval.DiscardCmdMeta.Name:
 		s.discardTransaction(c, buf)
 	default:
-		s.logger.Error(
+		slog.Error(
 			"Unhandled transaction command",
 			slog.String("command", diceDBCmd.Cmd),
 		)
@@ -446,7 +445,7 @@ func (s *AsyncServer) executeTransaction(c *comm.Client, buf *bytes.Buffer) {
 	cmds := c.Cqueue.Cmds
 	_, err := fmt.Fprintf(buf, "*%d\r\n", len(cmds))
 	if err != nil {
-		s.logger.Error("Error writing to buffer", slog.Any("error", err))
+		slog.Error("Error writing to buffer", slog.Any("error", err))
 		return
 	}
@@ -465,6 +464,6 @@ func (s *AsyncServer) discardTransaction(c *comm.Client, buf *bytes.Buffer) {
 
 func (s *AsyncServer) writeResponse(c *comm.Client, buf *bytes.Buffer) {
 	if _, err := c.Write(buf.Bytes()); err != nil {
-		s.logger.Error(err.Error())
+		slog.Error(err.Error())
 	}
 }
diff --git a/internal/server/websocketServer.go b/internal/server/websocketServer.go
index 5e665f17b..44f51120b 100644
--- a/internal/server/websocketServer.go
+++ b/internal/server/websocketServer.go
@@ -6,7 +6,6 @@ import (
 	"encoding/json"
 	"errors"
 	"fmt"
-	"github.com/dicedb/dice/internal/server/abstractserver"
 	"log/slog"
 	"net"
 	"net/http"
@@ -15,6 +14,8 @@ import (
 	"syscall"
 	"time"
 
+	"github.com/dicedb/dice/internal/server/abstractserver"
+
 	"github.com/dicedb/dice/config"
 	"github.com/dicedb/dice/internal/clientio"
 	"github.com/dicedb/dice/internal/cmd"
@@ -43,10 +44,9 @@ type WebsocketServer struct {
 	upgrader           websocket.Upgrader
 	qwatchResponseChan chan comm.QwatchResponse
 	shutdownChan       chan struct{}
-	logger             *slog.Logger
 }
 
-func NewWebSocketServer(shardManager *shard.ShardManager, port int, logger *slog.Logger) *WebsocketServer {
+func NewWebSocketServer(shardManager *shard.ShardManager, port int) *WebsocketServer {
 	mux := http.NewServeMux()
 	srv := &http.Server{
 		Addr: fmt.Sprintf(":%d", port),
@@ -65,7 +65,6 @@ func NewWebSocketServer(shardManager *shard.ShardManager, port int) *WebsocketSe
 		upgrader:           upgrader,
 		qwatchResponseChan: make(chan comm.QwatchResponse),
 		shutdownChan:       make(chan struct{}),
-		logger:             logger,
 	}
 
 	mux.HandleFunc("/", websocketServer.WebsocketHandler)
@@ -88,12 +87,12 @@ func (s *WebsocketServer) Run(ctx context.Context) error {
 		case <-ctx.Done():
 		case <-s.shutdownChan:
 			err = diceerrors.ErrAborted
-			s.logger.Debug("Shutting down Websocket Server", slog.Any("time", time.Now()))
+			slog.Debug("Shutting down Websocket Server", slog.Any("time", time.Now()))
 		}
 
 		shutdownErr := s.websocketServer.Shutdown(websocketCtx)
 		if shutdownErr != nil {
-			s.logger.Error("Websocket Server shutdown failed:", slog.Any("error", err))
+			slog.Error("Websocket Server shutdown failed:", slog.Any("error", err))
 			return
 		}
 	}()
@@ -101,10 +100,10 @@ func (s *WebsocketServer) Run(ctx context.Context) error {
 	wg.Add(1)
 	go func() {
 		defer wg.Done()
-		s.logger.Info("Websocket Server running", slog.String("port", s.websocketServer.Addr[1:]))
+		slog.Info("Websocket Server running", slog.String("port", s.websocketServer.Addr[1:]))
 		err = s.websocketServer.ListenAndServe()
 		if err != nil {
-			s.logger.Debug("Error in Websocket Server", slog.Any("time", time.Now()), slog.Any("error", err))
+			slog.Debug("Error in Websocket Server", slog.Any("time", time.Now()), slog.Any("error", err))
 		}
 	}()
@@ -133,7 +132,7 @@ func (s *WebsocketServer) WebsocketHandler(w http.ResponseWriter, r *http.Reques
 			// acceptable close errors
 			errs := []int{websocket.CloseNormalClosure, websocket.CloseGoingAway, websocket.CloseAbnormalClosure}
 			if !websocket.IsCloseError(err, errs...) {
-				s.logger.Warn("failed to read message from client", slog.Any("error", err))
+				slog.Warn("failed to read message from client", slog.Any("error", err))
 			}
 			break
 		}
@@ -144,7 +143,7 @@ func (s *WebsocketServer) WebsocketHandler(w http.ResponseWriter, r *http.Reques
 			continue
 		} else if err != nil {
 			if err := WriteResponseWithRetries(conn, []byte("error: parsing failed"), maxRetries); err != nil {
-				s.logger.Debug(fmt.Sprintf("Error writing message: %v", err))
+				slog.Debug(fmt.Sprintf("Error writing message: %v", err))
 			}
 			continue
 		}
@@ -157,7 +156,7 @@ func (s *WebsocketServer) WebsocketHandler(w http.ResponseWriter, r *http.Reques
 
 		if unimplementedCommandsWebsocket[diceDBCmd.Cmd] {
 			if err := WriteResponseWithRetries(conn, []byte("Command is not implemented with Websocket"), maxRetries); err != nil {
-				s.logger.Debug(fmt.Sprintf("Error writing message: %v", err))
+				slog.Debug(fmt.Sprintf("Error writing message: %v", err))
 			}
 			continue
 		}
@@ -193,7 +192,7 @@ func (s *WebsocketServer) processQwatchUpdates(clientIdentifierID uint32, conn *
 		case resp := <-s.qwatchResponseChan:
 			if resp.ClientIdentifierID == clientIdentifierID {
 				if err := s.processResponse(conn, dicDBCmd, resp); err != nil {
-					s.logger.Debug("Error writing response to client. Shutting down goroutine for q.watch updates", slog.Any("clientIdentifierID", clientIdentifierID), slog.Any("error", err))
+					slog.Debug("Error writing response to client. Shutting down goroutine for q.watch updates", slog.Any("clientIdentifierID", clientIdentifierID), slog.Any("error", err))
 					return
 				}
 			}
@@ -217,9 +216,9 @@ func (s *WebsocketServer) processResponse(conn *websocket.Conn, diceDBCmd *cmd.D
 		result = resp.EvalResponse.Result
 		err = resp.EvalResponse.Error
 	default:
-		s.logger.Debug("Unsupported response type")
+		slog.Debug("Unsupported response type")
 		if err := WriteResponseWithRetries(conn, []byte("error: 500 Internal Server Error"), maxRetries); err != nil {
-			s.logger.Debug(fmt.Sprintf("Error writing message: %v", err))
+			slog.Debug(fmt.Sprintf("Error writing message: %v", err))
 			return fmt.Errorf("error writing response: %v", err)
 		}
 		return nil
@@ -249,9 +248,9 @@ func (s *WebsocketServer) processResponse(conn *websocket.Conn, diceDBCmd *cmd.D
 
 		responseValue, err = rp.DecodeOne()
 		if err != nil {
-			s.logger.Debug("Error decoding response", "error", err)
+			slog.Debug("Error decoding response", "error", err)
 			if err := WriteResponseWithRetries(conn, []byte("error: 500 Internal Server Error"), maxRetries); err != nil {
-				s.logger.Debug(fmt.Sprintf("Error writing message: %v", err))
+				slog.Debug(fmt.Sprintf("Error writing message: %v", err))
 				return fmt.Errorf("error writing response: %v", err)
 			}
 			return nil
@@ -274,9 +273,9 @@ func (s *WebsocketServer) processResponse(conn *websocket.Conn, diceDBCmd *cmd.D
 
 	respBytes, err := json.Marshal(responseValue)
 	if err != nil {
-		s.logger.Debug("Error marshaling json", "error", err)
+		slog.Debug("Error marshaling json", "error", err)
 		if err := WriteResponseWithRetries(conn, []byte("error: marshaling json"), maxRetries); err != nil {
-			s.logger.Debug(fmt.Sprintf("Error writing message: %v", err))
+			slog.Debug(fmt.Sprintf("Error writing message: %v", err))
 			return fmt.Errorf("error writing response: %v", err)
 		}
 		return nil
@@ -285,7 +284,7 @@ func (s *WebsocketServer) processResponse(conn *websocket.Conn, diceDBCmd *cmd.D
 	// success
 	// Write response with retries for transient errors
 	if err := WriteResponseWithRetries(conn, respBytes, config.DiceConfig.WebSocket.MaxWriteResponseRetries); err != nil {
-		s.logger.Debug(fmt.Sprintf("Error writing message: %v", err))
+		slog.Debug(fmt.Sprintf("Error writing message: %v", err))
 		return fmt.Errorf("error writing response: %v", err)
 	}
diff --git a/internal/shard/shard_manager.go b/internal/shard/shard_manager.go
index e46078fe7..c94a13b4d 100644
--- a/internal/shard/shard_manager.go
+++ b/internal/shard/shard_manager.go
@@ -2,7 +2,6 @@ package shard
 
 import (
 	"context"
-	"log/slog"
 	"os"
 	"os/signal"
 	"sync"
@@ -26,14 +25,14 @@ type ShardManager struct {
 }
 
 // NewShardManager creates a new ShardManager instance with the given number of Shards and a parent context.
-func NewShardManager(shardCount uint8, queryWatchChan chan dstore.QueryWatchEvent, cmdWatchChan chan dstore.CmdWatchEvent, globalErrorChan chan error, logger *slog.Logger) *ShardManager {
+func NewShardManager(shardCount uint8, queryWatchChan chan dstore.QueryWatchEvent, cmdWatchChan chan dstore.CmdWatchEvent, globalErrorChan chan error) *ShardManager {
 	shards := make([]*ShardThread, shardCount)
 	shardReqMap := make(map[ShardID]chan *ops.StoreOp)
 	shardErrorChan := make(chan *ShardError)
 
 	for i := uint8(0); i < shardCount; i++ {
 		// Shards are numbered from 0 to shardCount-1
-		shard := NewShardThread(i, globalErrorChan, shardErrorChan, queryWatchChan, cmdWatchChan, logger)
+		shard := NewShardThread(i, globalErrorChan, shardErrorChan, queryWatchChan, cmdWatchChan)
 		shards[i] = shard
 		shardReqMap[i] = shard.ReqChan
 	}
diff --git a/internal/shard/shard_thread.go b/internal/shard/shard_thread.go
index 7f0cf0fc9..ba8243f44 100644
--- a/internal/shard/shard_thread.go
+++ b/internal/shard/shard_thread.go
@@ -40,11 +40,10 @@ type ShardThread struct {
 	shardErrorChan   chan *ShardError // ShardErrorChan is the channel for sending shard-level errors.
 	lastCronExecTime time.Time        // lastCronExecTime is the last time the shard executed cron tasks.
 	cronFrequency    time.Duration    // cronFrequency is the frequency at which the shard executes cron tasks.
-	logger           *slog.Logger     // logger is the logger for the shard.
 }
 
 // NewShardThread creates a new ShardThread instance with the given shard id and error channel.
-func NewShardThread(id ShardID, gec chan error, sec chan *ShardError, queryWatchChan chan dstore.QueryWatchEvent, cmdWatchChan chan dstore.CmdWatchEvent, logger *slog.Logger) *ShardThread {
+func NewShardThread(id ShardID, gec chan error, sec chan *ShardError, queryWatchChan chan dstore.QueryWatchEvent, cmdWatchChan chan dstore.CmdWatchEvent) *ShardThread {
 	return &ShardThread{
 		id:               id,
 		store:            dstore.NewStore(queryWatchChan, cmdWatchChan),
@@ -54,7 +53,6 @@ func NewShardThread(id ShardID, gec chan error, sec chan *ShardError, queryWatch
 		shardErrorChan:   sec,
 		lastCronExecTime: utils.GetCurrentTime(),
 		cronFrequency:    config.DiceConfig.Performance.ShardCronFrequency,
-		logger:           logger,
 	}
 }
diff --git a/internal/watchmanager/watch_manager.go b/internal/watchmanager/watch_manager.go
index 4ec22fb8c..571dc1f33 100644
--- a/internal/watchmanager/watch_manager.go
+++ b/internal/watchmanager/watch_manager.go
@@ -21,7 +21,6 @@ type (
 		querySubscriptionMap map[string]map[uint32]struct{}              // querySubscriptionMap is a map of Key -> [fingerprint1, fingerprint2, ...]
 		tcpSubscriptionMap   map[uint32]map[chan *cmd.DiceDBCmd]struct{} // tcpSubscriptionMap is a map of fingerprint -> [client1Chan, client2Chan, ...]
 		fingerprintCmdMap    map[uint32]*cmd.DiceDBCmd                   // fingerprintCmdMap is a map of fingerprint -> DiceDBCmd
-		logger               *slog.Logger
 	}
 )
@@ -35,13 +34,12 @@ var (
 	}
 )
 
-func NewManager(logger *slog.Logger) *Manager {
+func NewManager() *Manager {
 	CmdWatchSubscriptionChan = make(chan WatchSubscription)
 	return &Manager{
 		querySubscriptionMap: make(map[string]map[uint32]struct{}),
 		tcpSubscriptionMap:   make(map[uint32]map[chan *cmd.DiceDBCmd]struct{}),
 		fingerprintCmdMap:    make(map[uint32]*cmd.DiceDBCmd),
-		logger:               logger,
 	}
 }
@@ -137,7 +135,7 @@ func (m *Manager) handleWatchEvent(event dstore.CmdWatchEvent) {
 	affectedCommands, cmdExists := affectedCmdMap[event.Cmd]
 	if !cmdExists {
-		m.logger.Error("Received a watch event for an unknown command type",
+		slog.Error("Received a watch event for an unknown command type",
 			slog.String("cmd", event.Cmd))
 		return
 	}
@@ -159,7 +157,7 @@ func (m *Manager) handleWatchEvent(event dstore.CmdWatchEvent) {
 func (m *Manager) notifyClients(fingerprint uint32, diceDBCmd *cmd.DiceDBCmd) {
 	clients, exists := m.tcpSubscriptionMap[fingerprint]
 	if !exists {
-		m.logger.Warn("No clients found for fingerprint",
+		slog.Warn("No clients found for fingerprint",
 			slog.Uint64("fingerprint", uint64(fingerprint)))
 		return
 	}
diff --git a/internal/worker/cmd_decompose.go b/internal/worker/cmd_decompose.go
index 7ed8b8f11..e5035dff2 100644
--- a/internal/worker/cmd_decompose.go
+++ b/internal/worker/cmd_decompose.go
@@ -27,7 +27,7 @@ func decomposeRename(ctx context.Context, w *BaseWorker, cd *cmd.DiceDBCmd) ([]*
 	var val string
 	select {
 	case <-ctx.Done():
-		w.logger.Error("Timed out waiting for response from shards", slog.String("workerID", w.id), slog.Any("error", ctx.Err()))
+		slog.Error("Timed out waiting for response from shards", slog.String("workerID", w.id), slog.Any("error", ctx.Err()))
 	case preProcessedResp, ok := <-w.preprocessingChan:
 		if ok {
 			evalResp := preProcessedResp.EvalResponse
@@ -66,7 +66,7 @@ func decomposeCopy(ctx context.Context, w *BaseWorker, cd *cmd.DiceDBCmd) ([]*cm
 	var val string
 	select {
 	case <-ctx.Done():
-		w.logger.Error("Timed out waiting for response from shards", slog.String("workerID", w.id), slog.Any("error", ctx.Err()))
+		slog.Error("Timed out waiting for response from shards", slog.String("workerID", w.id), slog.Any("error", ctx.Err()))
 	case preProcessedResp, ok := <-w.preprocessingChan:
 		if ok {
 			evalResp := preProcessedResp.EvalResponse
diff --git a/internal/worker/worker.go b/internal/worker/worker.go
index 4e72a3179..107757ea2 100644
--- a/internal/worker/worker.go
+++ b/internal/worker/worker.go
@@ -45,15 +45,13 @@ type BaseWorker struct {
 	adhocReqChan      chan *cmd.DiceDBCmd
 	Session           *auth.Session
 	globalErrorChan   chan error
-	logger            *slog.Logger
 	responseChan      chan *ops.StoreResponse
 	preprocessingChan chan *ops.StoreResponse
 }
 
 func NewWorker(wid string, responseChan, preprocessingChan chan *ops.StoreResponse,
 	ioHandler iohandler.IOHandler, parser requestparser.Parser,
-	shardManager *shard.ShardManager, gec chan error,
-	logger *slog.Logger) *BaseWorker {
+	shardManager *shard.ShardManager, gec chan error) *BaseWorker {
 	return &BaseWorker{
 		id:        wid,
 		ioHandler: ioHandler,
@@ -62,7 +60,6 @@ func NewWorker(wid string, responseChan, preprocessingChan chan *ops.StoreRespon
 		globalErrorChan:   gec,
 		responseChan:      responseChan,
 		preprocessingChan: preprocessingChan,
-		logger:            logger,
 		Session:           auth.NewSession(),
 		adhocReqChan:      make(chan *cmd.DiceDBCmd, config.DiceConfig.Performance.AdhocReqChanBufSize),
 	}
 }
@@ -94,13 +91,13 @@ func (w *BaseWorker) Start(ctx context.Context) error {
 		case <-ctx.Done():
 			err := w.Stop()
 			if err != nil {
-				w.logger.Warn("Error stopping worker:", slog.String("workerID", w.id), slog.Any("error", err))
+				slog.Warn("Error stopping worker:", slog.String("workerID", w.id), slog.Any("error", err))
 			}
 			return ctx.Err()
 		case err := <-errChan:
 			if err != nil {
 				if errors.Is(err, net.ErrClosed) || errors.Is(err, syscall.EPIPE) || errors.Is(err, syscall.ECONNRESET) {
-					w.logger.Debug("Connection closed for worker", slog.String("workerID", w.id), slog.Any("error", err))
+					slog.Debug("Connection closed for worker", slog.String("workerID", w.id), slog.Any("error", err))
 					return err
 				}
 			}
@@ -119,14 +116,14 @@ func (w *BaseWorker) Start(ctx context.Context) error {
 				if err != nil {
 					err = w.ioHandler.Write(ctx, err)
 					if err != nil {
-						w.logger.Debug("Write error, connection closed possibly", slog.String("workerID", w.id), slog.Any("error", err))
+						slog.Debug("Write error, connection closed possibly", slog.String("workerID", w.id), slog.Any("error", err))
 						return err
 					}
 				}
 				if len(cmds) == 0 {
 					err = w.ioHandler.Write(ctx, fmt.Errorf("ERR: Invalid request"))
 					if err != nil {
-						w.logger.Debug("Write error, connection closed possibly", slog.String("workerID", w.id), slog.Any("error", err))
+						slog.Debug("Write error, connection closed possibly", slog.String("workerID", w.id), slog.Any("error", err))
 						return err
 					}
 					continue
@@ -137,7 +134,7 @@ func (w *BaseWorker) Start(ctx context.Context) error {
 				if len(cmds) > 1 {
 					err = w.ioHandler.Write(ctx, fmt.Errorf("ERR: Multiple commands not supported"))
 					if err != nil {
-						w.logger.Debug("Write error, connection closed possibly", slog.String("workerID", w.id), slog.Any("error", err))
+						slog.Debug("Write error, connection closed possibly", slog.String("workerID", w.id), slog.Any("error", err))
 						return err
 					}
 				}
@@ -146,7 +143,7 @@ func (w *BaseWorker) Start(ctx context.Context) error {
 				if err != nil {
 					werr := w.ioHandler.Write(ctx, err)
 					if werr != nil {
-						w.logger.Debug("Write error, connection closed possibly", slog.Any("error", errors.Join(err, werr)))
+						slog.Debug("Write error, connection closed possibly", slog.Any("error", errors.Join(err, werr)))
 						return errors.Join(err, werr)
 					}
 				}
@@ -157,7 +154,7 @@ func (w *BaseWorker) Start(ctx context.Context) error {
 					w.executeCommandHandler(execCtx, errChan, cmds, false)
 				}(errChan)
 			case err := <-readErrChan:
-				w.logger.Debug("Read error, connection closed possibly", slog.String("workerID", w.id), slog.Any("error", err))
+				slog.Debug("Read error, connection closed possibly", slog.String("workerID", w.id), slog.Any("error", err))
 				return err
 			}
 		}
@@ -172,9 +169,9 @@ func (w *BaseWorker) executeCommandHandler(execCtx context.Context, errChan chan
 
 	err := w.executeCommand(execCtx, cmds[0], isWatchNotification)
 	if err != nil {
-		w.logger.Error("Error executing command", slog.String("workerID", w.id), slog.Any("error", err))
+		slog.Error("Error executing command", slog.String("workerID", w.id), slog.Any("error", err))
 		if errors.Is(err, net.ErrClosed) || errors.Is(err, syscall.EPIPE) || errors.Is(err, syscall.ECONNRESET) || errors.Is(err, syscall.ETIMEDOUT) {
-			w.logger.Debug("Connection closed for worker", slog.String("workerID", w.id), slog.Any("error", err))
+			slog.Debug("Connection closed for worker", slog.String("workerID", w.id), slog.Any("error", err))
 			errChan <- err
 		}
 	}
@@ -196,7 +193,7 @@ func (w *BaseWorker) executeCommand(ctx context.Context, diceDBCmd *cmd.DiceDBCm
 	case Global:
 		// If it's a global command, process it immediately without involving any shards.
err := w.ioHandler.Write(ctx, meta.WorkerCommandHandler(diceDBCmd.Args)) - w.logger.Debug("Error executing for worker", slog.String("workerID", w.id), slog.Any("error", err)) + slog.Debug("Error executing for worker", slog.String("workerID", w.id), slog.Any("error", err)) return err case SingleShard: @@ -210,7 +207,7 @@ func (w *BaseWorker) executeCommand(ctx context.Context, diceDBCmd *cmd.DiceDBCm if err != nil { workerErr := w.ioHandler.Write(ctx, err) if workerErr != nil { - w.logger.Debug("Error executing for worker", slog.String("workerID", w.id), slog.Any("error", workerErr)) + slog.Debug("Error executing for worker", slog.String("workerID", w.id), slog.Any("error", workerErr)) } return workerErr } @@ -221,15 +218,15 @@ func (w *BaseWorker) executeCommand(ctx context.Context, diceDBCmd *cmd.DiceDBCm case CmdAuth: err := w.ioHandler.Write(ctx, w.RespAuth(diceDBCmd.Args)) if err != nil { - w.logger.Error("Error sending auth response to worker", slog.String("workerID", w.id), slog.Any("error", err)) + slog.Error("Error sending auth response to worker", slog.String("workerID", w.id), slog.Any("error", err)) } return err case CmdAbort: err := w.ioHandler.Write(ctx, clientio.OK) if err != nil { - w.logger.Error("Error sending abort response to worker", slog.String("workerID", w.id), slog.Any("error", err)) + slog.Error("Error sending abort response to worker", slog.String("workerID", w.id), slog.Any("error", err)) } - w.logger.Info("Received ABORT command, initiating server shutdown", slog.String("workerID", w.id)) + slog.Info("Received ABORT command, initiating server shutdown", slog.String("workerID", w.id)) w.globalErrorChan <- diceerrors.ErrAborted return err default: @@ -316,7 +313,7 @@ func (w *BaseWorker) gather(ctx context.Context, diceDBCmd *cmd.DiceDBCmd, numCm for numCmds != 0 { select { case <-ctx.Done(): - w.logger.Error("Timed out waiting for response from shards", slog.String("workerID", w.id), slog.Any("error", ctx.Err())) + slog.Error("Timed out waiting for response from shards", slog.String("workerID", w.id), slog.Any("error", ctx.Err())) case resp, ok := <-w.responseChan: if ok { storeOp = append(storeOp, *resp) @@ -325,7 +322,7 @@ func (w *BaseWorker) gather(ctx context.Context, diceDBCmd *cmd.DiceDBCmd, numCm continue case sError, ok := <-w.shardManager.ShardErrorChan: if ok { - w.logger.Error("Error from shard", slog.String("workerID", w.id), slog.Any("error", sError)) + slog.Error("Error from shard", slog.String("workerID", w.id), slog.Any("error", sError)) } } } @@ -336,14 +333,14 @@ func (w *BaseWorker) gather(ctx context.Context, diceDBCmd *cmd.DiceDBCmd, numCm if storeOp[0].EvalResponse.Error != nil { err := w.ioHandler.Write(ctx, querymanager.GenericWatchResponse(diceDBCmd.Cmd, fmt.Sprintf("%d", diceDBCmd.GetFingerprint()), storeOp[0].EvalResponse.Error)) if err != nil { - w.logger.Debug("Error sending push response to client", slog.String("workerID", w.id), slog.Any("error", err)) + slog.Debug("Error sending push response to client", slog.String("workerID", w.id), slog.Any("error", err)) } return err } err := w.ioHandler.Write(ctx, querymanager.GenericWatchResponse(diceDBCmd.Cmd, fmt.Sprintf("%d", diceDBCmd.GetFingerprint()), storeOp[0].EvalResponse.Result)) if err != nil { - w.logger.Debug("Error sending push response to client", slog.String("workerID", w.id), slog.Any("error", err)) + slog.Debug("Error sending push response to client", slog.String("workerID", w.id), slog.Any("error", err)) return err } return nil // Exit after handling watch case @@ -354,14 
+351,14 @@ func (w *BaseWorker) gather(ctx context.Context, diceDBCmd *cmd.DiceDBCmd, numCm if storeOp[0].EvalResponse.Error != nil { err := w.ioHandler.Write(ctx, storeOp[0].EvalResponse.Error) if err != nil { - w.logger.Debug("Error sending response to client", slog.String("workerID", w.id), slog.Any("error", err)) + slog.Debug("Error sending response to client", slog.String("workerID", w.id), slog.Any("error", err)) } return err } err := w.ioHandler.Write(ctx, storeOp[0].EvalResponse.Result) if err != nil { - w.logger.Debug("Error sending response to client", slog.String("workerID", w.id), slog.Any("error", err)) + slog.Debug("Error sending response to client", slog.String("workerID", w.id), slog.Any("error", err)) return err } } else { @@ -371,29 +368,29 @@ func (w *BaseWorker) gather(ctx context.Context, diceDBCmd *cmd.DiceDBCmd, numCm if storeOp[0].EvalResponse.Error != nil { err := w.ioHandler.Write(ctx, storeOp[0].EvalResponse.Error) if err != nil { - w.logger.Debug("Error sending response to client", slog.String("workerID", w.id), slog.Any("error", err)) + slog.Debug("Error sending response to client", slog.String("workerID", w.id), slog.Any("error", err)) } return err } err := w.ioHandler.Write(ctx, storeOp[0].EvalResponse.Result) if err != nil { - w.logger.Debug("Error sending response to client", slog.String("workerID", w.id), slog.Any("error", err)) + slog.Debug("Error sending response to client", slog.String("workerID", w.id), slog.Any("error", err)) return err } case MultiShard: err := w.ioHandler.Write(ctx, val.composeResponse(storeOp...)) if err != nil { - w.logger.Debug("Error sending response to client", slog.String("workerID", w.id), slog.Any("error", err)) + slog.Debug("Error sending response to client", slog.String("workerID", w.id), slog.Any("error", err)) return err } default: - w.logger.Error("Unknown command type", slog.String("workerID", w.id), slog.String("command", diceDBCmd.Cmd), slog.Any("evalResp", storeOp)) + slog.Error("Unknown command type", slog.String("workerID", w.id), slog.String("command", diceDBCmd.Cmd), slog.Any("evalResp", storeOp)) err := w.ioHandler.Write(ctx, diceerrors.ErrInternalServer) if err != nil { - w.logger.Debug("Error sending response to client", slog.String("workerID", w.id), slog.Any("error", err)) + slog.Debug("Error sending response to client", slog.String("workerID", w.id), slog.Any("error", err)) return err } } @@ -439,7 +436,7 @@ func (w *BaseWorker) RespAuth(args []string) interface{} { } func (w *BaseWorker) Stop() error { - w.logger.Info("Stopping worker", slog.String("workerID", w.id)) + slog.Info("Stopping worker", slog.String("workerID", w.id)) w.Session.Expire() return nil } diff --git a/main.go b/main.go index b36aae45a..acc6efc26 100644 --- a/main.go +++ b/main.go @@ -5,7 +5,6 @@ import ( "errors" "flag" "fmt" - "github.com/dicedb/dice/internal/server/abstractserver" "log/slog" "os" "os/signal" @@ -15,6 +14,8 @@ import ( "sync" "syscall" + "github.com/dicedb/dice/internal/server/abstractserver" + "github.com/dicedb/dice/config" diceerrors "github.com/dicedb/dice/internal/errors" "github.com/dicedb/dice/internal/logger" @@ -53,10 +54,8 @@ func init() { } func main() { - logr := logger.New(logger.Opts{WithTimestamp: true}) - slog.SetDefault(logr) - - go observability.Ping(logr) + slog.SetDefault(logger.New(logger.Opts{WithTimestamp: true})) + go observability.Ping() ctx, cancel := context.WithCancel(context.Background()) @@ -87,10 +86,10 @@ func main() { if config.NumShards > 0 { numCores = config.NumShards } - 
logr.Debug("The DiceDB server has started in multi-threaded mode.", slog.Int("number of cores", numCores)) + slog.Debug("The DiceDB server has started in multi-threaded mode.", slog.Int("number of cores", numCores)) } else { numCores = 1 - logr.Debug("The DiceDB server has started in single-threaded mode.") + slog.Debug("The DiceDB server has started in single-threaded mode.") } // The runtime.GOMAXPROCS(numCores) call limits the number of operating system @@ -100,7 +99,7 @@ func main() { runtime.GOMAXPROCS(numCores) // Initialize the ShardManager - shardManager := shard.NewShardManager(uint8(numCores), queryWatchChan, cmdWatchChan, serverErrCh, logr) + shardManager := shard.NewShardManager(uint8(numCores), queryWatchChan, cmdWatchChan, serverErrCh) wg := sync.WaitGroup{} @@ -114,36 +113,36 @@ func main() { if config.EnableMultiThreading { if config.EnableProfiling { - stopProfiling, err := startProfiling(logr) + stopProfiling, err := startProfiling() if err != nil { - logr.Error("Profiling could not be started", slog.Any("error", err)) + slog.Error("Profiling could not be started", slog.Any("error", err)) sigs <- syscall.SIGKILL } defer stopProfiling() } workerManager := worker.NewWorkerManager(config.DiceConfig.Performance.MaxClients, shardManager) - respServer := resp.NewServer(shardManager, workerManager, cmdWatchChan, serverErrCh, logr) + respServer := resp.NewServer(shardManager, workerManager, cmdWatchChan, serverErrCh) serverWg.Add(1) - go runServer(ctx, &serverWg, respServer, logr, serverErrCh) + go runServer(ctx, &serverWg, respServer, serverErrCh) } else { - asyncServer := server.NewAsyncServer(shardManager, queryWatchChan, logr) + asyncServer := server.NewAsyncServer(shardManager, queryWatchChan) if err := asyncServer.FindPortAndBind(); err != nil { - logr.Error("Error finding and binding port", slog.Any("error", err)) + slog.Error("Error finding and binding port", slog.Any("error", err)) sigs <- syscall.SIGKILL } serverWg.Add(1) - go runServer(ctx, &serverWg, asyncServer, logr, serverErrCh) + go runServer(ctx, &serverWg, asyncServer, serverErrCh) - httpServer := server.NewHTTPServer(shardManager, logr) + httpServer := server.NewHTTPServer(shardManager) serverWg.Add(1) - go runServer(ctx, &serverWg, httpServer, logr, serverErrCh) + go runServer(ctx, &serverWg, httpServer, serverErrCh) } - websocketServer := server.NewWebSocketServer(shardManager, config.WebsocketPort, logr) + websocketServer := server.NewWebSocketServer(shardManager, config.WebsocketPort) serverWg.Add(1) - go runServer(ctx, &serverWg, websocketServer, logr, serverErrCh) + go runServer(ctx, &serverWg, websocketServer, serverErrCh) wg.Add(1) go func() { @@ -169,26 +168,26 @@ func main() { cancel() wg.Wait() - logr.Debug("Server has shut down gracefully") + slog.Debug("Server has shut down gracefully") } -func runServer(ctx context.Context, wg *sync.WaitGroup, srv abstractserver.AbstractServer, logr *slog.Logger, errCh chan<- error) { +func runServer(ctx context.Context, wg *sync.WaitGroup, srv abstractserver.AbstractServer, errCh chan<- error) { defer wg.Done() if err := srv.Run(ctx); err != nil { switch { case errors.Is(err, context.Canceled): - logr.Debug(fmt.Sprintf("%T was canceled", srv)) + slog.Debug(fmt.Sprintf("%T was canceled", srv)) case errors.Is(err, diceerrors.ErrAborted): - logr.Debug(fmt.Sprintf("%T received abort command", srv)) + slog.Debug(fmt.Sprintf("%T received abort command", srv)) default: - logr.Error(fmt.Sprintf("%T error", srv), slog.Any("error", err)) + slog.Error(fmt.Sprintf("%T 
error", srv), slog.Any("error", err)) } errCh <- err } else { - logr.Debug(fmt.Sprintf("%T stopped without error", srv)) + slog.Debug(fmt.Sprintf("%T stopped without error", srv)) } } -func startProfiling(logr *slog.Logger) (func(), error) { +func startProfiling() (func(), error) { // Start CPU profiling cpuFile, err := os.Create("cpu.prof") if err != nil { @@ -239,7 +238,7 @@ func startProfiling(logr *slog.Logger) (func(), error) { // Write heap profile runtime.GC() if err := pprof.WriteHeapProfile(memFile); err != nil { - logr.Warn("could not write memory profile", slog.Any("error", err)) + slog.Warn("could not write memory profile", slog.Any("error", err)) } memFile.Close() @@ -247,10 +246,10 @@ func startProfiling(logr *slog.Logger) (func(), error) { // Write block profile blockFile, err := os.Create("block.prof") if err != nil { - logr.Warn("could not create block profile", slog.Any("error", err)) + slog.Warn("could not create block profile", slog.Any("error", err)) } else { if err := pprof.Lookup("block").WriteTo(blockFile, 0); err != nil { - logr.Warn("could not write block profile", slog.Any("error", err)) + slog.Warn("could not write block profile", slog.Any("error", err)) } blockFile.Close() } From 9c6476665c776708a8ccf0652b4ade017ebef0b9 Mon Sep 17 00:00:00 2001 From: Arpit Bhayani Date: Tue, 22 Oct 2024 14:41:22 +0530 Subject: [PATCH 02/12] Configuring numShards vs setting MAXPROCS to numCores --- main.go | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/main.go b/main.go index acc6efc26..8dc740ecc 100644 --- a/main.go +++ b/main.go @@ -54,7 +54,7 @@ func init() { } func main() { - slog.SetDefault(logger.New(logger.Opts{WithTimestamp: true})) + slog.SetDefault(logger.New(logger.Opts{})) go observability.Ping() ctx, cancel := context.WithCancel(context.Background()) @@ -80,26 +80,26 @@ func main() { // for parallel execution. Setting the maximum number of CPUs to the available // core count ensures the application can make full use of all available hardware. // If multithreading is not enabled, server will run on a single core. - var numCores int + var numShards int if config.EnableMultiThreading { - numCores = runtime.NumCPU() + numShards = runtime.NumCPU() if config.NumShards > 0 { - numCores = config.NumShards + numShards = config.NumShards } - slog.Debug("The DiceDB server has started in multi-threaded mode.", slog.Int("number of cores", numCores)) + slog.Debug("The DiceDB server has started in multi-threaded mode.", slog.Int("number of cores", numShards)) } else { - numCores = 1 + numShards = 1 slog.Debug("The DiceDB server has started in single-threaded mode.") } - // The runtime.GOMAXPROCS(numCores) call limits the number of operating system + // The runtime.GOMAXPROCS(numShards) call limits the number of operating system // threads that can execute Go code simultaneously to the number of CPU cores. // This enables Go to run more efficiently, maximizing CPU utilization and // improving concurrency performance across multiple goroutines. 
From 9c6476665c776708a8ccf0652b4ade017ebef0b9 Mon Sep 17 00:00:00 2001
From: Arpit Bhayani
Date: Tue, 22 Oct 2024 14:41:22 +0530
Subject: [PATCH 02/12] Configuring numShards vs setting MAXPROCS to numCores

---
 main.go | 18 +++++++++---------
 1 file changed, 9 insertions(+), 9 deletions(-)

diff --git a/main.go b/main.go
index acc6efc26..8dc740ecc 100644
--- a/main.go
+++ b/main.go
@@ -54,7 +54,7 @@ func init() {
 }
 
 func main() {
-	slog.SetDefault(logger.New(logger.Opts{WithTimestamp: true}))
+	slog.SetDefault(logger.New(logger.Opts{}))
 	go observability.Ping()
 
 	ctx, cancel := context.WithCancel(context.Background())
@@ -80,26 +80,26 @@ func main() {
 	// for parallel execution. Setting the maximum number of CPUs to the available
 	// core count ensures the application can make full use of all available hardware.
 	// If multithreading is not enabled, server will run on a single core.
-	var numCores int
+	var numShards int
 	if config.EnableMultiThreading {
-		numCores = runtime.NumCPU()
+		numShards = runtime.NumCPU()
 		if config.NumShards > 0 {
-			numCores = config.NumShards
+			numShards = config.NumShards
 		}
-		slog.Debug("The DiceDB server has started in multi-threaded mode.", slog.Int("number of cores", numCores))
+		slog.Debug("The DiceDB server has started in multi-threaded mode.", slog.Int("number of cores", numShards))
 	} else {
-		numCores = 1
+		numShards = 1
 		slog.Debug("The DiceDB server has started in single-threaded mode.")
 	}
 
-	// The runtime.GOMAXPROCS(numCores) call limits the number of operating system
+	// The runtime.GOMAXPROCS(runtime.NumCPU()) call limits the number of operating system
 	// threads that can execute Go code simultaneously to the number of CPU cores.
 	// This enables Go to run more efficiently, maximizing CPU utilization and
 	// improving concurrency performance across multiple goroutines.
-	runtime.GOMAXPROCS(numCores)
+	runtime.GOMAXPROCS(runtime.NumCPU())
 
 	// Initialize the ShardManager
-	shardManager := shard.NewShardManager(uint8(numCores), queryWatchChan, cmdWatchChan, serverErrCh)
+	shardManager := shard.NewShardManager(uint8(numShards), queryWatchChan, cmdWatchChan, serverErrCh)
 
 	wg := sync.WaitGroup{}
 
From c0d706977b8880a124df5a7ce7b5dfcb18fb0e68 Mon Sep 17 00:00:00 2001
From: Arpit Bhayani
Date: Tue, 22 Oct 2024 15:13:33 +0530
Subject: [PATCH 03/12] One logger instance across DiceDB

---
 integration_tests/commands/async/main_test.go |  9 +------
 integration_tests/commands/async/setup.go     |  1 -
 integration_tests/commands/http/main_test.go  |  7 +-----
 integration_tests/commands/resp/main_test.go  |  9 +------
 integration_tests/commands/resp/setup.go      |  2 +-
 .../commands/websocket/main_test.go           |  5 ----
 internal/auth/session.go                      |  3 ++-
 internal/eval/main_test.go                    |  5 ----
 internal/logger/logger.go                     | 24 +++++++------------
 internal/server/httpServer.go                 |  2 +-
 internal/server/server.go                     |  8 ++-----
 internal/server/websocketServer.go            |  4 ++--
 internal/worker/cmd_meta.go                   |  6 ++---
 main.go                                       | 11 +++++----
 14 files changed, 28 insertions(+), 68 deletions(-)

diff --git a/integration_tests/commands/async/main_test.go b/integration_tests/commands/async/main_test.go
index 0184b5e0e..eb42ca9f1 100644
--- a/integration_tests/commands/async/main_test.go
+++ b/integration_tests/commands/async/main_test.go
@@ -2,19 +2,13 @@ package async
 
 import (
 	"context"
-	"log/slog"
 	"os"
 	"sync"
 	"testing"
 	"time"
-
-	"github.com/dicedb/dice/internal/logger"
 )
 
 func TestMain(m *testing.M) {
-	l := logger.New(logger.Opts{WithTimestamp: false})
-	slog.SetDefault(l)
-
 	var wg sync.WaitGroup
 
 	// Run the test server
@@ -22,8 +16,7 @@ func TestMain(m *testing.M) {
 	// checks for available port and then forks a goroutine
 	// to start the server
 	opts := TestServerOptions{
-		Port:   8739,
-		Logger: l,
+		Port: 8739,
 	}
 	RunTestServer(context.Background(), &wg, opts)
 
diff --git a/integration_tests/commands/async/setup.go b/integration_tests/commands/async/setup.go
index 3b2664a7f..cfdcaebdc 100644
--- a/integration_tests/commands/async/setup.go
+++ b/integration_tests/commands/async/setup.go
@@ -23,7 +23,6 @@ import (
 
 type TestServerOptions struct {
 	Port       int
-	Logger     *slog.Logger
 	MaxClients int32
 }
 
diff --git a/integration_tests/commands/http/main_test.go b/integration_tests/commands/http/main_test.go
index 678b7c2bd..0e480a37e 100644
--- a/integration_tests/commands/http/main_test.go
+++ b/integration_tests/commands/http/main_test.go
@@ -2,8 +2,6 @@ package http
 
 import (
 	"context"
-	"github.com/dicedb/dice/internal/logger"
-	"log/slog"
 	"os"
 	"sync"
 	"testing"
@@ -11,8 +9,6 @@ import (
 )
 
 func TestMain(m *testing.M) {
-	l := logger.New(logger.Opts{WithTimestamp: false})
-	slog.SetDefault(l)
 	var wg sync.WaitGroup
 
 	// Run the test server
@@ -20,8 +16,7 @@ func TestMain(m *testing.M) {
 	// checks for available port and then forks a goroutine
 	// to start the server
 	opts := TestServerOptions{
-		Port:   8083,
-		Logger: l,
+		Port: 8083,
 	}
 	ctx, cancel := context.WithCancel(context.Background())
 	RunHTTPServer(ctx, &wg, opts)
diff --git a/integration_tests/commands/resp/main_test.go b/integration_tests/commands/resp/main_test.go
index abca6ed69..69184d45c 100644
--- a/integration_tests/commands/resp/main_test.go
+++ b/integration_tests/commands/resp/main_test.go
@@ -1,27 +1,20 @@ package resp
 
 import (
-	"log/slog"
 	"os"
 	"sync"
 	"testing"
 	"time"
-
-	"github.com/dicedb/dice/internal/logger"
 )
 
 func TestMain(m *testing.M) {
-	logger := 
logger.New(logger.Opts{WithTimestamp: false}) - slog.SetDefault(logger) - var wg sync.WaitGroup // Run the test server // This is a synchronous method, because internally it // checks for available port and then forks a goroutine // to start the server opts := TestServerOptions{ - Port: 9739, - Logger: logger, + Port: 9739, } RunTestServer(&wg, opts) diff --git a/integration_tests/commands/resp/setup.go b/integration_tests/commands/resp/setup.go index b4da6b0b2..47642cba3 100644 --- a/integration_tests/commands/resp/setup.go +++ b/integration_tests/commands/resp/setup.go @@ -112,7 +112,7 @@ func fireCommandAndGetRESPParser(conn net.Conn, cmd string) *clientio.RESPParser } func RunTestServer(wg *sync.WaitGroup, opt TestServerOptions) { - slog.SetDefault(logger.New(logger.Opts{WithTimestamp: true})) + slog.SetDefault(logger.New()) config.DiceConfig.Network.IOBufferLength = 16 config.DiceConfig.Persistence.WriteAOFOnCleanup = false if opt.Port != 0 { diff --git a/integration_tests/commands/websocket/main_test.go b/integration_tests/commands/websocket/main_test.go index cc330a19d..1365b012a 100644 --- a/integration_tests/commands/websocket/main_test.go +++ b/integration_tests/commands/websocket/main_test.go @@ -2,18 +2,13 @@ package websocket import ( "context" - "log/slog" "os" "sync" "testing" "time" - - "github.com/dicedb/dice/internal/logger" ) func TestMain(m *testing.M) { - l := logger.New(logger.Opts{WithTimestamp: false}) - slog.SetDefault(l) var wg sync.WaitGroup // Run the test server diff --git a/internal/auth/session.go b/internal/auth/session.go index 52c18c6be..02f13c506 100644 --- a/internal/auth/session.go +++ b/internal/auth/session.go @@ -81,7 +81,8 @@ func (user *User) SetPassword(password string) (err error) { hashedPassword []byte ) if password == utils.EmptyStr { - slog.Warn("DiceDB is running without authentication. Consider setting a password.") + // TODO: add link to documentation on how to do it. + slog.Warn("running without authentication, consider setting a password") } if hashedPassword, err = bcrypt.GenerateFromPassword([]byte(password), bcrypt.DefaultCost); err != nil { diff --git a/internal/eval/main_test.go b/internal/eval/main_test.go index 04d622233..8c9fe4a80 100644 --- a/internal/eval/main_test.go +++ b/internal/eval/main_test.go @@ -1,18 +1,13 @@ package eval_test import ( - "log/slog" "os" "testing" - "github.com/dicedb/dice/internal/logger" dstore "github.com/dicedb/dice/internal/store" ) func TestMain(m *testing.M) { - l := logger.New(logger.Opts{WithTimestamp: false}) - slog.SetDefault(l) - store := dstore.NewStore(nil, nil) store.ResetStore() diff --git a/internal/logger/logger.go b/internal/logger/logger.go index d2f73e9e7..b70e00cc9 100644 --- a/internal/logger/logger.go +++ b/internal/logger/logger.go @@ -1,9 +1,9 @@ package logger import ( - "io" "log/slog" "os" + "time" "github.com/dicedb/dice/config" "github.com/rs/zerolog" @@ -24,18 +24,12 @@ func getLogLevel() slog.Leveler { return level } -type Opts struct { - WithTimestamp bool -} - -func New(opts Opts) *slog.Logger { - var writer io.Writer = os.Stderr - if config.DiceConfig.Logging.PrettyPrintLogs { - writer = zerolog.ConsoleWriter{Out: os.Stderr, NoColor: true} - } - zerologLogger := zerolog.New(writer). - Level(mapLevel(getLogLevel().Level())). 
-		With().Timestamp().Logger()
-	logger := slog.New(newZerologHandler(&zerologLogger))
-	return logger
+func New() *slog.Logger {
+	zerolog.TimeFieldFormat = zerolog.TimeFormatUnix
+	zerologLogger := zerolog.New(zerolog.ConsoleWriter{
+		Out:        os.Stderr,
+		NoColor:    true,
+		TimeFormat: time.RFC3339,
+	}).Level(mapLevel(getLogLevel().Level())).With().Timestamp().Logger()
+	return slog.New(newZerologHandler(&zerologLogger))
 }
diff --git a/internal/server/httpServer.go b/internal/server/httpServer.go
index 52b4b6317..e86395418 100644
--- a/internal/server/httpServer.go
+++ b/internal/server/httpServer.go
@@ -120,7 +120,7 @@ func (s *HTTPServer) Run(ctx context.Context) error {
 	wg.Add(1)
 	go func() {
 		defer wg.Done()
-		slog.Info("HTTP Server running", slog.String("addr", s.httpServer.Addr))
+		slog.Info("also listening for HTTP on", slog.String("port", s.httpServer.Addr[1:]))
 		err = s.httpServer.ListenAndServe()
 	}()
 
diff --git a/internal/server/server.go b/internal/server/server.go
index e0b55b097..f616dc298 100644
--- a/internal/server/server.go
+++ b/internal/server/server.go
@@ -103,11 +103,7 @@ func (s *AsyncServer) FindPortAndBind() (socketErr error) {
 	}); err != nil {
 		return err
 	}
-	slog.Info(
-		"DiceDB server is running in a single-threaded mode",
-		slog.String("version", config.DiceConfig.Version),
-		slog.Int("port", config.DiceConfig.AsyncServer.Port),
-	)
+	slog.Info("ready to serve requests")
 	return nil
 }
 
@@ -117,7 +113,7 @@ func (s *AsyncServer) ClosePort() {
 		if err := syscall.Close(s.serverFD); err != nil {
 			slog.Warn("failed to close server socket", slog.Any("error", err))
 		} else {
-			slog.Debug("Server socket closed successfully")
+			slog.Debug("server socket closed successfully")
 		}
 		s.serverFD = 0
 	}
diff --git a/internal/server/websocketServer.go b/internal/server/websocketServer.go
index 44f51120b..2f5a2fdb9 100644
--- a/internal/server/websocketServer.go
+++ b/internal/server/websocketServer.go
@@ -100,10 +100,10 @@ func (s *WebsocketServer) Run(ctx context.Context) error {
 	wg.Add(1)
 	go func() {
 		defer wg.Done()
-		slog.Info("Websocket Server running", slog.String("port", s.websocketServer.Addr[1:]))
+		slog.Info("also listening for WebSocket on", slog.String("port", s.websocketServer.Addr[1:]))
 		err = s.websocketServer.ListenAndServe()
 		if err != nil {
-			slog.Debug("Error in Websocket Server", slog.Any("time", time.Now()), slog.Any("error", err))
+			slog.Debug("Error in Websocket Server", slog.Any("error", err))
 		}
 	}()
 
diff --git a/internal/worker/cmd_meta.go b/internal/worker/cmd_meta.go
index 57aec4ddf..9be05a82b 100644
--- a/internal/worker/cmd_meta.go
+++ b/internal/worker/cmd_meta.go
@@ -3,10 +3,10 @@ package worker
 import (
 	"context"
 	"fmt"
+	"log/slog"
 
 	"github.com/dicedb/dice/internal/cmd"
 	"github.com/dicedb/dice/internal/eval"
-	"github.com/dicedb/dice/internal/logger"
 	"github.com/dicedb/dice/internal/ops"
 )
 
@@ -241,11 +241,9 @@ var CommandsMeta = map[string]CmdMeta{
 }
 
 func init() {
-	l := logger.New(logger.Opts{WithTimestamp: true})
-
 	// Validate the metadata for each command
 	for c, meta := range CommandsMeta {
 		if err := validateCmdMeta(c, meta); err != nil {
-			l.Error("error validating worker command metadata %s: %v", c, err)
+			slog.Error("error validating worker command metadata", slog.String("command", c), slog.Any("error", err))
 		}
 	}
 }
diff --git a/main.go b/main.go
index 8dc740ecc..845704450 100644
--- a/main.go
+++ b/main.go
@@ -14,11 +14,11 @@ import (
 	"sync"
 	"syscall"
 
+	"github.com/dicedb/dice/internal/logger"
 	"github.com/dicedb/dice/internal/server/abstractserver"
 
 	"github.com/dicedb/dice/config"
 	diceerrors "github.com/dicedb/dice/internal/errors"
-	"github.com/dicedb/dice/internal/logger"
 	"github.com/dicedb/dice/internal/observability"
 	"github.com/dicedb/dice/internal/server"
 	"github.com/dicedb/dice/internal/server/resp"
@@ -30,9 +30,9 @@ import (
 func init() {
 	flag.StringVar(&config.Host, "host", "0.0.0.0", "host for the dicedb server")
 	flag.IntVar(&config.Port, "port", 7379, "port for the dicedb server")
+	flag.IntVar(&config.HTTPPort, "http-port", 7380, "HTTP port for the dicedb server")
 	flag.BoolVar(&config.EnableHTTP, "enable-http", true, "run server in HTTP mode as well")
 	flag.BoolVar(&config.EnableMultiThreading, "enable-multithreading", false, "run server in multithreading mode")
-	flag.IntVar(&config.HTTPPort, "http-port", 8082, "HTTP port for the dicedb server")
 	flag.IntVar(&config.WebsocketPort, "websocket-port", 8379, "Websocket port for the dicedb server")
 	flag.IntVar(&config.NumShards, "num-shards", -1, "number of shards to create. default = number of cores")
 	flag.StringVar(&config.RequirePass, "requirepass", config.RequirePass, "enable authentication for the default user")
@@ -51,10 +51,11 @@ func init() {
 
 	iid := observability.GetOrCreateInstanceID()
 	config.DiceConfig.InstanceID = iid
+
+	slog.SetDefault(logger.New())
 }
 
 func main() {
-	slog.SetDefault(logger.New(logger.Opts{}))
 	go observability.Ping()
 
 	ctx, cancel := context.WithCancel(context.Background())
@@ -86,10 +87,10 @@ func main() {
 		if config.NumShards > 0 {
 			numShards = config.NumShards
 		}
-		slog.Debug("The DiceDB server has started in multi-threaded mode.", slog.Int("number of cores", numShards))
+		slog.Info("running with", slog.String("mode", "multi-threaded"), slog.Int("num shards", numShards))
 	} else {
 		numShards = 1
-		slog.Debug("The DiceDB server has started in single-threaded mode.")
+		slog.Info("running with", slog.String("mode", "single-threaded"))
 	}
 
 	// The runtime.GOMAXPROCS(runtime.NumCPU()) call limits the number of operating system
From a0bc1a824e3f860d757f67e92a77c96968a3269a Mon Sep 17 00:00:00 2001
From: Arpit Bhayani
Date: Tue, 22 Oct 2024 15:29:42 +0530
Subject: [PATCH 04/12] Better and clear logging messages

---
 internal/server/resp/server.go |  3 +--
 internal/server/server.go      |  8 ++------
 main.go                        | 36 +++++++++++++++++++++-----------
 3 files changed, 28 insertions(+), 19 deletions(-)

diff --git a/internal/server/resp/server.go b/internal/server/resp/server.go
index 77c848d59..719fa8b3b 100644
--- a/internal/server/resp/server.go
+++ b/internal/server/resp/server.go
@@ -92,7 +92,7 @@ func (s *Server) Run(ctx context.Context) (err error) {
 		}
 	}(wg)
 
-	slog.Info("DiceDB ready to accept connections on port", slog.Int("resp-port", config.Port))
+	slog.Info("ready to accept and serve requests on", slog.Int("port", config.Port))
 
 	select {
 	case <-ctx.Done():
@@ -154,7 +154,6 @@ func (s *Server) BindAndListen() error {
 	}
 
 	s.serverFD = serverFD
-	slog.Info("RESP Server successfully bound", slog.String("Host", s.Host), slog.Int("Port", s.Port))
 	return nil
 }
 
diff --git a/internal/server/server.go b/internal/server/server.go
index f616dc298..2e07933e3 100644
--- a/internal/server/server.go
+++ b/internal/server/server.go
@@ -97,14 +97,10 @@ func (s *AsyncServer) FindPortAndBind() (socketErr error) {
 		return diceerrors.ErrInvalidIPAddress
 	}
 
-	if err := syscall.Bind(serverFD, &syscall.SockaddrInet4{
+	return syscall.Bind(serverFD, &syscall.SockaddrInet4{
 		Port: config.DiceConfig.AsyncServer.Port,
 		Addr: [4]byte{ip4[0], ip4[1], ip4[2], ip4[3]},
-	}); err != nil {
-		return err
-	}
-	slog.Info("ready to serve requests")
-	return nil
+	})
 }
 
 // ClosePort ensures the server socket is closed properly.
diff --git a/main.go b/main.go
index 845704450..75631d293 100644
--- a/main.go
+++ b/main.go
@@ -28,22 +28,28 @@ import (
 )
 
 func init() {
-	flag.StringVar(&config.Host, "host", "0.0.0.0", "host for the dicedb server")
-	flag.IntVar(&config.Port, "port", 7379, "port for the dicedb server")
-	flag.IntVar(&config.HTTPPort, "http-port", 7380, "HTTP port for the dicedb server")
-	flag.BoolVar(&config.EnableHTTP, "enable-http", true, "run server in HTTP mode as well")
-	flag.BoolVar(&config.EnableMultiThreading, "enable-multithreading", false, "run server in multithreading mode")
-	flag.IntVar(&config.WebsocketPort, "websocket-port", 8379, "Websocket port for the dicedb server")
-	flag.IntVar(&config.NumShards, "num-shards", -1, "number of shards to create. default = number of cores")
+	flag.StringVar(&config.Host, "host", "0.0.0.0", "host for the DiceDB server")
+
+	flag.IntVar(&config.Port, "port", 7379, "port for the DiceDB server")
+	flag.IntVar(&config.HTTPPort, "http-port", 7380, "port for the HTTP variant of DiceDB server")
+	flag.IntVar(&config.WebsocketPort, "websocket-port", 7381, "Websocket port for the DiceDB server")
+
+	flag.BoolVar(&config.EnableHTTP, "enable-http", true, "enable DiceDB to listen, accept, and process HTTP")
+
+	flag.BoolVar(&config.EnableMultiThreading, "enable-multithreading", false, "enable multithreading execution and leverage multiple CPU cores")
+	flag.IntVar(&config.NumShards, "num-shards", -1, "number of shards to create. defaults to number of cores")
+
+	flag.BoolVar(&config.EnableWatch, "enable-watch", false, "enable support for .WATCH commands and real-time reactivity")
+
+	flag.BoolVar(&config.EnableProfiling, "enable-profiling", false, "enable profiling and capture critical metrics and traces in .prof files")
+
 	flag.StringVar(&config.RequirePass, "requirepass", config.RequirePass, "enable authentication for the default user")
 	flag.StringVar(&config.CustomConfigFilePath, "o", config.CustomConfigFilePath, "dir path to create the config file")
 	flag.StringVar(&config.FileLocation, "c", config.FileLocation, "file path of the config file")
 	flag.BoolVar(&config.InitConfigCmd, "init-config", false, "initialize a new config file")
-	flag.IntVar(&config.KeysLimit, "keys-limit", config.KeysLimit, "keys limit for the dicedb server. "+
+	flag.IntVar(&config.KeysLimit, "keys-limit", config.KeysLimit, "keys limit for the DiceDB server. "+
 		"This flag controls the number of keys each shard holds at startup. 
You can multiply this number with the "+ "total number of shard threads to estimate how much memory will be required at system start up.") - flag.BoolVar(&config.EnableProfiling, "enable-profiling", false, "enable profiling for the dicedb server") - flag.BoolVar(&config.EnableWatch, "enable-watch", false, "enable reactivity features which power the .WATCH commands") flag.Parse() @@ -56,6 +62,14 @@ func init() { } func main() { + slog.Info("starting DiceDB ...") + slog.Info("running with", slog.Int("port", config.Port)) + slog.Info("running with", slog.Bool("enable-watch", config.EnableWatch)) + + if config.EnableProfiling { + slog.Info("running with", slog.Bool("enable-profiling", config.EnableProfiling)) + } + go observability.Ping() ctx, cancel := context.WithCancel(context.Background()) @@ -87,7 +101,7 @@ func main() { if config.NumShards > 0 { numShards = config.NumShards } - slog.Info("running with", slog.String("mode", "multi-threaded"), slog.Int("num shards", numShards)) + slog.Info("running with", slog.String("mode", "multi-threaded"), slog.Int("num-shards", numShards)) } else { numShards = 1 slog.Info("running with", slog.String("mode", "single-threaded")) From a63a38022adcd8eae7db743d7a28e22b36a5d928 Mon Sep 17 00:00:00 2001 From: Arpit Bhayani Date: Tue, 22 Oct 2024 15:45:27 +0530 Subject: [PATCH 05/12] Logging made consistent across start and shutdown --- internal/server/resp/server.go | 10 ++++------ internal/server/server.go | 2 -- internal/server/websocketServer.go | 4 ++-- internal/shard/shard_thread.go | 3 --- main.go | 6 ++++-- 5 files changed, 10 insertions(+), 15 deletions(-) diff --git a/internal/server/resp/server.go b/internal/server/resp/server.go index 719fa8b3b..7818e0152 100644 --- a/internal/server/resp/server.go +++ b/internal/server/resp/server.go @@ -96,15 +96,15 @@ func (s *Server) Run(ctx context.Context) (err error) { select { case <-ctx.Done(): - slog.Info("Context canceled, initiating shutdown") + slog.Info("initiating shutdown") case err = <-errChan: - slog.Error("Error while accepting connections, initiating shutdown", slog.Any("error", err)) + slog.Error("error while accepting connections, initiating shutdown", slog.Any("error", err)) } s.Shutdown() wg.Wait() // Wait for the go routines to finish - slog.Info("All connections are closed, RESP server exiting gracefully.") + slog.Info("exiting gracefully") return err } @@ -161,8 +161,6 @@ func (s *Server) BindAndListen() error { func (s *Server) ReleasePort() { if err := syscall.Close(s.serverFD); err != nil { slog.Error("Failed to close server socket", slog.Any("error", err)) - } else { - slog.Debug("Server socket closed successfully") } } @@ -171,7 +169,7 @@ func (s *Server) AcceptConnectionRequests(ctx context.Context, wg *sync.WaitGrou for { select { case <-ctx.Done(): - slog.Info("Context canceled, initiating RESP server shutdown") + slog.Info("no new connections will be accepted") return ctx.Err() default: diff --git a/internal/server/server.go b/internal/server/server.go index 2e07933e3..db67af221 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -108,8 +108,6 @@ func (s *AsyncServer) ClosePort() { if s.serverFD != 0 { if err := syscall.Close(s.serverFD); err != nil { slog.Warn("failed to close server socket", slog.Any("error", err)) - } else { - slog.Debug("server socket closed successfully") } s.serverFD = 0 } diff --git a/internal/server/websocketServer.go b/internal/server/websocketServer.go index 2f5a2fdb9..2f5895001 100644 --- a/internal/server/websocketServer.go +++ 
b/internal/server/websocketServer.go
@@ -102,8 +102,8 @@ func (s *WebsocketServer) Run(ctx context.Context) error {
 		defer wg.Done()
 		slog.Info("also listening for WebSocket on", slog.String("port", s.websocketServer.Addr[1:]))
 		err = s.websocketServer.ListenAndServe()
-		if err != nil {
-			slog.Debug("Error in Websocket Server", slog.Any("error", err))
+		if err != nil && !errors.Is(err, http.ErrServerClosed) {
+			slog.Error("error while listening on WebSocket", slog.Any("error", err))
 		}
 	}()
 
diff --git a/internal/shard/shard_thread.go b/internal/shard/shard_thread.go
index ba8243f44..1e7b5be26 100644
--- a/internal/shard/shard_thread.go
+++ b/internal/shard/shard_thread.go
@@ -6,8 +6,6 @@ import (
 	"sync"
 	"time"
 
-	"log/slog"
-
 	"github.com/dicedb/dice/config"
 	diceerrors "github.com/dicedb/dice/internal/errors"
 	"github.com/dicedb/dice/internal/eval"
@@ -132,7 +130,6 @@ func (shard *ShardThread) processRequest(op *ops.StoreOp) {
 func (shard *ShardThread) cleanup() {
 	close(shard.ReqChan)
 	if !config.DiceConfig.Persistence.WriteAOFOnCleanup {
-		slog.Info("Skipping AOF dump.")
 		return
 	}
 
diff --git a/main.go b/main.go
index 75631d293..604a18b6d 100644
--- a/main.go
+++ b/main.go
@@ -6,6 +6,7 @@ import (
 	"flag"
 	"fmt"
 	"log/slog"
+	"net/http"
 	"os"
 	"os/signal"
 	"runtime"
@@ -183,7 +184,6 @@ func main() {
 
 	cancel()
 	wg.Wait()
-	slog.Debug("Server has shut down gracefully")
 }
 
 func runServer(ctx context.Context, wg *sync.WaitGroup, srv abstractserver.AbstractServer, errCh chan<- error) {
@@ -194,12 +194,14 @@ func runServer(ctx context.Context, wg *sync.WaitGroup, srv abstractserver.Abstr
 			slog.Debug(fmt.Sprintf("%T was canceled", srv))
 		case errors.Is(err, diceerrors.ErrAborted):
 			slog.Debug(fmt.Sprintf("%T received abort command", srv))
+		case errors.Is(err, http.ErrServerClosed):
+			return
 		default:
 			slog.Error(fmt.Sprintf("%T error", srv), slog.Any("error", err))
 		}
 		errCh <- err
 	} else {
-		slog.Debug(fmt.Sprintf("%T stopped without error", srv))
+		slog.Debug("bye.")
 	}
 }
 func startProfiling() (func(), error) {
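Patches 05 and 06 converge on classifying shutdown errors with errors.Is. Here is a self-contained, illustrative sketch of that pattern, with hypothetical names: http.ErrServerClosed and context.Canceled mark an orderly stop, while anything else is a genuine failure.

package main

import (
	"context"
	"errors"
	"fmt"
	"log/slog"
	"net/http"
)

// classify mirrors the switch in runServer: orderly-shutdown sentinels are
// logged quietly, everything else is surfaced as a real failure.
func classify(err error) {
	switch {
	case err == nil,
		errors.Is(err, context.Canceled),
		errors.Is(err, http.ErrServerClosed):
		slog.Info("server stopped cleanly")
	default:
		slog.Error("server failed", slog.Any("error", err))
	}
}

func main() {
	classify(http.ErrServerClosed)               // clean: ListenAndServe returns this after Shutdown
	classify(fmt.Errorf("bind: address in use")) // genuine failure
}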
From e429dc827b0ad84b25f05eb93ca7bf9c83a4317a Mon Sep 17 00:00:00 2001
From: Arpit Bhayani
Date: Tue, 22 Oct 2024 15:53:16 +0530
Subject: [PATCH 06/12] Putting HTTP and WS server behind a flag

---
 main.go | 26 ++++++++++++++++----------
 1 file changed, 16 insertions(+), 10 deletions(-)

diff --git a/main.go b/main.go
index 604a18b6d..49d89072e 100644
--- a/main.go
+++ b/main.go
@@ -32,10 +32,12 @@ func init() {
 	flag.StringVar(&config.Host, "host", "0.0.0.0", "host for the DiceDB server")
 
 	flag.IntVar(&config.Port, "port", 7379, "port for the DiceDB server")
-	flag.IntVar(&config.HTTPPort, "http-port", 7380, "port for the HTTP variant of DiceDB server")
-	flag.IntVar(&config.WebsocketPort, "websocket-port", 7381, "Websocket port for the DiceDB server")
-	flag.BoolVar(&config.EnableHTTP, "enable-http", true, "enable DiceDB to listen, accept, and process HTTP")
+
+	flag.IntVar(&config.HTTPPort, "http-port", 7380, "port for accepting requests over HTTP")
+	flag.BoolVar(&config.EnableHTTP, "enable-http", false, "enable DiceDB to listen, accept, and process HTTP")
+
+	flag.IntVar(&config.WebsocketPort, "websocket-port", 7381, "port for accepting requests over WebSocket")
+	flag.BoolVar(&config.EnableWebsocket, "enable-websocket", false, "enable DiceDB to listen, accept, and process WebSocket")
 
 	flag.BoolVar(&config.EnableMultiThreading, "enable-multithreading", false, "enable multithreading execution and leverage multiple CPU cores")
 	flag.IntVar(&config.NumShards, "num-shards", -1, "number of shards to create. defaults to number of cores")
@@ -151,14 +153,18 @@ func main() {
 		serverWg.Add(1)
 		go runServer(ctx, &serverWg, asyncServer, serverErrCh)
 
-		httpServer := server.NewHTTPServer(shardManager)
-		serverWg.Add(1)
-		go runServer(ctx, &serverWg, httpServer, serverErrCh)
+		if config.EnableHTTP {
+			httpServer := server.NewHTTPServer(shardManager)
+			serverWg.Add(1)
+			go runServer(ctx, &serverWg, httpServer, serverErrCh)
+		}
 	}
 
-	websocketServer := server.NewWebSocketServer(shardManager, config.WebsocketPort)
-	serverWg.Add(1)
-	go runServer(ctx, &serverWg, websocketServer, serverErrCh)
+	if config.EnableWebsocket {
+		websocketServer := server.NewWebSocketServer(shardManager, config.WebsocketPort)
+		serverWg.Add(1)
+		go runServer(ctx, &serverWg, websocketServer, serverErrCh)
+	}
 
 	wg.Add(1)
 	go func() {
@@ -195,7 +201,7 @@ func runServer(ctx context.Context, wg *sync.WaitGroup, srv abstractserver.Abstr
 		case errors.Is(err, diceerrors.ErrAborted):
 			slog.Debug(fmt.Sprintf("%T received abort command", srv))
 		case errors.Is(err, http.ErrServerClosed):
-			return
+			slog.Debug(fmt.Sprintf("%T stopped", srv))
 		default:
 			slog.Error(fmt.Sprintf("%T error", srv), slog.Any("error", err))
 		}

From c5a3281f0ab19821a30cd66310b0ed59eae00fb6 Mon Sep 17 00:00:00 2001
From: Arpit Bhayani
Date: Tue, 22 Oct 2024 16:00:13 +0530
Subject: [PATCH 07/12] Unittests for regular logger

---
 .../clientio/iohandler/netconn/netconn_resp_test.go |  4 ----
 internal/clientio/iohandler/netconn/netconn_test.go | 10 ++--------
 .../clientio/requestparser/resp/respparser_test.go  |  6 +-----
 3 files changed, 3 insertions(+), 17 deletions(-)

diff --git a/internal/clientio/iohandler/netconn/netconn_resp_test.go b/internal/clientio/iohandler/netconn/netconn_resp_test.go
index 920dea768..9b9f4224b 100644
--- a/internal/clientio/iohandler/netconn/netconn_resp_test.go
+++ b/internal/clientio/iohandler/netconn/netconn_resp_test.go
@@ -4,13 +4,10 @@ import (
 	"bufio"
 	"context"
 	"errors"
-	"log/slog"
 	"strings"
 	"testing"
 	"time"
 
-	"github.com/dicedb/dice/mocks"
-
 	"github.com/stretchr/testify/assert"
 )
 
@@ -168,7 +165,6 @@ func TestNetConnIOHandler_RESP(t *testing.T) {
 				conn:   mock,
 				reader: bufio.NewReaderSize(mock, 512),
 				writer: bufio.NewWriterSize(mock, 1024),
-				logger: slog.New(mocks.SlogNoopHandler{}),
 			}
 
 			ctx := context.Background()
diff --git a/internal/clientio/iohandler/netconn/netconn_test.go b/internal/clientio/iohandler/netconn/netconn_test.go
index a9a5b5b72..92cfc475e 100644
--- a/internal/clientio/iohandler/netconn/netconn_test.go
+++ b/internal/clientio/iohandler/netconn/netconn_test.go
@@ -6,15 +6,12 @@ import (
 	"context"
 	"errors"
 	"io"
-	"log/slog"
 	"net"
 	"os"
 	"sync"
 	"testing"
 	"time"
 
-	"github.com/dicedb/dice/mocks"
-
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 )
@@ -131,7 +128,6 @@ func TestNetConnIOHandler(t *testing.T) {
 				conn:   mock,
 				reader: bufio.NewReaderSize(mock, 512),
 				writer: bufio.NewWriterSize(mock, 1024),
-				logger: slog.New(mocks.SlogNoopHandler{}),
 			}
 
 			ctx := context.Background()
@@ -196,8 +192,7 @@ func TestNewNetConnIOHandler(t *testing.T) {
 			require.NoError(t, err, "Setup failed")
 			defer cleanup()
 
-			logger := slog.New(mocks.SlogNoopHandler{})
-			handler, err := NewIOHandler(fd, logger)
+			handler, err := NewIOHandler(fd)
 
 			if tt.expectedErr != nil {
 				assert.Error(t, err)
@@ -255,8 +250,7 @@ func TestNewNetConnIOHandler_RealNetwork(t *testing.T) { // More of an integrati
 
 	fd := int(file.Fd())
 
-	logger := slog.New(mocks.SlogNoopHandler{})
-	handler, err := 
NewIOHandler(fd) require.NoError(t, err, "Failed to create IOHandler") testData := []byte("Hello, World!") diff --git a/internal/clientio/requestparser/resp/respparser_test.go b/internal/clientio/requestparser/resp/respparser_test.go index 7d1178d69..bfe553017 100644 --- a/internal/clientio/requestparser/resp/respparser_test.go +++ b/internal/clientio/requestparser/resp/respparser_test.go @@ -1,12 +1,9 @@ package respparser import ( - "log/slog" "reflect" "testing" - "github.com/dicedb/dice/mocks" - "github.com/dicedb/dice/internal/cmd" ) @@ -127,8 +124,7 @@ func TestParser_Parse(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - l := slog.New(mocks.SlogNoopHandler{}) - p := NewParser(l) + p := NewParser() got, err := p.Parse([]byte(tt.input)) if (err != nil) != tt.wantErr { t.Errorf("Parser.Parse() error = %v, wantErr %v", err, tt.wantErr) From d6bb9f4e705d82e8b59721bb574d668d14353942 Mon Sep 17 00:00:00 2001 From: Arpit Bhayani Date: Tue, 22 Oct 2024 17:12:08 +0530 Subject: [PATCH 08/12] Log Level option added with default being info --- Makefile | 2 +- config/config.go | 8 ++++---- dice.toml | 4 ---- internal/logger/logger.go | 18 ++++++++---------- internal/logger/zerolog.go | 8 ++++---- main.go | 3 ++- 6 files changed, 19 insertions(+), 24 deletions(-) diff --git a/Makefile b/Makefile index 09d33edca..e27ed63c7 100644 --- a/Makefile +++ b/Makefile @@ -10,7 +10,7 @@ PORT ?= 7379 #Port for dicedb GOOS ?= $(shell go env GOOS) GOARCH ?= $(shell go env GOARCH) -VERSION=$(shell bash -c 'grep -oP "DiceVersion string = \"\K[^\"]+" config/config.go') +VERSION=$(shell bash -c 'grep -oP "DiceDBVersion string = \"\K[^\"]+" config/config.go') .PHONY: build test build-docker run test-one diff --git a/config/config.go b/config/config.go index ea56d67dc..9809c135f 100644 --- a/config/config.go +++ b/config/config.go @@ -15,7 +15,7 @@ import ( ) const ( - DiceVersion string = "0.0.5" + DiceDBVersion string = "0.0.5" DefaultHost string = "0.0.0.0" DefaultPort int = 7379 @@ -125,7 +125,7 @@ type Config struct { // Default configurations for internal use var baseConfig = Config{ - Version: DiceVersion, + Version: DiceDBVersion, AsyncServer: struct { Addr string `mapstructure:"addr"` Port int `mapstructure:"port"` @@ -201,7 +201,7 @@ var baseConfig = Config{ LogLevel string `mapstructure:"loglevel"` PrettyPrintLogs bool `mapstructure:"prettyprintlogs"` }{ - LogLevel: "debug", + LogLevel: "info", PrettyPrintLogs: true, }, Auth: struct { @@ -225,7 +225,7 @@ var defaultConfig Config func init() { config := baseConfig config.Logging.PrettyPrintLogs = false - config.Logging.LogLevel = "debug" + config.Logging.LogLevel = "info" defaultConfig = config } diff --git a/dice.toml b/dice.toml index 815813428..89d7a3575 100644 --- a/dice.toml +++ b/dice.toml @@ -44,10 +44,6 @@ AOFFile = './dice-master.aof' PersistenceEnabled = true WriteAOFOnCleanup = false -[Logging] -LogLevel = 'debug' -PrettyPrintLogs = true - [Auth] UserName = 'dice' Password = '' diff --git a/internal/logger/logger.go b/internal/logger/logger.go index b70e00cc9..52ad30987 100644 --- a/internal/logger/logger.go +++ b/internal/logger/logger.go @@ -1,6 +1,7 @@ package logger import ( + "fmt" "log/slog" "os" "time" @@ -9,19 +10,16 @@ import ( "github.com/rs/zerolog" ) -func getLogLevel() slog.Leveler { - var level slog.Leveler +func getSLogLevel() slog.Level { + fmt.Println("slog level", config.DiceConfig.Logging.LogLevel) switch config.DiceConfig.Logging.LogLevel { case "debug": - level = slog.LevelDebug - case "warn": 
-		level = slog.LevelWarn
-	case "error":
-		level = slog.LevelError
+		return slog.LevelDebug
+	case "info":
+		return slog.LevelInfo
 	default:
-		level = slog.LevelInfo
+		return slog.LevelInfo
 	}
-	return level
 }
 
 func New() *slog.Logger {
 	zerolog.TimeFieldFormat = zerolog.TimeFormatUnix
 	zerologLogger := zerolog.New(zerolog.ConsoleWriter{
 		Out:        os.Stderr,
 		NoColor:    true,
 		TimeFormat: time.RFC3339,
-	}).Level(mapLevel(getLogLevel().Level())).With().Timestamp().Logger()
+	}).Level(toZerologLevel(getSLogLevel())).With().Timestamp().Logger()
 	return slog.New(newZerologHandler(&zerologLogger))
 }
diff --git a/internal/logger/zerolog.go b/internal/logger/zerolog.go
index 0c839d520..90d71864f 100644
--- a/internal/logger/zerolog.go
+++ b/internal/logger/zerolog.go
@@ -24,7 +24,7 @@ func newZerologHandler(logger *zerolog.Logger) *ZerologHandler {
 //
 //nolint:gocritic // The slog.Record struct triggers hugeParam, but we don't control the interface (it's a standard library one)
 func (h *ZerologHandler) Handle(_ context.Context, record slog.Record) error {
-	event := h.logger.WithLevel(mapLevel(record.Level))
+	event := h.logger.WithLevel(toZerologLevel(record.Level))
 	record.Attrs(func(attr slog.Attr) bool {
 		addAttrToZerolog(attr, event)
 		return true
@@ -35,7 +35,7 @@ func (h *ZerologHandler) Handle(_ context.Context, record slog.Record) error {
 
 // Enabled implements the slog.Handler interface
 func (h *ZerologHandler) Enabled(_ context.Context, level slog.Level) bool {
-	return h.logger.GetLevel() <= mapLevel(level)
+	return h.logger.GetLevel() <= toZerologLevel(level)
 }
 
 // WithAttrs adds attributes to the log event
@@ -104,8 +104,8 @@ func addAttrToZerolog[T interface {
 	}
 }
 
-// mapLevel maps slog levels to zerolog levels
-func mapLevel(level slog.Level) zerolog.Level {
+// toZerologLevel maps slog levels to zerolog levels
+func toZerologLevel(level slog.Level) zerolog.Level {
 	switch {
 	case level == slog.LevelDebug:
 		return zerolog.DebugLevel
diff --git a/main.go b/main.go
index 49d89072e..f12119d2b 100644
--- a/main.go
+++ b/main.go
@@ -43,9 +43,10 @@ func init() {
 	flag.IntVar(&config.NumShards, "num-shards", -1, "number of shards to create. defaults to number of cores")
 
 	flag.BoolVar(&config.EnableWatch, "enable-watch", false, "enable support for .WATCH commands and real-time reactivity")
-
 	flag.BoolVar(&config.EnableProfiling, "enable-profiling", false, "enable profiling and capture critical metrics and traces in .prof files")
 
+	flag.StringVar(&config.DiceConfig.Logging.LogLevel, "log-level", "info", "log level, values: info, debug")
+
 	flag.StringVar(&config.RequirePass, "requirepass", config.RequirePass, "enable authentication for the default user")
 	flag.StringVar(&config.CustomConfigFilePath, "o", config.CustomConfigFilePath, "dir path to create the config file")
 	flag.StringVar(&config.FileLocation, "c", config.FileLocation, "file path of the config file")
 	flag.BoolVar(&config.InitConfigCmd, "init-config", false, "initialize a new config file")

From 583812db25f7bb774fe5282e5c1e87af277d72b4 Mon Sep 17 00:00:00 2001
From: Arpit Bhayani
Date: Tue, 22 Oct 2024 17:15:27 +0530
Subject: [PATCH 09/12] Removed multi-threading experimental mode from the README

---
 README.md | 12 ------------
 1 file changed, 12 deletions(-)

diff --git a/README.md b/README.md
index b20f5a634..6dce91dde 100644
--- a/README.md
+++ b/README.md
@@ -45,18 +45,6 @@ to it using DiceDB CLI and SDKs, or even Redis CLIs and SDKs.
 
 > Note: Given it is a drop-in replacement of Redis, you can also use any Redis CLI and SDK to connect to DiceDB.
 
-### Multi-Threading Mode (Experimental)
-
-Multi-threading is currently under active development. 
To run the server with multi-threading enabled, follow these steps: - -```bash -git clone https://github.com/dicedb/dice -cd dice -go run main.go --enable-multithreading --enable-watch -``` - -**Note:** Only the following commands are optimized for multithreaded execution: `PING, AUTH, SET, GET, GETSET, ABORT` - ### Setting up DiceDB from source for development and contributions To run DiceDB for local development or running from source, you will need From 940a5f430d386a599afd34e143b500916a967649 Mon Sep 17 00:00:00 2001 From: Arpit Bhayani Date: Tue, 22 Oct 2024 17:26:50 +0530 Subject: [PATCH 10/12] Logger options fixed with Integration Tests --- integration_tests/server/max_conn_test.go | 1 - integration_tests/server/server_abort_test.go | 7 +++---- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/integration_tests/server/max_conn_test.go b/integration_tests/server/max_conn_test.go index 550f93cab..d8d667ccc 100644 --- a/integration_tests/server/max_conn_test.go +++ b/integration_tests/server/max_conn_test.go @@ -27,7 +27,6 @@ func TestMaxConnection(t *testing.T) { var maxConnTestOptions = commands.TestServerOptions{ Port: 8741, MaxClients: 50, - Logger: slog.Default(), } commands.RunTestServer(context.Background(), &wg, maxConnTestOptions) diff --git a/integration_tests/server/server_abort_test.go b/integration_tests/server/server_abort_test.go index fa52c6b14..f4c98ac4f 100644 --- a/integration_tests/server/server_abort_test.go +++ b/integration_tests/server/server_abort_test.go @@ -15,8 +15,7 @@ import ( ) var testServerOptions = commands.TestServerOptions{ - Port: 8740, - Logger: slog.Default(), + Port: 8740, } func TestAbortCommand(t *testing.T) { @@ -100,11 +99,11 @@ func TestServerRestartAfterAbort(t *testing.T) { // wait for the server to shut down time.Sleep(2 * time.Second) - testServerOptions.Logger.Info("Wait completed for server shutdown") + slog.Info("Wait completed for server shutdown") wg.Wait() - testServerOptions.Logger.Info("Restarting server after abort for server_abort_test") + slog.Info("Restarting server after abort for server_abort_test") // restart server ctx2, cancel2 := context.WithCancel(context.Background()) t.Cleanup(cancel2) From 23cd715aa00d4aa9f2c83134b1d5a77c93ca94d1 Mon Sep 17 00:00:00 2001 From: Arpit Bhayani Date: Tue, 22 Oct 2024 17:34:25 +0530 Subject: [PATCH 11/12] DiceDB ASCII Art --- internal/logger/logger.go | 2 -- main.go | 11 ++++++++++- 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/internal/logger/logger.go b/internal/logger/logger.go index 52ad30987..7ba1eecc9 100644 --- a/internal/logger/logger.go +++ b/internal/logger/logger.go @@ -1,7 +1,6 @@ package logger import ( - "fmt" "log/slog" "os" "time" @@ -11,7 +10,6 @@ import ( ) func getSLogLevel() slog.Level { - fmt.Println("slog level", config.DiceConfig.Logging.LogLevel) switch config.DiceConfig.Logging.LogLevel { case "debug": return slog.LevelDebug diff --git a/main.go b/main.go index f12119d2b..11c99dd5a 100644 --- a/main.go +++ b/main.go @@ -66,7 +66,16 @@ func init() { } func main() { - slog.Info("starting DiceDB ...") + fmt.Print(` +██████╗ ██╗ ██████╗███████╗██████╗ ██████╗ +██╔══██╗██║██╔════╝██╔════╝██╔══██╗██╔══██╗ +██║ ██║██║██║ █████╗ ██║ ██║██████╔╝ +██║ ██║██║██║ ██╔══╝ ██║ ██║██╔══██╗ +██████╔╝██║╚██████╗███████╗██████╔╝██████╔╝ +╚═════╝ ╚═╝ ╚═════╝╚══════╝╚═════╝ ╚═════╝ + +`) + slog.Info("starting DiceDB", slog.String("version", config.DiceDBVersion)) slog.Info("running with", slog.Int("port", config.Port)) slog.Info("running with", 
slog.Bool("enable-watch", config.EnableWatch)) From cace85f8c092b327875755e57e28ac61fc8ed7b7 Mon Sep 17 00:00:00 2001 From: Arpit Bhayani Date: Tue, 22 Oct 2024 18:03:01 +0530 Subject: [PATCH 12/12] Integration Test Websocket Logger Error --- integration_tests/commands/websocket/main_test.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/integration_tests/commands/websocket/main_test.go b/integration_tests/commands/websocket/main_test.go index 1365b012a..1c4ac68bb 100644 --- a/integration_tests/commands/websocket/main_test.go +++ b/integration_tests/commands/websocket/main_test.go @@ -16,8 +16,7 @@ func TestMain(m *testing.M) { // checks for available port and then forks a goroutine // to start the server opts := TestServerOptions{ - Port: testPort1, - Logger: l, + Port: testPort1, } ctx, cancel := context.WithCancel(context.Background()) RunWebsocketServer(ctx, &wg, opts)