Package org.apache.qpid.server.virtualhost

Source Code of org.apache.qpid.server.virtualhost.RemoveExpiredMessagesTask

/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements.  See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership.  The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License.  You may obtain a copy of the License at
*
*   http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied.  See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.server.virtualhost;

import javax.management.NotCompliantMBeanException;

import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.log4j.Logger;
import org.apache.qpid.server.AMQBrokerManagerMBean;
import org.apache.qpid.server.security.access.ACLPlugin;
import org.apache.qpid.server.security.access.ACLManager;
import org.apache.qpid.server.security.access.Accessable;
import org.apache.qpid.server.security.auth.manager.PrincipalDatabaseAuthenticationManager;
import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
import org.apache.qpid.server.configuration.Configurator;
import org.apache.qpid.server.exchange.DefaultExchangeFactory;
import org.apache.qpid.server.exchange.DefaultExchangeRegistry;
import org.apache.qpid.server.exchange.ExchangeFactory;
import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.management.AMQManagedObject;
import org.apache.qpid.server.management.ManagedObject;
import org.apache.qpid.server.queue.DefaultQueueRegistry;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.AMQException;

import java.util.Timer;
import java.util.TimerTask;

public class VirtualHost implements Accessable
{
    private static final Logger _logger = Logger.getLogger(VirtualHost.class);


    private final String _name;

    private QueueRegistry _queueRegistry;

    private ExchangeRegistry _exchangeRegistry;

    private ExchangeFactory _exchangeFactory;

    private MessageStore _messageStore;

    protected VirtualHostMBean _virtualHostMBean;

    private AMQBrokerManagerMBean _brokerMBean;

    private AuthenticationManager _authenticationManager;

    private ACLPlugin _accessManager;

    private final Timer _houseKeepingTimer = new Timer("Queue-housekeeping", true);
    
    private static final long DEFAULT_HOUSEKEEPING_PERIOD = 30000L;
   
    public void setAccessableName(String name)
    {
        _logger.warn("Setting Accessable Name for VirualHost is not allowed. ("
                     + name + ") ignored remains :" + getAccessableName());
    }

    public String getAccessableName()
    {
        return _name;
    }


    /**
     * Abstract MBean class. This has some of the methods implemented from management intrerface for exchanges. Any
     * implementaion of an Exchange MBean should extend this class.
     */
    public class VirtualHostMBean extends AMQManagedObject implements ManagedVirtualHost
    {
        public VirtualHostMBean() throws NotCompliantMBeanException
        {
            super(ManagedVirtualHost.class, "VirtualHost");
        }

        public String getObjectInstanceName()
        {
            return _name.toString();
        }

        public String getName()
        {
            return _name.toString();
        }

        public VirtualHost getVirtualHost()
        {
            return VirtualHost.this;
        }


    } // End of MBean class

    /**
     * Used for testing only
     * @param name
     * @param store
     * @throws Exception
     */
    public VirtualHost(String name, MessageStore store) throws Exception
    {
        this(name, new PropertiesConfiguration(), store);
    }

    /**
     * Normal Constructor
     * @param name
     * @param hostConfig
     * @throws Exception
     */
    public VirtualHost(String name, Configuration hostConfig) throws Exception
    {
        this(name, hostConfig, null);
    }

    public VirtualHost(String name, Configuration hostConfig, MessageStore store) throws Exception
    {
        _name = name;

        _virtualHostMBean = new VirtualHostMBean();
        // This isn't needed to be registered
        //_virtualHostMBean.register();

        _queueRegistry = new DefaultQueueRegistry(this);
        _exchangeFactory = new DefaultExchangeFactory(this);
        _exchangeFactory.initialise(hostConfig);
        _exchangeRegistry = new DefaultExchangeRegistry(this);

        if (store != null)
        {
            _messageStore = store;
        }
        else
        {
            if (hostConfig == null)
            {
                throw new IllegalAccessException("HostConfig and MessageStore cannot be null");
            }
            initialiseMessageStore(hostConfig);
        }

        _exchangeRegistry.initialise();

        _authenticationManager = new PrincipalDatabaseAuthenticationManager(name, hostConfig);

        _accessManager = ACLManager.loadACLManager(name, hostConfig);

        _brokerMBean = new AMQBrokerManagerMBean(_virtualHostMBean);
        _brokerMBean.register();
        initialiseHouseKeeping(hostConfig);
    }

    private void initialiseHouseKeeping(final Configuration hostConfig)
    {
    
      long period = hostConfig.getLong("housekeeping.expiredMessageCheckPeriod", DEFAULT_HOUSEKEEPING_PERIOD);
   
      /* add a timer task to iterate over queues, cleaning expired messages from queues with no consumers */
      if(period != 0L)
      {
        class RemoveExpiredMessagesTask extends TimerTask
        {
          public void run()
          {
            for(AMQQueue q : _queueRegistry.getQueues())
            {

              try
              {
                q.removeExpiredIfNoSubscribers();
              }
              catch (AMQException e)
              {
                _logger.error("Exception in housekeeping for queue: " + q.getName().toString(),e);
                throw new RuntimeException(e);
              }
            }
          }
        }
       
        _houseKeepingTimer.scheduleAtFixedRate(new RemoveExpiredMessagesTask(),
            period/2,
            period);
      }
    }
   
    private void initialiseMessageStore(Configuration config) throws Exception
    {
        String messageStoreClass = config.getString("store.class");

        Class clazz = Class.forName(messageStoreClass);
        Object o = clazz.newInstance();

        if (!(o instanceof MessageStore))
        {
            throw new ClassCastException("Message store class must implement " + MessageStore.class + ". Class " + clazz +
                                         " does not.");
        }
        _messageStore = (MessageStore) o;
        _messageStore.configure(this, "store", config);
    }


    public <T> T getConfiguredObject(Class<T> instanceType, Configuration config)
    {
        T instance;
        try
        {
            instance = instanceType.newInstance();
        }
        catch (Exception e)
        {
            _logger.error("Unable to instantiate configuration class " + instanceType + " - ensure it has a public default constructor");
            throw new IllegalArgumentException("Unable to instantiate configuration class " + instanceType + " - ensure it has a public default constructor", e);
        }
        Configurator.configure(instance);

        return instance;
    }


    public String getName()
    {
        return _name;
    }

    public QueueRegistry getQueueRegistry()
    {
        return _queueRegistry;
    }

    public ExchangeRegistry getExchangeRegistry()
    {
        return _exchangeRegistry;
    }

    public ExchangeFactory getExchangeFactory()
    {
        return _exchangeFactory;
    }

    public ApplicationRegistry getApplicationRegistry()
    {
        throw new UnsupportedOperationException();
    }

    public MessageStore getMessageStore()
    {
        return _messageStore;
    }

    public AuthenticationManager getAuthenticationManager()
    {
        return _authenticationManager;
    }

    public ACLPlugin getAccessManager()
    {
        return _accessManager;
    }

    public void close() throws Exception
    {
        if (_houseKeepingTimer != null)
        {
            _houseKeepingTimer.cancel();
        }
        if (_messageStore != null)
        {
            _messageStore.close();
        }
    }

    public ManagedObject getBrokerMBean()
    {
        return _brokerMBean;
    }

    public ManagedObject getManagedObject()
    {
        return _virtualHostMBean;
    }
}
TOP

Related Classes of org.apache.qpid.server.virtualhost.RemoveExpiredMessagesTask

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code is the property of its respective owners. Java is a trademark of Sun Microsystems, Inc. and owned by ORACLE Inc. Contact coftware#gmail.com.