diff --git a/skiplist/skiplist.go b/skiplist/skiplist.go index 878dfbc..91a2133 100644 --- a/skiplist/skiplist.go +++ b/skiplist/skiplist.go @@ -88,12 +88,7 @@ func NewNode(level int, key, value []byte, ttl *time.Duration) *Node { // Size returns the size of the node in bytes func (n *Node) Size() int { - size := len(n.key) + len(n.value) + len(n.forward)*int(unsafe.Sizeof(uintptr(0))) - if n.ttl != nil { - size += int(unsafe.Sizeof(*n.ttl)) - } - - return size + return int(unsafe.Sizeof(*n)) + len(n.key) + len(n.value) } // IsExpired checks if the node is expired diff --git a/v2/k4.go b/v2/k4.go index 8e3056d..61fa4fe 100644 --- a/v2/k4.go +++ b/v2/k4.go @@ -35,9 +35,9 @@ import ( "encoding/gob" "fmt" "github.com/guycipher/k4/compressor" - "github.com/guycipher/k4/pager" - "github.com/guycipher/k4/skiplist" "github.com/guycipher/k4/v2/cuckoofilter" + "github.com/guycipher/k4/v2/pager" + "github.com/guycipher/k4/v2/skiplist" "log" "os" "sort" diff --git a/v2/k4_test.go b/v2/k4_test.go index 103b226..1fcb39b 100644 --- a/v2/k4_test.go +++ b/v2/k4_test.go @@ -139,9 +139,12 @@ func TestMemtableFlush(t *testing.T) { t.Fatalf("Failed to open K4: %v", err) } + totalBytes := 0 + for i := 0; i < 100; i++ { key := []byte("key" + fmt.Sprintf("%d", i)) value := []byte("value" + fmt.Sprintf("%d", i)) + totalBytes += len(key) + len(value) err = k4.Put(key, value, nil) if err != nil { @@ -375,7 +378,7 @@ func TestCompaction2(t *testing.T) { dir := setup(t) defer teardown(dir) - k4, err := Open(dir, 950*2, 1000, true, false) + k4, err := Open(dir, 950*2, 2000, true, false) if err != nil { t.Fatalf("Failed to open K4: %v", err) } diff --git a/v2/pager/pager.go b/v2/pager/pager.go new file mode 100644 index 0000000..96f8a54 --- /dev/null +++ b/v2/pager/pager.go @@ -0,0 +1,404 @@ +// Package pager +// BSD 3-Clause License +// +// Copyright (c) 2024, Alex Gaetano Padula +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this +// list of conditions and the following disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, +// this list of conditions and the following disclaimer in the documentation +// and/or other materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +// AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +// FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +// DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +// CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +// OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +package pager + +import ( + "bytes" + "os" + "strconv" + "sync" + "time" +) + +const PAGE_SIZE = 1024 // Page size +const HEADER_SIZE = 16 // next (overflowed) +const SYNC_TICK_INTERVAL = 1 * time.Second // tick interval for syncing the file +const SYNC_ESCALATION = 1 * time.Minute // if the file is not synced in 1 minute, sync it +const WRITE_THRESHOLD = 24576 // every 24576 writes, sync the file + +// Pager manages pages in a file +type Pager struct { + file *os.File // file to store pages + pageLocks map[int64]*sync.RWMutex // locks for pages + pageLocksLock *sync.RWMutex // lock for pagesLocks + lock *sync.RWMutex // lock for the pager + stopSync chan struct{} // channel to stop background sync + once sync.Once // used to start the periodic sync once + wg *sync.WaitGroup // wait group for the periodic sync goroutine + writeCounter int // counter for writes + lastSync time.Time // last time the file was synced +} + +// OpenPager opens a file for page management +func OpenPager(filename string, flag int, perm os.FileMode) (*Pager, error) { + file, err := os.OpenFile(filename, flag, perm) + if err != nil { + return nil, err + } + + pgLocks := make(map[int64]*sync.RWMutex) + + // Read the tree file and create locks for each page + stat, err := file.Stat() + if err != nil { + return nil, err + } + + for i := int64(0); i < stat.Size()/PAGE_SIZE; i++ { + pgLocks[i] = &sync.RWMutex{} + } + + pager := &Pager{file: file, pageLocks: pgLocks, pageLocksLock: &sync.RWMutex{}, lock: &sync.RWMutex{}, wg: &sync.WaitGroup{}} + + // we create a stop sync channel to stop the periodic sync when the pager is closed + pager.stopSync = make(chan struct{}) + + // we add the periodic sync goroutine to the wait group + pager.wg.Add(1) + + // we start the periodic sync goroutine + go pager.startPeriodicSync() + + return pager, nil +} + +// splitDataIntoChunks splits data into chunks of PAGE_SIZE +func splitDataIntoChunks(data []byte) [][]byte { + var chunks [][]byte + for i := 0; i < len(data); i += PAGE_SIZE { + end := i + PAGE_SIZE + + // Check if end is beyond the length of data + if end > len(data) { + end = len(data) + } + + chunks = append(chunks, data[i:end]) + } + return chunks +} + +// WriteTo writes data to a specific page +func (p *Pager) WriteTo(pageID int64, data []byte) error { + // lock the page + p.getPageLock(pageID).Lock() + defer p.getPageLock(pageID).Unlock() + + // the reason we are doing this is because we are going to write to the page thus having any overflowed pages which are linked to the page may not be needed + + // check if data is larger than the page size + if len(data) > PAGE_SIZE { + // create an array [][]byte + // each element is a page + + chunks := splitDataIntoChunks(data) + + // clear data to free up memory + data = nil + + headerBuffer := make([]byte, HEADER_SIZE) + + // We need to create pages for each chunk + // after index 0 + // the next page is the current page + 1 + + // index 0 would have the next page of index 1 index 1 would have the next page of index 2 + + for i, chunk := range chunks { + // check if we are at the last chunk + if i == len(chunks)-1 { + headerBuffer = make([]byte, HEADER_SIZE) + nextPage := pageID + 1 + copy(headerBuffer, strconv.FormatInt(nextPage, 10)) + + // if chunk is less than PAGE_SIZE, we need to pad it with null bytes + if len(chunk) < PAGE_SIZE { + chunk = append(chunk, make([]byte, PAGE_SIZE-len(chunk))...) + } + + // write the chunk to the file + _, err := p.file.WriteAt(append(headerBuffer, chunk...), pageID*(PAGE_SIZE+HEADER_SIZE)) + if err != nil { + return err + } + + } else { + // update the header + headerBuffer = make([]byte, HEADER_SIZE) + nextPage := pageID + 1 + copy(headerBuffer, strconv.FormatInt(nextPage, 10)) + + if len(chunk) < PAGE_SIZE { + chunk = append(chunk, make([]byte, PAGE_SIZE-len(chunk))...) + } + + // write the chunk to the file + _, err := p.file.WriteAt(append(headerBuffer, chunk...), pageID*(PAGE_SIZE+HEADER_SIZE)) + if err != nil { + return err + } + + // update the pageID + pageID = nextPage + + } + } + + } else { + // create a buffer to store the header + headerBuffer := make([]byte, HEADER_SIZE) + + // set the next page to -1 + copy(headerBuffer, "-1") + + // if data is less than PAGE_SIZE, we need to pad it with null bytes + if len(data) < PAGE_SIZE { + data = append(data, make([]byte, PAGE_SIZE-len(data))...) + } + + // write the data to the file + _, err := p.file.WriteAt(append(headerBuffer, data...), (PAGE_SIZE+HEADER_SIZE)*pageID) + if err != nil { + return err + } + + } + + return nil +} + +// getPageLock gets the lock for a page +func (p *Pager) getPageLock(pageID int64) *sync.RWMutex { + // Lock the mutex that protects the PageLocks map + p.pageLocksLock.Lock() + defer p.pageLocksLock.Unlock() + + // Used for page level locking + // This is decent for concurrent reads and writes + if lock, ok := p.pageLocks[pageID]; ok { + return lock + } else { + // Create a new lock + p.pageLocks[pageID] = &sync.RWMutex{} + return p.pageLocks[pageID] + } +} + +// Write writes data to the next available page +func (p *Pager) Write(data []byte) (int64, error) { + // lock the pager + p.lock.Lock() + defer p.lock.Unlock() + + p.writeCounter++ + + // get the current file size + fileInfo, err := p.file.Stat() + if err != nil { + return -1, err + } + + if fileInfo.Size() == 0 { + + err = p.WriteTo(0, data) + if err != nil { + return -1, err + } + + return 0, nil + } + + // create a new page + pageId := fileInfo.Size() / (PAGE_SIZE + HEADER_SIZE) + + err = p.WriteTo(pageId, data) + if err != nil { + return -1, err + } + + return pageId, nil + +} + +// Close closes the file +func (p *Pager) Close() error { + p.stopSync <- struct{}{} // stop the periodic sync goroutine + + // we wait for the periodic sync goroutine to complete + p.wg.Wait() + + // Ensure all pending writes are flushed to disk + if err := p.file.Sync(); err != nil { + return err + } + + if p != nil { + return p.file.Close() + } + + return nil +} + +// GetPage gets a page and returns the data +// Will gather all the pages that are linked together +func (p *Pager) GetPage(pageID int64) ([]byte, error) { + + // lock the page + p.getPageLock(pageID).Lock() + defer p.getPageLock(pageID).Unlock() + + result := make([]byte, 0) + + // get the page + dataPHeader := make([]byte, PAGE_SIZE+HEADER_SIZE) + + if pageID == 0 { + + _, err := p.file.ReadAt(dataPHeader, 0) + if err != nil { + return nil, err + } + } else { + + _, err := p.file.ReadAt(dataPHeader, pageID*(PAGE_SIZE+HEADER_SIZE)) + if err != nil { + return nil, err + } + } + + // get header + header := dataPHeader[:HEADER_SIZE] + data := dataPHeader[HEADER_SIZE:] + + // remove the null bytes + header = bytes.Trim(header, "\x00") + + // append the data to the result + result = append(result, data...) + + // get the next page + nextPage, err := strconv.ParseInt(string(header), 10, 64) + if err != nil { + return nil, err + } + + if nextPage == -1 { + return result, nil + + } + + for { + + dataPHeader = make([]byte, PAGE_SIZE+HEADER_SIZE) + + _, err := p.file.ReadAt(dataPHeader, nextPage*(PAGE_SIZE+HEADER_SIZE)) + if err != nil { + break + } + + // get header + header = dataPHeader[:HEADER_SIZE] + data = dataPHeader[HEADER_SIZE:] + + // remove the null bytes + header = bytes.Trim(header, "\x00") + //data = bytes.Trim(data, "\x00") + + // append the data to the result + result = append(result, data...) + + // get the next page + nextPage, err = strconv.ParseInt(string(header), 10, 64) + if err != nil || nextPage == -1 { + break + } + + } + + return result, nil +} + +// Size returns the size of the file +func (p *Pager) Size() int64 { + if p == nil { + return 0 + } + + stat, _ := p.file.Stat() + return stat.Size() +} + +// Count returns the number of pages +func (p *Pager) Count() int64 { + return p.Size() / (PAGE_SIZE + HEADER_SIZE) +} + +// FileName returns the name of the file +func (p *Pager) FileName() string { + return p.file.Name() +} + +// startPeriodicSync ticks and checks if the file needs to be synced +// if the file is not synced from the amount of writes, we escalate the sync based on SYNC_ESCALATION +func (p *Pager) startPeriodicSync() { + defer p.wg.Done() // defer completion of the wait group + + // start the periodic sync + p.once.Do(func() { + + // start the ticker + ticker := time.NewTicker(SYNC_TICK_INTERVAL) + defer ticker.Stop() // defer stopping the ticker + + for { + select { + case <-ticker.C: // check if the file needs to be synced + if p.writeCounter < WRITE_THRESHOLD { // if the amount of writes is less than the threshold + + if time.Since(p.lastSync) < SYNC_ESCALATION { // check if the file is synced in SYNC_ESCALATION + continue + } // if the file is not synced in SYNC_ESCALATION, sync it + + } + err := p.file.Sync() + if err != nil { + return + } + p.lock.Lock() // lock the pager + p.writeCounter = 0 // reset the write counter + p.lastSync = time.Now() // update the last sync time + p.lock.Unlock() // unlock the pager + case <-p.stopSync: + return + } + } + }) +} diff --git a/v2/pager/pager_test.go b/v2/pager/pager_test.go new file mode 100644 index 0000000..1e195cf --- /dev/null +++ b/v2/pager/pager_test.go @@ -0,0 +1,188 @@ +// Package pager tests +// BSD 3-Clause License +// +// Copyright (c) 2024, Alex Gaetano Padula +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this +// list of conditions and the following disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, +// this list of conditions and the following disclaimer in the documentation +// and/or other materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +// AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +// FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +// DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +// CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +// OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +package pager + +import ( + "bytes" + "os" + "sync" + "testing" +) + +func TestOpenPager(t *testing.T) { + file, err := os.CreateTemp("", "pager_test") + if err != nil { + t.Fatalf("Failed to create temp file: %v", err) + } + defer os.Remove(file.Name()) + + p, err := OpenPager(file.Name(), os.O_RDWR|os.O_CREATE, 0666) + if err != nil { + t.Fatalf("Failed to open pager: %v", err) + } + if p == nil { + t.Fatalf("Pager is nil") + } + if err := p.Close(); err != nil { + t.Fatalf("Failed to close pager: %v", err) + } +} + +func TestWriteAndGetPage(t *testing.T) { + file, err := os.CreateTemp("", "pager_test") + if err != nil { + t.Fatalf("Failed to create temp file: %v", err) + } + defer os.Remove(file.Name()) + + p, err := OpenPager(file.Name(), os.O_RDWR|os.O_CREATE, 0666) + if err != nil { + t.Fatalf("Failed to open pager: %v", err) + } + defer p.Close() + + data := []byte("Hello, World!") + pageID, err := p.Write(data) + if err != nil { + t.Fatalf("Failed to write data: %v", err) + } + if pageID != 0 { + t.Fatalf("Expected pageID 0, got %d", pageID) + } + + readData, err := p.GetPage(pageID) + if err != nil { + t.Fatalf("Failed to get page: %v", err) + } + if !bytes.Equal(data, bytes.Trim(readData, "\x00")) { + t.Fatalf("Expected %s, got %s", data, readData) + } +} + +func TestWriteToMultiplePages(t *testing.T) { + file, err := os.CreateTemp("", "pager_test") + if err != nil { + t.Fatalf("Failed to create temp file: %v", err) + } + defer os.Remove(file.Name()) + + p, err := OpenPager(file.Name(), os.O_RDWR|os.O_CREATE, 0666) + if err != nil { + t.Fatalf("Failed to open pager: %v", err) + } + defer p.Close() + + data := make([]byte, PAGE_SIZE*2) + str := "Hello, World!" + + for i := len(data) - len(str); i < len(data); i += len(str) { + copy(data[i:], str) + } + + pageID, err := p.Write(data) + if err != nil { + t.Fatalf("Failed to write data: %v", err) + } + + // Read back the first page + readData, err := p.GetPage(pageID) + if err != nil { + t.Fatalf("Failed to get first page: %v", err) + } + + // end of readData should be str + if !bytes.Equal([]byte(str), readData[len(readData)-len(str):]) { + t.Fatalf("Expected %s, got %s", str, readData[len(readData)-len(str):]) + + } + +} + +func TestPagerSizeAndCount(t *testing.T) { + file, err := os.CreateTemp("", "pager_test") + if err != nil { + t.Fatalf("Failed to create temp file: %v", err) + } + defer os.Remove(file.Name()) + + p, err := OpenPager(file.Name(), os.O_RDWR|os.O_CREATE, 0666) + if err != nil { + t.Fatalf("Failed to open pager: %v", err) + } + defer p.Close() + + data := []byte("Hello, World!") + _, err = p.Write(data) + if err != nil { + t.Fatalf("Failed to write data: %v", err) + } + + expectedSize := int64(PAGE_SIZE + HEADER_SIZE) + if p.Size() != expectedSize { + t.Fatalf("Expected size %d, got %d", expectedSize, p.Size()) + } + if p.Count() != 1 { + t.Fatalf("Expected count 1, got %d", p.Count()) + } +} + +func TestPagerConcurrency(t *testing.T) { + file, err := os.CreateTemp("", "pager_test") + if err != nil { + t.Fatalf("Failed to create temp file: %v", err) + } + defer os.Remove(file.Name()) + + p, err := OpenPager(file.Name(), os.O_RDWR|os.O_CREATE, 0666) + if err != nil { + t.Fatalf("Failed to open pager: %v", err) + } + defer p.Close() + + data := []byte("Hello, World!") + var wg sync.WaitGroup + + for i := 0; i < 10; i++ { + wg.Add(1) + go func() { + defer wg.Done() + _, err := p.Write(data) + if err != nil { + t.Errorf("Failed to write data: %v", err) + } + }() + } + + wg.Wait() + if p.Count() != 10 { + t.Fatalf("Expected count 10, got %d", p.Count()) + } +} diff --git a/v2/skiplist/skiplist.go b/v2/skiplist/skiplist.go new file mode 100644 index 0000000..91a2133 --- /dev/null +++ b/v2/skiplist/skiplist.go @@ -0,0 +1,384 @@ +// Package skiplist +// BSD 3-Clause License +// +// Copyright (c) 2024, Alex Gaetano Padula +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this +// list of conditions and the following disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, +// this list of conditions and the following disclaimer in the documentation +// and/or other materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +// AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +// FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +// DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +// CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +// OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +package skiplist + +import ( + "bytes" + "math/rand" + "time" + "unsafe" +) + +const TOMBSTONE_VALUE = "$tombstone" // This is specific to k4 + +// Node represents a node in the skip list +type Node struct { + key []byte // Binary key + value []byte // Binary value + forward []*Node // Forward pointers + ttl *time.Time // Time to live for the node, can be nil +} + +// SkipList represents a skip list +type SkipList struct { + header *Node // Header node + level int // Current maximum level + size int // in bytes + maxLevel int // Maximum level + p float64 // Probability +} + +// SkipListIterator represents an iterator for the skip list +type SkipListIterator struct { + skipList *SkipList // Skip list + current *Node // Current node +} + +// Iterator represents an iterator for the skip list +type Iterator interface { + Next() bool // Move to the next node + Prev() bool // Move to the previous node + HasNext() bool // Check if there is a next node + HasPrev() bool // Check if there is a previous node + Current() ([]byte, []byte) // Get the current key and value +} + +// NewNode creates a new node +func NewNode(level int, key, value []byte, ttl *time.Duration) *Node { + var expiration *time.Time // Expiration time + if ttl != nil { + exp := time.Now().Add(*ttl) + expiration = &exp + } + return &Node{ + key: key, + value: value, + forward: make([]*Node, level+1), + ttl: expiration, + } +} + +// Size returns the size of the node in bytes +func (n *Node) Size() int { + return int(unsafe.Sizeof(*n)) + len(n.key) + len(n.value) +} + +// IsExpired checks if the node is expired +func (n *Node) IsExpired() bool { + if n.ttl == nil { + return false + } + return time.Now().After(*n.ttl) +} + +// NewSkipList creates a new skip list +func NewSkipList(maxLevel int, p float64) *SkipList { + rand.Seed(time.Now().UnixNano()) + return &SkipList{ + header: NewNode(maxLevel, nil, nil, nil), + level: 0, + size: 0, + maxLevel: maxLevel, + p: p, + } +} + +// randomLevel generates a random level for the new node +func (sl *SkipList) randomLevel() int { + level := 0 + for rand.Float64() < sl.p && level < sl.maxLevel { + level++ + } + return level +} + +// Insert inserts a new key-value pair into the skip list +func (sl *SkipList) Insert(key, value []byte, ttl *time.Duration) { + update := make([]*Node, sl.maxLevel+1) + current := sl.header + + // Traverse the skip list to find the position of the key + for i := sl.level; i >= 0; i-- { + for current.forward[i] != nil && (string(current.forward[i].key) < string(key) || bytes.Equal(current.forward[i].value, []byte(TOMBSTONE_VALUE))) { + if current.forward[i].IsExpired() || bytes.Equal(current.forward[i].value, []byte(TOMBSTONE_VALUE)) { + + // If expired we will set tombstone and delete later + if current.IsExpired() { + current.ttl = nil + + // Subtraction of the size of the old value + sl.size -= current.Size() + // Mark as tombstone + current.value = []byte(TOMBSTONE_VALUE) + + // Add the size of the tombstone + sl.size += current.Size() + } + + // Skip nodes with tombstone values + current = current.forward[i] + } else { + current = current.forward[i] + } + } + update[i] = current + } + + // Check if the key exists + current = current.forward[0] + if current != nil && string(current.key) == string(key) { + // Key exists, update the value + sl.size -= current.Size() // Subtract the size of the old value + current.value = value + if ttl != nil { + exp := time.Now().Add(*ttl) + current.ttl = &exp + } else { + current.ttl = nil + } + sl.size += current.Size() // Add the size of the new value + return + } + + // Key does not exist, proceed with insertion + level := sl.randomLevel() + if level > sl.level { + for i := sl.level + 1; i <= level; i++ { + update[i] = sl.header + } + sl.level = level + } + + newNode := NewNode(level, key, value, ttl) + for i := 0; i <= level; i++ { + newNode.forward[i] = update[i].forward[i] + update[i].forward[i] = newNode + } + + sl.size += newNode.Size() +} + +// Delete deletes a key from the skip list +func (sl *SkipList) Delete(key []byte) { + update := make([]*Node, sl.maxLevel+1) + current := sl.header + + for i := sl.level; i >= 0; i-- { + for current.forward[i] != nil && (string(current.forward[i].key) < string(key) || bytes.Equal(current.forward[i].value, []byte(TOMBSTONE_VALUE))) { + if current.forward[i].IsExpired() || bytes.Equal(current.forward[i].value, []byte(TOMBSTONE_VALUE)) { + // Skip nodes with tombstone values + // If expired we will set tombstone and delete later + if current.IsExpired() { + current.ttl = nil + + // Subtraction of the size of the old value + sl.size -= current.Size() + // Mark as tombstone + current.value = []byte(TOMBSTONE_VALUE) + + // Add the size of the tombstone + sl.size += current.Size() + } + current = current.forward[i] + } else { + current = current.forward[i] + } + } + update[i] = current + } + + current = current.forward[0] + if current != nil && string(current.key) == string(key) { + for i := 0; i <= sl.level; i++ { + if update[i].forward[i] != current { + break + } + update[i].forward[i] = current.forward[i] + } + + sl.size -= current.Size() + + for sl.level > 0 && sl.header.forward[sl.level] == nil { + sl.level-- + } + } +} + +// Search searches for a key in the skip list +func (sl *SkipList) Search(key []byte) ([]byte, bool) { + current := sl.header + for i := sl.level; i >= 0; i-- { + for current.forward[i] != nil && (string(current.forward[i].key) < string(key) || bytes.Equal(current.forward[i].value, []byte(TOMBSTONE_VALUE))) { + if current.forward[i].IsExpired() || bytes.Equal(current.forward[i].value, []byte(TOMBSTONE_VALUE)) { + if current.IsExpired() { + current.ttl = nil + + // Subtraction of the size of the old value + sl.size -= current.Size() + // Mark as tombstone + current.value = []byte(TOMBSTONE_VALUE) + + // Add the size of the tombstone + sl.size += current.Size() + } + + // Skip nodes with tombstone values + current = current.forward[i] + } else { + current = current.forward[i] + } + } + } + current = current.forward[0] + if current != nil && string(current.key) == string(key) { + if current.IsExpired() || bytes.Equal(current.value, []byte(TOMBSTONE_VALUE)) { + return nil, false + } + return current.value, true + } + return nil, false +} + +// NewIterator creates a new iterator for the skip list +func NewIterator(sl *SkipList) *SkipListIterator { + return &SkipListIterator{ + skipList: sl, + current: sl.header, + } +} + +// Next moves the iterator to the next node +func (it *SkipListIterator) Next() bool { + for it.current.forward[0] != nil { + it.current = it.current.forward[0] + if !it.current.IsExpired() { + return true + } + //it.skipList.Delete(it.current.key) + //sl.Delete(current.key) + // We mark as tombstone and delete later + it.current.ttl = nil + + // Subtraction of the size of the old value + it.skipList.size -= it.current.Size() + // Mark as tombstone + it.current.value = []byte(TOMBSTONE_VALUE) + + // Add the size of the tombstone + it.skipList.size += it.current.Size() + } + return false +} + +// Prev moves the iterator to the previous node +func (it *SkipListIterator) Prev() bool { + current := it.skipList.header + for i := it.skipList.level; i >= 0; i-- { + for current.forward[i] != nil && (string(current.forward[i].key) < string(it.current.key) || bytes.Equal(current.forward[i].value, []byte(TOMBSTONE_VALUE))) { + if current.forward[i].IsExpired() || bytes.Equal(current.forward[i].value, []byte(TOMBSTONE_VALUE)) { + // Skip nodes with tombstone values + current = current.forward[i] + } else { + current = current.forward[i] + } + } + } + if current != it.skipList.header { + it.current = current + return true + } + return false + +} + +// HasNext checks if there is a next node +func (it *SkipListIterator) HasNext() bool { + next := it.current.forward[0] + for next != nil && next.IsExpired() { + it.skipList.Delete(next.key) + next = it.current.forward[0] + } + return next != nil +} + +// HasPrev checks if there is a previous node +func (it *SkipListIterator) HasPrev() bool { + return it.current != it.skipList.header +} + +// Current returns the current key and value +func (it *SkipListIterator) Current() ([]byte, []byte, *time.Duration) { + if it.current == it.skipList.header || it.current.IsExpired() { + // If expired we will set tombstone and delete later + if it.current.IsExpired() { + it.current.ttl = nil + + // Subtraction of the size of the old value + it.skipList.size -= it.current.Size() + // Mark as tombstone + it.current.value = []byte(TOMBSTONE_VALUE) + + // Add the size of the tombstone + it.skipList.size += it.current.Size() + } + return nil, nil, nil + } + return it.current.key, it.current.value, timeToDuration(it.current.ttl) +} + +// timeToDuration converts a time.Time to a time.Duration +func timeToDuration(t *time.Time) *time.Duration { + if t == nil { + return nil + } + + duration := t.Sub(time.Now()) // Calculate the remaining time + return &duration + +} + +// Size returns the size of the skip list +func (sl *SkipList) Size() int { + return sl.size +} + +// Copy creates a copy of the skip list +func (sl *SkipList) Copy() *SkipList { + newSkipList := NewSkipList(sl.maxLevel, sl.p) + it := NewIterator(sl) + for it.Next() { + key, value, ttl := it.Current() + if value != nil && !bytes.Equal(value, []byte(TOMBSTONE_VALUE)) { + newSkipList.Insert(key, value, ttl) + } + } + return newSkipList +} diff --git a/v2/skiplist/skiplist_test.go b/v2/skiplist/skiplist_test.go new file mode 100644 index 0000000..f98491d --- /dev/null +++ b/v2/skiplist/skiplist_test.go @@ -0,0 +1,170 @@ +// Package skiplist tests +// BSD 3-Clause License +// +// Copyright (c) 2024, Alex Gaetano Padula +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this +// list of conditions and the following disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, +// this list of conditions and the following disclaimer in the documentation +// and/or other materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +// AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +// FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +// DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +// CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +// OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +package skiplist + +import ( + "testing" + "time" +) + +func TestNewSkipList(t *testing.T) { + sl := NewSkipList(12, 0.25) + if sl == nil { + t.Fatal("Expected new skip list to be created") + } + if sl.level != 0 { + t.Fatalf("Expected level to be 0, got %d", sl.level) + } + if sl.size != 0 { + t.Fatalf("Expected size to be 0, got %d", sl.size) + } +} + +func TestInsertAndSearch(t *testing.T) { + sl := NewSkipList(12, 0.25) + key := []byte("key1") + value := []byte("value1") + sl.Insert(key, value, nil) + + val, found := sl.Search(key) + if !found { + t.Fatal("Expected to find the key") + } + if string(val) != string(value) { + t.Fatalf("Expected value %s, got %s", value, val) + } +} + +func TestInsertWithTTL(t *testing.T) { + sl := NewSkipList(12, 0.25) + key := []byte("key1") + value := []byte("value1") + ttl := 1 * time.Second + sl.Insert(key, value, &ttl) + + time.Sleep(2 * time.Second) + _, found := sl.Search(key) + if found { + t.Fatal("Expected key to be expired and not found") + } +} + +func TestDelete(t *testing.T) { + sl := NewSkipList(12, 0.25) + key := []byte("key1") + value := []byte("value1") + sl.Insert(key, value, nil) + + sl.Delete(key) + _, found := sl.Search(key) + if found { + t.Fatal("Expected key to be deleted") + } +} + +func TestSize(t *testing.T) { + sl := NewSkipList(12, 0.25) + key := []byte("key1") + value := []byte("value1") + sl.Insert(key, value, nil) + + // Correct the expected size calculation + expectedSize := len(key) + len(value) + (sl.level+1)*8 + if sl.Size() != expectedSize { + t.Fatalf("Expected size %d, got %d", expectedSize, sl.Size()) + } +} + +func TestIterator(t *testing.T) { + sl := NewSkipList(12, 0.25) + key1 := []byte("key1") + value1 := []byte("value1") + key2 := []byte("key2") + value2 := []byte("value2") + sl.Insert(key1, value1, nil) + sl.Insert(key2, value2, nil) + + it := NewIterator(sl) + if !it.Next() { + t.Fatal("Expected iterator to move to the first element") + } + k, v, _ := it.Current() + if string(k) != string(key1) || string(v) != string(value1) { + t.Fatalf("Expected key %s and value %s, got key %s and value %s", key1, value1, k, v) + } + if !it.Next() { + t.Fatal("Expected iterator to move to the second element") + } + k, v, _ = it.Current() + if string(k) != string(key2) || string(v) != string(value2) { + t.Fatalf("Expected key %s and value %s, got key %s and value %s", key2, value2, k, v) + } + if it.Next() { + t.Fatal("Expected iterator to be at the end") + } +} + +func TestSearchNil(t *testing.T) { + sl := NewSkipList(12, 0.25) + key := []byte("key1") + + _, found := sl.Search(key) + if found { + t.Fatal("Expected key to not be found") + } + + // and again, just in case + _, found = sl.Search(key) + if found { + t.Fatal("Expected key to not be found") + } +} + +func TestInsertTombstone(t *testing.T) { + sl := NewSkipList(12, 0.25) + key := []byte("key1") + value := []byte("$tombstone") + + sl.Insert(key, value, nil) + _, found := sl.Search(key) + if found { + t.Fatal("Expected key to be deleted") + } + + key = []byte("key2") + value = []byte("$tombstone") + + sl.Insert(key, value, nil) + _, found = sl.Search(key) + if found { + t.Fatal("Expected key to be deleted") + } +}