From c996f20a5de58b27b5ee5e1b13e90e4aeb50d745 Mon Sep 17 00:00:00 2001 From: WenbinZhu Date: Sun, 14 Feb 2021 12:15:46 -0800 Subject: [PATCH] Finish lab 3A: key/value service without log compaction. --- README.md | 5 +- src/kvraft/client.go | 62 +++++++++++++++++-- src/kvraft/common.go | 4 ++ src/kvraft/server.go | 143 +++++++++++++++++++++++++++++++++++++++++-- 4 files changed, 204 insertions(+), 10 deletions(-) diff --git a/README.md b/README.md index b18dc73..10bb183 100644 --- a/README.md +++ b/README.md @@ -2,7 +2,7 @@ ### (Updated to Spring 2020 Course Labs) -Course website: https://pdos.csail.mit.edu/6.824/schedule.html +Course website: http://nil.csail.mit.edu/6.824/2020/schedule.html - [x] Lab 1: MapReduce @@ -10,6 +10,9 @@ Course website: https://pdos.csail.mit.edu/6.824/schedule.html - [x] Lab 2A: Raft Leader Election - [x] Lab 2B: Raft Log Entries Append - [x] Lab 2C: Raft state persistence + - [ ] Lab 3: Fault-tolerant Key/Value Service + - [x] Lab 3A: Key/value Service Without Log Compaction + - [ ] Lab 3B: Key/value Service With Log Compaction - [ ] Lab 4: Sharded Key/Value Service diff --git a/src/kvraft/client.go b/src/kvraft/client.go index 527f95a..2ad7320 100644 --- a/src/kvraft/client.go +++ b/src/kvraft/client.go @@ -1,13 +1,18 @@ package kvraft -import "../labrpc" +import ( + "../labrpc" + "sync/atomic" +) import "crypto/rand" import "math/big" - type Clerk struct { servers []*labrpc.ClientEnd // You will have to modify this struct. + leaderId int32 + clientId int64 + requestId int64 } func nrand() int64 { @@ -21,6 +26,9 @@ func MakeClerk(servers []*labrpc.ClientEnd) *Clerk { ck := new(Clerk) ck.servers = servers // You'll have to add code here. + ck.leaderId = 0 + ck.clientId = nrand() + ck.requestId = 0 return ck } @@ -37,9 +45,33 @@ func MakeClerk(servers []*labrpc.ClientEnd) *Clerk { // arguments. and reply must be passed as a pointer. // func (ck *Clerk) Get(key string) string { - // You will have to modify this function. - return "" + reqId := atomic.AddInt64(&ck.requestId, 1) + leader := atomic.LoadInt32(&ck.leaderId) + + args := GetArgs{ + Key: key, + ClientId: ck.clientId, + RequestId: reqId, + } + + value := "" + server := leader + for ; ; server = (server + 1) % int32(len(ck.servers)) { + reply := GetReply{} + ok := ck.servers[server].Call("KVServer.Get", &args, &reply) + + if ok && reply.Err != ErrWrongLeader { + if reply.Err == OK { + value = reply.Value + } + break + } + } + + atomic.StoreInt32(&ck.leaderId, server) + + return value } // @@ -54,6 +86,28 @@ func (ck *Clerk) Get(key string) string { // func (ck *Clerk) PutAppend(key string, value string, op string) { // You will have to modify this function. + reqId := atomic.AddInt64(&ck.requestId, 1) + leader := atomic.LoadInt32(&ck.leaderId) + + args := PutAppendArgs{ + Key: key, + Value: value, + Op: op, + ClientId: ck.clientId, + RequestId: reqId, + } + + server := leader + for ; ; server = (server + 1) % int32(len(ck.servers)) { + reply := PutAppendReply{} + ok := ck.servers[server].Call("KVServer.PutAppend", &args, &reply) + + if ok && reply.Err != ErrWrongLeader { + break + } + } + + atomic.StoreInt32(&ck.leaderId, server) } func (ck *Clerk) Put(key string, value string) { diff --git a/src/kvraft/common.go b/src/kvraft/common.go index e5ee442..4558c95 100644 --- a/src/kvraft/common.go +++ b/src/kvraft/common.go @@ -16,6 +16,8 @@ type PutAppendArgs struct { // You'll have to add definitions here. // Field names must start with capital letters, // otherwise RPC will break. + ClientId int64 + RequestId int64 } type PutAppendReply struct { @@ -25,6 +27,8 @@ type PutAppendReply struct { type GetArgs struct { Key string // You'll have to add definitions here. + ClientId int64 + RequestId int64 } type GetReply struct { diff --git a/src/kvraft/server.go b/src/kvraft/server.go index 290a564..5ec482f 100644 --- a/src/kvraft/server.go +++ b/src/kvraft/server.go @@ -3,10 +3,11 @@ package kvraft import ( "../labgob" "../labrpc" - "log" "../raft" + "log" "sync" "sync/atomic" + "time" ) const Debug = 0 @@ -18,11 +19,17 @@ func DPrintf(format string, a ...interface{}) (n int, err error) { return } - type Op struct { // Your definitions here. // Field names must start with capital letters, // otherwise RPC will break. + Type string + Key string + Value string + // duplicate detection info needs to be part of state machine + // so that all raft servers eliminate the same duplicates + ClientId int64 + RequestId int64 } type KVServer struct { @@ -35,15 +42,138 @@ type KVServer struct { maxraftstate int // snapshot if log grows this big // Your definitions here. + store map[string]string + results map[int]chan Op + lastApplied map[int64]int64 } - +// +// Get RPC handler +// func (kv *KVServer) Get(args *GetArgs, reply *GetReply) { // Your code here. + op := Op{ + Type: "Get", + Key: args.Key, + ClientId: args.ClientId, + RequestId: args.RequestId, + } + + ok, appliedOp := kv.waitForApplied(op) + if !ok { + reply.Err = ErrWrongLeader + return + } + + reply.Err = OK + reply.Value = appliedOp.Value } +// +// PutAppend RPC handler +// func (kv *KVServer) PutAppend(args *PutAppendArgs, reply *PutAppendReply) { // Your code here. + op := Op{ + Type: args.Op, + Key: args.Key, + Value: args.Value, + ClientId: args.ClientId, + RequestId: args.RequestId, + } + + ok, _ := kv.waitForApplied(op) + if !ok { + reply.Err = ErrWrongLeader + return + } + + reply.Err = OK +} + +// +// send the op log to Raft library and wait for it to be applied +// +func (kv *KVServer) waitForApplied(op Op) (bool, Op) { + index, _, isLeader := kv.rf.Start(op) + + if !isLeader { + return false, op + } + + kv.mu.Lock() + opCh, ok := kv.results[index] + if !ok { + opCh = make(chan Op, 1) + kv.results[index] = opCh + } + kv.mu.Unlock() + + select { + case appliedOp := <-opCh: + return kv.isSameOp(op, appliedOp), appliedOp + case <-time.After(600 * time.Millisecond): + return false, op + } +} + +// +// check if the issued command is the same as the applied command +// +func (kv *KVServer) isSameOp(issued Op, applied Op) bool { + return issued.ClientId == applied.ClientId && + issued.RequestId == applied.RequestId +} + +// +// background loop to receive the logs committed by the Raft +// library and apply them to the kv server state machine +// +func (kv *KVServer) applyOpsLoop() { + for { + msg := <-kv.applyCh + if !msg.CommandValid { + continue + } + index := msg.CommandIndex + op := msg.Command.(Op) + + kv.mu.Lock() + + if op.Type == "Get" { + kv.applyToStateMachine(&op) + } else { + lastId, ok := kv.lastApplied[op.ClientId] + if !ok || op.RequestId > lastId { + kv.applyToStateMachine(&op) + kv.lastApplied[op.ClientId] = op.RequestId + } + } + + opCh, ok := kv.results[index] + if !ok { + opCh = make(chan Op, 1) + kv.results[index] = opCh + } + opCh <- op + + kv.mu.Unlock() + } +} + +// +// applied the command to the state machine +// lock must be held before calling this +// +func (kv *KVServer) applyToStateMachine(op *Op) { + switch op.Type { + case "Get": + op.Value = kv.store[op.Key] + case "Put": + kv.store[op.Key] = op.Value + case "Append": + kv.store[op.Key] += op.Value + } } // @@ -91,11 +221,14 @@ func StartKVServer(servers []*labrpc.ClientEnd, me int, persister *raft.Persiste kv.maxraftstate = maxraftstate // You may need initialization code here. - - kv.applyCh = make(chan raft.ApplyMsg) + kv.applyCh = make(chan raft.ApplyMsg, 1) kv.rf = raft.Make(servers, me, persister, kv.applyCh) + kv.store = make(map[string]string) + kv.results = make(map[int]chan Op) + kv.lastApplied = make(map[int64]int64) // You may need initialization code here. + go kv.applyOpsLoop() return kv }