Package net.sf.fmj.media.codec

Source Code of net.sf.fmj.media.codec.InputStreamReader$ReaderThread

package net.sf.fmj.media.codec;

import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;

import javax.media.Buffer;

import com.lti.utils.synchronization.CloseableThread;
import com.lti.utils.synchronization.ProducerConsumerQueue;

/**
* Class to wrap an InputStream and do reads in a background thread, so that read never blocks.
* Used for badly behaving input streams where available() is not working right or useless.
* @author Ken Larson
*
*/
public class InputStreamReader extends InputStream
{
  private final InputStream is;
  private final int bufferSize;
  private ReaderThread readerThread;
 
 
  private final ProducerConsumerQueue emptyQueue = new ProducerConsumerQueue();
  private final ProducerConsumerQueue fullQueue = new ProducerConsumerQueue();
 
 

  public InputStreamReader(final InputStream is, final int bufferSize)
  {
    super();
    this.is = is;
    this.bufferSize = bufferSize;
   
    for (int i = 0; i < 2; ++i)
    {
      final Buffer b = new Buffer();
      b.setData(new byte[bufferSize]);
      try
      {
        emptyQueue.put(b);
      } catch (InterruptedException e)
      {
        throw new RuntimeException(e); // should never happen
      }
    }

    readerThread = new ReaderThread(emptyQueue, fullQueue, is, bufferSize);
    readerThread.setName("ReaderThread for " + is);
    readerThread.setDaemon(true);
   
   
  }
 
  private boolean readerThreadStarted;
 
  public void startReaderThread()
  {
    if (!readerThreadStarted)
    {
      readerThread.start();
      readerThreadStarted = true;
    }
  }
 

  @Override
  public void close() throws IOException
  {
    super.close();
   
    if (readerThread != null)
    {  readerThread.close();
      readerThread = null;
    }
  }


  @Override
  public int available() throws IOException
  {
   
   
    if (readException != null)
      throw readException;
    if (readBuffer != null && readBuffer.getLength() > 0)
      return readBuffer.getLength();
    else
      return 0;
  }

  @Override
  public int read() throws IOException
  {
    byte[] ba = new byte[1];
    int result = read(ba, 0, 1);
    if (result == -1)
      return -1;
    return ba[0] & 0xff;

  }

  private Buffer readBuffer;
  private IOException readException;
 
  @Override
  public int read(byte[] b, int off, int len) throws IOException
  {
    try
    {
      if (readBuffer == null && readException == null)
      {  Object o = fullQueue.get();
        if (o instanceof IOException)
          readException = (IOException) o;
        else
          readBuffer = (Buffer) o;
      }
     
      if (readException != null)
        throw readException;
     
      if (readBuffer.isEOM())
        return -1;
      final byte[] readBufferData = (byte []) readBuffer.getData();
     
      final int lenToCopy = readBuffer.getLength() < len? readBuffer.getLength() : len;
      System.arraycopy(readBufferData, readBuffer.getOffset(), b, off, lenToCopy);
      readBuffer.setOffset(readBuffer.getOffset() + lenToCopy);
      readBuffer.setLength(readBuffer.getLength() - lenToCopy);
      if (readBuffer.getLength() == 0)
      {  emptyQueue.put(readBuffer);
        readBuffer = null;
      }
      return lenToCopy;
    }
    catch (InterruptedException e)
    {  throw new InterruptedIOException();
    }
  }
 

 
  private static class ReaderThread extends CloseableThread
  {
    private final ProducerConsumerQueue emptyQueue;
    private final ProducerConsumerQueue fullQueue;
    private final InputStream is;
    private final int bufferSize;
   
   
    public ReaderThread(final ProducerConsumerQueue emptyQueue, final ProducerConsumerQueue fullQueue, final InputStream is, final int bufferSize)
    {
      super();
      this.emptyQueue = emptyQueue;
      this.fullQueue = fullQueue;
      this.is = is;
      this.bufferSize = bufferSize;
    }


    @Override
    public void run()
    {
      try
      {
        while (!isClosing())
        {
          Buffer b = (Buffer) emptyQueue.get();
          b.setEOM(false);
          b.setLength(0);
          b.setOffset(0);
         
          int len = is.read((byte []) b.getData(), 0, bufferSize);
          if (len < 0)
            b.setEOM(true);
          else
            b.setLength(len);
          fullQueue.put(b);
         
         
        }
      }
      catch (InterruptedException e)
      {
      }
      catch (IOException e)
      {
        try
        {
          fullQueue.put(e);
        } catch (InterruptedException e1)
        { 
        }
      }
      finally
      setClosed();
      }
    }
  }
 
 
}
TOP

Related Classes of net.sf.fmj.media.codec.InputStreamReader$ReaderThread

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code is the property of its respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.