Skip to content

Commit

Permalink
Finish lab 2C: Raft state persistence.
Browse files Browse the repository at this point in the history
  • Loading branch information
WenbinZhu committed Feb 9, 2021
1 parent 46b2616 commit 9e09f0b
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 42 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@ Course website: https://pdos.csail.mit.edu/6.824/schedule.html

- [x] Lab 1: MapReduce

- [ ] Lab 2: Raft Consensus Algorithm
- [x] Lab 2: Raft Consensus Algorithm
- [x] Lab 2A: Raft Leader Election
- [x] Lab 2B: Raft Log Entries Append
- [ ] Lab 2C: Raft state persistence
- [x] Lab 2C: Raft state persistence
- [ ] Lab 3: Fault-tolerant Key/Value Service

- [ ] Lab 4: Sharded Key/Value Service
82 changes: 42 additions & 40 deletions src/raft/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,15 @@ package raft
//

import (
"../labgob"
"../labrpc"
"bytes"
"math/rand"
"sync"
"sync/atomic"
"time"
)

// import "bytes"
// import "../labgob"

//
// as each Raft peer becomes aware that successive log entries are
// committed, the peer should send an ApplyMsg to the service (or
Expand Down Expand Up @@ -111,38 +110,35 @@ func (rf *Raft) GetState() (int, bool) {
// save Raft's persistent state to stable storage,
// where it can later be retrieved after a crash and restart.
// see paper's Figure 2 for a description of what should be persistent.
// lock must be held before calling this.
//
func (rf *Raft) persist() {
// Your code here (2C).
// Example:
// w := new(bytes.Buffer)
// e := labgob.NewEncoder(w)
// e.Encode(rf.xxx)
// e.Encode(rf.yyy)
// data := w.Bytes()
// rf.persister.SaveRaftState(data)
w := new(bytes.Buffer)
e := labgob.NewEncoder(w)
if e.Encode(rf.currentTerm) != nil ||
e.Encode(rf.votedFor) != nil || e.Encode(rf.logs) != nil {
panic("failed to encode raft persistent state")
}
data := w.Bytes()
rf.persister.SaveRaftState(data)
}

//
// restore previously persisted state.
//
func (rf *Raft) readPersist(data []byte) {
if data == nil || len(data) < 1 { // bootstrap without any state?
// bootstrap without any state?
if data == nil || len(data) < 1 {
return
}
// Your code here (2C).
// Example:
// r := bytes.NewBuffer(data)
// d := labgob.NewDecoder(r)
// var xxx
// var yyy
// if d.Decode(&xxx) != nil ||
// d.Decode(&yyy) != nil {
// error...
// } else {
// rf.xxx = xxx
// rf.yyy = yyy
// }
r := bytes.NewBuffer(data)
d := labgob.NewDecoder(r)
if d.Decode(&rf.currentTerm) != nil ||
d.Decode(&rf.votedFor) != nil || d.Decode(&rf.logs) != nil {
panic("failed to decode raft persistent state")
}
}

//
Expand Down Expand Up @@ -254,13 +250,31 @@ func (rf *Raft) isLogUpToDate(cLastIndex int, cLastTerm int) bool {
return cLastTerm > myLastTerm
}

//
// apply the committed logs.
//
func (rf *Raft) applyLogs() {
rf.mu.Lock()
defer rf.mu.Unlock()

for i := rf.lastApplied + 1; i <= rf.commitIndex; i++ {
rf.applyCh <- ApplyMsg{
CommandValid: true,
Command: rf.logs[i].Command,
CommandIndex: i,
}
rf.lastApplied = i
}
}

//
// RequestVote RPC handler.
//
func (rf *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) {
// Your code here (2A, 2B).
rf.mu.Lock()
defer rf.mu.Unlock()
defer rf.persist()

if args.Term < rf.currentTerm {
reply.Term = rf.currentTerm
Expand Down Expand Up @@ -328,6 +342,7 @@ func (rf *Raft) sendRequestVote(server int, args *RequestVoteArgs, reply *Reques

if reply.Term > rf.currentTerm {
rf.stepDownToFollower(args.Term)
rf.persist()
return
}

Expand Down Expand Up @@ -363,29 +378,13 @@ func (rf *Raft) broadcastRequestVote() {
}
}

//
// apply the committed logs.
//
func (rf *Raft) applyLogs() {
rf.mu.Lock()
defer rf.mu.Unlock()

for i := rf.lastApplied + 1; i <= rf.commitIndex; i++ {
rf.applyCh <- ApplyMsg{
CommandValid: true,
Command: rf.logs[i].Command,
CommandIndex: i,
}
rf.lastApplied = i
}
}

//
// AppendEntries RPC handler.
//
func (rf *Raft) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply) {
rf.mu.Lock()
defer rf.mu.Unlock()
defer rf.persist()

if args.Term < rf.currentTerm {
reply.Term = rf.currentTerm
Expand Down Expand Up @@ -460,6 +459,7 @@ func (rf *Raft) sendAppendEntries(server int, args *AppendEntriesArgs, reply *Ap

rf.mu.Lock()
defer rf.mu.Unlock()
defer rf.persist()

if rf.state != Leader || args.Term != rf.currentTerm || reply.Term < rf.currentTerm {
return
Expand Down Expand Up @@ -569,6 +569,7 @@ func (rf *Raft) Start(command interface{}) (int, int, bool) {

term := rf.currentTerm
rf.logs = append(rf.logs, LogEntry{term, command})
rf.persist()

return rf.getLastIndex(), term, true
}
Expand Down Expand Up @@ -637,6 +638,7 @@ func (rf *Raft) convertToCandidate(fromState State) {
rf.currentTerm++
rf.votedFor = rf.me
rf.voteCount = 1
rf.persist()

rf.broadcastRequestVote()
}
Expand Down

0 comments on commit 9e09f0b

Please sign in to comment.