Skip to content

Commit

Permalink
[db] fix Size() and refactor counting index interface
Browse files Browse the repository at this point in the history
  • Loading branch information
dustinxie committed Nov 15, 2024
1 parent 8e7d62f commit 2673dd6
Show file tree
Hide file tree
Showing 4 changed files with 122 additions and 127 deletions.
10 changes: 7 additions & 3 deletions blockchain/filedao/filedao_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,9 @@ func (fd *fileDAOv2) Start(ctx context.Context) error {
}

func (fd *fileDAOv2) Stop(ctx context.Context) error {
fd.hashStore.Close()
fd.blkStore.Close()
fd.sysStore.Close()
return fd.kvStore.Stop(ctx)
}

Expand Down Expand Up @@ -230,21 +233,22 @@ func (fd *fileDAOv2) PutBlock(_ context.Context, blk *block.Block) error {
if err := fd.putTipHashHeightMapping(blk); err != nil {
return errors.Wrap(err, "failed to write hash-height mapping")
}

// write block data
if err := fd.putBlock(blk); err != nil {
return errors.Wrap(err, "failed to write block")
}

// write receipt and transaction log
if err := fd.putTransactionLog(blk); err != nil {
return errors.Wrap(err, "failed to write receipt")
}

if err := fd.kvStore.WriteBatch(fd.batch); err != nil {
return errors.Wrapf(err, "failed to put block at height %d", blk.Height())
}
fd.batch.Clear()
// done processing the batch
fd.hashStore.ExternalBatchDone()
fd.blkStore.ExternalBatchDone()
fd.sysStore.ExternalBatchDone()
// update file tip
tip = &FileTip{Height: blk.Height(), Hash: blk.HashBlock()}
fd.storeTip(tip)
Expand Down
17 changes: 3 additions & 14 deletions blockchain/filedao/filedao_v2_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"github.com/iotexproject/iotex-core/v2/action"
"github.com/iotexproject/iotex-core/v2/blockchain/block"
"github.com/iotexproject/iotex-core/v2/db"
"github.com/iotexproject/iotex-core/v2/db/batch"
"github.com/iotexproject/iotex-core/v2/pkg/compress"
"github.com/iotexproject/iotex-core/v2/pkg/util/byteutil"
)
Expand Down Expand Up @@ -55,7 +54,7 @@ func (fd *fileDAOv2) populateStagingBuffer() (*stagingBuffer, error) {
func (fd *fileDAOv2) putTipHashHeightMapping(blk *block.Block) error {
// write height <-> hash mapping
h := blk.HashBlock()
if err := addOneEntryToBatch(fd.hashStore, h[:], fd.batch); err != nil {
if err := fd.hashStore.AddToExternalBatch(fd.batch, [][]byte{h[:]}); err != nil {
return err
}

Expand Down Expand Up @@ -104,7 +103,7 @@ func (fd *fileDAOv2) putBlock(blk *block.Block) error {
if blkBytes, err = compBytes(ser, fd.header.Compressor); err != nil {
return err
}
return addOneEntryToBatch(fd.blkStore, blkBytes, fd.batch)
return fd.blkStore.AddToExternalBatch(fd.batch, [][]byte{blkBytes})
}

func (fd *fileDAOv2) putTransactionLog(blk *block.Block) error {
Expand All @@ -116,17 +115,7 @@ func (fd *fileDAOv2) putTransactionLog(blk *block.Block) error {
if err != nil {
return err
}
return addOneEntryToBatch(fd.sysStore, logBytes, fd.batch)
}

func addOneEntryToBatch(c db.CountingIndex, v []byte, b batch.KVStoreBatch) error {
if err := c.UseBatch(b); err != nil {
return err
}
if err := c.Add(v, true); err != nil {
return err
}
return c.Finalize()
return fd.sysStore.AddToExternalBatch(fd.batch, [][]byte{logBytes})
}

func compBytes(v []byte, comp string) ([]byte, error) {
Expand Down
146 changes: 68 additions & 78 deletions db/counting_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@ package db

import (
"fmt"
"sync/atomic"

"github.com/pkg/errors"

"github.com/iotexproject/iotex-core/v2/db/batch"
"github.com/iotexproject/iotex-core/v2/pkg/lifecycle"
"github.com/iotexproject/iotex-core/v2/pkg/util/byteutil"
)

Expand All @@ -31,8 +31,14 @@ type (
CountingIndex interface {
// Size returns the total number of keys so far
Size() uint64
// Add inserts a value into the index
Add([]byte, bool) error
// AddOne inserts one entry into the index
AddOne([]byte) error
// AddMultiple inserts multiple entries into the index
AddMultiple([][]byte) error
// AddToExternalBatch inserts entries into external batch
AddToExternalBatch(batch.KVStoreBatch, [][]byte) error
// ExternalBatchDone should be called when the external batch is done
ExternalBatchDone() error
// Get return value of key[slot]
Get(uint64) ([]byte, error)
// Range return value of keys [start, start+count)
Expand All @@ -41,20 +47,13 @@ type (
Revert(uint64) error
// Close makes the index not usable
Close()
// Commit commits the batch
Commit() error
// UseBatch
UseBatch(batch.KVStoreBatch) error
// Finalize
Finalize() error
}

// countingIndex is CountingIndex implementation based on KVStore
countingIndex struct {
lifecycle.Readiness
kvStore KVStoreWithRange
bucket string
size uint64 // total number of keys
batch batch.KVStoreBatch
}
)

Expand All @@ -78,13 +77,11 @@ func NewCountingIndexNX(kv KVStore, name []byte) (CountingIndex, error) {
if err := kv.Put(bucket, CountKey, ZeroIndex); err != nil {
return nil, errors.Wrapf(err, "failed to create counting index %x", name)
}
total = ZeroIndex
}

return &countingIndex{
kvStore: kvRange,
bucket: bucket,
size: byteutil.BytesToUint64BigEndian(total),
}, nil
}

Expand All @@ -103,71 +100,95 @@ func GetCountingIndex(kv KVStore, name []byte) (CountingIndex, error) {
return &countingIndex{
kvStore: kvRange,
bucket: bucket,
size: byteutil.BytesToUint64BigEndian(total),
}, nil
}

// Size returns the total number of keys so far
func (c *countingIndex) Size() uint64 {
return atomic.LoadUint64(&c.size)
return c.size()
}

func (c *countingIndex) size() uint64 {
total, err := c.kvStore.Get(c.bucket, CountKey)
if errors.Cause(err) == ErrNotExist || total == nil {
return 0
}
return byteutil.BytesToUint64BigEndian(total)
}

// Add inserts a value into the index
func (c *countingIndex) Add(value []byte, inBatch bool) error {
if inBatch {
return c.addBatch(value)
func (c *countingIndex) AddOne(value []byte) error {
if err := c.TurnOn(); err != nil {
return errors.Wrap(err, "another add is in active progress")
}
if c.batch != nil {
return errors.Wrap(ErrInvalid, "cannot call Add in batch mode, call Commit() first to exit batch mode")
b := batch.NewBatch()
c.addtoBatch(b, [][]byte{value})
if err := c.kvStore.WriteBatch(b); err != nil {
return err
}
return c.TurnOff()
}

func (c *countingIndex) AddMultiple(value [][]byte) error {
if len(value) == 0 {
return nil
}
if err := c.TurnOn(); err != nil {
return errors.Wrap(err, "another add is in active progress")
}
b := batch.NewBatch()
size := c.Size()
b.Put(c.bucket, byteutil.Uint64ToBytesBigEndian(size), value, fmt.Sprintf("failed to add %d-th item", size+1))
b.Put(c.bucket, CountKey, byteutil.Uint64ToBytesBigEndian(size+1), fmt.Sprintf("failed to update size = %d", size+1))
b.AddFillPercent(c.bucket, 1.0)
c.addtoBatch(b, value)
if err := c.kvStore.WriteBatch(b); err != nil {
return err
}
atomic.AddUint64(&c.size, 1)
return nil
return c.TurnOff()
}

// addBatch inserts a value into the index in batch mode
func (c *countingIndex) addBatch(value []byte) error {
if c.batch == nil {
c.batch = batch.NewBatch()
func (c *countingIndex) AddToExternalBatch(b batch.KVStoreBatch, value [][]byte) error {
if len(value) == 0 {
return nil
}
if err := c.TurnOn(); err != nil {
return errors.Wrap(err, "another add is in active progress")
}
size := c.Size()
c.batch.Put(c.bucket, byteutil.Uint64ToBytesBigEndian(size), value, fmt.Sprintf("failed to add %d-th item", size+1))
atomic.AddUint64(&c.size, 1)
c.addtoBatch(b, value)
return nil
}

func (c *countingIndex) addtoBatch(b batch.KVStoreBatch, value [][]byte) {
size := c.size()
for i := range value {
b.Put(c.bucket, byteutil.Uint64ToBytesBigEndian(size+uint64(i)), value[i], fmt.Sprintf("failed to add %d-th item", size+uint64(i)))
}
b.Put(c.bucket, CountKey, byteutil.Uint64ToBytesBigEndian(size+uint64(len(value))), fmt.Sprintf("failed to update size = %d", size+uint64(len(value))))
b.AddFillPercent(c.bucket, 1.0)
}

// Get return value of key[slot]
func (c *countingIndex) Get(slot uint64) ([]byte, error) {
if slot >= c.Size() {
if slot >= c.size() {
return nil, errors.Wrapf(ErrNotExist, "slot: %d", slot)
}
return c.kvStore.Get(c.bucket, byteutil.Uint64ToBytesBigEndian(slot))
}

// Range return value of keys [start, start+count)
func (c *countingIndex) Range(start, count uint64) ([][]byte, error) {
if start+count > c.Size() || count == 0 {
if start+count > c.size() || count == 0 {
return nil, errors.Wrapf(ErrInvalid, "start: %d, count: %d", start, count)
}
return c.kvStore.Range(c.bucket, byteutil.Uint64ToBytesBigEndian(start), count)
}

// Revert removes entries from end
func (c *countingIndex) Revert(count uint64) error {
if c.batch != nil {
return errors.Wrap(ErrInvalid, "cannot call Revert in batch mode, call Commit() first to exit batch mode")
}
size := c.Size()
size := c.size()
if count == 0 || count > size {
return errors.Wrapf(ErrInvalid, "count: %d", count)
}
if err := c.TurnOn(); err != nil {
return errors.Wrap(err, "an add is in active progress")
}
b := batch.NewBatch()
start := size - count
for i := uint64(0); i < count; i++ {
Expand All @@ -178,49 +199,18 @@ func (c *countingIndex) Revert(count uint64) error {
if err := c.kvStore.WriteBatch(b); err != nil {
return err
}
atomic.StoreUint64(&c.size, start)
return nil
return c.TurnOff()
}

// Close makes the index not usable
func (c *countingIndex) Close() {
// frees reference to db, the db object itself will be closed/freed by its owner, not here
c.kvStore = nil
c.batch = nil
}

// Commit commits a batch
func (c *countingIndex) Commit() error {
if c.batch == nil {
func (c *countingIndex) ExternalBatchDone() error {
if !c.IsReady() {
return nil
}
size := c.Size()
c.batch.Put(c.bucket, CountKey, byteutil.Uint64ToBytesBigEndian(size), fmt.Sprintf("failed to update size = %d", size))
c.batch.AddFillPercent(c.bucket, 1.0)
if err := c.kvStore.WriteBatch(c.batch); err != nil {
return err
}
c.batch = nil
return nil
return c.TurnOff()
}

// UseBatch sets a (usually common) batch for the counting index to use
func (c *countingIndex) UseBatch(b batch.KVStoreBatch) error {
if b == nil {
return ErrInvalid
}
c.batch = b
return nil
}

// Finalize updates the total size before committing the (usually common) batch
func (c *countingIndex) Finalize() error {
if c.batch == nil {
return ErrInvalid
}
size := c.Size()
c.batch.Put(c.bucket, CountKey, byteutil.Uint64ToBytesBigEndian(size), fmt.Sprintf("failed to update size = %d", size))
c.batch.AddFillPercent(c.bucket, 1.0)
c.batch = nil
return nil
// Close makes the index not usable
func (c *countingIndex) Close() {
// frees reference to db, the db object itself will be closed/freed by its owner, not here
c.kvStore = nil
}
Loading

0 comments on commit 2673dd6

Please sign in to comment.