From b85c2d4c270428056de09217041660c24da12e2a Mon Sep 17 00:00:00 2001 From: ayushsatyam146 Date: Tue, 3 Dec 2024 12:36:48 +0530 Subject: [PATCH 1/7] added WAL implementation with configurables --- config/config.go | 38 +++- dicedb.conf | 17 +- internal/iothread/iothread.go | 5 +- internal/wal/gen_wal_proto.sh | 50 +++++ internal/wal/wal.go | 2 +- internal/wal/wal.pb.go | 138 +++++++------ internal/wal/wal.proto | 10 +- internal/wal/wal_aof.go | 363 +++++++++++++++++++++++----------- internal/wal/wal_null.go | 3 +- internal/wal/wal_test.go | 44 ++--- internal/wal/wal_utils.go | 81 ++++++++ main.go | 10 +- 12 files changed, 550 insertions(+), 211 deletions(-) create mode 100755 internal/wal/gen_wal_proto.sh create mode 100644 internal/wal/wal_utils.go diff --git a/config/config.go b/config/config.go index 98d1fc64f..314fa4923 100644 --- a/config/config.go +++ b/config/config.go @@ -84,7 +84,22 @@ auth.password = "" # Network Configuration network.io_buffer_length = 512 -network.io_buffer_length_max = 51200` +network.io_buffer_length_max = 51200 + +# WAL Configuration +LogDir = "tmp/deicdeb-wal-lt" +Enabled = "true" +WalMode = "buffered" +WriteMode = "default" +BufferSizeMB = 1 +RotationMode = "segemnt-size" +MaxSegmentSizeMB = 16 +SegmentRotationTimeSec = 60 +BufferSyncIntervalMillis = 200 +RetentionMode = "num-segments" +MaxSegmentCount = 10 +SegmentRetentionDurationSec = 600 +RecoveryMode = "strict"` ) var ( @@ -104,6 +119,7 @@ type Config struct { Persistence persistence `config:"persistence"` Logging logging `config:"logging"` Network network `config:"network"` + WAL WALConfig `config:"WAL"` } type auth struct { @@ -147,12 +163,12 @@ type memory struct { MaxMemory int64 `config:"max_memory" default:"0" validate:"min=0"` EvictionPolicy string `config:"eviction_policy" default:"allkeys-lfu" validate:"oneof=simple-first allkeys-random allkeys-lru allkeys-lfu"` EvictionRatio float64 `config:"eviction_ratio" default:"0.9" validate:"min=0,lte=1"` - KeysLimit int `config:"keys_limit" default:"200000000" validate:"min=0"` + KeysLimit int `config:"keys_limit" default:"200000000" validate:"min=10"` LFULogFactor int `config:"lfu_log_factor" default:"10" validate:"min=0"` } type persistence struct { - Enabled bool `config:"enabled" default:"false"` + Enabled bool `config:"enabled" default:"true"` AOFFile string `config:"aof_file" default:"./dice-master.aof" validate:"filepath"` WriteAOFOnCleanup bool `config:"write_aof_on_cleanup" default:"false"` WALDir string `config:"wal-dir" default:"./" validate:"dirpath"` @@ -160,6 +176,22 @@ type persistence struct { WALEngine string `config:"wal-engine" default:"aof" validate:"oneof=sqlite aof"` } +type WALConfig struct { + LogDir string `config:"log_dir" default:"tmp/deicdeb-wal-lt"` + Enabled bool `config:"enabled" default:"true"` + WalMode string `config:"mode" default:"buffered" validate:"oneof=buffered unbuffered"` + WriteMode string `config:"write_mode" default:"default" validate:"oneof=default fsync"` + BufferSizeMB int `config:"buffer_size" default:"1" validate:"min=1"` + RotationMode string `config:"rotation_mode" default:"segemnt-size" validate:"oneof=segment-size time"` + MaxSegmentSizeMB int64 `config:"buffer_size" default:"16" validate:"min=1"` + SegmentRotationTimeSec time.Duration `config:"max_segment_rotation_time" default:"60" validate:"min=1"` + BufferSyncIntervalMillis time.Duration `config:"max_segment_rotation_time" default:"200" validate:"min=1"` + RetentionMode string `config:"retention_mode" default:"num-segments" validate:"oneof=num-segments time checkpoint"` + MaxSegmentCount int `config:"max_segment_count" default:"10" validate:"min=1"` + SegmentRetentionDurationSec time.Duration `config:"max_segment_retention_time" default:"600" validate:"min=1"` + RecoveryMode string `config:"recovery_mode" default:"strict" validate:"oneof=strict truncate ignore"` +} + type logging struct { LogLevel string `config:"log_level" default:"info" validate:"oneof=debug info warn error"` LogDir string `config:"log_dir" default:"/tmp/dicedb" validate:"dirpath"` diff --git a/dicedb.conf b/dicedb.conf index 070f3c91d..d012b3ca7 100644 --- a/dicedb.conf +++ b/dicedb.conf @@ -56,4 +56,19 @@ auth.password = "" # Network Configuration network.io_buffer_length = 512 -network.io_buffer_length_max = 51200 \ No newline at end of file +network.io_buffer_length_max = 51200 + +# WAL Configuration +LogDir = "tmp/deicdeb-wal-lt" +Enabled = "true" +WalMode = "buffered" +WriteMode = "default" +BufferSizeMB = 1 +RotationMode = "segemnt-size" +MaxSegmentSizeMB = 16 +SegmentRotationTimeSec = 60 +BufferSyncIntervalMillis = 200 +RetentionMode = "num-segments" +MaxSegmentCount = 10 +SegmentRetentionDurationSec = 600 +RecoveryMode = "strict"` \ No newline at end of file diff --git a/internal/iothread/iothread.go b/internal/iothread/iothread.go index 6aab5120f..e2e9842d2 100644 --- a/internal/iothread/iothread.go +++ b/internal/iothread/iothread.go @@ -7,6 +7,7 @@ import ( "log/slog" "net" "strconv" + "strings" "sync/atomic" "syscall" "time" @@ -563,13 +564,13 @@ func (t *BaseIOThread) handleCommand(ctx context.Context, cmdMeta CmdMeta, diceD } if err == nil && t.wl != nil { - t.wl.LogCommand(diceDBCmd) + t.wl.LogCommand([]byte(fmt.Sprintf("%s %s", diceDBCmd.Cmd, strings.Join(diceDBCmd.Args, " ")))) } case MultiShard, AllShard: err = t.writeResponse(ctx, cmdMeta.composeResponse(storeOp...)) if err == nil && t.wl != nil { - t.wl.LogCommand(diceDBCmd) + t.wl.LogCommand([]byte(fmt.Sprintf("%s %s", diceDBCmd.Cmd, strings.Join(diceDBCmd.Args, " ")))) } default: slog.Error("Unknown command type", diff --git a/internal/wal/gen_wal_proto.sh b/internal/wal/gen_wal_proto.sh new file mode 100755 index 000000000..228ec0db1 --- /dev/null +++ b/internal/wal/gen_wal_proto.sh @@ -0,0 +1,50 @@ +#!/bin/bash + +# Exit immediately if a command exits with a non-zero status +set -e + +# Define the directory where the proto file resides +PROTO_DIR=$(dirname "$0") + +# Define the proto file and the output file +PROTO_FILE="$PROTO_DIR/wal.proto" +OUTPUT_DIR="." + +# Function to install protoc if not present +install_protoc() { + echo "protoc not found. Installing Protocol Buffers compiler..." + if [[ "$(uname)" == "Darwin" ]]; then + # MacOS installation + brew install protobuf + elif [[ "$(uname)" == "Linux" ]]; then + # Linux installation + sudo apt update && sudo apt install -y protobuf-compiler + else + echo "Unsupported OS. Please install 'protoc' manually." + exit 1 + fi +} + +# Function to install the Go plugin for protoc if not present +install_protoc_gen_go() { + echo "protoc-gen-go not found. Installing Go plugin for protoc..." + go install google.golang.org/protobuf/cmd/protoc-gen-go@latest + export PATH="$PATH:$(go env GOPATH)/bin" + echo "Make sure $(go env GOPATH)/bin is in your PATH." +} + +# Check if protoc is installed, install if necessary +if ! command -v protoc &>/dev/null; then + install_protoc +fi + +# Check if the Go plugin for protoc is installed, install if necessary +if ! command -v protoc-gen-go &>/dev/null; then + install_protoc_gen_go +fi + +# Generate the wal.pb.go file +echo "Generating wal.pb.go from wal.proto..." +protoc --go_out="$OUTPUT_DIR" --go_opt=paths=source_relative "$PROTO_FILE" + +echo "Generation complete. File created at $OUTPUT_DIR/wal.pb.go" diff --git a/internal/wal/wal.go b/internal/wal/wal.go index ca251560e..54c8f2919 100644 --- a/internal/wal/wal.go +++ b/internal/wal/wal.go @@ -10,7 +10,7 @@ import ( ) type AbstractWAL interface { - LogCommand(c *cmd.DiceDBCmd) + LogCommand([] byte) error Close() error Init(t time.Time) error ForEachCommand(f func(c cmd.DiceDBCmd) error) error diff --git a/internal/wal/wal.pb.go b/internal/wal/wal.pb.go index 4d82bcec8..40f22cc8c 100644 --- a/internal/wal/wal.pb.go +++ b/internal/wal/wal.pb.go @@ -1,8 +1,8 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: // protoc-gen-go v1.35.2 -// protoc v3.21.12 -// source: wal.proto +// protoc v5.28.3 +// source: internal/wal/wal.proto package wal @@ -20,31 +20,33 @@ const ( _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) ) -// WALLogEntry represents a single log entry in the WAL. -type WALLogEntry struct { +type WAL_Entry struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - Checksum []byte `protobuf:"bytes,1,opt,name=checksum,proto3,oneof" json:"checksum,omitempty"` // SHA-256 checksum of the command for integrity - Command *string `protobuf:"bytes,2,opt,name=command,proto3,oneof" json:"command,omitempty"` // Command + Version string `protobuf:"bytes,1,opt,name=version,proto3" json:"version,omitempty"` // Version of the WAL entry (e.g., "v1.0") + LogSequenceNumber uint64 `protobuf:"varint,2,opt,name=logSequenceNumber,proto3" json:"logSequenceNumber,omitempty"` // Log Sequence Number (LSN) + Data []byte `protobuf:"bytes,3,opt,name=data,proto3" json:"data,omitempty"` // The actual data being logged + CRC uint32 `protobuf:"varint,4,opt,name=CRC,proto3" json:"CRC,omitempty"` // Cyclic Redundancy Check for integrity + Timestamp int64 `protobuf:"varint,5,opt,name=timestamp,proto3" json:"timestamp,omitempty"` // Timestamp for the WAL entry (epoch time in nanoseconds) } -func (x *WALLogEntry) Reset() { - *x = WALLogEntry{} - mi := &file_wal_proto_msgTypes[0] +func (x *WAL_Entry) Reset() { + *x = WAL_Entry{} + mi := &file_internal_wal_wal_proto_msgTypes[0] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } -func (x *WALLogEntry) String() string { +func (x *WAL_Entry) String() string { return protoimpl.X.MessageStringOf(x) } -func (*WALLogEntry) ProtoMessage() {} +func (*WAL_Entry) ProtoMessage() {} -func (x *WALLogEntry) ProtoReflect() protoreflect.Message { - mi := &file_wal_proto_msgTypes[0] +func (x *WAL_Entry) ProtoReflect() protoreflect.Message { + mi := &file_internal_wal_wal_proto_msgTypes[0] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -55,56 +57,81 @@ func (x *WALLogEntry) ProtoReflect() protoreflect.Message { return mi.MessageOf(x) } -// Deprecated: Use WALLogEntry.ProtoReflect.Descriptor instead. -func (*WALLogEntry) Descriptor() ([]byte, []int) { - return file_wal_proto_rawDescGZIP(), []int{0} +// Deprecated: Use WAL_Entry.ProtoReflect.Descriptor instead. +func (*WAL_Entry) Descriptor() ([]byte, []int) { + return file_internal_wal_wal_proto_rawDescGZIP(), []int{0} } -func (x *WALLogEntry) GetChecksum() []byte { +func (x *WAL_Entry) GetVersion() string { if x != nil { - return x.Checksum + return x.Version + } + return "" +} + +func (x *WAL_Entry) GetLogSequenceNumber() uint64 { + if x != nil { + return x.LogSequenceNumber + } + return 0 +} + +func (x *WAL_Entry) GetData() []byte { + if x != nil { + return x.Data } return nil } -func (x *WALLogEntry) GetCommand() string { - if x != nil && x.Command != nil { - return *x.Command +func (x *WAL_Entry) GetCRC() uint32 { + if x != nil { + return x.CRC } - return "" + return 0 +} + +func (x *WAL_Entry) GetTimestamp() int64 { + if x != nil { + return x.Timestamp + } + return 0 } -var File_wal_proto protoreflect.FileDescriptor - -var file_wal_proto_rawDesc = []byte{ - 0x0a, 0x09, 0x77, 0x61, 0x6c, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x03, 0x77, 0x61, 0x6c, - 0x22, 0x66, 0x0a, 0x0b, 0x57, 0x41, 0x4c, 0x4c, 0x6f, 0x67, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, - 0x1f, 0x0a, 0x08, 0x63, 0x68, 0x65, 0x63, 0x6b, 0x73, 0x75, 0x6d, 0x18, 0x01, 0x20, 0x01, 0x28, - 0x0c, 0x48, 0x00, 0x52, 0x08, 0x63, 0x68, 0x65, 0x63, 0x6b, 0x73, 0x75, 0x6d, 0x88, 0x01, 0x01, - 0x12, 0x1d, 0x0a, 0x07, 0x63, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, - 0x09, 0x48, 0x01, 0x52, 0x07, 0x63, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x88, 0x01, 0x01, 0x42, - 0x0b, 0x0a, 0x09, 0x5f, 0x63, 0x68, 0x65, 0x63, 0x6b, 0x73, 0x75, 0x6d, 0x42, 0x0a, 0x0a, 0x08, - 0x5f, 0x63, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x42, 0x0e, 0x5a, 0x0c, 0x69, 0x6e, 0x74, 0x65, - 0x72, 0x6e, 0x61, 0x6c, 0x2f, 0x77, 0x61, 0x6c, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +var File_internal_wal_wal_proto protoreflect.FileDescriptor + +var file_internal_wal_wal_proto_rawDesc = []byte{ + 0x0a, 0x16, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2f, 0x77, 0x61, 0x6c, 0x2f, 0x77, + 0x61, 0x6c, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x03, 0x77, 0x61, 0x6c, 0x22, 0x97, 0x01, + 0x0a, 0x09, 0x57, 0x41, 0x4c, 0x5f, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x18, 0x0a, 0x07, 0x76, + 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x76, 0x65, + 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x2c, 0x0a, 0x11, 0x6c, 0x6f, 0x67, 0x53, 0x65, 0x71, 0x75, + 0x65, 0x6e, 0x63, 0x65, 0x4e, 0x75, 0x6d, 0x62, 0x65, 0x72, 0x18, 0x02, 0x20, 0x01, 0x28, 0x04, + 0x52, 0x11, 0x6c, 0x6f, 0x67, 0x53, 0x65, 0x71, 0x75, 0x65, 0x6e, 0x63, 0x65, 0x4e, 0x75, 0x6d, + 0x62, 0x65, 0x72, 0x12, 0x12, 0x0a, 0x04, 0x64, 0x61, 0x74, 0x61, 0x18, 0x03, 0x20, 0x01, 0x28, + 0x0c, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61, 0x12, 0x10, 0x0a, 0x03, 0x43, 0x52, 0x43, 0x18, 0x04, + 0x20, 0x01, 0x28, 0x0d, 0x52, 0x03, 0x43, 0x52, 0x43, 0x12, 0x1c, 0x0a, 0x09, 0x74, 0x69, 0x6d, + 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x18, 0x05, 0x20, 0x01, 0x28, 0x03, 0x52, 0x09, 0x74, 0x69, + 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x42, 0x0e, 0x5a, 0x0c, 0x69, 0x6e, 0x74, 0x65, 0x72, + 0x6e, 0x61, 0x6c, 0x2f, 0x77, 0x61, 0x6c, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( - file_wal_proto_rawDescOnce sync.Once - file_wal_proto_rawDescData = file_wal_proto_rawDesc + file_internal_wal_wal_proto_rawDescOnce sync.Once + file_internal_wal_wal_proto_rawDescData = file_internal_wal_wal_proto_rawDesc ) -func file_wal_proto_rawDescGZIP() []byte { - file_wal_proto_rawDescOnce.Do(func() { - file_wal_proto_rawDescData = protoimpl.X.CompressGZIP(file_wal_proto_rawDescData) +func file_internal_wal_wal_proto_rawDescGZIP() []byte { + file_internal_wal_wal_proto_rawDescOnce.Do(func() { + file_internal_wal_wal_proto_rawDescData = protoimpl.X.CompressGZIP(file_internal_wal_wal_proto_rawDescData) }) - return file_wal_proto_rawDescData + return file_internal_wal_wal_proto_rawDescData } -var file_wal_proto_msgTypes = make([]protoimpl.MessageInfo, 1) -var file_wal_proto_goTypes = []any{ - (*WALLogEntry)(nil), // 0: wal.WALLogEntry +var file_internal_wal_wal_proto_msgTypes = make([]protoimpl.MessageInfo, 1) +var file_internal_wal_wal_proto_goTypes = []any{ + (*WAL_Entry)(nil), // 0: wal.WAL_Entry } -var file_wal_proto_depIdxs = []int32{ +var file_internal_wal_wal_proto_depIdxs = []int32{ 0, // [0:0] is the sub-list for method output_type 0, // [0:0] is the sub-list for method input_type 0, // [0:0] is the sub-list for extension type_name @@ -112,28 +139,27 @@ var file_wal_proto_depIdxs = []int32{ 0, // [0:0] is the sub-list for field type_name } -func init() { file_wal_proto_init() } -func file_wal_proto_init() { - if File_wal_proto != nil { +func init() { file_internal_wal_wal_proto_init() } +func file_internal_wal_wal_proto_init() { + if File_internal_wal_wal_proto != nil { return } - file_wal_proto_msgTypes[0].OneofWrappers = []any{} type x struct{} out := protoimpl.TypeBuilder{ File: protoimpl.DescBuilder{ GoPackagePath: reflect.TypeOf(x{}).PkgPath(), - RawDescriptor: file_wal_proto_rawDesc, + RawDescriptor: file_internal_wal_wal_proto_rawDesc, NumEnums: 0, NumMessages: 1, NumExtensions: 0, NumServices: 0, }, - GoTypes: file_wal_proto_goTypes, - DependencyIndexes: file_wal_proto_depIdxs, - MessageInfos: file_wal_proto_msgTypes, + GoTypes: file_internal_wal_wal_proto_goTypes, + DependencyIndexes: file_internal_wal_wal_proto_depIdxs, + MessageInfos: file_internal_wal_wal_proto_msgTypes, }.Build() - File_wal_proto = out.File - file_wal_proto_rawDesc = nil - file_wal_proto_goTypes = nil - file_wal_proto_depIdxs = nil + File_internal_wal_wal_proto = out.File + file_internal_wal_wal_proto_rawDesc = nil + file_internal_wal_wal_proto_goTypes = nil + file_internal_wal_wal_proto_depIdxs = nil } diff --git a/internal/wal/wal.proto b/internal/wal/wal.proto index 87e88e72d..dde026656 100644 --- a/internal/wal/wal.proto +++ b/internal/wal/wal.proto @@ -3,8 +3,10 @@ syntax = "proto3"; package wal; option go_package = "internal/wal"; -// WALLogEntry represents a single log entry in the WAL. -message WALLogEntry { - optional bytes checksum = 1; // SHA-256 checksum of the command for integrity - optional string command = 2; // Command +message WAL_Entry { + string version = 1; // Version of the WAL entry (e.g., "v1.0") + uint64 logSequenceNumber = 2; // Log Sequence Number (LSN) + bytes data = 3; // The actual data being logged + uint32 CRC = 4; // Cyclic Redundancy Check for integrity + int64 timestamp = 5; // Timestamp for the WAL entry (epoch time in nanoseconds) } diff --git a/internal/wal/wal_aof.go b/internal/wal/wal_aof.go index fe0ca7c34..7371e48aa 100644 --- a/internal/wal/wal_aof.go +++ b/internal/wal/wal_aof.go @@ -1,186 +1,329 @@ package wal import ( - "bytes" - "crypto/sha256" + "bufio" + "context" "encoding/binary" "fmt" + "hash/crc32" "io" - "log/slog" + "log" "os" "path/filepath" - "sort" - "strings" - "sync" + sync "sync" "time" + "github.com/dicedb/dice/config" "github.com/dicedb/dice/internal/cmd" - "google.golang.org/protobuf/proto" ) -var writeBuf bytes.Buffer +const ( + segmentPrefix = "seg-" + defaultVersion = "v0.0.1" + versionTagSize = 1 // Tag for "version" field + versionLengthPrefixSize = 1 // Length prefix for "version" + versionSize = 6 // Fixed size for "v0.0.1" + logSequenceNumberSize = 8 + dataTagSize = 1 // Tag for "data" field + dataLengthPrefixSize = 1 // Length prefix for "data" + CRCSize = 4 + timestampSize = 8 +) type WALAOF struct { - file *os.File - mutex sync.Mutex - logDir string + logDir string + currentSegmentFile *os.File + walMode string + writeMode string + maxSegmentSize int64 + maxSegmentCount int + currentSegmentIndex int + oldestSegmentIndex int + byteOffset int + bufferSize int + retentionMode string + recoveryMode string + rotationMode string + lastSequenceNo uint64 + bufWriter *bufio.Writer + bufferSyncTicker *time.Ticker + segmentRotationTicker *time.Ticker + segmentRetentionTicker *time.Ticker + lock sync.Mutex + ctx context.Context + cancel context.CancelFunc } -func NewAOFWAL(logDir string) (*WALAOF, error) { +func NewAOFWAL(directory string) (*WALAOF, error) { + + ctx, cancel := context.WithCancel(context.Background()) + return &WALAOF{ - logDir: logDir, + logDir: directory, + walMode: config.DiceConfig.WAL.WalMode, + bufferSyncTicker: time.NewTicker(config.DiceConfig.WAL.BufferSyncIntervalMillis * time.Millisecond), + segmentRotationTicker: time.NewTicker(config.DiceConfig.WAL.SegmentRotationTimeSec * time.Second), + segmentRetentionTicker: time.NewTicker(config.DiceConfig.WAL.SegmentRetentionDurationSec * time.Second), + writeMode: config.DiceConfig.WAL.WriteMode, + maxSegmentSize: config.DiceConfig.WAL.MaxSegmentSizeMB * 1024 * 1024, + maxSegmentCount: config.DiceConfig.WAL.MaxSegmentCount, + bufferSize: config.DiceConfig.WAL.BufferSizeMB * 1024 * 1024, + retentionMode: config.DiceConfig.WAL.RetentionMode, + recoveryMode: config.DiceConfig.WAL.RecoveryMode, + rotationMode: config.DiceConfig.WAL.RotationMode, + ctx: ctx, + cancel: cancel, }, nil } func (w *WALAOF) Init(t time.Time) error { - slog.Debug("initializing WAL at", slog.Any("log-dir", w.logDir)) - if err := os.MkdirAll(w.logDir, os.ModePerm); err != nil { - return fmt.Errorf("failed to create log directory: %w", err) + + if err := w.validateConfig(); err != nil { + return err } - timestamp := t.Format("20060102_1504") - path := filepath.Join(w.logDir, fmt.Sprintf("wal_%s.aof", timestamp)) + // TODO - Restore existing checkpoints to memory - newFile, err := os.OpenFile(path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) + // Create the directory if it doesn't exist + if err := os.MkdirAll(w.logDir, 0755); err != nil { + return nil + } + + // Get the list of log segment files in the directory + files, err := filepath.Glob(filepath.Join(w.logDir, segmentPrefix+"*")) if err != nil { - return fmt.Errorf("failed to open new WAL file: %v", err) + return nil } - w.file = newFile - return nil -} + if len(files) > 0 { + // TODO - Check if we have newer WAL entries after the last checkpoint and simultaneously replay and checkpoint them + } -// LogCommand serializes a WALLogEntry and writes it to the current WAL file. -func (w *WALAOF) LogCommand(c *cmd.DiceDBCmd) { - w.mutex.Lock() - defer w.mutex.Unlock() + var wg sync.WaitGroup + errCh := make(chan error, w.maxSegmentCount) + + for i := 0; i < w.maxSegmentCount; i++ { + wg.Add(1) + go func(index int) { + defer wg.Done() + filePath := filepath.Join(w.logDir, segmentPrefix+fmt.Sprintf("-%d", index)) + file, err := os.Create(filePath) + if err != nil { + errCh <- fmt.Errorf("error creating segment file %s: %v", filePath, err) + return + } + defer file.Close() + }(i) + } - repr := fmt.Sprintf("%s %s", c.Cmd, strings.Join(c.Args, " ")) + wg.Wait() + close(errCh) - entry := &WALLogEntry{ - Command: &repr, - Checksum: checksum(repr), + w.lastSequenceNo = 0 + w.currentSegmentIndex = 0 + w.oldestSegmentIndex = 0 + w.byteOffset = 0 + w.currentSegmentFile, err = os.OpenFile(filepath.Join(w.logDir, "seg-0"), os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) + if _, err = w.currentSegmentFile.Seek(0, io.SeekEnd); err != nil { + return err } + w.bufWriter = bufio.NewWriterSize(w.currentSegmentFile, w.bufferSize) - data, err := proto.Marshal(entry) - if err != nil { - slog.Warn("failed to serialize command", slog.Any("error", err.Error())) + go w.keepSyncingBuffer() + + if w.rotationMode == "time" { + go w.rotateSegmentPeriodically() } - writeBuf.Reset() - writeBuf.Grow(4 + len(data)) - if binary.Write(&writeBuf, binary.BigEndian, uint32(len(data))) != nil { - slog.Warn("failed to write entry length to WAL", slog.Any("error", err.Error())) + if w.retentionMode == "time" { + go w.deleteSegmentPeriodically() } - writeBuf.Write(data) - if _, err := w.file.Write(writeBuf.Bytes()); err != nil { - slog.Warn("failed to write serialized command to WAL", slog.Any("error", err.Error())) + return nil +} + +// WriteEntry writes an entry to the WAL. +func (wal *WALAOF) LogCommand(data []byte) error { + return wal.writeEntry(data) +} + +func (wal *WALAOF) writeEntry(data []byte) error { + wal.lock.Lock() + defer wal.lock.Unlock() + + wal.lastSequenceNo++ + entry := &WAL_Entry{ + Version: defaultVersion, + LogSequenceNumber: wal.lastSequenceNo, + Data: data, + CRC: crc32.ChecksumIEEE(append(data, byte(wal.lastSequenceNo))), + Timestamp: time.Now().UnixNano(), } - if err := w.file.Sync(); err != nil { - slog.Warn("failed to sync WAL", slog.Any("error", err.Error())) + entrySize := getEntrySize(data) + wal.rotateLogIfNeeded(entrySize) + + wal.byteOffset += entrySize + + if err := wal.writeEntryToBuffer(entry); err != nil { + return err } - slog.Debug("logged command in WAL", slog.Any("command", c.Repr())) + // if wal-mode unbuffered immediatley sync to disk + if wal.walMode == "unbuffered" { + wal.Sync() + } + + return nil } -func (w *WALAOF) Close() error { - if w.file == nil { - return nil +func (wal *WALAOF) writeEntryToBuffer(entry *WAL_Entry) error { + marshaledEntry := MustMarshal(entry) + + size := int32(len(marshaledEntry)) + if err := binary.Write(wal.bufWriter, binary.LittleEndian, size); err != nil { + return err } - return w.file.Close() + _, err := wal.bufWriter.Write(marshaledEntry) + + return err } -// checksum generates a SHA-256 hash for the given command. -func checksum(command string) []byte { - hash := sha256.Sum256([]byte(command)) - return hash[:] +// rotateLogIfNeeded is not thread safe +func (wal *WALAOF) rotateLogIfNeeded(entrySize int) error { + if int64(wal.byteOffset+entrySize) > wal.maxSegmentSize { + if err := wal.rotateLog(); err != nil { + return err + } + } + return nil } -func (w *WALAOF) ForEachCommand(f func(c cmd.DiceDBCmd) error) error { - var length uint32 +// rotateLog is not thread safe +func (wal *WALAOF) rotateLog() error { + if err := wal.Sync(); err != nil { + return err + } - files, err := os.ReadDir(w.logDir) - if err != nil { - return fmt.Errorf("failed to read log directory: %v", err) + if err := wal.currentSegmentFile.Close(); err != nil { + return err } - var walFiles []os.DirEntry + wal.currentSegmentIndex++ - for _, file := range files { - if !file.IsDir() && filepath.Ext(file.Name()) == ".aof" { - walFiles = append(walFiles, file) + if wal.currentSegmentIndex-wal.oldestSegmentIndex+1 > wal.maxSegmentCount { + if err := wal.deleteOldestSegment(); err != nil { + return err } + wal.oldestSegmentIndex++ } - if len(walFiles) == 0 { - return fmt.Errorf("no valid WAL files found in log directory") + newFile, err := os.OpenFile(filepath.Join(wal.logDir, segmentPrefix+fmt.Sprintf("-%d", wal.currentSegmentIndex)), os.O_RDWR|os.O_CREATE, 0644) + if err != nil { + log.Fatalf("failed opening file: %s", err) } - // Sort files by timestamp in ascending order - sort.Slice(walFiles, func(i, j int) bool { - timestampStrI := walFiles[i].Name()[4:17] - timestampStrJ := walFiles[j].Name()[4:17] - timestampI, errI := time.Parse("20060102_1504", timestampStrI) - timestampJ, errJ := time.Parse("20060102_1504", timestampStrJ) - if errI != nil || errJ != nil { - return false - } - return timestampI.Before(timestampJ) - }) + wal.byteOffset = 0 + + wal.currentSegmentFile = newFile + wal.bufWriter = bufio.NewWriter(newFile) + + return nil +} - for _, file := range walFiles { - filePath := filepath.Join(w.logDir, file.Name()) +func (wal *WALAOF) deleteOldestSegment() error { - slog.Debug("loading WAL", slog.Any("file", filePath)) + oldestSegmentFilePath := filepath.Join(wal.logDir, segmentPrefix+fmt.Sprintf("%d", wal.oldestSegmentIndex)) - file, err := os.OpenFile(filePath, os.O_RDONLY, 0644) - if err != nil { - return fmt.Errorf("failed to open WAL file %s: %v", file.Name(), err) + // TODO: checkpoint before deleting the file + + if err := os.Remove(oldestSegmentFilePath); err != nil { + return err + } + wal.oldestSegmentIndex++ + + return nil +} + +// Close the WAL file. It also calls Sync() on the WAL. +func (wal *WALAOF) Close() error { + wal.cancel() + if err := wal.Sync(); err != nil { + return err + } + return wal.currentSegmentFile.Close() +} + +// Writes out any data in the WAL's in-memory buffer to the segment file. If +// fsync is enabled, it also calls fsync on the segment file. +func (wal *WALAOF) Sync() error { + if err := wal.bufWriter.Flush(); err != nil { + return err + } + if wal.writeMode == "fsync" { + if err := wal.currentSegmentFile.Sync(); err != nil { + return err } + } - for { - if err := binary.Read(file, binary.BigEndian, &length); err != nil { - if err == io.EOF { - break - } - return fmt.Errorf("failed to read entry length: %v", err) - } + return nil +} - // TODO: Optimize this allocation. - // Pre-allocate and reuse rather than allocating for each entry. - readBufBytes := make([]byte, length) - if _, err := io.ReadFull(file, readBufBytes); err != nil { - return fmt.Errorf("failed to read entry data: %v", err) - } +func (wal *WALAOF) keepSyncingBuffer() { + for { + select { + case <-wal.bufferSyncTicker.C: - entry := &WALLogEntry{} - if err := proto.Unmarshal(readBufBytes, entry); err != nil { - return fmt.Errorf("failed to unmarshal WAL entry: %v", err) - } + wal.lock.Lock() + err := wal.Sync() + wal.lock.Unlock() - if entry.Command == nil { - return fmt.Errorf("invalid WAL entry: missing command field") + if err != nil { + log.Printf("Error while performing sync: %v", err) } - commandParts := strings.SplitN(*entry.Command, " ", 2) - if len(commandParts) < 2 { - return fmt.Errorf("invalid command format in WAL entry: %s", *entry.Command) - } + case <-wal.ctx.Done(): + return + } + } +} - c := cmd.DiceDBCmd{ - Cmd: commandParts[0], - Args: strings.Split(commandParts[1], " "), - } +func (wal *WALAOF) rotateSegmentPeriodically() { + for { + select { + case <-wal.segmentRotationTicker.C: - if err := f(c); err != nil { - return fmt.Errorf("error processing command: %v", err) + wal.lock.Lock() + err := wal.rotateLog() + wal.lock.Unlock() + if err != nil { + log.Printf("Error while performing sync: %v", err) } + + case <-wal.ctx.Done(): + return } + } +} + +func (wal *WALAOF) deleteSegmentPeriodically() { + for { + select { + case <-wal.segmentRetentionTicker.C: - file.Close() + wal.lock.Lock() + err := wal.deleteOldestSegment() + wal.lock.Unlock() + if err != nil { + log.Printf("Error while deleting segment: %v", err) + } + case <-wal.ctx.Done(): + return + } } +} +func (w *WALAOF) ForEachCommand(f func(c cmd.DiceDBCmd) error) error { + // TODO: implement this method return nil } diff --git a/internal/wal/wal_null.go b/internal/wal/wal_null.go index c4a58ccc9..46ae01b75 100644 --- a/internal/wal/wal_null.go +++ b/internal/wal/wal_null.go @@ -18,7 +18,8 @@ func (w *WALNull) Init(t time.Time) error { } // LogCommand serializes a WALLogEntry and writes it to the current WAL file. -func (w *WALNull) LogCommand(c *cmd.DiceDBCmd) { +func (w *WALNull) LogCommand(b []byte) error{ + return nil } func (w *WALNull) Close() error { diff --git a/internal/wal/wal_test.go b/internal/wal/wal_test.go index b3ee707e3..f35de93e1 100644 --- a/internal/wal/wal_test.go +++ b/internal/wal/wal_test.go @@ -5,29 +5,28 @@ import ( "testing" "time" - "github.com/dicedb/dice/internal/cmd" "github.com/dicedb/dice/internal/wal" ) -func BenchmarkLogCommandSQLite(b *testing.B) { - wl, err := wal.NewSQLiteWAL("/tmp/dicedb-lt") - if err != nil { - panic(err) - } - - if err := wl.Init(time.Now()); err != nil { - slog.Error("could not initialize WAL", slog.Any("error", err)) - } else { - go wal.InitBG(wl) - } - - for i := 0; i < b.N; i++ { - wl.LogCommand(&cmd.DiceDBCmd{ - Cmd: "SET", - Args: []string{"key", "value"}, - }) - } -} +// func BenchmarkLogCommandSQLite(b *testing.B) { +// wl, err := wal.NewSQLiteWAL("/tmp/dicedb-lt") +// if err != nil { +// panic(err) +// } + +// if err := wl.Init(time.Now()); err != nil { +// slog.Error("could not initialize WAL", slog.Any("error", err)) +// } else { +// go wal.InitBG(wl) +// } + +// for i := 0; i < b.N; i++ { +// wl.LogCommand(&cmd.DiceDBCmd{ +// Cmd: "SET", +// Args: []string{"key", "value"}, +// }) +// } +// } func BenchmarkLogCommandAOF(b *testing.B) { wl, err := wal.NewAOFWAL("/tmp/dicedb-lt") @@ -42,9 +41,6 @@ func BenchmarkLogCommandAOF(b *testing.B) { } for i := 0; i < b.N; i++ { - wl.LogCommand(&cmd.DiceDBCmd{ - Cmd: "SET", - Args: []string{"key", "value"}, - }) + wl.LogCommand([]byte("SET key value")) } } diff --git a/internal/wal/wal_utils.go b/internal/wal/wal_utils.go new file mode 100644 index 000000000..57375918e --- /dev/null +++ b/internal/wal/wal_utils.go @@ -0,0 +1,81 @@ +package wal + +import ( + "fmt" + "hash/crc32" + + "google.golang.org/protobuf/proto" +) + +// unmarshalAndVerifyEntry unmarshals the given data into a WAL entry and +// verifies the CRC of the entry. Only returns an error if the CRC is invalid. +func unmarshalAndVerifyEntry(data []byte) (*WAL_Entry, error) { + var entry WAL_Entry + MustUnmarshal(data, &entry) + + if !verifyCRC(&entry) { + return nil, fmt.Errorf("CRC mismatch: data may be corrupted") + } + + return &entry, nil +} + +// Validates whether the given entry has a valid CRC. +func verifyCRC(entry *WAL_Entry) bool { + // Reset the entry CRC for the verification. + actualCRC := crc32.ChecksumIEEE(append(entry.GetData(), byte(entry.GetLogSequenceNumber()))) + + return entry.CRC == actualCRC +} + +// Marshals +func MustMarshal(entry *WAL_Entry) []byte { + marshaledEntry, err := proto.Marshal(entry) + if err != nil { + panic(fmt.Sprintf("marshal should never fail (%v)", err)) + } + + return marshaledEntry +} + +func MustUnmarshal(data []byte, entry *WAL_Entry) { + if err := proto.Unmarshal(data, entry); err != nil { + panic(fmt.Sprintf("unmarshal should never fail (%v)", err)) + } +} + +func getEntrySize(data []byte) int { + return versionTagSize + versionLengthPrefixSize + versionSize + // Version field + logSequenceNumberSize + // Log Sequence Number field + dataTagSize + dataLengthPrefixSize + len(data) + // Data field + CRCSize + // CRC field + timestampSize // Timestamp field +} + +func (w *WALAOF) validateConfig() error { + if w.logDir == "" { + return fmt.Errorf("logDir cannot be empty") + } + + if w.maxSegmentSize <= 0 { + return fmt.Errorf("maxSegmentSize must be greater than 0") + } + + if w.maxSegmentCount <= 0 { + return fmt.Errorf("maxSegmentCount must be greater than 0") + } + + if w.bufferSize <= 0 { + return fmt.Errorf("bufferSize must be greater than 0") + } + + if w.walMode == "buffered" && w.writeMode == "fsync" { + return fmt.Errorf("walMode 'buffered' cannot be used with writeMode 'fsync'") + } + + if w.walMode == "unbuffered" && w.writeMode == "default" { + return fmt.Errorf("walMode 'unbuffered' cannot have writeMode as 'default'") + } + + return nil +} diff --git a/main.go b/main.go index 0eea883a5..39e24e691 100644 --- a/main.go +++ b/main.go @@ -55,15 +55,7 @@ func main() { wl, _ = wal.NewNullWAL() if config.DiceConfig.Persistence.Enabled { - if config.DiceConfig.Persistence.WALEngine == "sqlite" { - _wl, err := wal.NewSQLiteWAL(config.DiceConfig.Persistence.WALDir) - if err != nil { - slog.Warn("could not create WAL with", slog.String("wal-engine", config.DiceConfig.Persistence.WALEngine), slog.Any("error", err)) - sigs <- syscall.SIGKILL - return - } - wl = _wl - } else if config.DiceConfig.Persistence.WALEngine == "aof" { + if config.DiceConfig.Persistence.WALEngine == "aof" { _wl, err := wal.NewAOFWAL(config.DiceConfig.Persistence.WALDir) if err != nil { slog.Warn("could not create WAL with", slog.String("wal-engine", config.DiceConfig.Persistence.WALEngine), slog.Any("error", err)) From 2f84bd8c2d855607f8a2a46bfc5410818f40d047 Mon Sep 17 00:00:00 2001 From: ayushsatyam146 Date: Tue, 3 Dec 2024 15:49:26 +0530 Subject: [PATCH 2/7] fix lint --- config/config.go | 26 +++++++------- internal/iothread/iothread.go | 8 +++-- internal/wal/wal.go | 2 +- internal/wal/wal_aof.go | 64 ++++++++++++++++++----------------- internal/wal/wal_null.go | 2 +- internal/wal/wal_utils.go | 22 ------------ 6 files changed, 54 insertions(+), 70 deletions(-) diff --git a/config/config.go b/config/config.go index 314fa4923..d45ce0b8a 100644 --- a/config/config.go +++ b/config/config.go @@ -177,19 +177,19 @@ type persistence struct { } type WALConfig struct { - LogDir string `config:"log_dir" default:"tmp/deicdeb-wal-lt"` - Enabled bool `config:"enabled" default:"true"` - WalMode string `config:"mode" default:"buffered" validate:"oneof=buffered unbuffered"` - WriteMode string `config:"write_mode" default:"default" validate:"oneof=default fsync"` - BufferSizeMB int `config:"buffer_size" default:"1" validate:"min=1"` - RotationMode string `config:"rotation_mode" default:"segemnt-size" validate:"oneof=segment-size time"` - MaxSegmentSizeMB int64 `config:"buffer_size" default:"16" validate:"min=1"` - SegmentRotationTimeSec time.Duration `config:"max_segment_rotation_time" default:"60" validate:"min=1"` - BufferSyncIntervalMillis time.Duration `config:"max_segment_rotation_time" default:"200" validate:"min=1"` - RetentionMode string `config:"retention_mode" default:"num-segments" validate:"oneof=num-segments time checkpoint"` - MaxSegmentCount int `config:"max_segment_count" default:"10" validate:"min=1"` - SegmentRetentionDurationSec time.Duration `config:"max_segment_retention_time" default:"600" validate:"min=1"` - RecoveryMode string `config:"recovery_mode" default:"strict" validate:"oneof=strict truncate ignore"` + LogDir string `config:"log_dir" default:"tmp/deicdeb-wal-lt"` + Enabled bool `config:"enabled" default:"true"` + WalMode string `config:"mode" default:"buffered" validate:"oneof=buffered unbuffered"` + WriteMode string `config:"write_mode" default:"default" validate:"oneof=default fsync"` + BufferSizeMB int `config:"buffer_size" default:"1" validate:"min=1"` + RotationMode string `config:"rotation_mode" default:"segemnt-size" validate:"oneof=segment-size time"` + MaxSegmentSizeMB int64 `config:"buffer_size" default:"16" validate:"min=1"` + SegmentRotationTime time.Duration `config:"max_segment_rotation_time" default:"60" validate:"min=1"` + BufferSyncInterval time.Duration `config:"max_segment_rotation_time" default:"200" validate:"min=1"` + RetentionMode string `config:"retention_mode" default:"num-segments" validate:"oneof=num-segments time checkpoint"` + MaxSegmentCount int `config:"max_segment_count" default:"10" validate:"min=1"` + SegmentRetentionDuration time.Duration `config:"max_segment_retention_time" default:"600" validate:"min=1"` + RecoveryMode string `config:"recovery_mode" default:"strict" validate:"oneof=strict truncate ignore"` } type logging struct { diff --git a/internal/iothread/iothread.go b/internal/iothread/iothread.go index e2e9842d2..3111a18c7 100644 --- a/internal/iothread/iothread.go +++ b/internal/iothread/iothread.go @@ -564,13 +564,17 @@ func (t *BaseIOThread) handleCommand(ctx context.Context, cmdMeta CmdMeta, diceD } if err == nil && t.wl != nil { - t.wl.LogCommand([]byte(fmt.Sprintf("%s %s", diceDBCmd.Cmd, strings.Join(diceDBCmd.Args, " ")))) + if err := t.wl.LogCommand([]byte(fmt.Sprintf("%s %s", diceDBCmd.Cmd, strings.Join(diceDBCmd.Args, " ")))); err != nil { + return err + } } case MultiShard, AllShard: err = t.writeResponse(ctx, cmdMeta.composeResponse(storeOp...)) if err == nil && t.wl != nil { - t.wl.LogCommand([]byte(fmt.Sprintf("%s %s", diceDBCmd.Cmd, strings.Join(diceDBCmd.Args, " ")))) + if err := t.wl.LogCommand([]byte(fmt.Sprintf("%s %s", diceDBCmd.Cmd, strings.Join(diceDBCmd.Args, " ")))); err != nil { + return err + } } default: slog.Error("Unknown command type", diff --git a/internal/wal/wal.go b/internal/wal/wal.go index 54c8f2919..32c5e3174 100644 --- a/internal/wal/wal.go +++ b/internal/wal/wal.go @@ -10,7 +10,7 @@ import ( ) type AbstractWAL interface { - LogCommand([] byte) error + LogCommand([]byte) error Close() error Init(t time.Time) error ForEachCommand(f func(c cmd.DiceDBCmd) error) error diff --git a/internal/wal/wal_aof.go b/internal/wal/wal_aof.go index 7371e48aa..f66f44b20 100644 --- a/internal/wal/wal_aof.go +++ b/internal/wal/wal_aof.go @@ -55,15 +55,14 @@ type WALAOF struct { } func NewAOFWAL(directory string) (*WALAOF, error) { - ctx, cancel := context.WithCancel(context.Background()) return &WALAOF{ logDir: directory, walMode: config.DiceConfig.WAL.WalMode, - bufferSyncTicker: time.NewTicker(config.DiceConfig.WAL.BufferSyncIntervalMillis * time.Millisecond), - segmentRotationTicker: time.NewTicker(config.DiceConfig.WAL.SegmentRotationTimeSec * time.Second), - segmentRetentionTicker: time.NewTicker(config.DiceConfig.WAL.SegmentRetentionDurationSec * time.Second), + bufferSyncTicker: time.NewTicker(config.DiceConfig.WAL.BufferSyncInterval * time.Millisecond), + segmentRotationTicker: time.NewTicker(config.DiceConfig.WAL.SegmentRotationTime * time.Second), + segmentRetentionTicker: time.NewTicker(config.DiceConfig.WAL.SegmentRetentionDuration * time.Second), writeMode: config.DiceConfig.WAL.WriteMode, maxSegmentSize: config.DiceConfig.WAL.MaxSegmentSizeMB * 1024 * 1024, maxSegmentCount: config.DiceConfig.WAL.MaxSegmentCount, @@ -76,37 +75,37 @@ func NewAOFWAL(directory string) (*WALAOF, error) { }, nil } -func (w *WALAOF) Init(t time.Time) error { - - if err := w.validateConfig(); err != nil { +func (wal *WALAOF) Init(t time.Time) error { + if err := wal.validateConfig(); err != nil { return err } // TODO - Restore existing checkpoints to memory // Create the directory if it doesn't exist - if err := os.MkdirAll(w.logDir, 0755); err != nil { + if err := os.MkdirAll(wal.logDir, 0755); err != nil { return nil } // Get the list of log segment files in the directory - files, err := filepath.Glob(filepath.Join(w.logDir, segmentPrefix+"*")) + files, err := filepath.Glob(filepath.Join(wal.logDir, segmentPrefix+"*")) if err != nil { return nil } if len(files) > 0 { + fmt.Println("Found existing log segments:", files) // TODO - Check if we have newer WAL entries after the last checkpoint and simultaneously replay and checkpoint them } var wg sync.WaitGroup - errCh := make(chan error, w.maxSegmentCount) + errCh := make(chan error, wal.maxSegmentCount) - for i := 0; i < w.maxSegmentCount; i++ { + for i := 0; i < wal.maxSegmentCount; i++ { wg.Add(1) go func(index int) { defer wg.Done() - filePath := filepath.Join(w.logDir, segmentPrefix+fmt.Sprintf("-%d", index)) + filePath := filepath.Join(wal.logDir, segmentPrefix+fmt.Sprintf("-%d", index)) file, err := os.Create(filePath) if err != nil { errCh <- fmt.Errorf("error creating segment file %s: %v", filePath, err) @@ -119,24 +118,24 @@ func (w *WALAOF) Init(t time.Time) error { wg.Wait() close(errCh) - w.lastSequenceNo = 0 - w.currentSegmentIndex = 0 - w.oldestSegmentIndex = 0 - w.byteOffset = 0 - w.currentSegmentFile, err = os.OpenFile(filepath.Join(w.logDir, "seg-0"), os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) - if _, err = w.currentSegmentFile.Seek(0, io.SeekEnd); err != nil { + wal.lastSequenceNo = 0 + wal.currentSegmentIndex = 0 + wal.oldestSegmentIndex = 0 + wal.byteOffset = 0 + wal.currentSegmentFile, err = os.OpenFile(filepath.Join(wal.logDir, "seg-0"), os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) + if _, err := wal.currentSegmentFile.Seek(0, io.SeekEnd); err != nil { return err } - w.bufWriter = bufio.NewWriterSize(w.currentSegmentFile, w.bufferSize) + wal.bufWriter = bufio.NewWriterSize(wal.currentSegmentFile, wal.bufferSize) - go w.keepSyncingBuffer() + go wal.keepSyncingBuffer() - if w.rotationMode == "time" { - go w.rotateSegmentPeriodically() + if wal.rotationMode == "time" { //nolint:goconst + go wal.rotateSegmentPeriodically() } - if w.retentionMode == "time" { - go w.deleteSegmentPeriodically() + if wal.retentionMode == "time" { //nolint:goconst + go wal.deleteSegmentPeriodically() } return nil @@ -161,7 +160,9 @@ func (wal *WALAOF) writeEntry(data []byte) error { } entrySize := getEntrySize(data) - wal.rotateLogIfNeeded(entrySize) + if err := wal.rotateLogIfNeeded(entrySize); err != nil { + return err + } wal.byteOffset += entrySize @@ -169,9 +170,11 @@ func (wal *WALAOF) writeEntry(data []byte) error { return err } - // if wal-mode unbuffered immediatley sync to disk - if wal.walMode == "unbuffered" { - wal.Sync() + // if wal-mode unbuffered immediately sync to disk + if wal.walMode == "unbuffered" { //nolint:goconst + if err := wal.Sync(); err != nil { + return err + } } return nil @@ -232,7 +235,6 @@ func (wal *WALAOF) rotateLog() error { } func (wal *WALAOF) deleteOldestSegment() error { - oldestSegmentFilePath := filepath.Join(wal.logDir, segmentPrefix+fmt.Sprintf("%d", wal.oldestSegmentIndex)) // TODO: checkpoint before deleting the file @@ -260,7 +262,7 @@ func (wal *WALAOF) Sync() error { if err := wal.bufWriter.Flush(); err != nil { return err } - if wal.writeMode == "fsync" { + if wal.writeMode == "fsync" { //nolint:goconst if err := wal.currentSegmentFile.Sync(); err != nil { return err } @@ -323,7 +325,7 @@ func (wal *WALAOF) deleteSegmentPeriodically() { } } -func (w *WALAOF) ForEachCommand(f func(c cmd.DiceDBCmd) error) error { +func (wal *WALAOF) ForEachCommand(f func(c cmd.DiceDBCmd) error) error { // TODO: implement this method return nil } diff --git a/internal/wal/wal_null.go b/internal/wal/wal_null.go index 46ae01b75..80f3aa849 100644 --- a/internal/wal/wal_null.go +++ b/internal/wal/wal_null.go @@ -18,7 +18,7 @@ func (w *WALNull) Init(t time.Time) error { } // LogCommand serializes a WALLogEntry and writes it to the current WAL file. -func (w *WALNull) LogCommand(b []byte) error{ +func (w *WALNull) LogCommand(b []byte) error { return nil } diff --git a/internal/wal/wal_utils.go b/internal/wal/wal_utils.go index 57375918e..ee8536f03 100644 --- a/internal/wal/wal_utils.go +++ b/internal/wal/wal_utils.go @@ -2,32 +2,10 @@ package wal import ( "fmt" - "hash/crc32" "google.golang.org/protobuf/proto" ) -// unmarshalAndVerifyEntry unmarshals the given data into a WAL entry and -// verifies the CRC of the entry. Only returns an error if the CRC is invalid. -func unmarshalAndVerifyEntry(data []byte) (*WAL_Entry, error) { - var entry WAL_Entry - MustUnmarshal(data, &entry) - - if !verifyCRC(&entry) { - return nil, fmt.Errorf("CRC mismatch: data may be corrupted") - } - - return &entry, nil -} - -// Validates whether the given entry has a valid CRC. -func verifyCRC(entry *WAL_Entry) bool { - // Reset the entry CRC for the verification. - actualCRC := crc32.ChecksumIEEE(append(entry.GetData(), byte(entry.GetLogSequenceNumber()))) - - return entry.CRC == actualCRC -} - // Marshals func MustMarshal(entry *WAL_Entry) []byte { marshaledEntry, err := proto.Marshal(entry) From c8929364c441a483244a7e996c973d521f4022aa Mon Sep 17 00:00:00 2001 From: ayushsatyam146 Date: Tue, 3 Dec 2024 15:57:33 +0530 Subject: [PATCH 3/7] fixes lint --- internal/wal/wal_aof.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/internal/wal/wal_aof.go b/internal/wal/wal_aof.go index f66f44b20..adfe3061e 100644 --- a/internal/wal/wal_aof.go +++ b/internal/wal/wal_aof.go @@ -122,7 +122,12 @@ func (wal *WALAOF) Init(t time.Time) error { wal.currentSegmentIndex = 0 wal.oldestSegmentIndex = 0 wal.byteOffset = 0 - wal.currentSegmentFile, err = os.OpenFile(filepath.Join(wal.logDir, "seg-0"), os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) + newFile, err := os.OpenFile(filepath.Join(wal.logDir, "seg-0"), os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) + if err != nil { + return err + } + wal.currentSegmentFile = newFile + if _, err := wal.currentSegmentFile.Seek(0, io.SeekEnd); err != nil { return err } From 848d7ed314e4482025be5938a1483d6d33a41f35 Mon Sep 17 00:00:00 2001 From: ayushsatyam146 Date: Tue, 3 Dec 2024 16:57:47 +0530 Subject: [PATCH 4/7] fixed lint --- internal/wal/wal_utils.go | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/internal/wal/wal_utils.go b/internal/wal/wal_utils.go index ee8536f03..fc7059511 100644 --- a/internal/wal/wal_utils.go +++ b/internal/wal/wal_utils.go @@ -30,28 +30,28 @@ func getEntrySize(data []byte) int { timestampSize // Timestamp field } -func (w *WALAOF) validateConfig() error { - if w.logDir == "" { +func (wal *WALAOF) validateConfig() error { + if wal.logDir == "" { return fmt.Errorf("logDir cannot be empty") } - if w.maxSegmentSize <= 0 { + if wal.maxSegmentSize <= 0 { return fmt.Errorf("maxSegmentSize must be greater than 0") } - if w.maxSegmentCount <= 0 { + if wal.maxSegmentCount <= 0 { return fmt.Errorf("maxSegmentCount must be greater than 0") } - if w.bufferSize <= 0 { + if wal.bufferSize <= 0 { return fmt.Errorf("bufferSize must be greater than 0") } - if w.walMode == "buffered" && w.writeMode == "fsync" { + if wal.walMode == "buffered" && wal.writeMode == "fsync" { return fmt.Errorf("walMode 'buffered' cannot be used with writeMode 'fsync'") } - if w.walMode == "unbuffered" && w.writeMode == "default" { + if wal.walMode == "unbuffered" && wal.writeMode == "default" { return fmt.Errorf("walMode 'unbuffered' cannot have writeMode as 'default'") } From d972a4a247c53453005260aab423f20cab4e1cd6 Mon Sep 17 00:00:00 2001 From: ayushsatyam146 Date: Tue, 3 Dec 2024 20:03:17 +0530 Subject: [PATCH 5/7] fixing unit tests --- config/config.go | 12 ++++++------ dicedb.conf | 6 +++--- internal/wal/wal_aof.go | 6 +++--- 3 files changed, 12 insertions(+), 12 deletions(-) diff --git a/config/config.go b/config/config.go index d45ce0b8a..b35c1d4a4 100644 --- a/config/config.go +++ b/config/config.go @@ -94,11 +94,11 @@ WriteMode = "default" BufferSizeMB = 1 RotationMode = "segemnt-size" MaxSegmentSizeMB = 16 -SegmentRotationTimeSec = 60 -BufferSyncIntervalMillis = 200 +SegmentRotationTime = 60 +BufferSyncInterval = 200 RetentionMode = "num-segments" MaxSegmentCount = 10 -SegmentRetentionDurationSec = 600 +SegmentRetentionDuration = 600 RecoveryMode = "strict"` ) @@ -184,11 +184,11 @@ type WALConfig struct { BufferSizeMB int `config:"buffer_size" default:"1" validate:"min=1"` RotationMode string `config:"rotation_mode" default:"segemnt-size" validate:"oneof=segment-size time"` MaxSegmentSizeMB int64 `config:"buffer_size" default:"16" validate:"min=1"` - SegmentRotationTime time.Duration `config:"max_segment_rotation_time" default:"60" validate:"min=1"` - BufferSyncInterval time.Duration `config:"max_segment_rotation_time" default:"200" validate:"min=1"` + SegmentRotationTime time.Duration `config:"max_segment_rotation_time" default:"60s" validate:"min=1s"` + BufferSyncInterval time.Duration `config:"max_segment_rotation_time" default:"200ms" validate:"min=1ms"` RetentionMode string `config:"retention_mode" default:"num-segments" validate:"oneof=num-segments time checkpoint"` MaxSegmentCount int `config:"max_segment_count" default:"10" validate:"min=1"` - SegmentRetentionDuration time.Duration `config:"max_segment_retention_time" default:"600" validate:"min=1"` + SegmentRetentionDuration time.Duration `config:"max_segment_retention_time" default:"600s" validate:"min=1s"` RecoveryMode string `config:"recovery_mode" default:"strict" validate:"oneof=strict truncate ignore"` } diff --git a/dicedb.conf b/dicedb.conf index d012b3ca7..2cc62eb3d 100644 --- a/dicedb.conf +++ b/dicedb.conf @@ -66,9 +66,9 @@ WriteMode = "default" BufferSizeMB = 1 RotationMode = "segemnt-size" MaxSegmentSizeMB = 16 -SegmentRotationTimeSec = 60 -BufferSyncIntervalMillis = 200 +SegmentRotationTime = 60s +BufferSyncInterval = 200ms RetentionMode = "num-segments" MaxSegmentCount = 10 -SegmentRetentionDurationSec = 600 +SegmentRetentionDuration = 600s RecoveryMode = "strict"` \ No newline at end of file diff --git a/internal/wal/wal_aof.go b/internal/wal/wal_aof.go index adfe3061e..81a25cbb7 100644 --- a/internal/wal/wal_aof.go +++ b/internal/wal/wal_aof.go @@ -60,9 +60,9 @@ func NewAOFWAL(directory string) (*WALAOF, error) { return &WALAOF{ logDir: directory, walMode: config.DiceConfig.WAL.WalMode, - bufferSyncTicker: time.NewTicker(config.DiceConfig.WAL.BufferSyncInterval * time.Millisecond), - segmentRotationTicker: time.NewTicker(config.DiceConfig.WAL.SegmentRotationTime * time.Second), - segmentRetentionTicker: time.NewTicker(config.DiceConfig.WAL.SegmentRetentionDuration * time.Second), + bufferSyncTicker: time.NewTicker(config.DiceConfig.WAL.BufferSyncInterval), + segmentRotationTicker: time.NewTicker(config.DiceConfig.WAL.SegmentRotationTime), + segmentRetentionTicker: time.NewTicker(config.DiceConfig.WAL.SegmentRetentionDuration), writeMode: config.DiceConfig.WAL.WriteMode, maxSegmentSize: config.DiceConfig.WAL.MaxSegmentSizeMB * 1024 * 1024, maxSegmentCount: config.DiceConfig.WAL.MaxSegmentCount, From 982faa42a9cd9b99c073060061dce8adb2adadfb Mon Sep 17 00:00:00 2001 From: ayushsatyam146 Date: Thu, 12 Dec 2024 13:05:22 +0530 Subject: [PATCH 6/7] introducing enums and removing pre allocated files --- config/config.go | 52 +++++++++++------- config/validator.go | 34 ++++++++++++ dicedb.conf | 10 ++-- internal/wal/wal.pb.go | 65 +++++++++++----------- internal/wal/wal.proto | 6 +-- internal/wal/wal_aof.go | 111 +++++++++++++++----------------------- internal/wal/wal_test.go | 20 ------- internal/wal/wal_utils.go | 43 +++++---------- main.go | 8 ++- 9 files changed, 168 insertions(+), 181 deletions(-) diff --git a/config/config.go b/config/config.go index b35c1d4a4..25c0659ba 100644 --- a/config/config.go +++ b/config/config.go @@ -66,7 +66,7 @@ memory.keys_limit = 200000000 memory.lfu_log_factor = 10 # Persistence Configuration -persistence.enabled = true +persistence.enabled = false persistence.aof_file = "./dice-master.aof" persistence.persistence_enabled = true persistence.write_aof_on_cleanup = false @@ -87,18 +87,18 @@ network.io_buffer_length = 512 network.io_buffer_length_max = 51200 # WAL Configuration -LogDir = "tmp/deicdeb-wal-lt" +LogDir = "tmp/dicedb-wal" Enabled = "true" WalMode = "buffered" WriteMode = "default" BufferSizeMB = 1 RotationMode = "segemnt-size" MaxSegmentSizeMB = 16 -SegmentRotationTime = 60 -BufferSyncInterval = 200 +MaxSegmentRotationTime = 60s +BufferSyncInterval = 200ms RetentionMode = "num-segments" MaxSegmentCount = 10 -SegmentRetentionDuration = 600 +MaxSegmentRetentionDuration = 600s RecoveryMode = "strict"` ) @@ -168,28 +168,40 @@ type memory struct { } type persistence struct { - Enabled bool `config:"enabled" default:"true"` + Enabled bool `config:"enabled" default:"false"` AOFFile string `config:"aof_file" default:"./dice-master.aof" validate:"filepath"` WriteAOFOnCleanup bool `config:"write_aof_on_cleanup" default:"false"` - WALDir string `config:"wal-dir" default:"./" validate:"dirpath"` RestoreFromWAL bool `config:"restore-wal" default:"false"` WALEngine string `config:"wal-engine" default:"aof" validate:"oneof=sqlite aof"` } type WALConfig struct { - LogDir string `config:"log_dir" default:"tmp/deicdeb-wal-lt"` - Enabled bool `config:"enabled" default:"true"` - WalMode string `config:"mode" default:"buffered" validate:"oneof=buffered unbuffered"` - WriteMode string `config:"write_mode" default:"default" validate:"oneof=default fsync"` - BufferSizeMB int `config:"buffer_size" default:"1" validate:"min=1"` - RotationMode string `config:"rotation_mode" default:"segemnt-size" validate:"oneof=segment-size time"` - MaxSegmentSizeMB int64 `config:"buffer_size" default:"16" validate:"min=1"` - SegmentRotationTime time.Duration `config:"max_segment_rotation_time" default:"60s" validate:"min=1s"` - BufferSyncInterval time.Duration `config:"max_segment_rotation_time" default:"200ms" validate:"min=1ms"` - RetentionMode string `config:"retention_mode" default:"num-segments" validate:"oneof=num-segments time checkpoint"` - MaxSegmentCount int `config:"max_segment_count" default:"10" validate:"min=1"` - SegmentRetentionDuration time.Duration `config:"max_segment_retention_time" default:"600s" validate:"min=1s"` - RecoveryMode string `config:"recovery_mode" default:"strict" validate:"oneof=strict truncate ignore"` + // Directory where WAL log files will be stored + LogDir string `config:"log_dir" default:"tmp/dicedb-wal"` + // Whether WAL is enabled + Enabled bool `config:"enabled" default:"true"` + // WAL buffering mode: 'buffered' (writes buffered in memory) or 'unbuffered' (immediate disk writes) + WalMode string `config:"wal_mode" default:"buffered" validate:"oneof=buffered unbuffered"` + // Write mode: 'default' (OS handles syncing) or 'fsync' (explicit fsync after writes) + WriteMode string `config:"write_mode" default:"default" validate:"oneof=default fsync"` + // Size of the write buffer in megabytes + BufferSizeMB int `config:"buffer_size_mb" default:"1" validate:"min=1"` + // How WAL rotation is triggered: 'segment-size' (based on file size) or 'time' (based on duration) + RotationMode string `config:"rotation_mode" default:"segemnt-size" validate:"oneof=segment-size time"` + // Maximum size of a WAL segment file in megabytes before rotation + MaxSegmentSizeMB int `config:"max_segment_size_mb" default:"16" validate:"min=1"` + // Time interval in seconds after which WAL segment is rotated when using time-based rotation + MaxSegmentRotationTime time.Duration `config:"max_segment_rotation_time" default:"60s" validate:"min=1s"` + // Time interval in Milliseconds after which buffered WAL data is synced to disk + BufferSyncInterval time.Duration `config:"buffer_sync_interval" default:"200ms" validate:"min=1ms"` + // How old segments are removed: 'num-segments' (keep N latest), 'time' (by age), or 'checkpoint' (after checkpoint) + RetentionMode string `config:"retention_mode" default:"num-segments" validate:"oneof=num-segments time checkpoint"` + // Maximum number of WAL segment files to retain when using num-segments retention + MaxSegmentCount int `config:"max_segment_count" default:"10" validate:"min=1"` + // Time interval in Seconds till which WAL segments are retained when using time-based retention + MaxSegmentRetentionDuration time.Duration `config:"max_segment_retention_duration" default:"600s" validate:"min=1s"` + // How to handle WAL corruption on recovery: 'strict' (fail), 'truncate' (truncate at corruption), 'ignore' (skip corrupted) + RecoveryMode string `config:"recovery_mode" default:"strict" validate:"oneof=strict truncate ignore"` } type logging struct { diff --git a/config/validator.go b/config/validator.go index 66669ff43..9564777a9 100644 --- a/config/validator.go +++ b/config/validator.go @@ -12,6 +12,7 @@ import ( func validateConfig(config *Config) error { validate := validator.New() validate.RegisterStructValidation(validateShardCount, Config{}) + validate.RegisterStructValidation(validateWALConfig, Config{}) if err := validate.Struct(config); err != nil { validationErrors, ok := err.(validator.ValidationErrors) @@ -95,3 +96,36 @@ func applyDefaultValuesFromTags(config *Config, fieldName string) error { log.Printf("Setting default value for %s to: %s", fieldName, defaultValue) return nil } + +func validateWALConfig(sl validator.StructLevel) { + config := sl.Current().Interface().(Config) + + // LogDir validation + if config.WAL.LogDir == "" { + sl.ReportError(config.WAL.LogDir, "LogDir", "LogDir", "required", "cannot be empty") + } + + // MaxSegmentSize validation + if config.WAL.MaxSegmentSizeMB <= 0 { + sl.ReportError(config.WAL.MaxSegmentSizeMB, "MaxSegmentSize", "MaxSegmentSize", "gt", "must be greater than 0") + } + + // MaxSegmentCount validation + if config.WAL.MaxSegmentCount <= 0 { + sl.ReportError(config.WAL.MaxSegmentCount, "MaxSegmentCount", "MaxSegmentCount", "gt", "must be greater than 0") + } + + // BufferSize validation + if config.WAL.BufferSizeMB <= 0 { + sl.ReportError(config.WAL.BufferSizeMB, "BufferSize", "BufferSize", "gt", "must be greater than 0") + } + + // WALMode and WriteMode compatibility checks + if config.WAL.WalMode == "buffered" && config.WAL.WriteMode == "fsync" { + sl.ReportError(config.WAL.WalMode, "WALMode", "WALMode", "incompatible", "walMode 'buffered' cannot be used with writeMode 'fsync'") + } + + if config.WAL.WalMode == "unbuffered" && config.WAL.WriteMode == "default" { + sl.ReportError(config.WAL.WalMode, "WALMode", "WALMode", "incompatible", "walMode 'unbuffered' cannot have writeMode as 'default'") + } +} diff --git a/dicedb.conf b/dicedb.conf index 2cc62eb3d..af4759337 100644 --- a/dicedb.conf +++ b/dicedb.conf @@ -39,7 +39,7 @@ memory.keys_limit = 200000000 memory.lfu_log_factor = 10 # Persistence Configuration -persistence.enabled = true +persistence.enabled = false persistence.aof_file = "./dice-master.aof" persistence.persistence_enabled = true persistence.write_aof_on_cleanup = false @@ -59,16 +59,16 @@ network.io_buffer_length = 512 network.io_buffer_length_max = 51200 # WAL Configuration -LogDir = "tmp/deicdeb-wal-lt" +LogDir = "tmp/dicedb-wal" Enabled = "true" WalMode = "buffered" WriteMode = "default" BufferSizeMB = 1 RotationMode = "segemnt-size" MaxSegmentSizeMB = 16 -SegmentRotationTime = 60s +MaxSegmentRotationTime = 60s BufferSyncInterval = 200ms RetentionMode = "num-segments" MaxSegmentCount = 10 -SegmentRetentionDuration = 600s -RecoveryMode = "strict"` \ No newline at end of file +MaxSegmentRetentionDuration = 600s +RecoveryMode = "strict" \ No newline at end of file diff --git a/internal/wal/wal.pb.go b/internal/wal/wal.pb.go index 40f22cc8c..617c6d9ca 100644 --- a/internal/wal/wal.pb.go +++ b/internal/wal/wal.pb.go @@ -20,32 +20,32 @@ const ( _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) ) -type WAL_Entry struct { +type WALEntry struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - Version string `protobuf:"bytes,1,opt,name=version,proto3" json:"version,omitempty"` // Version of the WAL entry (e.g., "v1.0") - LogSequenceNumber uint64 `protobuf:"varint,2,opt,name=logSequenceNumber,proto3" json:"logSequenceNumber,omitempty"` // Log Sequence Number (LSN) - Data []byte `protobuf:"bytes,3,opt,name=data,proto3" json:"data,omitempty"` // The actual data being logged - CRC uint32 `protobuf:"varint,4,opt,name=CRC,proto3" json:"CRC,omitempty"` // Cyclic Redundancy Check for integrity - Timestamp int64 `protobuf:"varint,5,opt,name=timestamp,proto3" json:"timestamp,omitempty"` // Timestamp for the WAL entry (epoch time in nanoseconds) + Version string `protobuf:"bytes,1,opt,name=version,proto3" json:"version,omitempty"` // Version of the WAL entry (e.g., "v1.0") + LogSequenceNumber uint64 `protobuf:"varint,2,opt,name=log_sequence_number,json=logSequenceNumber,proto3" json:"log_sequence_number,omitempty"` // Log Sequence Number (LSN) + Data []byte `protobuf:"bytes,3,opt,name=data,proto3" json:"data,omitempty"` // The actual data being logged + Crc32 uint32 `protobuf:"varint,4,opt,name=crc32,proto3" json:"crc32,omitempty"` // Cyclic Redundancy Check for integrity + Timestamp int64 `protobuf:"varint,5,opt,name=timestamp,proto3" json:"timestamp,omitempty"` // Timestamp for the WAL entry (epoch time in nanoseconds) } -func (x *WAL_Entry) Reset() { - *x = WAL_Entry{} +func (x *WALEntry) Reset() { + *x = WALEntry{} mi := &file_internal_wal_wal_proto_msgTypes[0] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } -func (x *WAL_Entry) String() string { +func (x *WALEntry) String() string { return protoimpl.X.MessageStringOf(x) } -func (*WAL_Entry) ProtoMessage() {} +func (*WALEntry) ProtoMessage() {} -func (x *WAL_Entry) ProtoReflect() protoreflect.Message { +func (x *WALEntry) ProtoReflect() protoreflect.Message { mi := &file_internal_wal_wal_proto_msgTypes[0] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) @@ -57,40 +57,40 @@ func (x *WAL_Entry) ProtoReflect() protoreflect.Message { return mi.MessageOf(x) } -// Deprecated: Use WAL_Entry.ProtoReflect.Descriptor instead. -func (*WAL_Entry) Descriptor() ([]byte, []int) { +// Deprecated: Use WALEntry.ProtoReflect.Descriptor instead. +func (*WALEntry) Descriptor() ([]byte, []int) { return file_internal_wal_wal_proto_rawDescGZIP(), []int{0} } -func (x *WAL_Entry) GetVersion() string { +func (x *WALEntry) GetVersion() string { if x != nil { return x.Version } return "" } -func (x *WAL_Entry) GetLogSequenceNumber() uint64 { +func (x *WALEntry) GetLogSequenceNumber() uint64 { if x != nil { return x.LogSequenceNumber } return 0 } -func (x *WAL_Entry) GetData() []byte { +func (x *WALEntry) GetData() []byte { if x != nil { return x.Data } return nil } -func (x *WAL_Entry) GetCRC() uint32 { +func (x *WALEntry) GetCrc32() uint32 { if x != nil { - return x.CRC + return x.Crc32 } return 0 } -func (x *WAL_Entry) GetTimestamp() int64 { +func (x *WALEntry) GetTimestamp() int64 { if x != nil { return x.Timestamp } @@ -101,18 +101,19 @@ var File_internal_wal_wal_proto protoreflect.FileDescriptor var file_internal_wal_wal_proto_rawDesc = []byte{ 0x0a, 0x16, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2f, 0x77, 0x61, 0x6c, 0x2f, 0x77, - 0x61, 0x6c, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x03, 0x77, 0x61, 0x6c, 0x22, 0x97, 0x01, - 0x0a, 0x09, 0x57, 0x41, 0x4c, 0x5f, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x18, 0x0a, 0x07, 0x76, - 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x76, 0x65, - 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x2c, 0x0a, 0x11, 0x6c, 0x6f, 0x67, 0x53, 0x65, 0x71, 0x75, - 0x65, 0x6e, 0x63, 0x65, 0x4e, 0x75, 0x6d, 0x62, 0x65, 0x72, 0x18, 0x02, 0x20, 0x01, 0x28, 0x04, - 0x52, 0x11, 0x6c, 0x6f, 0x67, 0x53, 0x65, 0x71, 0x75, 0x65, 0x6e, 0x63, 0x65, 0x4e, 0x75, 0x6d, - 0x62, 0x65, 0x72, 0x12, 0x12, 0x0a, 0x04, 0x64, 0x61, 0x74, 0x61, 0x18, 0x03, 0x20, 0x01, 0x28, - 0x0c, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61, 0x12, 0x10, 0x0a, 0x03, 0x43, 0x52, 0x43, 0x18, 0x04, - 0x20, 0x01, 0x28, 0x0d, 0x52, 0x03, 0x43, 0x52, 0x43, 0x12, 0x1c, 0x0a, 0x09, 0x74, 0x69, 0x6d, - 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x18, 0x05, 0x20, 0x01, 0x28, 0x03, 0x52, 0x09, 0x74, 0x69, - 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x42, 0x0e, 0x5a, 0x0c, 0x69, 0x6e, 0x74, 0x65, 0x72, - 0x6e, 0x61, 0x6c, 0x2f, 0x77, 0x61, 0x6c, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x61, 0x6c, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x03, 0x77, 0x61, 0x6c, 0x22, 0x9c, 0x01, + 0x0a, 0x08, 0x57, 0x41, 0x4c, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x18, 0x0a, 0x07, 0x76, 0x65, + 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x76, 0x65, 0x72, + 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x2e, 0x0a, 0x13, 0x6c, 0x6f, 0x67, 0x5f, 0x73, 0x65, 0x71, 0x75, + 0x65, 0x6e, 0x63, 0x65, 0x5f, 0x6e, 0x75, 0x6d, 0x62, 0x65, 0x72, 0x18, 0x02, 0x20, 0x01, 0x28, + 0x04, 0x52, 0x11, 0x6c, 0x6f, 0x67, 0x53, 0x65, 0x71, 0x75, 0x65, 0x6e, 0x63, 0x65, 0x4e, 0x75, + 0x6d, 0x62, 0x65, 0x72, 0x12, 0x12, 0x0a, 0x04, 0x64, 0x61, 0x74, 0x61, 0x18, 0x03, 0x20, 0x01, + 0x28, 0x0c, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61, 0x12, 0x14, 0x0a, 0x05, 0x63, 0x72, 0x63, 0x33, + 0x32, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x05, 0x63, 0x72, 0x63, 0x33, 0x32, 0x12, 0x1c, + 0x0a, 0x09, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x18, 0x05, 0x20, 0x01, 0x28, + 0x03, 0x52, 0x09, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x42, 0x0e, 0x5a, 0x0c, + 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2f, 0x77, 0x61, 0x6c, 0x62, 0x06, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -129,7 +130,7 @@ func file_internal_wal_wal_proto_rawDescGZIP() []byte { var file_internal_wal_wal_proto_msgTypes = make([]protoimpl.MessageInfo, 1) var file_internal_wal_wal_proto_goTypes = []any{ - (*WAL_Entry)(nil), // 0: wal.WAL_Entry + (*WALEntry)(nil), // 0: wal.WALEntry } var file_internal_wal_wal_proto_depIdxs = []int32{ 0, // [0:0] is the sub-list for method output_type diff --git a/internal/wal/wal.proto b/internal/wal/wal.proto index dde026656..9184fe008 100644 --- a/internal/wal/wal.proto +++ b/internal/wal/wal.proto @@ -3,10 +3,10 @@ syntax = "proto3"; package wal; option go_package = "internal/wal"; -message WAL_Entry { +message WALEntry { string version = 1; // Version of the WAL entry (e.g., "v1.0") - uint64 logSequenceNumber = 2; // Log Sequence Number (LSN) + uint64 log_sequence_number = 2; // Log Sequence Number (LSN) bytes data = 3; // The actual data being logged - uint32 CRC = 4; // Cyclic Redundancy Check for integrity + uint32 crc32 = 4; // Cyclic Redundancy Check for integrity int64 timestamp = 5; // Timestamp for the WAL entry (epoch time in nanoseconds) } diff --git a/internal/wal/wal_aof.go b/internal/wal/wal_aof.go index 81a25cbb7..c55ead590 100644 --- a/internal/wal/wal_aof.go +++ b/internal/wal/wal_aof.go @@ -8,6 +8,7 @@ import ( "hash/crc32" "io" "log" + "log/slog" "os" "path/filepath" sync "sync" @@ -18,24 +19,19 @@ import ( ) const ( - segmentPrefix = "seg-" - defaultVersion = "v0.0.1" - versionTagSize = 1 // Tag for "version" field - versionLengthPrefixSize = 1 // Length prefix for "version" - versionSize = 6 // Fixed size for "v0.0.1" - logSequenceNumberSize = 8 - dataTagSize = 1 // Tag for "data" field - dataLengthPrefixSize = 1 // Length prefix for "data" - CRCSize = 4 - timestampSize = 8 + segmentPrefix = "seg-" + defaultVersion = "v0.0.1" + RotationModeTime = "time" + RetentionModeTime = "time" + WALModeUnbuffered = "unbuffered" ) -type WALAOF struct { +type AOF struct { logDir string currentSegmentFile *os.File walMode string writeMode string - maxSegmentSize int64 + maxSegmentSize int maxSegmentCount int currentSegmentIndex int oldestSegmentIndex int @@ -49,20 +45,20 @@ type WALAOF struct { bufferSyncTicker *time.Ticker segmentRotationTicker *time.Ticker segmentRetentionTicker *time.Ticker - lock sync.Mutex + mu sync.Mutex ctx context.Context cancel context.CancelFunc } -func NewAOFWAL(directory string) (*WALAOF, error) { +func NewAOFWAL(directory string) (*AOF, error) { ctx, cancel := context.WithCancel(context.Background()) - return &WALAOF{ + return &AOF{ logDir: directory, walMode: config.DiceConfig.WAL.WalMode, bufferSyncTicker: time.NewTicker(config.DiceConfig.WAL.BufferSyncInterval), - segmentRotationTicker: time.NewTicker(config.DiceConfig.WAL.SegmentRotationTime), - segmentRetentionTicker: time.NewTicker(config.DiceConfig.WAL.SegmentRetentionDuration), + segmentRotationTicker: time.NewTicker(config.DiceConfig.WAL.MaxSegmentRotationTime), + segmentRetentionTicker: time.NewTicker(config.DiceConfig.WAL.MaxSegmentRetentionDuration), writeMode: config.DiceConfig.WAL.WriteMode, maxSegmentSize: config.DiceConfig.WAL.MaxSegmentSizeMB * 1024 * 1024, maxSegmentCount: config.DiceConfig.WAL.MaxSegmentCount, @@ -75,10 +71,7 @@ func NewAOFWAL(directory string) (*WALAOF, error) { }, nil } -func (wal *WALAOF) Init(t time.Time) error { - if err := wal.validateConfig(); err != nil { - return err - } +func (wal *AOF) Init(t time.Time) error { // TODO - Restore existing checkpoints to memory @@ -94,30 +87,10 @@ func (wal *WALAOF) Init(t time.Time) error { } if len(files) > 0 { - fmt.Println("Found existing log segments:", files) + slog.Info("Found existing log segments", slog.Any("files", files)) // TODO - Check if we have newer WAL entries after the last checkpoint and simultaneously replay and checkpoint them } - var wg sync.WaitGroup - errCh := make(chan error, wal.maxSegmentCount) - - for i := 0; i < wal.maxSegmentCount; i++ { - wg.Add(1) - go func(index int) { - defer wg.Done() - filePath := filepath.Join(wal.logDir, segmentPrefix+fmt.Sprintf("-%d", index)) - file, err := os.Create(filePath) - if err != nil { - errCh <- fmt.Errorf("error creating segment file %s: %v", filePath, err) - return - } - defer file.Close() - }(i) - } - - wg.Wait() - close(errCh) - wal.lastSequenceNo = 0 wal.currentSegmentIndex = 0 wal.oldestSegmentIndex = 0 @@ -135,11 +108,11 @@ func (wal *WALAOF) Init(t time.Time) error { go wal.keepSyncingBuffer() - if wal.rotationMode == "time" { //nolint:goconst + if wal.rotationMode == RotationModeTime { go wal.rotateSegmentPeriodically() } - if wal.retentionMode == "time" { //nolint:goconst + if wal.retentionMode == RetentionModeTime { go wal.deleteSegmentPeriodically() } @@ -147,20 +120,20 @@ func (wal *WALAOF) Init(t time.Time) error { } // WriteEntry writes an entry to the WAL. -func (wal *WALAOF) LogCommand(data []byte) error { +func (wal *AOF) LogCommand(data []byte) error { return wal.writeEntry(data) } -func (wal *WALAOF) writeEntry(data []byte) error { - wal.lock.Lock() - defer wal.lock.Unlock() +func (wal *AOF) writeEntry(data []byte) error { + wal.mu.Lock() + defer wal.mu.Unlock() wal.lastSequenceNo++ - entry := &WAL_Entry{ + entry := &WALEntry{ Version: defaultVersion, LogSequenceNumber: wal.lastSequenceNo, Data: data, - CRC: crc32.ChecksumIEEE(append(data, byte(wal.lastSequenceNo))), + Crc32: crc32.ChecksumIEEE(append(data, byte(wal.lastSequenceNo))), Timestamp: time.Now().UnixNano(), } @@ -176,7 +149,7 @@ func (wal *WALAOF) writeEntry(data []byte) error { } // if wal-mode unbuffered immediately sync to disk - if wal.walMode == "unbuffered" { //nolint:goconst + if wal.walMode == WALModeUnbuffered { //nolint:goconst if err := wal.Sync(); err != nil { return err } @@ -185,7 +158,7 @@ func (wal *WALAOF) writeEntry(data []byte) error { return nil } -func (wal *WALAOF) writeEntryToBuffer(entry *WAL_Entry) error { +func (wal *AOF) writeEntryToBuffer(entry *WALEntry) error { marshaledEntry := MustMarshal(entry) size := int32(len(marshaledEntry)) @@ -198,8 +171,8 @@ func (wal *WALAOF) writeEntryToBuffer(entry *WAL_Entry) error { } // rotateLogIfNeeded is not thread safe -func (wal *WALAOF) rotateLogIfNeeded(entrySize int) error { - if int64(wal.byteOffset+entrySize) > wal.maxSegmentSize { +func (wal *AOF) rotateLogIfNeeded(entrySize int) error { + if wal.byteOffset+entrySize > wal.maxSegmentSize { if err := wal.rotateLog(); err != nil { return err } @@ -208,7 +181,7 @@ func (wal *WALAOF) rotateLogIfNeeded(entrySize int) error { } // rotateLog is not thread safe -func (wal *WALAOF) rotateLog() error { +func (wal *AOF) rotateLog() error { if err := wal.Sync(); err != nil { return err } @@ -226,7 +199,7 @@ func (wal *WALAOF) rotateLog() error { wal.oldestSegmentIndex++ } - newFile, err := os.OpenFile(filepath.Join(wal.logDir, segmentPrefix+fmt.Sprintf("-%d", wal.currentSegmentIndex)), os.O_RDWR|os.O_CREATE, 0644) + newFile, err := os.OpenFile(filepath.Join(wal.logDir, segmentPrefix+fmt.Sprintf("-%d", wal.currentSegmentIndex)), os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) if err != nil { log.Fatalf("failed opening file: %s", err) } @@ -239,7 +212,7 @@ func (wal *WALAOF) rotateLog() error { return nil } -func (wal *WALAOF) deleteOldestSegment() error { +func (wal *AOF) deleteOldestSegment() error { oldestSegmentFilePath := filepath.Join(wal.logDir, segmentPrefix+fmt.Sprintf("%d", wal.oldestSegmentIndex)) // TODO: checkpoint before deleting the file @@ -253,7 +226,7 @@ func (wal *WALAOF) deleteOldestSegment() error { } // Close the WAL file. It also calls Sync() on the WAL. -func (wal *WALAOF) Close() error { +func (wal *AOF) Close() error { wal.cancel() if err := wal.Sync(); err != nil { return err @@ -263,7 +236,7 @@ func (wal *WALAOF) Close() error { // Writes out any data in the WAL's in-memory buffer to the segment file. If // fsync is enabled, it also calls fsync on the segment file. -func (wal *WALAOF) Sync() error { +func (wal *AOF) Sync() error { if err := wal.bufWriter.Flush(); err != nil { return err } @@ -276,14 +249,14 @@ func (wal *WALAOF) Sync() error { return nil } -func (wal *WALAOF) keepSyncingBuffer() { +func (wal *AOF) keepSyncingBuffer() { for { select { case <-wal.bufferSyncTicker.C: - wal.lock.Lock() + wal.mu.Lock() err := wal.Sync() - wal.lock.Unlock() + wal.mu.Unlock() if err != nil { log.Printf("Error while performing sync: %v", err) @@ -295,14 +268,14 @@ func (wal *WALAOF) keepSyncingBuffer() { } } -func (wal *WALAOF) rotateSegmentPeriodically() { +func (wal *AOF) rotateSegmentPeriodically() { for { select { case <-wal.segmentRotationTicker.C: - wal.lock.Lock() + wal.mu.Lock() err := wal.rotateLog() - wal.lock.Unlock() + wal.mu.Unlock() if err != nil { log.Printf("Error while performing sync: %v", err) } @@ -313,14 +286,14 @@ func (wal *WALAOF) rotateSegmentPeriodically() { } } -func (wal *WALAOF) deleteSegmentPeriodically() { +func (wal *AOF) deleteSegmentPeriodically() { for { select { case <-wal.segmentRetentionTicker.C: - wal.lock.Lock() + wal.mu.Lock() err := wal.deleteOldestSegment() - wal.lock.Unlock() + wal.mu.Unlock() if err != nil { log.Printf("Error while deleting segment: %v", err) } @@ -330,7 +303,7 @@ func (wal *WALAOF) deleteSegmentPeriodically() { } } -func (wal *WALAOF) ForEachCommand(f func(c cmd.DiceDBCmd) error) error { +func (wal *AOF) ForEachCommand(f func(c cmd.DiceDBCmd) error) error { // TODO: implement this method return nil } diff --git a/internal/wal/wal_test.go b/internal/wal/wal_test.go index f35de93e1..69ff2d803 100644 --- a/internal/wal/wal_test.go +++ b/internal/wal/wal_test.go @@ -8,26 +8,6 @@ import ( "github.com/dicedb/dice/internal/wal" ) -// func BenchmarkLogCommandSQLite(b *testing.B) { -// wl, err := wal.NewSQLiteWAL("/tmp/dicedb-lt") -// if err != nil { -// panic(err) -// } - -// if err := wl.Init(time.Now()); err != nil { -// slog.Error("could not initialize WAL", slog.Any("error", err)) -// } else { -// go wal.InitBG(wl) -// } - -// for i := 0; i < b.N; i++ { -// wl.LogCommand(&cmd.DiceDBCmd{ -// Cmd: "SET", -// Args: []string{"key", "value"}, -// }) -// } -// } - func BenchmarkLogCommandAOF(b *testing.B) { wl, err := wal.NewAOFWAL("/tmp/dicedb-lt") if err != nil { diff --git a/internal/wal/wal_utils.go b/internal/wal/wal_utils.go index fc7059511..790f8b7c1 100644 --- a/internal/wal/wal_utils.go +++ b/internal/wal/wal_utils.go @@ -6,8 +6,19 @@ import ( "google.golang.org/protobuf/proto" ) +const ( + versionTagSize = 1 // Tag for "version" field + versionLengthPrefixSize = 1 // Length prefix for "version" + versionSize = 6 // Fixed size for "v0.0.1" + logSequenceNumberSize = 8 + dataTagSize = 1 // Tag for "data" field + dataLengthPrefixSize = 1 // Length prefix for "data" + CRCSize = 4 + timestampSize = 8 +) + // Marshals -func MustMarshal(entry *WAL_Entry) []byte { +func MustMarshal(entry *WALEntry) []byte { marshaledEntry, err := proto.Marshal(entry) if err != nil { panic(fmt.Sprintf("marshal should never fail (%v)", err)) @@ -16,7 +27,7 @@ func MustMarshal(entry *WAL_Entry) []byte { return marshaledEntry } -func MustUnmarshal(data []byte, entry *WAL_Entry) { +func MustUnmarshal(data []byte, entry *WALEntry) { if err := proto.Unmarshal(data, entry); err != nil { panic(fmt.Sprintf("unmarshal should never fail (%v)", err)) } @@ -29,31 +40,3 @@ func getEntrySize(data []byte) int { CRCSize + // CRC field timestampSize // Timestamp field } - -func (wal *WALAOF) validateConfig() error { - if wal.logDir == "" { - return fmt.Errorf("logDir cannot be empty") - } - - if wal.maxSegmentSize <= 0 { - return fmt.Errorf("maxSegmentSize must be greater than 0") - } - - if wal.maxSegmentCount <= 0 { - return fmt.Errorf("maxSegmentCount must be greater than 0") - } - - if wal.bufferSize <= 0 { - return fmt.Errorf("bufferSize must be greater than 0") - } - - if wal.walMode == "buffered" && wal.writeMode == "fsync" { - return fmt.Errorf("walMode 'buffered' cannot be used with writeMode 'fsync'") - } - - if wal.walMode == "unbuffered" && wal.writeMode == "default" { - return fmt.Errorf("walMode 'unbuffered' cannot have writeMode as 'default'") - } - - return nil -} diff --git a/main.go b/main.go index 39e24e691..495282c6d 100644 --- a/main.go +++ b/main.go @@ -32,6 +32,10 @@ import ( dstore "github.com/dicedb/dice/internal/store" ) +const ( + WALEngineAOF = "aof" +) + func main() { iid := observability.GetOrCreateInstanceID() config.DiceConfig.InstanceID = iid @@ -55,8 +59,8 @@ func main() { wl, _ = wal.NewNullWAL() if config.DiceConfig.Persistence.Enabled { - if config.DiceConfig.Persistence.WALEngine == "aof" { - _wl, err := wal.NewAOFWAL(config.DiceConfig.Persistence.WALDir) + if config.DiceConfig.Persistence.WALEngine == WALEngineAOF { + _wl, err := wal.NewAOFWAL(config.DiceConfig.WAL.LogDir) if err != nil { slog.Warn("could not create WAL with", slog.String("wal-engine", config.DiceConfig.Persistence.WALEngine), slog.Any("error", err)) sigs <- syscall.SIGKILL From 0d30b6851ad95b154360a80f402f842fceac94ff Mon Sep 17 00:00:00 2001 From: ayushsatyam146 Date: Thu, 12 Dec 2024 13:08:37 +0530 Subject: [PATCH 7/7] fix lint --- internal/wal/wal_aof.go | 1 - 1 file changed, 1 deletion(-) diff --git a/internal/wal/wal_aof.go b/internal/wal/wal_aof.go index c55ead590..a94add0cc 100644 --- a/internal/wal/wal_aof.go +++ b/internal/wal/wal_aof.go @@ -72,7 +72,6 @@ func NewAOFWAL(directory string) (*AOF, error) { } func (wal *AOF) Init(t time.Time) error { - // TODO - Restore existing checkpoints to memory // Create the directory if it doesn't exist