Skip to content

Commit

Permalink
Cherry-pick IOTEX-192 Removing PutIfNotExists API db
Browse files Browse the repository at this point in the history
  • Loading branch information
dustinxie committed Jan 15, 2019
1 parent 57ade5e commit 9226bbb
Show file tree
Hide file tree
Showing 8 changed files with 67 additions and 161 deletions.
11 changes: 11 additions & 0 deletions blockchain/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"sync"
"sync/atomic"

"github.com/boltdb/bolt"
"github.com/facebookgo/clock"
"github.com/pkg/errors"

Expand Down Expand Up @@ -943,6 +944,16 @@ func (bc *blockchain) validateBlock(blk *Block, containCoinbase bool) error {

// commitBlock commits a block to the chain
func (bc *blockchain) commitBlock(blk *Block) error {
// Check if it is already exists, and return earlier
blkHash, err := bc.dao.getBlockHash(blk.Height())
if blkHash != hash.ZeroHash32B {
logger.Debug().Uint64("height", blk.Height()).Msg("Block already exists")
return nil
}
// If it's a ready db io error, return earlier with the error
if errors.Cause(err) != db.ErrNotExist && errors.Cause(err) != bolt.ErrBucketNotFound {
return err
}
// write block into DB
if err := bc.dao.putBlock(blk); err != nil {
return err
Expand Down
10 changes: 4 additions & 6 deletions blockchain/blockchain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -378,12 +378,11 @@ func TestLoadBlockchainfromDB(t *testing.T) {
require.NotNil(err)
fmt.Printf("Cannot validate block %d: %v\n", blk.Height(), err)

// cannot add existing block again
// add existing block again will have no effect
blk, err = bc.GetBlockByHeight(3)
require.NotNil(blk)
require.Nil(err)
err = bc.(*blockchain).commitBlock(blk)
require.NotNil(err)
require.NoError(bc.(*blockchain).commitBlock(blk))
fmt.Printf("Cannot add block 3 again: %v\n", err)

// check all Tx from block 4
Expand Down Expand Up @@ -584,12 +583,11 @@ func TestLoadBlockchainfromDBWithoutExplorer(t *testing.T) {
err = bc.ValidateBlock(blk, true)
require.NotNil(err)
fmt.Printf("Cannot validate block %d: %v\n", blk.Height(), err)
// cannot add existing block again
// add existing block again will have no effect
blk, err = bc.GetBlockByHeight(3)
require.NotNil(blk)
require.Nil(err)
err = bc.(*blockchain).commitBlock(blk)
require.NotNil(err)
require.NoError(bc.(*blockchain).commitBlock(blk))
fmt.Printf("Cannot add block 3 again: %v\n", err)
// check all Tx from block 4
blk, err = bc.GetBlockByHeight(4)
Expand Down
58 changes: 35 additions & 23 deletions blockchain/blockdao.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ package blockchain
import (
"context"

"github.com/boltdb/bolt"
"github.com/pkg/errors"

"github.com/iotexproject/iotex-core/action"
Expand Down Expand Up @@ -86,33 +87,44 @@ func (dao *blockDAO) Start(ctx context.Context) error {
}

// set init height value
if err := dao.kvstore.PutIfNotExists(blockNS, topHeightKey, make([]byte, 8)); err != nil {
// ok on none-fresh db
if err == db.ErrAlreadyExist {
return nil
// TODO: not working with badger, we shouldn't expose detailed db error (e.g., bolt.ErrBucketExists) to application
if _, err = dao.kvstore.Get(blockNS, topHeightKey); err != nil &&
(errors.Cause(err) == db.ErrNotExist || errors.Cause(err) == bolt.ErrBucketNotFound) {
if err := dao.kvstore.Put(blockNS, topHeightKey, make([]byte, 8)); err != nil {
return errors.Wrap(err, "failed to write initial value for top height")
}

return errors.Wrap(err, "failed to write initial value for top height")
}

// set init total transfer to be 0
if err = dao.kvstore.PutIfNotExists(blockNS, totalTransfersKey, make([]byte, 8)); err != nil {
return errors.Wrap(err, "failed to write initial value for total transfers")
if _, err := dao.kvstore.Get(blockNS, totalTransfersKey); err != nil &&
(errors.Cause(err) == db.ErrNotExist || errors.Cause(err) == bolt.ErrBucketNotFound) {
if err = dao.kvstore.Put(blockNS, totalTransfersKey, make([]byte, 8)); err != nil {
return errors.Wrap(err, "failed to write initial value for total transfers")
}
}

// set init total vote to be 0
if err = dao.kvstore.PutIfNotExists(blockNS, totalVotesKey, make([]byte, 8)); err != nil {
return errors.Wrap(err, "failed to write initial value for total votes")
if _, err := dao.kvstore.Get(blockNS, totalVotesKey); err != nil &&
(errors.Cause(err) == db.ErrNotExist || errors.Cause(err) == bolt.ErrBucketNotFound) {
if err = dao.kvstore.Put(blockNS, totalVotesKey, make([]byte, 8)); err != nil {
return errors.Wrap(err, "failed to write initial value for total votes")
}
}

// set init total executions to be 0
if err = dao.kvstore.PutIfNotExists(blockNS, totalExecutionsKey, make([]byte, 8)); err != nil {
return errors.Wrap(err, "failed to write initial value for total executions")
if _, err := dao.kvstore.Get(blockNS, totalExecutionsKey); err != nil &&
(errors.Cause(err) == db.ErrNotExist || errors.Cause(err) == bolt.ErrBucketNotFound) {
if err = dao.kvstore.Put(blockNS, totalExecutionsKey, make([]byte, 8)); err != nil {
return errors.Wrap(err, "failed to write initial value for total executions")
}
}

// set init total actions to be 0
if err = dao.kvstore.PutIfNotExists(blockNS, totalActionsKey, make([]byte, 8)); err != nil {
return errors.Wrap(err, "failed to write initial value for total actions")
if _, err := dao.kvstore.Get(blockNS, totalActionsKey); err != nil &&
(errors.Cause(err) == db.ErrNotExist || errors.Cause(err) == bolt.ErrBucketNotFound) {
if err = dao.kvstore.Put(blockNS, totalActionsKey, make([]byte, 8)); err != nil {
return errors.Wrap(err, "failed to write initial value for total actions")
}
}

return nil
Expand Down Expand Up @@ -633,7 +645,7 @@ func (dao *blockDAO) putBlock(blk *Block) error {
return errors.Wrap(err, "failed to serialize block")
}
hash := blk.HashBlock()
batch.PutIfNotExists(blockNS, hash[:], serialized, "failed to put block")
batch.Put(blockNS, hash[:], serialized, "failed to put block")

hashKey := append(hashPrefix, hash[:]...)
batch.Put(blockHashHeightMappingNS, hashKey, height, "failed to put hash -> height mapping")
Expand Down Expand Up @@ -770,7 +782,7 @@ func putTransfers(dao *blockDAO, blk *Block, batch db.KVStoreBatch) error {
// put new transfer to sender
senderKey := append(transferFromPrefix, transfer.Sender()...)
senderKey = append(senderKey, byteutil.Uint64ToBytes(senderTransferCount)...)
batch.PutIfNotExists(blockAddressTransferMappingNS, senderKey, transferHash[:],
batch.Put(blockAddressTransferMappingNS, senderKey, transferHash[:],
"failed to put transfer hash %x for sender %x", transfer.Hash(), transfer.Sender())

// update sender transfers count
Expand All @@ -795,7 +807,7 @@ func putTransfers(dao *blockDAO, blk *Block, batch db.KVStoreBatch) error {
recipientKey := append(transferToPrefix, transfer.Recipient()...)
recipientKey = append(recipientKey, byteutil.Uint64ToBytes(recipientTransferCount)...)

batch.PutIfNotExists(blockAddressTransferMappingNS, recipientKey, transferHash[:],
batch.Put(blockAddressTransferMappingNS, recipientKey, transferHash[:],
"failed to put transfer hash %x for recipient %x", transfer.Hash(), transfer.Recipient())

// update recipient transfers count
Expand Down Expand Up @@ -834,7 +846,7 @@ func putVotes(dao *blockDAO, blk *Block, batch db.KVStoreBatch) error {
// put new vote to sender
senderKey := append(voteFromPrefix, Sender...)
senderKey = append(senderKey, byteutil.Uint64ToBytes(senderVoteCount)...)
batch.PutIfNotExists(blockAddressVoteMappingNS, senderKey, voteHash[:],
batch.Put(blockAddressVoteMappingNS, senderKey, voteHash[:],
"failed to put vote hash %x for sender %x", voteHash, Sender)

// update sender votes count
Expand All @@ -858,7 +870,7 @@ func putVotes(dao *blockDAO, blk *Block, batch db.KVStoreBatch) error {
// put new vote to recipient
recipientKey := append(voteToPrefix, Recipient...)
recipientKey = append(recipientKey, byteutil.Uint64ToBytes(recipientVoteCount)...)
batch.PutIfNotExists(blockAddressVoteMappingNS, recipientKey, voteHash[:],
batch.Put(blockAddressVoteMappingNS, recipientKey, voteHash[:],
"failed to put vote hash %x for recipient %x", voteHash, Recipient)

// update recipient votes count
Expand Down Expand Up @@ -895,7 +907,7 @@ func putExecutions(dao *blockDAO, blk *Block, batch db.KVStoreBatch) error {
// put new execution to executor
executorKey := append(executionFromPrefix, execution.Executor()...)
executorKey = append(executorKey, byteutil.Uint64ToBytes(executorExecutionCount)...)
batch.PutIfNotExists(blockAddressExecutionMappingNS, executorKey, executionHash[:],
batch.Put(blockAddressExecutionMappingNS, executorKey, executionHash[:],
"failed to put execution hash %x for executor %x", execution.Hash(), execution.Executor())

// update executor executions count
Expand All @@ -919,7 +931,7 @@ func putExecutions(dao *blockDAO, blk *Block, batch db.KVStoreBatch) error {
// put new execution to contract
contractKey := append(executionToPrefix, execution.Contract()...)
contractKey = append(contractKey, byteutil.Uint64ToBytes(contractExecutionCount)...)
batch.PutIfNotExists(blockAddressExecutionMappingNS, contractKey, executionHash[:],
batch.Put(blockAddressExecutionMappingNS, contractKey, executionHash[:],
"failed to put execution hash %x for contract %x", execution.Hash(), execution.Contract())

// update contract executions count
Expand Down Expand Up @@ -962,7 +974,7 @@ func putActions(dao *blockDAO, blk *Block, batch db.KVStoreBatch) error {
// put new action to sender
senderKey := append(actionFromPrefix, act.SrcAddr()...)
senderKey = append(senderKey, byteutil.Uint64ToBytes(senderActionCount)...)
batch.PutIfNotExists(blockAddressActionMappingNS, senderKey, actHash[:],
batch.Put(blockAddressActionMappingNS, senderKey, actHash[:],
"failed to put action hash %x for sender %s", actHash, act.SrcAddr())

// update sender action count
Expand All @@ -986,7 +998,7 @@ func putActions(dao *blockDAO, blk *Block, batch db.KVStoreBatch) error {
// put new action to recipient
recipientKey := append(actionToPrefix, act.DstAddr()...)
recipientKey = append(recipientKey, byteutil.Uint64ToBytes(recipientActionCount)...)
batch.PutIfNotExists(blockAddressActionMappingNS, recipientKey, actHash[:],
batch.Put(blockAddressActionMappingNS, recipientKey, actHash[:],
"failed to put action hash %x for recipient %s", actHash, act.DstAddr())

// update recipient action count
Expand Down
9 changes: 2 additions & 7 deletions blocksync/buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,13 +73,8 @@ func (b *blockBuffer) Flush(blk *blockchain.Block) (bool, bCheckinResult) {
}
delete(b.blocks, next)
if err := commitBlock(b.bc, b.ap, blk); err != nil {
l.Error().Err(err).Uint64("syncHeight", next).
Msg("Failed to commit the block.")
// unable to commit, check reason
committedBlk, err := b.bc.GetBlockByHeight(next)
if err != nil || committedBlk.HashBlock() != blk.HashBlock() {
break
}
l.Error().Err(err).Uint64("syncHeight", next).Msg("Failed to commit the block.")
break
}
moved = true
confirmedHeight = next
Expand Down
29 changes: 0 additions & 29 deletions db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@ type KVStore interface {

// Put insert or update a record identified by (namespace, key)
Put(string, []byte, []byte) error
// Put puts a record only if (namespace, key) doesn't exist, otherwise return ErrAlreadyExist
PutIfNotExists(string, []byte, []byte) error
// Get gets a record by (namespace, key)
Get(string, []byte) ([]byte, error)
// Delete deletes a record by (namespace, key)
Expand Down Expand Up @@ -71,16 +69,6 @@ func (m *memKVStore) Put(namespace string, key, value []byte) error {
return nil
}

// PutIfNotExists inserts a <key, value> record only if it does not exist yet, otherwise return ErrAlreadyExist
func (m *memKVStore) PutIfNotExists(namespace string, key, value []byte) error {
m.bucket[namespace] = struct{}{}
_, loaded := m.data.LoadOrStore(namespace+keyDelimiter+string(key), value)
if loaded {
return ErrAlreadyExist
}
return nil
}

// Get retrieves a record
func (m *memKVStore) Get(namespace string, key []byte) ([]byte, error) {
if _, ok := m.bucket[namespace]; !ok {
Expand Down Expand Up @@ -121,11 +109,6 @@ func (m *memKVStore) Commit(b KVStoreBatch) (e error) {
e = err
break
}
} else if write.writeType == PutIfNotExists {
if err := m.PutIfNotExists(write.namespace, write.key, write.value); err != nil {
e = err
break
}
} else if write.writeType == Delete {
if err := m.Delete(write.namespace, write.key); err != nil {
e = err
Expand Down Expand Up @@ -315,18 +298,6 @@ func (b *boltDB) Commit(batch KVStoreBatch) (err error) {
if err := bucket.Put(write.key, write.value); err != nil {
return errors.Wrapf(err, write.errorFormat, write.errorArgs)
}
} else if write.writeType == PutIfNotExists {
bucket, err := tx.CreateBucketIfNotExists([]byte(write.namespace))
if err != nil {
return errors.Wrapf(err, write.errorFormat, write.errorArgs)
}
if bucket.Get(write.key) == nil {
if err := bucket.Put(write.key, write.value); err != nil {
return errors.Wrapf(err, write.errorFormat, write.errorArgs)
}
} else {
return ErrAlreadyExist
}
} else if write.writeType == Delete {
bucket := tx.Bucket([]byte(write.namespace))
if bucket == nil {
Expand Down
25 changes: 0 additions & 25 deletions db/db_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ type (
// b := NewBatch()
// and keep batching Put/Delete operation into it
// b.Put(bucket, k, v)
// b.PutIfNotExists(bucket, k, v)
// b.Delete(bucket, k, v)
// once it's done, call KVStore interface's Commit() to persist to underlying DB
// KVStore.Commit(b)
Expand All @@ -36,8 +35,6 @@ type (
ClearAndUnlock()
// Put insert or update a record identified by (namespace, key)
Put(string, []byte, []byte, string, ...interface{})
// PutIfNotExists puts a record only if (namespace, key) doesn't exist, otherwise return ErrAlreadyExist
PutIfNotExists(string, []byte, []byte, string, ...interface{}) error
// Delete deletes a record by (namespace, key)
Delete(string, []byte, string, ...interface{})
// Size returns the size of batch
Expand Down Expand Up @@ -85,8 +82,6 @@ const (
Put int32 = iota
// Delete indicate the type of write operation to be Delete
Delete int32 = 1
// PutIfNotExists indicate the type of write operation to be PutIfNotExists
PutIfNotExists int32 = 2
)

// NewBatch returns a batch
Expand Down Expand Up @@ -117,14 +112,6 @@ func (b *baseKVStoreBatch) Put(namespace string, key, value []byte, errorFormat
b.batch(Put, namespace, key, value, errorFormat, errorArgs)
}

// PutIfNotExists inserts a <key, value> record only if it does not exist yet, otherwise return ErrAlreadyExist
func (b *baseKVStoreBatch) PutIfNotExists(namespace string, key, value []byte, errorFormat string, errorArgs ...interface{}) error {
b.mutex.Lock()
defer b.mutex.Unlock()
b.batch(PutIfNotExists, namespace, key, value, errorFormat, errorArgs)
return nil
}

// Delete deletes a record
func (b *baseKVStoreBatch) Delete(namespace string, key []byte, errorFormat string, errorArgs ...interface{}) {
b.mutex.Lock()
Expand Down Expand Up @@ -202,18 +189,6 @@ func (cb *cachedBatch) Put(namespace string, key, value []byte, errorFormat stri
cb.batch(Put, namespace, key, value, errorFormat, errorArgs)
}

// PutIfNotExists inserts a <key, value> record only if it does not exist yet, otherwise return ErrAlreadyExist
func (cb *cachedBatch) PutIfNotExists(namespace string, key, value []byte, errorFormat string, errorArgs ...interface{}) error {
cb.lock.Lock()
defer cb.lock.Unlock()
if _, ok := cb.cache[cb.hash(namespace, key)]; ok {
return ErrAlreadyExist
}
cb.cache[cb.hash(namespace, key)] = value
cb.batch(PutIfNotExists, namespace, key, value, errorFormat, errorArgs)
return nil
}

// Delete deletes a record
func (cb *cachedBatch) Delete(namespace string, key []byte, errorFormat string, errorArgs ...interface{}) {
cb.lock.Lock()
Expand Down
Loading

0 comments on commit 9226bbb

Please sign in to comment.