How Raft Works
Raft divides the consensus problem into three subproblems:
1. Leader Election
- Servers start as followers
- If a follower hears nothing from a leader within its election timeout, it becomes a candidate (the timeout is randomized; see the sketch after this list)
- Candidates request votes from other servers
- A candidate becomes leader if it receives votes from a majority
- Leaders send periodic heartbeats to maintain authority
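Split votes are avoided by randomizing the election timeout, so servers rarely become candidates at the same moment. A minimal sketch of picking such a timeout (the 150–300 ms range follows the Raft paper’s suggestion; randomElectionTimeout is a hypothetical helper, reused in the implementation example below, that relies on the time and math/rand packages):

// randomElectionTimeout returns a fresh, randomized election timeout.
// Each server waits a different amount of time before standing for
// election, which makes simultaneous candidacies (and split votes) rare.
func randomElectionTimeout() time.Duration {
    return time.Duration(150+rand.Intn(150)) * time.Millisecond
}

A follower arms its election timer with this value and re-arms it every time it hears from the current leader.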
2. Log Replication
- Clients send commands to the leader
- The leader appends the command to its log
- The leader replicates the log entry to followers
- Once the entry is stored on a majority of servers, the leader commits it (a sketch of this commit rule follows the list)
- The leader notifies followers of committed entries
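An entry counts as committed once it is stored on a majority of servers. The implementation example below leaves this step (replicateLogs) unspecified, but assuming the leader tracks each follower’s replication progress in matchIndex, as the RaftServer struct shown later does, a hypothetical advanceCommitIndex helper could apply the rule like this (note that a leader only directly commits entries from its own term, a detail Raft requires for safety):

// advanceCommitIndex moves the leader's commit index forward to the
// highest log index that is stored on a majority of servers and that
// belongs to the leader's current term.
func (s *RaftServer) advanceCommitIndex() {
    for n := len(s.log) - 1; n > s.commitIndex; n-- {
        if s.log[n].Term != s.currentTerm {
            continue // only entries from the current term are committed directly
        }
        count := 1 // the leader itself stores the entry
        for _, idx := range s.matchIndex {
            if idx >= n {
                count++
            }
        }
        if count > (len(s.peers)+1)/2 { // majority of the whole cluster
            s.commitIndex = n
            s.applyCommittedEntries()
            break
        }
    }
}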
3. Safety
- Election Safety: At most one leader per term
- Leader Append-Only: A leader never overwrites or deletes entries in its own log; it only appends new ones
- Log Matching: If two logs contain an entry with the same index and term, the logs are identical in all entries up through that index
- Leader Completeness: If an entry is committed in a given term, it will be present in the logs of the leaders of all later terms (the voting rule that enforces this is sketched after this list)
- State Machine Safety: If a server applies an entry to its state machine, no other server will apply a different entry for the same log index
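Leader Completeness falls out of the voting rule: a server grants its vote only to a candidate whose log is at least as up-to-date as its own, so a candidate missing committed entries can never assemble a majority. A sketch of the isLogUpToDate check referenced in the implementation below (the comparison rule itself, last entry’s term first and then log length, is the one from the Raft paper):

// isLogUpToDate reports whether a candidate's log is at least as
// up-to-date as this server's log: compare the terms of the last
// entries first, and break ties by log length.
func (s *RaftServer) isLogUpToDate(lastLogIndex, lastLogTerm int) bool {
    myLastIndex := len(s.log) - 1
    myLastTerm := 0
    if myLastIndex >= 0 {
        myLastTerm = s.log[myLastIndex].Term
    }
    if lastLogTerm != myLastTerm {
        return lastLogTerm > myLastTerm
    }
    return lastLogIndex >= myLastIndex
}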
Raft Implementation Example
Here’s a simplified implementation of Raft’s core components in Go (persistence and the RPC transport are left out):
// Server states
const (
    Follower = iota
    Candidate
    Leader
)
type RaftServer struct {
    // Server identity
    id    int
    state int
    // Persistent state
    currentTerm int
    votedFor    int
    log         []LogEntry
    // Volatile state
    commitIndex int
    lastApplied int
    // Leader state
    nextIndex  map[int]int
    matchIndex map[int]int
    // Timers for elections and heartbeats
    electionTimer  *time.Timer
    heartbeatTimer *time.Timer
    // Channels for incoming RPCs and client proposals
    rpcCh     chan RPC
    proposeCh chan interface{}
    // Network connections to other servers
    peers []RaftPeer
}
type LogEntry struct {
    Term    int
    Command interface{}
}
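The listing uses a few types it never defines. One plausible shape for them, chosen here to match the calls below rather than any particular library:

// RequestVoteArgs and RequestVoteResponse carry a candidate's vote request.
type RequestVoteArgs struct {
    Term         int
    CandidateId  int
    LastLogIndex int
    LastLogTerm  int
}

type RequestVoteResponse struct {
    Term        int
    VoteGranted bool
}

// AppendEntriesArgs and AppendEntriesResponse carry heartbeats and log entries.
type AppendEntriesArgs struct {
    Term         int
    LeaderId     int
    PrevLogIndex int
    PrevLogTerm  int
    Entries      []LogEntry
    LeaderCommit int
}

type AppendEntriesResponse struct {
    Term    int
    Success bool
}

// RPC is a minimal envelope for a request arriving on rpcCh; handleRPC
// would dispatch it to handleRequestVote or handleAppendEntries and
// send the result back on Reply.
type RPC struct {
    Args  interface{}
    Reply chan interface{}
}

// RaftPeer abstracts the network connection to one other server.
type RaftPeer interface {
    RequestVote(args RequestVoteArgs) RequestVoteResponse
    AppendEntries(args AppendEntriesArgs) AppendEntriesResponse
}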
func (s *RaftServer) Start() {
    s.state = Follower
    s.votedFor = -1    // no vote cast yet
    s.commitIndex = -1 // no entry committed yet (log indices are 0-based)
    s.lastApplied = -1
    s.resetElectionTimer()
    for {
        switch s.state {
        case Follower:
            s.runFollower()
        case Candidate:
            s.runCandidate()
        case Leader:
            s.runLeader()
        }
    }
}
func (s *RaftServer) runFollower() {
    select {
    case <-s.electionTimer.C:
        s.state = Candidate
    case rpc := <-s.rpcCh:
        s.handleRPC(rpc)
        s.resetElectionTimer()
    }
}
func (s *RaftServer) runCandidate() {
    s.currentTerm++
    s.votedFor = s.id
    votesReceived := 1 // Vote for self
    s.resetElectionTimer()
    args := RequestVoteArgs{
        Term:         s.currentTerm,
        CandidateId:  s.id,
        LastLogIndex: len(s.log) - 1,
        LastLogTerm:  s.getLastLogTerm(),
    }
    // Request votes from all peers in parallel; responses come back on a
    // channel so votes are counted in this goroutine, without a data race
    voteCh := make(chan RequestVoteResponse, len(s.peers))
    for _, peer := range s.peers {
        go func(p RaftPeer) {
            voteCh <- p.RequestVote(args)
        }(peer)
    }
    for s.state == Candidate {
        select {
        case response := <-voteCh:
            if response.Term > s.currentTerm {
                // A higher term exists; revert to follower
                s.currentTerm = response.Term
                s.state = Follower
                s.votedFor = -1
            } else if response.VoteGranted {
                votesReceived++
                // Majority of the whole cluster (peers plus this server)
                if votesReceived > (len(s.peers)+1)/2 {
                    s.state = Leader
                    s.initLeaderState()
                }
            }
        case rpc := <-s.rpcCh:
            s.handleRPC(rpc)
        case <-s.electionTimer.C:
            // Election timed out with no winner; return so the main loop
            // starts a new election for the next term
            return
        }
    }
}
func (s *RaftServer) runLeader() {
    s.sendHeartbeats()
    s.heartbeatTimer = time.NewTimer(100 * time.Millisecond)
    select {
    case <-s.heartbeatTimer.C:
        // Time to send heartbeats again
    case rpc := <-s.rpcCh:
        s.handleRPC(rpc)
    case cmd := <-s.proposeCh:
        // Append to local log
        s.log = append(s.log, LogEntry{Term: s.currentTerm, Command: cmd})
        // Replicate to followers (simplified)
        s.replicateLogs()
    }
}
func (s *RaftServer) handleRequestVote(args RequestVoteArgs) RequestVoteResponse {
    if args.Term < s.currentTerm {
        return RequestVoteResponse{Term: s.currentTerm, VoteGranted: false}
    }
    if args.Term > s.currentTerm {
        s.currentTerm = args.Term
        s.state = Follower
        s.votedFor = -1
    }
    if (s.votedFor == -1 || s.votedFor == args.CandidateId) &&
        s.isLogUpToDate(args.LastLogIndex, args.LastLogTerm) {
        s.votedFor = args.CandidateId
        return RequestVoteResponse{Term: s.currentTerm, VoteGranted: true}
    }
    return RequestVoteResponse{Term: s.currentTerm, VoteGranted: false}
}
func (s *RaftServer) handleAppendEntries(args AppendEntriesArgs) AppendEntriesResponse {
    if args.Term < s.currentTerm {
        return AppendEntriesResponse{Term: s.currentTerm, Success: false}
    }
    // Valid leader for this term or a newer one: reset the election timer
    s.resetElectionTimer()
    if args.Term > s.currentTerm {
        s.currentTerm = args.Term
        s.votedFor = -1
    }
    // Seeing a legitimate leader always puts this server in the follower state
    // (this also makes a candidate of the same term step down)
    s.state = Follower
    // Check if log contains an entry at prevLogIndex with prevLogTerm
    if args.PrevLogIndex >= len(s.log) ||
        (args.PrevLogIndex >= 0 && s.log[args.PrevLogIndex].Term != args.PrevLogTerm) {
        return AppendEntriesResponse{Term: s.currentTerm, Success: false}
    }
    // Process log entries
    for i, entry := range args.Entries {
        index := args.PrevLogIndex + 1 + i
        if index < len(s.log) {
            if s.log[index].Term != entry.Term {
                // Delete the conflicting entry and all that follow, then append
                s.log = s.log[:index]
                s.log = append(s.log, entry)
            }
            // Else the entry already exists, do nothing
        } else {
            // Append new entries
            s.log = append(s.log, entry)
        }
    }
    // Update commit index (min is a builtin for ints as of Go 1.21)
    if args.LeaderCommit > s.commitIndex {
        s.commitIndex = min(args.LeaderCommit, len(s.log)-1)
        s.applyCommittedEntries()
    }
    return AppendEntriesResponse{Term: s.currentTerm, Success: true}
}
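The listing also calls several helpers without defining them: handleRPC dispatches an incoming RPC to handleRequestVote or handleAppendEntries and sends the result back, while sendHeartbeats and replicateLogs loop over the peers issuing AppendEntries RPCs based on each follower’s nextIndex and, on success, feed matchIndex and the commit rule sketched earlier. The remaining small helpers might look like this, under the same assumptions as above:

// resetElectionTimer re-arms the election timeout with a fresh random
// duration; allocating a new timer sidesteps Stop/Reset draining subtleties.
func (s *RaftServer) resetElectionTimer() {
    if s.electionTimer != nil {
        s.electionTimer.Stop()
    }
    s.electionTimer = time.NewTimer(randomElectionTimeout())
}

// getLastLogTerm returns the term of the last log entry, or 0 for an empty log.
func (s *RaftServer) getLastLogTerm() int {
    if len(s.log) == 0 {
        return 0
    }
    return s.log[len(s.log)-1].Term
}

// initLeaderState resets the per-follower bookkeeping when this server becomes leader.
func (s *RaftServer) initLeaderState() {
    s.nextIndex = make(map[int]int)
    s.matchIndex = make(map[int]int)
    for i := range s.peers {
        s.nextIndex[i] = len(s.log) // optimistically start just past the leader's log
        s.matchIndex[i] = -1        // nothing known to be replicated yet
    }
}

// applyCommittedEntries applies every newly committed entry, in order,
// to the application's state machine.
func (s *RaftServer) applyCommittedEntries() {
    for s.lastApplied < s.commitIndex {
        s.lastApplied++
        entry := s.log[s.lastApplied]
        _ = entry // hand entry.Command to the state machine here
    }
}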
When to Use Raft
Raft is ideal when:
- You need a consensus algorithm that’s easier to understand and implement than Paxos
- You’re building a system where leader-based operation is acceptable
- You want strong consistency with reasonable performance
- You need a well-documented algorithm with many existing implementations
Challenges with Raft
- Leader-based design can be a bottleneck
- Leader changes are not instant: detecting a failed leader takes an election timeout, and electing a replacement takes at least one more round trip
- Membership changes require careful handling
ZAB: ZooKeeper Atomic Broadcast
ZAB (ZooKeeper Atomic Broadcast) is the consensus protocol used by Apache ZooKeeper, a widely used coordination service for distributed systems. While less general-purpose than Paxos or Raft, ZAB is optimized for the specific requirements of ZooKeeper.
How ZAB Works
ZAB operates in three phases:
1. Leader Election
- Servers elect a leader using a fast leader election algorithm
- The elected leader must have the most up-to-date transaction history