/*
* Copyright 2009 Red Hat, Inc.
* Red Hat licenses this file to you under the Apache License, version
* 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package org.hornetq.core.paging.impl;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.hornetq.api.core.SimpleString;
import org.hornetq.core.paging.PageTransactionInfo;
import org.hornetq.core.paging.PagingManager;
import org.hornetq.core.paging.PagingStore;
import org.hornetq.core.paging.PagingStoreFactory;
import org.hornetq.core.server.HornetQServerLogger;
import org.hornetq.core.settings.HierarchicalRepository;
import org.hornetq.core.settings.impl.AddressSettings;
/**
*
* @author <a href="mailto:clebert.suconic@jboss.com">Clebert Suconic</a>
* @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
* @author <a href="mailto:andy.taylor@jboss.org>Andy Taylor</a>
*
*/
public class PagingManagerImpl implements PagingManager
{
private volatile boolean started = false;
/**
* Lock used at the start of synchronization between a live server and its backup.
* Synchronization will lock all {@link PagingStore} instances, and so any operation here that
* requires a lock on a {@link PagingStore} instance needs to take a read-lock on
* {@link #syncLock} to avoid dead-locks.
*/
private final ReentrantReadWriteLock syncLock = new ReentrantReadWriteLock();
private final ConcurrentMap<SimpleString, PagingStore> stores = new ConcurrentHashMap<SimpleString, PagingStore>();
private final HierarchicalRepository<AddressSettings> addressSettingsRepository;
private final PagingStoreFactory pagingStoreFactory;
private volatile boolean cleanupEnabled = true;
private final ConcurrentMap</*TransactionID*/Long, PageTransactionInfo> transactions =
new ConcurrentHashMap<Long, PageTransactionInfo>();
// Static
// --------------------------------------------------------------------------------------------------------------------------
private static boolean isTrace = HornetQServerLogger.LOGGER.isTraceEnabled();
// Constructors
// --------------------------------------------------------------------------------------------------------------------
public PagingManagerImpl(final PagingStoreFactory pagingSPI,
final HierarchicalRepository<AddressSettings> addressSettingsRepository)
{
pagingStoreFactory = pagingSPI;
this.addressSettingsRepository = addressSettingsRepository;
addressSettingsRepository.registerListener(this);
}
@Override
public void onChange()
{
reaplySettings();
}
private void reaplySettings()
{
for (PagingStore store : stores.values())
{
AddressSettings settings = this.addressSettingsRepository.getMatch(store.getAddress().toString());
store.applySetting(settings);
}
}
public void disableCleanup()
{
if (!cleanupEnabled)
{
return;
}
lock();
try
{
cleanupEnabled = false;
for (PagingStore store: stores.values())
{
store.disableCleanup();
}
}
finally
{
unlock();
}
}
public void resumeCleanup()
{
if (cleanupEnabled)
{
return;
}
lock();
try
{
cleanupEnabled = true;
for (PagingStore store: stores.values())
{
store.enableCleanup();
}
}
finally
{
unlock();
}
}
public SimpleString[] getStoreNames()
{
Set<SimpleString> names = stores.keySet();
return names.toArray(new SimpleString[names.size()]);
}
public void reloadStores() throws Exception
{
lock();
try
{
List<PagingStore> reloadedStores = pagingStoreFactory.reloadStores(addressSettingsRepository);
for (PagingStore store : reloadedStores)
{
store.start();
stores.put(store.getStoreName(), store);
}
}
finally
{
unlock();
}
}
public void deletePageStore(final SimpleString storeName) throws Exception
{
syncLock.readLock().lock();
try
{
PagingStore store = stores.remove(storeName);
if (store != null)
{
store.stop();
}
}
finally
{
syncLock.readLock().unlock();
}
}
/** stores is a ConcurrentHashMap, so we don't need to synchronize this method */
public PagingStore getPageStore(final SimpleString storeName) throws Exception
{
PagingStore store = stores.get(storeName);
if (store != null)
{
return store;
}
return newStore(storeName);
}
public void addTransaction(final PageTransactionInfo pageTransaction)
{
if (isTrace)
{
HornetQServerLogger.LOGGER.trace("Adding pageTransaction " + pageTransaction.getTransactionID());
}
transactions.put(pageTransaction.getTransactionID(), pageTransaction);
}
public void removeTransaction(final long id)
{
if (isTrace)
{
HornetQServerLogger.LOGGER.trace("Removing pageTransaction " +id);
}
transactions.remove(id);
}
public PageTransactionInfo getTransaction(final long id)
{
if (isTrace)
{
HornetQServerLogger.LOGGER.trace("looking up pageTX = " + id);
}
return transactions.get(id);
}
@Override
public Map<Long, PageTransactionInfo> getTransactions()
{
return transactions;
}
@Override
public boolean isStarted()
{
return started;
}
@Override
public void start() throws Exception
{
lock();
try
{
if (started)
{
return;
}
pagingStoreFactory.setPagingManager(this);
reloadStores();
started = true;
}
finally
{
unlock();
}
}
public synchronized void stop() throws Exception
{
if (!started)
{
return;
}
started = false;
lock();
try
{
for (PagingStore store : stores.values())
{
store.stop();
}
pagingStoreFactory.stop();
}
finally
{
unlock();
}
}
public void processReload() throws Exception
{
for (PagingStore store: stores.values())
{
store.processReload();
}
}
private PagingStore newStore(final SimpleString address) throws Exception
{
syncLock.readLock().lock();
try {
PagingStore store = stores.get(address);
if (store == null)
{
store = pagingStoreFactory.newStore(address, addressSettingsRepository.getMatch(address.toString()));
store.start();
if (!cleanupEnabled)
{
store.disableCleanup();
}
stores.put(address, store);
}
return store;
}
finally
{
syncLock.readLock().unlock();
}
}
public void unlock()
{
syncLock.writeLock().unlock();
}
public void lock()
{
syncLock.writeLock().lock();
}
}