Package lineage2.commons.net.nio.impl

Source Code of lineage2.commons.net.nio.impl.SelectorThread

/*
* This program is free software: you can redistribute it and/or modify it under
* the terms of the GNU General Public License as published by the Free Software
* Foundation, either version 3 of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
* FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package lineage2.commons.net.nio.impl;

import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* @author Mobius
* @version $Revision: 1.0 $
*/
@SuppressWarnings({ "unchecked", "rawtypes" })
public class SelectorThread<T extends MMOClient> extends Thread
{
  /**
   * Field _log.
   */
  private static final Logger _log = LoggerFactory.getLogger(SelectorThread.class);
 
  /**
   * Field _selector.
   */
  private final Selector _selector = Selector.open();
 
  // Implementations
  /**
   * Field _packetHandler.
   */
  private final IPacketHandler<T> _packetHandler;
  /**
   * Field _executor.
   */
  private final IMMOExecutor<T> _executor;
  /**
   * Field _clientFactory.
   */
  private final IClientFactory<T> _clientFactory;
  /**
   * Field _acceptFilter.
   */
  private IAcceptFilter _acceptFilter;
 
  /**
   * Field _shutdown.
   */
  private boolean _shutdown;
 
  // Configs
  /**
   * Field _sc.
   */
  private final SelectorConfig _sc;
  /**
   * Field HELPER_BUFFER_SIZE.
   */
  private final int HELPER_BUFFER_SIZE;
 
  // MAIN BUFFERS
  /**
   * Field DIRECT_WRITE_BUFFER.
   */
  private final ByteBuffer DIRECT_WRITE_BUFFER;
  /**
   * Field READ_BUFFER. Field WRITE_BUFFER.
   */
  private final ByteBuffer WRITE_BUFFER, READ_BUFFER;
  /**
   * Field WRITE_CLIENT.
   */
  private T WRITE_CLIENT;
 
  // ByteBuffers General Purpose Pool
  /**
   * Field _bufferPool.
   */
  private final Queue<ByteBuffer> _bufferPool;
  /**
   * Field _connections.
   */
  private final List<MMOConnection<T>> _connections;
 
  /**
   * Field ALL_SELECTORS.
   */
  private static final List<SelectorThread> ALL_SELECTORS = new ArrayList<>();
  /**
   * Field stats.
   */
  private static SelectorStats stats = new SelectorStats();
 
  /**
   * Constructor for SelectorThread.
   * @param sc SelectorConfig
   * @param packetHandler IPacketHandler<T>
   * @param executor IMMOExecutor<T>
   * @param clientFactory IClientFactory<T>
   * @param acceptFilter IAcceptFilter
   * @throws IOException
   */
  public SelectorThread(SelectorConfig sc, IPacketHandler<T> packetHandler, IMMOExecutor<T> executor, IClientFactory<T> clientFactory, IAcceptFilter acceptFilter) throws IOException
  {
    synchronized (ALL_SELECTORS)
    {
      ALL_SELECTORS.add(this);
    }
   
    _sc = sc;
    _acceptFilter = acceptFilter;
    _packetHandler = packetHandler;
    _clientFactory = clientFactory;
    _executor = executor;
   
    _bufferPool = new ArrayDeque<>(_sc.HELPER_BUFFER_COUNT);
    _connections = new CopyOnWriteArrayList<>();
   
    DIRECT_WRITE_BUFFER = ByteBuffer.wrap(new byte[_sc.WRITE_BUFFER_SIZE]).order(_sc.BYTE_ORDER);
    WRITE_BUFFER = ByteBuffer.wrap(new byte[_sc.WRITE_BUFFER_SIZE]).order(_sc.BYTE_ORDER);
    READ_BUFFER = ByteBuffer.wrap(new byte[_sc.READ_BUFFER_SIZE]).order(_sc.BYTE_ORDER);
    HELPER_BUFFER_SIZE = Math.max(_sc.READ_BUFFER_SIZE, _sc.WRITE_BUFFER_SIZE);
   
    for (int i = 0; i < _sc.HELPER_BUFFER_COUNT; i++)
    {
      _bufferPool.add(ByteBuffer.wrap(new byte[HELPER_BUFFER_SIZE]).order(_sc.BYTE_ORDER));
    }
  }
 
  /**
   * Method openServerSocket.
   * @param address InetAddress
   * @param tcpPort int
   * @throws IOException
   */
  public void openServerSocket(InetAddress address, int tcpPort) throws IOException
  {
    ServerSocketChannel selectable = ServerSocketChannel.open();
    selectable.configureBlocking(false);
   
    selectable.socket().bind(address == null ? new InetSocketAddress(tcpPort) : new InetSocketAddress(address, tcpPort));
    selectable.register(getSelector(), selectable.validOps());
    setName("SelectorThread:" + selectable.socket().getLocalPort());
  }
 
  /**
   * Method getPooledBuffer.
   * @return ByteBuffer
   */
  protected ByteBuffer getPooledBuffer()
  {
    if (_bufferPool.isEmpty())
    {
      return ByteBuffer.wrap(new byte[HELPER_BUFFER_SIZE]).order(_sc.BYTE_ORDER);
    }
    return _bufferPool.poll();
  }
 
  /**
   * Method recycleBuffer.
   * @param buf ByteBuffer
   */
  protected void recycleBuffer(ByteBuffer buf)
  {
    if (_bufferPool.size() < _sc.HELPER_BUFFER_COUNT)
    {
      buf.clear();
      _bufferPool.add(buf);
    }
  }
 
  /**
   * Method freeBuffer.
   * @param buf ByteBuffer
   * @param con MMOConnection<T>
   */
  protected void freeBuffer(ByteBuffer buf, MMOConnection<T> con)
  {
    if (buf == READ_BUFFER)
    {
      READ_BUFFER.clear();
    }
    else
    {
      con.setReadBuffer(null);
      recycleBuffer(buf);
    }
  }
 
  /**
   * Method run.
   * @see java.lang.Runnable#run()
   */
  @Override
  public void run()
  {
    int totalKeys = 0;
    Set<SelectionKey> keys = null;
    Iterator<SelectionKey> itr = null;
    Iterator<MMOConnection<T>> conItr = null;
    SelectionKey key = null;
    MMOConnection<T> con = null;
    long currentMillis = 0L;
   
    // main loop
    for (;;)
    {
      try
      {
       
        if (isShuttingDown())
        {
          closeSelectorThread();
          break;
        }
       
        currentMillis = System.currentTimeMillis();
       
        conItr = _connections.iterator();
        while (conItr.hasNext())
        {
          con = conItr.next();
          if (con.isPengingClose())
          {
            if (!con.isPendingWrite() || ((currentMillis - con.getPendingCloseTime()) >= 10000L))
            {
              closeConnectionImpl(con);
              continue;
            }
          }
          if (con.isPendingWrite())
          {
            if ((currentMillis - con.getPendingWriteTime()) >= _sc.INTEREST_DELAY)
            {
              con.enableWriteInterest();
            }
          }
        }
       
        totalKeys = getSelector().selectNow();
       
        if (totalKeys > 0)
        {
          keys = getSelector().selectedKeys();
          itr = keys.iterator();
         
          while (itr.hasNext())
          {
            key = itr.next();
            itr.remove();
           
            if (key.isValid())
            {
              try
              {
                if (key.isAcceptable())
                {
                  acceptConnection(key);
                  continue;
                }
                else if (key.isConnectable())
                {
                  finishConnection(key);
                  continue;
                }
               
                if (key.isReadable())
                {
                  readPacket(key);
                }
                if (key.isValid())
                {
                  if (key.isWritable())
                  {
                    writePacket(key);
                  }
                }
              }
              catch (CancelledKeyException cke)
              {
               
              }
            }
          }
        }
       
        try
        {
          Thread.sleep(_sc.SLEEP_TIME);
        }
        catch (InterruptedException ie)
        {
         
        }
      }
      catch (IOException e)
      {
        _log.error("Error in " + getName(), e);
       
        try
        {
          Thread.sleep(1000L);
        }
        catch (InterruptedException ie)
        {
         
        }
      }
    }
  }
 
  /**
   * Method finishConnection.
   * @param key SelectionKey
   */
  protected void finishConnection(SelectionKey key)
  {
    try
    {
      ((SocketChannel) key.channel()).finishConnect();
    }
    catch (IOException e)
    {
      MMOConnection<T> con = (MMOConnection<T>) key.attachment();
      T client = con.getClient();
      client.getConnection().onForcedDisconnection();
      closeConnectionImpl(client.getConnection());
    }
  }
 
  /**
   * Method acceptConnection.
   * @param key SelectionKey
   */
  protected void acceptConnection(SelectionKey key)
  {
    ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
    SocketChannel sc;
    SelectionKey clientKey;
    try
    {
      while ((sc = ssc.accept()) != null)
      {
        if ((getAcceptFilter() == null) || getAcceptFilter().accept(sc))
        {
          sc.configureBlocking(false);
          clientKey = sc.register(getSelector(), SelectionKey.OP_READ);
         
          MMOConnection<T> con = new MMOConnection<>(this, sc.socket(), clientKey);
          T client = getClientFactory().create(con);
          client.setConnection(con);
          con.setClient(client);
          clientKey.attach(con);
         
          _connections.add(con);
          stats.increaseOpenedConnections();
        }
        else
        {
          sc.close();
        }
      }
    }
    catch (IOException e)
    {
      _log.error("Error in " + getName(), e);
    }
  }
 
  /**
   * Method readPacket.
   * @param key SelectionKey
   */
  protected void readPacket(SelectionKey key)
  {
    MMOConnection<T> con = (MMOConnection<T>) key.attachment();
   
    if (con.isClosed())
    {
      return;
    }
   
    ByteBuffer buf;
    int result = -2;
   
    if ((buf = con.getReadBuffer()) == null)
    {
      buf = READ_BUFFER;
    }
   
    // if we try to to do a read with no space in the buffer it will read 0 bytes
    // going into infinite loop
    if (buf.position() == buf.limit())
    {
      _log.error("Read buffer exhausted for client : " + con.getClient() + ", try to adjust buffer size, current : " + buf.capacity() + ", primary : " + (buf == READ_BUFFER) + ". Closing connection.");
      closeConnectionImpl(con);
    }
    else
    {
     
      try
      {
        result = con.getReadableByteChannel().read(buf);
      }
      catch (IOException e)
      {
        // error handling goes bellow
      }
     
      if (result > 0)
      {
        buf.flip();
       
        stats.increaseIncomingBytes(result);
       
        @SuppressWarnings("unused")
        int i;
        for (i = 0; this.tryReadPacket2(key, con, buf); i++)
        {
        }
      }
      else if (result == 0)
      {
        closeConnectionImpl(con);
      }
      else if (result == -1)
      {
        closeConnectionImpl(con);
      }
      else
      {
        con.onForcedDisconnection();
        closeConnectionImpl(con);
      }
    }
   
    if (buf == READ_BUFFER)
    {
      buf.clear();
    }
  }
 
  /**
   * Method tryReadPacket2.
   * @param key SelectionKey
   * @param con MMOConnection<T>
   * @param buf ByteBuffer
   * @return boolean
   */
  protected boolean tryReadPacket2(SelectionKey key, MMOConnection<T> con, ByteBuffer buf)
  {
    if (con.isClosed())
    {
      return false;
    }
   
    int pos = buf.position();
    if (buf.remaining() > _sc.HEADER_SIZE)
    {
      int size = buf.getShort() & 0xffff;
     
      if ((size <= _sc.HEADER_SIZE) || (size > _sc.PACKET_SIZE))
      {
        _log.error("Incorrect packet size : " + size + "! Client : " + con.getClient() + ". Closing connection.");
        closeConnectionImpl(con);
        return false;
      }
     
      size -= _sc.HEADER_SIZE;
     
      if (size <= buf.remaining())
      {
        stats.increaseIncomingPacketsCount();
        parseClientPacket(getPacketHandler(), buf, size, con);
        buf.position(pos + size + _sc.HEADER_SIZE);
       
        if (!buf.hasRemaining())
        {
          freeBuffer(buf, con);
          return false;
        }
       
        return true;
      }
     
      buf.position(pos);
    }
   
    if (pos == buf.capacity())
    {
      _log.warn("Read buffer exhausted for client : " + con.getClient() + ", try to adjust buffer size, current : " + buf.capacity() + ", primary : " + (buf == READ_BUFFER) + ".");
    }
   
    if (buf == READ_BUFFER)
    {
      allocateReadBuffer(con);
    }
    else
    {
      buf.compact();
    }
   
    return false;
  }
 
  /**
   * Method allocateReadBuffer.
   * @param con MMOConnection<T>
   */
  protected void allocateReadBuffer(MMOConnection<T> con)
  {
    con.setReadBuffer(getPooledBuffer().put(READ_BUFFER));
    READ_BUFFER.clear();
  }
 
  /**
   * Method parseClientPacket.
   * @param handler IPacketHandler<T>
   * @param buf ByteBuffer
   * @param dataSize int
   * @param con MMOConnection<T>
   * @return boolean
   */
  protected boolean parseClientPacket(IPacketHandler<T> handler, ByteBuffer buf, int dataSize, MMOConnection<T> con)
  {
    T client = con.getClient();
   
    int pos = buf.position();
   
    client.decrypt(buf, dataSize);
    buf.position(pos);
   
    if (buf.hasRemaining())
    {
      // apply limit
      int limit = buf.limit();
      buf.limit(pos + dataSize);
      ReceivablePacket<T> rp = handler.handlePacket(buf, client);
     
      if (rp != null)
      {
        rp.setByteBuffer(buf);
        rp.setClient(client);
       
        if (rp.read())
        {
          con.recvPacket(rp);
        }
       
        rp.setByteBuffer(null);
      }
      buf.limit(limit);
    }
    return true;
  }
 
  /**
   * Method writePacket.
   * @param key SelectionKey
   */
  protected void writePacket(SelectionKey key)
  {
    MMOConnection<T> con = (MMOConnection<T>) key.attachment();
   
    prepareWriteBuffer(con);
   
    DIRECT_WRITE_BUFFER.flip();
    int size = DIRECT_WRITE_BUFFER.remaining();
   
    int result = -1;
   
    try
    {
      result = con.getWritableChannel().write(DIRECT_WRITE_BUFFER);
    }
    catch (IOException e)
    {
      // error handling goes on the if bellow
    }
   
    // check if no error happened
    if (result >= 0)
    {
      stats.increaseOutgoingBytes(result);
     
      // check if we written everything
      if (result != size)
      {
        con.createWriteBuffer(DIRECT_WRITE_BUFFER);
      }
     
      if (!con.getSendQueue().isEmpty() || con.hasPendingWriteBuffer())
      {
        con.scheduleWriteInterest();
      }
    }
    else
    {
      con.onForcedDisconnection();
      closeConnectionImpl(con);
    }
  }
 
  /**
   * Method getWriteClient.
   * @return T
   */
  protected T getWriteClient()
  {
    return WRITE_CLIENT;
  }
 
  /**
   * Method getWriteBuffer.
   * @return ByteBuffer
   */
  protected ByteBuffer getWriteBuffer()
  {
    return WRITE_BUFFER;
  }
 
  /**
   * Method prepareWriteBuffer.
   * @param con MMOConnection<T>
   */
  protected void prepareWriteBuffer(MMOConnection<T> con)
  {
    WRITE_CLIENT = con.getClient();
    DIRECT_WRITE_BUFFER.clear();
   
    if (con.hasPendingWriteBuffer())
    {
      con.movePendingWriteBufferTo(DIRECT_WRITE_BUFFER);
    }
   
    if (DIRECT_WRITE_BUFFER.hasRemaining() && !con.hasPendingWriteBuffer())
    {
      int i;
      Queue<SendablePacket<T>> sendQueue = con.getSendQueue();
      SendablePacket<T> sp;
     
      for (i = 0; i < _sc.MAX_SEND_PER_PASS; i++)
      {
        synchronized (con)
        {
          if ((sp = sendQueue.poll()) == null)
          {
            break;
          }
        }
       
        try
        {
          stats.increaseOutgoingPacketsCount();
          putPacketIntoWriteBuffer(sp, true);
          WRITE_BUFFER.flip();
          if (DIRECT_WRITE_BUFFER.remaining() >= WRITE_BUFFER.limit())
          {
            DIRECT_WRITE_BUFFER.put(WRITE_BUFFER);
          }
          else
          {
            con.createWriteBuffer(WRITE_BUFFER);
            break;
          }
        }
        catch (Exception e)
        {
          _log.error("Error in " + getName(), e);
          break;
        }
      }
    }
   
    WRITE_BUFFER.clear();
    WRITE_CLIENT = null;
  }
 
  /**
   * Method putPacketIntoWriteBuffer.
   * @param sp SendablePacket<T>
   * @param encrypt boolean
   */
  protected final void putPacketIntoWriteBuffer(SendablePacket<T> sp, boolean encrypt)
  {
    WRITE_BUFFER.clear();
   
    // reserve space for the size
    int headerPos = WRITE_BUFFER.position();
    WRITE_BUFFER.position(headerPos + _sc.HEADER_SIZE);
   
    // write content to buffer
    sp.write();
   
    // size (incl header)
    int dataSize = WRITE_BUFFER.position() - headerPos - _sc.HEADER_SIZE;
    if (dataSize == 0)
    {
      WRITE_BUFFER.position(headerPos);
      return;
    }
    WRITE_BUFFER.position(headerPos + _sc.HEADER_SIZE);
    if (encrypt)
    {
      WRITE_CLIENT.encrypt(WRITE_BUFFER, dataSize);
      // recalculate size after encryption
      dataSize = WRITE_BUFFER.position() - headerPos - _sc.HEADER_SIZE;
    }
   
    // prepend header
    WRITE_BUFFER.position(headerPos);
    WRITE_BUFFER.putShort((short) (_sc.HEADER_SIZE + dataSize));
    WRITE_BUFFER.position(headerPos + _sc.HEADER_SIZE + dataSize);
  }
 
  /**
   * Method getConfig.
   * @return SelectorConfig
   */
  protected SelectorConfig getConfig()
  {
    return _sc;
  }
 
  /**
   * Method getSelector.
   * @return Selector
   */
  protected Selector getSelector()
  {
    return _selector;
  }
 
  /**
   * Method getExecutor.
   * @return IMMOExecutor<T>
   */
  protected IMMOExecutor<T> getExecutor()
  {
    return _executor;
  }
 
  /**
   * Method getPacketHandler.
   * @return IPacketHandler<T>
   */
  protected IPacketHandler<T> getPacketHandler()
  {
    return _packetHandler;
  }
 
  /**
   * Method getClientFactory.
   * @return IClientFactory<T>
   */
  protected IClientFactory<T> getClientFactory()
  {
    return _clientFactory;
  }
 
  /**
   * Method setAcceptFilter.
   * @param acceptFilter IAcceptFilter
   */
  public void setAcceptFilter(IAcceptFilter acceptFilter)
  {
    _acceptFilter = acceptFilter;
  }
 
  /**
   * Method getAcceptFilter.
   * @return IAcceptFilter
   */
  protected IAcceptFilter getAcceptFilter()
  {
    return _acceptFilter;
  }
 
  /**
   * Method closeConnectionImpl.
   * @param con MMOConnection<T>
   */
  protected void closeConnectionImpl(MMOConnection<T> con)
  {
    try
    {
      // notify connection
      con.onDisconnection();
    }
    finally
    {
      try
      {
        // close socket and the SocketChannel
        con.close();
      }
      catch (IOException e)
      {
        // ignore, we are closing anyway
      }
      finally
      {
        con.releaseBuffers();
        con.clearQueues();
        con.getClient().setConnection(null);
        con.getSelectionKey().attach(null);
        con.getSelectionKey().cancel();
       
        _connections.remove(con);
       
        stats.decreseOpenedConnections();
      }
    }
  }
 
  /**
   * Method shutdown.
   */
  public void shutdown()
  {
    _shutdown = true;
  }
 
  /**
   * Method isShuttingDown.
   * @return boolean
   */
  public boolean isShuttingDown()
  {
    return _shutdown;
  }
 
  /**
   * Method closeAllChannels.
   */
  protected void closeAllChannels()
  {
    Set<SelectionKey> keys = getSelector().keys();
    for (SelectionKey key : keys)
    {
      try
      {
        key.channel().close();
      }
      catch (IOException e)
      {
        // ignore
      }
    }
  }
 
  /**
   * Method closeSelectorThread.
   */
  protected void closeSelectorThread()
  {
    closeAllChannels();
   
    try
    {
      getSelector().close();
    }
    catch (IOException e)
    {
      // Ignore
    }
  }
 
  /**
   * Method getStats.
   * @return CharSequence
   */
  public static CharSequence getStats()
  {
    StringBuilder list = new StringBuilder();
   
    list.append("selectorThreadCount: .... ").append(ALL_SELECTORS.size()).append('\n');
    list.append("=================================================\n");
    list.append("getTotalConnections: .... ").append(stats.getTotalConnections()).append('\n');
    list.append("getCurrentConnections: .. ").append(stats.getCurrentConnections()).append('\n');
    list.append("getMaximumConnections: .. ").append(stats.getMaximumConnections()).append('\n');
    list.append("getIncomingBytesTotal: .. ").append(stats.getIncomingBytesTotal()).append('\n');
    list.append("getOutgoingBytesTotal: .. ").append(stats.getOutgoingBytesTotal()).append('\n');
    list.append("getIncomingPacketsTotal:  ").append(stats.getIncomingPacketsTotal()).append('\n');
    list.append("getOutgoingPacketsTotal:  ").append(stats.getOutgoingPacketsTotal()).append('\n');
    list.append("getMaxBytesPerRead: ..... ").append(stats.getMaxBytesPerRead()).append('\n');
    list.append("getMaxBytesPerWrite: .... ").append(stats.getMaxBytesPerWrite()).append('\n');
    list.append("=================================================\n");
   
    return list;
  }
}
TOP

Related Classes of lineage2.commons.net.nio.impl.SelectorThread

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.