Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement GetLastAppliedEntry/SaveEntry #149

Draft
wants to merge 3 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 45 additions & 12 deletions internal/sync/repl.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,11 @@ import (
"bytes"
"encoding/gob"
"errors"
"fmt"
"github.com/golang/protobuf/proto"
"io"
"strconv"
"strings"

"github.com/flipkart-incubator/dkv/internal/storage"
"github.com/flipkart-incubator/dkv/internal/sync/raftpb"
Expand All @@ -24,23 +27,58 @@ func NewDKVReplStore(kvs storage.KVStore) db.Store {
return &dkvReplStore{kvs}
}

func (dr *dkvReplStore) Save(_ db.RaftEntry, req []byte) ([]byte, error) {
func (dr *dkvReplStore) Save(ent db.RaftEntry, req []byte) (res []byte, err error) {
intReq := new(raftpb.InternalRaftRequest)
if err := proto.Unmarshal(req, intReq); err != nil {
if err = proto.Unmarshal(req, intReq); err != nil {
return nil, err
}
switch {
case intReq.Put != nil:
return dr.put(intReq.Put)
res, err = dr.put(intReq.Put)
case intReq.MultiPut != nil:
return dr.multiPut(intReq.MultiPut)
res, err = dr.multiPut(intReq.MultiPut)
case intReq.Delete != nil:
return dr.delete(intReq.Delete)
res, err = dr.delete(intReq.Delete)
case intReq.Cas != nil:
return dr.cas(intReq.Cas)
res, err = dr.cas(intReq.Cas)
default:
return nil, errors.New("Unknown Save request in dkv")
res, err = nil, errors.New("Unknown Save request in dkv")
}

// It is perhaps fine to save RAFT entry after
// the main operation. The alternative involves
// modifying each of the above mutations to save
// the RAFT entry, which may not be possible.
if err == nil {
err = dr.saveEntry(ent)
}
return
}

const (
raftMeta = "__$dkv_meta::RAFT_STATE"
raftMetaDelim = ':'
)

func (dr *dkvReplStore) saveEntry(ent db.RaftEntry) error {
_, err := dr.put(&serverpb.PutRequest{
Key: []byte(raftMeta),
Value: []byte(fmt.Sprintf("%d%c%d", ent.Term, raftMetaDelim, ent.Index)),
})
return err
}

func (dr *dkvReplStore) GetLastAppliedEntry() (db.RaftEntry, error) {
vals, err := dr.kvs.Get([]byte(raftMeta))
if err != nil || vals == nil || len(vals) == 0 {
return db.RaftEntry{}, errors.New("no RAFT metadata found")
}
raftMetaStr := string(vals[0].Value)
idx := strings.IndexRune(raftMetaStr, raftMetaDelim)
term, _ := strconv.ParseUint(raftMetaStr[:idx], 10, 64)
index, _ := strconv.ParseUint(raftMetaStr[idx+1:], 10, 64)

return db.RaftEntry{Term: term, Index: index}, nil
}

func (dr *dkvReplStore) Load(req []byte) ([]byte, error) {
Expand Down Expand Up @@ -114,11 +152,6 @@ func (dr *dkvReplStore) Close() error {
return dr.kvs.Close()
}

// TODO: implement this correctly
func (dr *dkvReplStore) GetLastAppliedEntry() (db.RaftEntry, error) {
return db.RaftEntry{}, errors.New("not implemented")
}

func (dr *dkvReplStore) Backup(_ db.SnapshotState) (io.ReadCloser, error) {
return dr.kvs.GetSnapshot()
}
Expand Down
39 changes: 29 additions & 10 deletions internal/sync/repl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"encoding/gob"
"errors"
"fmt"
"io"
"io/ioutil"
"sync"
Expand All @@ -21,19 +22,20 @@ func TestDKVReplStoreSave(t *testing.T) {
kvs := newMemStore()
dkvRepl := NewDKVReplStore(kvs)

testPut(t, kvs, dkvRepl, []byte("foo"), []byte("bar"))
testPut(t, kvs, dkvRepl, []byte("hello"), []byte("world"))
testPut(t, kvs, dkvRepl, []byte("kit"), []byte("kat"))
term, index := uint64(1), uint64(1)
testPut(t, kvs, dkvRepl, []byte("foo"), []byte("bar"), term, index)
testPut(t, kvs, dkvRepl, []byte("hello"), []byte("world"), term, index+1)
testPut(t, kvs, dkvRepl, []byte("kit"), []byte("kat"), term, index+2)

testGet(t, kvs, dkvRepl, []byte("foo"))
testGet(t, kvs, dkvRepl, []byte("hello"))
testGet(t, kvs, dkvRepl, []byte("kit"))

testMultiGet(t, kvs, dkvRepl, []byte("foo"), []byte("hello"), []byte("kit"))

testDelete(t, kvs, dkvRepl, []byte("foo"))
testDelete(t, kvs, dkvRepl, []byte("hello"))
testDelete(t, kvs, dkvRepl, []byte("kit"))
testDelete(t, kvs, dkvRepl, []byte("foo"), term, index+3)
testDelete(t, kvs, dkvRepl, []byte("hello"), term, index+4)
testDelete(t, kvs, dkvRepl, []byte("kit"), term, index+5)
}

func TestDKVReplStoreClose(t *testing.T) {
Expand All @@ -46,37 +48,50 @@ func TestDKVReplStoreClose(t *testing.T) {
}
}

func testPut(t *testing.T, kvs *memStore, dkvRepl db.Store, key, val []byte) {
func testPut(t *testing.T, kvs *memStore, dkvRepl db.Store, key, val []byte, term, index uint64) {
intReq := new(raftpb.InternalRaftRequest)
intReq.Put = &serverpb.PutRequest{Key: key, Value: val}
if reqBts, err := proto.Marshal(intReq); err != nil {
t.Error(err)
} else {
if _, err := dkvRepl.Save(db.RaftEntry{}, reqBts); err != nil {
if _, err := dkvRepl.Save(db.RaftEntry{term, index}, reqBts); err != nil {
t.Error(err)
} else {
if res, err := kvs.Get(key); err != nil {
t.Error(err)
} else if string(res[0].Value) != string(val) {
t.Errorf("Value mismatch for key: %s. Expected: %s, Actual: %s", key, val, res[0].Value)
}
checkRAFTEntry(t, kvs, term, index)
}
}
}

func testDelete(t *testing.T, kvs *memStore, dkvRepl db.Store, key []byte) {
func testDelete(t *testing.T, kvs *memStore, dkvRepl db.Store, key []byte, term, index uint64) {
intReq := new(raftpb.InternalRaftRequest)
intReq.Delete = &serverpb.DeleteRequest{Key: key}
if reqBts, err := proto.Marshal(intReq); err != nil {
t.Error(err)
} else {
if _, err := dkvRepl.Save(db.RaftEntry{}, reqBts); err != nil {
if _, err := dkvRepl.Save(db.RaftEntry{term, index}, reqBts); err != nil {
t.Error(err)
} else {
if _, err := kvs.Get(key); err.Error() != "Given key not found" {
t.Error(err)
}
}
checkRAFTEntry(t, kvs, term, index)
}
}

func checkRAFTEntry(t *testing.T, kvs *memStore, term, index uint64) {
if res, err := kvs.Get([]byte(raftMeta)); err != nil {
t.Error(err)
} else {
exp := fmt.Sprintf("%d%c%d", term, raftMetaDelim, index)
if string(res[0].Value) != exp {
t.Errorf("Mismatch in RAFT entry. Expected: %s, Actual: %s", exp, res[0].Value)
}
}
}

Expand Down Expand Up @@ -154,6 +169,10 @@ func (ms *memStore) Put(pairs ...*serverpb.KVPair) error {

for _, kv := range pairs {
storeKey := string(kv.Key)
if storeKey == raftMeta {
ms.store[storeKey] = memStoreObject{kv.Value, 0}
continue
}
if _, present := ms.store[storeKey]; present {
return errors.New("given key already exists")
}
Expand Down