/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zookeeper.server.quorum;
import static org.apache.zookeeper.test.ClientBase.CONNECTION_TIMEOUT;
import java.util.ArrayList;
import java.util.concurrent.TimeUnit;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.PortAssignment;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.TestableZooKeeper;
import org.apache.zookeeper.test.ClientBase;
import org.apache.zookeeper.test.ReconfigTest;
import org.apache.zookeeper.AsyncCallback.StatCallback;
import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
import org.junit.Assert;
import org.junit.Test;
public class StandaloneDisabledTest extends QuorumPeerTestBase {
private final int NUM_SERVERS = 5;
private MainThread peers[];
private ZooKeeper zkHandles[];
private int clientPorts[];
private final int leaderId = 0;
private final int follower1 = 1;
private final int follower2 = 2;
private final int observer1 = 3;
private final int observer2 = 4;
private ArrayList<String> serverStrings;
private ArrayList<String> reconfigServers;
/**
* Test normal quorum operations work cleanly
* with just a single server.
*/
@Test
public void startSingleServerTest() throws Exception {
setUpData();
//start one server
startServer(leaderId, serverStrings.get(leaderId) + "\n");
ReconfigTest.testServerHasConfig(zkHandles[leaderId], null, null);
LOG.info("Initial Configuration:\n"
+ new String(zkHandles[leaderId].getConfig(this, new Stat())));
//start and add 2 followers
startFollowers();
testReconfig(leaderId, true, reconfigServers);
LOG.info("Configuration after adding 2 followers:\n"
+ new String(zkHandles[leaderId].getConfig(this, new Stat())));
//shutdown leader- quorum should still exist
shutDownServer(leaderId);
ReconfigTest.testNormalOperation(zkHandles[follower1], zkHandles[follower2]);
//should not be able to remove follower 2
//No quorum in new config (1/2)
reconfigServers.clear();
reconfigServers.add(Integer.toString(follower2));
try {
ReconfigTest.reconfig(zkHandles[follower1], null, reconfigServers, null, -1);
Assert.fail("reconfig completed successfully even though there is no quorum up in new config!");
} catch (KeeperException.NewConfigNoQuorum e) { }
//reconfigure out leader and follower 1. Remaining follower
//2 should elect itself as leader and run by itself
reconfigServers.clear();
reconfigServers.add(Integer.toString(leaderId));
reconfigServers.add(Integer.toString(follower1));
testReconfig(follower2, false, reconfigServers);
LOG.info("Configuration after removing leader and follower 1:\n"
+ new String(zkHandles[follower2].getConfig(this, new Stat())));
//Add two participants and change them to observers to check
//that we can reconfigure down to one participant with observers.
ArrayList<String> observerStrings = new ArrayList<String>();
startObservers(observerStrings);
testReconfig(follower2, true, reconfigServers); //add partcipants
testReconfig(follower2, true, observerStrings); //change to observers
LOG.info("Configuration after adding two observers:\n"
+ new String(zkHandles[follower2].getConfig(this, new Stat())));
shutDownData();
}
/**
* Initialize private data for test.
*/
private void setUpData() throws Exception {
ClientBase.setupTestEnv();
QuorumPeerConfig.setStandaloneEnabled(false);
peers = new MainThread[NUM_SERVERS];
zkHandles = new ZooKeeper[NUM_SERVERS];
clientPorts = new int[NUM_SERVERS];
serverStrings = buildServerStrings();
reconfigServers = new ArrayList<String>();
}
/**
* Stop server threads.
*/
private void shutDownData() throws Exception {
for (int i = 0; i < NUM_SERVERS; i++) {
zkHandles[i].close();
}
for (int i = 1; i < NUM_SERVERS; i++) {
peers[i].shutdown();
}
}
/**
* Create config strings that will be used for
* the test servers.
*/
private ArrayList<String> buildServerStrings() {
ArrayList<String> serverStrings = new ArrayList<String>();
for(int i = 0; i < NUM_SERVERS; i++) {
clientPorts[i] = PortAssignment.unique();
String server = "server." + i + "=localhost:" + PortAssignment.unique()
+":"+PortAssignment.unique() + ":participant;"
+ "localhost:" + clientPorts[i];
serverStrings.add(server);
}
return serverStrings;
}
/**
* Starts a single server in replicated mode,
* initializes its client, and waits for it
* to be connected.
*/
private void startServer(int id, String config) throws Exception {
peers[id] = new MainThread(id, clientPorts[id], config);
zkHandles[id] = new ZooKeeper("127.0.0.1:" + clientPorts[id],
CONNECTION_TIMEOUT, this);
peers[id].start();
Assert.assertTrue("Server " + id + " is not up",
ClientBase.waitForServerUp("127.0.0.1:" + clientPorts[id], CONNECTION_TIMEOUT));
Assert.assertTrue("Error- Server started in Standalone Mode!",
peers[id].isQuorumPeerRunning());
}
/**
* Shuts down a server, waits for it to disconnect,
* and gives enough time for the learner handler
* in its ensemble to realize it's been shut down.
*/
private void shutDownServer(int id) throws Exception {
peers[id].shutdown();
ClientBase.waitForServerDown("127.0.0.1:" + clientPorts[id], CONNECTION_TIMEOUT);
TimeUnit.SECONDS.sleep(25);
}
/**
* Starts servers 1 & 2 as participants and
* adds them to the list to be reconfigured
* into the ensemble.
*/
private void startFollowers() throws Exception {
reconfigServers.clear();
for(int i = 1; i <= 2; i++) {
String config = serverStrings.get(leaderId) + "\n"
+ serverStrings.get(i) + "\n"
+ serverStrings.get(i % 2 + 1) + "\n";
startServer(i, config);
reconfigServers.add(serverStrings.get(i));
}
}
/**
* Starts servers 1 & 2 as participants,
* adds them to the list to be reconfigured
* into the ensemble, and adds an observer
* version of their information to a list
* so they will be turned into observers later.
*/
private void startObservers(ArrayList<String> observerStrings) throws Exception {
reconfigServers.clear();
for(int i = observer1; i <= observer2; i++) {
String config = serverStrings.get(follower2) + "\n"
+ serverStrings.get(i) + "\n";
startServer(i, config);
reconfigServers.add(serverStrings.get(i));
observerStrings.add(serverStrings.get(i).replace("participant", "observer"));
}
}
/**
* Calls reconfig on the client corresponding to id to add or remove
* the given servers. Tests appropriately to make sure the
* reconfig succeeded.
*/
private void testReconfig(int id, boolean adding,
ArrayList<String> servers) throws Exception {
if (adding) {
ReconfigTest.reconfig(zkHandles[id], servers, null, null, -1);
for (String server : servers) {
int id2 = Integer.parseInt(server.substring(7, 8)); //server.#
ReconfigTest.testNormalOperation(zkHandles[id], zkHandles[id2]);
}
} else {
ReconfigTest.reconfig(zkHandles[id], null, servers, null, -1);
}
ReconfigTest.testServerHasConfig(zkHandles[id], servers, null);
}
/**
* Ensure observer cannot start by itself
**/
@Test
public void startObserver() throws Exception {
int clientPort = PortAssignment.unique();
String config = "server." + observer1 + "=localhost:"+ PortAssignment.unique()
+ ":" + clientPort + ":observer;"
+ "localhost:" + PortAssignment.unique();
MainThread observer = new MainThread(observer1, clientPort, config);
observer.start();
Assert.assertFalse("Observer was able to start by itself!",
ClientBase.waitForServerUp("127.0.0.1:" + clientPort, CONNECTION_TIMEOUT));
}
}