From 9e09f0b36c18a8304c192825414dd53a5adeb7b4 Mon Sep 17 00:00:00 2001 From: WenbinZhu Date: Tue, 9 Feb 2021 00:02:10 -0800 Subject: [PATCH] Finish lab 2C: Raft state persistence. --- README.md | 4 +-- src/raft/raft.go | 82 +++++++++++++++++++++++++----------------------- 2 files changed, 44 insertions(+), 42 deletions(-) diff --git a/README.md b/README.md index 63b9c3b..b18dc73 100644 --- a/README.md +++ b/README.md @@ -6,10 +6,10 @@ Course website: https://pdos.csail.mit.edu/6.824/schedule.html - [x] Lab 1: MapReduce -- [ ] Lab 2: Raft Consensus Algorithm +- [x] Lab 2: Raft Consensus Algorithm - [x] Lab 2A: Raft Leader Election - [x] Lab 2B: Raft Log Entries Append - - [ ] Lab 2C: Raft state persistence + - [x] Lab 2C: Raft state persistence - [ ] Lab 3: Fault-tolerant Key/Value Service - [ ] Lab 4: Sharded Key/Value Service diff --git a/src/raft/raft.go b/src/raft/raft.go index c13adde..37d6254 100644 --- a/src/raft/raft.go +++ b/src/raft/raft.go @@ -18,16 +18,15 @@ package raft // import ( + "../labgob" "../labrpc" + "bytes" "math/rand" "sync" "sync/atomic" "time" ) -// import "bytes" -// import "../labgob" - // // as each Raft peer becomes aware that successive log entries are // committed, the peer should send an ApplyMsg to the service (or @@ -111,38 +110,35 @@ func (rf *Raft) GetState() (int, bool) { // save Raft's persistent state to stable storage, // where it can later be retrieved after a crash and restart. // see paper's Figure 2 for a description of what should be persistent. +// lock must be held before calling this. // func (rf *Raft) persist() { // Your code here (2C). - // Example: - // w := new(bytes.Buffer) - // e := labgob.NewEncoder(w) - // e.Encode(rf.xxx) - // e.Encode(rf.yyy) - // data := w.Bytes() - // rf.persister.SaveRaftState(data) + w := new(bytes.Buffer) + e := labgob.NewEncoder(w) + if e.Encode(rf.currentTerm) != nil || + e.Encode(rf.votedFor) != nil || e.Encode(rf.logs) != nil { + panic("failed to encode raft persistent state") + } + data := w.Bytes() + rf.persister.SaveRaftState(data) } // // restore previously persisted state. // func (rf *Raft) readPersist(data []byte) { - if data == nil || len(data) < 1 { // bootstrap without any state? + // bootstrap without any state? + if data == nil || len(data) < 1 { return } // Your code here (2C). - // Example: - // r := bytes.NewBuffer(data) - // d := labgob.NewDecoder(r) - // var xxx - // var yyy - // if d.Decode(&xxx) != nil || - // d.Decode(&yyy) != nil { - // error... - // } else { - // rf.xxx = xxx - // rf.yyy = yyy - // } + r := bytes.NewBuffer(data) + d := labgob.NewDecoder(r) + if d.Decode(&rf.currentTerm) != nil || + d.Decode(&rf.votedFor) != nil || d.Decode(&rf.logs) != nil { + panic("failed to decode raft persistent state") + } } // @@ -254,6 +250,23 @@ func (rf *Raft) isLogUpToDate(cLastIndex int, cLastTerm int) bool { return cLastTerm > myLastTerm } +// +// apply the committed logs. +// +func (rf *Raft) applyLogs() { + rf.mu.Lock() + defer rf.mu.Unlock() + + for i := rf.lastApplied + 1; i <= rf.commitIndex; i++ { + rf.applyCh <- ApplyMsg{ + CommandValid: true, + Command: rf.logs[i].Command, + CommandIndex: i, + } + rf.lastApplied = i + } +} + // // RequestVote RPC handler. // @@ -261,6 +274,7 @@ func (rf *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) { // Your code here (2A, 2B). rf.mu.Lock() defer rf.mu.Unlock() + defer rf.persist() if args.Term < rf.currentTerm { reply.Term = rf.currentTerm @@ -328,6 +342,7 @@ func (rf *Raft) sendRequestVote(server int, args *RequestVoteArgs, reply *Reques if reply.Term > rf.currentTerm { rf.stepDownToFollower(args.Term) + rf.persist() return } @@ -363,29 +378,13 @@ func (rf *Raft) broadcastRequestVote() { } } -// -// apply the committed logs. -// -func (rf *Raft) applyLogs() { - rf.mu.Lock() - defer rf.mu.Unlock() - - for i := rf.lastApplied + 1; i <= rf.commitIndex; i++ { - rf.applyCh <- ApplyMsg{ - CommandValid: true, - Command: rf.logs[i].Command, - CommandIndex: i, - } - rf.lastApplied = i - } -} - // // AppendEntries RPC handler. // func (rf *Raft) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply) { rf.mu.Lock() defer rf.mu.Unlock() + defer rf.persist() if args.Term < rf.currentTerm { reply.Term = rf.currentTerm @@ -460,6 +459,7 @@ func (rf *Raft) sendAppendEntries(server int, args *AppendEntriesArgs, reply *Ap rf.mu.Lock() defer rf.mu.Unlock() + defer rf.persist() if rf.state != Leader || args.Term != rf.currentTerm || reply.Term < rf.currentTerm { return @@ -569,6 +569,7 @@ func (rf *Raft) Start(command interface{}) (int, int, bool) { term := rf.currentTerm rf.logs = append(rf.logs, LogEntry{term, command}) + rf.persist() return rf.getLastIndex(), term, true } @@ -637,6 +638,7 @@ func (rf *Raft) convertToCandidate(fromState State) { rf.currentTerm++ rf.votedFor = rf.me rf.voteCount = 1 + rf.persist() rf.broadcastRequestVote() }