Package net.lr.karaf.jms.service.activemq

Source Code of net.lr.karaf.jms.service.activemq.ActiveMQJmsService

package net.lr.karaf.jms.service.activemq;

import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;
import java.util.Set;

import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.QueueBrowser;
import javax.jms.Session;

import net.lr.karaf.jms.service.ExtJmsService;
import net.lr.karaf.jms.service.QueueInfo;
import net.lr.karaf.jms.template.JmsTemplate;
import net.lr.karaf.jms.template.SessionExecutor;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.advisory.DestinationSource;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.pool.PooledConnection;

public class ActiveMQJmsService implements ExtJmsService {

    private ActiveMQConnection connection;

    public ActiveMQJmsService(Connection connection) {
        this.connection = getActiveMQConnection(connection);
    }

    @Override
    public List<String> listQueues() {
        try {
            DestinationSource destSource = connection.getDestinationSource();
            Set<ActiveMQQueue> queues = destSource.getQueues();
            List<String> names = new ArrayList<String>();
            for (ActiveMQQueue queue : queues) {
                names.add(queue.getQueueName());
            }
            return names;
        } catch (JMSException e) {
            throw new RuntimeException(e.getMessage(), e);
        }
    }

    @Override
    public List<String> listTopics() {
        try {
            DestinationSource destSource = connection.getDestinationSource();
            Set<ActiveMQTopic> topics = destSource.getTopics();
            List<String> names = new ArrayList<String>();
            for (ActiveMQTopic queue : topics) {
                names.add(queue.getTopicName());
            }
            return names;
        } catch (JMSException e) {
            throw new RuntimeException(e.getMessage(), e);
        }
    }

    private ActiveMQConnection getActiveMQConnection(final Connection connection) {
        Connection physConnection = connection;
        if (connection instanceof PooledConnection) {
            try {
                physConnection = ((PooledConnection)connection).getConnection();
            } catch (JMSException e) {
                throw new RuntimeException(e.getMessage(), e);
            }
        }
        ActiveMQConnection aConnection = (ActiveMQConnection)physConnection;
        return aConnection;
    }

    @Override
    public void deleteQueue(final String queueName) {
    }

    private Integer getMessagesInQueue(final String queueName) {
        return new JmsTemplate(this.connection).doInSession(new SessionExecutor<Integer>() {
            public Integer execute(Session session) throws JMSException {
                QueueBrowser browser = session.createBrowser(session.createQueue(queueName));
                @SuppressWarnings("unchecked")
                Enumeration<Message> en = browser.getEnumeration();
                int count = 0;
                while (en.hasMoreElements()) {
                    en.nextElement();
                    count ++;
                }
                return count;
            }
        });
    }

    @Override
    public QueueInfo getQueueInfo(String queueName) {
        QueueInfo qi = new QueueInfo();
        qi.setName(queueName);
        qi.setNumPendingMessages(getMessagesInQueue(queueName));
        return qi;
    }

}
TOP

Related Classes of net.lr.karaf.jms.service.activemq.ActiveMQJmsService

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.