Package it.unimi.dsi.mg4j.index.remote

Source Code of it.unimi.dsi.mg4j.index.remote.RemoteIndexReader$RemoteIndexReaderIndexIterator$RemoteIndexIntervalIterator

package it.unimi.dsi.mg4j.index.remote;

/*    
* MG4J: Managing Gigabytes for Java
*
* Copyright (C) 2006-2010 Sebastiano Vigna
*
*  This library is free software; you can redistribute it and/or modify it
*  under the terms of the GNU Lesser General Public License as published by the Free
*  Software Foundation; either version 3 of the License, or (at your option)
*  any later version.
*
*  This library is distributed in the hope that it will be useful, but
*  WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
*  or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License
*  for more details.
*
*  You should have received a copy of the GNU Lesser General Public License
*  along with this program; if not, see <http://www.gnu.org/licenses/>.
*
*/


import it.unimi.dsi.fastutil.ints.IntIterator;
import it.unimi.dsi.fastutil.ints.IntIterators;
import it.unimi.dsi.fastutil.ints.IntSet;
import it.unimi.dsi.fastutil.objects.AbstractObjectIterator;
import it.unimi.dsi.fastutil.objects.Reference2ReferenceMap;
import it.unimi.dsi.fastutil.objects.Reference2ReferenceMaps;
import it.unimi.dsi.fastutil.objects.ReferenceSet;
import it.unimi.dsi.mg4j.index.AbstractIndexIterator;
import it.unimi.dsi.mg4j.index.AbstractIndexReader;
import it.unimi.dsi.mg4j.index.Index;
import it.unimi.dsi.mg4j.index.IndexIterator;
import it.unimi.dsi.mg4j.index.IndexReader;
import it.unimi.dsi.mg4j.index.payload.Payload;
import it.unimi.dsi.mg4j.search.IntervalIterator;
import it.unimi.dsi.mg4j.search.IntervalIterators;
import it.unimi.dsi.io.InputBitStream;
import it.unimi.dsi.io.OutputBitStream;
import it.unimi.dsi.util.Interval;
import it.unimi.dsi.Util;
import it.unimi.dsi.lang.MutableString;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.util.NoSuchElementException;

import org.apache.log4j.Logger;


/** An index reader for {@link it.unimi.dsi.mg4j.index.remote.RemoteIndex}.
*
* @author Sebastiano Vigna
* @author Alessandro Arrabito
*/

public class RemoteIndexReader extends AbstractIndexReader {
  /** The logger used by this class. */
  final private static Logger LOGGER = Util.getLogger( RemoteIndexReader.class );
   
  /** Compile-time switch guarding the internal consistency assertions of this class. */
  private static final boolean ASSERTS = false;

  /* Command bytes of the reader protocol. The client writes one of them on the
   * connection output stream; ServerThread.run() dispatches on the same values. */

  /** Command sent by {@link #documents(CharSequence)}; it is followed by the term in self-delimited UTF-8. */
  private static final byte DOCUMENTS_BY_NAME = 0;
  /** Command sent by {@link #documents(int)}; it is followed by the term number as an int. */
  private static final byte DOCUMENTS_BY_INDEX = 1;
  /** Command sent when prefetching; it is followed by a boolean and by the buffer size as an int. */
  private static final byte PREFETCH = 2;
  /** Command sent by {@link #close()}; the server thread closes its index reader and terminates. */
  private static final byte CLOSE = 3;
  /** Command handled by the server thread (it disposes the current iterator and terminates); no code in this file sends it. */
  private static final byte DISPOSE = 4;
  /** Command sent when skipping to a document pointer; it is followed by the pointer as an int. */
  private static final byte SKIP_TO = 5;
  /** Command handled by the server thread (it is followed by an int); no code in this file sends it. */
  private static final byte SKIP = 6;
     
  /** The index we refer to. */
  protected final Index index;
  /** The remote server connection used to call the index server.*/
  protected final RemoteIndexServerConnection connection;
  /** The index iterator associated to this reader; the same instance is returned by every <code>documents()</code> call. */
  protected final RemoteIndexReaderIndexIterator remoteIndexIterator;
  /** The input stream in {@link #connection}, cached. */
  protected final DataInputStream inputStream;
  /** The output stream in {@link #connection}, cached. */
  protected final DataOutputStream outputStream;
 
  public RemoteIndexReader( final RemoteIndex index, final int bufferSize ) throws IOException {   
    this.index = index;
    connection = new RemoteIndexServerConnection( index.socketAddress, IndexServer.GET_INDEX_READER );
    inputStream = connection.inputStream;
    outputStream = connection.outputStream;
    remoteIndexIterator = new RemoteIndexReaderIndexIterator( bufferSize );
  }
 
  public void close() throws IOException, IllegalStateException {
    outputStream.writeByte( RemoteIndexReader.CLOSE );
    outputStream.flush();
    try {
      connection.close();
    }
    catch( IOException dontCare ) {
      // Whatever may happen, we're so outta here...
    }
  }

  protected void finalize() throws Throwable {
    try {
      if ( ! connection.socket.isClosed() ) {
        LOGGER.warn( "This " + this.getClass().getName() + " [" + toString() + "] should have been closed." );
        close();
      }
    }
    finally {
      super.finalize();
    }
  }
 
  public IndexIterator documents( final int termNumber ) throws IOException {
    remoteIndexIterator.flush();
    outputStream.writeByte( RemoteIndexReader.DOCUMENTS_BY_INDEX );
    outputStream.writeInt( termNumber );
    outputStream.flush();
    remoteIndexIterator.term( null );
    // Read frequency
    remoteIndexIterator.reset( inputStream.readInt() );
    remoteIndexIterator.prefetchDocs( false );
    return remoteIndexIterator;
  }

  public IndexIterator documents( final CharSequence term ) throws IOException {
    remoteIndexIterator.flush();
    outputStream.writeByte( RemoteIndexReader.DOCUMENTS_BY_NAME );
    new MutableString( term ).writeSelfDelimUTF8( (OutputStream)outputStream );
    outputStream.flush();
    remoteIndexIterator.term( term );
    // Read frequency
    remoteIndexIterator.reset( inputStream.readInt() );
    remoteIndexIterator.prefetchDocs( false );
    return remoteIndexIterator;
  }


  /** An index iterator based on a remote index reader.
   *
   * <p>Each remote index reader creates exactly one instance of this class. The instance
   * is reused upon calls to {@link IndexReader#documents(int)}.
   *
   * <p>The internal state is unfortunately quite complicated by the necessity of grabbing data
   * from the socket as lazily as possible.
   *
   * <p>Basically, an instance of this class can be in one of four states:
   * <ul>
   * <li>If {@link #exhausted} is true, then there are no more items to be returned
   * <em>and</em> there is no more data coming from the socket. If you need to force
   * this state, you can call {@link #flush()}, which discards the remaining data
   * coming from the socket and sets {@link #exhausted} (this is necessary, for instance,
   * each time you reuse the iterator).
   * <li>Otherwise, if {@link #last} is -1 the iterator is brand new, {@link #next} is -1,
   * too, and the socket input stream has been filled but never read.
   * <li>Otherwise, if {@link #next} is greater than or equal to zero, then {@link #next}
   * is the next document pointer to be returned, and it has been just read from the
   * socket input stream (for instance, if there are counts the socket input stream is
   * positioned just before the count).
   * <li>Finally, if {@link #next} is -1 then {@link #last} is the last document pointer
   * returned, and the socket input stream is positioned exactly before the next document
   * pointer to be returned, or over the end-of-block marker.
   * </ul>
   */
 
  private class RemoteIndexReaderIndexIterator extends AbstractIndexIterator implements IndexIterator {
    /** The buffer size sent to the index server with every prefetch request.
     * NOTE(review): the PREFETCH handler in this file decrements the received value once
     * per int written, so this looks like a number of integers rather than bytes -- confirm. */
    private final int bufferSize;
    /** The next document pointer to be returned, or -1 if the iterator has to be advanced. */
    private int next;
    /** The last document pointer returned, or -1 if no pointer has been returned since the last {@link #reset(int)}. */
    private int last;
    /** The frequency of the current term, as passed to {@link #reset(int)}. */
    private int frequency;
    /** Whether this iterator has been exhausted. */
    private boolean exhausted;
    /** The current payload, or <code>null</code>.
     * NOTE(review): no assignment to this field is visible in this file. */
    protected Payload payload;
    /** The current count. */
    protected int count;
    /** The current positions; only the first {@link #count} entries are meaningful. */
    protected final int[] position;
   
    public RemoteIndexReaderIndexIterator ( final int bufferSize ) {
      this.bufferSize = bufferSize;
      this.position = new int[ index.maxCount ];
      this.exhausted = true; // To avoid flushing the first time
    }

    public Index index() {
      return keyIndex;
    }
   
   
    public void flush() throws IOException {
      if ( ! exhausted ) {
        while( inputStream.readInt() >= 0 );
        inputStream.readBoolean();
        if ( ASSERTS ) assert inputStream.available() == 0;
        exhausted = true;
      }
    }
   
    public void reset( final int frequency ) {
      this.frequency = frequency;
      exhausted = false;
      next = last = -1;
    }
   
    /** Prefetches a batch of document data from the server.
     * @param alreadyOnFirst will be passed as an argument to the remote call.
     */
    public void prefetchDocs( final boolean alreadyOnFirst ) throws IOException {
      outputStream.writeByte( RemoteIndexReader.PREFETCH );
      outputStream.writeBoolean( alreadyOnFirst );
      outputStream.writeInt( bufferSize );
      outputStream.flush();
    }

    /** Tries to advance the remote iterator.
     *
     * <p>After a call to this method returning -1, the prefetched data is exhausted
     * and {@link #exhausted} is true. Otherwise, the input stream of the connection is
     * positioned just before counts and position of the returned document pointer.
     *
     * @return -1 if there are no more elements, the next pointer otherwise. The value returned
     * in stored in {@link #next}.
     */
   
    private int advance() {
      if ( next >= 0 ) return next;
      try {
        next = inputStream.readInt();
        if ( next < 0 ) {
          if ( inputStream.readBoolean() ) prefetchDocs( false );
          else {
            exhausted = true;
            return -1;
          }
          next = inputStream.readInt();
          if ( ASSERTS ) assert next >= 0;
        }
        return next;
      }
      catch ( Exception e ) {
        throw new RuntimeException( e );
      }
    }
   
    public int document() {
      if ( last < 0 ) throw new IllegalStateException();
      return last;
    }
   
    /** Skips to the first document pointer greater than or equal to the given one.
     *
     * <p>The data already prefetched is scanned first; only if the current block ends
     * before reaching <code>p</code>, and the server has announced more data, a remote
     * skip is performed and a new block (starting on the document found) is requested.
     *
     * @param p a document pointer.
     * @return {@link #last} if <code>p</code> is not greater than it; {@link Integer#MAX_VALUE}
     * if there is no suitable document; otherwise, the document pointer reached (which becomes
     * the current document, exactly as if it had been returned by {@link #nextInt()}).
     * @throws RuntimeException wrapping any exception raised while talking to the server.
     */
    public int skipTo( final int p ) {
      try {
        if ( p <= last ) return last;
        if ( exhausted ) return Integer.MAX_VALUE;
        // First we check whether we can skip inside the local buffer.
        if ( next < 0 ) next = inputStream.readInt();
        while( next >= 0 && next < p ) {
          // Discard count and positions of the pointer we are skipping over.
          // NOTE(review): nextInt() reads a payload before the count when the index has
          // payloads, but nothing is discarded for payloads here -- confirm.
          if ( index.hasCounts ) {
            count = inputStream.readInt();
            if ( index.hasPositions ) for ( int i = 0; i < count; i++ ) inputStream.readInt();
          }
          next = inputStream.readInt();
        }

        //System.err.println( "Out of loop: " + next );
        // Found in the local buffer: nextInt() consumes next and loads count and positions.
        if ( next >= 0 ) return nextInt();
        // We read the end-of-block marker: the boolean that follows tells whether the server has more data.
        if ( exhausted = ! inputStream.readBoolean() ) return Integer.MAX_VALUE;

        if ( ASSERTS ) assert inputStream.available() == 0;
       
        // Nothing suitable in the local buffer: perform the skip remotely.
        int result;
        outputStream.writeByte( RemoteIndexReader.SKIP_TO );
        outputStream.writeInt( p );
        outputStream.flush();
        result = inputStream.readInt();
        //System.err.println( "Skip to " + p + " result: " + result );
        if ( result == Integer.MAX_VALUE ) {
          exhausted = true;
          return Integer.MAX_VALUE;
        }
       
        // We already know the pointer, so we ask for a block that does not repeat it.
        prefetchDocs( true );
        //System.err.println( "Prefetch completed" );
        next = result;
        return nextInt();
      }
      catch ( Exception e ) {
        throw new RuntimeException( e );
      }
     
    }

    public boolean hasNext() {
      if ( exhausted ) return false;
      if ( next >= 0 ) return true;
      next = advance();
      return ! exhausted;
    }

    /** Returns the next document pointer and loads the data associated with it.
     *
     * <p>After the pointer has been consumed, payload, count and positions are read
     * from the socket input stream, depending on what the index provides, and the
     * interval iterator is reset.
     *
     * @return the next document pointer.
     * @throws NoSuchElementException if there are no more document pointers.
     * @throws RuntimeException wrapping any {@link IOException} raised while reading.
     */
    public int nextInt() {
      if ( ! hasNext() ) throw new NoSuchElementException();

      // The pending pointer becomes the current one.
      last = next;
      next = -1;

      try {
        // TODO: this is *very rough* and preliminary
        // NOTE(review): no assignment to payload is visible in this file -- confirm it cannot be null here.
        if ( index.hasPayloads ) payload.read( new InputBitStream( inputStream, 0 ) );
        if ( index.hasCounts ) {
          count = inputStream.readInt();
          if ( index.hasPositions ) {
            for ( int i = 0; i < count; i++ ) position[ i ] = inputStream.readInt();
            // Restart the interval iterator on the positions just read.
            intervalIterator.reset();
          }
        }
      }
      catch ( IOException e ) {
        throw new RuntimeException( e );
      }

      return last;
    }

    /** Disposes this iterator by invoking <code>close()</code>.
     * NOTE(review): presumably this resolves to the <code>close()</code> method of the
     * enclosing reader, which also closes the connection -- confirm. */
    public void dispose() throws IOException {
      close();
    }
   
    // TODO: implement efficiently
    public int nextDocument() { 
      return hasNext() ? nextInt() : -1;   
    }

    /** The (unique, reused) interval iterator over the current positions, or <code>null</code> if the index has no positions. */
    private final RemoteIndexIntervalIterator intervalIterator = index.hasPositions? new RemoteIndexIntervalIterator() : null;
    /** The key index of the index of the enclosing reader, cached. */
    private final Index keyIndex = RemoteIndexReader.this.index.keyIndex;
    /** The singleton map from {@link #keyIndex} to {@link #intervalIterator}, returned by {@link #intervalIterators()}. */
    private final Reference2ReferenceMap<Index,IntervalIterator> singletonIntervalIterator = Reference2ReferenceMaps.singleton( keyIndex, (IntervalIterator)intervalIterator );
   
    private class RemoteIndexIntervalIterator extends AbstractObjectIterator<Interval> implements IntervalIterator {
      private int pos;
      public Interval next() {     
        if ( ! hasNext() ) throw new NoSuchElementException();
        return Interval.valueOf( position[ pos++ ] );     
      }
      public void intervalTerms( final IntSet terms ) {
        throw new UnsupportedOperationException();
      }
      public Interval nextInterval() {     
        return pos < count ? Interval.valueOf( position[ pos++ ] ) : null;
      }
      public void reset() { pos = 0; }
      public int extent() { return 1; }
      public boolean hasNext() { return pos < count; }
    }
   
    public int frequency() {   
      return frequency;
    }

    public Payload payload() {
      return payload;
    }
   
    public int count() {
      return count;
    }
   
    public IntIterator positions() {       
      return IntIterators.wrap( position );
    }
   
    public int positions( final int[] p ) {
      System.arraycopy( position, 0, p, 0, Math.min( count, p.length ) );
      return p.length < position.length ? -count : count;
    }
   
    public int[] positionArray() {
      return position;
    }
   
    public ReferenceSet<Index> indices() {
      return index.singletonSet;
    }
 
    public IntervalIterator intervalIterator( final Index index ) {
      if ( index != keyIndex || ! index.hasPositions ) return IntervalIterators.TRUE;
      if ( ASSERTS ) assert intervalIterator != null;
      if ( ASSERTS ) assert count > 0;
      return count > 0 ? intervalIterator : IntervalIterators.FALSE;
    }
   
    public IntervalIterator intervalIterator() {
      return intervalIterator( keyIndex );
    }

    public Reference2ReferenceMap<Index,IntervalIterator> intervalIterators() {
      return singletonIntervalIterator;
    }

    /** Not supported by this iterator.
     *
     * @return never returns normally.
     * @throws UnsupportedOperationException always.
     */
    public int termNumber() {
      throw new UnsupportedOperationException();
    }
  }
 
 
  /** The server-side counterpart of a {@link RemoteIndexReader}.
   *
   * <p>An instance obtains a reader from a local index and then, in {@link #run()},
   * repeatedly reads a command byte from its input stream and answers on its output
   * stream. The streams are not declared in this class (presumably they are provided
   * by the superclass on top of the socket passed to the constructor).
   */
  public static class ServerThread extends it.unimi.dsi.mg4j.index.remote.ServerThread {
    @SuppressWarnings("hiding")
    private final static Logger LOGGER = Util.getLogger( ServerThread.class );
    /** Whether each received command should be logged at debug level. */
    private final static boolean DEBUG = false;
   
    /** The index we refer to. */
    private final Index index;
    /** The remoted index reader. */
    private final IndexReader indexReader;
    /** The current index iterator; it is set by the two <code>DOCUMENTS_*</code> commands. */
    private IndexIterator indexIterator;
   
    /** Creates a new server thread serving the given index on the given socket.
     *
     * @param socket the socket, passed to the superclass constructor.
     * @param index the index from which a reader will be obtained.
     */
    public ServerThread( final Socket socket, final Index index ) throws IOException {
      super( socket );
      this.index = index;
      this.indexReader = index.getReader();
    }
   
    /** Serves commands until a close or dispose command arrives, the command byte
     * cannot be read, or an exception is thrown (exceptions are logged, not propagated).
     */
    public void run() {
      try {
        int command;
        for(;;) {
          try {
            command = inputStream.readByte();
          }
          catch ( IOException e ) {
            LOGGER.warn( "Socket has been probably closed", e );
            return;
          }

          if ( DEBUG ) LOGGER.debug( "Received remote command: " + command );
         
          switch ( command ) {
         
          case RemoteIndexReader.CLOSE:
            indexReader.close();
            // We don't close the socket--the caller should
            return;
           
          case RemoteIndexReader.DOCUMENTS_BY_INDEX:
            // Request: term number. Answer: frequency of the term.
            indexIterator = indexReader.documents( inputStream.readInt() );
            outputStream.writeInt( indexIterator.frequency() );
            outputStream.flush();
            break;
           
          case RemoteIndexReader.DOCUMENTS_BY_NAME:
            // Request: term in self-delimited UTF-8. Answer: frequency of the term.
            indexIterator = indexReader.documents( new MutableString().readSelfDelimUTF8( (InputStream)inputStream ) );
            outputStream.writeInt( indexIterator.frequency() );
            outputStream.flush();
            break;
           
          case RemoteIndexReader.SKIP_TO:
            // Request: document pointer. Answer: result of skipTo() on the current iterator.
            outputStream.writeInt( indexIterator.skipTo( inputStream.readInt() ) );
            outputStream.flush();
            break;
           
          case RemoteIndexReader.SKIP:
            // Request: an int. Answer: result of skip() on the current iterator.
            outputStream.writeInt( indexIterator.skip( inputStream.readInt() ) );
            outputStream.flush();
            break;
           
          case RemoteIndexReader.PREFETCH:                 
            /* When alreadyOnFirst is true, the caller does not really want
             * to get the first document pointer, as it has already
             * got it somehow (e.g., by skipping). */
            boolean alreadyOnFirst = inputStream.readBoolean();
            int count, bufSize = inputStream.readInt();
            int[] position;
           
            // bufSize is decremented for each pointer, count and position written; the
            // loop stops as soon as it is no longer positive or there are no more documents.
            for ( int i = 0; ( indexIterator.hasNext() || alreadyOnFirst && i == 0 ) && bufSize > 0; i++ ) {
              if ( i > 0 || ! alreadyOnFirst ) {
                outputStream.writeInt( indexIterator.nextDocument() );
                bufSize--;
              }
              if ( index.hasPayloads ) {
                // TODO: this is *very rough* & preliminary
                // NOTE(review): this writes the payload of the index, not indexIterator.payload() -- confirm.
                OutputBitStream obs = new OutputBitStream( outputStream );
                index.payload.write( obs );
                obs.flush();
              }
              if ( index.hasCounts ) {
                outputStream.writeInt( count = indexIterator.count() );
                bufSize--;
                if ( index.hasPositions ) {
                  position = indexIterator.positionArray();
                  for( int p = 0; p < count; p++ ) outputStream.writeInt( position[ p ] );
                  bufSize -= count;
                }
              }
            }
            outputStream.writeInt( -1 ); // End marker
            outputStream.writeBoolean( indexIterator.hasNext() ); // A peek farther.
            outputStream.flush();
            //System.err.println( "Prefetch completed" );
            break;
           
          case RemoteIndexReader.DISPOSE:
            indexIterator.dispose();
            // We don't close the socket--the caller should 
            return;
           
          default:
            // The unknown command is just logged: the loop goes on with the next byte.
            LOGGER.error( "Unknown remote command: " + command );
          }
        }
      }
      catch ( EOFException e ) {
        LOGGER.warn( "The socket has been closed" );
      }
      catch ( Exception e ) {
        LOGGER.fatal( e, e );
      }
    }
  }
}
TOP

Related Classes of it.unimi.dsi.mg4j.index.remote.RemoteIndexReader$RemoteIndexReaderIndexIterator$RemoteIndexIntervalIterator

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.
OP">TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.