From b84ba4388cda5db059d3a1b23771fc00a849117b Mon Sep 17 00:00:00 2001 From: Kinshuk Bairagi Date: Mon, 13 Jun 2022 21:04:03 +0530 Subject: [PATCH 1/3] Implement GetLastAppliedEntry/SaveEntry --- internal/sync/repl.go | 49 ++++++++++++++++++++++++++++++++++++------- 1 file changed, 42 insertions(+), 7 deletions(-) diff --git a/internal/sync/repl.go b/internal/sync/repl.go index 05f5a23e..80ece771 100644 --- a/internal/sync/repl.go +++ b/internal/sync/repl.go @@ -24,23 +24,58 @@ func NewDKVReplStore(kvs storage.KVStore) db.Store { return &dkvReplStore{kvs} } -func (dr *dkvReplStore) Save(_ db.RaftEntry, req []byte) ([]byte, error) { +func (dr *dkvReplStore) Save(ent db.RaftEntry, req []byte) (res []byte, err error) { intReq := new(raftpb.InternalRaftRequest) - if err := proto.Unmarshal(req, intReq); err != nil { + if err = proto.Unmarshal(req, intReq); err != nil { return nil, err } switch { case intReq.Put != nil: - return dr.put(intReq.Put) + res, err = dr.put(intReq.Put) case intReq.MultiPut != nil: - return dr.multiPut(intReq.MultiPut) + res, err = dr.multiPut(intReq.MultiPut) case intReq.Delete != nil: - return dr.delete(intReq.Delete) + res, err = dr.delete(intReq.Delete) case intReq.Cas != nil: - return dr.cas(intReq.Cas) + res, err = dr.cas(intReq.Cas) default: - return nil, errors.New("Unknown Save request in dkv") + res, err = nil, errors.New("Unknown Save request in dkv") + } + + // It is perhaps fine to save RAFT entry after + // the main operation. The alternative involves + // modifying each of the above mutations to save + // the RAFT entry, which may not be possible. + if err == nil { + err = dr.saveEntry(ent) + } + return +} + +const ( + raftMeta = "__$dkv_meta::RAFT_STATE" + raftMetaDelim = ':' +) + +func (dr *dkvReplStore) saveEntry(ent db.RaftEntry) error { + _, err := dr.put(&serverpb.PutRequest{ + Key: []byte(raftMeta), + Value: []byte(fmt.Sprintf("%d%c%d", ent.Term, raftMetaDelim, ent.Index)), + }) + return err +} + +func (dr *dkvReplStore) GetLastAppliedEntry() (db.RaftEntry, error) { + vals, err := dr.kvs.Get([]byte(raftMeta)) + if err != nil || vals == nil || len(vals) == 0 { + return db.RaftEntry{}, errors.New("no RAFT metadata found") } + raftMetaStr := string(vals[0].Value) + idx := strings.IndexRune(raftMetaStr, raftMetaDelim) + term, _ := strconv.ParseUint(raftMetaStr[:idx], 10, 64) + index, _ := strconv.ParseUint(raftMetaStr[idx+1:], 10, 64) + + return db.RaftEntry{Term: term, Index: index}, nil } func (dr *dkvReplStore) Load(req []byte) ([]byte, error) { From 78ac9ce5536ad16d555187db1399b215ec736848 Mon Sep 17 00:00:00 2001 From: Kinshuk Bairagi Date: Mon, 13 Jun 2022 22:46:01 +0530 Subject: [PATCH 2/3] Update repl_test.go --- internal/sync/repl_test.go | 39 ++++++++++++++++++++++++++++---------- 1 file changed, 29 insertions(+), 10 deletions(-) diff --git a/internal/sync/repl_test.go b/internal/sync/repl_test.go index e865bed7..9dff0985 100644 --- a/internal/sync/repl_test.go +++ b/internal/sync/repl_test.go @@ -4,6 +4,7 @@ import ( "bytes" "encoding/gob" "errors" + "fmt" "io" "io/ioutil" "sync" @@ -21,9 +22,10 @@ func TestDKVReplStoreSave(t *testing.T) { kvs := newMemStore() dkvRepl := NewDKVReplStore(kvs) - testPut(t, kvs, dkvRepl, []byte("foo"), []byte("bar")) - testPut(t, kvs, dkvRepl, []byte("hello"), []byte("world")) - testPut(t, kvs, dkvRepl, []byte("kit"), []byte("kat")) + term, index := uint64(1), uint64(1) + testPut(t, kvs, dkvRepl, []byte("foo"), []byte("bar"), term, index) + testPut(t, kvs, dkvRepl, []byte("hello"), []byte("world"), term, index+1) + testPut(t, kvs, dkvRepl, []byte("kit"), []byte("kat"), term, index+2) testGet(t, kvs, dkvRepl, []byte("foo")) testGet(t, kvs, dkvRepl, []byte("hello")) @@ -31,9 +33,9 @@ func TestDKVReplStoreSave(t *testing.T) { testMultiGet(t, kvs, dkvRepl, []byte("foo"), []byte("hello"), []byte("kit")) - testDelete(t, kvs, dkvRepl, []byte("foo")) - testDelete(t, kvs, dkvRepl, []byte("hello")) - testDelete(t, kvs, dkvRepl, []byte("kit")) + testDelete(t, kvs, dkvRepl, []byte("foo"), term, index+3) + testDelete(t, kvs, dkvRepl, []byte("hello"), term, index+4) + testDelete(t, kvs, dkvRepl, []byte("kit"), term, index+5) } func TestDKVReplStoreClose(t *testing.T) { @@ -46,13 +48,13 @@ func TestDKVReplStoreClose(t *testing.T) { } } -func testPut(t *testing.T, kvs *memStore, dkvRepl db.Store, key, val []byte) { +func testPut(t *testing.T, kvs *memStore, dkvRepl db.Store, key, val []byte, term, index uint64) { intReq := new(raftpb.InternalRaftRequest) intReq.Put = &serverpb.PutRequest{Key: key, Value: val} if reqBts, err := proto.Marshal(intReq); err != nil { t.Error(err) } else { - if _, err := dkvRepl.Save(db.RaftEntry{}, reqBts); err != nil { + if _, err := dkvRepl.Save(db.RaftEntry{term, index}, reqBts); err != nil { t.Error(err) } else { if res, err := kvs.Get(key); err != nil { @@ -60,23 +62,36 @@ func testPut(t *testing.T, kvs *memStore, dkvRepl db.Store, key, val []byte) { } else if string(res[0].Value) != string(val) { t.Errorf("Value mismatch for key: %s. Expected: %s, Actual: %s", key, val, res[0].Value) } + checkRAFTEntry(t, kvs, term, index) } } } -func testDelete(t *testing.T, kvs *memStore, dkvRepl db.Store, key []byte) { +func testDelete(t *testing.T, kvs *memStore, dkvRepl db.Store, key []byte, term, index uint64) { intReq := new(raftpb.InternalRaftRequest) intReq.Delete = &serverpb.DeleteRequest{Key: key} if reqBts, err := proto.Marshal(intReq); err != nil { t.Error(err) } else { - if _, err := dkvRepl.Save(db.RaftEntry{}, reqBts); err != nil { + if _, err := dkvRepl.Save(db.RaftEntry{term, index}, reqBts); err != nil { t.Error(err) } else { if _, err := kvs.Get(key); err.Error() != "Given key not found" { t.Error(err) } } + checkRAFTEntry(t, kvs, term, index) + } +} + +func checkRAFTEntry(t *testing.T, kvs *memStore, term, index uint64) { + if res, err := kvs.Get([]byte(raftMeta)); err != nil { + t.Error(err) + } else { + exp := fmt.Sprintf("%d%c%d", term, raftMetaDelim, index) + if string(res[0].Value) != exp { + t.Errorf("Mismatch in RAFT entry. Expected: %s, Actual: %s", exp, res[0].Value) + } } } @@ -154,6 +169,10 @@ func (ms *memStore) Put(pairs ...*serverpb.KVPair) error { for _, kv := range pairs { storeKey := string(kv.Key) + if storeKey == raftMeta { + ms.store[storeKey] = memStoreObject{kv.Value, 0} + continue + } if _, present := ms.store[storeKey]; present { return errors.New("given key already exists") } From ac9e47348c7c56fcc1784a15990230eadd1ace96 Mon Sep 17 00:00:00 2001 From: Kinshuk Bairagi Date: Mon, 13 Jun 2022 23:18:22 +0530 Subject: [PATCH 3/3] Update repl.go --- internal/sync/repl.go | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/internal/sync/repl.go b/internal/sync/repl.go index 80ece771..e9f5dbd6 100644 --- a/internal/sync/repl.go +++ b/internal/sync/repl.go @@ -4,8 +4,11 @@ import ( "bytes" "encoding/gob" "errors" + "fmt" "github.com/golang/protobuf/proto" "io" + "strconv" + "strings" "github.com/flipkart-incubator/dkv/internal/storage" "github.com/flipkart-incubator/dkv/internal/sync/raftpb" @@ -149,11 +152,6 @@ func (dr *dkvReplStore) Close() error { return dr.kvs.Close() } -// TODO: implement this correctly -func (dr *dkvReplStore) GetLastAppliedEntry() (db.RaftEntry, error) { - return db.RaftEntry{}, errors.New("not implemented") -} - func (dr *dkvReplStore) Backup(_ db.SnapshotState) (io.ReadCloser, error) { return dr.kvs.GetSnapshot() }