/*
* This file is part of FFMQ.
*
* FFMQ is free software; you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*
* FFMQ is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with FFMQ; if not, write to the Free Software
* Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
*/
package net.timewalker.ffmq3.storage.data.impl;
import java.io.File;
import java.io.IOException;
import net.timewalker.ffmq3.management.destination.AbstractDestinationDescriptor;
import net.timewalker.ffmq3.storage.data.DataStoreException;
import net.timewalker.ffmq3.storage.data.impl.journal.BlockBasedDataStoreJournal;
import net.timewalker.ffmq3.storage.data.impl.journal.DirtyBlockTable;
import net.timewalker.ffmq3.storage.data.impl.journal.JournalRecovery;
import net.timewalker.ffmq3.utils.async.AsyncTaskManager;
import net.timewalker.ffmq3.utils.concurrent.SynchronizationBarrier;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
 * JournalingBlockBasedDataStore
 * <p>
 * Block-based data store whose write operations (allocation table header,
 * allocation blocks, data blocks, store extension) are all delegated to a
 * {@link BlockBasedDataStoreJournal}. Data blocks are additionally registered
 * in a {@link DirtyBlockTable} when written, and reads are served from that
 * table whenever it holds the requested block.
 * </p>
 */
public final class JournalingBlockBasedDataStore extends AbstractBlockBasedDataStore
{
    private static final Log log = LogFactory.getLog(JournalingBlockBasedDataStore.class);

    // Journal related
    private final AsyncTaskManager asyncTaskManager;
    private BlockBasedDataStoreJournal journal; // created at the end of initFilesystem()
    private final DirtyBlockTable dirtyBlockTable = new DirtyBlockTable();
    // When the system property is exactly "true", journal files are kept after a recovery
    private final boolean keepJournalFiles = "true".equals(System.getProperty("ffmq.dataStore.keepJournalFiles", "false"));

    /**
     * Constructor
     * @param descriptor descriptor of the destination backed by this store
     * @param asyncTaskManager task manager handed over to the journal
     */
    public JournalingBlockBasedDataStore( AbstractDestinationDescriptor descriptor , AsyncTaskManager asyncTaskManager )
    {
        super(descriptor);
        this.asyncTaskManager = asyncTaskManager;
    }

    /* (non-Javadoc)
     * @see net.timewalker.ffmq3.storage.data.impl.AbstractBlockBasedDataStore#initFilesystem()
     */
    protected void initFilesystem() throws DataStoreException
    {
        super.initFilesystem();

        // Check for remaining journal files ...
        String baseName = descriptor.getName();
        File dataFolder = descriptor.getDataFolder();
        File[] journalFiles = BlockBasedDataStoreTools.findJournalFiles(baseName, dataFolder);
        if (journalFiles.length > 0)
            recoverFromJournalFiles(baseName, journalFiles);

        // Create new journal
        this.journal =
            new BlockBasedDataStoreJournal(baseName,
                                           descriptor.getJournalFolder(),
                                           descriptor.getMaxJournalSize(),
                                           descriptor.getMaxWriteBatchSize(),
                                           descriptor.getMaxUncommittedJournalSize(),
                                           descriptor.getMaxUncommittedStoreSize(),
                                           descriptor.getJournalOutputBuffer(),
                                           allocationTableRandomAccessFile,
                                           dataRandomAccessFile,
                                           dirtyBlockTable,
                                           asyncTaskManager);
    }

    /**
     * Run a journal recovery from the given left-over journal files, delete
     * those files (unless {@link #keepJournalFiles} is set) and finally force
     * an integrity check of the store.
     * @param baseName store base name, used for recovery and log messages
     * @param journalFiles the journal files to recover from
     * @throws DataStoreException if recovery or the integrity check fails, or
     *         if a journal file cannot be deleted
     */
    private void recoverFromJournalFiles( String baseName , File[] journalFiles ) throws DataStoreException
    {
        // Recovery
        JournalRecovery recovery = new JournalRecovery(baseName,journalFiles, allocationTableRandomAccessFile, dataRandomAccessFile);
        int newBlockCount = recovery.recover();

        // Update block count if necessary (-1 leaves the current count untouched)
        if (newBlockCount != -1)
            this.blockCount = newBlockCount;

        if (!keepJournalFiles)
        {
            for (int i = 0; i < journalFiles.length; i++)
            {
                File journalFile = journalFiles[i];
                if (!journalFile.delete())
                    throw new DataStoreException("Cannot delete journal file : "+journalFile.getAbsolutePath());
            }
        }

        // Integrity check
        log.warn("["+baseName+"] Forcing integrity check after journal recovery ...");
        integrityCheck();
        log.warn("["+baseName+"] Check complete.");
    }

    /*
     * (non-Javadoc)
     * @see net.timewalker.ffmq3.storage.data.impl.AbstractBlockBasedDataStore#writeFirstBlock()
     */
    protected void writeFirstBlock() throws DataStoreException
    {
        journal.writeMetaData(AT_HEADER_FIRSTBLOCK_OFFSET, firstBlock);
    }

    /*
     * (non-Javadoc)
     * @see net.timewalker.ffmq3.storage.data.impl.AbstractBlockBasedDataStore#writeAllocationBlock(int)
     */
    protected void writeAllocationBlock( int blockIndex ) throws DataStoreException
    {
        // Regroup I/O to improve performance : the whole allocation entry
        // is serialized in memory and handed to the journal in one call
        byte[] allocationBlock = new byte[AT_BLOCK_SIZE];
        allocationBlock[AB_FLAGS_OFFSET] = flags[blockIndex];
        storeBigEndianInt(allocationBlock, AB_ALLOCSIZE_OFFSET, allocatedSize[blockIndex]);
        storeBigEndianInt(allocationBlock, AB_PREVBLOCK_OFFSET, previousBlock[blockIndex]);
        storeBigEndianInt(allocationBlock, AB_NEXTBLOCK_OFFSET, nextBlock[blockIndex]);

        journal.writeMetaDataBlock(AT_HEADER_SIZE+blockIndex*AT_BLOCK_SIZE, allocationBlock);
    }

    /**
     * Store a 32-bit value into a buffer as 4 bytes, most significant byte first.
     * @param buffer target buffer
     * @param position index of the most significant byte in the buffer
     * @param value the value to store
     */
    private static void storeBigEndianInt( byte[] buffer , int position , int value )
    {
        buffer[position]   = (byte)((value >>> 24) & 0xFF);
        buffer[position+1] = (byte)((value >>> 16) & 0xFF);
        buffer[position+2] = (byte)((value >>> 8) & 0xFF);
        buffer[position+3] = (byte)(value & 0xFF);
    }

    /*
     * (non-Javadoc)
     * @see net.timewalker.ffmq3.storage.data.impl.AbstractBlockBasedDataStore#writeDataBlock(byte[], int, int, int)
     */
    protected void writeDataBlock(byte[] data, int offset, int len, int blockHandle) throws DataStoreException
    {
        // Always work on a private, full-size copy of the block : bytes past
        // 'len' are left at zero
        byte[] blockData = new byte[blockSize];
        System.arraycopy(data, offset, blockData, 0, len);

        // Register the block in the dirty table before handing it to the journal
        dirtyBlockTable.markDirty(blockHandle, blockData);
        journal.writeDataBlock(blockHandle, (long)blockHandle*blockSize, blockData);
    }

    /*
     * (non-Javadoc)
     * @see net.timewalker.ffmq3.storage.data.impl.AbstractBlockBasedDataStore#readDataBlock(byte[], int, int, int)
     */
    protected void readDataBlock(byte[] data, int offset, int len, int blockHandle) throws DataStoreException
    {
        byte[] dirtyBlock = dirtyBlockTable.get(blockHandle);
        if (dirtyBlock != null)
        {
            // Copy data from memory-cached dirty block
            System.arraycopy(dirtyBlock, 0, data, offset, len);
            return;
        }

        try
        {
            // Make sure we do not conflict with an async write from journal
            synchronized (dataRandomAccessFile)
            {
                dataRandomAccessFile.seek((long)blockHandle*blockSize);

                // A single read() may return fewer bytes than requested without
                // this being an error, so keep reading until the block is
                // complete or the file has no more data
                int totalRead = 0;
                while (totalRead < len)
                {
                    int count = dataRandomAccessFile.read(data,offset+totalRead,len-totalRead);
                    if (count <= 0)
                        throw new DataStoreException("Cannot read "+len+" bytes from store file");
                    totalRead += count;
                }
            }
        }
        catch (DataStoreException e)
        {
            throw e;
        }
        catch (IOException e)
        {
            throw new DataStoreException("Could not read data block "+blockHandle,e);
        }
    }

    /* (non-Javadoc)
     * @see net.timewalker.ffmq3.storage.data.impl.AbstractBlockBasedDataStore#extendStoreFiles(int, int)
     */
    protected void extendStoreFiles(int oldBlockCount, int newBlockCount) throws DataStoreException
    {
        journal.extendStore(blockSize,oldBlockCount,newBlockCount);
    }

    /* (non-Javadoc)
     * @see net.timewalker.ffmq3.storage.data.impl.AbstractBlockBasedDataStore#flush()
     */
    protected void flush() throws DataStoreException
    {
        journal.flush();
    }

    /* (non-Javadoc)
     * @see net.timewalker.ffmq3.storage.data.DataStore#commit(net.timewalker.ffmq3.utils.concurrent.SynchronizationBarrier)
     */
    public void commitChanges(SynchronizationBarrier barrier) throws DataStoreException
    {
        // The barrier is passed to the journal as-is; this method does not wait on it
        journal.commit(barrier);
    }

    /* (non-Javadoc)
     * @see net.timewalker.ffmq3.storage.data.DataStore#commit()
     */
    public void commitChanges() throws DataStoreException
    {
        try
        {
            // Synchronous variant : commit, then block on a private barrier
            SynchronizationBarrier barrier = new SynchronizationBarrier();
            journal.commit(barrier);
            barrier.waitFor();
        }
        catch (InterruptedException e)
        {
            // Restore the interrupt status so callers further up can still see it,
            // and keep the original exception as the cause
            Thread.currentThread().interrupt();
            throw new DataStoreException("Wait for commit barrier was interrupted",e);
        }
    }

    /* (non-Javadoc)
     * @see net.timewalker.ffmq3.storage.data.impl.AbstractBlockBasedDataStore#close()
     */
    public void close()
    {
        // Close journal first
        try
        {
            // The journal is only assigned at the end of initFilesystem(),
            // so there may be nothing to commit or close here
            if (journal != null)
            {
                try
                {
                    commitChanges();
                }
                finally
                {
                    // Release the journal even if the final commit failed
                    journal.close();
                }
            }
        }
        catch (DataStoreException e)
        {
            log.error("Could not properly close store journal",e);
        }
        finally
        {
            // Always let the parent class close, whatever happened above
            super.close();
        }
    }
}