diff --git a/config/config.go b/config/config.go index 98d1fc64f..25c0659ba 100644 --- a/config/config.go +++ b/config/config.go @@ -66,7 +66,7 @@ memory.keys_limit = 200000000 memory.lfu_log_factor = 10 # Persistence Configuration -persistence.enabled = true +persistence.enabled = false persistence.aof_file = "./dice-master.aof" persistence.persistence_enabled = true persistence.write_aof_on_cleanup = false @@ -84,7 +84,22 @@ auth.password = "" # Network Configuration network.io_buffer_length = 512 -network.io_buffer_length_max = 51200` +network.io_buffer_length_max = 51200 + +# WAL Configuration +LogDir = "tmp/dicedb-wal" +Enabled = "true" +WalMode = "buffered" +WriteMode = "default" +BufferSizeMB = 1 +RotationMode = "segment-size" +MaxSegmentSizeMB = 16 +MaxSegmentRotationTime = 60s +BufferSyncInterval = 200ms +RetentionMode = "num-segments" +MaxSegmentCount = 10 +MaxSegmentRetentionDuration = 600s +RecoveryMode = "strict"` ) var ( @@ -104,6 +119,7 @@ type Config struct { Persistence persistence `config:"persistence"` Logging logging `config:"logging"` Network network `config:"network"` + WAL WALConfig `config:"WAL"` } type auth struct { @@ -147,7 +163,7 @@ type memory struct { MaxMemory int64 `config:"max_memory" default:"0" validate:"min=0"` EvictionPolicy string `config:"eviction_policy" default:"allkeys-lfu" validate:"oneof=simple-first allkeys-random allkeys-lru allkeys-lfu"` EvictionRatio float64 `config:"eviction_ratio" default:"0.9" validate:"min=0,lte=1"` - KeysLimit int `config:"keys_limit" default:"200000000" validate:"min=0"` + KeysLimit int `config:"keys_limit" default:"200000000" validate:"min=10"` LFULogFactor int `config:"lfu_log_factor" default:"10" validate:"min=0"` } @@ -155,11 +171,39 @@ type persistence struct { Enabled bool `config:"enabled" default:"false"` AOFFile string `config:"aof_file" default:"./dice-master.aof" validate:"filepath"` WriteAOFOnCleanup bool `config:"write_aof_on_cleanup" default:"false"` - WALDir string 
`config:"wal-dir" default:"./" validate:"dirpath"` RestoreFromWAL bool `config:"restore-wal" default:"false"` WALEngine string `config:"wal-engine" default:"aof" validate:"oneof=sqlite aof"` } +type WALConfig struct { + // Directory where WAL log files will be stored + LogDir string `config:"log_dir" default:"tmp/dicedb-wal"` + // Whether WAL is enabled + Enabled bool `config:"enabled" default:"true"` + // WAL buffering mode: 'buffered' (writes buffered in memory) or 'unbuffered' (immediate disk writes) + WalMode string `config:"wal_mode" default:"buffered" validate:"oneof=buffered unbuffered"` + // Write mode: 'default' (OS handles syncing) or 'fsync' (explicit fsync after writes) + WriteMode string `config:"write_mode" default:"default" validate:"oneof=default fsync"` + // Size of the write buffer in megabytes + BufferSizeMB int `config:"buffer_size_mb" default:"1" validate:"min=1"` + // How WAL rotation is triggered: 'segment-size' (based on file size) or 'time' (based on duration) + RotationMode string `config:"rotation_mode" default:"segment-size" validate:"oneof=segment-size time"` + // Maximum size of a WAL segment file in megabytes before rotation + MaxSegmentSizeMB int `config:"max_segment_size_mb" default:"16" validate:"min=1"` + // Time interval in seconds after which WAL segment is rotated when using time-based rotation + MaxSegmentRotationTime time.Duration `config:"max_segment_rotation_time" default:"60s" validate:"min=1s"` + // Time interval in Milliseconds after which buffered WAL data is synced to disk + BufferSyncInterval time.Duration `config:"buffer_sync_interval" default:"200ms" validate:"min=1ms"` + // How old segments are removed: 'num-segments' (keep N latest), 'time' (by age), or 'checkpoint' (after checkpoint) + RetentionMode string `config:"retention_mode" default:"num-segments" validate:"oneof=num-segments time checkpoint"` + // Maximum number of WAL segment files to retain when using num-segments retention + MaxSegmentCount int 
`config:"max_segment_count" default:"10" validate:"min=1"` + // Time interval in Seconds till which WAL segments are retained when using time-based retention + MaxSegmentRetentionDuration time.Duration `config:"max_segment_retention_duration" default:"600s" validate:"min=1s"` + // How to handle WAL corruption on recovery: 'strict' (fail), 'truncate' (truncate at corruption), 'ignore' (skip corrupted) + RecoveryMode string `config:"recovery_mode" default:"strict" validate:"oneof=strict truncate ignore"` +} + type logging struct { LogLevel string `config:"log_level" default:"info" validate:"oneof=debug info warn error"` LogDir string `config:"log_dir" default:"/tmp/dicedb" validate:"dirpath"` diff --git a/config/validator.go b/config/validator.go index 66669ff43..9564777a9 100644 --- a/config/validator.go +++ b/config/validator.go @@ -12,6 +12,7 @@ import ( func validateConfig(config *Config) error { validate := validator.New() validate.RegisterStructValidation(validateShardCount, Config{}) + validate.RegisterStructValidation(func(sl validator.StructLevel) { validateShardCount(sl); validateWALConfig(sl) }, Config{}) if err := validate.Struct(config); err != nil { validationErrors, ok := err.(validator.ValidationErrors) @@ -95,3 +96,36 @@ func applyDefaultValuesFromTags(config *Config, fieldName string) error { log.Printf("Setting default value for %s to: %s", fieldName, defaultValue) return nil } + +func validateWALConfig(sl validator.StructLevel) { + config := sl.Current().Interface().(Config) + + // LogDir validation + if config.WAL.LogDir == "" { + sl.ReportError(config.WAL.LogDir, "LogDir", "LogDir", "required", "cannot be empty") + } + + // MaxSegmentSize validation + if config.WAL.MaxSegmentSizeMB <= 0 { + sl.ReportError(config.WAL.MaxSegmentSizeMB, "MaxSegmentSize", "MaxSegmentSize", "gt", "must be greater than 0") + } + + // MaxSegmentCount validation + if config.WAL.MaxSegmentCount <= 0 { + sl.ReportError(config.WAL.MaxSegmentCount, "MaxSegmentCount", "MaxSegmentCount", "gt", "must be greater than 0") + } + + // 
BufferSize validation + if config.WAL.BufferSizeMB <= 0 { + sl.ReportError(config.WAL.BufferSizeMB, "BufferSize", "BufferSize", "gt", "must be greater than 0") + } + + // WALMode and WriteMode compatibility checks + if config.WAL.WalMode == "buffered" && config.WAL.WriteMode == "fsync" { + sl.ReportError(config.WAL.WalMode, "WALMode", "WALMode", "incompatible", "walMode 'buffered' cannot be used with writeMode 'fsync'") + } + + if config.WAL.WalMode == "unbuffered" && config.WAL.WriteMode == "default" { + sl.ReportError(config.WAL.WalMode, "WALMode", "WALMode", "incompatible", "walMode 'unbuffered' cannot have writeMode as 'default'") + } +} diff --git a/dicedb.conf b/dicedb.conf index 070f3c91d..af4759337 100644 --- a/dicedb.conf +++ b/dicedb.conf @@ -39,7 +39,7 @@ memory.keys_limit = 200000000 memory.lfu_log_factor = 10 # Persistence Configuration -persistence.enabled = true +persistence.enabled = false persistence.aof_file = "./dice-master.aof" persistence.persistence_enabled = true persistence.write_aof_on_cleanup = false @@ -56,4 +56,19 @@ auth.password = "" # Network Configuration network.io_buffer_length = 512 -network.io_buffer_length_max = 51200 \ No newline at end of file +network.io_buffer_length_max = 51200 + +# WAL Configuration +LogDir = "tmp/dicedb-wal" +Enabled = "true" +WalMode = "buffered" +WriteMode = "default" +BufferSizeMB = 1 +RotationMode = "segment-size" +MaxSegmentSizeMB = 16 +MaxSegmentRotationTime = 60s +BufferSyncInterval = 200ms +RetentionMode = "num-segments" +MaxSegmentCount = 10 +MaxSegmentRetentionDuration = 600s +RecoveryMode = "strict" \ No newline at end of file diff --git a/internal/iothread/iothread.go b/internal/iothread/iothread.go index 6aab5120f..3111a18c7 100644 --- a/internal/iothread/iothread.go +++ b/internal/iothread/iothread.go @@ -7,6 +7,7 @@ import ( "log/slog" "net" "strconv" + "strings" "sync/atomic" "syscall" "time" @@ -563,13 +564,17 @@ func (t *BaseIOThread) handleCommand(ctx context.Context, cmdMeta CmdMeta, 
diceD } if err == nil && t.wl != nil { - t.wl.LogCommand(diceDBCmd) + if err := t.wl.LogCommand([]byte(fmt.Sprintf("%s %s", diceDBCmd.Cmd, strings.Join(diceDBCmd.Args, " ")))); err != nil { + return err + } } case MultiShard, AllShard: err = t.writeResponse(ctx, cmdMeta.composeResponse(storeOp...)) if err == nil && t.wl != nil { - t.wl.LogCommand(diceDBCmd) + if err := t.wl.LogCommand([]byte(fmt.Sprintf("%s %s", diceDBCmd.Cmd, strings.Join(diceDBCmd.Args, " ")))); err != nil { + return err + } } default: slog.Error("Unknown command type", diff --git a/internal/wal/gen_wal_proto.sh b/internal/wal/gen_wal_proto.sh new file mode 100755 index 000000000..228ec0db1 --- /dev/null +++ b/internal/wal/gen_wal_proto.sh @@ -0,0 +1,50 @@ +#!/bin/bash + +# Exit immediately if a command exits with a non-zero status +set -e + +# Define the directory where the proto file resides +PROTO_DIR=$(dirname "$0") + +# Define the proto file and the output file +PROTO_FILE="$PROTO_DIR/wal.proto" +OUTPUT_DIR="." + +# Function to install protoc if not present +install_protoc() { + echo "protoc not found. Installing Protocol Buffers compiler..." + if [[ "$(uname)" == "Darwin" ]]; then + # MacOS installation + brew install protobuf + elif [[ "$(uname)" == "Linux" ]]; then + # Linux installation + sudo apt update && sudo apt install -y protobuf-compiler + else + echo "Unsupported OS. Please install 'protoc' manually." + exit 1 + fi +} + +# Function to install the Go plugin for protoc if not present +install_protoc_gen_go() { + echo "protoc-gen-go not found. Installing Go plugin for protoc..." + go install google.golang.org/protobuf/cmd/protoc-gen-go@latest + export PATH="$PATH:$(go env GOPATH)/bin" + echo "Make sure $(go env GOPATH)/bin is in your PATH." +} + +# Check if protoc is installed, install if necessary +if ! command -v protoc &>/dev/null; then + install_protoc +fi + +# Check if the Go plugin for protoc is installed, install if necessary +if ! 
command -v protoc-gen-go &>/dev/null; then + install_protoc_gen_go +fi + +# Generate the wal.pb.go file +echo "Generating wal.pb.go from wal.proto..." +protoc --go_out="$OUTPUT_DIR" --go_opt=paths=source_relative "$PROTO_FILE" + +echo "Generation complete. File created at $OUTPUT_DIR/wal.pb.go" diff --git a/internal/wal/wal.go b/internal/wal/wal.go index ca251560e..32c5e3174 100644 --- a/internal/wal/wal.go +++ b/internal/wal/wal.go @@ -10,7 +10,7 @@ import ( ) type AbstractWAL interface { - LogCommand(c *cmd.DiceDBCmd) + LogCommand([]byte) error Close() error Init(t time.Time) error ForEachCommand(f func(c cmd.DiceDBCmd) error) error diff --git a/internal/wal/wal.pb.go b/internal/wal/wal.pb.go index 4d82bcec8..617c6d9ca 100644 --- a/internal/wal/wal.pb.go +++ b/internal/wal/wal.pb.go @@ -1,8 +1,8 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: // protoc-gen-go v1.35.2 -// protoc v3.21.12 -// source: wal.proto +// protoc v5.28.3 +// source: internal/wal/wal.proto package wal @@ -20,31 +20,33 @@ const ( _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) ) -// WALLogEntry represents a single log entry in the WAL. 
-type WALLogEntry struct { +type WALEntry struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - Checksum []byte `protobuf:"bytes,1,opt,name=checksum,proto3,oneof" json:"checksum,omitempty"` // SHA-256 checksum of the command for integrity - Command *string `protobuf:"bytes,2,opt,name=command,proto3,oneof" json:"command,omitempty"` // Command + Version string `protobuf:"bytes,1,opt,name=version,proto3" json:"version,omitempty"` // Version of the WAL entry (e.g., "v1.0") + LogSequenceNumber uint64 `protobuf:"varint,2,opt,name=log_sequence_number,json=logSequenceNumber,proto3" json:"log_sequence_number,omitempty"` // Log Sequence Number (LSN) + Data []byte `protobuf:"bytes,3,opt,name=data,proto3" json:"data,omitempty"` // The actual data being logged + Crc32 uint32 `protobuf:"varint,4,opt,name=crc32,proto3" json:"crc32,omitempty"` // Cyclic Redundancy Check for integrity + Timestamp int64 `protobuf:"varint,5,opt,name=timestamp,proto3" json:"timestamp,omitempty"` // Timestamp for the WAL entry (epoch time in nanoseconds) } -func (x *WALLogEntry) Reset() { - *x = WALLogEntry{} - mi := &file_wal_proto_msgTypes[0] +func (x *WALEntry) Reset() { + *x = WALEntry{} + mi := &file_internal_wal_wal_proto_msgTypes[0] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } -func (x *WALLogEntry) String() string { +func (x *WALEntry) String() string { return protoimpl.X.MessageStringOf(x) } -func (*WALLogEntry) ProtoMessage() {} +func (*WALEntry) ProtoMessage() {} -func (x *WALLogEntry) ProtoReflect() protoreflect.Message { - mi := &file_wal_proto_msgTypes[0] +func (x *WALEntry) ProtoReflect() protoreflect.Message { + mi := &file_internal_wal_wal_proto_msgTypes[0] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -55,56 +57,82 @@ func (x *WALLogEntry) ProtoReflect() protoreflect.Message { return mi.MessageOf(x) } -// Deprecated: Use 
WALLogEntry.ProtoReflect.Descriptor instead. -func (*WALLogEntry) Descriptor() ([]byte, []int) { - return file_wal_proto_rawDescGZIP(), []int{0} +// Deprecated: Use WALEntry.ProtoReflect.Descriptor instead. +func (*WALEntry) Descriptor() ([]byte, []int) { + return file_internal_wal_wal_proto_rawDescGZIP(), []int{0} } -func (x *WALLogEntry) GetChecksum() []byte { +func (x *WALEntry) GetVersion() string { if x != nil { - return x.Checksum + return x.Version + } + return "" +} + +func (x *WALEntry) GetLogSequenceNumber() uint64 { + if x != nil { + return x.LogSequenceNumber + } + return 0 +} + +func (x *WALEntry) GetData() []byte { + if x != nil { + return x.Data } return nil } -func (x *WALLogEntry) GetCommand() string { - if x != nil && x.Command != nil { - return *x.Command +func (x *WALEntry) GetCrc32() uint32 { + if x != nil { + return x.Crc32 } - return "" + return 0 +} + +func (x *WALEntry) GetTimestamp() int64 { + if x != nil { + return x.Timestamp + } + return 0 } -var File_wal_proto protoreflect.FileDescriptor - -var file_wal_proto_rawDesc = []byte{ - 0x0a, 0x09, 0x77, 0x61, 0x6c, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x03, 0x77, 0x61, 0x6c, - 0x22, 0x66, 0x0a, 0x0b, 0x57, 0x41, 0x4c, 0x4c, 0x6f, 0x67, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, - 0x1f, 0x0a, 0x08, 0x63, 0x68, 0x65, 0x63, 0x6b, 0x73, 0x75, 0x6d, 0x18, 0x01, 0x20, 0x01, 0x28, - 0x0c, 0x48, 0x00, 0x52, 0x08, 0x63, 0x68, 0x65, 0x63, 0x6b, 0x73, 0x75, 0x6d, 0x88, 0x01, 0x01, - 0x12, 0x1d, 0x0a, 0x07, 0x63, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, - 0x09, 0x48, 0x01, 0x52, 0x07, 0x63, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x88, 0x01, 0x01, 0x42, - 0x0b, 0x0a, 0x09, 0x5f, 0x63, 0x68, 0x65, 0x63, 0x6b, 0x73, 0x75, 0x6d, 0x42, 0x0a, 0x0a, 0x08, - 0x5f, 0x63, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x42, 0x0e, 0x5a, 0x0c, 0x69, 0x6e, 0x74, 0x65, - 0x72, 0x6e, 0x61, 0x6c, 0x2f, 0x77, 0x61, 0x6c, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +var File_internal_wal_wal_proto 
protoreflect.FileDescriptor + +var file_internal_wal_wal_proto_rawDesc = []byte{ + 0x0a, 0x16, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2f, 0x77, 0x61, 0x6c, 0x2f, 0x77, + 0x61, 0x6c, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x03, 0x77, 0x61, 0x6c, 0x22, 0x9c, 0x01, + 0x0a, 0x08, 0x57, 0x41, 0x4c, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x18, 0x0a, 0x07, 0x76, 0x65, + 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x76, 0x65, 0x72, + 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x2e, 0x0a, 0x13, 0x6c, 0x6f, 0x67, 0x5f, 0x73, 0x65, 0x71, 0x75, + 0x65, 0x6e, 0x63, 0x65, 0x5f, 0x6e, 0x75, 0x6d, 0x62, 0x65, 0x72, 0x18, 0x02, 0x20, 0x01, 0x28, + 0x04, 0x52, 0x11, 0x6c, 0x6f, 0x67, 0x53, 0x65, 0x71, 0x75, 0x65, 0x6e, 0x63, 0x65, 0x4e, 0x75, + 0x6d, 0x62, 0x65, 0x72, 0x12, 0x12, 0x0a, 0x04, 0x64, 0x61, 0x74, 0x61, 0x18, 0x03, 0x20, 0x01, + 0x28, 0x0c, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61, 0x12, 0x14, 0x0a, 0x05, 0x63, 0x72, 0x63, 0x33, + 0x32, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x05, 0x63, 0x72, 0x63, 0x33, 0x32, 0x12, 0x1c, + 0x0a, 0x09, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x18, 0x05, 0x20, 0x01, 0x28, + 0x03, 0x52, 0x09, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x42, 0x0e, 0x5a, 0x0c, + 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2f, 0x77, 0x61, 0x6c, 0x62, 0x06, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x33, } var ( - file_wal_proto_rawDescOnce sync.Once - file_wal_proto_rawDescData = file_wal_proto_rawDesc + file_internal_wal_wal_proto_rawDescOnce sync.Once + file_internal_wal_wal_proto_rawDescData = file_internal_wal_wal_proto_rawDesc ) -func file_wal_proto_rawDescGZIP() []byte { - file_wal_proto_rawDescOnce.Do(func() { - file_wal_proto_rawDescData = protoimpl.X.CompressGZIP(file_wal_proto_rawDescData) +func file_internal_wal_wal_proto_rawDescGZIP() []byte { + file_internal_wal_wal_proto_rawDescOnce.Do(func() { + file_internal_wal_wal_proto_rawDescData = 
protoimpl.X.CompressGZIP(file_internal_wal_wal_proto_rawDescData) }) - return file_wal_proto_rawDescData + return file_internal_wal_wal_proto_rawDescData } -var file_wal_proto_msgTypes = make([]protoimpl.MessageInfo, 1) -var file_wal_proto_goTypes = []any{ - (*WALLogEntry)(nil), // 0: wal.WALLogEntry +var file_internal_wal_wal_proto_msgTypes = make([]protoimpl.MessageInfo, 1) +var file_internal_wal_wal_proto_goTypes = []any{ + (*WALEntry)(nil), // 0: wal.WALEntry } -var file_wal_proto_depIdxs = []int32{ +var file_internal_wal_wal_proto_depIdxs = []int32{ 0, // [0:0] is the sub-list for method output_type 0, // [0:0] is the sub-list for method input_type 0, // [0:0] is the sub-list for extension type_name @@ -112,28 +140,27 @@ var file_wal_proto_depIdxs = []int32{ 0, // [0:0] is the sub-list for field type_name } -func init() { file_wal_proto_init() } -func file_wal_proto_init() { - if File_wal_proto != nil { +func init() { file_internal_wal_wal_proto_init() } +func file_internal_wal_wal_proto_init() { + if File_internal_wal_wal_proto != nil { return } - file_wal_proto_msgTypes[0].OneofWrappers = []any{} type x struct{} out := protoimpl.TypeBuilder{ File: protoimpl.DescBuilder{ GoPackagePath: reflect.TypeOf(x{}).PkgPath(), - RawDescriptor: file_wal_proto_rawDesc, + RawDescriptor: file_internal_wal_wal_proto_rawDesc, NumEnums: 0, NumMessages: 1, NumExtensions: 0, NumServices: 0, }, - GoTypes: file_wal_proto_goTypes, - DependencyIndexes: file_wal_proto_depIdxs, - MessageInfos: file_wal_proto_msgTypes, + GoTypes: file_internal_wal_wal_proto_goTypes, + DependencyIndexes: file_internal_wal_wal_proto_depIdxs, + MessageInfos: file_internal_wal_wal_proto_msgTypes, }.Build() - File_wal_proto = out.File - file_wal_proto_rawDesc = nil - file_wal_proto_goTypes = nil - file_wal_proto_depIdxs = nil + File_internal_wal_wal_proto = out.File + file_internal_wal_wal_proto_rawDesc = nil + file_internal_wal_wal_proto_goTypes = nil + file_internal_wal_wal_proto_depIdxs = nil } diff 
--git a/internal/wal/wal.proto b/internal/wal/wal.proto index 87e88e72d..9184fe008 100644 --- a/internal/wal/wal.proto +++ b/internal/wal/wal.proto @@ -3,8 +3,10 @@ syntax = "proto3"; package wal; option go_package = "internal/wal"; -// WALLogEntry represents a single log entry in the WAL. -message WALLogEntry { - optional bytes checksum = 1; // SHA-256 checksum of the command for integrity - optional string command = 2; // Command +message WALEntry { + string version = 1; // Version of the WAL entry (e.g., "v1.0") + uint64 log_sequence_number = 2; // Log Sequence Number (LSN) + bytes data = 3; // The actual data being logged + uint32 crc32 = 4; // Cyclic Redundancy Check for integrity + int64 timestamp = 5; // Timestamp for the WAL entry (epoch time in nanoseconds) } diff --git a/internal/wal/wal_aof.go b/internal/wal/wal_aof.go index fe0ca7c34..a94add0cc 100644 --- a/internal/wal/wal_aof.go +++ b/internal/wal/wal_aof.go @@ -1,186 +1,308 @@ package wal import ( - "bytes" - "crypto/sha256" + "bufio" + "context" "encoding/binary" "fmt" + "hash/crc32" "io" + "log" "log/slog" "os" "path/filepath" - "sort" - "strings" - "sync" + sync "sync" "time" + "github.com/dicedb/dice/config" "github.com/dicedb/dice/internal/cmd" - "google.golang.org/protobuf/proto" ) -var writeBuf bytes.Buffer +const ( + segmentPrefix = "seg-" + defaultVersion = "v0.0.1" + RotationModeTime = "time" + RetentionModeTime = "time" + WALModeUnbuffered = "unbuffered" +) -type WALAOF struct { - file *os.File - mutex sync.Mutex - logDir string +type AOF struct { + logDir string + currentSegmentFile *os.File + walMode string + writeMode string + maxSegmentSize int + maxSegmentCount int + currentSegmentIndex int + oldestSegmentIndex int + byteOffset int + bufferSize int + retentionMode string + recoveryMode string + rotationMode string + lastSequenceNo uint64 + bufWriter *bufio.Writer + bufferSyncTicker *time.Ticker + segmentRotationTicker *time.Ticker + segmentRetentionTicker *time.Ticker + mu sync.Mutex 
+ ctx context.Context + cancel context.CancelFunc } -func NewAOFWAL(logDir string) (*WALAOF, error) { - return &WALAOF{ - logDir: logDir, +func NewAOFWAL(directory string) (*AOF, error) { + ctx, cancel := context.WithCancel(context.Background()) + + return &AOF{ + logDir: directory, + walMode: config.DiceConfig.WAL.WalMode, + bufferSyncTicker: time.NewTicker(config.DiceConfig.WAL.BufferSyncInterval), + segmentRotationTicker: time.NewTicker(config.DiceConfig.WAL.MaxSegmentRotationTime), + segmentRetentionTicker: time.NewTicker(config.DiceConfig.WAL.MaxSegmentRetentionDuration), + writeMode: config.DiceConfig.WAL.WriteMode, + maxSegmentSize: config.DiceConfig.WAL.MaxSegmentSizeMB * 1024 * 1024, + maxSegmentCount: config.DiceConfig.WAL.MaxSegmentCount, + bufferSize: config.DiceConfig.WAL.BufferSizeMB * 1024 * 1024, + retentionMode: config.DiceConfig.WAL.RetentionMode, + recoveryMode: config.DiceConfig.WAL.RecoveryMode, + rotationMode: config.DiceConfig.WAL.RotationMode, + ctx: ctx, + cancel: cancel, }, nil } -func (w *WALAOF) Init(t time.Time) error { - slog.Debug("initializing WAL at", slog.Any("log-dir", w.logDir)) - if err := os.MkdirAll(w.logDir, os.ModePerm); err != nil { - return fmt.Errorf("failed to create log directory: %w", err) +func (wal *AOF) Init(t time.Time) error { + // TODO - Restore existing checkpoints to memory + + // Create the directory if it doesn't exist; surface the failure instead of reporting a successful Init + if err := os.MkdirAll(wal.logDir, 0755); err != nil { + return fmt.Errorf("failed to create log directory: %w", err) + } + + // Get the list of log segment files in the directory + files, err := filepath.Glob(filepath.Join(wal.logDir, segmentPrefix+"*")) + if err != nil { + return fmt.Errorf("failed to list WAL segments: %w", err) } - timestamp := t.Format("20060102_1504") - path := filepath.Join(w.logDir, fmt.Sprintf("wal_%s.aof", timestamp)) + if len(files) > 0 { + slog.Info("Found existing log segments", slog.Any("files", files)) + // TODO - Check if we have newer WAL entries after the last checkpoint and simultaneously replay and checkpoint them + } - newFile, err := 
os.OpenFile(path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) + wal.lastSequenceNo = 0 + wal.currentSegmentIndex = 0 + wal.oldestSegmentIndex = 0 + wal.byteOffset = 0 + newFile, err := os.OpenFile(filepath.Join(wal.logDir, "seg-0"), os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) if err != nil { - return fmt.Errorf("failed to open new WAL file: %v", err) + return err } + wal.currentSegmentFile = newFile - w.file = newFile - return nil -} + if _, err := wal.currentSegmentFile.Seek(0, io.SeekEnd); err != nil { + return err + } + wal.bufWriter = bufio.NewWriterSize(wal.currentSegmentFile, wal.bufferSize) -// LogCommand serializes a WALLogEntry and writes it to the current WAL file. -func (w *WALAOF) LogCommand(c *cmd.DiceDBCmd) { - w.mutex.Lock() - defer w.mutex.Unlock() + go wal.keepSyncingBuffer() - repr := fmt.Sprintf("%s %s", c.Cmd, strings.Join(c.Args, " ")) + if wal.rotationMode == RotationModeTime { + go wal.rotateSegmentPeriodically() + } - entry := &WALLogEntry{ - Command: &repr, - Checksum: checksum(repr), + if wal.retentionMode == RetentionModeTime { + go wal.deleteSegmentPeriodically() } - data, err := proto.Marshal(entry) - if err != nil { - slog.Warn("failed to serialize command", slog.Any("error", err.Error())) + return nil +} + +// WriteEntry writes an entry to the WAL. 
+func (wal *AOF) LogCommand(data []byte) error { + return wal.writeEntry(data) +} + +func (wal *AOF) writeEntry(data []byte) error { + wal.mu.Lock() + defer wal.mu.Unlock() + + wal.lastSequenceNo++ + entry := &WALEntry{ + Version: defaultVersion, + LogSequenceNumber: wal.lastSequenceNo, + Data: data, + Crc32: crc32.Update(crc32.ChecksumIEEE(data), crc32.IEEETable, []byte{byte(wal.lastSequenceNo)}), + Timestamp: time.Now().UnixNano(), } - writeBuf.Reset() - writeBuf.Grow(4 + len(data)) - if binary.Write(&writeBuf, binary.BigEndian, uint32(len(data))) != nil { - slog.Warn("failed to write entry length to WAL", slog.Any("error", err.Error())) + entrySize := getEntrySize(data) + if err := wal.rotateLogIfNeeded(entrySize); err != nil { + return err } - writeBuf.Write(data) - if _, err := w.file.Write(writeBuf.Bytes()); err != nil { - slog.Warn("failed to write serialized command to WAL", slog.Any("error", err.Error())) + wal.byteOffset += entrySize + + if err := wal.writeEntryToBuffer(entry); err != nil { + return err } - if err := w.file.Sync(); err != nil { - slog.Warn("failed to sync WAL", slog.Any("error", err.Error())) + // if wal-mode unbuffered immediately sync to disk + if wal.walMode == WALModeUnbuffered { //nolint:goconst + if err := wal.Sync(); err != nil { + return err + } } - slog.Debug("logged command in WAL", slog.Any("command", c.Repr())) + return nil } -func (w *WALAOF) Close() error { - if w.file == nil { - return nil +func (wal *AOF) writeEntryToBuffer(entry *WALEntry) error { + marshaledEntry := MustMarshal(entry) + + size := int32(len(marshaledEntry)) + if err := binary.Write(wal.bufWriter, binary.LittleEndian, size); err != nil { + return err } - return w.file.Close() + _, err := wal.bufWriter.Write(marshaledEntry) + + return err } -// checksum generates a SHA-256 hash for the given command. 
-func checksum(command string) []byte { - hash := sha256.Sum256([]byte(command)) - return hash[:] +// rotateLogIfNeeded rotates to a new segment when entrySize would push the current one past maxSegmentSize; it is not thread safe +func (wal *AOF) rotateLogIfNeeded(entrySize int) error { + if wal.byteOffset+entrySize > wal.maxSegmentSize { + if err := wal.rotateLog(); err != nil { + return err + } + } + return nil } -func (w *WALAOF) ForEachCommand(f func(c cmd.DiceDBCmd) error) error { - var length uint32 +// rotateLog syncs and closes the current segment, removes the oldest one when over maxSegmentCount, and opens the next segment; it is not thread safe +func (wal *AOF) rotateLog() error { + if err := wal.Sync(); err != nil { + return err + } - files, err := os.ReadDir(w.logDir) - if err != nil { - return fmt.Errorf("failed to read log directory: %v", err) + if err := wal.currentSegmentFile.Close(); err != nil { + return err } - var walFiles []os.DirEntry + wal.currentSegmentIndex++ - for _, file := range files { - if !file.IsDir() && filepath.Ext(file.Name()) == ".aof" { - walFiles = append(walFiles, file) + if wal.currentSegmentIndex-wal.oldestSegmentIndex+1 > wal.maxSegmentCount { + if err := wal.deleteOldestSegment(); err != nil { + return err } + // deleteOldestSegment already advances oldestSegmentIndex; do not increment it again here } - if len(walFiles) == 0 { - return fmt.Errorf("no valid WAL files found in log directory") + newFile, err := os.OpenFile(filepath.Join(wal.logDir, segmentPrefix+fmt.Sprintf("%d", wal.currentSegmentIndex)), os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) + if err != nil { + return fmt.Errorf("failed opening file: %w", err) } - // Sort files by timestamp in ascending order - sort.Slice(walFiles, func(i, j int) bool { - timestampStrI := walFiles[i].Name()[4:17] - timestampStrJ := walFiles[j].Name()[4:17] - timestampI, errI := time.Parse("20060102_1504", timestampStrI) - timestampJ, errJ := time.Parse("20060102_1504", timestampStrJ) - if errI != nil || errJ != nil { - return false - } - return timestampI.Before(timestampJ) - }) + wal.byteOffset = 0 - for _, file := range walFiles { - filePath := filepath.Join(w.logDir, file.Name()) + wal.currentSegmentFile = newFile + wal.bufWriter = bufio.NewWriterSize(newFile, wal.bufferSize) - 
slog.Debug("loading WAL", slog.Any("file", filePath)) + return nil +} + +func (wal *AOF) deleteOldestSegment() error { + oldestSegmentFilePath := filepath.Join(wal.logDir, segmentPrefix+fmt.Sprintf("%d", wal.oldestSegmentIndex)) + + // TODO: checkpoint before deleting the file + + if err := os.Remove(oldestSegmentFilePath); err != nil { + return err + } + wal.oldestSegmentIndex++ + + return nil +} + +// Close the WAL file. It also calls Sync() on the WAL. +func (wal *AOF) Close() error { + wal.cancel() + if err := wal.Sync(); err != nil { + return err + } + return wal.currentSegmentFile.Close() +} - file, err := os.OpenFile(filePath, os.O_RDONLY, 0644) - if err != nil { - return fmt.Errorf("failed to open WAL file %s: %v", file.Name(), err) +// Writes out any data in the WAL's in-memory buffer to the segment file. If +// fsync is enabled, it also calls fsync on the segment file. +func (wal *AOF) Sync() error { + if err := wal.bufWriter.Flush(); err != nil { + return err + } + if wal.writeMode == "fsync" { //nolint:goconst + if err := wal.currentSegmentFile.Sync(); err != nil { + return err } + } - for { - if err := binary.Read(file, binary.BigEndian, &length); err != nil { - if err == io.EOF { - break - } - return fmt.Errorf("failed to read entry length: %v", err) - } + return nil +} - // TODO: Optimize this allocation. - // Pre-allocate and reuse rather than allocating for each entry. 
- readBufBytes := make([]byte, length) - if _, err := io.ReadFull(file, readBufBytes); err != nil { - return fmt.Errorf("failed to read entry data: %v", err) - } +func (wal *AOF) keepSyncingBuffer() { + for { + select { + case <-wal.bufferSyncTicker.C: - entry := &WALLogEntry{} - if err := proto.Unmarshal(readBufBytes, entry); err != nil { - return fmt.Errorf("failed to unmarshal WAL entry: %v", err) - } + wal.mu.Lock() + err := wal.Sync() + wal.mu.Unlock() - if entry.Command == nil { - return fmt.Errorf("invalid WAL entry: missing command field") + if err != nil { + log.Printf("Error while performing sync: %v", err) } - commandParts := strings.SplitN(*entry.Command, " ", 2) - if len(commandParts) < 2 { - return fmt.Errorf("invalid command format in WAL entry: %s", *entry.Command) - } + case <-wal.ctx.Done(): + return + } + } +} - c := cmd.DiceDBCmd{ - Cmd: commandParts[0], - Args: strings.Split(commandParts[1], " "), - } +func (wal *AOF) rotateSegmentPeriodically() { + for { + select { + case <-wal.segmentRotationTicker.C: - if err := f(c); err != nil { - return fmt.Errorf("error processing command: %v", err) + wal.mu.Lock() + err := wal.rotateLog() + wal.mu.Unlock() + if err != nil { + log.Printf("Error while performing sync: %v", err) } + + case <-wal.ctx.Done(): + return } + } +} + +func (wal *AOF) deleteSegmentPeriodically() { + for { + select { + case <-wal.segmentRetentionTicker.C: - file.Close() + wal.mu.Lock() + err := wal.deleteOldestSegment() + wal.mu.Unlock() + if err != nil { + log.Printf("Error while deleting segment: %v", err) + } + case <-wal.ctx.Done(): + return + } } +} +func (wal *AOF) ForEachCommand(f func(c cmd.DiceDBCmd) error) error { + // TODO: implement this method return nil } diff --git a/internal/wal/wal_null.go b/internal/wal/wal_null.go index c4a58ccc9..80f3aa849 100644 --- a/internal/wal/wal_null.go +++ b/internal/wal/wal_null.go @@ -18,7 +18,8 @@ func (w *WALNull) Init(t time.Time) error { } // LogCommand serializes a WALLogEntry and 
writes it to the current WAL file. -func (w *WALNull) LogCommand(c *cmd.DiceDBCmd) { +func (w *WALNull) LogCommand(b []byte) error { + return nil } func (w *WALNull) Close() error { diff --git a/internal/wal/wal_test.go b/internal/wal/wal_test.go index b3ee707e3..69ff2d803 100644 --- a/internal/wal/wal_test.go +++ b/internal/wal/wal_test.go @@ -5,30 +5,9 @@ import ( "testing" "time" - "github.com/dicedb/dice/internal/cmd" "github.com/dicedb/dice/internal/wal" ) -func BenchmarkLogCommandSQLite(b *testing.B) { - wl, err := wal.NewSQLiteWAL("/tmp/dicedb-lt") - if err != nil { - panic(err) - } - - if err := wl.Init(time.Now()); err != nil { - slog.Error("could not initialize WAL", slog.Any("error", err)) - } else { - go wal.InitBG(wl) - } - - for i := 0; i < b.N; i++ { - wl.LogCommand(&cmd.DiceDBCmd{ - Cmd: "SET", - Args: []string{"key", "value"}, - }) - } -} - func BenchmarkLogCommandAOF(b *testing.B) { wl, err := wal.NewAOFWAL("/tmp/dicedb-lt") if err != nil { @@ -42,9 +21,6 @@ func BenchmarkLogCommandAOF(b *testing.B) { } for i := 0; i < b.N; i++ { - wl.LogCommand(&cmd.DiceDBCmd{ - Cmd: "SET", - Args: []string{"key", "value"}, - }) + wl.LogCommand([]byte("SET key value")) } } diff --git a/internal/wal/wal_utils.go b/internal/wal/wal_utils.go new file mode 100644 index 000000000..790f8b7c1 --- /dev/null +++ b/internal/wal/wal_utils.go @@ -0,0 +1,42 @@ +package wal + +import ( + "fmt" + + "google.golang.org/protobuf/proto" +) + +const ( + versionTagSize = 1 // Tag for "version" field + versionLengthPrefixSize = 1 // Length prefix for "version" + versionSize = 6 // Fixed size for "v0.0.1" + logSequenceNumberSize = 8 + dataTagSize = 1 // Tag for "data" field + dataLengthPrefixSize = 1 // Length prefix for "data" + CRCSize = 4 + timestampSize = 8 +) + +// Marshals +func MustMarshal(entry *WALEntry) []byte { + marshaledEntry, err := proto.Marshal(entry) + if err != nil { + panic(fmt.Sprintf("marshal should never fail (%v)", err)) + } + + return marshaledEntry +} + +func 
MustUnmarshal(data []byte, entry *WALEntry) { + if err := proto.Unmarshal(data, entry); err != nil { + panic(fmt.Sprintf("unmarshal should never fail (%v)", err)) + } +} + +func getEntrySize(data []byte) int { + return versionTagSize + versionLengthPrefixSize + versionSize + // Version field + logSequenceNumberSize + // Log Sequence Number field + dataTagSize + dataLengthPrefixSize + len(data) + // Data field + CRCSize + // CRC field + timestampSize // Timestamp field +} diff --git a/main.go b/main.go index 0eea883a5..495282c6d 100644 --- a/main.go +++ b/main.go @@ -32,6 +32,10 @@ import ( dstore "github.com/dicedb/dice/internal/store" ) +const ( + WALEngineAOF = "aof" +) + func main() { iid := observability.GetOrCreateInstanceID() config.DiceConfig.InstanceID = iid @@ -55,16 +59,8 @@ func main() { wl, _ = wal.NewNullWAL() if config.DiceConfig.Persistence.Enabled { - if config.DiceConfig.Persistence.WALEngine == "sqlite" { - _wl, err := wal.NewSQLiteWAL(config.DiceConfig.Persistence.WALDir) - if err != nil { - slog.Warn("could not create WAL with", slog.String("wal-engine", config.DiceConfig.Persistence.WALEngine), slog.Any("error", err)) - sigs <- syscall.SIGKILL - return - } - wl = _wl - } else if config.DiceConfig.Persistence.WALEngine == "aof" { - _wl, err := wal.NewAOFWAL(config.DiceConfig.Persistence.WALDir) + if config.DiceConfig.Persistence.WALEngine == WALEngineAOF { + _wl, err := wal.NewAOFWAL(config.DiceConfig.WAL.LogDir) if err != nil { slog.Warn("could not create WAL with", slog.String("wal-engine", config.DiceConfig.Persistence.WALEngine), slog.Any("error", err)) sigs <- syscall.SIGKILL