What you build
- a local 3-node Raft group
- a tiny atomic register state machine
- tests for election, replication, snapshots, and membership changes
This is the best end-to-end walkthrough in the docs. It takes you from MicroRaft abstractions to a small working replicated state machine.
In this section, we will build an atomic register on top of MicroRaft to
demonstrate how to implement and use MicroRaft's main abstractions. Our atomic
register will consist of a single value and only 3 operations: set,
compare-and-set, and get. In order to keep things simple, we will not run
our Raft group in a distributed setting. Instead, we will run each Raft node on a
different thread and make them communicate with each other via multi-threaded
queues.
If you haven't read the Main Abstractions section yet, I highly recommend reading it before starting this tutorial.
All the code shown here compiles and is available in the MicroRaft GitHub repository. You can clone the repository and run the code samples on your machine to try each part yourself. I intentionally duplicated a lot of code in the test classes below to put all the pieces together, so that you can see what is going on without navigating through multiple classes.
Let's crank up the engine!
We will start with writing our
RaftEndpoint,
StateMachine,
and
Transport
classes. We can use the default implementations of
RaftNodeExecutor,
RaftModel,
and
RaftModelFactory
abstractions. Since all of our Raft nodes
will run in the same JVM process, we also don't need any serialization logic
inside our Transport implementation. Lastly, we will skip persistence: our
Raft nodes will keep their state only in memory.
RaftEndpoint

First, we need to implement RaftEndpoint to represent the identity of our Raft
nodes. Since we don't distribute our Raft nodes to multiple servers in this
tutorial, we don't really need IP addresses. We can simply identify our Raft
nodes with string IDs and keep a mapping of unique IDs to Raft nodes so that we
can deliver
RaftMessage
objects to target Raft nodes.
Let's write a LocalRaftEndpoint class as below. We will generate unique Raft
endpoints via its static LocalRaftEndpoint.newEndpoint() method.
A Raft endpoint can stay intentionally small. For the tutorial harness, a stable identity is enough to form a group and route messages correctly.
You can also see this class in the MicroRaft GitHub repository.
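As a rough illustration of the idea, a standalone sketch of a local endpoint might look like the following. This is not MicroRaft's `LocalRaftEndpoint` class; the `LocalEndpointSketch` name, the counter-based id scheme, and the `getId()` method are illustrative stand-ins that only model the part that matters here: a unique, stable string identity per node.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the tutorial's LocalRaftEndpoint. The real
// class implements MicroRaft's RaftEndpoint interface; here we only model
// a unique, stable string id that can key a discovery map.
class LocalEndpointSketch {
    private static final AtomicInteger COUNTER = new AtomicInteger();

    private final String id;

    private LocalEndpointSketch(String id) {
        this.id = id;
    }

    // Generates a fresh endpoint with a process-wide unique id.
    static LocalEndpointSketch newEndpoint() {
        return new LocalEndpointSketch("node" + COUNTER.incrementAndGet());
    }

    String getId() {
        return id;
    }
}
```

Because the ids are unique per process, they are sufficient to route messages between nodes living in the same JVM.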
StateMachine

We will implement our atomic register state machine iteratively. In the first iteration, we will create our Raft nodes and elect a leader without committing any atomic register operation. So the first version of our state machine, shown below, does not contain any execution or snapshotting logic for our atomic register.
We implement
StateMachine.getNewTermOperation()
in our first version of
state machine. This method returns an operation which will be committed every
time a new leader is elected. This is actually related to the
single-server membership change bug
in the Raft consensus
algorithm rather than our atomic register logic.
Once we see that we are able to form a Raft group and elect a leader, we will extend this class to implement the missing functionality.
The first state machine version exists only to make cluster formation testable. You can defer business logic and still validate the core Raft lifecycle first.
You can also see this class in the MicroRaft GitHub repository.
Transport

We are almost there to run our first test for bootstrapping a Raft group and
electing a leader. The only missing piece is
Transport.
Recall that Transport is responsible for
sending Raft messages to other Raft nodes (serialization and networking). Since
our Raft nodes will run in a single JVM process in this tutorial, we will skip
serialization. To mimic networking, we will keep a mapping of Raft endpoints to
Raft nodes on each Transport object and pass Raft messages between Raft
nodes by using this mapping.
Our LocalTransport class is shown below.
Transport integration can start as an in-memory adapter. That keeps the tutorial focused on Raft behavior instead of serialization or network plumbing.
You can also see this class in the MicroRaft GitHub repository.
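To make the in-memory-adapter idea concrete, here is a minimal sketch under stated assumptions: the `LocalTransportSketch` class, its `MessageHandler` interface, and the `discoverNode`/`send` method names are all hypothetical, not MicroRaft's `Transport` API. The point it shows is that "networking" can be just a concurrent map lookup plus a direct handler call.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// In-memory transport sketch. Instead of serializing messages and writing
// them to sockets, each transport keeps a discovery map from endpoint id
// to a message handler and delivers messages directly.
class LocalTransportSketch {
    interface MessageHandler {
        void handle(Object message);
    }

    private final String localEndpointId;
    private final Map<String, MessageHandler> nodes = new ConcurrentHashMap<>();

    LocalTransportSketch(String localEndpointId) {
        this.localEndpointId = localEndpointId;
    }

    // Adds another node to this transport's discovery map.
    void discoverNode(String endpointId, MessageHandler handler) {
        if (!localEndpointId.equals(endpointId)) {
            nodes.putIfAbsent(endpointId, handler);
        }
    }

    // Delivers the message directly; an unknown target mimics a dropped
    // message on a real network, so we just report failure.
    boolean send(String targetEndpointId, Object message) {
        MessageHandler handler = nodes.get(targetEndpointId);
        if (handler == null) {
            return false;
        }
        handler.handle(message);
        return true;
    }
}
```

Silently dropping messages to unknown targets mirrors real networking semantics, which is exactly what Raft is designed to tolerate.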
Before application logic matters, you need a cluster that can form, discover peers, and elect a leader consistently.
Now we have all the required functionality to start our Raft group and elect a leader. Let's write our first test.
This is the first full-system check: nodes can start, discover peers, and converge on a leader before any application-specific behavior is layered on top.
To run this test on your machine, try the following:
$ ./gradlew :microraft-tutorial:test --tests io.microraft.tutorial.LeaderElectionTest
You can also see it in the MicroRaft GitHub repository.
Ok. That is a big piece of code, but no worries. We will digest it one piece at a time.
Before bootstrapping a new Raft group, we first decide on its initial member
list, i.e., the list of Raft endpoints. The very same initial Raft group member
list must be provided to all Raft nodes, including the ones we will start later
and join our already-running Raft group. This is what we do at the beginning of
the class by populating initialMembers with 3 unique Raft endpoints.
startRaftGroup() calls createRaftNode() for each Raft endpoint and then
starts the created Raft nodes. When a new Raft node is created, it does not know
how to talk to the other Raft nodes. Therefore, we pass created Raft node
objects to enableDiscovery() to enable them to talk to each other. This method
just adds a given Raft node to the discovery maps of the LocalTransport
objects of the other Raft nodes.
Once we create a Raft node via
RaftNodeBuilder,
its initial status is
RaftNodeStatus.INITIAL
and it does not execute the Raft consensus algorithm in this status. When
RaftNode.start() is called, its status becomes
RaftNodeStatus.ACTIVE
and the Raft node internally submits
a task to its RaftNodeExecutor to check if there is a leader. Since we are
starting a new Raft group in this test, obviously there is no leader yet so our
Raft nodes will start a new leader election round.
The actual test method is rather short. It calls waitUntilLeaderElected() to
get the leader Raft node and asserts that a Raft node is returned as leader.
Please note that this method performs no special leader discovery. It just
waits until all Raft nodes report the same Raft node as the
leader.
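The waiting logic can be sketched as a plain polling loop. The `LeaderWaitSketch` class and the supplier-based view of each node's reported leader are illustrative assumptions (the real test harness calls MicroRaft's node APIs instead), but the shape is the same: poll until every node agrees on a non-null leader or a deadline passes.

```java
import java.util.List;
import java.util.function.Supplier;

// Sketch of a waitUntilLeaderElected() helper. Each Supplier stands in
// for "ask one Raft node which endpoint it currently believes is leader".
class LeaderWaitSketch {
    static String waitUntilLeaderElected(List<Supplier<String>> leaderViews,
                                         long timeoutMillis) {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        while (System.currentTimeMillis() < deadline) {
            String first = leaderViews.get(0).get();
            // Agreement means: a non-null leader that every node reports.
            if (first != null && leaderViews.stream().allMatch(v -> first.equals(v.get()))) {
                return first;
            }
            try {
                Thread.sleep(10); // back off briefly before polling again
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
        throw new AssertionError("no leader elected within timeout");
    }
}
```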
Let's run this code and see the logs. The logs start like the following:
23:54:38.121 INFO - [RaftNode] node2<default> Starting for default with 3 members: [LocalRaftEndpoint{id=node1}, LocalRaftEndpoint{id=node2}, LocalRaftEndpoint{id=node3}]
23:54:38.121 INFO - [RaftNode] node3<default> Starting for default with 3 members: [LocalRaftEndpoint{id=node1}, LocalRaftEndpoint{id=node2}, LocalRaftEndpoint{id=node3}]
23:54:38.121 INFO - [RaftNode] node1<default> Starting for default with 3 members: [LocalRaftEndpoint{id=node1}, LocalRaftEndpoint{id=node2}, LocalRaftEndpoint{id=node3}]
23:54:38.123 INFO - [RaftNode] node2<default> Status is set to ACTIVE
23:54:38.123 INFO - [RaftNode] node3<default> Status is set to ACTIVE
23:54:38.123 INFO - [RaftNode] node1<default> Status is set to ACTIVE
23:54:38.125 INFO - [RaftNode] node3<default> Raft Group Members {groupId: default, size: 3, term: 0, logIndex: 0} [
node1
node2
node3 - FOLLOWER this (ACTIVE)
] reason: STATUS_CHANGE
23:54:38.125 INFO - [RaftNode] node2<default> Raft Group Members {groupId: default, size: 3, term: 0, logIndex: 0} [
node1
node2 - FOLLOWER this (ACTIVE)
node3
] reason: STATUS_CHANGE
23:54:38.125 INFO - [RaftNode] node3<default> started.
23:54:38.125 INFO - [RaftNode] node1<default> Raft Group Members {groupId: default, size: 3, term: 0, logIndex: 0} [
node1 - FOLLOWER this (ACTIVE)
node2
node3
] reason: STATUS_CHANGE
23:54:38.125 INFO - [RaftNode] node2<default> started.
23:54:38.125 INFO - [RaftNode] node1<default> started.
Each RaftNode first prints the id and initial member list of the Raft group.
When we call RaftNode.start(), they switch to the RaftNodeStatus.ACTIVE
status and print a summary of the Raft group, including the member list. Each
node also marks itself in the Raft group member list and states the reason why the
log line is printed, so that we can follow what is going on.
After this part, our Raft nodes start a leader election. Raft can be very chatty during leader elections. For the sake of simplicity, we will just skip the leader election logs and look at the final status.
23:54:39.141 INFO - [VoteResponseHandler] node3<default> Vote granted from node1 for term: 2, number of votes: 2, majority: 2
23:54:39.141 INFO - [VoteResponseHandler] node3<default> We are the LEADER!
23:54:39.149 INFO - [AppendEntriesRequestHandler] node1<default> Setting leader: node3
23:54:39.149 INFO - [RaftNode] node3<default> Raft Group Members {groupId: default, size: 3, term: 2, logIndex: 0} [
node1
node2
node3 - LEADER this (ACTIVE)
] reason: ROLE_CHANGE
23:54:39.149 INFO - [AppendEntriesRequestHandler] node2<default> Setting leader: node3
23:54:39.149 INFO - [RaftNode] node1<default> Raft Group Members {groupId: default, size: 3, term: 2, logIndex: 0} [
node1 - FOLLOWER this (ACTIVE)
node2
node3 - LEADER
] reason: ROLE_CHANGE
23:54:39.149 INFO - [RaftNode] node2<default> Raft Group Members {groupId: default, size: 3, term: 2, logIndex: 0} [
node1
node2 - FOLLOWER this (ACTIVE)
node3 - LEADER
] reason: ROLE_CHANGE
As you see, we managed to elect our leader in the 2nd term. It means that we had a split-vote situation in the 1st term. This is quite normal because in our test code we start our Raft nodes at the same time and each Raft node just votes for itself during the first term. Please keep in mind that in your run, another Raft node could become the leader.
Now it is time to implement the set, compare-and-set, and get operations
we talked about earlier for our atomic register state machine. We must ensure that they
are implemented in a deterministic way, because it is a fundamental requirement
of the replicated state machines approach. MicroRaft guarantees that each Raft
node will execute committed operations in the same order and since our
operations run deterministically, we know that our state machines will end up
with the same state after they execute committed operations.
To implement these operations, we will extend our AtomicRegister class instead
of modifying it so that the readers can follow the different stages of this
tutorial. The new state machine class is below. We define an interface,
AtomicRegisterOperation, as a marker for the operations we will execute on our
state machine. There are 3 inner classes implementing this interface for the
set, compare-and-set and get operations, and accompanying static methods
to create their instances. We will pass an AtomicRegisterOperation object to
RaftNode.replicate() and once it is committed by MicroRaft it will be passed
to our state machine for execution. Our new state machine class handles these
AtomicRegisterOperation objects in the runOperation() method. set updates
the atomic register with the given value and returns its previous value,
compare-and-set updates the atomic register with the given new value only if
its current value is equal to the given current value, and get simply returns
the current value of the atomic register. Please note that the snapshotting
logic is still missing and will be implemented later in the tutorial.
The state machine boundary is explicit now. Commands and queries can be modeled cleanly before snapshotting and more advanced lifecycle concerns are added.
You can also see this class in the MicroRaft GitHub repository.
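The dispatch described above can be sketched in isolation. This is not the tutorial's `OperableAtomicRegister`: the `OperableRegisterSketch` class and its `Set`/`Cas`/`Get` records are simplified stand-ins for the `AtomicRegisterOperation` marker interface and its inner classes. What it preserves is the deterministic semantics: set returns the previous value, compare-and-set returns a boolean, get returns the current value.

```java
// Hypothetical sketch of the runOperation() dispatch for the atomic
// register. Operations are modeled as records instead of the tutorial's
// AtomicRegisterOperation marker interface.
class OperableRegisterSketch {
    record Set(Object value) {}
    record Cas(Object expected, Object newValue) {}
    record Get() {}

    private Object value;

    Object runOperation(Object operation) {
        if (operation instanceof Set set) {
            // set: overwrite and return the previous value
            Object prev = value;
            value = set.value();
            return prev;
        } else if (operation instanceof Cas cas) {
            // compare-and-set: update only if the current value matches
            boolean equal = value == null ? cas.expected() == null : value.equals(cas.expected());
            if (equal) {
                value = cas.newValue();
                return true;
            }
            return false;
        } else if (operation instanceof Get) {
            // get: return the current value without mutating state
            return value;
        }
        throw new IllegalArgumentException("Unknown operation: " + operation);
    }
}
```

Because the dispatch is a pure function of the current state and the operation, replaying the same committed operations in the same order on every node yields the same register value, which is the determinism requirement stated above.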
In the next test we will use the new state machine to start our Raft nodes. We
will commit a number of operations on the Raft group and verify their results.
We replicate 2 set operations, 2 compare-and-set operations, and a get
operation at the end. After each operation, we verify that its commit index is
greater than the commit index of the previous operation.
This test validates the main promise of Raft-backed writes: operations commit in a single ordered sequence, and clients can observe that sequence through commit indices.
$ ./gradlew :microraft-tutorial:test --tests io.microraft.tutorial.OperationCommitTest
You can also see it in the MicroRaft GitHub repository.
We use RaftNode.replicate() to replicate and commit operations on the Raft
group. Most of the Raft node APIs, including RaftNode.replicate(), return
CompletableFuture<Ordered> objects. For RaftNode.replicate(),
Ordered
provides return value of the executed operation
and on which Raft log index the operation has been committed.
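To show the shape of that result without pulling in MicroRaft, here is a small sketch. The `Ordered` record and `FakeRaftNode` class are stand-ins for MicroRaft's `Ordered` interface and a real `RaftNode`; the only thing the sketch demonstrates is the pairing of an operation result with a strictly increasing commit index.

```java
import java.util.concurrent.CompletableFuture;

// Sketch of the shape RaftNode.replicate() hands back: a future carrying
// both the operation's return value and the log index it committed at.
class ReplicateSketch {
    record Ordered<T>(long commitIndex, T result) {}

    static class FakeRaftNode {
        private long commitIndex;
        private Object register;

        // Mimics replicating a set operation: bumps the commit index and
        // returns the register's previous value, like the tutorial output.
        CompletableFuture<Ordered<Object>> replicateSet(Object newValue) {
            Object prev = register;
            register = newValue;
            return CompletableFuture.completedFuture(new Ordered<>(++commitIndex, prev));
        }
    }
}
```

A client that joins these futures in order observes strictly increasing commit indices, which is exactly what the test asserts after each operation.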
The output of the first 2 sysout lines is below. Please note that set
returns the previous value of the atomic register.
1st operation commit index: 2, result: null
2nd operation commit index: 3, result: value1
Then we commit 2 cas operations. The first cas manages to update the atomic
register, but the second one fails because the atomic register value is
different from the expected value.
3rd operation commit index: 4, result: true
4th operation commit index: 5, result: false
The last operation is a get to read the current value of the atomic register.
5th operation commit index: 6, result: value3
If we call RaftNode.replicate() on a follower or candidate Raft node, the
returned CompletableFuture<Ordered> object is simply notified with
NotLeaderException,
which also provides the Raft endpoint of
the leader Raft node. I am not going to build an advanced RPC system in front of
MicroRaft here, but when we use MicroRaft in a distributed setting, we can build
a retry mechanism in the RPC layer to forward a failed operation to the Raft
endpoint given in the exception. NotLeaderException may also arrive without any
leader information, for example when there is an ongoing leader election round
or the contacted Raft node does not know the leader yet. In this case, our RPC layer
could retry the operation on each Raft node in a round robin fashion until it
discovers the new leader.
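A round-robin retry loop of the kind described above can be sketched like this. Everything here is illustrative: each "node" is modeled as a plain function that either returns a result or throws a stand-in `NotLeaderException`, whereas a real RPC layer would make network calls and consult the leader hint carried by MicroRaft's exception.

```java
import java.util.List;
import java.util.function.Function;

// Sketch of a client-side retry loop: try each node in order until one
// accepts the operation instead of rejecting it as a non-leader.
class RetrySketch {
    static class NotLeaderException extends RuntimeException {}

    static <T> T invokeWithRetry(List<Function<String, T>> nodes, String operation) {
        RuntimeException last = null;
        for (Function<String, T> node : nodes) {
            try {
                return node.apply(operation);
            } catch (NotLeaderException e) {
                last = e; // this node is not the leader; try the next one
            }
        }
        throw last != null ? last : new IllegalStateException("no nodes available");
    }
}
```

A production version would also honor the leader endpoint included in the exception, jumping straight to it instead of blindly cycling through the nodes.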
The last operation we committed in our previous test is get, which is actually
a query (i.e., read-only) operation. Even though it does not mutate the state
machine, since we used RaftNode.replicate(), it is appended to the Raft log
and committed just like the mutating operations. If we had persistence, this
approach would also increase our disk usage unnecessarily, because every new
Raft log entry is persisted, even when it is a query. In short, this is a
sub-optimal way to run queries.
MicroRaft offers a separate API, RaftNode.query(), to handle queries more
efficiently. There are
3 policies for queries,
each with a different consistency
guarantee:
- QueryPolicy.LINEARIZABLE: We can perform a linearizable query with this
policy. MicroRaft employs the optimization described in § 6.4:
Processing read-only queries more efficiently of
the Raft dissertation
to preserve linearizability without growing the internal Raft
log. We need to hit the leader Raft node to execute a linearizable query.
- QueryPolicy.LEADER_LEASE: We can run a query locally on the leader Raft node
without talking to the majority. If the called Raft node is not the leader,
the returned CompletableFuture<Ordered> object is notified with
NotLeaderException.
- QueryPolicy.EVENTUAL_CONSISTENCY: We can use this policy to run a query
locally on any Raft node, independent of its current role. This policy provides
the weakest consistency guarantee, but it can help us distribute our
query workload by utilizing followers. We can also utilize Ordered to
achieve monotonic reads and read-your-writes.
MicroRaft also employs leader stickiness and auto-demotion of leaders on loss of
majority heartbeats. Leader stickiness means that a follower does not vote for
another Raft node before the leader heartbeat timeout elapses after the last
received Append Entries or Install Snapshot RPC. Dually, a leader Raft node
automatically demotes itself to the follower role if it does not receive Append
Entries RPC responses from the majority during the leader heartbeat timeout
duration. Along with these techniques, QueryPolicy.LEADER_LEASE can be used
for performing linearizable queries without talking to the majority if clock
drifts and network delays are bounded. However, bounding clock drifts and
network delays is not an easy job. Hence, QueryPolicy.LEADER_LEASE may cause
stale reads if a Raft node still considers itself the leader because of a
clock drift while the other Raft nodes have elected a new leader and
committed new operations. Moreover, QueryPolicy.LEADER_LEASE and
QueryPolicy.LINEARIZABLE have the same processing cost since only the leader
Raft node runs a given query for both policies. QueryPolicy.LINEARIZABLE
guarantees linearizability with an extra RTT latency overhead compared to
QueryPolicy.LEADER_LEASE. For these reasons, QueryPolicy.LINEARIZABLE is the
recommended policy for linearizable queries, and QueryPolicy.LEADER_LEASE
should be used carefully.
Ok. Let's write another test to use the query API for get. We will first set a
value to the atomic register and then get the value back with a linearizable
query. Just ignore the third parameter passed to the RaftNode.query() call for
now. We will talk about it in a minute.
Linearizable queries do not need extra log entries. The test shows how MicroRaft preserves fresh reads without turning every read into a replicated write.
To run this test on your machine, try the following:
$ ./gradlew :microraft-tutorial:test --tests io.microraft.tutorial.LinearizableQueryTest
You can also see it in the MicroRaft GitHub repository.
The output of the sysout lines is below:
set operation commit index: 2
get operation commit index: 2, result: value
As we discussed above, MicroRaft handles linearizable queries without appending a new entry to the Raft log. So our query is executed at the last committed log index. That is why both commit indices are the same in the output.
QueryPolicy.LEADER_LEASE and QueryPolicy.EVENTUAL_CONSISTENCY can be easily
used if monotonicity is sufficient for query results. This is where
Ordered
comes in handy. A client can track commit indices
observed via returned Ordered objects and use the greatest observed commit
index to preserve monotonicity while issuing a local query to a Raft node. If
the local commit index of a Raft node is smaller than the commit index passed to
the RaftNode.query() call, the returned CompletableFuture object fails with
LaggingCommitIndexException.
This exception means that the
state observed by the client is more up-to-date than the contacted Raft node's
state. In this case, the client can retry its query on another Raft node.
Please refer to § 6.4.1 of the Raft dissertation
for more details.
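The client-side bookkeeping described above fits in a few lines. This `MonotonicReadSketch` class is a hypothetical stand-in: it expresses as a tiny client check what MicroRaft enforces server-side with LaggingCommitIndexException, by tracking the highest commit index observed so far and rejecting responses from replicas that are behind it.

```java
// Sketch of client-side monotonic-read bookkeeping, following the idea in
// §6.4.1 of the Raft dissertation: never accept state older than what the
// client has already observed.
class MonotonicReadSketch {
    private long highestObservedCommitIndex;

    // Returns true if the response respects monotonicity and advances the
    // watermark; false signals "retry this query on another node".
    boolean accept(long responseCommitIndex) {
        if (responseCommitIndex < highestObservedCommitIndex) {
            return false;
        }
        highestObservedCommitIndex = responseCommitIndex;
        return true;
    }
}
```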
We will use a little trick to demonstrate how to maintain the monotonicity of
the observed Raft group state for the local query policies. Recall that
LocalTransport keeps a discovery map to send Raft messages to target Raft
nodes. When we create a new Raft node, we add it to the other Raft nodes'
discovery maps. This time, we will do exactly the reverse to block the
communication between the leader and the follower. Let's add the following
method to our LocalTransport class:
public void undiscoverNode(RaftNode node) {
    RaftEndpoint endpoint = node.getLocalEndpoint();
    if (localEndpoint.equals(endpoint)) {
        throw new IllegalArgumentException(localEndpoint + " cannot undiscover itself!");
    }

    nodes.remove(endpoint, node);
}
We use this method in our new test below. We block the communication between the
leader and follower after we set a value to the atomic register. Then we set
another value which will not be replicated to the disconnected follower. After
this step, we issue a query on the leader. We are also tracking the commit index
we observed. Now, we switch to the disconnected follower and issue a new local
query by passing the last observed commit index. Since the disconnected follower
does not have the second commit, it cannot satisfy the monotonicity we demand,
hence our query fails with LaggingCommitIndexException.
Monotonic local reads are enforceable with commit indices. A follower that falls behind can detect that fact explicitly instead of returning silently stale data.
To run this test on your machine, try the following:
$ ./gradlew :microraft-tutorial:test --tests io.microraft.tutorial.MonotonicLocalQueryTest
You can also see it in the MicroRaft GitHub repository.
Now it is time to implement the missing snapshotting functionality in our atomic
register state machine. Let's talk about snapshotting first. The Raft log grows
during the lifecycle of the Raft group as more operations are committed.
However, in a real-world system we cannot allow it to grow unboundedly. As the
Raft log grows longer, it consumes more space both in memory and on disk, and it
takes a lagging follower longer to catch up with the majority.
Raft solves this problem by taking a snapshot of the state machine at the current
commit index and discarding all log entries up to it. MicroRaft implements
snapshotting by putting an upper bound on the number of log entries kept in the
Raft log. It takes a snapshot of the state machine every N commits and shrinks
the log. N is configurable via
RaftConfig.setCommitCountToTakeSnapshot().
A snapshot is
represented as a list of chunks, where a chunk can be any object provided by the
state machine.
StateMachine contains 2 methods for the snapshotting functionality:
takeSnapshot() and installSnapshot(). Similar to what we did in the previous
part, we will extend OperableAtomicRegister to implement these methods. Since
our atomic register state machine consists of a single value, we just create a
single snapshot chunk object in takeSnapshot(). Dually, to install a snapshot,
we overwrite the value of the atomic register with the value present in the
received snapshot chunk object. MicroRaft guarantees that the commit index of an
installed snapshot is always greater than the last commit index observed by the
state machine.
Snapshot support lives inside the state machine contract. The example shows the minimum implementation required to compact the log without losing correctness.
You can also see this class in the MicroRaft GitHub repository.
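For a single-value register, the minimum snapshot logic is tiny, and the sketch below shows it in isolation. The `SnapshottableRegisterSketch` class is hypothetical (MicroRaft's real `takeSnapshot`/`installSnapshot` methods work with a chunk consumer and a commit index); a plain `List` stands in for the chunk mechanism here.

```java
import java.util.List;

// Snapshot sketch for the single-value register: the whole state fits
// into one chunk, and installing a snapshot overwrites the value.
class SnapshottableRegisterSketch {
    private Object value;

    Object runSet(Object newValue) {
        Object prev = value;
        value = newValue;
        return prev;
    }

    // Emit the entire state as a single snapshot chunk.
    void takeSnapshot(List<Object> chunkConsumer) {
        chunkConsumer.add(value);
    }

    // Overwrite the state with the value carried in the snapshot chunk.
    void installSnapshot(List<Object> chunks) {
        value = chunks.get(0);
    }

    Object get() {
        return value;
    }
}
```

A state machine with more structure would emit several chunks and rebuild itself from all of them, but the contract is the same: installSnapshot must leave the machine exactly as it was at the snapshot's commit index.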
We have the following test to demonstrate how snapshotting works in MicroRaft.
In createRaftNode(), we configure our Raft nodes to take a new snapshot at
every 100 commits. Similar to what we did in the previous test, we block the
communication between the leader and a follower, and fill up the leader's Raft
log with new commits until it takes a snapshot. When we allow the leader to
communicate with the follower again, the follower catches up with the leader by
transferring the snapshot.
This test demonstrates the operational payoff of snapshots: a lagging follower can recover by receiving compacted state instead of replaying an ever-growing log.
To run this test on your machine, try the following:
$ ./gradlew :microraft-tutorial:test --tests io.microraft.tutorial.SnapshotInstallationTest
You can also see it in the MicroRaft GitHub repository.
Ok. We covered a lot of topics up until this part. There are only a few things left to discuss. In this part, we will see how we can perform changes in Raft group member lists.
MicroRaft supports membership changes in Raft groups via
RaftNode.changeMembership(). Let's first review the rules for membership
changes in Raft groups.
- Membership changes are appended to the internal Raft log like regular operations, so they still require majority availability to commit.
- When a membership change commits, its commit index becomes the group members commit index. Calls to RaftNode.changeMembership() must provide the current one.
- MicroRaft allows one membership change at a time. More complex topology changes must be expressed as a sequence of single changes.
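The one-change-at-a-time rule can be modeled as a simple compare-and-bump on the group members commit index. The `MembershipSketch` class below is a hypothetical model, not MicroRaft's API: each change must quote the commit index of the membership it is based on, a stale index is rejected, and the majority quorum size is derived from the member count.

```java
// Model of the membership-change rules: single change at a time, each
// change keyed by the group members commit index it is based on.
class MembershipSketch {
    private long groupMembersCommitIndex;
    private int memberCount;

    MembershipSketch(int initialMembers) {
        this.memberCount = initialMembers;
    }

    // Applies one ADD-style change. Rejects the change if the caller's
    // view of the membership is stale, mirroring MicroRaft's rule.
    long addMember(long expectedGroupMembersCommitIndex, long newCommitIndex) {
        if (expectedGroupMembersCommitIndex != groupMembersCommitIndex) {
            throw new IllegalStateException("stale group members commit index");
        }
        memberCount++;
        groupMembersCommitIndex = newCommitIndex;
        return groupMembersCommitIndex;
    }

    int majorityQuorumSize() {
        return memberCount / 2 + 1;
    }
}
```

The quorum arithmetic also previews the test below: growing from 3 members (quorum 2) to 5 members (quorum 3) requires two separate changes, each quoting the commit index produced by the previous one.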
Since we know the rules for member list changes now, let's see some code. In our last test, we want to improve our 3-member Raft group's degree of fault tolerance by adding 2 new members (majority quorum size of 3 = 2 -> majority quorum size of 5 = 3).
Membership changes are protocol operations too. This example shows that expanding the cluster safely means tracking commit indices and adding nodes in an ordered sequence.
To run this test on your machine, try the following:
$ ./gradlew :microraft-tutorial:test --tests io.microraft.tutorial.ChangeRaftGroupMemberListTest
You can also see it in the MicroRaft GitHub repository.
In this test, we create a new Raft endpoint, endpoint4, and add it to the Raft
group in the following lines:
RaftEndpoint endpoint4 = LocalRaftEndpoint.newEndpoint();
// group members commit index of the initial Raft group members is 0.
RaftGroupMembers newMemberList1 = leader
        .changeMembership(endpoint4, MembershipChangeMode.ADD_OR_PROMOTE_TO_FOLLOWER, 0)
        .join()
        .getResult();
System.out.println("New member list: " + newMemberList1.getMembers()
        + ", majority: " + newMemberList1.getMajorityQuorumSize()
        + ", commit index: " + newMemberList1.getLogIndex());
// endpoint4 is now part of the member list. Let's start its Raft node
RaftNode raftNode4 = createRaftNode(endpoint4);
raftNode4.start();
Please notice that we pass 0 for the third argument to
RaftNode.changeMembership(), because our Raft group operates with the initial
member list whose group members commit index is 0. After this call returns,
endpoint4 is part of the Raft group, so we start its Raft node as well.
Our sysout line here prints the following:
New member list: [LocalRaftEndpoint{id=node1}, LocalRaftEndpoint{id=node2}, LocalRaftEndpoint{id=node3}, LocalRaftEndpoint{id=node4}], majority: 3, commit index: 3
As you see, our Raft group has 4 members now, whose majority quorum size is 3. Actually, running a 4-node Raft group has no advantage over running a 3-node Raft group in terms of the degree of availability because both of them can handle failure of only 1 Raft node. Since we want to achieve better availability, we will add one more Raft node to our Raft group.
So we create another Raft endpoint, endpoint5, add it to the Raft group, and
start its Raft node. However, as the group members commit index parameter, we
pass newMemberList1.getLogIndex(), which is the log index at which endpoint4 was
added to the Raft group. Please note that we could also get the current Raft
group members commit index via RaftNode.getCommittedMembers().
Our second sysout line prints the following:
New member list: [LocalRaftEndpoint{id=node1}, LocalRaftEndpoint{id=node2}, LocalRaftEndpoint{id=node3}, LocalRaftEndpoint{id=node4}, LocalRaftEndpoint{id=node5}], majority: 3, commit index: 5
Now we have 5 nodes in our Raft group with the majority quorum size of 3. It means that now our Raft group can tolerate failure of 2 Raft nodes and still remain operational. Voila!
In this example, for the sake of simplicity, we add new Raft nodes to the Raft
group with MembershipChangeMode.ADD_OR_PROMOTE_TO_FOLLOWER. With this mode,
the Raft node is directly added as a follower, and the majority quorum size
of the Raft group increases to 3. However, in real life use cases, Raft nodes
can contain large state, and it may take some time until the new Raft node
catches up with the leader. Because of this, increasing the majority quorum
size may cause availability gaps if failures occur before the new Raft node
catches up. To prevent this, MicroRaft offers another membership change mode,
MembershipChangeMode.ADD_LEARNER, to add new Raft nodes. With this mode, the
new Raft node is added with the learner role. Learner Raft nodes are excluded
from majority calculations, hence adding a new learner Raft node to the Raft
group does not change the majority quorum size. Once the new learner Raft node
catches up, it can be promoted to the follower role by triggering another
membership change: MembershipChangeMode.ADD_OR_PROMOTE_TO_FOLLOWER.
In the next section, we will see how MicroRaft deals with failures. Just keep calm and carry on!