lib_consensus.js

/**
 * @module brig/consensus
 * @see https://liangrunda.com/posts/raft-lite/
 */
'use strict';

const { Transform } = require('node:stream');
const { EventEmitter } = require('node:events');
const { LogState, LogEntry } = require('./log');

const {
  ElectionTimeout,
  ReplicationTimeout,
  VoteRequest,
  VoteResponse,
  LogRequest,
  LogResponse,
  Broadcast,
  LogCommit,
  RoleChanged,
  LeaderChanged,
  MessageQueued,
  Debug
} = require('./events');

const { Leader, Follower, Candidate } = require('./roles');

const {
  Message,
  VoteRequestMessage,
  VoteResponseMessage,
  LogRequestMessage,
  LogResponseMessage,
  BroadcastMessage
} = require('./messages');


class Peer extends EventEmitter {

  /**
   * User-defined function that receives address information for delivering
   * a message to a peer.
   *
   * @callback module:brig/consensus~peerTxHandler
   * @param {string} id - Peer identifier.
   * @param {module:brig/messages~Message} msg - Message to deliver.
   */

  /**
   * Abstract representation of a peer. Accepts any objectMode=true Duplex 
   * stream and a unique ID. Messages sent to peer will be written to the 
   * stream. {@link module:brig/consensus~Cluster} expects messages from 
   * peers to be read from this stream.
   *
   * @constructor
   * @extends EventEmitter
   * @param {string} id - Peer ID.
   * @param {module:brig/consensus~peerTxHandler} [tx] - User-defined transport.
   */
  constructor(id, tx) {
    super();

    this.id = id;

    this.tx = (_id, msg) => {
      this.emit(MessageQueued, _id, msg);
      tx && tx(_id, msg);
      return this;
    };
  }

  /**
   * Writes the message to the underlying stream.
   * @param {module:brig/messages~Message} message - The message to send.
   * @fires module:brig/events~MessageQueued
   * @returns {module:brig/consensus~Peer}
   */
  send(msg) {
    return this.tx(this.id, msg);
  }

}

module.exports.Peer = Peer;

/**
 * @property {number} ELECTION_T_MIN
 */
module.exports.ELECTION_T_MIN = 150;

/**
 * @property {number} ELECTION_T_MAX
 */
module.exports.ELECTION_T_MAX = 300;

/**
 * @property {number} REPLICATE_T
 */
module.exports.REPLICATE_T = 50;


class Cluster extends EventEmitter {

  /**
   * Election timeout in milliseconds plus a randomized additional time.
   * @returns {number}
   */
  static getElectionTimeoutMs() {
    const multiplier = module.exports.ELECTION_T_MAX - 
      module.exports.ELECTION_T_MIN;

    return module.exports.ELECTION_T_MIN + 
      Math.ceil(Math.random() * multiplier);
  }

  static getReplicateTimeoutMs() {
    return module.exports.REPLICATE_T;
  }

  /**
   * Primary brig interface. A synchronized, crash-fault-tolerant state 
   * machine consensus layer. Implements a Raft-like protocol.
   * 
   * @constructor
   * @param {buffer} id - Unique identifier for this node.
   * @param {Array.<module:brig/consensus~Peer>} [peers=[]] - Peer nodes.
   * @param {module:brig/log~LogState} [logState] - Initialize 
   * with the given state machine.
   */
  constructor(id, peers = [], logState) {
    super();

    /**
     * @property {string|number} id - Unique identifier for this context. 
     */
    this.id = id;
    /**
     * @property {module:brig/log~LogState} state - Underlying state machine.
     */
    this.state = logState
      ? logState 
      : new LogState();
    /**
     * @property {Array.<module:brig/consensus~Peer>} peers - Synchonized nodes.
     */
    this.peers = peers;

    this._bcastMsgBuffer = [];
  }

  /**
   * Initializes consensus algorithm.
   */
  join() {
    return this._init();
  }

  /**
   * Emits a debug event.
   *
   * @private
   */
  _dbg() {
    return this.emit(Debug, [...arguments]);
  }

  /**
   * Returns a map of METHOD -> HANDLER for integrating userland transport.
   *
   * returns {object.<string, function>}
   */
  createProtocolMapping() {
    this._dbg('Creating protocol mapping.');

    const mapping = {
      [VoteRequestMessage.method]: (msg, _ok) => {
        return this.emit(VoteRequest, Message.from(msg)) && _ok();
      },
      [VoteResponseMessage.method]: (msg, _ok) => {
        return this.emit(VoteResponse, Message.from(msg)) && _ok();
      },
      [LogRequestMessage.method]: (msg, _ok) => {
        return this.emit(LogRequest, Message.from(msg)) && _ok();
      },
      [LogResponseMessage.method]: (msg, _ok) => {
        return this.emit(LogResponse, Message.from(msg)) && _ok();
      },
      [BroadcastMessage.method]: (msg, _ok) => {
        return this.emit(Broadcast, Message.from(msg)) && _ok();
      }
    };

    return mapping;
  }

  /**
   * Binds event listeners to internal handler methods.
   * 
   * @private
   * @returns {module:brig/consensus~Cluster}
   */ 
  _init() {
    this._dbg('Initializing consensus module for node: ', this.id);

    this.state.on(RoleChanged, (oldRole, newRole) => {
      this._dbg('Node role changed from', oldRole, 'to', newRole);

      if (newRole === Leader) {
        this.startHeartbeat();
        clearTimeout(this._electionT);
      } else if (newRole === Follower) {
        this.resetElectionTimer();
        this.stopHeartbeat();
      } else {
        this.state.votesReceived.add(this.id);
        clearTimeout(this._electionT);
        this.stopHeartbeat();
      }
      
      this.emit(RoleChanged, oldRole, newRole);
    });

    this.state.on(LeaderChanged, (oldLeader, newLeader) => {
      if (this.state.currentLeader === this.id) {
        this.startHeartbeat();
      }

      if (oldLeader === newLeader) {
        return;
      }

      this._dbg('Node leader changed from', oldLeader, 'to', newLeader);
      
      while (this._bcastMsgBuffer.length) {
        const msg = this._bcastMsgBuffer.pop();
        
        this._dbg('Emptying broadcast buffer.', msg);
        this.getPeer(newLeader).send(msg);
      } 
      
      this.emit(LeaderChanged, oldLeader, newLeader);
    });

    this
      .on(ElectionTimeout, () => this.startElection())
      .on(ReplicationTimeout, () => this.handleReplicateLog())
      .on(VoteRequest, (vReq) => this.handleVoteRequest(vReq))
      .on(VoteResponse, (vRes) => this.handleVoteResponse(vRes))
      .on(LogRequest, (lReq) => this.handleLogRequest(lReq))
      .on(LogResponse, (lRes) => this.handleLogResponse(lRes))
      .on(Broadcast, (bCast) => this.handleBroadcast(bCast));

    this.resetElectionTimer();

    return this;
  }

  /**
   * Get peer from list by ID.
   * 
   * @param {string} peerId - Identifier for the peer node.
   * @returns {module:brig/consensus~Peer}
   */
  getPeer(id) {
    if (id === this.id) {
      throw new Error(
        'Tried to get peer object for self. This is probably not what you want'
      );
    }

    this._dbg('Looking for peer by ID: ', id);

    for (let i = 0; i < this.peers.length; i++) {
      if (this.peers[i].id === id) {
        this._dbg('Found peer: ', this.peers[i].id);
        return this.peers[i];
      }
    }

    this._dbg('Peer not found: ', id);

    return null;
  }

  /**
   * Adds a peer to the cluster.
   *
   * @param {module:brig/consensus~Peer} peer - Peer to add to cluster.
   * @returns {module:brig/consensus~Peer}
   */
  addPeer(peer) {
    this._dbg('Adding peer to cluster: ', peer.id);
    this.peers.push(peer);

    return this.getPeer(peer.id);
  }

  /**
   * Resets the election timeout.
   */
  resetElectionTimer() {
    this._electionT && clearTimeout(this._electionT);
    this._electionT = setTimeout(() => {
      this.emit(ElectionTimeout);
      this.resetElectionTimer();
    }, Cluster.getElectionTimeoutMs());
  }

  /**
   * Sends heartbeat messages to maintain authority and to replicate the log.
   */
  startHeartbeat() {
    this.stopHeartbeat();

    this._heartbeatT = setInterval(() => {
      this.emit(ReplicationTimeout);
    }, Cluster.getReplicateTimeoutMs());
  }

  stopHeartbeat() {
    this._heartbeatT && clearInterval(this._heartbeatT);
  }
  
  /**
   * Each node has a election timer, which is reset when receving the heartbeat 
   * from its leader. When the Election Timer expires, the follower will 
   * transition to the role of “candidate”. Following this transition, it will 
   * proceed to send voting requests to all nodes.
   * 
   */
  startElection() {
    this._dbg('Starting leader election.');

    if (this.state.currentRole === Leader) {
      this._dbg('This node is already the Leader.');
      return;
    }

    if (this.state.currentRole === Candidate) {
      this._dbg('Node is already a Candidate.');
      return;
    }
    
    this._dbg('Advancing term.');
    this.state.currentTerm += 1;
    this.state.votedFor = this.id;

    this._dbg('Requesting votes and resetting election timer.');
    this.state.setCurrentRole(Candidate);
    this.state.votesReceived.clear();
    this.state.votesReceived.add(this.id);

    this._dbg('Voted for self:', this.state.votesReceived);
    let lastTerm = 0;

    if (this.state.log.entries.length > 0) {
      lastTerm = this.state.log.getEntryByIndex(
        this.state.log.entries.length - 1).term;
    }

    const msg = new VoteRequestMessage({
      candidateId: this.id, 
      currentTerm: this.state.currentTerm, 
      currentLogLength: this.state.log.entries.length, 
      currentLogLastTerm: lastTerm
    });

    for (let i = 0; i < this.peers.length; i++) {
      if (this.peers[i].id === this.id) {
        continue;
      }
      this._dbg('Sending message: ', msg, ' to peer: ', this.peers[i].id);
      this.getPeer(this.peers[i].id).send(msg);
    }

    return this;
  }

  /**
   * When the replication timer expires, the leader will synchronize its log 
   * with all followers. The synchronization message also serves as a heartbeat 
   * message.
   *
   */
  handleReplicateLog() {
    if (this.state.currentRole !== Leader) {
      this._dbg('Not clusters leader, so will not replicate.');
      return;
    }

    for (let i = 0; i < this.peers.length; i++) {
      if (this.peers[i].id === this.id) {
        continue;
      }

      this.replicateLog(this.id, this.peers[i]);
    }
  }

    /**
   * Helper function that synchronizes the log of the leader with a follower.
   * The simplest way to synchronize the log is to send the entire log to the 
   * follower. However, this is inefficient. As mentioned earlier, the leader 
   * assumes that the log of the follower is the same as its own log when it 
   * becomes a leader. Therefore, the leader only needs to send the log entries 
   * that the follower does not have.
   *
   * ```
   * sentLength[follower] := log.length 
   * // the node assumes that the log of the follower is the same as its own log
   * ```
   *
   * The leader maintains a variable `sentLength` for each follower. 
   * `sentLength[follower]` denotes the length of the log that the leader 
   * believes the follower has. When the leader synchronizes the logs with the 
   * follower, it will send the log entries after `sentLength[follower]`. If 
   * the synchronization is failed, the leader will decrease 
   * `sentLength[follower]` by 1, and try again.
   *
   * @param {string} leaderId - Peer ID to replicate from.
   * @param {module:brig/consensus~Peer} follower - Peer to replicate to.
   */
  replicateLog(leaderId, follower) {
    if (follower.id === this.id) {
      return;
    }

    this._dbg('Replicate log from', leaderId, 'to', follower.id);

    // We call the log entries that the leader believes are already replicated 
    // on the follower as prefix.
    const prefixLength = this.state.sentLength.get(follower.id);

    // Only send the suffix of the log to the follower.
    const suffix = this.state.log.getEntriesAfterIndex(prefixLength);

    // prefixTerm is the term of the last log entry in the prefix. We will 
    // explain it later.
    let prefixTerm = 0;

    if (prefixLength > 0) {
      let lastLogEntry = this.state.log.getEntryByIndex(prefixLength);

      prefixTerm = lastLogEntry
        ? lastLogEntry.term
        : this.state.currentTerm;
    }

    const msg = new LogRequestMessage({
      leaderId: leaderId,
      term: this.state.currentTerm,
      prefixLength,
      prefixTerm,
      leaderCommit: this.state.commitLength,
      suffix
    });

    this._dbg('Sending LogRequestMessage:', msg);
    follower.send(msg);
  }

  /**
   * When node A receives a voting request from node B, it will perform the 
   * following steps:  
   *
   * 1. Check if the term of B is greater than or equal the current term of A. 
   * If not, A will reject the voting request, since voting for B might result 
   * in multiple leaders in B’s term.
   * 
   * 2. Check if the log of B is more or equal up-to-date than the log of A. 
   * If not, A will reject the voting request, since voting for B might result 
   * in log entries being lost.
   * 
   * 3. Check if A has already voted for another candidate in the current term. 
   * If so, A will reject the voting request, since voting for B might result 
   * in multiple leaders in the current term.
   *
   * @param {module:brig/messages~VoteRequestMessage} vReq - Vote request 
   * message.
   */
  handleVoteRequest(vReq) {
    this._dbg('Handling VoteRequest: ', vReq);

    // If the term of the candidate is greater than the current term of the 
    // node, then the node should update its current term to the term of the 
    // candidate, and become a follower. This is because:
    // 
    // 1. If the current node is a follower: it doesn't make sense to stay in 
    // the current term, since the leader may crash or disconnect.
    // 
    // 2. If the current node is a leader: it might be disconnected from the 
    // network or crashed for a while. In this case, the current node should 
    // step down and become a follower.
    if (vReq.currentTerm > this.state.currentTerm) {
      this._dbg('VoteRequestMessage term:', vReq.currentTerm, 
        'is greater than our current term:', this.state.currentTerm,
        'Becoming a follower.')

      this.state.currentTerm = vReq.currentTerm;
      this.state.setCurrentRole(Follower);
      this.state.votedFor = null;
    }

    let lastTerm = 0;

    if (this.state.log.entries.length > 0) {
      lastTerm = this.state.log.entries[this.state.log.entries.length - 1].term;
    }
    // Check if the log of the candidate is more up-to-date than the log of 
    // the node. logOk means the log of the candidate is more up-to-date than 
    // the log of the current node.
    const logOk = vReq.currentLogLastTerm > lastTerm
      || (vReq.currentLogLastTerm === lastTerm && 
          vReq.currentLogLength >= this.state.log.entries.length);

    let granted = false;
    // 1. If the term of the candidate is less than the current term of the 
    // node, then the node should reject the vote request.
    //
    // 2. If the log of the candidate is not more up-to-date than the log of 
    // the node, then the node should reject the vote request.
    // 
    // 3. If the node has already voted for another candidate in the current 
    // term, then the node should reject the vote request. 
    if (vReq.currentTerm >= this.state.currentTerm
      && logOk 
      && (this.state.votedFor === null || this.state.votedFor === vReq.candidateId)) {
      this.state.votedFor = vReq.candidateId;
      granted = true;
    }

    const msg = new VoteResponseMessage({
      voterId: this.id,
      term: this.state.currentTerm,
      // If false this means the node does not vote for the candidate, and 
      // the node will inform the candidate its current term. This is 
      // because the candidate may have a smaller term, and the node should 
      // make the candidate to update its term.
      granted
    });

    this.getPeer(vReq.candidateId).send(msg);
  }

  /**
   * Upon receiving voting responses, a node should check whether it has 
   * received a majority of votes. If so, it should transition to the role of 
   * leader. Otherwise, it should remain a candidate.
   *
   * @param {module:brig/messages~VoteResponseMessage} vRes - Vote response 
   * message to handle.
   */
  handleVoteResponse(vRes) {
    this._dbg('Handling vote response: ', vRes);

    this.resetElectionTimer();
    this.state.votesReceived.add(this.id);
    
    const isCandidate = this.state.currentRole === Candidate;
    const isCurrentTerm = vRes.term === this.state.currentTerm;
    const minVotesToWin = Math.ceil((this.peers.length + 1) / 2);
   
    this._dbg(isCandidate 
      ? 'Node is a candidate.'
      : 'Node is not a candidate.');

    this._dbg('Role:', this.state.currentRole);
    this._dbg('Vote term:', vRes.term, 
      ', Current term:', this.state.currentTerm);

    // If the node is a candidate, and the term of the vote response is the 
    // same as the current term of the node, and the vote response is granted, 
    // then the node should add the voter to the list of votes received.
    if (isCandidate && isCurrentTerm && vRes.granted) {
      this._dbg('Received a vote for the current term.');
      this.state.votesReceived.add(vRes.voterId);
    }

    this._dbg('Votes received:', this.state.votesReceived.size, 
      '/', minVotesToWin);

    // If the node receives a majority of votes, then the node becomes a leader.
    if (this.state.votesReceived.size >= minVotesToWin) {
      this._dbg('Node won the election.');

      // The node becomes the leader of itself.
      this.state.setCurrentRole(Leader);  
      this.state.currentLeader = this.id;

      // For each follower, the node should send a heartbeat message to it.
      for (let i = 0; i < this.peers.length; i++) {
        if (this.peers[i].id === this.id) {
          continue;
        }

        // The node assumes that the log of the follower is the same as its 
        // own log.
        this.state.sentLength.set(this.peers[i].id, this.state.log.entries.length);

        // The node does not receive any ack from the follower.
        this.state.ackedLength.set(this.peers[i].id, 0);
      }
        
      this.handleReplicateLog();
    } else if (vRes.term > this.state.currentTerm) {
      this._dbg('Vote response is from a future term, following.');

      // However, if the term of the vote response is greater than the current 
      // term of the node, then the node should update its current term to the 
      // term of the vote response, and become a follower. This is because the 
      // current node is already out of date, and it should step down and 
      // become a follower to avoid multiple leaders in the current term.
      this.state.currentTerm = vRes.term;
      this.state.setCurrentRole(Follower);
      this.state.votedFor = null;
      this.resetElectionTimer();
    }
  }
 
  /**
   * When a follower receives a synchronization message from the leader, it 
   * will perform the following steps:
   *
   * 1. The follower will check whether the log is consistent with the log 
   * entries that the leader believes the follower has. If not, the follower 
   * will reject the synchronization request.
   *
   * 2. If the log is consistent, the follower will append the suffix log 
   * entries to its own log.
   *
   * 3. The follower will check whether the leader has committed any log 
   * entries. If so, the follower will commit the log entries that the leader 
   * has committed.
   * 
   * To check whether the log is consistent, the follower will compare the term 
   * of the last log entry in the prefix with leader’s prefix_term. If they are 
   * not equal, the log is inconsistent. It is true due to a property of Raft: 
   * if two nodes have the same log term at the same index, then they have the 
   * same log entries at and before that index. Here we don’t give the proof of 
   * this property, but you can find it in the original paper.
   * 
   * @param {module:brig/messages~LogRequestMessage} lReq - Append entries 
   * request message.
   */
  handleLogRequest(lReq) {
    this._dbg('Handling LogRequestMessage:', lReq);

    // If the term of the log request is greater than the current term of the 
    // node, then the node should become a follower of the leader.
    if (lReq.term > this.state.currentTerm) {
      this.state.currentTerm = lReq.term;
      this.state.votedFor = null;
      this.resetElectionTimer();
    } 

    // If the term of the log request is the same as the current term of the 
    // node, then the node should become a follower of the leader (the current 
    // node might be a candidate).
    if (lReq.term === this.state.currentTerm) {
      this.state.setCurrentRole(Follower); 
      this.state.currentLeader = lReq.leaderId;
      this.resetElectionTimer();
    }

    // If logOk is true, then the prefix of the leader is the same as the 
    // prefix of the follower. Otherwise, the leader should send the log 
    // request again.
    const logOk = (this.state.log.entries.length >= lReq.prefixLength) 
      && (lReq.prefixLength === 0 
        || this.state.log.entries[lReq.prefixLength - 1].term === lReq.prefixTerm);

    let ack = 0;
    let success = false;

    if (lReq.term === this.state.currentTerm && logOk) {
      // Update the log using suffix.
      this._dbg('Terms match and log ok, appending entries.', 
        'Prefix:', lReq.prefixLength, 'Leader commit:', lReq.leaderCommit,
        'Suffix:', lReq.suffix);
      this.appendEntries(lReq.prefixLength, lReq.leaderCommit, lReq.suffix);

      // The node should notify the leader that it has received the log 
      // entries.
      ack = lReq.prefixLength + lReq.suffix.length;
      success = true;
    }

    const msg = new LogResponseMessage({
      followerId: this.id,
      term: this.state.currentTerm,
      ack,
      success
    });

    this.getPeer(lReq.leaderId).send(msg);
  }

  /**
   * Appends the suffix log entries to the log of the follower. Here we check 
   * whether the follower has the same suffix log entries as the leader. If 
   * not, the follower will remove all the log entries after prefix from its 
   * log, and append the suffix log entries from leader to its log.
   *
   * @param {number} prefixLength - Number of entries before current.
   * @param {number} leaderCommit - Number of commits on the leader.
   * @param {Array.<module:brig/log~LogEntry>} suffix - Entries to append.
   */
  appendEntries(prefixLength, leaderCommit, suffix) {
    this._dbg('Appending entries, prefix: ', prefixLength, ', leader commit: ', 
      leaderCommit, ', suffix: ', suffix);

    // If the suffix of the leader is not empty, and the suffix of the follower 
    // is not empty.
    if (suffix.length > 0 && this.state.log.entries.length > prefixLength) {
      // Get the index of the last log entry that can be compared.
      let index = Math.min(this.state.log.entries.length, 
        prefixLength + suffix.length) - 1;

      // If they have different terms, then the suffix of the follower might be 
      // different from the suffix of the leader.
      if (this.state.log.entries[index].term !== suffix[index - prefixLength].term) {       
        // Remove the suffix of the follower.
        this.state.log.entries = this.state.log.entries.slice(0, prefixLength);
      }
    }

    // If the we can find log entries that can be appended. 
    if (prefixLength + suffix.length > this.state.log.entries.length) {
      for (let i = this.state.log.entries.length - prefixLength; i < suffix.length; i++) {
        // Append the log entries to the log.
        this.state.log.append(new LogEntry(suffix[i].data, suffix[i].term)); 
      }
    }

    // logs[0..leaderCommit − 1] are acknowledged by the majority of nodes, so 
    // we can commit those log entries.
    if (leaderCommit > this.state.commitLength) {
      for (let i = this.state.commitLength; i < leaderCommit; i++) {
        this.emit(LogCommit, this.state.log.entries[i]);
      }

      this.state.commitLength = leaderCommit;
    }
  }

  /**
   * When the leader receives a log response from a follower, it will perform 
   * the following steps:
   *
   * 1. If the synchronization is successful, the leader will update 
   * `ackedLength` and `sentLength` of the follower.
   *
   * 2. If the synchronization is failed, the leader will decrease `sentLength` 
   * of the follower by 1, and try again.
   *
   * @param {module:brig/messages~LogResponseMessage} lRes - Append entries 
   * response message.
   */
  handleLogResponse(lRes) {
    this._dbg('Handling LogResponseMessage:', lRes);

    if (lRes.term === this.state.currentTerm && this.state.currentRole === Leader) {
      if (lRes.success && lRes.ack >= this.state.ackedLength.get(lRes.followerId)) {
        this.state.sentLength.set(lRes.followerId, lRes.ack);
        this.state.ackedLength.set(lRes.followerId, lRes.ack);
        this.commitLogEntries();
      } else if (this.state.sentLength.get(lRes.followerId) > 0) {
        this.state.sentLength.set(lRes.followerId, 
          this.state.sentLength.get(lRes.followerId) - 1);
        this.replicateLog(this.getPeer(this.id), this.getPeer(lRes.followerId));
      }
    } else if (lRes.term > this.state.currentTerm) {
      this.state.currentTerm = lRes.term;
      this.state.setCurrentRole(Follower);
      this.state.votedFor = null;
      this.resetElectionTimer();
    }
  }

  /**
   * Helper function for broadcasting a log entry.
   *
   * @param {object} payload - Key-value pairs to include in log entry.
   * @returns {module:brig/consensus~Cluster}
   */
  broadcast(payload) {
    this._dbg('Broadcasting payload: ', payload);  
    return this.emit(Broadcast, new BroadcastMessage(payload));
  }

  /**
   * When the application layer triggers a broadcast, the leader will append 
   * the broadcast message to its log, and send the log entry to all followers. 
   * If the current node is not a leader, it will forward the broadcast message 
   * to the leader.
   *
   * @param {object} payload - Payload to be included in log entry broadcasted.
   */
  handleBroadcast(bCast) {
    // If the node is a leader, then it can directly append the message to its 
    // log.
    this._dbg('Handling BroadcastMessage:', bCast)

    if (this.state.currentRole === Leader || this.state.currentLeader === this.id) {
      this._dbg('Node is Leader. Appending and replicating.');
      const msg = bCast;
      this.state.log.append(new LogEntry(msg, this.state.currentTerm));

      // The node is synchronized with itself.
      this.state.ackedLength.set(this.id, this.state.log.entries.length);

      // Synchronize the log with all followers.
      this.handleReplicateLog();
    } else if (this.state.currentLeader !== null) {
      this._dbg('Node is not Leader, so replicating to the Leader.');
      // If the node is not a leader, but it follows a leader, then it should 
      // forward the request to the leader.
      const msg = bCast;
      this.getPeer(this.state.currentLeader).send(msg);
    } else { 
      this._dbg('Node is not Leader and does not follow one. Waiting.');
      // If the node is not a leader, and it does not follow a 
      // leader, then it should buffer the message until it follows a leader.
      this._bcastMsgBuffer.push(bCast);
    }
  }
    
  /**
   * If the leader receives a majority of acknowledgements for a log entry, it 
   * will commit the log entry.
   *
   */
  commitLogEntries() {
    const minAcks = ((this.peers.length + 1) + 1) / 2;
    let readyMax = 0;

    for (let i = this.state.commitLength + 1; i < this.state.log.entries.length + 1; i++) {
      if (this.acks(i) >= minAcks) {
        readyMax = i;
      }
    }

    if (readyMax > 0 && this.state.log.entries[readyMax - 1].term === this.state.currentTerm) {
      for (let i = this.state.commitLength; i < readyMax; i++) {
        this.emit(LogCommit, this.state.log.entries[i]);
      }

      this.state.commitLength = readyMax;
    }
  }

    /**
   * The number of nodes whose `ackedLength` is greater than or equal to x.
   *
   * @param {number} x - Minimum number of acks.
   * @returns {number}
   */
  acks(length) {
    let acks = 0;
   
    for (let len of this.state.ackedLength.values()) {
      if (len >= length) {
        acks += 1;
      }
    }

    return acks;
  }

}

module.exports.Cluster = Cluster;