Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

use pebble to replace ethdb/pebble #233

Merged
merged 8 commits into from
Dec 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion portalnetwork/storage/content_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@ import (
)

var ErrContentNotFound = fmt.Errorf("content not found")
var ErrInsufficientRadius = fmt.Errorf("insufficient radius")

var MaxDistance = uint256.MustFromHex("0xffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff")

type ContentType byte

var RadisuKey = []byte("radius")
var SizeKey = []byte("size")

type ContentKey struct {
Expand Down
255 changes: 238 additions & 17 deletions portalnetwork/storage/ethpepple/storage.go
Original file line number Diff line number Diff line change
@@ -1,37 +1,131 @@
package ethpepple

import (
"bytes"
"encoding/binary"
"runtime"
"sync"
"sync/atomic"

"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/ethdb/pebble"
"github.com/cockroachdb/pebble"
"github.com/cockroachdb/pebble/bloom"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/portalnetwork/storage"
"github.com/holiman/uint256"
)

const (
// minCache is the minimum amount of memory in megabytes to allocate to pebble
// read and write caching, split half and half.
minCache = 16

// minHandles is the minimum number of files handles to allocate to the open
// database files.
minHandles = 16

// 5% of the content will be deleted when the storage capacity is hit and radius gets adjusted.
contentDeletionFraction = 0.05
)

var _ storage.ContentStorage = &ContentStorage{}

type PeppleStorageConfig struct {
StorageCapacityMB uint64
DB ethdb.KeyValueStore
DB *pebble.DB
NodeId enode.ID
NetworkName string
}

func NewPeppleDB(dataDir string, cache, handles int, namespace string) (ethdb.KeyValueStore, error) {
db, err := pebble.New(dataDir+"/"+namespace, cache, handles, namespace, false)
func NewPeppleDB(dataDir string, cache, handles int, namespace string) (*pebble.DB, error) {
// Ensure we have some minimal caching and file guarantees
if cache < minCache {
cache = minCache
}
if handles < minHandles {
handles = minHandles
}
logger := log.New("database", namespace)
logger.Info("Allocated cache and file handles", "cache", common.StorageSize(cache*1024*1024), "handles", handles)

// The max memtable size is limited by the uint32 offsets stored in
// internal/arenaskl.node, DeferredBatchOp, and flushableBatchEntry.
//
// - MaxUint32 on 64-bit platforms;
// - MaxInt on 32-bit platforms.
//
// It is used when slices are limited to Uint32 on 64-bit platforms (the
// length limit for slices is naturally MaxInt on 32-bit platforms).
//
// Taken from https://github.com/cockroachdb/pebble/blob/master/internal/constants/constants.go
maxMemTableSize := (1<<31)<<(^uint(0)>>63) - 1

// Two memory tables is configured which is identical to leveldb,
// including a frozen memory table and another live one.
memTableLimit := 2
memTableSize := cache * 1024 * 1024 / 2 / memTableLimit

// The memory table size is currently capped at maxMemTableSize-1 due to a
// known bug in the pebble where maxMemTableSize is not recognized as a
// valid size.
//
// TODO use the maxMemTableSize as the maximum table size once the issue
// in pebble is fixed.
if memTableSize >= maxMemTableSize {
memTableSize = maxMemTableSize - 1
}
opt := &pebble.Options{
// Pebble has a single combined cache area and the write
// buffers are taken from this too. Assign all available
// memory allowance for cache.
Cache: pebble.NewCache(int64(cache * 1024 * 1024)),
MaxOpenFiles: handles,

// The size of memory table(as well as the write buffer).
// Note, there may have more than two memory tables in the system.
MemTableSize: uint64(memTableSize),

// MemTableStopWritesThreshold places a hard limit on the size
// of the existent MemTables(including the frozen one).
// Note, this must be the number of tables not the size of all memtables
// according to https://github.com/cockroachdb/pebble/blob/master/options.go#L738-L742
// and to https://github.com/cockroachdb/pebble/blob/master/db.go#L1892-L1903.
MemTableStopWritesThreshold: memTableLimit,

// The default compaction concurrency(1 thread),
// Here use all available CPUs for faster compaction.
MaxConcurrentCompactions: runtime.NumCPU,

// Per-level options. Options for at least one level must be specified. The
// options for the last level are used for all subsequent levels.
Levels: []pebble.LevelOptions{
{TargetFileSize: 2 * 1024 * 1024, FilterPolicy: bloom.FilterPolicy(10)},
{TargetFileSize: 4 * 1024 * 1024, FilterPolicy: bloom.FilterPolicy(10)},
{TargetFileSize: 8 * 1024 * 1024, FilterPolicy: bloom.FilterPolicy(10)},
{TargetFileSize: 16 * 1024 * 1024, FilterPolicy: bloom.FilterPolicy(10)},
{TargetFileSize: 32 * 1024 * 1024, FilterPolicy: bloom.FilterPolicy(10)},
{TargetFileSize: 64 * 1024 * 1024, FilterPolicy: bloom.FilterPolicy(10)},
{TargetFileSize: 128 * 1024 * 1024, FilterPolicy: bloom.FilterPolicy(10)},
},
ReadOnly: false,
}
// Disable seek compaction explicitly. Check https://github.com/ethereum/go-ethereum/pull/20130
// for more details.
opt.Experimental.ReadSamplingMultiplier = -1
db, err := pebble.Open(dataDir+"/"+namespace, opt)
return db, err
}

type ContentStorage struct {
nodeId enode.ID
storageCapacityInBytes uint64
radius atomic.Value
// size uint64
log log.Logger
db ethdb.KeyValueStore
log log.Logger
db *pebble.DB
size atomic.Uint64
writeOptions *pebble.WriteOptions
bytePool sync.Pool
}

func NewPeppleStorage(config PeppleStorageConfig) (storage.ContentStorage, error) {
Expand All @@ -40,19 +134,35 @@ func NewPeppleStorage(config PeppleStorageConfig) (storage.ContentStorage, error
db: config.DB,
storageCapacityInBytes: config.StorageCapacityMB * 1000_000,
log: log.New("storage", config.NetworkName),
writeOptions: &pebble.WriteOptions{Sync: false},
bytePool: sync.Pool{
New: func() interface{} {
out := make([]byte, 8)
return &out
},
},
}
cs.radius.Store(storage.MaxDistance)
exist, err := cs.db.Has(storage.RadisuKey)

val, _, err := cs.db.Get(storage.SizeKey)
if err != nil && err != pebble.ErrNotFound {
return nil, err
}
if err == nil {
size := binary.BigEndian.Uint64(val)
// init stage, no need to use lock
cs.size.Store(size)
}

iter, err := cs.db.NewIter(nil)
if err != nil {
return nil, err
}
if exist {
radius, err := cs.db.Get(storage.RadisuKey)
if err != nil {
return nil, err
}
defer iter.Close()
if iter.Last() && iter.Valid() {
distance := iter.Key()
dis := uint256.NewInt(0)
err = dis.UnmarshalSSZ(radius)
err = dis.UnmarshalSSZ(distance)
if err != nil {
return nil, err
}
Expand All @@ -63,12 +173,56 @@ func NewPeppleStorage(config PeppleStorageConfig) (storage.ContentStorage, error

// Get implements storage.ContentStorage.
func (c *ContentStorage) Get(contentKey []byte, contentId []byte) ([]byte, error) {
return c.db.Get(contentId)
distance := xor(contentId, c.nodeId[:])
data, closer, err := c.db.Get(distance)
if err != nil && err != pebble.ErrNotFound {
return nil, err
}
if err == pebble.ErrNotFound {
return nil, storage.ErrContentNotFound
}
closer.Close()
return data, nil
}

// Put implements storage.ContentStorage.
func (c *ContentStorage) Put(contentKey []byte, contentId []byte, content []byte) error {
return c.db.Put(contentId, content)
distance := xor(contentId, c.nodeId[:])
valid, err := c.inRadius(distance)
if err != nil {
return err
}
if !valid {
return storage.ErrInsufficientRadius
}
length := uint64(len(contentId)) + uint64(len(content))
newSize := c.size.Add(length)

buf := c.bytePool.Get().(*[]byte)
defer c.bytePool.Put(buf)
binary.BigEndian.PutUint64(*buf, newSize)
batch := c.db.NewBatch()

err = batch.Set(storage.SizeKey, *buf, c.writeOptions)
if err != nil {
return err
}
err = batch.Set(distance, content, c.writeOptions)
if err != nil {
return err
}
err = batch.Commit(c.writeOptions)
if err != nil {
return err
}

if newSize > c.storageCapacityInBytes {
err := c.prune()
if err != nil {
return err
}
}
return nil
}

// Radius implements storage.ContentStorage.
Expand All @@ -77,3 +231,70 @@ func (c *ContentStorage) Radius() *uint256.Int {
val := radius.(*uint256.Int)
return val
}

func (c *ContentStorage) prune() error {
expectSize := uint64(float64(c.storageCapacityInBytes) * contentDeletionFraction)
var curentSize uint64 = 0

// get the keys to be deleted order by distance desc
iter, err := c.db.NewIter(nil)
if err != nil {
return err
}

batch := c.db.NewBatch()
for iter.Last(); iter.Valid(); iter.Prev() {
if bytes.Equal(iter.Key(), storage.SizeKey) {
continue
}
if curentSize < expectSize {
batch.Delete(iter.Key(), nil)
curentSize += uint64(len(iter.Key())) + uint64(len(iter.Value()))
} else {
distance := iter.Key()
dis := uint256.NewInt(0)
err = dis.UnmarshalSSZ(distance)
if err != nil {
return err
}
c.radius.Store(dis)
break
}
}
newSize := c.size.Add(-curentSize)
buf := c.bytePool.Get().(*[]byte)
defer c.bytePool.Put(buf)
binary.BigEndian.PutUint64(*buf, newSize)
batch.Set(storage.SizeKey, *buf, c.writeOptions)
err = batch.Commit(&pebble.WriteOptions{Sync: true})
if err != nil {
return err
}
return nil
}

func (c *ContentStorage) inRadius(distance []byte) (bool, error) {
dis := uint256.NewInt(0)
err := dis.UnmarshalSSZ(distance)
if err != nil {
return false, err
}
val := c.radius.Load()
radius := val.(*uint256.Int)
return radius.Gt(dis), nil
}

func xor(contentId, nodeId []byte) []byte {
// length of contentId maybe not 32bytes
padding := make([]byte, 32)
if len(contentId) != len(nodeId) {
copy(padding, contentId)
} else {
padding = contentId
}
res := make([]byte, len(padding))
for i := range padding {
res[i] = padding[i] ^ nodeId[i]
}
return res
}
Loading