Package org.codehaus.activemq.streams

Source Code of org.codehaus.activemq.streams.JMSInputStream

/**
*
* Copyright 2004 Protique Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
**/

package org.codehaus.activemq.streams;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import org.codehaus.activemq.message.ActiveMQMessage;
import org.codehaus.activemq.message.util.ByteArray;

/**
* An inputStream that reads data from a MessageConsumer
*
* @version $Revision: 1.1 $
*/
public class JMSInputStream extends InputStream {
    private static final int ARRAY_SIZE = 10;
    private boolean closed;
    protected ByteArray[] arrays = new ByteArray[ARRAY_SIZE];
    private int offset;
    private int current = 0;
    protected int clen = 0;
    private int markArray = -1;
    private int markOffset = -1;
    private MessageConsumer consumer;

    /**
     * Construct an input stream to read from a JMS Consumer
     *
     * @param consumer
     */
    public JMSInputStream(MessageConsumer consumer) {
        this.consumer = consumer;
    }

    /**
     * Read the next byte from this stream.
     *
     * @return the next byte
     * @throws IOException
     */
    public int read() throws IOException {
        if (closed)
            throw new EOFException("JMSInputStream is closed");
        if (current == clen) {
            fillBuffer(1);
        }
        int c = (arrays[current].get(offset) & 0xff);
        offset++;
        if (offset == arrays[current].getLength()) {
            offset = 0;
            releaseBuffer(current);
            current++;
        }
        return c;
    }

    /**
     * Read data from this input stream into the given byte array starting at offset 0 for b.length bytes. Returns the
     * actual number of bytes read;
     *
     * @param b
     * @return the number of bytes read
     * @throws IOException
     */
    public int read(byte b[]) throws IOException {
        return read(b, 0, b.length);
    }

    /**
     * Read data from this input stream into the given byte array starting at offset 'off' for 'len' bytes. Returns the
     * actual number of bytes read.
     *
     * @param b buffer to read data into
     * @param off offset into b
     * @param len the maximum length
     * @return the number of bytes actually read
     * @throws IOException
     */
    public int read(byte b[], int off, int len) throws IOException {
        if (closed)
            throw new EOFException("JMSInputStream is closed");
        int n = off;
        int total = 0;
        int last = Math.min(off + len, b.length);
        if (current == clen) {
            fillBuffer(len);
        }
        while ((current < clen) && (n < last)) {
            int num_left = arrays[current].getLength() - offset;
            int tocopy = Math.min(num_left, last - n);
            System.arraycopy(arrays[current].getBuf(), offset, b, n, tocopy);
            total += tocopy;
            n += tocopy;
            offset += tocopy;
            if (offset == arrays[current].getLength()) {
                offset = 0;
                releaseBuffer(current);
                current++;
            }
        }
        return total;
    }

    /**
     * Skip n bytes in this stream; returns the number of bytes actually skipped (which may be less than the number
     * requested).
     *
     * @param length the number of bytes to skip
     * @return the number of bytes actually skipped
     * @throws IOException
     */
    public long skip(long length) throws IOException {
        if (closed)
            throw new EOFException("JMSInputStream is closed!");
        int requested = Math.min((int) length, Integer.MAX_VALUE);
        int totalskipped = 0;
        while ((current < clen) && (arrays[current] != null) && (requested > 0)) {
            if (current == clen) {
                break;
            }
            int num_left = arrays[current].getLength() - offset;
            if (num_left < requested) {
                requested -= num_left;
                totalskipped += num_left;
                releaseBuffer(current);
                current++;
                offset = 0;
            }
            else {
                totalskipped += requested;
                offset += requested;
                requested = 0;
            }
        }
        return totalskipped;
    }

    /**
     * Return the number of bytes available for reading.
     *
     * @return the number of bytes available
     * @throws IOException
     */
    public int available() throws IOException {
        if (closed)
            throw new EOFException("JMSInputStream is closed!");
        fillBuffer(0);
        if (current == clen)
            return 0;
        int num_left = arrays[current].getLength() - offset;
        for (int i = current + 1;i < clen;i++) {
            if (arrays[i] == null)
                break;
            num_left += arrays[i].getLength();
        }
        return num_left;
    }

    /**
     * close the stream and the MessageConsumer
     */
    public void close() {
        try {
            consumer.close();
        }
        catch (JMSException jmsEx) {
        }
    }

    /**
     * @return true
     */
    public boolean markSupported() {
        return true;
    }

    /**
     * Returns the stream to the position of the previous mark().
     *
     * @throws IOException
     */
    public void reset() throws IOException {
        if (markArray == -1)
            throw new IOException("PooledArrayInputStream not marked!");
        current = markArray;
        offset = markOffset;
        markArray = -1;
    }

    /**
     * Set the stream's mark to the current position.
     *
     * @param readlimit
     */
    public void mark(int readlimit) {
        markArray = current;
        markOffset = offset;
    }

    /**
     * release up to the current buffer to GC
     *
     * @param index
     */
    private void releaseBuffer(int index) {
        if (markArray < 0 || index < markArray) {
            for (int i = 0;i <= index;i++) {
                arrays[index] = null;
            }
        }
    }

    /**
     * fill the buffer
     *
     * @param requiredLength
     * @throws IOException
     */
    private void fillBuffer(int requiredLength) throws IOException {
        int len = 0;
        try {
            do {
                if (!closed) {
                    ActiveMQMessage msg = null;
                    if (len == 0 && requiredLength > 0) {
                        msg = (ActiveMQMessage) consumer.receive(2000);
                    }
                    else {
                        msg = (ActiveMQMessage) consumer.receiveNoWait();
                    }
                    if (msg != null) {
                        ByteArray ba = msg.getBodyAsBytes();
                        if (ba != null) {
                            len += ba.getLength();
                            process(ba);
                        }
                    }
                    else if (closed) {
                        break;
                    }
                }
            }
            while (len < requiredLength && !closed);
        }
        catch (JMSException jmsEx) {
            throw new IOException(jmsEx.getMessage());
        }
    }

    /**
     * Add an array to this PooledArrayInputStream.
     *
     * @param ba
     */
    private void process(ByteArray ba) {
        if (current == clen && (clen + 1) == arrays.length) {
            offset = 0;
            current = 0;
            clen = 0;
            if (arrays.length > ARRAY_SIZE && markArray == -1) {
                arrays = new ByteArray[ARRAY_SIZE];
            }
        }
        arrays[clen] = ba;
        clen++;
        if (clen == arrays.length) {
            ByteArray[] old = arrays;
            arrays = new ByteArray[old.length + ARRAY_SIZE];
            System.arraycopy(old, 0, arrays, 0, old.length);
        }
    }
}
TOP

Related Classes of org.codehaus.activemq.streams.JMSInputStream

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code is the property of its respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.