Tutorial: Building an Atomic Register

In this section, we will build an atomic register on top of MicroRaft to demonstrate how to implement and use MicroRaft's main abstractions. Our atomic register will consist of a single value and only 3 operations: set, compare-and-set, and get. In order to keep things simple, we will not run our Raft group in a distributed setting. Instead, will run each Raft node on a different thread and make them communicate with each other via multi-threaded queues.

If you haven't read the Main Abstractions section yet, I highly recommend you to read that section before this tutorial.

All the code shown here are compiling and available in the MicroRaft Github repository. You can clone the repository and run the code samples on your machine to try each part yourself. I intentionally duplicated a lot of code in the test classes below to put all pieces together so that you can see what is going on without navigating through multiple classes.

Let's crank up the engine!

1. Implementing the Main Abstractions

We will start with writing our RaftEndpoint, StateMachine and Transport classes. We can use the default implementations of RaftNodeExecutor, RaftModel and RaftModelFactory abstractions. Since all of our Raft nodes will run in the same JVM process, we also don't need any serialization logic inside our Transport implementation. Last, we will also skip persistence. Our Raft nodes will keep their state only in memory.

RaftEndpoint

First, we need to implement RaftEndpoint to represent identity of our Raft nodes. Since we don't distribute our Raft nodes to multiple servers in this tutorial, we don't really need IP addresses. We can simply identify our Raft nodes with strings IDs and keep a mapping of unique IDs to Raft nodes so that we can deliver RaftMessage objects to target Raft nodes.

Let's write a LocalRaftEndpoint class as below. We will generate unique Raft endpoints via its static LocalRaftEndpoint.newEndpoint() method.

You can also see this class in the MicroRaft Github repository.

StateMachine

We will implement our atomic register state machine iteratively. In the first iteration, we will create our Raft nodes and elect a leader without committing any atomic register operation. So the first version of our state machine, which is shown below, does not have any execution and snapshotting logic for our atomic register.

We implement StateMachine.getNewTermOperation() in our first version of state machine. This method returns an operation which will be committed every time a new leader is elected. This is actually related to the single-server membership change bug in the Raft consensus algorithm rather than our atomic register logic.

Once we see that we are able to form a Raft group and elect a leader, we will extend this class to implement the missing functionality.

You can also see this class in the MicroRaft Github repository.

Transport

We are almost there to run our first test for bootstrapping a Raft group and electing a leader. The only missing piece is Transport. Recall that Transport is responsible for sending Raft messages to other Raft nodes (serialization and networking). Since our Raft nodes will run in a single JVM process in this tutorial, we will skip serialization. To mimic networking, we will keep a mapping of Raft endpoints to Raft nodes on each Transport object and pass Raft messages to between Raft nodes by using this mapping.

Our LocalTransport class is shown below.

You can also see this class in the MicroRaft Github repository.


2. Bootstrapping the Raft group

Now we have all the required functionality to start our Raft group and elect a leader. Let's write our first test.

To run this test on your machine, try the following:

 $ gh repo clone MicroRaft/MicroRaft
 $ cd MicroRaft && ./mvnw clean test -Dtest=io.microraft.tutorial.LeaderElectionTest -DfailIfNoTests=false -Ptutorial

You can also see it in the MicroRaft Github repository.

Ok. That is a big piece of code, but no worries. We will swallow it one piece at a time.

Before bootstrapping a new Raft group, we first decide on its initial member list, i.e., the list of Raft endpoints. The very same initial Raft group member list must be provided to all Raft nodes, including the ones we will start later and join to our already-running Raft group. It is what we do in the beginning of the class by populating initialMembers with 3 unique Raft endpoints.

startRaftGroup() calls createRaftNode() for each Raft endpoint and then starts the created Raft nodes. When a new Raft node is created, it does not know how to talk to the other Raft nodes. Therefore, we pass created Raft node objects to enableDiscovery() to enable them to talk to each other. This method just adds a given Raft node to the discovery maps of the LocalTransport objects of the other Raft nodes.

Once we create a Raft node via RaftNodeBuilder, its initial status is RaftNodeStatus.INITIAL and it does not execute the Raft consensus algorithm in this status. When RaftNode.start() is called, its status becomes RaftNodeStatus.ACTIVE and the Raft node internally submits a task to its RaftNodeExecutor to check if there is a leader. Since we are starting a new Raft group in this test, obviously there is no leader yet so our Raft nodes will start a new leader election round.

The actual test method is rather short. It calls waitUntilLeaderElected() to get the leader Raft node and asserts that a Raft node is returned as leader. Please note that this method offers the discovery functionality to find the leader. It just waits until all Raft nodes report the same Raft node as the leader.

Let's run this code and see the logs. The logs start like following:

23:54:38.121 INFO - [RaftNode] node2<default> Starting for default with 3 members: [LocalRaftEndpoint{id=node1}, LocalRaftEndpoint{id=node2}, LocalRaftEndpoint{id=node3}]
23:54:38.121 INFO - [RaftNode] node3<default> Starting for default with 3 members: [LocalRaftEndpoint{id=node1}, LocalRaftEndpoint{id=node2}, LocalRaftEndpoint{id=node3}]
23:54:38.121 INFO - [RaftNode] node1<default> Starting for default with 3 members: [LocalRaftEndpoint{id=node1}, LocalRaftEndpoint{id=node2}, LocalRaftEndpoint{id=node3}]
23:54:38.123 INFO - [RaftNode] node2<default> Status is set to ACTIVE
23:54:38.123 INFO - [RaftNode] node3<default> Status is set to ACTIVE
23:54:38.123 INFO - [RaftNode] node1<default> Status is set to ACTIVE
23:54:38.125 INFO - [RaftNode] node3<default> Raft Group Members {groupId: default, size: 3, term: 0, logIndex: 0} [
    node1
    node2
    node3 - FOLLOWER this (ACTIVE)
] reason: STATUS_CHANGE

23:54:38.125 INFO - [RaftNode] node2<default> Raft Group Members {groupId: default, size: 3, term: 0, logIndex: 0} [
    node1
    node2 - FOLLOWER this (ACTIVE)
    node3
] reason: STATUS_CHANGE

23:54:38.125 INFO - [RaftNode] node3<default> started.
23:54:38.125 INFO - [RaftNode] node1<default> Raft Group Members {groupId: default, size: 3, term: 0, logIndex: 0} [
    node1 - FOLLOWER this (ACTIVE)
    node2
    node3
] reason: STATUS_CHANGE

23:54:38.125 INFO - [RaftNode] node2<default> started.
23:54:38.125 INFO - [RaftNode] node1<default> started.

Each RaftNode first prints the id and initial member list of the Raft group. When we call RaftNode.start(), they switch to the RaftNodeStatus.ACTIVE status and print a summary of the Raft group, including the member list. Each node also marks itself in the Raft group member list, and the reason of why the log is printed so that we can follow what is going on.

After this part, our Raft nodes start a leader election. Raft can be very chatty during leader elections. For the sake of simplicity, we will just skip the leader election logs and look at the final status.

23:54:39.141 INFO - [VoteResponseHandler] node3<default> Vote granted from node1 for term: 2, number of votes: 2, majority: 2
23:54:39.141 INFO - [VoteResponseHandler] node3<default> We are the LEADER!
23:54:39.149 INFO - [AppendEntriesRequestHandler] node1<default> Setting leader: node3
23:54:39.149 INFO - [RaftNode] node3<default> Raft Group Members {groupId: default, size: 3, term: 2, logIndex: 0} [
    node1
    node2
    node3 - LEADER this (ACTIVE)
] reason: ROLE_CHANGE

23:54:39.149 INFO - [AppendEntriesRequestHandler] node2<default> Setting leader: node3
23:54:39.149 INFO - [RaftNode] node1<default> Raft Group Members {groupId: default, size: 3, term: 2, logIndex: 0} [
    node1 - FOLLOWER this (ACTIVE)
    node2
    node3 - LEADER
] reason: ROLE_CHANGE

23:54:39.149 INFO - [RaftNode] node2<default> Raft Group Members {groupId: default, size: 3, term: 2, logIndex: 0} [
    node1
    node2 - FOLLOWER this (ACTIVE)
    node3 - LEADER
] reason: ROLE_CHANGE

As you see, we managed to elect our leader in the 2nd term. It means that we had a split-vote situation in the 1st term. This is quite normal because in our test code we start our Raft nodes at the same time and each Raft node just votes for itself during the first term. Please keep in mind that in your run, another Raft node could become the leader.


3. Sending requests

Now it is time to implement the set, compare-and-set, and get operations we talked before for our atomic register state machine. We must ensure that they are implemented in a deterministic way, because it is a fundamental requirement of the replicated state machines approach. MicroRaft guarantees that each Raft node will execute committed operations in the same order and since our operations run deterministically, we know that our state machines will end up with the same state after they execute committed operations.

To implement these operations, we will extend our AtomicRegister class instead of modifying it so that the readers can follow the different stages of this tutorial. The new state machine class is below. We define an interface, AtomicRegisterOperation, as a marker for the operations we will execute on our state machine. There are 3 inner classes implementing this interface for the set, compare-and-set and get operations, and accompanying static methods to create their instances. We will pass an AtomicRegisterOperation object to RaftNode.replicate() and once it is committed by MicroRaft it will be passed to our state machine for execution. Our new state machine class handles these AtomicRegisterOperation objects in the runOperation() method. set updates the atomic register with the given value and returns its previous value, compare-and-set updates the atomic register with the given new value only if its current value is equal to the given current value, and get simply returns the current value of the atomic register. Please note that the snapshotting logic is still missing and will be implemented later in the tutorial.

You can also see this class in the MicroRaft Github repository.

Committing operations

In the next test we will use the new state machine to start our Raft nodes. We will commit a number of operations on the Raft group and verify their results. We replicate 2 set operations, 2 compare-and-set operations, and a get operation at the end. After each operation, we verify that its commit index is greater than the commit index of the previous operation.

$ gh repo clone MicroRaft/MicroRaft
$ cd MicroRaft && ./mvnw clean test -Dtest=io.microraft.tutorial.OperationCommitTest -DfailIfNoTests=false -Ptutorial

You can also see it in the MicroRaft Github repository.

We use RaftNode.replicate() to replicate and commit operations on the Raft group. Most of the Raft node APIs, including RaftNode.replicate(), return CompletableFuture<Ordered> objects. For RaftNode.replicate(), Ordered provides return value of the executed operation and on which Raft log index the operation has been committed.

The output of the first 2 sysout lines are below. Please note that set returns previous value of the atomic register.

1st operation commit index: 2, result: null
2nd operation commit index: 3, result: value1

Then we commit 2 cas operations. The first cas manages to update the atomic register, but the second one fails because the atomic register value is different from the expected value.

3rd operation commit index: 4, result: true
4th operation commit index: 5, result: false

The last operation is a get to read the current value of the atomic register.

5th operation commit index: 6, result: value3

If we call RaftNode.replicate() on a follower or candidate Raft node, the returned CompletableFuture<Ordered> object is simply notified with NotLeaderException, which also provides Raft endpoint of the leader Raft node. I am not going to build an advanced RPC system in front of MicroRaft here, but when we use MicroRaft in a distributed setting, we can build a retry mechanism in the RPC layer to forward a failed operation to the Raft endpoint given in the exception. NotLeaderException may not specify any leader as well, for example if there is an ongoing leader election round, or the Raft node we contacted does not know the leader yet. In this case, our RPC layer could retry the operation on each Raft node in a round robin fashion until it discovers the new leader.

Performing queries

The last operation we committed in our previous test is get, which is actually a query (i.e, read only) operation. Even though it does not mutate the state machine, since we used RaftNode.replicate(), it is appended to the Raft log and committed similar to the other mutating operations. If we had persistence, it means this approach would increase our disk usage unnecessarily because we will persist every new entry in the Raft log, even if it is a query. Actually, this is a sub-optimal approach.

MicroRaft offers a separate API, RaftNode.query(), to handle queries more efficiently. There are 3 policies for queries, each with a different consistency guarantee:

  • QueryPolicy.LINEARIZABLE: We can perform a linearizable query with this policy. MicroRaft employs the optimization described in § 6.4: Processing read-only queries more efficiently of the Raft dissertation to preserve linearizability without growing the internal Raft log. We need to hit the leader Raft node to execute a linearizable query.

  • QueryPolicy.LEADER_LEASE: We can run a query locally on the leader Raft node without talking to the majority. If the called Raft node is not the leader, the returned CompletableFuture<Ordered> object is notified with NotLeaderException.

  • QueryPolicy.BOUNDED_STALENESS We can run a query locally on a follower or a learner Raft node if it has received an AppendEntries RPC from the leader in the last leader heartbeat timeout duration.

  • QueryPolicy.EVENTUAL_CONSISTENCY: We can use this policy to run a query locally on any Raft node independent of its current role. This policy provides the weakest consistency guarantee, but it can help us to distribute our query-workload by utilizing followers. We can also utilize Ordered to preserve a monotonic view of the Raft group state.

MicroRaft also employs leader stickiness and auto-demotion of leaders on loss of majority heartbeats. Leader stickiness means that a follower does not vote for another Raft node before the leader heartbeat timeout elapses after the last received Append Entries or Install Snapshot RPC. Dually, a leader Raft node automatically demotes itself to the follower role if it does not receive Append Entries RPC responses from the majority during the leader heartbeat timeout duration. Along with these techniques, QueryPolicy.LEADER_LEASE can be used for performing linearizable queries without talking to the majority if clock drifts and network delays are bounded. However, bounding clock drifts and network delays is not an easy job. Hence, QueryPolicy.LEADER_LEASE may cause reading stale data if a Raft node still considers itself as the leader because of a clock drift, while the other Raft nodes have elected a new leader and committed new operations. Moreover, QueryPolicy.LEADER_LEASE and QueryPolicy.LINEARIZABLE have the same processing cost since only the leader Raft node runs a given query for both policies. QueryPolicy.LINEARIZABLE guarantees linearizability with an extra RTT latency overhead compared to QueryPolicy.LEADER_LEASE. For these reasons, QueryPolicy.LINEARIZABLE is the recommended policy for linearizable queries, and QueryPolicy.LEADER_LEASE should be used carefully.

Linearizable queries

Ok. Let's write another test to use the query API for get. We will first set a value to the atomic register and then get the value back with a linearizable query. Just ignore the third parameter passed to the RaftNode.query() call for now. We will talk about it in a minute.

To run this test on your machine, try the following:

$ gh repo clone MicroRaft/MicroRaft
$ cd MicroRaft && ./mvnw clean test -Dtest=io.microraft.tutorial.LinearizableQueryTest -DfailIfNoTests=false -Ptutorial

You can also see it in the MicroRaft Github repository.

The output of the sysout lines are below:

set operation commit index: 2
get operation commit index: 2, result: value

As we discussed above, MicroRaft handles linearizable queries without appending a new entry to the Raft log. So our query is executed at the last committed log index. That is why both commit indices are the same in the output.

Monotonic local queries

QueryPolicy.LEADER_LEASE, QueryPolicy.BOUNDED_STALENESS and QueryPolicy.EVENTUAL_CONSISTENCY can be easily used if monotonicity is sufficient for query results. This is where Ordered comes in handy. A client can track commit indices observed via returned Ordered objects and use the greatest observed commit index to preserve monotonicity while issuing a local query to a Raft node. If the local commit index of a Raft node is smaller than the commit index passed to the RaftNode.query() call, the returned CompletableFuture object fails with LaggingCommitIndexException. This exception means that the state observed by the client is more up-to-date than the contacted Raft node's state. In this case, the client can retry its query on another Raft node. Please refer to § 6.4.1 of the Raft dissertation for more details.

We will make a little trick to demonstrate how to maintain the monotonicity of the observed Raft group state for the local query policies. Recall that LocalTransport keeps a discovery map to send Raft messages to target Raft nodes. When we create a new Raft node, we add it to the other Raft nodes' discovery maps. This time, we will do exactly the reverse to block the communication between the leader and the follower. Let's add the following method to our LocalTransport class:

public void undiscoverNode(RaftNode node) {
    RaftEndpoint endpoint = node.getLocalEndpoint();
    if (localEndpoint.equals(endpoint)) {
        throw new IllegalArgumentException(localEndpoint + " cannot undiscover itself!");
    }

    nodes.remove(node.getLocalEndpoint(), node);
}

We use this method in our new test below. We block the communication between the leader and follower after we set a value to the atomic register. Then we set another value which will not be replicated to the disconnected follower. After this step, we issue a query on the leader. We are also tracking the commit index we observed. Now, we switch to the disconnected follower and issue a new local query by passing the last observed commit index. Since the disconnected follower does not have the second commit, it cannot satisfy the monotonicity we demand, hence our query fails with LaggingCommitIndexException.

To run this test on your machine, try the following:

$ gh repo clone MicroRaft/MicroRaft
$ cd MicroRaft && ./mvnw clean test -Dtest=io.microraft.tutorial.MonotonicLocalQueryTest -DfailIfNoTests=false -Ptutorial

You can also see it in the MicroRaft Github repository.


4. Snapshotting

Now it is time to implement the missing snapshotting functionality in our atomic register state machine. Let's talk about snapshotting first. The Raft log grows during the lifecycle of the Raft group as more operations are committed. However, in a real-world system we cannot allow it to grow unboundedly. As the Raft log grows longer, it will consume more space both in memory and disk, and cause a lagging follower to catch up with the majority in a longer duration. Raft solves this problem by taking a snapshot of the state machine at current commit index and discarding all log entries up to it. MicroRaft implements snapshotting by putting an upper bound on the number of log entries kept in Raft log. It takes a snapshot of the state machine at every N commits and shrinks the log. N is configurable via RaftConfig.setCommitCountToTakeSnapshot(). A snapshot is represented as a list of chunks, where a chunk can be any object provided by the state machine.

StateMachine contains 2 methods for the snapshotting functionality: takeSnapshot() and installSnapshot(). Similar to what we did in the previous part, we will extend OperableAtomicRegister to implement these methods. Since our atomic register state machine consists of a single value, we just create a single snapshot chunk object in takeSnapshot(). Dually, to install a snapshot, we overwrite the value of the atomic register with the value present in the received snapshot chunk object. MicroRaft guarantees that commit index of an installed snapshot is always greater than the last commit index observed by the state machine.

You can also see this class in the MicroRaft Github repository.

We have the following test to demonstrate how snapshotting works in MicroRaft. In createRaftNode(), we configure our Raft nodes to take a new snapshot at every 100 commits. Similar to what we did in the previous test, we block the communication between the leader and a follower, and fill up the leader's Raft log with new commits until it takes a snapshot. When we allow the leader to communicate with the follower again, the follower catches up with the leader by transferring the snapshot.

To run this test on your machine, try the following:

$ gh repo clone MicroRaft/MicroRaft
$ cd MicroRaft && ./mvnw clean test -Dtest=io.microraft.tutorial.SnapshotInstallationTest -DfailIfNoTests=false -Ptutorial

You can also see it in the MicroRaft Github repository.


5. Extending the Raft group

Ok. We covered a lot of topics up until this part. There are only a few things left to discuss. In this part, we will see how we can perform changes in Raft group member lists.

Changing the Raft group member list

MicroRaft supports membership changes in Raft groups via RaftNode.changeMembership(). Let's first see the rules to realize membership changes in Raft groups.

Raft group membership changes are appended to the internal Raft log as regular log entries and committed similar to user-supplied operations. Therefore, Raft group membership changes require the majority of the Raft group to be operational.

When a membership change is committed, its commit index is used to denote the new member list of the Raft group and called group members commit index. Relatedly, when a membership change is triggered via RaftNode.changeMembership(), the current group members commit index must be provided.

Last, MicroRaft allows one membership change at a time in a Raft group and more complex changes must be applied as a series of single changes.

Since we know the rules for member list changes now, let's see some code. In our last test, we want to improve our 3-member Raft group's degree of fault tolerance by adding 2 new members (majority quorum size of 3 = 2 -> majority quorum size of 5 = 3).

To run this test on your machine, try the following:

$ gh repo clone MicroRaft/MicroRaft
$ cd MicroRaft && ./mvnw clean test -Dtest=io.microraft.tutorial.ChangeRaftGroupMemberListTest -DfailIfNoTests=false -Ptutorial

You can also see it in the MicroRaft Github repository.

In this test, we create a new Raft endpoint, endpoint4, and add it to the Raft group in the following lines:

RaftEndpoint endpoint4 = LocalRaftEndpoint.newEndpoint();
// group members commit index of the initial Raft group members is 0.
RaftGroupMembers newMemberList1 = leader.changeMembership(endpoint4, MembershipChangeMode.ADD_OR_PROMOTE_TO_FOLLOWER, 0).join().getResult();
System.out.println("New member list: " + newMemberList1.getMembers() + ", majority: " + newMemberList1.getMajorityQuorumSize()
                           + ", commit index: " + newMemberList1.getLogIndex());

// endpoint4 is now part of the member list. Let's start its Raft node
RaftNode raftNode4 = createRaftNode(endpoint4);
raftNode4.start();

Please notice that we pass 0 for the third argument to RaftNode.changeMembership(), because our Raft group operates with the initial member list whose group members commit index is 0. After this call returns, endpoint4 is part of the Raft group, so we start its Raft node as well.

Our sysout line here prints the following:

New member list: [LocalRaftEndpoint{id=node1}, LocalRaftEndpoint{id=node2}, LocalRaftEndpoint{id=node3}, LocalRaftEndpoint{id=node4}], majority: 3, commit index: 3

As you see, our Raft group has 4 members now, whose majority quorum size is 3. Actually, running a 4-node Raft group has no advantage over running a 3-node Raft group in terms of the degree of availability because both of them can handle failure of only 1 Raft node. Since we want to achieve better availability, we will add one more Raft node to our Raft group.

So we create another Raft endpoint, endpoint5, add it to the Raft group, and start its Raft node. However, as the group members commit index parameter, we pass newMemberList1.getLogIndex() which is the log index endpoint4 is added to the Raft group. Please note that we could also get the current Raft group members commit index via RaftNode.getCommittedMembers().

Our second sysout line prints the following:

New member list: [LocalRaftEndpoint{id=node1}, LocalRaftEndpoint{id=node2}, LocalRaftEndpoint{id=node3}, LocalRaftEndpoint{id=node4}, LocalRaftEndpoint{id=node5}], majority: 3, commit index: 5

Now we have 5 nodes in our Raft group with the majority quorum size of 3. It means that now our Raft group can tolerate failure of 2 Raft nodes and still remain operational. Voila!

In this example, for the sake of simplicity, we add new Raft nodes to the Raft group with MembershipChangeMode.ADD_OR_PROMOTE_TO_FOLLOWER. With this mode, the Raft node is directly added as a follower, and the majority quorum size of the Raft group increases to 3. However, in real life use cases, Raft nodes can contain large state, and it may take some time until the new Raft node catches up with the leader. Because of this, increasing the majority quorum size may cause availability gaps if failures occur before the new Raft node catches up. To prevent this, MicroRaft offers another membership change mode, MembershipChangeMode.ADD_LEARNER, to add new Raft nodes. With this mode, the new Raft node is added with the learner role. Learner Raft nodes are excluded in majority calculations, hence adding a new learner Raft node to the Raft group does not change the majority quorum size. Once the new learner Raft node catches up, it can be promoted to the follower role by triggering another membership change: MembershipChangeMode.ADD_OR_PROMOTE_TO_FOLLOWER.


What's next?

In the next section, we will see how MicroRaft deals with failures. Just keep calm and carry on!