From e69d3e2dc02660af14b6e85866ae1bcad09dce28 Mon Sep 17 00:00:00 2001 From: Dustin Xie Date: Thu, 25 Apr 2024 14:37:01 -0700 Subject: [PATCH] [db] implement Get/Put/Delete/Version() for BoltDBVersioned --- db/db_versioned.go | 252 +++++++++++++++++++--------- db/db_versioned_test.go | 313 ++++++++++++++++++++++++++++++++++- db/db_versioned_util.go | 3 - db/db_versioned_util_test.go | 26 +++ db/kvstore_versioned.go | 50 ++++++ db/versionpb/version.pb.go | 129 ++------------- db/versionpb/version.proto | 8 - 7 files changed, 566 insertions(+), 215 deletions(-) create mode 100644 db/db_versioned_util_test.go create mode 100644 db/kvstore_versioned.go diff --git a/db/db_versioned.go b/db/db_versioned.go index 9818004778..0d356b9b0d 100644 --- a/db/db_versioned.go +++ b/db/db_versioned.go @@ -7,47 +7,42 @@ package db import ( + "bytes" "context" + "fmt" + "math" + "github.com/pkg/errors" + bolt "go.etcd.io/bbolt" + + "github.com/iotexproject/iotex-core/v2/db/batch" "github.com/iotexproject/iotex-core/v2/pkg/lifecycle" + "github.com/iotexproject/iotex-core/v2/pkg/util/byteutil" +) + +var ( + ErrDeleted = errors.New("deleted in DB") + _minKey = []byte{0} // the minimum key, used to store namespace's metadata ) type ( - // KvVersioned is a versioned key-value store, where each key has multiple - // versions of value (corresponding to different heights in a blockchain) - // - // Versioning is achieved by using (key + 8-byte version) as the actual - // storage key in the underlying DB. For each bucket, a metadata is stored - // at the special key = []byte{0}. The metadata specifies the bucket's name - // and the key length. - // - // For each versioned key, the special location = key + []byte{0} stores the - // key's metadata, which includes the following info: - // 1. the version when the key is first created - // 2. the version when the key is lastly written - // 3. the version when the key is deleted - // 4. hash of the key's last written value (to detect/avoid same write) - // If the location does not store a value, the key has never been written. - // - // How to use a versioned DB: - // - // db := NewBoltDBVersioned(cfg) // creates a versioned DB - // db.Start(ctx) - // defer func() { db.Stop(ctx) }() - // - // kv := db.SetVersion(5) - // value, err := kv.Get("ns", key) // read 'key' at version 5 - // kv = db.SetVersion(8) - // err := kv.Put("ns", key, value) // write 'key' at version 8 - - KvVersioned interface { + VersionedDB interface { lifecycle.StartStopper + // Put insert or update a record identified by (namespace, key) + Put(uint64, string, []byte, []byte) error + + // Get gets a record by (namespace, key) + Get(uint64, string, []byte) ([]byte, error) + + // Delete deletes a record by (namespace, key) + Delete(uint64, string, []byte) error + + // Base returns the underlying KVStore + Base() KVStore + // Version returns the key's most recent version Version(string, []byte) (uint64, error) - - // SetVersion sets the version, and returns a KVStore to call Put()/Get() - SetVersion(uint64) KVStoreBasic } // BoltDBVersioned is KvVersioned implementation based on bolt DB @@ -56,18 +51,12 @@ type ( } ) -// Option sets an option -type Option func(b *BoltDBVersioned) - -// NewBoltDBVersioned instantiates an BoltDB which implements KvVersioned -func NewBoltDBVersioned(cfg Config, opts ...Option) *BoltDBVersioned { - b := &BoltDBVersioned{ +// NewBoltDBVersioned instantiates an BoltDB which implements VersionedDB +func NewBoltDBVersioned(cfg Config) *BoltDBVersioned { + b := BoltDBVersioned{ db: NewBoltDB(cfg), } - for _, opt := range opts { - opt(b) - } - return b + return &b } // Start starts the DB @@ -80,31 +69,114 @@ func (b *BoltDBVersioned) Stop(ctx context.Context) error { return b.db.Stop(ctx) } +// Base returns the underlying KVStore +func (b *BoltDBVersioned) Base() KVStore { + return b.db +} + // Put writes a record -func (b *BoltDBVersioned) Put(ns string, version uint64, key, value []byte) error { +func (b *BoltDBVersioned) Put(version uint64, ns string, key, value []byte) error { if !b.db.IsReady() { return ErrDBNotStarted } - // TODO: implement Put - return nil + // check namespace + vn, err := b.checkNamespace(ns) + if err != nil { + return err + } + buf := batch.NewBatch() + if vn == nil { + // namespace not yet created + buf.Put(ns, _minKey, (&versionedNamespace{ + keyLen: uint32(len(key)), + }).serialize(), "failed to create metadata") + } else { + if len(key) != int(vn.keyLen) { + return errors.Wrapf(ErrInvalid, "invalid key length, expecting %d, got %d", vn.keyLen, len(key)) + } + last, _, err := b.get(math.MaxUint64, ns, key) + if !isNotExist(err) && version < last { + // not allowed to perform write on an earlier version + return ErrInvalid + } + buf.Delete(ns, keyForDelete(key, version), fmt.Sprintf("failed to delete key %x", key)) + } + buf.Put(ns, keyForWrite(key, version), value, fmt.Sprintf("failed to put key %x", key)) + return b.db.WriteBatch(buf) } // Get retrieves the most recent version -func (b *BoltDBVersioned) Get(ns string, version uint64, key []byte) ([]byte, error) { +func (b *BoltDBVersioned) Get(version uint64, ns string, key []byte) ([]byte, error) { if !b.db.IsReady() { return nil, ErrDBNotStarted } - // TODO: implement Get - return nil, nil + // check key's metadata + if err := b.checkNamespaceAndKey(ns, key); err != nil { + return nil, err + } + _, v, err := b.get(version, ns, key) + return v, err } -// Delete deletes a record,if key is nil,this will delete the whole bucket -func (b *BoltDBVersioned) Delete(ns string, key []byte) error { +func (b *BoltDBVersioned) get(version uint64, ns string, key []byte) (uint64, []byte, error) { + var ( + last uint64 + isDelete bool + value []byte + ) + err := b.db.db.View(func(tx *bolt.Tx) error { + bucket := tx.Bucket([]byte(ns)) + if bucket == nil { + return ErrBucketNotExist + } + var ( + c = bucket.Cursor() + min = keyForDelete(key, 0) + key = keyForWrite(key, version) + k, v = c.Seek(key) + ) + if k == nil || bytes.Compare(k, key) == 1 { + k, v = c.Prev() + if k == nil || bytes.Compare(k, min) <= 0 { + // cursor is at the beginning/end of the bucket or smaller than minimum key + return ErrNotExist + } + } + isDelete, last = parseKey(k) + value = make([]byte, len(v)) + copy(value, v) + return nil + }) + if err != nil { + return last, nil, err + } + if isDelete { + return last, nil, ErrDeleted + } + return last, value, nil +} + +// Delete deletes a record, if key does not exist, it returns nil +func (b *BoltDBVersioned) Delete(version uint64, ns string, key []byte) error { if !b.db.IsReady() { return ErrDBNotStarted } - // TODO: implement Delete - return nil + // check key's metadata + if err := b.checkNamespaceAndKey(ns, key); err != nil { + return err + } + last, _, err := b.get(math.MaxUint64, ns, key) + if isNotExist(err) { + return err + } + if version < last { + // not allowed to perform delete on an earlier version + return ErrInvalid + } + buf := batch.NewBatch() + buf.Put(ns, keyForDelete(key, version), nil, fmt.Sprintf("failed to delete key %x", key)) + buf.Delete(ns, keyForWrite(key, version), fmt.Sprintf("failed to delete key %x", key)) + return b.db.WriteBatch(buf) } // Version returns the key's most recent version @@ -112,45 +184,67 @@ func (b *BoltDBVersioned) Version(ns string, key []byte) (uint64, error) { if !b.db.IsReady() { return 0, ErrDBNotStarted } - // TODO: implement Version - return 0, nil -} - -// SetVersion sets the version, and returns a KVStore to call Put()/Get() -func (b *BoltDBVersioned) SetVersion(v uint64) KVStoreBasic { - return &KvWithVersion{ - db: b, - version: v, + // check key's metadata + if err := b.checkNamespaceAndKey(ns, key); err != nil { + return 0, err + } + last, _, err := b.get(math.MaxUint64, ns, key) + if isNotExist(err) { + // key not yet written + err = errors.Wrapf(ErrNotExist, "key = %x doesn't exist", key) } + return last, err } -// KvWithVersion wraps the BoltDBVersioned with a certain version -type KvWithVersion struct { - db *BoltDBVersioned - version uint64 // version for Get/Put() +func isNotExist(err error) bool { + return err == ErrNotExist || err == ErrBucketNotExist } -// Start starts the DB -func (b *KvWithVersion) Start(context.Context) error { - panic("should call BoltDBVersioned's Start method") +func keyForWrite(key []byte, v uint64) []byte { + k := make([]byte, len(key), len(key)+9) + copy(k, key) + k = append(k, byteutil.Uint64ToBytesBigEndian(v)...) + return append(k, 1) } -// Stop stops the DB -func (b *KvWithVersion) Stop(context.Context) error { - panic("should call BoltDBVersioned's Stop method") +func keyForDelete(key []byte, v uint64) []byte { + k := make([]byte, len(key), len(key)+9) + copy(k, key) + k = append(k, byteutil.Uint64ToBytesBigEndian(v)...) + return append(k, 0) } -// Put writes a record -func (b *KvWithVersion) Put(ns string, key, value []byte) error { - return b.db.Put(ns, b.version, key, value) +func parseKey(key []byte) (bool, uint64) { + size := len(key) + return (key[size-1] == 0), byteutil.BytesToUint64BigEndian(key[size-9 : size-1]) } -// Get retrieves a key's value -func (b *KvWithVersion) Get(ns string, key []byte) ([]byte, error) { - return b.db.Get(ns, b.version, key) +func (b *BoltDBVersioned) checkNamespace(ns string) (*versionedNamespace, error) { + data, err := b.db.Get(ns, _minKey) + switch errors.Cause(err) { + case nil: + vn, err := deserializeVersionedNamespace(data) + if err != nil { + return nil, err + } + return vn, nil + case ErrNotExist, ErrBucketNotExist: + return nil, nil + default: + return nil, err + } } -// Delete deletes a key -func (b *KvWithVersion) Delete(ns string, key []byte) error { - return b.db.Delete(ns, key) +func (b *BoltDBVersioned) checkNamespaceAndKey(ns string, key []byte) error { + vn, err := b.checkNamespace(ns) + if err != nil { + return err + } + if vn == nil { + return errors.Wrapf(ErrNotExist, "namespace = %x doesn't exist", ns) + } + if len(key) != int(vn.keyLen) { + return errors.Wrapf(ErrInvalid, "invalid key length, expecting %d, got %d", vn.keyLen, len(key)) + } + return nil } diff --git a/db/db_versioned_test.go b/db/db_versioned_test.go index d132a422b1..d13e9afec0 100644 --- a/db/db_versioned_test.go +++ b/db/db_versioned_test.go @@ -7,21 +7,316 @@ package db import ( - "encoding/hex" + "context" "testing" + "github.com/pkg/errors" "github.com/stretchr/testify/require" + + "github.com/iotexproject/iotex-core/v2/testutil" ) -func TestPb(t *testing.T) { +type versionTest struct { + ns string + k, v []byte + height uint64 + err error +} + +func TestVersionedDB(t *testing.T) { r := require.New(t) + testPath, err := testutil.PathOfTempFile("test-version") + r.NoError(err) + defer func() { + testutil.CleanupPath(testPath) + }() + + cfg := DefaultConfig + cfg.DbPath = testPath + db := NewBoltDBVersioned(cfg) + ctx := context.Background() + r.NoError(db.Start(ctx)) + defer func() { + db.Stop(ctx) + }() - vn := &versionedNamespace{ - name: "3jfsp5@(%)EW*#)_#¡ªº–ƒ˚œade∆…", - keyLen: 5} - data := vn.serialize() - r.Equal("0a29336a667370354028252945572a23295f23c2a1c2aac2bae28093c692cb9ac593616465e28886e280a61005", hex.EncodeToString(data)) - vn1, err := deserializeVersionedNamespace(data) + // namespace and key does not exist + vn, err := db.checkNamespace(_bucket1) + r.Nil(vn) + r.Nil(err) + // write first key, namespace and key now exist + r.NoError(db.Put(0, _bucket1, _k2, _v2)) + vn, err = db.checkNamespace(_bucket1) + r.NoError(err) + r.EqualValues(len(_k2), vn.keyLen) + // check more Put/Get + var ( + _k5 = []byte("key_5") + _k10 = []byte("key_10") + ) + err = db.Put(1, _bucket1, _k10, _v1) + r.Equal("invalid key length, expecting 5, got 6: invalid input", err.Error()) + r.NoError(db.Put(1, _bucket1, _k2, _v1)) + r.NoError(db.Put(3, _bucket1, _k2, _v3)) + r.NoError(db.Put(6, _bucket1, _k2, _v2)) + r.NoError(db.Put(2, _bucket1, _k4, _v2)) + r.NoError(db.Put(4, _bucket1, _k4, _v1)) + r.NoError(db.Put(7, _bucket1, _k4, _v3)) + for _, e := range []versionTest{ + {_bucket2, _k1, nil, 0, ErrNotExist}, // bucket not exist + {_bucket1, _k1, nil, 0, ErrNotExist}, + {_bucket1, _k2, _v2, 0, nil}, + {_bucket1, _k2, _v1, 1, nil}, + {_bucket1, _k2, _v1, 2, nil}, + {_bucket1, _k2, _v3, 3, nil}, + {_bucket1, _k2, _v3, 5, nil}, + {_bucket1, _k2, _v2, 6, nil}, + {_bucket1, _k2, _v2, 7, nil}, // after last write version + {_bucket1, _k3, nil, 0, ErrNotExist}, + {_bucket1, _k4, nil, 1, ErrNotExist}, // before first write version + {_bucket1, _k4, _v2, 2, nil}, + {_bucket1, _k4, _v2, 3, nil}, + {_bucket1, _k4, _v1, 4, nil}, + {_bucket1, _k4, _v1, 6, nil}, + {_bucket1, _k4, _v3, 7, nil}, + {_bucket1, _k4, _v3, 8, nil}, // larger than last key in bucket + {_bucket1, _k5, nil, 0, ErrNotExist}, + {_bucket1, _k10, nil, 0, ErrInvalid}, + } { + value, err := db.Get(e.height, e.ns, e.k) + r.Equal(e.err, errors.Cause(err)) + r.Equal(e.v, value) + } + // overwrite the same height again + r.NoError(db.Put(6, _bucket1, _k2, _v4)) + r.NoError(db.Put(7, _bucket1, _k4, _v4)) + // write to earlier version again is invalid + r.Equal(ErrInvalid, db.Put(3, _bucket1, _k2, _v4)) + r.Equal(ErrInvalid, db.Put(4, _bucket1, _k4, _v4)) + // write with same value + r.NoError(db.Put(9, _bucket1, _k2, _v4)) + r.NoError(db.Put(10, _bucket1, _k4, _v4)) + for _, e := range []versionTest{ + {_bucket2, _k1, nil, 0, ErrNotExist}, // bucket not exist + {_bucket1, _k1, nil, 0, ErrNotExist}, + {_bucket1, _k2, _v2, 0, nil}, + {_bucket1, _k2, _v1, 1, nil}, + {_bucket1, _k2, _v1, 2, nil}, + {_bucket1, _k2, _v3, 3, nil}, + {_bucket1, _k2, _v3, 5, nil}, + {_bucket1, _k2, _v4, 6, nil}, + {_bucket1, _k2, _v4, 8, nil}, + {_bucket1, _k2, _v4, 9, nil}, + {_bucket1, _k2, _v4, 10, nil}, // after last write version + {_bucket1, _k3, nil, 0, ErrNotExist}, + {_bucket1, _k4, nil, 1, ErrNotExist}, // before first write version + {_bucket1, _k4, _v2, 2, nil}, + {_bucket1, _k4, _v2, 3, nil}, + {_bucket1, _k4, _v1, 4, nil}, + {_bucket1, _k4, _v1, 6, nil}, + {_bucket1, _k4, _v4, 7, nil}, + {_bucket1, _k4, _v4, 9, nil}, + {_bucket1, _k4, _v4, 10, nil}, + {_bucket1, _k4, _v4, 11, nil}, // larger than last key in bucket + {_bucket1, _k5, nil, 0, ErrNotExist}, + {_bucket1, _k10, nil, 0, ErrInvalid}, + } { + value, err := db.Get(e.height, e.ns, e.k) + r.Equal(e.err, errors.Cause(err)) + r.Equal(e.v, value) + } + // check version + for _, e := range []versionTest{ + {_bucket1, _k1, nil, 0, ErrNotExist}, + {_bucket1, _k2, nil, 9, nil}, + {_bucket1, _k3, nil, 0, ErrNotExist}, + {_bucket1, _k4, nil, 10, nil}, + {_bucket1, _k5, nil, 0, ErrNotExist}, + {_bucket1, _k10, nil, 0, ErrInvalid}, + } { + value, err := db.Version(e.ns, e.k) + r.Equal(e.err, errors.Cause(err)) + r.Equal(e.height, value) + } + // test delete + r.Equal(ErrNotExist, errors.Cause(db.Delete(10, _bucket2, _k1))) + for _, k := range [][]byte{_k2, _k4} { + r.NoError(db.Delete(11, _bucket1, k)) + } + for _, k := range [][]byte{_k1, _k3, _k5} { + r.Equal(ErrNotExist, errors.Cause(db.Delete(10, _bucket1, k))) + } + r.Equal(ErrInvalid, errors.Cause(db.Delete(10, _bucket1, _k10))) + // key still can be read before delete version + for _, e := range []versionTest{ + {_bucket2, _k1, nil, 0, ErrNotExist}, // bucket not exist + {_bucket1, _k1, nil, 0, ErrNotExist}, + {_bucket1, _k2, _v2, 0, nil}, + {_bucket1, _k2, _v1, 1, nil}, + {_bucket1, _k2, _v1, 2, nil}, + {_bucket1, _k2, _v3, 3, nil}, + {_bucket1, _k2, _v3, 5, nil}, + {_bucket1, _k2, _v4, 6, nil}, + {_bucket1, _k2, _v4, 8, nil}, + {_bucket1, _k2, _v4, 9, nil}, + {_bucket1, _k2, _v4, 10, nil}, // before delete version + {_bucket1, _k2, nil, 11, ErrDeleted}, // after delete version + {_bucket1, _k3, nil, 0, ErrNotExist}, + {_bucket1, _k4, nil, 1, ErrNotExist}, // before first write version + {_bucket1, _k4, _v2, 2, nil}, + {_bucket1, _k4, _v2, 3, nil}, + {_bucket1, _k4, _v1, 4, nil}, + {_bucket1, _k4, _v1, 6, nil}, + {_bucket1, _k4, _v4, 7, nil}, + {_bucket1, _k4, _v4, 10, nil}, // before delete version + {_bucket1, _k4, nil, 11, ErrDeleted}, // after delete version + {_bucket1, _k5, nil, 0, ErrNotExist}, + {_bucket1, _k10, nil, 0, ErrInvalid}, + } { + value, err := db.Get(e.height, e.ns, e.k) + r.Equal(e.err, errors.Cause(err)) + r.Equal(e.v, value) + } + // write before delete version is invalid + r.Equal(ErrInvalid, db.Put(9, _bucket1, _k2, _k2)) + r.Equal(ErrInvalid, db.Put(9, _bucket1, _k4, _k4)) + for _, e := range []versionTest{ + {_bucket1, _k2, _v4, 10, nil}, // before delete version + {_bucket1, _k2, nil, 11, ErrDeleted}, // after delete version + {_bucket1, _k4, _v4, 10, nil}, // before delete version + {_bucket1, _k4, nil, 11, ErrDeleted}, // after delete version + } { + value, err := db.Get(e.height, e.ns, e.k) + r.Equal(e.err, errors.Cause(err)) + r.Equal(e.v, value) + } + // write after delete version + r.NoError(db.Put(12, _bucket1, _k2, _k2)) + r.NoError(db.Put(12, _bucket1, _k4, _k4)) + for _, e := range []versionTest{ + {_bucket2, _k1, nil, 0, ErrNotExist}, // bucket not exist + {_bucket1, _k1, nil, 0, ErrNotExist}, + {_bucket1, _k2, _v2, 0, nil}, + {_bucket1, _k2, _v1, 1, nil}, + {_bucket1, _k2, _v1, 2, nil}, + {_bucket1, _k2, _v3, 3, nil}, + {_bucket1, _k2, _v3, 5, nil}, + {_bucket1, _k2, _v4, 6, nil}, + {_bucket1, _k2, _v4, 8, nil}, + {_bucket1, _k2, _v4, 10, nil}, // before delete version + {_bucket1, _k2, nil, 11, ErrDeleted}, // after delete version + {_bucket1, _k2, _k2, 12, nil}, // after next write version + {_bucket1, _k3, nil, 0, ErrNotExist}, + {_bucket1, _k4, nil, 1, ErrNotExist}, // before first write version + {_bucket1, _k4, _v2, 2, nil}, + {_bucket1, _k4, _v2, 3, nil}, + {_bucket1, _k4, _v1, 4, nil}, + {_bucket1, _k4, _v1, 6, nil}, + {_bucket1, _k4, _v4, 7, nil}, + {_bucket1, _k4, _v4, 10, nil}, // before delete version + {_bucket1, _k4, nil, 11, ErrDeleted}, // after delete version + {_bucket1, _k4, _k4, 12, nil}, // after next write version + {_bucket1, _k5, nil, 0, ErrNotExist}, + {_bucket1, _k10, nil, 0, ErrInvalid}, + } { + value, err := db.Get(e.height, e.ns, e.k) + r.Equal(e.err, errors.Cause(err)) + r.Equal(e.v, value) + } + // check version after delete + for _, e := range []versionTest{ + {_bucket1, _k1, nil, 0, ErrNotExist}, + {_bucket1, _k2, nil, 12, nil}, + {_bucket1, _k3, nil, 0, ErrNotExist}, + {_bucket1, _k4, nil, 12, nil}, + {_bucket1, _k5, nil, 0, ErrNotExist}, + {_bucket1, _k10, nil, 0, ErrInvalid}, + } { + value, err := db.Version(e.ns, e.k) + r.Equal(e.err, errors.Cause(err)) + r.Equal(e.height, value) + } +} + +func TestMultipleWriteDelete(t *testing.T) { + r := require.New(t) + testPath, err := testutil.PathOfTempFile("test-version") + r.NoError(err) + defer func() { + testutil.CleanupPath(testPath) + }() + + cfg := DefaultConfig + cfg.DbPath = testPath + db := NewBoltDBVersioned(cfg) + ctx := context.Background() + r.NoError(db.Start(ctx)) + defer func() { + db.Stop(ctx) + }() + + // multiple writes and deletes + r.NoError(db.Put(1, _bucket1, _k2, _v1)) + r.NoError(db.Put(3, _bucket1, _k2, _v3)) + v, err := db.Version(_bucket1, _k2) + r.NoError(err) + r.EqualValues(3, v) + r.NoError(db.Delete(7, _bucket1, _k2)) + _, err = db.Version(_bucket1, _k2) + r.Equal(ErrDeleted, errors.Cause(err)) + r.NoError(db.Put(10, _bucket1, _k2, _v2)) + v, err = db.Version(_bucket1, _k2) + r.NoError(err) + r.EqualValues(10, v) + r.NoError(db.Delete(15, _bucket1, _k2)) + _, err = db.Version(_bucket1, _k2) + r.Equal(ErrDeleted, errors.Cause(err)) + r.NoError(db.Put(18, _bucket1, _k2, _v3)) + r.NoError(db.Delete(18, _bucket1, _k2)) // delete-after-write + _, err = db.Version(_bucket1, _k2) + r.Equal(ErrDeleted, errors.Cause(err)) + r.NoError(db.Put(18, _bucket1, _k2, _v3)) // write again + value, err := db.Get(18, _bucket1, _k2) + r.NoError(err) + r.Equal(_v3, value) + v, err = db.Version(_bucket1, _k2) + r.NoError(err) + r.EqualValues(18, v) + r.NoError(db.Delete(18, _bucket1, _k2)) // delete-after-write + _, err = db.Version(_bucket1, _k2) + r.Equal(ErrDeleted, errors.Cause(err)) + r.NoError(db.Put(21, _bucket1, _k2, _v4)) + v, err = db.Version(_bucket1, _k2) + r.NoError(err) + r.EqualValues(21, v) + r.NoError(db.Delete(25, _bucket1, _k2)) + r.NoError(db.Put(25, _bucket1, _k2, _k2)) // write-after-delete + v, err = db.Version(_bucket1, _k2) r.NoError(err) - r.Equal(vn, vn1) + r.EqualValues(25, v) + for _, e := range []versionTest{ + {_bucket1, _k2, nil, 0, ErrNotExist}, + {_bucket1, _k2, _v1, 1, nil}, + {_bucket1, _k2, _v1, 2, nil}, + {_bucket1, _k2, _v3, 3, nil}, + {_bucket1, _k2, _v3, 6, nil}, + {_bucket1, _k2, nil, 7, ErrDeleted}, + {_bucket1, _k2, nil, 9, ErrDeleted}, + {_bucket1, _k2, _v2, 10, nil}, + {_bucket1, _k2, _v2, 14, nil}, + {_bucket1, _k2, nil, 15, ErrDeleted}, + {_bucket1, _k2, nil, 17, ErrDeleted}, + {_bucket1, _k2, nil, 18, ErrDeleted}, + {_bucket1, _k2, nil, 20, ErrDeleted}, + {_bucket1, _k2, _v4, 21, nil}, + {_bucket1, _k2, _v4, 22, nil}, + {_bucket1, _k2, _v4, 24, nil}, + {_bucket1, _k2, _k2, 25, nil}, + {_bucket1, _k2, _k2, 26, nil}, + {_bucket1, _k2, _k2, 25000, nil}, + } { + value, err := db.Get(e.height, e.ns, e.k) + r.Equal(e.err, errors.Cause(err)) + r.Equal(e.v, value) + } } diff --git a/db/db_versioned_util.go b/db/db_versioned_util.go index d40d1b921b..215d3651b4 100644 --- a/db/db_versioned_util.go +++ b/db/db_versioned_util.go @@ -15,7 +15,6 @@ import ( // versionedNamespace is the metadata for versioned namespace type versionedNamespace struct { - name string keyLen uint32 } @@ -26,14 +25,12 @@ func (vn *versionedNamespace) serialize() []byte { func (vn *versionedNamespace) toProto() *versionpb.VersionedNamespace { return &versionpb.VersionedNamespace{ - Name: vn.name, KeyLen: vn.keyLen, } } func fromProtoVN(pb *versionpb.VersionedNamespace) *versionedNamespace { return &versionedNamespace{ - name: pb.Name, keyLen: pb.KeyLen, } } diff --git a/db/db_versioned_util_test.go b/db/db_versioned_util_test.go new file mode 100644 index 0000000000..e32dfa88fa --- /dev/null +++ b/db/db_versioned_util_test.go @@ -0,0 +1,26 @@ +// Copyright (c) 2021 IoTeX Foundation +// This is an alpha (internal) release and is not suitable for production. This source code is provided 'as is' and no +// warranties are given as to title or non-infringement, merchantability or fitness for purpose and, to the extent +// permitted by law, all liability for your use of the code is disclaimed. This source code is governed by Apache +// License 2.0 that can be found in the LICENSE file. + +package db + +import ( + "encoding/hex" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestPb(t *testing.T) { + r := require.New(t) + + vn := &versionedNamespace{ + keyLen: 5} + data := vn.serialize() + r.Equal("1005", hex.EncodeToString(data)) + vn1, err := deserializeVersionedNamespace(data) + r.NoError(err) + r.Equal(vn, vn1) +} diff --git a/db/kvstore_versioned.go b/db/kvstore_versioned.go new file mode 100644 index 0000000000..e1e85b01b3 --- /dev/null +++ b/db/kvstore_versioned.go @@ -0,0 +1,50 @@ +// Copyright (c) 2024 IoTeX Foundation +// This is an alpha (internal) release and is not suitable for production. This source code is provided 'as is' and no +// warranties are given as to title or non-infringement, merchantability or fitness for purpose and, to the extent +// permitted by law, all liability for your use of the code is disclaimed. This source code is governed by Apache +// License 2.0 that can be found in the LICENSE file. + +package db + +import ( + "github.com/iotexproject/iotex-core/v2/pkg/lifecycle" +) + +type ( + // KvVersioned is a versioned key-value store, where each key has multiple + // versions of value (corresponding to different heights in a blockchain) + // + // Versioning is achieved by using (key + 8-byte version) as the actual + // storage key in the underlying DB. For each bucket, a metadata is stored + // at the special key = []byte{0}. The metadata specifies the bucket's name + // and the key length. + // + // For each versioned key, the special location = key + []byte{0} stores the + // key's metadata, which includes the following info: + // 1. the version when the key is first created + // 2. the version when the key is lastly written + // 3. the version when the key is deleted + // 4. the key's last written value (to fast-track read of current version) + // If the location does not store a value, the key has never been written. + // + // How to use a versioned key-value store: + // + // db := NewKVStoreWithVersion(cfg) // creates a versioned DB + // db.Start(ctx) + // defer func() { db.Stop(ctx) }() + // + // kv := db.SetVersion(5) + // value, err := kv.Get("ns", key) // read 'key' at version 5 + // kv = db.SetVersion(8) + // err := kv.Put("ns", key, value) // write 'key' at version 8 + + KvVersioned interface { + lifecycle.StartStopper + + // Version returns the key's most recent version + Version(string, []byte) (uint64, error) + + // SetVersion sets the version, and returns a KVStore to call Put()/Get() + SetVersion(uint64) KVStore + } +) diff --git a/db/versionpb/version.pb.go b/db/versionpb/version.pb.go index e15c6531f5..9e2f1f0618 100644 --- a/db/versionpb/version.pb.go +++ b/db/versionpb/version.pb.go @@ -9,8 +9,8 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.28.1 -// protoc v3.21.12 +// protoc-gen-go v1.34.2 +// protoc v5.27.1 // source: version.proto package versionpb @@ -34,7 +34,6 @@ type VersionedNamespace struct { sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"` KeyLen uint32 `protobuf:"varint,2,opt,name=keyLen,proto3" json:"keyLen,omitempty"` } @@ -70,13 +69,6 @@ func (*VersionedNamespace) Descriptor() ([]byte, []int) { return file_version_proto_rawDescGZIP(), []int{0} } -func (x *VersionedNamespace) GetName() string { - if x != nil { - return x.Name - } - return "" -} - func (x *VersionedNamespace) GetKeyLen() uint32 { if x != nil { return x.KeyLen @@ -84,100 +76,18 @@ func (x *VersionedNamespace) GetKeyLen() uint32 { return 0 } -type KeyMeta struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields - - LastWriteHash []byte `protobuf:"bytes,1,opt,name=lastWriteHash,proto3" json:"lastWriteHash,omitempty"` // hash of value that was last written - FirstVersion uint64 `protobuf:"varint,2,opt,name=firstVersion,proto3" json:"firstVersion,omitempty"` - LastVersion uint64 `protobuf:"varint,3,opt,name=lastVersion,proto3" json:"lastVersion,omitempty"` - DeleteVersion uint64 `protobuf:"varint,4,opt,name=deleteVersion,proto3" json:"deleteVersion,omitempty"` -} - -func (x *KeyMeta) Reset() { - *x = KeyMeta{} - if protoimpl.UnsafeEnabled { - mi := &file_version_proto_msgTypes[1] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) - } -} - -func (x *KeyMeta) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*KeyMeta) ProtoMessage() {} - -func (x *KeyMeta) ProtoReflect() protoreflect.Message { - mi := &file_version_proto_msgTypes[1] - if protoimpl.UnsafeEnabled && x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use KeyMeta.ProtoReflect.Descriptor instead. -func (*KeyMeta) Descriptor() ([]byte, []int) { - return file_version_proto_rawDescGZIP(), []int{1} -} - -func (x *KeyMeta) GetLastWriteHash() []byte { - if x != nil { - return x.LastWriteHash - } - return nil -} - -func (x *KeyMeta) GetFirstVersion() uint64 { - if x != nil { - return x.FirstVersion - } - return 0 -} - -func (x *KeyMeta) GetLastVersion() uint64 { - if x != nil { - return x.LastVersion - } - return 0 -} - -func (x *KeyMeta) GetDeleteVersion() uint64 { - if x != nil { - return x.DeleteVersion - } - return 0 -} - var File_version_proto protoreflect.FileDescriptor var file_version_proto_rawDesc = []byte{ 0x0a, 0x0d, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, - 0x09, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x70, 0x62, 0x22, 0x40, 0x0a, 0x12, 0x56, 0x65, + 0x09, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x70, 0x62, 0x22, 0x2c, 0x0a, 0x12, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x65, 0x64, 0x4e, 0x61, 0x6d, 0x65, 0x73, 0x70, 0x61, 0x63, 0x65, - 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, - 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x16, 0x0a, 0x06, 0x6b, 0x65, 0x79, 0x4c, 0x65, 0x6e, 0x18, 0x02, - 0x20, 0x01, 0x28, 0x0d, 0x52, 0x06, 0x6b, 0x65, 0x79, 0x4c, 0x65, 0x6e, 0x22, 0x9b, 0x01, 0x0a, - 0x07, 0x4b, 0x65, 0x79, 0x4d, 0x65, 0x74, 0x61, 0x12, 0x24, 0x0a, 0x0d, 0x6c, 0x61, 0x73, 0x74, - 0x57, 0x72, 0x69, 0x74, 0x65, 0x48, 0x61, 0x73, 0x68, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, - 0x0d, 0x6c, 0x61, 0x73, 0x74, 0x57, 0x72, 0x69, 0x74, 0x65, 0x48, 0x61, 0x73, 0x68, 0x12, 0x22, - 0x0a, 0x0c, 0x66, 0x69, 0x72, 0x73, 0x74, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x02, - 0x20, 0x01, 0x28, 0x04, 0x52, 0x0c, 0x66, 0x69, 0x72, 0x73, 0x74, 0x56, 0x65, 0x72, 0x73, 0x69, - 0x6f, 0x6e, 0x12, 0x20, 0x0a, 0x0b, 0x6c, 0x61, 0x73, 0x74, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, - 0x6e, 0x18, 0x03, 0x20, 0x01, 0x28, 0x04, 0x52, 0x0b, 0x6c, 0x61, 0x73, 0x74, 0x56, 0x65, 0x72, - 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x24, 0x0a, 0x0d, 0x64, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x56, 0x65, - 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x04, 0x20, 0x01, 0x28, 0x04, 0x52, 0x0d, 0x64, 0x65, 0x6c, - 0x65, 0x74, 0x65, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x42, 0x31, 0x5a, 0x2f, 0x67, 0x69, - 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x69, 0x6f, 0x74, 0x65, 0x78, 0x70, 0x72, - 0x6f, 0x6a, 0x65, 0x63, 0x74, 0x2f, 0x69, 0x6f, 0x74, 0x65, 0x78, 0x2d, 0x63, 0x6f, 0x72, 0x65, - 0x2f, 0x64, 0x62, 0x2f, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x70, 0x62, 0x62, 0x06, 0x70, - 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x12, 0x16, 0x0a, 0x06, 0x6b, 0x65, 0x79, 0x4c, 0x65, 0x6e, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0d, + 0x52, 0x06, 0x6b, 0x65, 0x79, 0x4c, 0x65, 0x6e, 0x42, 0x31, 0x5a, 0x2f, 0x67, 0x69, 0x74, 0x68, + 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x69, 0x6f, 0x74, 0x65, 0x78, 0x70, 0x72, 0x6f, 0x6a, + 0x65, 0x63, 0x74, 0x2f, 0x69, 0x6f, 0x74, 0x65, 0x78, 0x2d, 0x63, 0x6f, 0x72, 0x65, 0x2f, 0x64, + 0x62, 0x2f, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, + 0x74, 0x6f, 0x33, } var ( @@ -192,10 +102,9 @@ func file_version_proto_rawDescGZIP() []byte { return file_version_proto_rawDescData } -var file_version_proto_msgTypes = make([]protoimpl.MessageInfo, 2) -var file_version_proto_goTypes = []interface{}{ +var file_version_proto_msgTypes = make([]protoimpl.MessageInfo, 1) +var file_version_proto_goTypes = []any{ (*VersionedNamespace)(nil), // 0: versionpb.VersionedNamespace - (*KeyMeta)(nil), // 1: versionpb.KeyMeta } var file_version_proto_depIdxs = []int32{ 0, // [0:0] is the sub-list for method output_type @@ -211,7 +120,7 @@ func file_version_proto_init() { return } if !protoimpl.UnsafeEnabled { - file_version_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + file_version_proto_msgTypes[0].Exporter = func(v any, i int) any { switch v := v.(*VersionedNamespace); i { case 0: return &v.state @@ -223,18 +132,6 @@ func file_version_proto_init() { return nil } } - file_version_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*KeyMeta); i { - case 0: - return &v.state - case 1: - return &v.sizeCache - case 2: - return &v.unknownFields - default: - return nil - } - } } type x struct{} out := protoimpl.TypeBuilder{ @@ -242,7 +139,7 @@ func file_version_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_version_proto_rawDesc, NumEnums: 0, - NumMessages: 2, + NumMessages: 1, NumExtensions: 0, NumServices: 0, }, diff --git a/db/versionpb/version.proto b/db/versionpb/version.proto index 69645bed67..b4a1c519d7 100644 --- a/db/versionpb/version.proto +++ b/db/versionpb/version.proto @@ -11,13 +11,5 @@ package versionpb; option go_package = "github.com/iotexproject/iotex-core/db/versionpb"; message VersionedNamespace { - string name = 1; uint32 keyLen = 2; } - -message KeyMeta { - bytes lastWriteHash = 1; // hash of value that was last written - uint64 firstVersion = 2; - uint64 lastVersion = 3; - uint64 deleteVersion = 4; -}