Implementing a Distributed Message Queue with the Raft Consensus Algorithm in Go
This guide is a walkthrough of an implementation of a distributed message queue using the Raft consensus algorithm in Go.
I aim to address the following:
- Why are Distributed Systems important?
- What is the Raft Consensus Algorithm?
- An implementation of a distributed message queue using the Raft consensus algorithm in Go
Sources of Inspiration
This is heavily inspired by the following projects / talks / resources:
- GopherCon 2023: Philip O'Toole - Build Your Own Distributed System Using Go
- Raft Consensus Algorithm
- Golang Implementation of the Raft Consensus Protocol
- Raft backend implementation using BoltDB
- A reference use of Hashicorp's Raft implementation
Full Implementation: https://github.com/kavinaravind/go-raft-message-queue
Why are Distributed Systems important?
Philip O'Toole gave a fantastic talk on building distributed systems in Go, which this walkthrough is heavily inspired by. He illustrated the importance of distributed computing; here is a rundown of his points:
Distributed systems are the backbone of modern computing, particularly at web scale, because the world demands:
- Uptime (available and functional 24 hours a day, 7 days a week, without interruptions)
- Fault Tolerance (the ability of a system to continue operating correctly in the event of the failure of some of its components)
- Scale (as demand increases, the system can accommodate this growth efficiently without performance degradation)
As developers, designers, and computer scientists, we have decided that distributed systems are the answer to these issues.
It is not entirely apparent, but much of the popular software that we use today relies on distributed systems.
Understanding how they are built and the tradeoffs you need to make will make you a better programmer, a better designer, and a better operator of those systems.
Complexities of Distributed Systems
There is a real challenge with correctly replicating data reliably, consistently, and quickly.
This problem is known as Distributed Consensus.
One protocol that solves this problem is Raft. The aim of Raft was to be easy to understand, and to encourage system builders and programmers to build more effective and reliable distributed systems.
Core Raft Concepts
Raft Log
Every node in a Raft cluster agrees on a set of events and the sequence of those events. The whole point of the Raft consensus protocol is to make this log the same on every machine. Raft doesn't care what is in the log or what each entry represents; that is up to you. All Raft will do is make sure that each of these entries is applied to whatever state machine you choose (database, storage system, file system, etc.).
(Philip O'Toole Talk)
Snapshotting
Raft defines Snapshotting to ensure that the Raft Log doesn't grow without bound. Every so often, Raft will ask your system for an encapsulation of the entire state of your state machine and it will take that encapsulated state and delete every log entry that has already been represented in that snapshot. This allows for the truncation of the log, and now the state that Raft has is your snapshot and any log entries that have not been applied.
(Philip O'Toole Talk)
Leader / Follower Paradigm
Raft is a leader-oriented protocol. It defines a leader election process so that there is one and only one leader in the cluster. This node is responsible for making sure the log is exactly the same on each node. Nodes that are not the leader are followers. Raft also defines how we detect that a node has failed, via heartbeating.
(Philip O'Toole Talk)
At a high level, this is what Raft solves:
Concurrent Updates | Consistency | Fault Tolerance |
---|---|---|
Only the leader can make changes | Raft log is key | Only a quorum needs to agree |
 | No "eventually consistent" results | Failure-detection via heartbeating |
 | | Leader Election |
- Raft provides a solution to the distributed consensus problem
- It abstracts away tons of the details
- It allows you to define what state machine you want to have replicated in a clean manner
Implementation in Go
You now have a general understanding of why distributed systems matter, and at a high level you understand the Raft protocol and how it solves the distributed consensus problem.
Now let's walk through an end-to-end implementation in Go of a distributed message queue using the Raft consensus protocol.
One of the best-known implementations of the Raft protocol in Go is from Hashicorp. This implementation currently powers Consul and Nomad, has great documentation, and is also very customizable.
We will be using this library to manage the replicated log, snapshots, and leader election.
There will also be various packages involved; here is a rundown of each of them:
Package | Summary |
---|---|
model | A sample struct that will be pushed to and popped from the queue |
ds | An implementation of a generic queue with support for concurrency |
consensus | Constructs a new Raft node, bootstraps the cluster, and logic for nodes to join the cluster |
store | Initializes the consensus module, implements the raft FSM interface |
server | Initializes the restful API server with various routes and handlers |
main | Reads in command line arguments, starts HTTP server, starts raft node |
Package model
Let's start by defining the data we would like to push to and pop from a generic queue:
package model
import "time"
// Comment is the model for a comment
type Comment struct {
Timestamp *time.Time `json:"timestamp,omitempty"`
Author string `json:"author,omitempty"`
Content string `json:"content,omitempty"`
}
In the example above, we have defined a comment that includes a timestamp, the author of the comment, and the actual message. This can really be anything you like, but for simplicity, we will be mimicking a basic chat.
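As a quick aside, here is a minimal sketch (not part of the repository) showing what the struct tags above produce when a Comment is marshaled to JSON; the author and content values are made up for illustration:
package main

import (
	"encoding/json"
	"fmt"
	"time"

	"github.com/kavinaravind/go-raft-message-queue/model"
)

func main() {
	now := time.Now()
	c := model.Comment{Timestamp: &now, Author: "gopher", Content: "hello, raft!"}

	// Marshal the comment to JSON, just as the HTTP layer will do later.
	b, err := json.Marshal(c)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(b))
	// Prints something like:
	// {"timestamp":"2024-05-01T12:00:00Z","author":"gopher","content":"hello, raft!"}
}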
Package ds
Now let's define the queue itself:
package ds
import "sync"
// Message is a generic message type
type Message[T any] struct {
Data T
}
// Queue is a generic queue type
type Queue[T any] struct {
Messages []Message[T]
lock sync.RWMutex
}
// NewQueue creates a new instance of the Queue
func NewQueue[T any]() *Queue[T] {
return &Queue[T]{}
}
// Enqueue is used to add a message to the queue
func (q *Queue[T]) Enqueue(message Message[T]) {
q.lock.Lock()
defer q.lock.Unlock()
q.Messages = append(q.Messages, message)
}
// Dequeue is used to remove a message from the queue
func (q *Queue[T]) Dequeue() (Message[T], bool) {
q.lock.Lock()
defer q.lock.Unlock()
if len(q.Messages) == 0 {
return Message[T]{}, false
}
message := q.Messages[0]
q.Messages = q.Messages[1:]
return message, true
}
// Copy is used to create a copy of the queue
func (q *Queue[T]) Copy() *Queue[T] {
q.lock.RLock()
defer q.lock.RUnlock()
copy := NewQueue[T]()
copy.Messages = append(copy.Messages, q.Messages...)
return copy
}
You can see that this is a very simple generic implementation of a queue with a mutual exclusion lock for thread safety.
We define a Message struct with a Data field, where we will be using the Comment model described earlier. We then define a Queue struct which contains a slice of Messages along with the mutex lock. We then have methods to push to the queue, pop from the queue, and copy the queue itself.
An important piece to note here is that Copy is essential for snapshotting, where we want an encapsulation of the entire state.
Also, we are not sharding the queue; every single node that is part of this cluster will have a complete copy of the queue.
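To make the semantics concrete, here is a small self-contained sketch (not part of the repository) that exercises this queue on its own, outside of Raft, using the ds and model packages shown above:
package main

import (
	"fmt"

	"github.com/kavinaravind/go-raft-message-queue/ds"
	"github.com/kavinaravind/go-raft-message-queue/model"
)

func main() {
	// Create a queue of comments and push two messages onto it.
	q := ds.NewQueue[model.Comment]()
	q.Enqueue(ds.Message[model.Comment]{Data: model.Comment{Author: "a", Content: "first"}})
	q.Enqueue(ds.Message[model.Comment]{Data: model.Comment{Author: "b", Content: "second"}})

	// Copy is what a snapshot will use: it duplicates the backing slice
	// under the read lock, so later pops don't affect the copy.
	snapshot := q.Copy()

	// Dequeue pops in FIFO order.
	if msg, ok := q.Dequeue(); ok {
		fmt.Println(msg.Data.Content) // "first"
	}

	fmt.Println(len(q.Messages), len(snapshot.Messages)) // 1 2
}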
Package consensus
We now get into the consensus module, where we will leverage the Hashicorp Raft library to construct a new Raft node.
package consensus
import (
"fmt"
"net"
"os"
"path/filepath"
"time"
"github.com/hashicorp/raft"
raftboltdb "github.com/hashicorp/raft-boltdb/v2"
)
// Consensus is the consensus module
type Consensus struct {
Node *raft.Raft
}
// Config is the configuration for the consensus module
type Config struct {
// IsLeader is a flag that indicates if the server is the leader
IsLeader bool
// ServerID is the unique identifier for the server
ServerID string
// BaseDirectory is the directory where the raft data will be stored
BaseDirectory string
// Address is the address at which the server will be listening
Address string
}
// NewConsensusConfig creates a new consensus config
func NewConsensusConfig() *Config {
return &Config{}
}
// NewConsensus creates a new instance of the consensus module
func NewConsensus(fsm raft.FSM, conf *Config) (*Consensus, error) {
// Create the raft configuration
config := raft.DefaultConfig()
config.LocalID = raft.ServerID(conf.ServerID)
// Set the snapshot interval to 1 second and the snapshot threshold to 1
// so a snapshot is taken after every log entry for testing
// config.SnapshotInterval = 1 * time.Second
// config.SnapshotThreshold = 1
// Create the raft store
store, err := raftboltdb.NewBoltStore(filepath.Join(conf.BaseDirectory, "raft.db"))
if err != nil {
return nil, err
}
logStore, stableStore := store, store
// Create the snapshot store
snapshotStore, err := raft.NewFileSnapshotStore(conf.BaseDirectory, 2, os.Stderr)
if err != nil {
return nil, err
}
// Create the transport
address, err := net.ResolveTCPAddr("tcp", conf.Address)
if err != nil {
return nil, err
}
transport, err := raft.NewTCPTransport(conf.Address, address, 3, 10*time.Second, os.Stderr)
if err != nil {
return nil, err
}
// Create the raft node
node, err := raft.NewRaft(config, fsm, logStore, stableStore, snapshotStore, transport)
if err != nil {
return nil, err
}
// If the server is the leader, bootstrap the cluster
if conf.IsLeader {
configuration := raft.Configuration{
Servers: []raft.Server{
{
ID: raft.ServerID(conf.ServerID),
Address: raft.ServerAddress(conf.Address),
},
},
}
node.BootstrapCluster(configuration)
}
return &Consensus{Node: node}, nil
}
// Join joins the raft cluster
func (c *Consensus) Join(nodeID, address string) error {
configFuture := c.Node.GetConfiguration()
if err := configFuture.Error(); err != nil {
return err
}
for _, server := range configFuture.Configuration().Servers {
// The node is already part of the cluster
if server.ID == raft.ServerID(nodeID) && server.Address == raft.ServerAddress(address) {
return nil
}
// There's a node with the same ID or address, remove it first
if server.ID == raft.ServerID(nodeID) || server.Address == raft.ServerAddress(address) {
future := c.Node.RemoveServer(server.ID, 0, 0)
if err := future.Error(); err != nil {
return fmt.Errorf("error removing existing node %s at %s: %s", nodeID, address, err)
}
}
}
// Add the new node as a voter
f := c.Node.AddVoter(raft.ServerID(nodeID), raft.ServerAddress(address), 0, 0)
if f.Error() != nil {
return f.Error()
}
return nil
}
// WaitForNodeToBeLeader waits for the node to become the leader
func (c *Consensus) WaitForNodeToBeLeader(duration time.Duration) error {
timeout := time.After(duration)
ticker := time.NewTicker(100 * time.Millisecond)
defer ticker.Stop()
for {
select {
case <-timeout:
return fmt.Errorf("timed out waiting for node to be leader")
case <-ticker.C:
if c.Node.State() == raft.Leader {
return nil
}
}
}
}
This may be a bit overwhelming at first, but let's highlight the main pieces:
- A Consensus struct is defined which contains a pointer to Hashicorp's raft.Raft (the Node field)
- A Config struct is defined containing fields essential for the setup of the raft node
- raft.NewRaft takes in 6 parameters:
  - *raft.Config: DefaultConfig will return a Config with usable defaults
  - raft.FSM: Implemented and explained in the store package (see later section)
  - raft.LogStore: Used to store the Raft log
  - raft.StableStore: Used to provide stable storage
  - raft.SnapshotStore: Used to store the snapshots
  - raft.Transport: A network transport to interface with other nodes
- We are using Hashicorp's Bolt Store, which implements raft.LogStore and raft.StableStore
- BootstrapCluster will be invoked if the server is the leader, which will set up the cluster
- The Join method will join a new node into an existing cluster. We use AddVoter to go about this.
- The WaitForNodeToBeLeader method is a helper function, as it takes a bit of time for the node to boot up
At a high level, the NewConsensus function pulls in configuration details; sets up the log store, stable store, snapshot store, and transport; and initializes the raft node. There are also two other methods: Join adds a new node to an existing cluster, and WaitForNodeToBeLeader is a helper that waits for the raft node to become the leader.
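For orientation, here is a rough sketch of how NewConsensus could be wired up by hand for a single bootstrap node. The no-op FSM and the ID, directory, and address values are placeholders for this sketch only; the real FSM is the Store type covered in the next section.
package main

import (
	"errors"
	"io"
	"log"
	"os"
	"time"

	"github.com/hashicorp/raft"
	"github.com/kavinaravind/go-raft-message-queue/consensus"
)

// noopFSM is a stand-in FSM used only for this sketch.
type noopFSM struct{}

func (f *noopFSM) Apply(l *raft.Log) interface{}       { return nil }
func (f *noopFSM) Snapshot() (raft.FSMSnapshot, error) { return nil, errors.New("not implemented") }
func (f *noopFSM) Restore(rc io.ReadCloser) error      { return rc.Close() }

func main() {
	conf := consensus.NewConsensusConfig()
	conf.IsLeader = true // bootstrap a single-node cluster
	conf.ServerID = "node-1"
	conf.BaseDirectory = "/tmp/node-1"
	conf.Address = "localhost:3001"

	// The base directory must exist before BoltDB creates raft.db inside it.
	if err := os.MkdirAll(conf.BaseDirectory, 0755); err != nil {
		log.Fatal(err)
	}

	c, err := consensus.NewConsensus(&noopFSM{}, conf)
	if err != nil {
		log.Fatal(err)
	}

	// Elections take a moment, so wait before treating this node as the leader.
	if err := c.WaitForNodeToBeLeader(10 * time.Second); err != nil {
		log.Fatal(err)
	}
	log.Println("is leader:", c.Node.State() == raft.Leader)
}
In the actual repository, this wiring is done inside the store's Initialize method rather than by hand.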
Package store
Package store contains all of the relevant methods to send and receive messages, as well as the implementation of Raft's FSM interface. Here is the declaration of that interface:
// FSM is implemented by clients to make use of the replicated log.
type FSM interface {
// Apply is called once a log entry is committed by a majority of the cluster.
//
// Apply should apply the log to the FSM. Apply must be deterministic and
// produce the same result on all peers in the cluster.
//
// The returned value is returned to the client as the ApplyFuture.Response.
Apply(*Log) interface{}
// Snapshot returns an FSMSnapshot used to: support log compaction, to
// restore the FSM to a previous state, or to bring out-of-date followers up
// to a recent log index.
//
// The Snapshot implementation should return quickly, because Apply can not
// be called while Snapshot is running. Generally this means Snapshot should
// only capture a pointer to the state, and any expensive IO should happen
// as part of FSMSnapshot.Persist.
//
// Apply and Snapshot are always called from the same thread, but Apply will
// be called concurrently with FSMSnapshot.Persist. This means the FSM should
// be implemented to allow for concurrent updates while a snapshot is happening.
Snapshot() (FSMSnapshot, error)
// Restore is used to restore an FSM from a snapshot. It is not called
// concurrently with any other command. The FSM must discard all previous
// state before restoring the snapshot.
Restore(snapshot io.ReadCloser) error
}
We need to address the following needs:
- How do we Apply changes to the queue? Based on the command type (send / receive), we can enqueue or dequeue from the queue respectively.
- How do we Snapshot the state machine? We can do this by getting a copy of the queue from the leader node.
- How do we Restore the state machine on a node? Snapshots are encoded using gob (shown later in this section), so we can decode the queue and set the state.
package store
import (
"context"
"encoding/gob"
"encoding/json"
"errors"
"fmt"
"io"
"log/slog"
"time"
"github.com/hashicorp/raft"
"github.com/kavinaravind/go-raft-message-queue/consensus"
"github.com/kavinaravind/go-raft-message-queue/ds"
)
// Specific operations that can be applied to the store
const (
Send = iota
Recieve
)
// command is used to represent the command that will be applied to the store
type command[T any] struct {
Operation int `json:"operation"`
Message ds.Message[T] `json:"message"`
}
// newCommand is used to create a new command instance
func newCommand[T any](operation int, message ds.Message[T]) *command[T] {
return &command[T]{
Operation: operation,
Message: message,
}
}
type Store[T any] struct {
// ds that will be distributed across each node
queue *ds.Queue[T]
// consensus instance that will be used to replicate the ds
consensus *consensus.Consensus
// logger instance
logger *slog.Logger
}
// NewStore creates a new store instance with the given logger
func NewStore[T any](logger *slog.Logger) *Store[T] {
return &Store[T]{
queue: ds.NewQueue[T](),
logger: logger,
}
}
// Initialize is used to initialize the store with the given config
func (s *Store[T]) Initialize(ctx context.Context, conf *consensus.Config) (chan struct{}, error) {
s.logger.Info("Initializing store")
consensus, err := consensus.NewConsensus(s, conf)
if err != nil {
return nil, err
}
s.consensus = consensus
// Listen for context cancellation and shutdown the server
shutdownComplete := make(chan struct{})
go func() {
<-ctx.Done()
future := s.consensus.Node.Shutdown()
if err := future.Error(); err != nil {
s.logger.Error("Failed to shutdown node", "error", err)
} else {
s.logger.Info("Node shutdown")
}
close(shutdownComplete)
}()
return shutdownComplete, nil
}
// Send is used to enqueue a message into the queue
func (s *Store[T]) Send(data T) error {
if s.consensus.Node.State() != raft.Leader {
return errors.New("not the leader")
}
c := newCommand[T](Send, ds.Message[T]{Data: data})
bytes, err := json.Marshal(c)
if err != nil {
return err
}
future := s.consensus.Node.Apply(bytes, 10*time.Second)
return future.Error()
}
// Recieve is used to dequeue a message from the queue
func (s *Store[T]) Recieve() (*ds.Message[T], error) {
if s.consensus.Node.State() != raft.Leader {
return nil, errors.New("node is not the leader")
}
c := newCommand[T](Recieve, ds.Message[T]{})
bytes, err := json.Marshal(c)
if err != nil {
s.logger.Error("failed to marshal message", "error", err)
return nil, err
}
future := s.consensus.Node.Apply(bytes, 10*time.Second)
if err := future.Error(); err != nil {
s.logger.Error("failed to apply message", "error", err)
return nil, err
}
switch response := future.Response().(type) {
case nil:
// The Apply method returned an empty response
return &ds.Message[T]{}, nil
case ds.Message[T]:
// The Apply method returned a message
return &response, nil
default:
// The Apply method returned an unexpected type
return nil, fmt.Errorf("unexpected response type: %T", response)
}
}
// Stats is used to return the stats of the raft instance
func (s *Store[T]) Stats() map[string]string {
return s.consensus.Node.Stats()
}
// Join is used to join a remote node to the raft cluster
func (s *Store[T]) Join(nodeID, address string) error {
s.logger.Info(fmt.Sprintf("received join request for remote node %s at %s", nodeID, address))
return s.consensus.Join(nodeID, address)
}
// implement the raft fsm interface
// Apply is used to apply a log entry to the store
func (s *Store[T]) Apply(log *raft.Log) interface{} {
var command command[T]
err := json.Unmarshal(log.Data, &command)
if err != nil {
s.logger.Error("failed to unmarshal message", "error", err)
return err
}
switch command.Operation {
case Send:
s.queue.Enqueue(command.Message)
return nil
case Recieve:
val, ok := s.queue.Dequeue()
// If the queue is empty, return nil
if !ok {
return nil
}
return val
default:
return fmt.Errorf("unknown operation: %v", command.Operation)
}
}
// Snapshot is used to create a snapshot of the store
func (s *Store[T]) Snapshot() (raft.FSMSnapshot, error) {
return &Snapshot[T]{
queue: s.queue.Copy(),
}, nil
}
// Restore is used to restore the store from a snapshot
func (s *Store[T]) Restore(rc io.ReadCloser) error {
defer rc.Close()
dec := gob.NewDecoder(rc)
// Decode the entire queue
var queue ds.Queue[T]
if err := dec.Decode(&queue); err != nil {
return fmt.Errorf("failed to decode queue: %w", err)
}
s.queue = &queue
return nil
}
// WaitForNodeToBeLeader is used to wait for the node to become the leader
func (s *Store[T]) WaitForNodeToBeLeader(duration time.Duration) error {
return s.consensus.WaitForNodeToBeLeader(duration)
}
This bit of code can also be a bit overwhelming. Let's break it down:
- The two operations we have support for are sending a message to the queue and receiving a message from the queue. These are defined as the commands Send and Recieve.
- We define a command struct which denotes the type of operation as well as the message itself. This will be used when sending, receiving, and applying messages.
- We define a Store struct which contains a pointer to the queue itself, a pointer to the consensus module that wraps a running Raft node instance, and a logger.
- The Initialize method will initialize the Raft node and spawn a goroutine that handles shutdown.
- The Send and Recieve methods are exported and will be invoked by the server during a RESTful call.
- The Stats method is a sanity call to retrieve Raft node metadata for debugging purposes.
- The Apply, Snapshot, and Restore methods are needed to implement the Raft FSM interface detailed before this code snippet.
At a high level, this package starts the Raft node and exposes APIs that push and pop messages in a distributed manner by leveraging it. It implements the Raft FSM to Apply changes to the queue based on the operation type, snapshot the queue via a slice copy, and restore the queue via a gob decode.
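Tying the store's public surface together, a leader-side usage sketch might look like the following. The ID, directory, and address values are illustrative; in the real program this wiring lives in the main package shown later.
package main

import (
	"context"
	"log"
	"log/slog"
	"os"
	"time"

	"github.com/kavinaravind/go-raft-message-queue/consensus"
	"github.com/kavinaravind/go-raft-message-queue/model"
	"github.com/kavinaravind/go-raft-message-queue/store"
)

func main() {
	logger := slog.Default()

	// Configure a single bootstrap node (illustrative values).
	conf := consensus.NewConsensusConfig()
	conf.IsLeader = true
	conf.ServerID = "node-1"
	conf.BaseDirectory = "/tmp/node-1"
	conf.Address = "localhost:3001"
	if err := os.MkdirAll(conf.BaseDirectory, 0755); err != nil {
		log.Fatal(err)
	}

	// Initializing the store boots the underlying Raft node.
	s := store.NewStore[model.Comment](logger)
	ctx, cancel := context.WithCancel(context.Background())
	shutdown, err := s.Initialize(ctx, conf)
	if err != nil {
		log.Fatal(err)
	}

	// Send and Recieve both go through Raft's Apply, so they only work on the leader.
	if err := s.WaitForNodeToBeLeader(10 * time.Second); err != nil {
		log.Fatal(err)
	}
	if err := s.Send(model.Comment{Author: "gopher", Content: "hello"}); err != nil {
		log.Fatal(err)
	}
	msg, err := s.Recieve()
	if err != nil {
		log.Fatal(err)
	}
	logger.Info("dequeued", "content", msg.Data.Content)

	// Shut the node down cleanly.
	cancel()
	<-shutdown
}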
We also need to implement Raft's FSMSnapshot interface, detailed here:
// FSMSnapshot is returned by an FSM in response to a Snapshot
// It must be safe to invoke FSMSnapshot methods with concurrent
// calls to Apply.
type FSMSnapshot interface {
// Persist should dump all necessary state to the WriteCloser 'sink',
// and call sink.Close() when finished or call sink.Cancel() on error.
Persist(sink SnapshotSink) error
// Release is invoked when we are finished with the snapshot.
Release()
}
This comprises the logic to persist the FSM to a snapshot sink. To tie it into this example, we need a way to serialize the queue and write it to the sink. The code below demonstrates how to go about that:
package store
import (
"encoding/gob"
"github.com/hashicorp/raft"
"github.com/kavinaravind/go-raft-message-queue/ds"
)
// Snapshot is used to create a snapshot of the queue
type Snapshot[T any] struct {
queue *ds.Queue[T]
}
// Persist is used to persist the snapshot to the sink
func (s *Snapshot[T]) Persist(sink raft.SnapshotSink) error {
err := func() error {
// Create a gob encoder and encode the queue
enc := gob.NewEncoder(sink)
if err := enc.Encode(s.queue); err != nil {
return err
}
return nil
}()
// If there was an error, cancel the sink and return the error
if err != nil {
sink.Cancel()
return err
}
// Otherwise, close the sink and return any errors
return sink.Close()
}
// Release is used to release any resources acquired during the snapshot
// In this case, we don't have any resources to clean up (noop)
func (s *Snapshot[T]) Release() {}
As described before, we use gob to decode the queue in Restore. This is because, when snapshotting, we use gob to encode the queue. In the Persist method, we encode the queue and write it to the sink.
The other method needed to implement this interface is Release, which is used to release any resources acquired during the snapshot. In this case, we don't have any resources to clean up, so it is a no-op.
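To make the encode/decode symmetry explicit, here is a tiny standalone round trip using the same types, mirroring what Persist and Restore do; a bytes.Buffer stands in for the snapshot sink in this sketch:
package main

import (
	"bytes"
	"encoding/gob"
	"fmt"

	"github.com/kavinaravind/go-raft-message-queue/ds"
	"github.com/kavinaravind/go-raft-message-queue/model"
)

func main() {
	q := ds.NewQueue[model.Comment]()
	q.Enqueue(ds.Message[model.Comment]{Data: model.Comment{Author: "gopher", Content: "persist me"}})

	// Persist: encode the queue into the "sink" (a buffer here).
	var sink bytes.Buffer
	if err := gob.NewEncoder(&sink).Encode(q); err != nil {
		panic(err)
	}

	// Restore: decode the bytes back into a fresh queue.
	var restored ds.Queue[model.Comment]
	if err := gob.NewDecoder(&sink).Decode(&restored); err != nil {
		panic(err)
	}

	fmt.Println(len(restored.Messages)) // 1
}
This only works because both sides agree on gob: Persist encodes with gob, so Restore must decode with gob.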
One of the most important function calls from the Raft library is Apply:
// Apply is used to apply a command to the FSM in a highly consistent
// manner. This returns a future that can be used to wait on the application.
// An optional timeout can be provided to limit the amount of time we wait
// for the command to be started. This must be run on the leader or it
// will fail.
//
// If the node discovers it is no longer the leader while applying the command,
// it will return ErrLeadershipLost. There is no way to guarantee whether the
// write succeeded or failed in this case. For example, if the leader is
// partitioned it can't know if a quorum of followers wrote the log to disk. If
// at least one did, it may survive into the next leader's term.
//
// If a user snapshot is restored while the command is in-flight, an
// ErrAbortedByRestore is returned. In this case the write effectively failed
// since its effects will not be present in the FSM after the restore.
func (r *Raft) Apply(cmd []byte, timeout time.Duration) ApplyFuture {
return r.ApplyLog(Log{Data: cmd}, timeout)
}
So in other words, we aren't pushing or popping directly to the queue; we are calling Apply for the Raft node to apply a change in a consistent manner. We then receive a future, which represents an action that may occur in the future.
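As a condensed sketch of that pattern (the helper name applyCommand is mine, not the library's or the repository's):
package sketch

import (
	"fmt"
	"time"

	"github.com/hashicorp/raft"
)

// applyCommand shows the Apply-and-wait pattern that Send and Recieve use:
// submit the encoded command, block on the future, then read the response
// that the FSM's Apply returned for this log entry.
func applyCommand(node *raft.Raft, cmd []byte) (interface{}, error) {
	future := node.Apply(cmd, 10*time.Second)
	if err := future.Error(); err != nil {
		// The entry was not committed (for example, leadership was lost).
		return nil, fmt.Errorf("apply failed: %w", err)
	}
	// For a Recieve command this is the dequeued ds.Message; for Send it is nil.
	return future.Response(), nil
}
Send only checks the future's error, while Recieve additionally switches on the response type to pull out the dequeued message.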
Package server
Now let's define the RESTful endpoints that we can use to interact with the store, and by extension the Raft node. This package is responsible for running the HTTP server and registering all the routes with their corresponding handlers.
package server
import (
"context"
"encoding/json"
"log/slog"
"net/http"
"os"
"github.com/kavinaravind/go-raft-message-queue/model"
"github.com/kavinaravind/go-raft-message-queue/store"
)
// Config is the configuration for the server
type Config struct {
// Address is the address at which the server will be listening
Address string
}
// NewServerConfig creates a new server config
func NewServerConfig() *Config {
return &Config{}
}
// Server is a simple HTTP server that listens for incoming requests
type Server struct {
// httpServer is the underlying HTTP server
httpServer *http.Server
// store is the store instance
store *store.Store[model.Comment]
// logger is the logger instance
logger *slog.Logger
}
// NewServer creates a new instance of the Server
func NewServer(store *store.Store[model.Comment], logger *slog.Logger) *Server {
return &Server{
store: store,
logger: logger,
}
}
// Initialize starts the HTTP server
func (s *Server) Initialize(ctx context.Context, conf *Config) chan struct{} {
s.logger.Info("Initializing server")
mux := http.NewServeMux()
// Register the handlers
mux.HandleFunc("/send", s.handleSend)
mux.HandleFunc("/recieve", s.handleRecieve)
mux.HandleFunc("/stats", s.handleStats)
mux.HandleFunc("/join", s.handleJoin)
// Create the HTTP server
s.httpServer = &http.Server{
Addr: conf.Address,
Handler: mux,
}
// Start the HTTP server
go func() {
if err := s.httpServer.ListenAndServe(); err != http.ErrServerClosed {
s.logger.Error("Failed to start server", "error", err)
os.Exit(1)
}
}()
// Listen for context cancellation and shutdown the server
shutdownComplete := make(chan struct{})
go func() {
<-ctx.Done()
if err := s.httpServer.Shutdown(ctx); err != nil {
s.logger.Error("Failed to shutdown server", "error", err)
} else {
s.logger.Info("Server shutdown")
}
close(shutdownComplete)
}()
return shutdownComplete
}
// handleSend is the handler for sending a message
func (s *Server) handleSend(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
http.Error(w, "Invalid request method", http.StatusMethodNotAllowed)
return
}
var message model.Comment
if err := json.NewDecoder(r.Body).Decode(&message); err != nil {
http.Error(w, "Failed to decode message", http.StatusBadRequest)
return
}
if err := s.store.Send(message); err != nil {
http.Error(w, "Failed to send message", http.StatusInternalServerError)
return
}
w.WriteHeader(http.StatusCreated)
}
// handleRecieve is the handler for recieving a message
func (s *Server) handleRecieve(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodGet {
http.Error(w, "Invalid request method", http.StatusMethodNotAllowed)
return
}
message, err := s.store.Recieve()
if err != nil {
http.Error(w, "Failed to recieve message", http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "application/json")
err = json.NewEncoder(w).Encode(message)
if err != nil {
http.Error(w, "Failed to encode message", http.StatusInternalServerError)
return
}
w.WriteHeader(http.StatusOK)
}
// handleStats is the handler for getting the stats of the store
func (s *Server) handleStats(w http.ResponseWriter, r *http.Request) {
stats := s.store.Stats()
w.Header().Set("Content-Type", "application/json")
err := json.NewEncoder(w).Encode(stats)
if err != nil {
http.Error(w, "Failed to encode message", http.StatusInternalServerError)
return
}
w.WriteHeader(http.StatusOK)
}
// handleJoin is the handler for joining a remote node to the cluster
func (s *Server) handleJoin(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
http.Error(w, "Invalid request method", http.StatusMethodNotAllowed)
return
}
body := map[string]string{}
if err := json.NewDecoder(r.Body).Decode(&body); err != nil {
http.Error(w, "Failed to decode body", http.StatusBadRequest)
return
}
if len(body) != 2 {
w.WriteHeader(http.StatusBadRequest)
return
}
remoteAddress, ok := body["address"]
if !ok {
w.WriteHeader(http.StatusBadRequest)
return
}
nodeID, ok := body["id"]
if !ok {
w.WriteHeader(http.StatusBadRequest)
return
}
if err := s.store.Join(nodeID, remoteAddress); err != nil {
http.Error(w, "Failed to join cluster", http.StatusInternalServerError)
return
}
w.WriteHeader(http.StatusCreated)
}
Similar to other sections, let's break down this package from the top down:
- The Config struct defines essential details that the server needs. In this case, it is the address at which the server will be listening.
- The Server struct holds the HTTP server, the store (comprising pointers to the consensus module and queue), and a logger.
- The Initialize method sets up a new mux, registers the routes with their handlers, and starts the server.
- The routes are:
  - /send (handleSend): A POST request that will Apply the send operation and trigger the push of the message onto the queue on all Raft nodes
  - /recieve (handleRecieve): A GET request that will Apply the receive operation and fetch a message from the queue on the leader Raft node
  - /stats (handleStats): A GET request that will output Raft statistic metadata
  - /join (handleJoin): A POST request that will join a remote node to the cluster
At a high level, we have just defined handlers and routes that will push to and pop from the queue, gather Raft node stats, and join a given node to the Raft cluster. We also start the HTTP server with graceful shutdown logic.
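Once a node is up, these endpoints can be exercised from any HTTP client. The README shows curl examples; here is a rough Go equivalent as a sketch, assuming a leader node listening on the default HTTP address from the main package below:
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"net/http"

	"github.com/kavinaravind/go-raft-message-queue/model"
)

func main() {
	base := "http://localhost:3000" // the default -haddr of a node (see the main package below)

	// Push a comment onto the queue via /send.
	b, err := json.Marshal(model.Comment{Author: "gopher", Content: "hello"})
	if err != nil {
		panic(err)
	}
	resp, err := http.Post(base+"/send", "application/json", bytes.NewReader(b))
	if err != nil {
		panic(err)
	}
	resp.Body.Close()
	fmt.Println("send status:", resp.StatusCode) // expect 201 Created on the leader

	// Pop it back off via /recieve.
	resp, err = http.Get(base + "/recieve")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	// The handler encodes a ds.Message, so the comment sits under the "Data" key.
	var msg struct {
		Data model.Comment
	}
	if err := json.NewDecoder(resp.Body).Decode(&msg); err != nil {
		panic(err)
	}
	fmt.Println("received:", msg.Data.Content)
}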
Package main
The last package! You've almost made it!
The main package is responsible for parsing various input flags (remember those configs for consensus and server that were defined earlier?) and starting the HTTP server and the raft node.
package main
import (
"bytes"
"context"
"encoding/json"
"flag"
"fmt"
"log/slog"
"net/http"
"os"
"os/signal"
"syscall"
"github.com/kavinaravind/go-raft-message-queue/consensus"
"github.com/kavinaravind/go-raft-message-queue/model"
"github.com/kavinaravind/go-raft-message-queue/server"
"github.com/kavinaravind/go-raft-message-queue/store"
)
type config struct {
JoinAddress string
Concensus *consensus.Config
Server *server.Config
}
func newConfig() *config {
return &config{
Concensus: consensus.NewConsensusConfig(),
Server: server.NewServerConfig(),
}
}
var conf *config
func init() {
conf = newConfig()
// Concensus Specific Flags
flag.BoolVar(&conf.Concensus.IsLeader, "leader", false, "Set to true if this node is the leader")
flag.StringVar(&conf.Concensus.ServerID, "id", "", "The unique identifier for this server")
flag.StringVar(&conf.Concensus.Address, "raddr", "localhost:3001", "The address that the Raft consensus group should use")
flag.StringVar(&conf.Concensus.BaseDirectory, "dir", "/tmp", "The base directory for storing Raft data")
flag.StringVar(&conf.JoinAddress, "paddr", "", "The address of an existing node to join")
// Server Specific Flags
flag.StringVar(&conf.Server.Address, "haddr", "localhost:3000", "The address that the HTTP server should use")
// Set Usage Details
flag.Usage = func() {
fmt.Fprintf(os.Stderr, "Usage of %s:\n", os.Args[0])
flag.PrintDefaults()
}
}
func main() {
flag.Parse()
logger := slog.Default()
if conf.Concensus.ServerID == "" {
logger.Error("The -id flag is required")
os.Exit(2)
}
// Create the base directory if it does not exist
if err := os.MkdirAll(conf.Concensus.BaseDirectory, 0755); err != nil {
logger.Error("Failed to create base directory", "error", err)
os.Exit(1)
}
// Create a context with cancellation
ctx, cancel := context.WithCancel(context.Background())
// Create a new store instance with the given logger
store := store.NewStore[model.Comment](logger)
// Initialize the store
nodeShutdownComplete, err := store.Initialize(ctx, conf.Concensus)
if err != nil {
logger.Error("Failed to initialize store", "error", err)
os.Exit(1)
}
// Create a new instance of the server
server := server.NewServer(store, logger)
// Initialize the server
serverShutdownComplete := server.Initialize(ctx, conf.Server)
// If join was specified, make the join request.
if conf.JoinAddress != "" {
b, err := json.Marshal(map[string]string{"address": conf.Concensus.Address, "id": conf.Concensus.ServerID})
if err != nil {
logger.Error("Failed to marshal join request", "error", err)
os.Exit(1)
}
resp, err := http.Post(fmt.Sprintf("http://%s/join", conf.JoinAddress), "application/json", bytes.NewReader(b))
if err != nil {
logger.Error("Failed to send join request", "error", err)
os.Exit(1)
}
resp.Body.Close()
if resp.StatusCode != http.StatusCreated {
logger.Error("Received non-OK response to join request", "status", resp.StatusCode)
os.Exit(1)
}
}
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
sig := <-sigs
switch sig {
case syscall.SIGINT:
logger.Info("Received SIGINT, shutting down")
case syscall.SIGTERM:
logger.Info("Received SIGTERM, shutting down")
}
// Cancel the context to stop the HTTP server and consensus node
cancel()
// Wait for the server and consensus node to finish shutting down
<-nodeShutdownComplete
<-serverShutdownComplete
}
Now let's break this down from the top:
- The config struct comprises all of the details needed for initial setup. It gets populated via the command line flags.
- In Go, the init function gets invoked automatically by the runtime. This is where we define the various flags.
- In main, we parse the command line, set up the default logger, and validate the input.
- We initialize the store and the server. If a join address is specified, we know this is a follower node and we make a call to the leader node to join; otherwise, we bootstrap the cluster since this node is the leader.
- Finally, we wait for any signals and attempt to gracefully shut down.
Again, here is the full implementation: https://github.com/kavinaravind/go-raft-message-queue
The README will walk through the necessary setup details if you would like to try this out locally, as well as give a detailed account of all of the command line arguments. I purposely left that out here, as I wanted to focus fully on the implementation and less on running the nodes. The README also covers how to run the leader node, how to run follower nodes, and all of the API endpoints with examples via curl.
This concludes the walkthrough of implementing a distributed message queue in Golang using the Raft Consensus Protocol.
I hope this helps with understanding why distributed computing is important and the fundamentals of the Raft consensus protocol, and that it provides insight through an end-to-end example in Go! 😊