Package com.facebook.thrift.direct_server

Source Code of com.facebook.thrift.direct_server.SelectorThread


package com.facebook.thrift.direct_server;

import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.spi.AbstractSelectableChannel;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ThreadPoolExecutor;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SelectorThread extends Thread {

  private static Logger LOG =
      LoggerFactory.getLogger(SelectorThread.class);

  // Max number of handlers that can be handled in the selector thread
  // on a single select() invocation
  private final int numSyncHandlers_;

  private final DirectServer directServer;

  private volatile Selector selector_;

  // List of finished asynchronous tasks.
  private List<DirectServerTask> pendingTasks_;

  // The max number of connections that a single select can handle
  private final static int NUM_HANDLERS = 512;

  private final ThreadPoolExecutor executorService_;

  // A handler passed from DirectServer
  private final ChannelHandler[] newHandlers_;

  private volatile boolean stopping_ = false;

  SelectorThread(
      DirectServer s,
      ThreadPoolExecutor es,
      int numPendingChannelHandlers) {

    try {
      // TODO remove IO operation out of Ctor
      selector_ = Selector.open();
    } catch (IOException e) {
      LOG.warn("in constructor get", e);
    }

    directServer = s;
    numSyncHandlers_ = s.maxSyncHandlers();
    executorService_ = es;

    newHandlers_ = new ChannelHandler[numPendingChannelHandlers];
  }

  public Selector selector() {
    return selector_;
  }

  public DirectServer.LoggingCallback getLoggingCallback() {
    return directServer.getLoggingCallback();
  }

  // The thread pool worker calls this after the async task is done.
  void notifyExecutorServiceTaskComplete(DirectServerTask t) {
    synchronized(this) {
      if (pendingTasks_ == null) {
        pendingTasks_ = new ArrayList<DirectServerTask>();
      }
      pendingTasks_.add(t);
    }

    selector_.wakeup();
  }

  void addHandler(ChannelHandler h) {
    ChannelHandler discarding = null;
    boolean needNotify = false;

    synchronized(newHandlers_) {
      for (int i = 0; i < newHandlers_.length; ++i) {
        if (newHandlers_[i] != null) {
          continue;
        }
        newHandlers_[i] = h;
        h = null;
        if (i == 0) {
          needNotify = true;
        }
        break;
      }

      // shift newHandlers_ if it is full.
      if (h != null) {
        discarding = newHandlers_[0];
        for (int i = 1; i < newHandlers_.length; ++i) {
          newHandlers_[i - 1] = newHandlers_[i];
        }
        newHandlers_[newHandlers_.length - 1] = h;
      }
    }

    if (discarding == null) {
      if (needNotify) {
        selector_.wakeup();
      }
      return;
    }

    LOG.warn("Discard a pending connection");
    try {
      discarding.close();
    } catch (Exception e) {
      LOG.warn("Closing a channel:", e);
    }
  }

  @Override public void run() {
    ChannelHandler[] handlers = new ChannelHandler[NUM_HANDLERS];

    while (true) {
      try {
        selector_.select();
      } catch (IOException e) {
        LOG.warn("continue with exception", e);
        continue;
      } catch (Exception e) {
        LOG.error("aborting with exception ", e);
        break;
      }

      int numReqs = 0;
      boolean stopping = stopping_;

      // Check if we have any new connections
      synchronized(newHandlers_) {
        for (int i = 0; i < newHandlers_.length; ++i) {
          if (newHandlers_[i] == null) {
            break;
          }

          handlers[numReqs] = newHandlers_[i];
          newHandlers_[i] = null;
          ++numReqs;
        }
      }

      for (int i = 0; i < numReqs; ++i) {
        AbstractSelectableChannel result = handlers[i].channel();
        if (!stopping) {
          try {
            result.register(selector_, SelectionKey.OP_READ, handlers[i]);
          } catch (Exception e) {
            LOG.warn("Fails to register a channel", e);
          }
        } else {
          try {
            handlers[i].close();
            handlers[i] = null;
          } catch (Exception e) {
            LOG.warn("Fails to close a connection", e);
          }
        }
      }

      if (stopping) {
        numReqs = 0;
      }

      // Examin new active connections
      for (Iterator<SelectionKey> it = selector_.selectedKeys().iterator(); it
          .hasNext();) {
        SelectionKey key = it.next();

        // Remove from selected keys set to indicate that it is processed.
        it.remove();

        if (numReqs < NUM_HANDLERS && !stopping) {
          handlers[numReqs] = (ChannelHandler)key.attachment();
          ++numReqs;
        } else {
          // Too many active connections, close some of them.
          ChannelHandler handler = (ChannelHandler)key.attachment();
          key.cancel();
          try {
            handler.close();
          } catch (Exception e) {
            LOG.warn("Fails to close a connection", e);
          }
          LOG.info("Closed a connection to reduce load");
        }
      }


      // If we have too many connections, move some of them to thread pool.
      for (int i = numSyncHandlers_; i < numReqs; ++i) {
        ChannelHandler handler = handlers[i];
        handlers[i] = null;
        if (executorService_ == null) {
          handler.transition(this);
        } else {
          handler.wouldUseThreadPool(true);
          if (!handler.canUseThreadPool()) {
            handler.transition(this);
          }
          if (handler.canUseThreadPool()) {
            new DirectServerTask(this, handler).executingBy(
                executorService_);
          }
        }
      }

      // Handle synchronous requests
      int numSyncReqs = numReqs > numSyncHandlers_ ? numSyncHandlers_ : numReqs;
      for (int i = 0; i < numSyncReqs; ++i) {
        handlers[i].transition(this);
        handlers[i] = null;
      }

      // Processing finished asynchronous tasks.
      if (executorService_ != null) {
        List<DirectServerTask> tasks = null;
        synchronized(this) {
          if (pendingTasks_ != null && pendingTasks_.size() > 0) {
            tasks = pendingTasks_;
            pendingTasks_ = null;
          }
        }

        if (tasks != null) {
          for (DirectServerTask it : tasks) {
            it.handler().transition(this);
          }
        }
      }

      // Back to the beginning of a infinite loop.
    }
  }

  public void Stop() {
    stopping_ = true;
  }
};
TOP

Related Classes of com.facebook.thrift.direct_server.SelectorThread

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code is the property of its respective owners. Java is a trademark of Sun Microsystems, Inc. and is owned by ORACLE Inc. Contact coftware#gmail.com.