Implementing a Distributed Message Queue with the Raft Consensus Algorithm in Go

This guide is a walkthrough of an implementation of a distributed message queue using the Raft consensus algorithm in Go.

I aim to address the following:

  1. Why are Distributed Systems important?
  2. What is the Raft Consensus Algorithm?
  3. An implementation of a distributed message queue using the Raft consensus algorithm in Go

Sources of Inspiration

This is heavily inspired by the following projects / talks / resources:

Full Implementation: https://github.com/kavinaravind/go-raft-message-queue

Why are Distributed Systems important?

Philip O'Toole gave a fantastic talk on distributed systems using Go, which this walkthrough draws heavily from. He illustrated the importance of distributed computing; here is a rundown of his points:

Distributed systems are the backbone of modern computing, particularly at web scale, because the world demands:

  1. Uptime (available and functional 24 hours a day, 7 days a week, without interruptions)
  2. Fault Tolerance (the ability of a system to continue operating correctly in the event of the failure of some of its components)
  3. Scale (as demand increases, the system can accommodate this growth efficiently without performance degradation)

As developers, designers, and computer scientists, we have decided that distributed systems are the answer to these demands.

It is not entirely apparent, but much of the popular software that we use today relies on distributed systems:

  1. Kubernetes
  2. Consul
  3. OpenStack

Understanding how they are built and the tradeoffs you need to make will make you a better programmer, a better designer, and a better operator of those systems.

Complexities of Distributed Systems

There is a real challenge in replicating data correctly: reliably, consistently, and quickly.

This problem is known as Distributed Consensus.

One protocol that solves this problem is Raft. The aim of Raft was to create a consensus protocol that is easy to understand, encouraging system builders and programmers to build more effective and reliable distributed systems.

Core Raft Concepts

Raft Log

Every node in a Raft cluster agrees on a set of events and the sequence of those events. The whole point of the Raft consensus protocol is to make this log the same on every machine. Raft doesn't care what is in the log or what each entry represents; that is up to you. All Raft will do is make sure that each of these entries is applied, in order, to whatever state machine you choose (a database, storage system, file system, etc.).

Raft Log (Philip O'Toole Talk)
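
To make the idea concrete, here is a small, self-contained sketch of the concept (illustrative only, not the HashiCorp library): an ordered log of entries applied, in order, to a toy state machine.

package main

import "fmt"

// Entry is a toy log entry; Raft doesn't care what Data means, only that every
// node applies the same entries in the same order.
type Entry struct {
	Index int
	Data  string
}

// apply replays committed entries, in order, against a toy state machine.
func apply(state []string, log []Entry) []string {
	for _, e := range log {
		state = append(state, e.Data)
	}
	return state
}

func main() {
	log := []Entry{{1, "enqueue A"}, {2, "enqueue B"}, {3, "dequeue"}}
	// Any node that holds this log and applies it in order ends up in the same state.
	fmt.Println(apply(nil, log))
}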

Snapshotting

Raft defines Snapshotting to ensure that the Raft log doesn't grow without bound. Every so often, Raft will ask your system for an encapsulation of the entire state of your state machine, take that encapsulated state, and delete every log entry that is already represented in the snapshot. This allows the log to be truncated; the state Raft now holds is your snapshot plus any log entries that have not yet been applied.

Raft Snapshotting (Philip O'Toole Talk)

Leader / Follower Paradigm

Raft is a leader-oriented protocol. It defines a leader election process so that there is one and only one leader in the cluster. This node is responsible for making sure the log is exactly the same on each node. Nodes that are not the leader are followers. Raft also defines how to detect when a node has failed, via heartbeating.

Raft Leader (Philip O'Toole Talk)

At a high level, this is what Raft solves:

Concurrent Updates: only the leader can make changes
Consistency: the Raft log is key; no "eventually consistent" results
Fault Tolerance: only a quorum needs to agree; failure detection via heartbeating; leader election

  1. Raft provides a solution to the distributed consensus problem
  2. It abstracts away tons of the details
  3. It allows you to define what state machine you want to have replicated in a clean manner

Implementation in Go

You now have a general understanding of why distributed systems matter and, at a high level, how the Raft protocol solves the distributed consensus problem.

Now let's walk through an end-to-end implementation in Go of a distributed message queue using the Raft consensus protocol.

One of the best-known implementations of the Raft protocol in Go comes from HashiCorp. This implementation currently powers Consul and Nomad, has great documentation, and is also very customizable. We will be using this library to manage the replicated log, snapshots, and leader election.

There are also various packages involved; here is a rundown of each of them:

model: a sample struct that will be pushed to and popped from the queue
ds: an implementation of a generic queue with support for concurrency
consensus: constructs a new Raft node, bootstraps the cluster, and contains the logic for nodes to join the cluster
store: initializes the consensus module and implements the Raft FSM interface
server: initializes the RESTful API server with various routes and handlers
main: reads in command-line arguments, starts the HTTP server, and starts the Raft node

Package model

Let's start by defining the data we would like to push to and pull from a generic queue:

package model

import "time"

// Comment is the model for a comment
type Comment struct {
	Timestamp *time.Time `json:"timestamp,omitempty"`
	Author    string     `json:"author,omitempty"`
	Content   string     `json:"content,omitempty"`
}

In the example above, we have defined a comment that includes a timestamp, the author of the comment, and the actual message. This can really be anything you like, but for simplicity, we will be mimicking a basic chat.
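
As a quick sanity check of the model, here is a small, hypothetical snippet (not part of the repository; the values are made up) that marshals a Comment so you can see the JSON shape the HTTP handlers will accept later on.

package main

import (
	"encoding/json"
	"fmt"
	"time"

	"github.com/kavinaravind/go-raft-message-queue/model"
)

func main() {
	// Build a sample comment; the values are illustrative only.
	now := time.Now()
	c := model.Comment{Timestamp: &now, Author: "alice", Content: "hello, raft!"}

	// Marshal it to see the wire format used by the HTTP handlers later on.
	b, err := json.Marshal(c)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(b)) // {"timestamp":"...","author":"alice","content":"hello, raft!"}
}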

Package ds

Now let's define the queue itself:

package ds

import "sync"

// Message is a generic message type
type Message[T any] struct {
	Data T
}

// Queue is a generic queue type
type Queue[T any] struct {
	Messages []Message[T]
	lock     sync.RWMutex
}

// NewQueue creates a new instance of the Queue
func NewQueue[T any]() *Queue[T] {
	return &Queue[T]{}
}

// Enqueue is used to add a message to the queue
func (q *Queue[T]) Enqueue(message Message[T]) {
	q.lock.Lock()
	defer q.lock.Unlock()

	q.Messages = append(q.Messages, message)
}

// Dequeue is used to remove a message from the queue
func (q *Queue[T]) Dequeue() (Message[T], bool) {
	q.lock.Lock()
	defer q.lock.Unlock()

	if len(q.Messages) == 0 {
		return Message[T]{}, false
	}

	message := q.Messages[0]
	q.Messages = q.Messages[1:]

	return message, true
}

// Copy is used to create a copy of the queue
func (q *Queue[T]) Copy() *Queue[T] {
	q.lock.RLock()
	defer q.lock.RUnlock()

	copy := NewQueue[T]()
	copy.Messages = append(copy.Messages, q.Messages...)

	return copy
}

You can see that this is a very simple generic implementation of a queue with a mutual exclusion lock for thread safety. We define a Message struct with a Data field, which will hold the Comment described earlier. We then define a Queue struct that contains a slice of Messages along with the mutex. We then have methods to push to the queue, pop from the queue, and copy the queue itself. An important piece to note here is that Copy is essential for snapshotting, where we want an encapsulation of the entire state.

Also, we are not sharding the queue; every single node that is a part of this cluster will have a complete copy of the queue.
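
Here is a brief, illustrative usage example of the queue on its own (not part of the repository), showing the FIFO behavior and why Copy matters for snapshotting:

package main

import (
	"fmt"

	"github.com/kavinaravind/go-raft-message-queue/ds"
	"github.com/kavinaravind/go-raft-message-queue/model"
)

func main() {
	q := ds.NewQueue[model.Comment]()

	// Push two messages onto the queue.
	q.Enqueue(ds.Message[model.Comment]{Data: model.Comment{Author: "alice", Content: "first"}})
	q.Enqueue(ds.Message[model.Comment]{Data: model.Comment{Author: "bob", Content: "second"}})

	// Copy captures the current state; snapshotting relies on this.
	snapshot := q.Copy()

	// Pop in FIFO order.
	if msg, ok := q.Dequeue(); ok {
		fmt.Println(msg.Data.Content) // first
	}

	fmt.Println(len(snapshot.Messages)) // 2: the copy is unaffected by the dequeue
}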

Package consensus

We now get into the consensus module, where we will leverage the HashiCorp Raft library to construct a new Raft node.

package consensus

import (
	"fmt"
	"net"
	"os"
	"path/filepath"
	"time"

	"github.com/hashicorp/raft"
	raftboltdb "github.com/hashicorp/raft-boltdb/v2"
)

// Consensus is the consensus module
type Consensus struct {
	Node *raft.Raft
}

// Config is the configuration for the consensus module
type Config struct {
	// IsLeader is a flag that indicates if the server is the leader
	IsLeader bool

	// ServerID is the unique identifier for the server
	ServerID string

	// BaseDirectory is the directory where the raft data will be stored
	BaseDirectory string

	// Address is the address at which the server will be listening
	Address string
}

// NewConsensusConfig creates a new consensus config
func NewConsensusConfig() *Config {
	return &Config{}
}

// NewConsensus creates a new instance of the consensus module
func NewConsensus(fsm raft.FSM, conf *Config) (*Consensus, error) {
	// Create the raft configuration
	config := raft.DefaultConfig()
	config.LocalID = raft.ServerID(conf.ServerID)

	// Set the snapshot interval to 1 second and the snapshot threshold to 1
	// so a snapshot is taken after every log entry for testing
	// config.SnapshotInterval = 1 * time.Second
	// config.SnapshotThreshold = 1

	// Create the raft store
	store, err := raftboltdb.NewBoltStore(filepath.Join(conf.BaseDirectory, "raft.db"))
	if err != nil {
		return nil, err
	}
	logStore, stableStore := store, store

	// Create the snapshot store
	snapshotStore, err := raft.NewFileSnapshotStore(conf.BaseDirectory, 2, os.Stderr)
	if err != nil {
		return nil, err
	}

	// Create the transport
	address, err := net.ResolveTCPAddr("tcp", conf.Address)
	if err != nil {
		return nil, err
	}
	transport, err := raft.NewTCPTransport(conf.Address, address, 3, 10*time.Second, os.Stderr)
	if err != nil {
		return nil, err
	}

	// Create the raft node
	node, err := raft.NewRaft(config, fsm, logStore, stableStore, snapshotStore, transport)
	if err != nil {
		return nil, err
	}

	// If the server is the leader, bootstrap the cluster
	if conf.IsLeader {
		configuration := raft.Configuration{
			Servers: []raft.Server{
				{
					ID:      raft.ServerID(conf.ServerID),
					Address: raft.ServerAddress(conf.Address),
				},
			},
		}
		node.BootstrapCluster(configuration)
	}

	return &Consensus{Node: node}, nil
}

// Join joins the raft cluster
func (c *Consensus) Join(nodeID, address string) error {
	configFuture := c.Node.GetConfiguration()
	if err := configFuture.Error(); err != nil {
		return err
	}

	for _, server := range configFuture.Configuration().Servers {
		// The node is already part of the cluster
		if server.ID == raft.ServerID(nodeID) && server.Address == raft.ServerAddress(address) {
			return nil
		}

		// There's a node with the same ID or address, remove it first
		if server.ID == raft.ServerID(nodeID) || server.Address == raft.ServerAddress(address) {
			future := c.Node.RemoveServer(server.ID, 0, 0)
			if err := future.Error(); err != nil {
				return fmt.Errorf("error removing existing node %s at %s: %s", nodeID, address, err)
			}
		}
	}

	// Add the new node as a voter
	f := c.Node.AddVoter(raft.ServerID(nodeID), raft.ServerAddress(address), 0, 0)
	if f.Error() != nil {
		return f.Error()
	}

	return nil
}

// WaitForNodeToBeLeader waits for the node to become the leader
func (c *Consensus) WaitForNodeToBeLeader(duration time.Duration) error {
	timeout := time.After(duration)
	ticker := time.NewTicker(100 * time.Millisecond)
	defer ticker.Stop()

	for {
		select {
		case <-timeout:
			return fmt.Errorf("timed out waiting for node to be leader")
		case <-ticker.C:
			if c.Node.State() == raft.Leader {
				return nil
			}
		}
	}
}

This may be a bit overwhelming at first, but let's highlight the main pieces:

  1. A Consensus struct is defined, which contains a pointer to HashiCorp's raft.Raft instance (stored in the Node field)
  2. A Config struct is defined, containing the fields essential for setting up the Raft node
  3. raft.NewRaft takes in 6 parameters:
    • *raft.Config: DefaultConfig will return a Config with usable defaults
    • raft.FSM: Implemented and explained in the store package (see later section)
    • raft.LogStore: Used to store the Raft log
    • raft.StableStore: Used to provide stable storage
    • raft.SnapshotStore: Used to store the snapshots
    • raft.Transport: A network transport to interface with other nodes
  4. We are using HashiCorp's BoltDB-backed store, which implements both raft.LogStore and raft.StableStore
  5. BootstrapCluster is invoked if the server is the leader, bootstrapping the cluster with this node as its only member
  6. The Join method joins a new node into an existing cluster; we use AddVoter to do this
  7. The WaitForNodeToBeLeader method is a helper, since it takes a bit of time for a node to boot up and win an election

At a high level, the NewConsensus function pulls in the configuration details; sets up the log store, stable store, snapshot store, and transport; and initializes the Raft node. There are also two other methods: Join, which joins a new node into an existing cluster, and WaitForNodeToBeLeader, a helper that waits for a Raft node to finish initializing and become the leader.
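
To illustrate how this module is wired up in isolation, here is a rough sketch that bootstraps a single node with a throwaway no-op FSM. The real FSM is the store described in the next section, and the server ID, directory, and addresses below are placeholder values.

package main

import (
	"io"
	"log"
	"os"
	"time"

	"github.com/hashicorp/raft"
	"github.com/kavinaravind/go-raft-message-queue/consensus"
)

// noopFSM is a throwaway state machine used only to show how the consensus
// package is wired up; its Snapshot is never exercised in this short run.
type noopFSM struct{}

func (f *noopFSM) Apply(l *raft.Log) interface{}       { return nil }
func (f *noopFSM) Snapshot() (raft.FSMSnapshot, error) { return nil, nil }
func (f *noopFSM) Restore(rc io.ReadCloser) error      { return rc.Close() }

func main() {
	conf := consensus.NewConsensusConfig()
	conf.IsLeader = true            // bootstrap a single-node cluster
	conf.ServerID = "node-1"        // must be unique per node
	conf.BaseDirectory = "/tmp/n1"  // raft.db and snapshots are written here
	conf.Address = "localhost:3001" // raft transport address

	// The base directory must exist before the bolt store can be created.
	if err := os.MkdirAll(conf.BaseDirectory, 0755); err != nil {
		log.Fatal(err)
	}

	c, err := consensus.NewConsensus(&noopFSM{}, conf)
	if err != nil {
		log.Fatal(err)
	}

	// Block until this node has won the election for the single-node cluster.
	if err := c.WaitForNodeToBeLeader(10 * time.Second); err != nil {
		log.Fatal(err)
	}
	log.Println("leader:", c.Node.State() == raft.Leader)
}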

Package store

Package store contains all of the relevant methods to send and receive messages, as well as the implementation of Raft's FSM interface.

Here is the declaration of that interface:

// FSM is implemented by clients to make use of the replicated log.
type FSM interface {
	// Apply is called once a log entry is committed by a majority of the cluster.
	//
	// Apply should apply the log to the FSM. Apply must be deterministic and
	// produce the same result on all peers in the cluster.
	//
	// The returned value is returned to the client as the ApplyFuture.Response.
	Apply(*Log) interface{}

	// Snapshot returns an FSMSnapshot used to: support log compaction, to
	// restore the FSM to a previous state, or to bring out-of-date followers up
	// to a recent log index.
	//
	// The Snapshot implementation should return quickly, because Apply can not
	// be called while Snapshot is running. Generally this means Snapshot should
	// only capture a pointer to the state, and any expensive IO should happen
	// as part of FSMSnapshot.Persist.
	//
	// Apply and Snapshot are always called from the same thread, but Apply will
	// be called concurrently with FSMSnapshot.Persist. This means the FSM should
	// be implemented to allow for concurrent updates while a snapshot is happening.
	Snapshot() (FSMSnapshot, error)

	// Restore is used to restore an FSM from a snapshot. It is not called
	// concurrently with any other command. The FSM must discard all previous
	// state before restoring the snapshot.
	Restore(snapshot io.ReadCloser) error
}

We need to address the following needs:

  1. How do we Apply changes to the queue? Based on the command type (Send / Recieve), we can enqueue to or dequeue from the queue respectively.
  2. How do we Snapshot the state machine? We can do this by getting a copy of the queue from the leader node.
  3. How do we Restore the state machine on a node? Snapshots are encoded using gob (shown later in this section), so we can decode the queue and set the state.

package store

import (
	"context"
	"encoding/gob"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"log/slog"
	"time"

	"github.com/hashicorp/raft"
	"github.com/kavinaravind/go-raft-message-queue/consensus"
	"github.com/kavinaravind/go-raft-message-queue/ds"
)

// Specific operations that can be applied to the store
const (
	Send = iota
	Recieve
)

// command is used to represent the command that will be applied to the store
type command[T any] struct {
	Operation int           `json:"operation"`
	Message   ds.Message[T] `json:"message"`
}

// newCommand is used to create a new command instance
func newCommand[T any](operation int, message ds.Message[T]) *command[T] {
	return &command[T]{
		Operation: operation,
		Message:   message,
	}
}

type Store[T any] struct {
	// ds that will be distributed across each node
	queue *ds.Queue[T]

	// consensus instance that will be used to replicate the ds
	consensus *consensus.Consensus

	// logger instance
	logger *slog.Logger
}

// NewStore creates a new store instance with the given logger
func NewStore[T any](logger *slog.Logger) *Store[T] {
	return &Store[T]{
		queue:  ds.NewQueue[T](),
		logger: logger,
	}
}

// Initialize is used to initialize the store with the given config
func (s *Store[T]) Initialize(ctx context.Context, conf *consensus.Config) (chan struct{}, error) {
	s.logger.Info("Initializing store")

	consensus, err := consensus.NewConsensus(s, conf)
	if err != nil {
		return nil, err
	}

	s.consensus = consensus

	// Listen for context cancellation and shutdown the server
	shutdownComplete := make(chan struct{})
	go func() {
		<-ctx.Done()
		future := s.consensus.Node.Shutdown()
		if err := future.Error(); err != nil {
			s.logger.Error("Failed to shutdown node", "error", err)
		} else {
			s.logger.Info("Node shutdown")
		}
		close(shutdownComplete)
	}()

	return shutdownComplete, nil
}

// Send is used to enqueue a message into the queue
func (s *Store[T]) Send(data T) error {
	if s.consensus.Node.State() != raft.Leader {
		return errors.New("not the leader")
	}

	c := newCommand[T](Send, ds.Message[T]{Data: data})
	bytes, err := json.Marshal(c)
	if err != nil {
		return err
	}

	future := s.consensus.Node.Apply(bytes, 10*time.Second)
	return future.Error()
}

// Recieve is used to dequeue a message from the queue
func (s *Store[T]) Recieve() (*ds.Message[T], error) {
	if s.consensus.Node.State() != raft.Leader {
		return nil, errors.New("node is not the leader")
	}

	c := newCommand[T](Recieve, ds.Message[T]{})
	bytes, err := json.Marshal(c)
	if err != nil {
		s.logger.Error("failed to marshal message", "error", err)
		return nil, err
	}

	future := s.consensus.Node.Apply(bytes, 10*time.Second)
	if err := future.Error(); err != nil {
		s.logger.Error("failed to apply message", "error", err)
		return nil, err
	}

	switch response := future.Response().(type) {
	case nil:
		// The Apply method returned an empty response
		return &ds.Message[T]{}, nil
	case ds.Message[T]:
		// The Apply method returned a message
		return &response, nil
	default:
		// The Apply method returned an unexpected type
		return nil, fmt.Errorf("unexpected response type: %T", response)
	}
}

// Stats is used to return the stats of the raft instance
func (s *Store[T]) Stats() map[string]string {
	return s.consensus.Node.Stats()
}

// Join is used to join a remote node to the raft cluster
func (s *Store[T]) Join(nodeID, address string) error {
	s.logger.Info(fmt.Sprintf("received join request for remote node %s at %s", nodeID, address))
	return s.consensus.Join(nodeID, address)
}

// implement the raft fsm interface

// Apply is used to apply a log entry to the store
func (s *Store[T]) Apply(log *raft.Log) interface{} {
	var command command[T]
	err := json.Unmarshal(log.Data, &command)
	if err != nil {
		s.logger.Error("failed to unmarshal message", "error", err)
		return err
	}

	switch command.Operation {
	case Send:
		s.queue.Enqueue(command.Message)
		return nil
	case Recieve:
		val, ok := s.queue.Dequeue()
		// If the queue is empty, return nil
		if !ok {
			return nil
		}
		return val
	default:
		return fmt.Errorf("unknown operation: %v", command.Operation)
	}
}

// Snapshot is used to create a snapshot of the store
func (s *Store[T]) Snapshot() (raft.FSMSnapshot, error) {
	return &Snapshot[T]{
		queue: s.queue.Copy(),
	}, nil
}

// Restore is used to restore the store from a snapshot
func (s *Store[T]) Restore(rc io.ReadCloser) error {
	defer rc.Close()

	dec := gob.NewDecoder(rc)

	// Decode the entire queue
	var queue ds.Queue[T]
	if err := dec.Decode(&queue); err != nil {
		return fmt.Errorf("failed to decode queue: %w", err)
	}

	s.queue = &queue

	return nil
}

// WaitForNodeToBeLeader is used to wait for the node to become the leader
func (s *Store[T]) WaitForNodeToBeLeader(duration time.Duration) error {
	return s.consensus.WaitForNodeToBeLeader(duration)
}

This bit of code can also be a bit overwhelming, so let's break it down:

  1. The two operations we support are sending a message to the queue and receiving a message from the queue. These are defined as the commands Send and Recieve.
  2. We define a command struct, which denotes the type of operation as well as the message itself. This will be used when sending, receiving, and applying messages.
  3. We define a Store struct, which contains a pointer to the queue itself, a pointer to the consensus module (which in turn points to a running Raft node instance), and a logger.
  4. The Initialize method initializes the Raft node and spawns a goroutine that handles shutdown.
  5. The Send and Recieve methods are exported and will be invoked by the server during a RESTful call.
  6. The Stats method is a sanity call that retrieves Raft node metadata for debugging purposes.
  7. The Apply, Snapshot, and Restore methods are needed to implement the Raft FSM interface detailed before this code snippet.

At a high level, this package starts the Raft node and exposes APIs that push and pop messages in a distributed manner by leveraging it. It also implements the Raft FSM: Apply changes the queue based on the operation type, Snapshot captures the queue via a slice copy, and Restore rebuilds the queue via a gob decode.
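
To see how these pieces fit together, here is a rough single-node sketch (the ID, directory, and address are placeholders; the main package later does this wiring properly, driven by command-line flags):

package main

import (
	"context"
	"log/slog"
	"os"
	"time"

	"github.com/kavinaravind/go-raft-message-queue/consensus"
	"github.com/kavinaravind/go-raft-message-queue/model"
	"github.com/kavinaravind/go-raft-message-queue/store"
)

func main() {
	logger := slog.Default()

	// Configure a single bootstrapped node (placeholder values).
	conf := consensus.NewConsensusConfig()
	conf.IsLeader = true
	conf.ServerID = "node-1"
	conf.BaseDirectory = "/tmp/node-1"
	conf.Address = "localhost:3001"

	// The base directory must exist before the bolt store is created.
	if err := os.MkdirAll(conf.BaseDirectory, 0755); err != nil {
		logger.Error("failed to create base directory", "error", err)
		return
	}

	// Initialize the store, which starts the underlying Raft node.
	ctx, cancel := context.WithCancel(context.Background())
	s := store.NewStore[model.Comment](logger)
	shutdown, err := s.Initialize(ctx, conf)
	if err != nil {
		logger.Error("failed to initialize store", "error", err)
		return
	}

	// Writes only succeed on the leader, so wait for the election to settle.
	if err := s.WaitForNodeToBeLeader(10 * time.Second); err != nil {
		logger.Error("node never became leader", "error", err)
		return
	}

	// Send replicates an enqueue through the Raft log; Recieve replicates a dequeue.
	if err := s.Send(model.Comment{Author: "alice", Content: "hello"}); err != nil {
		logger.Error("send failed", "error", err)
		return
	}
	msg, err := s.Recieve()
	if err != nil {
		logger.Error("receive failed", "error", err)
		return
	}
	logger.Info("dequeued", "content", msg.Data.Content)

	// Cancel the context to shut the Raft node down, then wait for it to finish.
	cancel()
	<-shutdown
}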

We also need to implement Raft's FSMSnapshot interface, detailed here:

// FSMSnapshot is returned by an FSM in response to a Snapshot
// It must be safe to invoke FSMSnapshot methods with concurrent
// calls to Apply.
type FSMSnapshot interface {
	// Persist should dump all necessary state to the WriteCloser 'sink',
	// and call sink.Close() when finished or call sink.Cancel() on error.
	Persist(sink SnapshotSink) error

	// Release is invoked when we are finished with the snapshot.
	Release()
}

This comprises the logic to persist the FSM to a snapshot sink. To tie it into this example, we need a way to serialize the queue and write it to the sink.

The logic below demonstrates how to go about that:

package store

import (
	"encoding/gob"

	"github.com/hashicorp/raft"
	"github.com/kavinaravind/go-raft-message-queue/ds"
)

// Snapshot is used to create a snapshot of the queue
type Snapshot[T any] struct {
	queue *ds.Queue[T]
}

// Persist is used to persist the snapshot to the sink
func (s *Snapshot[T]) Persist(sink raft.SnapshotSink) error {
	err := func() error {
		// Create a gob encoder and encode the queue
		enc := gob.NewEncoder(sink)
		if err := enc.Encode(s.queue); err != nil {
			return err
		}

		return nil
	}()

	// If there was an error, cancel the sink and return the error
	if err != nil {
		sink.Cancel()
		return err
	}

	// Otherwise, close the sink and return any errors
	return sink.Close()
}

// Release is used to release any resources acquired during the snapshot
// In this case, we don't have any resources to clean up (noop)
func (s *Snapshot[T]) Release() {}

As described before, we use gob to decode the queue in Restore. This is because, when snapshotting, we use gob to encode the queue: in the Persist method, we encode the queue and write it to the sink.

The other method needed to implement this interface is Release, which is used to release any resources acquired during the snapshot. In this case, we don't have any resources to clean up, so it is a no-op.
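
Stripped of the Raft plumbing, the Persist / Restore pair is essentially a gob round trip over the queue. Here is a minimal, standalone illustration (not part of the repository):

package main

import (
	"bytes"
	"encoding/gob"
	"fmt"
	"time"

	"github.com/kavinaravind/go-raft-message-queue/ds"
	"github.com/kavinaravind/go-raft-message-queue/model"
)

func main() {
	// Build a queue with one message, as the FSM would hold it.
	now := time.Now()
	q := ds.NewQueue[model.Comment]()
	q.Enqueue(ds.Message[model.Comment]{Data: model.Comment{Timestamp: &now, Author: "alice", Content: "hi"}})

	// "Persist": gob-encode a copy of the queue, as it would be written to the snapshot sink.
	var buf bytes.Buffer
	if err := gob.NewEncoder(&buf).Encode(q.Copy()); err != nil {
		panic(err)
	}

	// "Restore": gob-decode into a fresh queue, which replaces the node's state.
	var restored ds.Queue[model.Comment]
	if err := gob.NewDecoder(&buf).Decode(&restored); err != nil {
		panic(err)
	}
	fmt.Println(len(restored.Messages)) // 1
}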

One of the most important function calls from the Raft library is Apply:

// Apply is used to apply a command to the FSM in a highly consistent
// manner. This returns a future that can be used to wait on the application.
// An optional timeout can be provided to limit the amount of time we wait
// for the command to be started. This must be run on the leader or it
// will fail.
//
// If the node discovers it is no longer the leader while applying the command,
// it will return ErrLeadershipLost. There is no way to guarantee whether the
// write succeeded or failed in this case. For example, if the leader is
// partitioned it can't know if a quorum of followers wrote the log to disk. If
// at least one did, it may survive into the next leader's term.
//
// If a user snapshot is restored while the command is in-flight, an
// ErrAbortedByRestore is returned. In this case the write effectively failed
// since its effects will not be present in the FSM after the restore.
func (r *Raft) Apply(cmd []byte, timeout time.Duration) ApplyFuture {
	return r.ApplyLog(Log{Data: cmd}, timeout)
}

So in other words, we aren't pushing or popping directly to the queue; we are calling Apply so that the Raft node applies the change in a consistent manner. We then receive a future, which represents an action that may complete in the future.

Package server

Now let's define RESTful endpoints that we can use to interact with the store and, by extension, the Raft node. This package is responsible for running the HTTP server and registering all the routes with their corresponding handlers.

package server

import (
	"context"
	"encoding/json"
	"log/slog"
	"net/http"
	"os"

	"github.com/kavinaravind/go-raft-message-queue/model"
	"github.com/kavinaravind/go-raft-message-queue/store"
)

// Config is the configuration for the server
type Config struct {
	// Address is the address at which the server will be listening
	Address string
}

// NewServerConfig creates a new server config
func NewServerConfig() *Config {
	return &Config{}
}

// Server is a simple HTTP server that listens for incoming requests
type Server struct {
	// httpServer is the underlying HTTP server
	httpServer *http.Server

	// store is the store instance
	store *store.Store[model.Comment]

	// logger is the logger instance
	logger *slog.Logger
}

// NewServer creates a new instance of the Server
func NewServer(store *store.Store[model.Comment], logger *slog.Logger) *Server {
	return &Server{
		store:  store,
		logger: logger,
	}
}

// Initialize starts the HTTP server
func (s *Server) Initialize(ctx context.Context, conf *Config) chan struct{} {
	s.logger.Info("Initializing server")

	mux := http.NewServeMux()

	// Register the handlers
	mux.HandleFunc("/send", s.handleSend)
	mux.HandleFunc("/recieve", s.handleRecieve)
	mux.HandleFunc("/stats", s.handleStats)
	mux.HandleFunc("/join", s.handleJoin)

	// Create the HTTP server
	s.httpServer = &http.Server{
		Addr:    conf.Address,
		Handler: mux,
	}

	// Start the HTTP server
	go func() {
		if err := s.httpServer.ListenAndServe(); err != http.ErrServerClosed {
			s.logger.Error("Failed to start server", "error", err)
			os.Exit(1)
		}
	}()

	// Listen for context cancellation and shutdown the server
	shutdownComplete := make(chan struct{})
	go func() {
		<-ctx.Done()
		if err := s.httpServer.Shutdown(ctx); err != nil {
			s.logger.Error("Failed to shutdown server", "error", err)
		} else {
			s.logger.Info("Server shutdown")
		}
		close(shutdownComplete)
	}()

	return shutdownComplete
}

// handleSend is the handler for sending a message
func (s *Server) handleSend(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodPost {
		http.Error(w, "Invalid request method", http.StatusMethodNotAllowed)
		return
	}

	var message model.Comment
	if err := json.NewDecoder(r.Body).Decode(&message); err != nil {
		http.Error(w, "Failed to decode message", http.StatusBadRequest)
		return
	}

	if err := s.store.Send(message); err != nil {
		http.Error(w, "Failed to send message", http.StatusInternalServerError)
		return
	}

	w.WriteHeader(http.StatusCreated)
}

// handleRecieve is the handler for recieving a message
func (s *Server) handleRecieve(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodGet {
		http.Error(w, "Invalid request method", http.StatusMethodNotAllowed)
		return
	}

	message, err := s.store.Recieve()
	if err != nil {
		http.Error(w, "Failed to recieve message", http.StatusInternalServerError)
		return
	}

	w.Header().Set("Content-Type", "application/json")
	err = json.NewEncoder(w).Encode(message)
	if err != nil {
		http.Error(w, "Failed to encode message", http.StatusInternalServerError)
		return
	}

	w.WriteHeader(http.StatusOK)
}

// handleStats is the handler for getting the stats of the store
func (s *Server) handleStats(w http.ResponseWriter, r *http.Request) {
	stats := s.store.Stats()

	w.Header().Set("Content-Type", "application/json")
	err := json.NewEncoder(w).Encode(stats)
	if err != nil {
		http.Error(w, "Failed to encode message", http.StatusInternalServerError)
		return
	}

	w.WriteHeader(http.StatusOK)
}

// handleJoin is the handler for joining a remote node to the cluster
func (s *Server) handleJoin(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodPost {
		http.Error(w, "Invalid request method", http.StatusMethodNotAllowed)
		return
	}

	body := map[string]string{}
	if err := json.NewDecoder(r.Body).Decode(&body); err != nil {
		http.Error(w, "Failed to decode body", http.StatusBadRequest)
		return
	}

	if len(body) != 2 {
		w.WriteHeader(http.StatusBadRequest)
		return
	}

	remoteAddress, ok := body["address"]
	if !ok {
		w.WriteHeader(http.StatusBadRequest)
		return
	}

	nodeID, ok := body["id"]
	if !ok {
		w.WriteHeader(http.StatusBadRequest)
		return
	}

	if err := s.store.Join(nodeID, remoteAddress); err != nil {
		http.Error(w, "Failed to join cluster", http.StatusInternalServerError)
		return
	}

	w.WriteHeader(http.StatusCreated)
}

Similar to the other sections, let's break down this package from the top down:

  1. The Config struct defines the essential details that the server needs; in this case, the address at which the server will be listening
  2. The Server struct holds the HTTP server, the store (which in turn holds pointers to the consensus module and the queue), and a logger
  3. The Initialize method sets up a new mux, registers the routes with their handlers, and starts the server
  4. Let's define each route:
    • /send (handleSend): A POST request that Applies the Send operation, triggering the push of the message onto the queue on all Raft nodes
    • /recieve (handleRecieve): A GET request that Applies the Recieve operation and fetches the next message from the queue via the leader Raft node
    • /stats (handleStats): A GET request that outputs Raft statistics metadata
    • /join (handleJoin): A POST request that joins a remote node to the cluster

At a high level, we have just defined routes and handlers that push to and pop from the queue, gather Raft node stats, and join a given node to the Raft cluster. We also start the HTTP server, with logic to shut it down gracefully.
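
For reference, here is roughly what a client interaction could look like from Go; the README demonstrates the same thing with curl, and the address below assumes the default -haddr of localhost:3000 defined in the main package that follows:

package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"net/http"

	"github.com/kavinaravind/go-raft-message-queue/model"
)

func main() {
	// POST /send pushes a comment onto the replicated queue (succeeds only on the leader).
	body, _ := json.Marshal(model.Comment{Author: "alice", Content: "hello"})
	resp, err := http.Post("http://localhost:3000/send", "application/json", bytes.NewReader(body))
	if err != nil {
		panic(err)
	}
	resp.Body.Close()
	fmt.Println("send status:", resp.StatusCode) // 201 Created on success

	// GET /recieve pops the next comment off the queue (succeeds only on the leader).
	resp, err = http.Get("http://localhost:3000/recieve")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	// The handler encodes a ds.Message, so the comment sits under the "Data" key.
	var msg struct {
		Data model.Comment `json:"Data"`
	}
	if err := json.NewDecoder(resp.Body).Decode(&msg); err != nil {
		panic(err)
	}
	fmt.Println("received:", msg.Data.Content)
}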

Package main

The last package! You've almost made it!

The main package is responsible for parsing various input flags (remember those configs for consensus and server that were defined earlier?) and starting the HTTP server and Raft node.

package main

import (
	"bytes"
	"context"
	"encoding/json"
	"flag"
	"fmt"
	"log/slog"
	"net/http"
	"os"
	"os/signal"
	"syscall"

	"github.com/kavinaravind/go-raft-message-queue/consensus"
	"github.com/kavinaravind/go-raft-message-queue/model"
	"github.com/kavinaravind/go-raft-message-queue/server"
	"github.com/kavinaravind/go-raft-message-queue/store"
)

type config struct {
	JoinAddress string
	Concensus   *consensus.Config
	Server      *server.Config
}

func newConfig() *config {
	return &config{
		Concensus: consensus.NewConsensusConfig(),
		Server:    server.NewServerConfig(),
	}
}

var conf *config

func init() {
	conf = newConfig()

	// Concensus Specific Flags
	flag.BoolVar(&conf.Concensus.IsLeader, "leader", false, "Set to true if this node is the leader")
	flag.StringVar(&conf.Concensus.ServerID, "id", "", "The unique identifier for this server")
	flag.StringVar(&conf.Concensus.Address, "raddr", "localhost:3001", "The address that the Raft consensus group should use")
	flag.StringVar(&conf.Concensus.BaseDirectory, "dir", "/tmp", "The base directory for storing Raft data")
	flag.StringVar(&conf.JoinAddress, "paddr", "", "The address of an existing node to join")

	// Server Specific Flags
	flag.StringVar(&conf.Server.Address, "haddr", "localhost:3000", "The address that the HTTP server should use")

	// Set Usage Details
	flag.Usage = func() {
		fmt.Fprintf(os.Stderr, "Usage of %s:\n", os.Args[0])
		flag.PrintDefaults()
	}
}

func main() {
	flag.Parse()

	logger := slog.Default()

	if conf.Concensus.ServerID == "" {
		logger.Error("The -id flag is required")
		os.Exit(2)
	}

	// Create the base directory if it does not exist
	if err := os.MkdirAll(conf.Concensus.BaseDirectory, 0755); err != nil {
		logger.Error("Failed to create base directory", "error", err)
		os.Exit(1)
	}

	// Create a context with cancellation
	ctx, cancel := context.WithCancel(context.Background())

	// Create a new store instance with the given logger
	store := store.NewStore[model.Comment](logger)

	// Initialize the store
	nodeShutdownComplete, err := store.Initialize(ctx, conf.Concensus)
	if err != nil {
		logger.Error("Failed to initialize store", "error", err)
		os.Exit(1)
	}

	// Create a new instance of the server
	server := server.NewServer(store, logger)

	// Initialize the server
	serverShutdownComplete := server.Initialize(ctx, conf.Server)

	// If join was specified, make the join request.
	if conf.JoinAddress != "" {
		b, err := json.Marshal(map[string]string{"address": conf.Concensus.Address, "id": conf.Concensus.ServerID})
		if err != nil {
			logger.Error("Failed to marshal join request", "error", err)
			os.Exit(1)
		}
		resp, err := http.Post(fmt.Sprintf("http://%s/join", conf.JoinAddress), "application/json", bytes.NewReader(b))
		if err != nil {
			logger.Error("Failed to send join request", "error", err)
			os.Exit(1)
		}
		resp.Body.Close()

		if resp.StatusCode != http.StatusCreated {
			logger.Error("Received non-OK response to join request", "status", resp.StatusCode)
			os.Exit(1)
		}
	}

	sigs := make(chan os.Signal, 1)
	signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
	sig := <-sigs
	switch sig {
	case syscall.SIGINT:
		logger.Info("Received SIGINT, shutting down")
	case syscall.SIGTERM:
		logger.Info("Received SIGTERM, shutting down")
	}

	// Cancel the context to stop the HTTP server and consensus node
	cancel()

	// Wait for the server and consensus node to finish shutting down
	<-nodeShutdownComplete
	<-serverShutdownComplete
}

Now let's break this down from the top:

  1. The config struct comprises all the details needed for the initial setup. It gets populated via command-line flags.
  2. In Go, the init function is invoked automatically by the runtime before main; this is where we define the various flags.
  3. In main, we parse the command line, set up the default logger, and validate the input.
  4. We initialize the store and the server. If a join address is specified, we know this is a follower node and make a call to the leader node to join; otherwise, we bootstrap the cluster as the leader.
  5. Finally, we wait for a signal and attempt a graceful shutdown when one arrives.

Again, here is the full implementation: https://github.com/kavinaravind/go-raft-message-queue

The README will walk through the necessary setup details if you would like to try this out locally, as well as a detailed account of all of the command-line arguments.

I purposely left that out here, as I wanted to focus fully on the implementation and less on running the nodes. The README also covers how to run the leader node, how to run follower nodes, and all of the API endpoints with examples via curl.


This concludes the walkthrough of implementing a distributed message queue in Go using the Raft consensus protocol.

I hope this helps with understanding why distributed computing is important and the fundamentals of the Raft consensus protocol, and that it provides some insight through an end-to-end example in Go! 😊