Package l2p.commons.net.nio.impl

Source Code of l2p.commons.net.nio.impl.SelectorThread

package l2p.commons.net.nio.impl;

import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@SuppressWarnings("unchecked")
public class SelectorThread<T extends MMOClient> extends Thread
{
  private static final Logger _log = LoggerFactory.getLogger(SelectorThread.class);

  private final Selector _selector = Selector.open();

  // Implementations
  private final IPacketHandler<T> _packetHandler;
  private final IMMOExecutor<T> _executor;
  private final IClientFactory<T> _clientFactory;
  private IAcceptFilter _acceptFilter;

  private boolean _shutdown;

  // Configs
  private final SelectorConfig _sc;
  private final int HELPER_BUFFER_SIZE;

  // MAIN BUFFERS
  private ByteBuffer DIRECT_WRITE_BUFFER;
  private final ByteBuffer WRITE_BUFFER, READ_BUFFER;
  private T WRITE_CLIENT;

  // ByteBuffers General Purpose Pool
  private final Queue<ByteBuffer> _bufferPool;
  private final List<MMOConnection<T>> _connections;

  private static final List<SelectorThread> ALL_SELECTORS = new ArrayList<SelectorThread>();
  private static SelectorStats stats = new SelectorStats();

  public SelectorThread(SelectorConfig sc, IPacketHandler<T> packetHandler, IMMOExecutor<T> executor, IClientFactory<T> clientFactory, IAcceptFilter acceptFilter) throws IOException
  {
    synchronized (ALL_SELECTORS)
    {
      ALL_SELECTORS.add(this);
    }

    _sc = sc;
    _acceptFilter = acceptFilter;
    _packetHandler = packetHandler;
    _clientFactory = clientFactory;
    _executor = executor;

    _bufferPool = new ArrayDeque<ByteBuffer>(_sc.HELPER_BUFFER_COUNT);
    _connections = new CopyOnWriteArrayList<MMOConnection<T>>();

    DIRECT_WRITE_BUFFER = ByteBuffer.wrap(new byte[_sc.WRITE_BUFFER_SIZE]).order(_sc.BYTE_ORDER);
    WRITE_BUFFER = ByteBuffer.wrap(new byte[_sc.WRITE_BUFFER_SIZE]).order(_sc.BYTE_ORDER);
    READ_BUFFER = ByteBuffer.wrap(new byte[_sc.READ_BUFFER_SIZE]).order(_sc.BYTE_ORDER);
    HELPER_BUFFER_SIZE = Math.max(_sc.READ_BUFFER_SIZE, _sc.WRITE_BUFFER_SIZE);

    for(int i = 0; i < _sc.HELPER_BUFFER_COUNT; i++)
      _bufferPool.add(ByteBuffer.wrap(new byte[HELPER_BUFFER_SIZE]).order(_sc.BYTE_ORDER));
  }

  public void openServerSocket(InetAddress address, int tcpPort) throws IOException
  {
    ServerSocketChannel selectable = ServerSocketChannel.open();
    selectable.configureBlocking(false);

    selectable.socket().bind(address == null ? new InetSocketAddress(tcpPort) : new InetSocketAddress(address, tcpPort));
    selectable.register(getSelector(), selectable.validOps());
    setName("SelectorThread:" + selectable.socket().getLocalPort());
  }

  protected ByteBuffer getPooledBuffer()
  {
    if(_bufferPool.isEmpty())
      return ByteBuffer.wrap(new byte[HELPER_BUFFER_SIZE]).order(_sc.BYTE_ORDER);
    return _bufferPool.poll();
  }

  protected void recycleBuffer(ByteBuffer buf)
  {
    if(_bufferPool.size() < _sc.HELPER_BUFFER_COUNT)
    {
      buf.clear();
      _bufferPool.add(buf);
    }
  }

  protected void freeBuffer(ByteBuffer buf, MMOConnection<T> con)
  {
    if(buf == READ_BUFFER)
      READ_BUFFER.clear();
    else
    {
      con.setReadBuffer(null);
      recycleBuffer(buf);
    }
  }

  @Override
  public void run()
  {
    int totalKeys = 0;
    Set<SelectionKey> keys = null;
    Iterator<SelectionKey> itr = null;
    Iterator<MMOConnection<T>> conItr = null;
    SelectionKey key = null;
    MMOConnection<T> con = null;
    long currentMillis = 0L;

    // main loop
    for(;;)
      try
    {

        if(isShuttingDown())
        {
          closeSelectorThread();
          break;
        }

        currentMillis = System.currentTimeMillis();

        conItr = _connections.iterator();
        while(conItr.hasNext())
        {
          con = conItr.next();
          if (con.isPengingClose())
            if (!con.isPendingWrite() || currentMillis - con.getPendingCloseTime() >= 10000L)
            {
              closeConnectionImpl(con);
              continue;
            }
          if (con.isPendingWrite())
            if (currentMillis - con.getPendingWriteTime() >= _sc.INTEREST_DELAY)
              con.enableWriteInterest();
        }

        totalKeys = getSelector().selectNow();

        if(totalKeys > 0)
        {
          keys = getSelector().selectedKeys();
          itr = keys.iterator();

          while(itr.hasNext())
          {
            key = itr.next();
            itr.remove();

            if(key.isValid())
              try
            {
                if(key.isAcceptable())
                {
                  acceptConnection(key);
                  continue;
                }
                else if(key.isConnectable())
                {
                  finishConnection(key);
                  continue;
                }

                if(key.isReadable())
                  readPacket(key);
                if(key.isValid())
                  if(key.isWritable())
                    writePacket(key);
            }
            catch(CancelledKeyException cke)
            {

            }
          }
        }

        try
        {
          Thread.sleep(_sc.SLEEP_TIME);
        }
        catch(InterruptedException ie)
        {

        }
    }
    catch(IOException e)
    {
      _log.error("Error in " + getName(), e);

      try
      {
        Thread.sleep(1000L);
      }
      catch(InterruptedException ie)
      {

      }
    }
  }

  protected void finishConnection(SelectionKey key)
  {
    try
    {
      ((SocketChannel) key.channel()).finishConnect();
    }
    catch(IOException e)
    {
      MMOConnection<T> con = (MMOConnection<T>) key.attachment();
      T client = con.getClient();
      client.getConnection().onForcedDisconnection();
      closeConnectionImpl(client.getConnection());
    }
  }

  protected void acceptConnection(SelectionKey key)
  {
    ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
    SocketChannel sc;
    SelectionKey clientKey;
    try
    {
      while((sc = ssc.accept()) != null)
        if(getAcceptFilter() == null || getAcceptFilter().accept(sc))
        {
          sc.configureBlocking(false);
          clientKey = sc.register(getSelector(), SelectionKey.OP_READ);

          MMOConnection<T> con = new MMOConnection<T>(this, sc.socket(), clientKey);
          T client = getClientFactory().create(con);
          client.setConnection(con);
          con.setClient(client);
          clientKey.attach(con);

          _connections.add(con);
          stats.increaseOpenedConnections();
        }
        else
        {
          sc.close();
        }
    }
    catch(IOException e)
    {
      _log.error("Error in " + getName(), e);
    }
  }

  protected void readPacket(SelectionKey key)
  {
    MMOConnection<T> con = (MMOConnection<T>) key.attachment();

    if(con.isClosed())
      return;

    ByteBuffer buf;
    int result = -2;

    if((buf = con.getReadBuffer()) == null)
      buf = READ_BUFFER;

    // if we try to to do a read with no space in the buffer it will read 0 bytes
    // going into infinite loop
    if(buf.position() == buf.limit())
    {
      _log.error("Read buffer exhausted for client : " + con.getClient() + ", try to adjust buffer size, current : " + buf.capacity() + ", primary : " + (buf == READ_BUFFER) + ". Closing connection.");
      closeConnectionImpl(con);
    }
    else
    {

      try
      {
        result = con.getReadableByteChannel().read(buf);
      }
      catch(IOException e)
      {
        //error handling goes bellow
      }

      if(result > 0)
      {
        buf.flip();

        stats.increaseIncomingBytes(result);

        int i;
        // читаем из буфера максимум пакетов
        for(i = 0; this.tryReadPacket2(key, con, buf); i++)
        {}
      }
      else if(result == 0)
        closeConnectionImpl(con);
      else if(result == -1)
        closeConnectionImpl(con);
      else
      {
        con.onForcedDisconnection();
        closeConnectionImpl(con);
      }
    }

    if(buf == READ_BUFFER)
      buf.clear();
  }

  protected boolean tryReadPacket2(SelectionKey key, MMOConnection<T> con, ByteBuffer buf)
  {
    if(con.isClosed())
      return false;

    int pos = buf.position();
    // проверяем, хватает ли нам байт для чтения заголовка и не пустого тела пакета
    if(buf.remaining() > _sc.HEADER_SIZE)
    {
      // получаем ожидаемый размер пакета
      int size = buf.getShort() & 0xffff;

      // проверяем корректность размера
      if(size <= _sc.HEADER_SIZE || size > _sc.PACKET_SIZE)
      {
        _log.error("Incorrect packet size : " + size + "! Client : " + con.getClient() + ". Closing connection.");
        closeConnectionImpl(con);
        return false;
      }

      //ожидаемый размер тела пакета
      size -= _sc.HEADER_SIZE;

      // проверяем, хватает ли байт на чтение тела
      if(size <= buf.remaining())
      {
        stats.increaseIncomingPacketsCount();
        parseClientPacket(getPacketHandler(), buf, size, con);
        buf.position(pos + size + _sc.HEADER_SIZE);

        // закончили чтение из буфера, почистим
        if(!buf.hasRemaining())
        {
          freeBuffer(buf, con);
          return false;
        }

        return true;
      }

      // не хватает данных на чтение тела пакета, сбрасываем позицию
      buf.position(pos);
    }

    if(pos == buf.capacity())
      _log.warn("Read buffer exhausted for client : " + con.getClient() + ", try to adjust buffer size, current : " + buf.capacity() + ", primary : " + (buf == READ_BUFFER) + ".");

    // не хватает данных, переносим содержимое первичного буфера во вторичный
    if(buf == READ_BUFFER)
      allocateReadBuffer(con);
    else
      buf.compact();

    return false;
  }

  protected void allocateReadBuffer(MMOConnection<T> con)
  {
    con.setReadBuffer(getPooledBuffer().put(READ_BUFFER));
    READ_BUFFER.clear();
  }

  protected boolean parseClientPacket(IPacketHandler<T> handler, ByteBuffer buf, int dataSize, MMOConnection<T> con)
  {
    T client = con.getClient();

    int pos = buf.position();

    client.decrypt(buf, dataSize);
    buf.position(pos);

    if(buf.hasRemaining())
    {
      //  apply limit
      int limit = buf.limit();
      buf.limit(pos + dataSize);
      ReceivablePacket<T> rp = handler.handlePacket(buf, client);

      if(rp != null)
      {
        rp.setByteBuffer(buf);
        rp.setClient(client);

        if(rp.read())
          con.recvPacket(rp);

        rp.setByteBuffer(null);
      }
      buf.limit(limit);
    }
    return true;
  }

  protected void writePacket(SelectionKey key)
  {
    MMOConnection<T> con = (MMOConnection<T>) key.attachment();

    prepareWriteBuffer(con);

    DIRECT_WRITE_BUFFER.flip();
    int size = DIRECT_WRITE_BUFFER.remaining();

    int result = -1;

    try
    {
      result = con.getWritableChannel().write(DIRECT_WRITE_BUFFER);
    }
    catch(IOException e)
    {
      // error handling goes on the if bellow
    }

    // check if no error happened
    if(result >= 0)
    {
      stats.increaseOutgoingBytes(result);

      // check if we written everything
      if(result != size)
        con.createWriteBuffer(DIRECT_WRITE_BUFFER);

      if(!con.getSendQueue().isEmpty() || con.hasPendingWriteBuffer())
        // запись не завершена
        con.scheduleWriteInterest();
    }
    else
    {
      con.onForcedDisconnection();
      closeConnectionImpl(con);
    }
  }

  protected T getWriteClient()
  {
    return WRITE_CLIENT;
  }

  protected ByteBuffer getWriteBuffer()
  {
    return WRITE_BUFFER;
  }

  protected void prepareWriteBuffer(MMOConnection<T> con)
  {
    WRITE_CLIENT = con.getClient();
    DIRECT_WRITE_BUFFER.clear();

    if(con.hasPendingWriteBuffer()) // если осталось что-то с прошлого раза
      con.movePendingWriteBufferTo(DIRECT_WRITE_BUFFER);

    if(DIRECT_WRITE_BUFFER.hasRemaining() && !con.hasPendingWriteBuffer())
    {
      int i;
      Queue<SendablePacket<T>> sendQueue = con.getSendQueue();
      SendablePacket<T> sp;

      for(i = 0; i < _sc.MAX_SEND_PER_PASS; i++)
      {
        synchronized (con)
        {
          if((sp = sendQueue.poll()) == null)
            break;
        }

        try
        {
          stats.increaseOutgoingPacketsCount();
          putPacketIntoWriteBuffer(sp, true); // записываем пакет в WRITE_BUFFER
          WRITE_BUFFER.flip();
          if(DIRECT_WRITE_BUFFER.remaining() >= WRITE_BUFFER.limit())
            DIRECT_WRITE_BUFFER.put(WRITE_BUFFER);
          else
            // если не осталось места в DIRECT_WRITE_BUFFER для WRITE_BUFFER то мы его запишев в следующий раз
          {
            con.createWriteBuffer(WRITE_BUFFER);
            break;
          }
        }
        catch(Exception e)
        {
          _log.error("Error in " + getName(), e);
          break;
        }
      }
    }

    WRITE_BUFFER.clear();
    WRITE_CLIENT = null;
  }

  protected final void putPacketIntoWriteBuffer(SendablePacket<T> sp, boolean encrypt)
  {
    WRITE_BUFFER.clear();

    // reserve space for the size
    int headerPos = WRITE_BUFFER.position();
    WRITE_BUFFER.position(headerPos + _sc.HEADER_SIZE);

    // write content to buffer
    sp.write();

    // size (incl header)
    int dataSize = WRITE_BUFFER.position() - headerPos - _sc.HEADER_SIZE;
    if(dataSize == 0)
    {
      WRITE_BUFFER.position(headerPos);
      return;
    }
    WRITE_BUFFER.position(headerPos + _sc.HEADER_SIZE);
    if(encrypt)
    {
      WRITE_CLIENT.encrypt(WRITE_BUFFER, dataSize);
      // recalculate size after encryption
      dataSize = WRITE_BUFFER.position() - headerPos - _sc.HEADER_SIZE;
    }

    // prepend header
    WRITE_BUFFER.position(headerPos);
    WRITE_BUFFER.putShort((short) (_sc.HEADER_SIZE + dataSize));
    WRITE_BUFFER.position(headerPos + _sc.HEADER_SIZE + dataSize);
  }

  protected SelectorConfig getConfig()
  {
    return _sc;
  }

  protected Selector getSelector()
  {
    return _selector;
  }

  protected IMMOExecutor<T> getExecutor()
  {
    return _executor;
  }

  protected IPacketHandler<T> getPacketHandler()
  {
    return _packetHandler;
  }

  protected IClientFactory<T> getClientFactory()
  {
    return _clientFactory;
  }

  public void setAcceptFilter(IAcceptFilter acceptFilter)
  {
    _acceptFilter = acceptFilter;
  }

  protected IAcceptFilter getAcceptFilter()
  {
    return _acceptFilter;
  }

  protected void closeConnectionImpl(MMOConnection<T> con)
  {
    try
    {
      // notify connection
      con.onDisconnection();
    }
    finally
    {
      try
      {
        // close socket and the SocketChannel
        con.close();
      }
      catch(IOException e)
      {
        // ignore, we are closing anyway
      }
      finally
      {
        // очистить буферы
        con.releaseBuffers();
        // очистим очереди
        con.clearQueues();
        // обнуляем соединение у клиента
        con.getClient().setConnection(null);
        // обнуляем соединение у ключа
        con.getSelectionKey().attach(null);
        // отменяем ключ
        con.getSelectionKey().cancel();

        _connections.remove(con);

        stats.decreseOpenedConnections();
      }
    }
  }

  public void shutdown()
  {
    _shutdown = true;
  }

  public boolean isShuttingDown()
  {
    return _shutdown;
  }

  protected void closeAllChannels()
  {
    Set<SelectionKey> keys = getSelector().keys();
    for(SelectionKey key : keys)
      try
    {
        key.channel().close();
    }
    catch(IOException e)
    {
      // ignore
    }
  }

  protected void closeSelectorThread()
  {
    closeAllChannels();

    try
    {
      getSelector().close();
    }
    catch(IOException e)
    {
      // Ignore
    }
  }

  public static CharSequence getStats()
  {
    StringBuilder list = new StringBuilder();

    list.append("selectorThreadCount: .... ").append(ALL_SELECTORS.size()).append("\n");
    list.append("=================================================\n");
    list.append("getTotalConnections: .... ").append(stats.getTotalConnections()).append("\n");
    list.append("getCurrentConnections: .. ").append(stats.getCurrentConnections()).append("\n");
    list.append("getMaximumConnections: .. ").append(stats.getMaximumConnections()).append("\n");
    list.append("getIncomingBytesTotal: .. ").append(stats.getIncomingBytesTotal()).append("\n");
    list.append("getOutgoingBytesTotal: .. ").append(stats.getOutgoingBytesTotal()).append("\n");
    list.append("getIncomingPacketsTotal:  ").append(stats.getIncomingPacketsTotal()).append("\n");
    list.append("getOutgoingPacketsTotal:  ").append(stats.getOutgoingPacketsTotal()).append("\n");
    list.append("getMaxBytesPerRead: ..... ").append(stats.getMaxBytesPerRead()).append("\n");
    list.append("getMaxBytesPerWrite: .... ").append(stats.getMaxBytesPerWrite()).append("\n");
    list.append("=================================================\n");

    return list;
  }
}
TOP

Related Classes of l2p.commons.net.nio.impl.SelectorThread

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.