Skip to content

Commit

Permalink
feat: store size in each Put
Browse files Browse the repository at this point in the history
  • Loading branch information
fearlessfe authored and GrapeBaBa committed Dec 2, 2024
1 parent 7fd095c commit 1ac486b
Showing 1 changed file with 32 additions and 46 deletions.
78 changes: 32 additions & 46 deletions portalnetwork/storage/ethpepple/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ import (
"bytes"
"encoding/binary"
"runtime"
"sync"
"sync/atomic"
"time"

"github.com/cockroachdb/pebble"
"github.com/cockroachdb/pebble/bloom"
Expand Down Expand Up @@ -123,10 +123,9 @@ type ContentStorage struct {
radius atomic.Value
log log.Logger
db *pebble.DB
size uint64
sizeChan chan struct{}
capacityChan chan uint64
size atomic.Uint64
writeOptions *pebble.WriteOptions
bytePool sync.Pool
}

func NewPeppleStorage(config PeppleStorageConfig) (storage.ContentStorage, error) {
Expand All @@ -135,9 +134,13 @@ func NewPeppleStorage(config PeppleStorageConfig) (storage.ContentStorage, error
db: config.DB,
storageCapacityInBytes: config.StorageCapacityMB * 1000_000,
log: log.New("storage", config.NetworkName),
sizeChan: make(chan struct{}, 1),
capacityChan: make(chan uint64, 100),
writeOptions: &pebble.WriteOptions{Sync: false},
bytePool: sync.Pool{
New: func() interface{} {
out := make([]byte, 8)
return &out
},
},
}
cs.radius.Store(storage.MaxDistance)

Expand All @@ -148,7 +151,7 @@ func NewPeppleStorage(config PeppleStorageConfig) (storage.ContentStorage, error
if err == nil {
size := binary.BigEndian.Uint64(val)
// init stage, no need to use lock
cs.size = size
cs.size.Store(size)
}

iter, err := cs.db.NewIter(nil)
Expand All @@ -165,9 +168,6 @@ func NewPeppleStorage(config PeppleStorageConfig) (storage.ContentStorage, error
}
cs.radius.Store(dis)
}

cs.sizeChan <- struct{}{}
go cs.saveCapacity()
return cs, nil
}

Expand Down Expand Up @@ -195,25 +195,33 @@ func (c *ContentStorage) Put(contentKey []byte, contentId []byte, content []byte
if !valid {
return storage.ErrInsufficientRadius
}
length := uint64(len(contentId)) + uint64(len(content))
newSize := c.size.Add(length)

buf := c.bytePool.Get().(*[]byte)
defer c.bytePool.Put(buf)
binary.BigEndian.PutUint64(*buf, newSize)
batch := c.db.NewBatch()

err = c.db.Set(distance, content, c.writeOptions)
err = batch.Set(storage.SizeKey, *buf, c.writeOptions)
if err != nil {
return err
}
err = batch.Set(distance, content, c.writeOptions)
if err != nil {
return err
}
err = batch.Commit(c.writeOptions)
if err != nil {
return err
}

length := uint64(len(contentId)) + uint64(len(content))
<-c.sizeChan
c.size += length
if c.size > c.storageCapacityInBytes {
if newSize > c.storageCapacityInBytes {
err := c.prune()
if err != nil {
c.sizeChan <- struct{}{}
return err
}
} else {
c.capacityChan <- c.size
}
c.sizeChan <- struct{}{}
return nil
}

Expand All @@ -224,31 +232,6 @@ func (c *ContentStorage) Radius() *uint256.Int {
return val
}

func (c *ContentStorage) saveCapacity() {
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
capacityChanged := false
var currentCapacity uint64 = 0
buf := make([]byte, 8) // uint64

for {
select {
case <-ticker.C:
if capacityChanged {
binary.BigEndian.PutUint64(buf, currentCapacity)
err := c.db.Set(storage.SizeKey, buf, c.writeOptions)
if err != nil {
c.log.Error("save capacity failed", "error", err)
}
capacityChanged = false
}
case capacity := <-c.capacityChan:
capacityChanged = true
currentCapacity = capacity
}
}
}

func (c *ContentStorage) prune() error {
expectSize := uint64(float64(c.storageCapacityInBytes) * contentDeletionFraction)
var curentSize uint64 = 0
Expand Down Expand Up @@ -278,12 +261,15 @@ func (c *ContentStorage) prune() error {
break
}
}
newSize := c.size.Add(-curentSize)
buf := c.bytePool.Get().(*[]byte)
defer c.bytePool.Put(buf)
binary.BigEndian.PutUint64(*buf, newSize)
batch.Set(storage.SizeKey, *buf, c.writeOptions)
err = batch.Commit(&pebble.WriteOptions{Sync: true})
if err != nil {
return err
}
c.size -= curentSize
c.capacityChan <- c.size - curentSize
return nil
}

Expand Down

0 comments on commit 1ac486b

Please sign in to comment.