diff --git a/portalnetwork/storage/ethpepple/storage.go b/portalnetwork/storage/ethpepple/storage.go
index ad4bae90be27..b943914690cc 100644
--- a/portalnetwork/storage/ethpepple/storage.go
+++ b/portalnetwork/storage/ethpepple/storage.go
@@ -1,27 +1,119 @@
 package ethpepple
 
 import (
+	"encoding/binary"
+	"runtime"
+	"sync"
 	"sync/atomic"
+	"time"
 
-	"github.com/ethereum/go-ethereum/ethdb"
-	"github.com/ethereum/go-ethereum/ethdb/pebble"
+	"github.com/cockroachdb/pebble"
+	"github.com/cockroachdb/pebble/bloom"
+	"github.com/ethereum/go-ethereum/common"
 	"github.com/ethereum/go-ethereum/log"
 	"github.com/ethereum/go-ethereum/p2p/enode"
 	"github.com/ethereum/go-ethereum/portalnetwork/storage"
 	"github.com/holiman/uint256"
 )
 
+const (
+	// minCache is the minimum amount of memory in megabytes to allocate to pebble
+	// read and write caching, split half and half.
+	minCache = 16
+
+	// minHandles is the minimum number of file handles to allocate to the open
+	// database files.
+	minHandles = 16
+
+	// contentDeletionFraction: 5% of the content will be deleted when the storage
+	// capacity is hit and the radius gets adjusted.
+	contentDeletionFraction = 0.05
+)
+
 var _ storage.ContentStorage = &ContentStorage{}
 
 type PeppleStorageConfig struct {
 	StorageCapacityMB uint64
-	DB                ethdb.KeyValueStore
+	DB                *pebble.DB
 	NodeId            enode.ID
 	NetworkName       string
 }
 
-func NewPeppleDB(dataDir string, cache, handles int, namespace string) (ethdb.KeyValueStore, error) {
-	db, err := pebble.New(dataDir+"/"+namespace, cache, handles, namespace, false)
+func NewPeppleDB(dataDir string, cache, handles int, namespace string) (*pebble.DB, error) {
+	// Ensure we have some minimal caching and file guarantees.
+	if cache < minCache {
+		cache = minCache
+	}
+	if handles < minHandles {
+		handles = minHandles
+	}
+	logger := log.New("database", namespace)
+	logger.Info("Allocated cache and file handles", "cache", common.StorageSize(cache*1024*1024), "handles", handles)
+
+	// The max memtable size is limited by the uint32 offsets stored in
+	// internal/arenaskl.node, DeferredBatchOp, and flushableBatchEntry.
+	//
+	// - MaxUint32 on 64-bit platforms;
+	// - MaxInt on 32-bit platforms.
+	//
+	// It is used when slices are limited to Uint32 on 64-bit platforms (the
+	// length limit for slices is naturally MaxInt on 32-bit platforms).
+	//
+	// Taken from https://github.com/cockroachdb/pebble/blob/master/internal/constants/constants.go
+	maxMemTableSize := (1<<31)<<(^uint(0)>>63) - 1
+
+	// Two memory tables are configured, which is identical to leveldb:
+	// a frozen memory table and a live one.
+	memTableLimit := 2
+	memTableSize := cache * 1024 * 1024 / 2 / memTableLimit
+
+	// The memory table size is currently capped at maxMemTableSize-1 due to a
+	// known bug in pebble where maxMemTableSize is not recognized as a
+	// valid size.
+	//
+	// TODO use maxMemTableSize as the maximum table size once the issue
+	// in pebble is fixed.
+	if memTableSize >= maxMemTableSize {
+		memTableSize = maxMemTableSize - 1
+	}
+	opt := &pebble.Options{
+		// Pebble has a single combined cache area and the write
+		// buffers are taken from this too. Assign all available
+		// memory allowance for cache.
+		Cache:        pebble.NewCache(int64(cache * 1024 * 1024)),
+		MaxOpenFiles: handles,
+
+		// The size of the memory table (as well as the write buffer).
+		// Note, there may be more than two memory tables in the system.
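+		// (For example: with the 16 MB minimum cache and a memTableLimit of 2,
+		// memTableSize works out to 16*1024*1024/2/2, i.e. 4 MB per memtable.)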
+		MemTableSize: uint64(memTableSize),
+
+		// MemTableStopWritesThreshold places a hard limit on the size
+		// of the existing MemTables (including the frozen one).
+		// Note, this must be the number of tables, not the size of all memtables,
+		// according to https://github.com/cockroachdb/pebble/blob/master/options.go#L738-L742
+		// and https://github.com/cockroachdb/pebble/blob/master/db.go#L1892-L1903.
+		MemTableStopWritesThreshold: memTableLimit,
+
+		// The default compaction concurrency is 1 thread;
+		// here we use all available CPUs for faster compaction.
+		MaxConcurrentCompactions: runtime.NumCPU,
+
+		// Per-level options. Options for at least one level must be specified. The
+		// options for the last level are used for all subsequent levels.
+		Levels: []pebble.LevelOptions{
+			{TargetFileSize: 2 * 1024 * 1024, FilterPolicy: bloom.FilterPolicy(10)},
+			{TargetFileSize: 4 * 1024 * 1024, FilterPolicy: bloom.FilterPolicy(10)},
+			{TargetFileSize: 8 * 1024 * 1024, FilterPolicy: bloom.FilterPolicy(10)},
+			{TargetFileSize: 16 * 1024 * 1024, FilterPolicy: bloom.FilterPolicy(10)},
+			{TargetFileSize: 32 * 1024 * 1024, FilterPolicy: bloom.FilterPolicy(10)},
+			{TargetFileSize: 64 * 1024 * 1024, FilterPolicy: bloom.FilterPolicy(10)},
+			{TargetFileSize: 128 * 1024 * 1024, FilterPolicy: bloom.FilterPolicy(10)},
+		},
+		ReadOnly: false,
+	}
+	// Disable seek compaction explicitly. Check https://github.com/ethereum/go-ethereum/pull/20130
+	// for more details.
+	opt.Experimental.ReadSamplingMultiplier = -1
+	db, err := pebble.Open(dataDir+"/"+namespace, opt)
 	return db, err
 }
 
@@ -29,9 +121,14 @@ type ContentStorage struct {
 	nodeId                 enode.ID
 	storageCapacityInBytes uint64
 	radius                 atomic.Value
-	// size uint64
-	log log.Logger
-	db  ethdb.KeyValueStore
+	log           log.Logger
+	db            *pebble.DB
+	size          uint64
+	sizeChan      chan uint64
+	sizeMutex     sync.RWMutex
+	isPruning     bool
+	pruneDoneChan chan uint64 // signals a finished prune and carries the pruned size
+	writeOptions  *pebble.WriteOptions
 }
 
 func NewPeppleStorage(config PeppleStorageConfig) (storage.ContentStorage, error) {
@@ -40,17 +137,16 @@ func NewPeppleStorage(config PeppleStorageConfig) (storage.ContentStorage, error
 		db:                     config.DB,
 		storageCapacityInBytes: config.StorageCapacityMB * 1000_000,
 		log:                    log.New("storage", config.NetworkName),
+		sizeChan:               make(chan uint64, 100),
+		pruneDoneChan:          make(chan uint64, 1),
+		writeOptions:           &pebble.WriteOptions{Sync: false},
 	}
 	cs.radius.Store(storage.MaxDistance)
-	exist, err := cs.db.Has(storage.RadisuKey)
-	if err != nil {
+	radius, _, err := cs.db.Get(storage.RadisuKey)
+	if err != nil && err != pebble.ErrNotFound {
 		return nil, err
 	}
-	if exist {
-		radius, err := cs.db.Get(storage.RadisuKey)
-		if err != nil {
-			return nil, err
-		}
+	if err == nil {
 		dis := uint256.NewInt(0)
 		err = dis.UnmarshalSSZ(radius)
 		if err != nil {
@@ -58,17 +154,40 @@ func NewPeppleStorage(config PeppleStorageConfig) (storage.ContentStorage, error
 		}
 		cs.radius.Store(dis)
 	}
+
+	val, _, err := cs.db.Get(storage.SizeKey)
+	if err != nil && err != pebble.ErrNotFound {
+		return nil, err
+	}
+	if err == nil {
+		size := binary.BigEndian.Uint64(val)
+		// init stage, no need to use a lock
+		cs.size = size
+	}
+	go cs.saveCapacity()
 	return cs, nil
 }
 
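+// Note: Get and Put key content by the XOR distance between the content id and
+// the local node id rather than by the raw content id, so pebble's ordered
+// iteration walks content from nearest to farthest.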
 // Get implements storage.ContentStorage.
 func (c *ContentStorage) Get(contentKey []byte, contentId []byte) ([]byte, error) {
-	return c.db.Get(contentId)
+	distance := xor(contentId, c.nodeId[:])
+	data, closer, err := c.db.Get(distance)
+	if err != nil && err != pebble.ErrNotFound {
+		return nil, err
+	}
+	if err == pebble.ErrNotFound {
+		return nil, storage.ErrContentNotFound
+	}
+	// the slice returned by pebble is only valid until closer.Close(),
+	// so copy the value out before closing
+	value := make([]byte, len(data))
+	copy(value, data)
+	closer.Close()
+	return value, nil
 }
 
 // Put implements storage.ContentStorage.
 func (c *ContentStorage) Put(contentKey []byte, contentId []byte, content []byte) error {
-	return c.db.Put(contentId, content)
+	length := uint64(len(contentId)) + uint64(len(content))
+	c.sizeChan <- length
+	distance := xor(contentId, c.nodeId[:])
+	return c.db.Set(distance, content, c.writeOptions)
 }
 
 // Radius implements storage.ContentStorage.
@@ -77,3 +196,93 @@ func (c *ContentStorage) Radius() *uint256.Int {
 	val := radius.(*uint256.Int)
 	return val
 }
+
+func (c *ContentStorage) saveCapacity() {
+	ticker := time.NewTicker(5 * time.Second)
+	defer ticker.Stop()
+	sizeChanged := false
+	buf := make([]byte, 8) // uint64
+
+	for {
+		select {
+		case <-ticker.C:
+			if sizeChanged {
+				binary.BigEndian.PutUint64(buf, c.size)
+				err := c.db.Set(storage.SizeKey, buf, c.writeOptions)
+				if err != nil {
+					c.log.Error("save capacity failed", "error", err)
+				}
+				sizeChanged = false
+			}
+		case size := <-c.sizeChan:
+			c.log.Debug("received content size", "size", size)
+			c.sizeMutex.Lock()
+			c.size += size
+			c.sizeMutex.Unlock()
+			sizeChanged = true
+			if c.size > c.storageCapacityInBytes {
+				if !c.isPruning {
+					c.isPruning = true
+					go c.prune()
+				}
+			}
+		case prunedSize := <-c.pruneDoneChan:
+			c.isPruning = false
+			c.size -= prunedSize
+			sizeChanged = true
+		}
+	}
+}
+
+func (c *ContentStorage) prune() {
+	expectSize := uint64(float64(c.storageCapacityInBytes) * contentDeletionFraction)
+	var currentSize uint64 = 0
+
+	defer func() {
+		c.pruneDoneChan <- currentSize
+	}()
+	// delete the keys ordered by distance descending (farthest first)
+	iter, err := c.db.NewIter(nil)
+	if err != nil {
+		c.log.Error("get iter failed", "error", err)
+		return
+	}
+	defer iter.Close()
+
+	batch := c.db.NewBatch()
+	for iter.Last(); iter.Valid(); iter.Prev() {
+		if currentSize < expectSize {
+			batch.Delete(iter.Key(), nil)
+			currentSize += uint64(len(iter.Key())) + uint64(len(iter.Value()))
+		} else {
+			distance := iter.Key()
+			c.db.Set(storage.RadisuKey, distance, c.writeOptions)
+			dis := uint256.NewInt(0)
+			err = dis.UnmarshalSSZ(distance)
+			if err != nil {
+				c.log.Error("unmarshal distance failed", "error", err)
+			}
+			c.radius.Store(dis)
+			break
+		}
+	}
+	err = batch.Commit(&pebble.WriteOptions{Sync: true})
+	if err != nil {
+		c.log.Error("prune batch commit failed", "error", err)
+		return
+	}
+}
+
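+// xor pads a contentId shorter than 32 bytes into a zeroed 32-byte buffer
+// before XORing, so e.g. a one-byte id {0x01} becomes 0x01 followed by 31
+// zero bytes. Note this assumes contentId is never longer than the node id.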
+func xor(contentId, nodeId []byte) []byte {
+	// the length of contentId may not be 32 bytes
+	padding := make([]byte, 32)
+	if len(contentId) != len(nodeId) {
+		copy(padding, contentId)
+	} else {
+		padding = contentId
+	}
+	res := make([]byte, len(padding))
+	for i := range padding {
+		res[i] = padding[i] ^ nodeId[i]
+	}
+	return res
+}
diff --git a/portalnetwork/storage/ethpepple/storage_test.go b/portalnetwork/storage/ethpepple/storage_test.go
index 7bf543df9487..0154f07dfa07 100644
--- a/portalnetwork/storage/ethpepple/storage_test.go
+++ b/portalnetwork/storage/ethpepple/storage_test.go
@@ -1,86 +1,162 @@
 package ethpepple
 
 import (
-	"crypto/rand"
-	"encoding/hex"
-	"os"
 	"testing"
+	"time"
 
-	"github.com/ethereum/go-ethereum/p2p/enode"
 	"github.com/ethereum/go-ethereum/portalnetwork/storage"
 	"github.com/holiman/uint256"
 	"github.com/stretchr/testify/assert"
 )
 
-var testRadius = uint256.NewInt(100000)
-
-func clearNodeData(path string) {
-	_ = os.RemoveAll(path)
+func genBytes(length int) []byte {
+	res := make([]byte, length)
+	for i := 0; i < length; i++ {
+		res[i] = byte(i)
+	}
+	return res
 }
 
-func getRandomPath() string {
-	// gen a random hex string
-	bytes := make([]byte, 32)
-	_, err := rand.Read(bytes)
-	if err != nil {
-		panic(err)
-	}
-	return hex.EncodeToString(bytes)
+func TestNewPeppleDB(t *testing.T) {
+	db, err := NewPeppleDB(t.TempDir(), 16, 16, "test")
+	assert.NoError(t, err)
+	defer db.Close()
+
+	assert.NotNil(t, db)
 }
 
-func getTestDb(path string) (storage.ContentStorage, error) {
-	db, err := NewPeppleDB(path, 100, 100, "history")
-	if err != nil {
-		return nil, err
-	}
+func setupTestStorage(t *testing.T) storage.ContentStorage {
+	db, err := NewPeppleDB(t.TempDir(), 16, 16, "test")
+	assert.NoError(t, err)
+	t.Cleanup(func() { db.Close() })
+	config := PeppleStorageConfig{
+		StorageCapacityMB: 1,
 		DB:                db,
-		StorageCapacityMB: 100,
-		NodeId:            enode.ID{},
-		NetworkName:       "history",
+		NodeId:            uint256.NewInt(0).Bytes32(),
+		NetworkName:       "test",
 	}
-	return NewPeppleStorage(config)
-}
 
-func TestReadRadius(t *testing.T) {
-	path := getRandomPath()
-	db, err := getTestDb(path)
+	cs, err := NewPeppleStorage(config)
 	assert.NoError(t, err)
-	defer clearNodeData(path)
-	assert.True(t, db.Radius().Eq(storage.MaxDistance))
+	return cs
+}
 
-	data, err := testRadius.MarshalSSZ()
-	assert.NoError(t, err)
-	db.Put(nil, storage.RadisuKey, data)
+func TestContentStoragePutAndGet(t *testing.T) {
+	db := setupTestStorage(t)
 
-	store := db.(*ContentStorage)
-	err = store.db.Close()
-	assert.NoError(t, err)
+	testCases := []struct {
+		contentKey []byte
+		contentId  []byte
+		content    []byte
+	}{
+		{[]byte("key1"), []byte("id1"), []byte("content1")},
+		{[]byte("key2"), []byte("id2"), []byte("content2")},
+	}
 
-	db, err = getTestDb(path)
-	assert.NoError(t, err)
-	assert.True(t, db.Radius().Eq(testRadius))
+	for _, tc := range testCases {
+		err := db.Put(tc.contentKey, tc.contentId, tc.content)
+		assert.NoError(t, err)
+
+		got, err := db.Get(tc.contentKey, tc.contentId)
+		assert.NoError(t, err)
+		assert.Equal(t, tc.content, got)
+	}
 }
 
-func TestStorage(t *testing.T) {
-	path := getRandomPath()
-	db, err := getTestDb(path)
-	assert.NoError(t, err)
-	defer clearNodeData(path)
-	testcases := map[string][]byte{
-		"test1": []byte("test1"),
-		"test2": []byte("test2"),
-		"test3": []byte("test3"),
-		"test4": []byte("test4"),
+func TestRadius(t *testing.T) {
+	db := setupTestStorage(t)
+	radius := db.Radius()
+	assert.NotNil(t, radius)
+	assert.True(t, radius.Eq(storage.MaxDistance))
+}
+
+func TestXOR(t *testing.T) {
+	testCases := []struct {
+		contentId []byte
+		nodeId    []byte
+		expected  []byte
+	}{
+		{
+			contentId: []byte{0x01},
+			nodeId:    make([]byte, 32),
+			expected:  append([]byte{0x01}, make([]byte, 31)...),
+		},
+		{
+			contentId: []byte{0xFF},
+			nodeId:    []byte{0x0F},
+			expected:  []byte{0xF0},
+		},
 	}
 
-	for key, value := range testcases {
-		db.Put(nil, []byte(key), value)
+	for _, tc := range testCases {
+		result := xor(tc.contentId, tc.nodeId)
+		assert.Equal(t, tc.expected, result)
 	}
+}
 
-	for key, value := range testcases {
-		val, err := db.Get(nil, []byte(key))
-		assert.NoError(t, err)
-		assert.Equal(t, value, val)
+// the capacity is 1MB, so prune will delete over 50KB of content
+func TestPrune(t *testing.T) {
+	db := setupTestStorage(t)
+	// the nodeId is all zeros, so contentKey and contentId are the same
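+	// Rough sizing for the expectations below (a sketch, assuming all seven
+	// writes land before the prune iterator runs): the payloads total
+	// 1,040,000 bytes plus 32 bytes of key each, exceeding the 1,000,000-byte
+	// capacity, so prune must free at least 5% (50,000 bytes). Deleting the
+	// three farthest entries (keys 5-7, ~20,032 bytes each) frees ~60,096
+	// bytes and leaves the radius at the distance of key 4.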
+	testcases := []struct {
+		contentKey  [32]byte
+		content     []byte
+		shouldPrune bool
+	}{
+		{
+			contentKey:  uint256.NewInt(1).Bytes32(),
+			content:     genBytes(900_000),
+			shouldPrune: false,
+		},
+		{
+			contentKey:  uint256.NewInt(2).Bytes32(),
+			content:     genBytes(40_000),
+			shouldPrune: false,
+		},
+		{
+			contentKey:  uint256.NewInt(3).Bytes32(),
+			content:     genBytes(20_000),
+			shouldPrune: false,
+		},
+		{
+			contentKey:  uint256.NewInt(4).Bytes32(),
+			content:     genBytes(20_000),
+			shouldPrune: false,
+		},
+		{
+			contentKey:  uint256.NewInt(5).Bytes32(),
+			content:     genBytes(20_000),
+			shouldPrune: true,
+		},
+		{
+			contentKey:  uint256.NewInt(6).Bytes32(),
+			content:     genBytes(20_000),
+			shouldPrune: true,
+		},
+		{
+			contentKey:  uint256.NewInt(7).Bytes32(),
+			content:     genBytes(20_000),
+			shouldPrune: true,
+		},
+	}
+
+	for _, val := range testcases {
+		err := db.Put(val.contentKey[:], val.contentKey[:], val.content)
+		assert.NoError(t, err)
+	}
+	// wait for the prune to finish
+	time.Sleep(2 * time.Second)
+	for _, val := range testcases {
+		content, err := db.Get(val.contentKey[:], val.contentKey[:])
+		if !val.shouldPrune {
+			assert.Equal(t, val.content, content)
+		} else {
+			assert.Error(t, err)
+		}
 	}
+	radius := db.Radius()
+	data, err := radius.MarshalSSZ()
+	assert.NoError(t, err)
+	expected := uint256.NewInt(4).Bytes32()
+	assert.Equal(t, expected[:], data)
 }