Package net.kuujo.copycat.internal.state

Source Code of net.kuujo.copycat.internal.state.CandidateController

/*
* Copyright 2014 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.kuujo.copycat.internal.state;

import net.kuujo.copycat.CopycatState;
import net.kuujo.copycat.internal.cluster.RemoteNode;
import net.kuujo.copycat.internal.log.CopycatEntry;
import net.kuujo.copycat.internal.util.Quorum;
import net.kuujo.copycat.protocol.PingRequest;
import net.kuujo.copycat.protocol.PingResponse;
import net.kuujo.copycat.protocol.PollRequest;
import net.kuujo.copycat.protocol.PollResponse;
import net.kuujo.copycat.spi.protocol.ProtocolClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/**
* Candidate state.<p>
*
* The candidate state is a brief state in which the replica is
* a candidate in an election. During its candidacy, the replica
* will send vote requests to all other nodes in the cluster. Based
* on the vote result, the replica will then either transition back
* to <code>Follower</code> or become the leader.
*
* @author <a href="http://github.com/kuujo">Jordan Halterman</a>
*/
public class CandidateController extends StateController {
  private static final Logger LOGGER = LoggerFactory.getLogger(CandidateController.class);
  private Quorum quorum;
  private ScheduledFuture<Void> currentTimer;

  @Override
  CopycatState state() {
    return CopycatState.CANDIDATE;
  }

  @Override
  Logger logger() {
    return LOGGER;
  }

  @Override
  void init(StateContext context) {
    super.init(context);
    LOGGER.info("{} - Starting election", context.clusterManager().localNode());
    resetTimer();
  }

  /**
   * Resets the election timer.
   */
  @SuppressWarnings("unchecked")
  private synchronized void resetTimer() {
    // Cancel the current timer task and purge the election timer of cancelled tasks.
    if (currentTimer != null) {
      currentTimer.cancel(true);
    }

    // When the election timer is reset, increment the current term and
    // restart the election.
    context.currentTerm(context.currentTerm() + 1);
    long delay = context.config().getElectionTimeout() - (context.config().getElectionTimeout() / 4) + (Math.round(Math.random() * (context.config().getElectionTimeout() / 2)));
    currentTimer = context.config().getTimerStrategy().schedule(() -> {
      // When the election times out, clear the previous majority vote
      // check and restart the election.
      LOGGER.info("{} - Election timed out", context.clusterManager().localNode());
      if (quorum != null) {
        quorum.cancel();
        quorum = null;
      }
      resetTimer();
      LOGGER.info("{} - Restarted election", context.clusterManager().localNode());
    }, delay, TimeUnit.MILLISECONDS);

    final AtomicBoolean complete = new AtomicBoolean();

    // Send vote requests to all nodes. The vote request that is sent
    // to this node will be automatically successful.
    // First check if the quorum is null. If the quorum isn't null then that
    // indicates that another vote is already going on.
    final Quorum quorum = new Quorum((int) Math.floor(context.clusterManager().nodes().size() / 2) + 1, (elected) -> {
      complete.set(true);
      if (elected) {
        context.transition(LeaderController.class);
      } else {
        context.transition(FollowerController.class);
      }
    }).countSelf();

    // First, load the last log entry to get its term. We load the entry
    // by its index since the index is required by the protocol.
    final long lastIndex = context.log().lastIndex();
    CopycatEntry lastEntry = context.log().getEntry(lastIndex);

    // Once we got the last log term, iterate through each current member
    // of the cluster and poll each member for a vote.
    LOGGER.info("{} - Polling members {}", context.clusterManager().localNode(), context.clusterManager().cluster().remoteMembers());
    final long lastTerm = lastEntry != null ? lastEntry.term() : 0;
    for (RemoteNode<?> node : (Set<RemoteNode<?>>) context.clusterManager().remoteNodes()) {
      final ProtocolClient client = node.client();
      client.connect().whenComplete((result1, error1) -> {
        if (error1 != null) {
          quorum.fail();
        } else {
          LOGGER.debug("{} - Polling {}", context.clusterManager().localNode(), node.member());
          client.poll(new PollRequest(context.nextCorrelationId(), context.currentTerm(), context.clusterManager()
            .localNode()
            .member()
            .id(), lastIndex, lastTerm)).whenComplete((result2, error2) -> {
            client.close();
            if (!complete.get()) {
              if (error2 != null) {
                LOGGER.warn(context.clusterManager().localNode().toString(), error2);
                quorum.fail();
              } else if (!result2.voteGranted()) {
                LOGGER.info("{} - Received rejected vote from {}", context.clusterManager().localNode(), node.member());
                quorum.fail();
              } else if (result2.term() != context.currentTerm()) {
                LOGGER.info("{} - Received successful vote for a different term from {}", context.clusterManager()
                  .localNode(), node.member());
                quorum.fail();
              } else {
                LOGGER.info("{} - Received successful vote from {}", context.clusterManager()
                  .localNode(), node.member());
                quorum.succeed();
              }
            }
          });
        }
      });
    }
  }

  @Override
  public CompletableFuture<PingResponse> ping(PingRequest request) {
    // If the request indicates a term that is greater than the current term then
    // assign that term and leader to the current context and step down as a candidate.
    if (request.term() >= context.currentTerm()) {
      context.currentTerm(request.term());
      context.currentLeader(null);
      context.lastVotedFor(null);
      context.transition(FollowerController.class);
    }
    return super.ping(request);
  }

  @Override
  public CompletableFuture<PollResponse> poll(PollRequest request) {
    // If the request indicates a term that is greater than the current term then
    // assign that term and leader to the current context and step down as a candidate.
    if (request.term() > context.currentTerm()) {
      context.currentTerm(request.term());
      context.currentLeader(null);
      context.lastVotedFor(null);
      context.transition(FollowerController.class);
    }

    // If the vote request is not for this candidate then reject the vote.
    if (!request.candidate().equals(context.clusterManager().localNode().member().id())) {
      return CompletableFuture.completedFuture(logResponse(new PollResponse(logRequest(request).id(), context.currentTerm(), false)));
    } else {
      return super.poll(request);
    }
  }

  @Override
  synchronized void destroy() {
    if (currentTimer != null) {
      LOGGER.debug("{} - Cancelling election", context.clusterManager().localNode());
      currentTimer.cancel(true);
    }
    if (quorum != null) {
      quorum.cancel();
      quorum = null;
    }
  }

  @Override
  public String toString() {
    return String.format("CandidateController[context=%s]", context);
  }

}
TOP

Related Classes of net.kuujo.copycat.internal.state.CandidateController

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code is the property of its respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.