Skip to content

Commit

Permalink
Finish lab 3A: key/value service without log compaction.
Browse files Browse the repository at this point in the history
  • Loading branch information
WenbinZhu committed Feb 14, 2021
1 parent 9e09f0b commit c996f20
Show file tree
Hide file tree
Showing 4 changed files with 204 additions and 10 deletions.
5 changes: 4 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,17 @@

### (Updated to Spring 2020 Course Labs)

Course website: https://pdos.csail.mit.edu/6.824/schedule.html
Course website: http://nil.csail.mit.edu/6.824/2020/schedule.html

- [x] Lab 1: MapReduce

- [x] Lab 2: Raft Consensus Algorithm
- [x] Lab 2A: Raft Leader Election
- [x] Lab 2B: Raft Log Entries Append
- [x] Lab 2C: Raft state persistence

- [ ] Lab 3: Fault-tolerant Key/Value Service
- [x] Lab 3A: Key/value Service Without Log Compaction
- [ ] Lab 3B: Key/value Service With Log Compaction

- [ ] Lab 4: Sharded Key/Value Service
62 changes: 58 additions & 4 deletions src/kvraft/client.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,18 @@
package kvraft

import "../labrpc"
import (
"../labrpc"
"sync/atomic"
)
import "crypto/rand"
import "math/big"


type Clerk struct {
servers []*labrpc.ClientEnd
// You will have to modify this struct.
leaderId int32
clientId int64
requestId int64
}

func nrand() int64 {
Expand All @@ -21,6 +26,9 @@ func MakeClerk(servers []*labrpc.ClientEnd) *Clerk {
ck := new(Clerk)
ck.servers = servers
// You'll have to add code here.
ck.leaderId = 0
ck.clientId = nrand()
ck.requestId = 0
return ck
}

Expand All @@ -37,9 +45,33 @@ func MakeClerk(servers []*labrpc.ClientEnd) *Clerk {
// arguments. and reply must be passed as a pointer.
//
func (ck *Clerk) Get(key string) string {

// You will have to modify this function.
return ""
reqId := atomic.AddInt64(&ck.requestId, 1)
leader := atomic.LoadInt32(&ck.leaderId)

args := GetArgs{
Key: key,
ClientId: ck.clientId,
RequestId: reqId,
}

value := ""
server := leader
for ; ; server = (server + 1) % int32(len(ck.servers)) {
reply := GetReply{}
ok := ck.servers[server].Call("KVServer.Get", &args, &reply)

if ok && reply.Err != ErrWrongLeader {
if reply.Err == OK {
value = reply.Value
}
break
}
}

atomic.StoreInt32(&ck.leaderId, server)

return value
}

//
Expand All @@ -54,6 +86,28 @@ func (ck *Clerk) Get(key string) string {
//
func (ck *Clerk) PutAppend(key string, value string, op string) {
// You will have to modify this function.
reqId := atomic.AddInt64(&ck.requestId, 1)
leader := atomic.LoadInt32(&ck.leaderId)

args := PutAppendArgs{
Key: key,
Value: value,
Op: op,
ClientId: ck.clientId,
RequestId: reqId,
}

server := leader
for ; ; server = (server + 1) % int32(len(ck.servers)) {
reply := PutAppendReply{}
ok := ck.servers[server].Call("KVServer.PutAppend", &args, &reply)

if ok && reply.Err != ErrWrongLeader {
break
}
}

atomic.StoreInt32(&ck.leaderId, server)
}

func (ck *Clerk) Put(key string, value string) {
Expand Down
4 changes: 4 additions & 0 deletions src/kvraft/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ type PutAppendArgs struct {
// You'll have to add definitions here.
// Field names must start with capital letters,
// otherwise RPC will break.
ClientId int64
RequestId int64
}

type PutAppendReply struct {
Expand All @@ -25,6 +27,8 @@ type PutAppendReply struct {
type GetArgs struct {
Key string
// You'll have to add definitions here.
ClientId int64
RequestId int64
}

type GetReply struct {
Expand Down
143 changes: 138 additions & 5 deletions src/kvraft/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@ package kvraft
import (
"../labgob"
"../labrpc"
"log"
"../raft"
"log"
"sync"
"sync/atomic"
"time"
)

const Debug = 0
Expand All @@ -18,11 +19,17 @@ func DPrintf(format string, a ...interface{}) (n int, err error) {
return
}


type Op struct {
// Your definitions here.
// Field names must start with capital letters,
// otherwise RPC will break.
Type string
Key string
Value string
// duplicate detection info needs to be part of state machine
// so that all raft servers eliminate the same duplicates
ClientId int64
RequestId int64
}

type KVServer struct {
Expand All @@ -35,15 +42,138 @@ type KVServer struct {
maxraftstate int // snapshot if log grows this big

// Your definitions here.
store map[string]string
results map[int]chan Op
lastApplied map[int64]int64
}


//
// Get RPC handler
//
func (kv *KVServer) Get(args *GetArgs, reply *GetReply) {
// Your code here.
op := Op{
Type: "Get",
Key: args.Key,
ClientId: args.ClientId,
RequestId: args.RequestId,
}

ok, appliedOp := kv.waitForApplied(op)
if !ok {
reply.Err = ErrWrongLeader
return
}

reply.Err = OK
reply.Value = appliedOp.Value
}

//
// PutAppend RPC handler
//
func (kv *KVServer) PutAppend(args *PutAppendArgs, reply *PutAppendReply) {
// Your code here.
op := Op{
Type: args.Op,
Key: args.Key,
Value: args.Value,
ClientId: args.ClientId,
RequestId: args.RequestId,
}

ok, _ := kv.waitForApplied(op)
if !ok {
reply.Err = ErrWrongLeader
return
}

reply.Err = OK
}

//
// send the op log to Raft library and wait for it to be applied
//
func (kv *KVServer) waitForApplied(op Op) (bool, Op) {
index, _, isLeader := kv.rf.Start(op)

if !isLeader {
return false, op
}

kv.mu.Lock()
opCh, ok := kv.results[index]
if !ok {
opCh = make(chan Op, 1)
kv.results[index] = opCh
}
kv.mu.Unlock()

select {
case appliedOp := <-opCh:
return kv.isSameOp(op, appliedOp), appliedOp
case <-time.After(600 * time.Millisecond):
return false, op
}
}

//
// check if the issued command is the same as the applied command
//
func (kv *KVServer) isSameOp(issued Op, applied Op) bool {
return issued.ClientId == applied.ClientId &&
issued.RequestId == applied.RequestId
}

//
// background loop to receive the logs committed by the Raft
// library and apply them to the kv server state machine
//
func (kv *KVServer) applyOpsLoop() {
for {
msg := <-kv.applyCh
if !msg.CommandValid {
continue
}
index := msg.CommandIndex
op := msg.Command.(Op)

kv.mu.Lock()

if op.Type == "Get" {
kv.applyToStateMachine(&op)
} else {
lastId, ok := kv.lastApplied[op.ClientId]
if !ok || op.RequestId > lastId {
kv.applyToStateMachine(&op)
kv.lastApplied[op.ClientId] = op.RequestId
}
}

opCh, ok := kv.results[index]
if !ok {
opCh = make(chan Op, 1)
kv.results[index] = opCh
}
opCh <- op

kv.mu.Unlock()
}
}

//
// applied the command to the state machine
// lock must be held before calling this
//
func (kv *KVServer) applyToStateMachine(op *Op) {
switch op.Type {
case "Get":
op.Value = kv.store[op.Key]
case "Put":
kv.store[op.Key] = op.Value
case "Append":
kv.store[op.Key] += op.Value
}
}

//
Expand Down Expand Up @@ -91,11 +221,14 @@ func StartKVServer(servers []*labrpc.ClientEnd, me int, persister *raft.Persiste
kv.maxraftstate = maxraftstate

// You may need initialization code here.

kv.applyCh = make(chan raft.ApplyMsg)
kv.applyCh = make(chan raft.ApplyMsg, 1)
kv.rf = raft.Make(servers, me, persister, kv.applyCh)
kv.store = make(map[string]string)
kv.results = make(map[int]chan Op)
kv.lastApplied = make(map[int64]int64)

// You may need initialization code here.
go kv.applyOpsLoop()

return kv
}

0 comments on commit c996f20

Please sign in to comment.