Implementing Raft: Part 1 - Elections



This is Part 1 in a series of posts describing the Raft distributed consensus algorithm and its complete implementation in Go. Here is a list of posts in the series:

  • Part 0: Introduction
  • Part 1: Elections (this post)
  • Part 2: Commands and log replication (coming soon)
  • Part 3: Persistence and optimizations (coming soon)

In this part, I'm going to explain the general structure of our Raft implementation and focus on the leader election facet of the algorithm. The code for this part includes a fully functional test harness with some tests you can run to experiment with the system. It doesn't answer client requests though, and it doesn't maintain a log; all of that will be added in part 2.

Code structure

A few words on how the Raft implementation is structured; this applies to all parts in the series.

Typically, Raft is implemented as an object you embed into some service. Since we're not actually developing a service here, but only studying Raft itself, I've created a simple Server type that wraps a ConsensusModule type to isolate the interesting parts of the code as much as possible:

Architecture of a consensus module embedded into a server

The consensus module (CM) implements the heart of the Raft algorithm, and is in file raft.go. It's completely abstracted away from the details of networking and connections to other replicas in the cluster. The only fields in ConsensusModule relevant to networking are:

// id is the server ID of this CM.
id int

// peerIds lists the IDs of our peers in the cluster.
peerIds []int

// server is the server containing this CM. It's used to issue RPC calls
// to peers.
server *Server

In the implementation, each Raft replica calls the other replicas in the cluster its "peers". Each peer in the cluster has a unique numeric ID, and a list of IDs of all its peers. The server field is a pointer to the containing Server (implemented in server.go), which enables ConsensusModule to send messages to peers. We'll see how it's done later on.

The goal of this design is to leave all the networking details out of the picture, focusing on the Raft algorithm. In general, to map the Raft paper onto this implementation all you need is the ConsensusModule type and its methods. The Server code is a fairly straightforward Go networking scaffold, with some minor intricacies to enable rigorous testing. I won't spend time on it in this series, but feel free to ask questions if something is not clear about it.

Raft server states

On a high level, a Raft CM is a state machine with 3 states [1]:

Raft high level state machine (Figure 4 from Raft paper)

This can be a little confusing since part 0 spent a lot of time explaining how Raft helps implement state machines; as is often the case, the term state is overloaded here. Raft is an algorithm for implementing arbitrary replicated state machines, but Raft also has a small state machine inside it. Going forward, which state is meant where should be clear from context - and I'll make sure to point it out when it isn't.

In a typical steady-state scenario, one server in a cluster is a leader, while all the others are followers. While we'd like things to continue this way forever, the goal if Raft is to be fault tolerant, so we're going to spend most of our time discussing atypical, failure scenarios, where some servers crash, others get disconnected, etc.

As was mentioned before, Raft uses a strong leadership model. The leader answers client requests, adds new entries to the log and replicates them to followers. Every follower is always ready to take over leadership in case the leader fails or stops communicating. This is the "times out, start election" transition from Follower to Candidate in the diagram.

Terms

Just like in regular elections, those in Raft have terms. A term is the period of time for which a certain server is a leader. A new election triggers a new term, and the Raft algorithm ensures that a given term has a single leader.

The analogy shouldn't be taken too far, however, since Raft elections are very different from real elections. In Raft, the elections are much more cooperative; the goal of a candidate is not to win election at all costs - all candidates share the goal of having some suitable server win the election in any given term. We'll talk more about what "suitable" means here shortly.

Election timer

A key building block of the Raft algorithm is the election timer. This is the timer every follower runs continuously, restarting it every time it hears from the current leader. The leader is sending periodic heartbeats, so when these stop arriving a follower assumes that the leader has crashed or got disconnected, and starts an election (switches to the Candidate state).

Q: Wouldn't all followers become candidates simultaneously?

A: The election timer is randomized, and this is one of the keys to Raft's simplicity. Raft uses this randomization to lower the chance of multiple followers running elections simultaneously. But even if they do become candidates at the same time, only one will be elected as a leader for any given term. In rare cases where the vote becomes split such that no candidate can win, a new election (with a new term) will run. While it's theoretically possible to re-run elections forever, the chance of this happening gets much lower with each election round.

Q: What if a follower gets disconnected (partitioned) from the cluster? Won't it start an election because it doesn't hear from a leader?

A: This is the insidiousness of network partitions, because the follower has no way of distinguishing who is partitioned. Yes, it will start an election. But if it's this follower that has been disconnected, this election will simply go nowhere - since it can't contact the other peers, it won't get any votes. It will likely keep spinning in the candidate state (restarting a new election every once in a while) until it reconnects to the cluster. We'll study this scenario in more detail later on.

Inter-peer RPCs

Raft has two kinds of RPCs peers send each other. For detailed arguments and rules for these RPCs please see Figure 2 in the paper. I will briefly discuss their goals:

  • RequestVotes (RV): used only in the candidate state; candidates use it to request votes from peers in an election. The reply contains an indication of whether a vote is granted.
  • AppendEntries (AE): used only in the leader state; leaders use this RPC to replicate log entries to followers, but also to send heartbeats. This RPC is periodically sent to each follower even if there are no new log entries to replicate.

Diligent readers will infer from the above that followers don't send any RPCs. This is correct; followers don't initiate RPCs to peers, but they have an election timer running in the background. If this timer elapses without hearing from a current leader, the follower becomes a candidate and starts sending RVs.

Implementing the election timer

It's time to dive into the code. Unless stated otherwise, all the code samples shown below are taken from this file. I won't provide a full listing of the fields of the ConsensusModule struct - you can see it in that file.

Our CM implements the election timer by running the following function in a goroutine:

func (cm *ConsensusModule) runElectionTimer() {
  timeoutDuration := cm.electionTimeout()
  cm.mu.Lock()
  termStarted := cm.currentTerm
  cm.mu.Unlock()
  cm.dlog("election timer started (%v), term=%d", timeoutDuration, termStarted)

  // This loops until either:
  // - we discover the election timer is no longer needed, or
  // - the election timer expires and this CM becomes a candidate
  // In a follower, this typically keeps running in the background for the
  // duration of the CM's lifetime.
  ticker := time.NewTicker(10 * time.Millisecond)
  defer ticker.Stop()
  for {
    <-ticker.C

    cm.mu.Lock()
    if cm.state != Candidate && cm.state != Follower {
      cm.dlog("in election timer state=%s, bailing out", cm.state)
      cm.mu.Unlock()
      return
    }

    if termStarted != cm.currentTerm {
      cm.dlog("in election timer term changed from %d to %d, bailing out", termStarted, cm.currentTerm)
      cm.mu.Unlock()
      return
    }

    // Start an election if we haven't heard from a leader or haven't voted for
    // someone for the duration of the timeout.
    if elapsed := time.Since(cm.electionResetEvent); elapsed >= timeoutDuration {
      cm.startElection()
      cm.mu.Unlock()
      return
    }
    cm.mu.Unlock()
  }
}

It begins by selecting a pseudo-random election timeout, by calling cm.electionTimeout. The range we're using here is from 150 to 300 milliseconds, as suggested in the paper. As most methods of ConsensusModule, runElectionTimer locks the struct while accessing fields. This is essential because the implementation tries to be as synchronous as possible, which is one of Go's strengths. This means that sequential code is... sequential, and not split across multiple event handlers. RPCs still happen concurrently, though, so we have to protect the shared data structure. We'll get to the RPC handlers soon.

The main loop in this method runs a ticker for 10 ms. There are more efficient ways of waiting for events, but this idiom results in the simplest code. Each loop iteration happens after the next 10 ms have elapsed. Theoretically this could sleep for the whole election timeout, but then it would be less responsive and somewhat harder to debug/follow in logs. We check if the state is still as expected [2] and the term hasn't changed. If anything is off, we terminate the election timer.

If enough time has passed since the last "election reset event", this peer starts an election and becomes a candidate. What is this election reset event? It's any of the things that can terminate an election - for example, a valid heartbeat was received, or a vote given to another candidate. We'll see this code shortly.

Becoming a candidate

We've seen above that once enough time has passed without the follower hearing from a leader or another candidate, it starts an election. Before looking at the code, let's think about the things we need to run an election:

  1. Switch the state to candidate and increment the term, because that's what the algorithm dictates for every election.
  2. Send RV RPCs to all peers, asking them to vote for us in this election.
  3. Wait for replies to these RPCs and count if we got enough votes to become a leader.

In Go all of this logic can be collected into a single function:

func (cm *ConsensusModule) startElection() {
  cm.state = Candidate
  cm.currentTerm += 1
  savedCurrentTerm := cm.currentTerm
  cm.electionResetEvent = time.Now()
  cm.votedFor = cm.id
  cm.dlog("becomes Candidate (currentTerm=%d); log=%v", savedCurrentTerm, cm.log)

  var votesReceived int32 = 1

  // Send RequestVote RPCs to all other servers concurrently.
  for _, peerId := range cm.peerIds {
    go func(peer int) {
      args := RequestVoteArgs{
        Term:        savedCurrentTerm,
        CandidateId: cm.id,
      }
      var reply RequestVoteReply

      cm.dlog("sending RequestVote to %d: %+v", peer, args)
      if err := cm.server.Call(peer, "ConsensusModule.RequestVote", args, &reply); err == nil {
        cm.mu.Lock()
        defer cm.mu.Unlock()
        cm.dlog("received RequestVoteReply %+v", reply)

        if cm.state != Candidate {
          cm.dlog("while waiting for reply, state = %v", cm.state)
          return
        }

        if reply.Term > savedCurrentTerm {
          cm.dlog("term out of date in RequestVoteReply")
          cm.becomeFollower(reply.Term)
          return
        } else if reply.Term == savedCurrentTerm {
          if reply.VoteGranted {
            votes := int(atomic.AddInt32(&votesReceived, 1))
            if votes*2 > len(cm.peerIds)+1 {
              // Won the election!
              cm.dlog("wins election with %d votes", votes)
              cm.startLeader()
              return
            }
          }
        }
      }
    }(peerId)
  }

  // Run another election timer, in case this election is not successful.
  go cm.runElectionTimer()
}

The candidate starts by voting for itself - initializing votesReceived to 1 and setting cm.votedFor = cm.id.

It then issues RPCs in parallel to all its peers. Each RPC is done in its own goroutine, because our RPC calls are synchronous - they block until a response is received, which can take a while.

This is a good place to demonstrate how RPCs are done:

cm.server.Call(peer, "ConsensusModule.RequestVote", args, &reply)

We use the Server pointer contained in the ConsensusModule.server field to issue a remote call, with ConsensusModule.RequestVotes as the remote method name. This ends up calling the RequestVote method of the peer given in the first argument.

If the RPC succeeds, some time has passed so we have to check the state to see what our options are. If our state is no longer candidate, bail out. When can this happen? For example, we might have won the election because there were enough votes in the other RPC calls. Or one of the other RPC calls heard from a server with a higher term, so we switched back to be a follower. It's important to remember that in case of a flaky network, the RPC can take a long while to arrive - when we have the reply, the rest of the code may have moved on and it's important to gracefully give up in such cases.

If we're still a candidate when the reply is back, we check the term of the reply and compare it to the original term we were on when we sent the request. If the reply's term is higher, we revert to a follower state. This can happen if another candidate won an election while we were collecting votes, for example.

If the term is the same as the one we sent out, check if a vote was granted. We use an atomic votes variable to collect votes from multiple goroutines safely. If this server has the majority of the votes (including the vote it granted itself), it becomes a leader.

Note that the startElection method is not blocking. It updates some state, launches a bunch of goroutines and returns. Therefore, it should also start a new election counter in a goroutine - which it does on the last line. This ensures that if nothing useful comes out of this election, a new one will begin after the usual timeout. This also explains the state checks in runElectionTimer: if this election does turn the peer into a leader, the concurrent runElectionTimer will just return when observing a state it didn't expect to be in.

Becoming a leader

We've seen the startLeader call in startElection when the vote tally shows this peer has won. Here it is:

func (cm *ConsensusModule) startLeader() {
  cm.state = Leader
  cm.dlog("becomes Leader; term=%d, log=%v", cm.currentTerm, cm.log)

  go func() {
    ticker := time.NewTicker(50 * time.Millisecond)
    defer ticker.Stop()

    // Send periodic heartbeats, as long as still leader.
    for {
      cm.leaderSendHeartbeats()
      <-ticker.C

      cm.mu.Lock()
      if cm.state != Leader {
        cm.mu.Unlock()
        return
      }
      cm.mu.Unlock()
    }
  }()
}

This is actually a fairly simple method: all it does is run the heartbeat ticker - a goroutine that calls leaderSendHeartbeats every 50 ms, as long as this CM is still the leader. This is the code for leaderSendHeartbeats:

func (cm *ConsensusModule) leaderSendHeartbeats() {
  cm.mu.Lock()
  savedCurrentTerm := cm.currentTerm
  cm.mu.Unlock()

  for _, peerId := range cm.peerIds {
    args := AppendEntriesArgs{
      Term:     savedCurrentTerm,
      LeaderId: cm.id,
    }
    go func(peer int) {
      cm.dlog("sending AppendEntries to %v: ni=%d, args=%+v", peer, 0, args)
      var reply AppendEntriesReply
      if err := cm.server.Call(peer, "ConsensusModule.AppendEntries", args, &reply); err == nil {
        cm.mu.Lock()
        defer cm.mu.Unlock()
        if reply.Term > savedCurrentTerm {
          cm.dlog("term out of date in heartbeat reply")
          cm.becomeFollower(reply.Term)
          return
        }
      }
    }(peerId)
  }
}

It's somewhat similar to startElection, in the sense that it launches a goroutine per peer to send an RPC. This time the RPC is AppendEntries (AE) with no log contents, which plays the role of a heartbeat in Raft.

Similarly to processing a RV reply, if the RPC returns a term higher than our own, this peer switches to become a follower. It's time to examine the becomeFollower method:

func (cm *ConsensusModule) becomeFollower(term int) {
  cm.dlog("becomes Follower with term=%d; log=%v", term, cm.log)
  cm.state = Follower
  cm.currentTerm = term
  cm.votedFor = -1
  cm.electionResetEvent = time.Now()

  go cm.runElectionTimer()
}

It sets the CM's state to follower and resets its term and other important state fields. It also starts a new election timer, since this is something a follower should always have running in the background.

Answering RPCs

So far we've seen the active parts of the implementation - the parts that initiate RPCs, timers and state transitions. The presentation is not complete before we see the server methods - procedures which other peers invoke remotely. Let's start with RequestVote:

func (cm *ConsensusModule) RequestVote(args RequestVoteArgs, reply *RequestVoteReply) error {
  cm.mu.Lock()
  defer cm.mu.Unlock()
  if cm.state == Dead {
    return nil
  }
  cm.dlog("RequestVote: %+v [currentTerm=%d, votedFor=%d]", args, cm.currentTerm, cm.votedFor)

  if args.Term > cm.currentTerm {
    cm.dlog("... term out of date in RequestVote")
    cm.becomeFollower(args.Term)
  }

  if cm.currentTerm == args.Term &&
    (cm.votedFor == -1 || cm.votedFor == args.CandidateId) {
    reply.VoteGranted = true
    cm.votedFor = args.CandidateId
    cm.electionResetEvent = time.Now()
  } else {
    reply.VoteGranted = false
  }
  reply.Term = cm.currentTerm
  cm.dlog("... RequestVote reply: %+v", reply)
  return nil
}

Note the check for a "dead" state. We'll talk about it later on.

It starts with the familiar logic of checking if the term is out of date and becoming a follower. If it's already a follower, the state won't change but the other state fields will reset.

Otherwise if the caller's term is aligned with ours and we haven't voted for another candidate yet, we'll grant the vote. We never grant a vote for RPCs from older terms.

This is the code for AppendEntries:

func (cm *ConsensusModule) AppendEntries(args AppendEntriesArgs, reply *AppendEntriesReply) error {
  cm.mu.Lock()
  defer cm.mu.Unlock()
  if cm.state == Dead {
    return nil
  }
  cm.dlog("AppendEntries: %+v", args)

  if args.Term > cm.currentTerm {
    cm.dlog("... term out of date in AppendEntries")
    cm.becomeFollower(args.Term)
  }

  reply.Success = false
  if args.Term == cm.currentTerm {
    if cm.state != Follower {
      cm.becomeFollower(args.Term)
    }
    cm.electionResetEvent = time.Now()
    reply.Success = true
  }

  reply.Term = cm.currentTerm
  cm.dlog("AppendEntries reply: %+v", *reply)
  return nil
}

This logic should also align with the election parts of figure 2 in the paper. One tricky condition to understand is this:

if cm.state != Follower {
  cm.becomeFollower(args.Term)
}

Q: What if this peer is a leader - why does it become a follower to another leader?

A: Raft guarantees that only a single leader exists in any given term. If you carefully follow the logic of RequestVote and the code in startElection that sends RVs, you'll see that two leaders can't exist in the cluster with the same term. This condition is important for candidates that find out that another peer won the election for this term.

States and goroutines

It's worth doing a recap of all the possible states a CM can be in, and the different goroutines running in them:

Follower: when the CM is initialized to be a follower, and in each invocation of becomeFollower, a new goroutine starts running runElectionTimer. This is the companion of followers. Note that there can be more than one running at once for short periods of time. Suppose a follower gets a RV from a leader in a higher term; this will trigger another becomeFollower call that launches a new timer goroutine. But the old one will exit without doing anything, as soon as it notices a changed term.

Candidate: a candidate also has the election goroutine ticking in parallel, but in addition it has a number of goroutines to send RPCs. It has the same safeguards as a follower for stopping an "old" election goroutine if a new one started running. Do recall that RPC goroutines may take a very long time to finish, so it's essential for them to exit quietly if they notice they are out of date by the time the RPC call returns.

Leader: a leader does not have an election goroutine, but it does have the heartbeat goroutine ticking every 50 ms.

There's an additional state making an appearance in the code - the Dead state. This is purely for orderly shutdown of a CM. A call to Stop sets the state to Dead and all goroutines take care to exit as soon as they observe this state.

Having all these goroutines running may be concerning - what if some of them remain, lingering in the background; or worse, they repeatedly leak and their number grows without bounds? This is what leak checking is for, and several of the tests have leak checking enabled. These tests run nontrivial Raft election sequences and ensure that no stray goroutine is found running at the end of the test (after Stop has been called and some time was given for stragglers to exit).

Runaway server and increasing terms

To conclude this part in the series, let's study a tricky scenario that may occur and how Raft handles it. I find this example very interesting and instructive. Here I'm trying to present it as a story, but you might want to use a piece of paper to follow the states of the different servers. If you can't follow the example - please send me an email - I'll gladly fix it to make it clearer.

Consider a cluster with three servers: A, B and C. Suppose that A is the leader, the starting term is 1 and the cluster is happily chugging along. A is sending heartbeat AE RPCs to B and C every 50 ms, and gets prompt responses within a few ms; each such AE resets B's and C's electionResetEvent, so they remain as contented followers.

At some point, due to a temporary hiccup in its network router, server B gets partitioned from A and C. A is still sending it AEs every 50 ms, but these AEs either error out immediately or after some long-ish timeout by the underlying RPC engine. There's not much A can do about it, but it's no big deal. We haven't talked about log replication yet, but since two of the three servers are alive, the cluster has a quorum to commit client commands.

What about B? Let's say that when it was disconnected, its election timeout was set to 200 ms. Approximately 200 ms after the disconnection, B's runElectionTimer goroutine realizes it hasn't heard from the leader for the election timeout; B has no way of distinguishing who's at fault here, so it will become a candidate and start a new election.

B's term will thus become 2 (while A's and C's are still at 1). B will dutifully send out RV RPCs to A and C to ask them to vote for it; but of course, these RPCs get lost in the jumbled wires of B's network router. No need to panic! B's startElection launched another runElectionTimer goroutine right at the start, and that goroutine waits for, say 250 ms (remember, our timeout range is random between 150-300 ms) to see if anything important happened as a result of the previous election. Nothing did for B, because it's still completely isolated. So runElectionTimer starts another new election, incrementing the term to 3.

And so on and so forth; B's router takes a few whole seconds to reset itself and go back online. In the meantime, B's rerunning elections every once in a while and its term has already reached 8.

At this point, the network partition is fixed and B is reconnected to A and to C.

Shortly after, an AE RPC arrives from A. Recall that A keeps sending them dutifully every 50 ms, even though B didn't reply for a while.

B's AppendEntries is called and sends back a reply with term=8.

A gets this reply in leaderSendHeartbeats, examines the reply's term and notices it's higher than its own. It updates its own term to 8 and becomes a follower. The cluster temporarily loses a leader.

Now multiple things can happen, depending on the timing. B is a candidate, but it may have sent its RVs before the network revived; C is a follower, but within its own election timeout it will become a candidate because it stops receiving periodic AEs from A. A became a follower, and will also turn into a candidate within its election timeout.

So any of the three servers can win the next election. Note that this is only because we don't actually replicate any logs here. As we'll see in the next part, in realistic scenarios A and C will likely add some new client commands while B is away, so their logs will be more up to date. Therefore, B cannot become the new leader - a new election will occur which will be won by either A or C; we'll revisit this scenario again in the next part.

Assuming no new commands were added since B disconnected, it's absolutely fine for a leader change to occur as a result of the reconnection.

If this may seem inefficient - it's true. The leader change is not really necessary here, because A was perfectly healthy throughout the scenario. But keeping the invariants simple at the cost of some efficiency in corner cases is one of the design choices made by Raft. What counts is efficiency in the common case (without any disruptions), because this is the state clusters spend 99.9% of their time in.

What's next

To ensure that your understanding of the implementation is not only theoretical, I strongly encourage you to play with the code a bit.

The repository README has some detailed instructions about interacting with the code, running its tests and observing results. The code comes with many tests that exercise specific scenarios (including the scenario described in the previous section) and it's very instructive to run a single test and watch the Raft logs. Notice all the cm.dlog(...) calls in the code? The repository comes with a special tool that helps visualize these logs alongside each other in a HTML file - see the README for instructions. Run some tests, watch their logs, and feel free to sprinkle your own dlog calls to get a better understanding of when different parts of the code are exercised.

Part 2 in the series will describe a more complete Raft implementation that actually handles client commands and replicates them throughout the cluster. Stay tuned!


[1]This diagram is the same as Figure 4 in the the Raft paper. This is a good place for a reminder that in this series of posts I assume you've already read that paper.
[2]The check for state being not follower and not candidate may seem strange. Can this peer become a leader suddenly, not through elections initiated in runElectionTimer? Read on to see how an election counter is restarted by candidates.

Recent posts

2020.02.22: Implementing Raft: Part 0 - Introduction
2020.01.21: Graceful shutdown of a TCP server in Go
2020.01.07: PubSub using channels in Go
2020.01.02: My Reading Habits
2019.12.31: Summary of reading: October - December 2019
2019.11.23: "Beating" C with 400 lines of unoptimized assembly
2019.11.06: How to send good pull requests on GitHub
2019.10.21: Diffie-Hellman Key Exchange
2019.10.01: Simple Go project layout with modules
2019.09.30: Summary of reading: July - September 2019

See Archives for a full list.