Package org.jredis.ri.alphazero.connection

Source Code of org.jredis.ri.alphazero.connection.ConnectionBase

/*
*   Copyright 2009 Joubin Houshyar
*
*   Licensed under the Apache License, Version 2.0 (the "License");
*   you may not use this file except in compliance with the License.
*   You may obtain a copy of the License at
*   
*   http://www.apache.org/licenses/LICENSE-2.0
*   
*   Unless required by applicable law or agreed to in writing, software
*   distributed under the License is distributed on an "AS IS" BASIS,
*   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*   See the License for the specific language governing permissions and
*   limitations under the License.
*/

package org.jredis.ri.alphazero.connection;

import static org.jredis.connector.ConnectionSpec.SocketFlag.SO_KEEP_ALIVE;
import static org.jredis.connector.ConnectionSpec.SocketProperty.SO_PREF_BANDWIDTH;
import static org.jredis.connector.ConnectionSpec.SocketProperty.SO_PREF_CONN_TIME;
import static org.jredis.connector.ConnectionSpec.SocketProperty.SO_PREF_LATENCY;
import static org.jredis.connector.ConnectionSpec.SocketProperty.SO_RCVBUF;
import static org.jredis.connector.ConnectionSpec.SocketProperty.SO_SNDBUF;
import static org.jredis.connector.ConnectionSpec.SocketProperty.SO_TIMEOUT;
import static org.jredis.ri.alphazero.support.Assert.notNull;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import org.jredis.ClientRuntimeException;
import org.jredis.NotSupportedException;
import org.jredis.ProviderException;
import org.jredis.RedisException;
import org.jredis.connector.Connection;
import org.jredis.connector.ConnectionSpec;
import org.jredis.protocol.Command;
import org.jredis.protocol.Protocol;
import org.jredis.protocol.Response;
import org.jredis.ri.alphazero.protocol.SynchProtocol;
import org.jredis.ri.alphazero.protocol.ConcurrentSynchProtocol;
import org.jredis.ri.alphazero.support.Assert;
import org.jredis.ri.alphazero.support.Convert;
import org.jredis.ri.alphazero.support.FastBufferedInputStream;
import org.jredis.ri.alphazero.support.Log;

/**
* This abstract class is responsible for managing the socket connection, and, defining
* the template of the Connection for concrete extensions.  Given that, it basically
* manages all the details of dealing with {@link Socket}, and maintains the reference to
* the handler. 
* <p>
* Further, it provides the default {@link NotSupportedException} response for the
* {@link Connection}'s methods that the extending classes of various {@link Connection.Modality}
* are expected to support.  (They would simply implement the method that they support.)
*
* @author  Joubin Houshyar (alphazero@sensesay.net)
* @version alpha.0, Apr 20, 2009
* @since   alpha.0
*
*/

public abstract class ConnectionBase implements Connection {

  // ------------------------------------------------------------------------
  // Properties
  // ------------------------------------------------------------------------
 
  /** Protocol specific matters are delegated to an instance of {@link Protocol} */
  protected Protocol       protocol;
 
  /** Connection specs used to create this {@link Connection} */
  final
  protected ConnectionSpec    spec;
 
  private InputStream        instream;
  private OutputStream      outstream;

  private boolean       isConnected = false;
 
  /** PINGs for heartbeat */
  private HeartbeatJinn      heartbeat;

  // ------------------------------------------------------------------------
  // Internal use fields
  // ------------------------------------------------------------------------
 
 
  /** address of the socket connection */
  private final InetSocketAddress    socketAddress;
 
  /** socket reference -- a new instance obtained in {@link ConnectionBase#newSocketConnect()} */
  private Socket  socket;
 
  // ------------------------------------------------------------------------
  // Constructors
  // ------------------------------------------------------------------------
 
  /**
   * Will create and initialize a socket per the connection spec. Will connect immediately.
   *
   * @See {@link ConnectionSpec}
   * @param spec
   * @throws ClientRuntimeException if connection attempt to specified host is not possible.
   */
  protected ConnectionBase (ConnectionSpec spec)
    throws ClientRuntimeException
  {
    this(spec, true);
  }
 
  /**
   * Will create and initialize a socket per the connection spec.
   * @See {@link ConnectionSpec}
   * @param spec
   * @param connectImmediately will connect the socket immediately if true
   * @throws ClientRuntimeException if connection attempt to specified host is not possible and
   * connect immediate was requested.
   */
  protected ConnectionBase (ConnectionSpec spec, boolean connectImmediately)
    throws ClientRuntimeException
  {
    try {
      this.spec = notNull(spec, "ConnectionSpec init parameter", ClientRuntimeException.class);
      socketAddress = new InetSocketAddress(spec.getAddress(), spec.getPort());
      initializeComponents();
//      if(connectImmediately) {
//        connect ();
//      }
    }
    catch (IllegalArgumentException e) {
      throw new ClientRuntimeException
        ("invalid connection spec parameters: " + e.getLocalizedMessage(), e);
    }
    catch (Exception e) {
      throw new ProviderException("Unexpected error on initialize -- BUG", e);
    }
   
    if(connectImmediately) { connect (); }
  }
 
  // ------------------------------------------------------------------------
  // Interface
  // ============================================================ Connection
  /*
   * These are the extension points for the concrete connection classes.
   */
  // ------------------------------------------------------------------------

//  @Override
  public Response serviceRequest(Command cmd, byte[]... args)
      throws RedisException, ClientRuntimeException, ProviderException
  {
    throw new NotSupportedException (
        "Response.serviceRequest(Command cmd, " +
        "byte[]...) is not supported.");
  }

//  @Override
  public Future<Response> queueRequest(Command cmd, byte[]... args)
    throws ClientRuntimeException, ProviderException
  {
    throw new NotSupportedException (
        "Response.serviceRequest(RequestListener requestListener, " +
        "Object , Command, byte[]...) is not supported.");
  }
 
  // ------------------------------------------------------------------------
  // Internal ops : Extension points
  // ------------------------------------------------------------------------
  /**
     * Extension point: child classes may override for additional components:
     * <pre>
     * In the extended class:
     * <code>
     * protected void initializeComponents() {
     *    super.initializeComponents();
     *    // my components here ...
     *    //
     * }
     * </code>
     * </pre>
     */
    protected void initializeComponents () {
    setProtocolHandler (Assert.notNull (newProtocolHandler(), "the delegate protocol handler", ClientRuntimeException.class));

    if(spec.isReliable()){
        heartbeat = new HeartbeatJinn(this, this.spec.getHeartbeat(), "connection [" + hashCode() + "] heartbeat");
        heartbeat.start();
    }
    }

    /**
     * Extension point -- callback on this method when {@link ConnectionBase} has connected to server.
     * <b>It is important to note that the extension must call super.notifyConnected</b> if reliable service (using
     * heartbeats) is required!.
     */
    protected void notifyConnected () {
      if (spec.isReliable()){
        heartbeat.notifyConnected();
      }
    }
    /**
     * Extension point -- callback on this method when {@link ConnectionBase} has disconnected from server.
     * <b>It is important to note that the extension must call super.notifyDisconnected</b> if reliable service (using
     * heartbeats) is required!.
     */
    protected void notifyDisconnected () {
      if (spec.isReliable()){
        heartbeat.notifyDisconnected();
      }
    }
    /**
     * Extension point:  child classes may override to return specific {@link Protocol} implementations per their requirements.
     * @return
     */
    protected Protocol newProtocolHandler () {
    return spec.isShared() ? new ConcurrentSynchProtocol() : new SynchProtocol()// TODO: rewire it to get it from the ProtocolManager
    }
   
    /**
     * Extension point: override to return stream per requirement.  Base implementation uses {@link FastBufferedInputStream} by default,
     * with buffer size matching the SO_RCVBUF property of the {@link Connection}'s {@link ConnectionSpec}
     * @param socketInputStream
     * @return
     */
    protected InputStream newInputStream(InputStream socketInputStream) {
      return  new FastBufferedInputStream(socketInputStream, spec.getSocketProperty(SO_RCVBUF));
    }
   
    /**
     * Extension point: override to return stream per requirement.  Base implementation simply returns the input param
     * @param socketOutputStream
     * @return
     */
    protected OutputStream newOutputStream(OutputStream socketOutputStream) { return socketOutputStream; }
   
  // ------------------------------------------------------------------------
  // Inner ops: socket and connection management
  // ------------------------------------------------------------------------

  /** @return connected status*/
  protected final boolean isConnected () { return isConnected; }
 
 
  /**
   * Attempt reconnect.  Must be in a (previously) connected state when called.
   * @throws IllegalStateException if not (logically) connected.
   */
  protected final void reconnect () {
    Log.log("RedisConnection - reconnecting");
    int attempts = 0;

    while(true){
      try {
        disconnect();
        connect ();
        break;
      }
      catch (RuntimeException e){
        Log.error("while attempting reconnect: " + e.getMessage());
        if(++attempts == spec.getReconnectCnt()) {
          Log.problem("Retry limit exceeded attempting reconnect.");
          throw new ClientRuntimeException ("Failed to reconnect to the server.");
        }
      }
    }
  }
  /**
   * @throws IOException
   * @throws IllegalStateException
   */
  protected final void connect () throws IllegalStateException, ClientRuntimeException {
    // we're not connected
    Assert.isTrue (!isConnected(), IllegalStateException.class);

    // create new socket and connect
    //
    try {
      newSocketConnect();
    }
    catch (IOException e) {
      throw new ClientRuntimeException(
        "Socket connect failed -- make sure the server is running at " + spec.getAddress().getHostName(), e);
    }
   
    // get the streams
    //
    try {
      initializeSocketStreams ();
    }
    catch (IOException e) {
      throw new ClientRuntimeException("Error obtaining connected socket's streams ", e);
    }
   
    isConnected = true;
   
    try {
          initializeConnection();
        }
        catch (RedisException e) {
          // either authorize or db select is using invalid parameters
          // which is user error
          throw new IllegalArgumentException("Failed to connect -- check credentials and/or database settings for the connection spec", e);
        }
   
//    Log.log("RedisConnection - connected");
    notifyConnected();
  }

  /**
   * @throws IllegalStateException
   */
  protected final void disconnect () throws IllegalStateException {
    Assert.isTrue (isConnected(), IllegalStateException.class);
   
    socketClose();
    isConnected = false;

    notifyDisconnected();
//    Log.log("RedisConnection - disconnected");
  }
 
  /**
   * Instantiates a new {@link Socket}, sets its properties and flags using the {@link ConnectionBase#spec}
   * and finally connects to the {@link ConnectionBase#socketAddress}.
   * <p>
   * Note that if the platform default send and receive buffers are larger than that specified, this method
   * will <b>not</b> use the (smaller) values defined in the spec.
   * <p>
   * Further note that method will not check connection state.
   *
   * @throws IOException thrown by the socket object.
   */
  private final void newSocketConnect ()
    throws IOException
  {
    socket = new Socket ();
   
    socket.setKeepAlive (
        spec.getSocketFlag (SO_KEEP_ALIVE));
   
    socket.setPerformancePreferences(
        spec.getSocketProperty (SO_PREF_CONN_TIME),
        spec.getSocketProperty (SO_PREF_LATENCY),
        spec.getSocketProperty (SO_PREF_BANDWIDTH));

    socket.setSoTimeout(
        spec.getSocketProperty(SO_TIMEOUT));

    if(socket.getSendBufferSize() < spec.getSocketProperty(SO_SNDBUF))
      socket.setSendBufferSize(spec.getSocketProperty(SO_SNDBUF));
   
    if(socket.getReceiveBufferSize() < spec.getSocketProperty(SO_RCVBUF))
      socket.setReceiveBufferSize(spec.getSocketProperty(SO_RCVBUF));
   
    socket.connect(socketAddress);
   
//    Log.log("RedisConnection - socket connected to %s:%d", socketAddress.getHostName(), port);
  }

  /**
   *
   */
  private final void socketClose () {
    try {
      if(null != socket) socket.close();
    }
    catch (IOException e) {
      Log.error("[IO] on closeSocketConnect -- socketClose() continues ..." + e.getLocalizedMessage());
    }
    finally {
      socket = null;
      instream = null;
      outstream = null;
    }
  }
 
  /**
   * @throws IllegalStateException if socket is null
   * @throws IOException thrown by socket instance stream accessors
   */
  protected final void initializeSocketStreams() throws IllegalArgumentException, IOException {
    instream = newInputStream (Assert.notNull(socket.getInputStream(), "socket input stream", IllegalArgumentException.class));
    Assert.notNull(instream, "input stream provided by extended class", IllegalArgumentException.class);
    outstream = newOutputStream (Assert.notNull(socket.getOutputStream(), "socket output stream", IllegalArgumentException.class));
  }
 
  /**
   * @throws RedisException
   * @throws ClientRuntimeException
   * @throws ProviderException
     *
     */
    protected final void initializeConnection () throws ProviderException, ClientRuntimeException, RedisException{
      switch (getModality()){
      case Asynchronous:
        initializeAsynchConnection();
        break;
      case Synchronous:
        initializeSynchConnection();
        break;
      default:
        throw new ProviderException("Modality " + getModality().name() + " is not supported.");
      }
    }

    /**
     * @throws ProviderException
     * @throws ClientRuntimeException
     * @throws RedisException
     */
    protected final void initializeSynchConnection () throws ProviderException, ClientRuntimeException, RedisException{
    if(null!=spec.getCredentials()) {
      this.serviceRequest(Command.AUTH, spec.getCredentials());
    }
    if(spec.getDatabase() != 0) {
      this.serviceRequest(Command.SELECT, Convert.toBytes(spec.getDatabase()));
    }
    }
    /**
     * @throws ProviderException
     * @throws ClientRuntimeException
     * @throws RedisException
     */
    protected final void initializeAsynchConnection () throws ProviderException, ClientRuntimeException, RedisException{
      try {
        if(null!=spec.getCredentials()) {
          this.queueRequest(Command.AUTH, spec.getCredentials()).get();
        }
        if(spec.getDatabase() != 0) {
          this.queueRequest(Command.SELECT, Convert.toBytes(spec.getDatabase())).get();
        }
        }
        catch (InterruptedException e) {
          e.printStackTrace();
          throw new ClientRuntimeException("Interrupted while initializing asynchronous connection", e);
        }
        catch (ExecutionException e) {
          e.printStackTrace();
          if(e.getCause() != null){
            if(e.getCause() instanceof RedisException)
              throw (RedisException) e.getCause();
            else if(e.getCause() instanceof ProviderException)
              throw (ProviderException) e.getCause();
            else if(e.getCause() instanceof ClientRuntimeException)
              throw (ClientRuntimeException) e.getCause();
          }
          throw new ProviderException("Exception while initializing asynchronous connection", e);
        }
    }
 
  // ------------------------------------------------------------------------
  // Property accessors
  // ------------------------------------------------------------------------
 
  final protected void setProtocolHandler(Protocol protocolHandler) {
    this.protocol = notNull(protocolHandler, "protocolHandler for ConnectionBase", ClientRuntimeException.class);
  }
 
  final protected Protocol getProtocolHandler() {
    return notNull(protocol, "protocolHandler for ConnectionBase", ClientRuntimeException.class);
  }

  final protected OutputStream getOutputStream() {
    return outstream;
  }

  final protected InputStream getInputStream() {
    return instream;
  }
}
TOP

Related Classes of org.jredis.ri.alphazero.connection.ConnectionBase

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.