Package org.linkedin.zookeeper.server

Source Code of org.linkedin.zookeeper.server.StandaloneZooKeeperServer

/*
* Copyright 2010-2010 LinkedIn, Inc
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/


package org.linkedin.zookeeper.server;

import org.slf4j.Logger;
import org.apache.zookeeper.server.NIOServerCnxn;
import org.apache.zookeeper.server.NIOServerCnxnFactory;
import org.apache.zookeeper.server.ServerConfig;
import org.apache.zookeeper.server.ZooKeeperServer;
import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
import org.linkedin.util.annotations.Initializable;
import org.linkedin.util.annotations.Initializer;
import org.linkedin.util.clock.Clock;
import org.linkedin.util.clock.ClockUtils;
import org.linkedin.util.clock.SystemClock;
import org.linkedin.util.concurrent.ConcurrentUtils;
import org.linkedin.util.exceptions.InternalException;
import org.linkedin.util.lifecycle.Shutdownable;
import org.linkedin.util.lifecycle.Startable;
import java.net.InetSocketAddress;

import java.io.File;
import java.io.IOException;
import java.util.Properties;
import java.util.concurrent.TimeoutException;

/**
* Encapsulates creation of a standalone zookeeper server (most likely
* going to be used for testing only)
*
* @author ypujante@linkedin.com
*/
public class StandaloneZooKeeperServer implements Startable, Shutdownable
{
  public static final String MODULE = StandaloneZooKeeperServer.class.getName();
  public static final Logger log = org.slf4j.LoggerFactory.getLogger(MODULE);

  @Initializable
  public Clock clock = SystemClock.INSTANCE;

  private String _tickTime;
  private int _clientPort;
  private String _dataDir;

  private Thread _mainThread;
 
  private volatile boolean _shutdown = false;
  private NIOServerCnxnFactory _cnxnFactory;
  private ZooKeeperServer _zkServer;

  public StandaloneZooKeeperServer(String tickTime, String dataDir, int clientPort)
  {
    _dataDir = dataDir;
    _clientPort = clientPort;
    _tickTime = tickTime;
  }

  StandaloneZooKeeperServer()
  {
    _tickTime = "2000";
    _clientPort = 2120;
  }

  public int getClientPort()
  {
    return _clientPort;
  }

  @Initializer
  public void setClientPort(int clientPort)
  {
    _clientPort = clientPort;
  }

  public String getDataDir()
  {
    return _dataDir;
  }

  @Initializer(required = true)
  public void setDataDir(String dataDir)
  {
    _dataDir = dataDir;
  }

  public String getTickTime()
  {
    return _tickTime;
  }

  @Initializer
  public void setTickTime(String tickTime)
  {
    _tickTime = tickTime;
  }

  @Override
  public synchronized void start()
  {
    try
    {
      QuorumPeerConfig qpc = new QuorumPeerConfig();
      Properties p = new Properties();
      p.setProperty("tickTime", _tickTime);
      p.setProperty("clientPort", String.valueOf(_clientPort));
      p.setProperty("dataDir", _dataDir);
      qpc.parseProperties(p);

      ServerConfig serverConfig = new ServerConfig();
      serverConfig.readFrom(qpc);

      runFromConfig(serverConfig);

      _mainThread = new Thread(new Runnable()
      {
        @Override
        public void run()
        {
          try
          {
            _cnxnFactory.join();
          }
          catch(InterruptedException e)
          {
            throw new InternalException(e);
          }
        }
      });
      _mainThread.start();
    }
    catch(Exception e)
    {
      throw new InternalException(e);
    }
  }

  /**
   * This method is coming from <code>ZooKeeperServerMain</code> but tweaked to not block and to
   * have access to <code>_zkServer</code> and <code>_cnxnFactory</code>.
   */
  private void runFromConfig(ServerConfig config) throws IOException
  {
    log.info("Starting server");
    try
    {
      _zkServer = new ZooKeeperServer();

      FileTxnSnapLog ftxn = new FileTxnSnapLog(new File(config.getDataLogDir()),
                                               new File(config.getDataDir()));
      _zkServer.setTxnLogFactory(ftxn);
      _zkServer.setTickTime(config.getTickTime());
      _zkServer.setMinSessionTimeout(config.getMinSessionTimeout());
      _zkServer.setMaxSessionTimeout(config.getMaxSessionTimeout());
      _cnxnFactory = new NIOServerCnxnFactory();
      _cnxnFactory.configure(config.getClientPortAddress(), config.getMaxClientCnxns());
                                             
      _cnxnFactory.startup(_zkServer);
    }
    catch(InterruptedException e)
    {
      // warn, but generally this is ok
      log.warn("Server interrupted", e);
    }
  }

  @Override
  public synchronized void shutdown()
  {
    if(_mainThread == null)
      throw new IllegalStateException("not started");

    if(_shutdown)
      return;

    _cnxnFactory.shutdown();

    _shutdown = true;
  }

  @Override
  public void waitForShutdown() throws InterruptedException, IllegalStateException
  {
    try
    {
      waitForShutdown(-1);
    }
    catch(TimeoutException e)
    {
      throw new RuntimeException(e);
    }
  }

  /**
   * Waits for shutdown to be completed. After calling shutdown, there may still be some pending work
   * that needs to be accomplised. This method will block until it is done but no longer than the
   * timeout.
   *
   * @param timeout how long to wait maximum for the shutdown (see {@link ClockUtils#toTimespan(Object)})
   * @throws InterruptedException  if interrupted while waiting
   * @throws IllegalStateException if shutdown has not been called
   * @throws TimeoutException      if shutdown still not complete after timeout
   */
  @Override
  public void waitForShutdown(Object timeout)
    throws InterruptedException, IllegalStateException, TimeoutException
  {
    if(!_shutdown)
      throw new IllegalStateException("call shutdown first");
    ConcurrentUtils.joinFor(clock, _mainThread, timeout);

  }
}
TOP

Related Classes of org.linkedin.zookeeper.server.StandaloneZooKeeperServer

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.