/**
*
* Copyright 2004 Protique Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
**/
package org.codehaus.activemq.streams;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import org.codehaus.activemq.message.ActiveMQMessage;
import org.codehaus.activemq.message.util.ByteArray;
/**
 * An InputStream that reads its data from the bodies of the messages delivered to a JMS
 * MessageConsumer.
 * <p>
 * Received message bodies are queued in {@link #arrays}; <code>current</code>/<code>offset</code>
 * identify the next unread byte and {@link #clen} is the number of queued bodies. The underlying
 * message flow carries no end-of-stream marker, so reads block until data arrives or the stream is
 * closed.
 *
 * @version $Revision: 1.1 $
 */
public class JMSInputStream extends InputStream {
    /** Initial capacity of the buffer queue, and the amount it grows by. */
    private static final int ARRAY_SIZE = 10;
    /** How long (ms) a single blocking receive waits before the closed flag is re-checked. */
    private static final long RECEIVE_TIMEOUT = 2000;
    /** Set by close(); volatile because fillBuffer() polls it while waiting for messages. */
    private volatile boolean closed;
    /** Queue of received message bodies; slots before <code>current</code> may already be null. */
    protected ByteArray[] arrays = new ByteArray[ARRAY_SIZE];
    /** Read position inside arrays[current]. */
    private int offset;
    /** Index of the buffer currently being read; equals clen when nothing is buffered. */
    private int current = 0;
    /** Number of slots of {@link #arrays} that have been filled. */
    protected int clen = 0;
    /** Buffer index of the active mark, or -1 when no mark is set. */
    private int markArray = -1;
    /** Offset inside the marked buffer. */
    private int markOffset = -1;
    private MessageConsumer consumer;

    /**
     * Construct an input stream to read from a JMS Consumer
     *
     * @param consumer the consumer whose messages supply the stream's bytes
     */
    public JMSInputStream(MessageConsumer consumer) {
        this.consumer = consumer;
    }

    /**
     * Read the next byte from this stream, blocking until one is available.
     *
     * @return the next byte as a value in the range 0-255, or -1 if the stream was closed while
     *         waiting for data
     * @throws IOException if the stream is already closed (EOFException) or the consumer fails
     */
    public int read() throws IOException {
        if (closed) {
            throw new EOFException("JMSInputStream is closed");
        }
        if (current == clen) {
            fillBuffer(1);
            if (current == clen) {
                // fillBuffer only returns empty-handed when the stream was closed meanwhile
                return -1;
            }
        }
        int c = (arrays[current].get(offset) & 0xff);
        offset++;
        advanceIfExhausted();
        return c;
    }

    /**
     * Read data from this input stream into the given byte array starting at offset 0 for b.length bytes. Returns the
     * actual number of bytes read;
     *
     * @param b buffer to read data into
     * @return the number of bytes read
     * @throws IOException if the stream is closed or the consumer fails
     */
    public int read(byte b[]) throws IOException {
        return read(b, 0, b.length);
    }

    /**
     * Read data from this input stream into the given byte array starting at offset 'off' for 'len' bytes. Returns the
     * actual number of bytes read.
     * <p>
     * If data is already buffered, only buffered data is returned (possibly fewer than 'len' bytes).
     * If nothing is buffered, this call waits until at least the requested number of bytes has been
     * received. A range that runs past the end of b is silently clamped to b.length.
     *
     * @param b buffer to read data into
     * @param off offset into b
     * @param len the maximum length
     * @return the number of bytes actually read, or -1 if bytes were requested but the stream was
     *         closed before any arrived
     * @throws IOException if the stream is already closed (EOFException) or the consumer fails
     */
    public int read(byte b[], int off, int len) throws IOException {
        if (closed) {
            throw new EOFException("JMSInputStream is closed");
        }
        int n = off;
        int total = 0;
        int last = Math.min(off + len, b.length);
        // number of bytes that can actually be stored; never wait for more than that
        int wanted = Math.max(0, last - off);
        if (current == clen) {
            fillBuffer(wanted);
        }
        while ((current < clen) && (n < last)) {
            int numLeft = arrays[current].getLength() - offset;
            int toCopy = Math.min(numLeft, last - n);
            // NOTE(review): assumes the ByteArray's data starts at index 0 of getBuf() - confirm
            System.arraycopy(arrays[current].getBuf(), offset, b, n, toCopy);
            total += toCopy;
            n += toCopy;
            offset += toCopy;
            advanceIfExhausted();
        }
        if (total == 0 && wanted > 0) {
            // nothing buffered and nothing received: the stream was closed while waiting
            return -1;
        }
        return total;
    }

    /**
     * Skip n bytes in this stream; returns the number of bytes actually skipped (which may be less than the number
     * requested). Only data that is already buffered is skipped; no messages are received.
     *
     * @param length the number of bytes to skip
     * @return the number of bytes actually skipped
     * @throws IOException if the stream is closed
     */
    public long skip(long length) throws IOException {
        if (closed) {
            throw new EOFException("JMSInputStream is closed!");
        }
        if (length <= 0) {
            return 0;
        }
        // clamp BEFORE narrowing, otherwise large values wrap around
        int requested = (int) Math.min(length, (long) Integer.MAX_VALUE);
        int totalskipped = 0;
        while ((current < clen) && (arrays[current] != null) && (requested > 0)) {
            int numLeft = arrays[current].getLength() - offset;
            if (numLeft <= requested) {
                // the rest of this buffer is consumed: move on so that offset never
                // rests at the end of a buffer
                requested -= numLeft;
                totalskipped += numLeft;
                releaseBuffer(current);
                current++;
                offset = 0;
            }
            else {
                totalskipped += requested;
                offset += requested;
                requested = 0;
            }
        }
        return totalskipped;
    }

    /**
     * Return the number of bytes available for reading. Makes one non-blocking attempt to receive
     * a further message before counting.
     *
     * @return the number of bytes available
     * @throws IOException if the stream is closed or the consumer fails
     */
    public int available() throws IOException {
        if (closed) {
            throw new EOFException("JMSInputStream is closed!");
        }
        fillBuffer(0);
        if (current == clen) {
            return 0;
        }
        int numLeft = arrays[current].getLength() - offset;
        for (int i = current + 1; i < clen; i++) {
            if (arrays[i] == null) {
                break;
            }
            numLeft += arrays[i].getLength();
        }
        return numLeft;
    }

    /**
     * close the stream and the MessageConsumer. Calling it again has no effect.
     */
    public void close() {
        if (closed) {
            return;
        }
        // flag first so that a reader waiting in fillBuffer() stops looping
        closed = true;
        try {
            consumer.close();
        }
        catch (JMSException ignored) {
            // best effort: this method declares no exception and the stream is unusable
            // from here on regardless of whether the consumer closed cleanly
        }
    }

    /**
     * @return true
     */
    public boolean markSupported() {
        return true;
    }

    /**
     * Returns the stream to the position of the previous mark(). The mark is consumed: mark() must
     * be called again before another reset().
     *
     * @throws IOException if no mark is set
     */
    public void reset() throws IOException {
        if (markArray == -1) {
            throw new IOException("JMSInputStream not marked!");
        }
        current = markArray;
        offset = markOffset;
        markArray = -1;
    }

    /**
     * Set the stream's mark to the current position.
     *
     * @param readlimit ignored; buffers from the mark onwards are retained until reset()
     */
    public void mark(int readlimit) {
        markArray = current;
        markOffset = offset;
    }

    /**
     * Move to the next buffer once the current one has been read completely.
     */
    private void advanceIfExhausted() {
        if (offset == arrays[current].getLength()) {
            offset = 0;
            releaseBuffer(current);
            current++;
        }
    }

    /**
     * release up to the current buffer to GC. Nothing is released while a mark at or before
     * 'index' is active, because reset() must be able to re-read those buffers.
     *
     * @param index the last buffer index to release
     */
    private void releaseBuffer(int index) {
        if (markArray < 0 || index < markArray) {
            for (int i = 0; i <= index; i++) {
                arrays[i] = null;
            }
        }
    }

    /**
     * fill the buffer. With requiredLength &gt; 0 this waits, in timed receives, until at least that
     * many bytes have been received or the stream is closed. With requiredLength &lt;= 0 it makes a
     * single non-blocking receive attempt.
     *
     * @param requiredLength the minimum number of bytes to receive before returning
     * @throws IOException if the consumer reports a JMSException
     */
    private void fillBuffer(int requiredLength) throws IOException {
        int received = 0;
        try {
            do {
                if (closed) {
                    break;
                }
                ActiveMQMessage msg;
                if (requiredLength > 0) {
                    // timed wait (rather than spinning on receiveNoWait) so the closed
                    // flag is still re-checked periodically
                    msg = (ActiveMQMessage) consumer.receive(RECEIVE_TIMEOUT);
                }
                else {
                    msg = (ActiveMQMessage) consumer.receiveNoWait();
                }
                if (msg != null) {
                    ByteArray ba = msg.getBodyAsBytes();
                    // empty bodies carry no data and would break the "offset < length" invariant
                    if (ba != null && ba.getLength() > 0) {
                        received += ba.getLength();
                        process(ba);
                    }
                }
            }
            while (received < requiredLength && !closed);
        }
        catch (JMSException jmsEx) {
            IOException ioEx = new IOException(jmsEx.getMessage());
            ioEx.initCause(jmsEx);
            throw ioEx;
        }
    }

    /**
     * Add an array to the queue of buffers. When everything queued so far has been consumed and no
     * mark is active, the queue is rewound to slot 0 first (and shrunk back to its initial size if
     * it had grown), so the queue does not grow while the reader keeps up.
     *
     * @param ba the message body to append
     */
    private void process(ByteArray ba) {
        // never rewind while a mark is set: markArray would point at the wrong slot
        if (current == clen && markArray < 0) {
            if (arrays.length > ARRAY_SIZE) {
                arrays = new ByteArray[ARRAY_SIZE];
            }
            else {
                for (int i = 0; i < clen; i++) {
                    arrays[i] = null;
                }
            }
            offset = 0;
            current = 0;
            clen = 0;
        }
        arrays[clen] = ba;
        clen++;
        if (clen == arrays.length) {
            // keep at least one free slot for the next append
            ByteArray[] old = arrays;
            arrays = new ByteArray[old.length + ARRAY_SIZE];
            System.arraycopy(old, 0, arrays, 0, old.length);
        }
    }
}