/*
* Created on 31-Jul-2004
* Created by Paul Gardner
* Copyright (C) 2004, 2005, 2006 Aelitis, All Rights Reserved.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
*
* AELITIS, SAS au capital de 46,603.30 euros
* 8 Allee Lenotre, La Grille Royale, 78600 Le Mesnil le Roi, France.
*
*/
package org.gudy.azureus2.core3.disk.impl.access.impl;
import java.util.*;
import org.gudy.azureus2.core3.config.COConfigurationManager;
import org.gudy.azureus2.core3.config.ParameterListener;
import org.gudy.azureus2.core3.disk.*;
import org.gudy.azureus2.core3.disk.impl.DiskManagerFileInfoImpl;
import org.gudy.azureus2.core3.disk.impl.DiskManagerHelper;
import org.gudy.azureus2.core3.disk.impl.DiskManagerRecheckInstance;
import org.gudy.azureus2.core3.disk.impl.access.DMChecker;
import org.gudy.azureus2.core3.disk.impl.piecemapper.DMPieceList;
import org.gudy.azureus2.core3.disk.impl.piecemapper.DMPieceMapEntry;
import org.gudy.azureus2.core3.logging.*;
import org.gudy.azureus2.core3.util.*;
import com.aelitis.azureus.core.diskmanager.cache.CacheFile;
/**
* @author parg
*
*/
public class
DMCheckerImpl
implements DMChecker
{
// log category attached to every LogEvent created by this class
protected static final LogIDs LOGID = LogIDs.DISK;
// cached value of config "diskmanager.perf.cache.flushpieces"; passed along as the flush flag of check reads
private static boolean flush_pieces;
// cached value of config "diskmanager.perf.checking.read.priority"; selects the priority reported for check reads
private static boolean checking_read_priority;
// guards all access to async_check_queue
private static AEMonitor class_mon = new AEMonitor( "DMChecker:class" );
// pending requests in fully-async mode, shared by all instances; each element is an
// Object[]{ DMCheckerImpl, DiskManagerCheckRequest, DiskManagerCheckRequestListener }
private static List async_check_queue = new ArrayList();
// released once for every entry added to async_check_queue, reserved by the scheduler thread before it removes one
private static AESemaphore async_check_queue_sem = new AESemaphore("DMChecker::asyncCheck");
// read once when the class is initialised (no listener is registered for this key in this file);
// when true, check requests are queued and dispatched from the shared scheduler thread
private static boolean fully_async = COConfigurationManager.getBooleanParameter( "diskmanager.perf.checking.fully.async" );
// In fully-async mode a single thread, shared by every DMCheckerImpl instance, drains
// async_check_queue and hands each entry to the checker that queued it. The thread must
// survive a failing request: if it died, every later check request from every instance
// would sit in the queue forever.
static{
	if ( fully_async ){
		new AEThread2( "DMCheckerImpl:asyncCheckScheduler", true )
		{
			public void
			run()
			{
				while( true ){
						// one permit is released per queued entry, so after this returns there
						// should be an entry to take
					async_check_queue_sem.reserve();
					try{
						Object[]	entry;
						try{
							class_mon.enter();
							entry = (Object[])async_check_queue.remove(0);
							int	queue_size = async_check_queue.size();
							if ( queue_size % 100 == 0 && queue_size > 0 ){
								System.out.println( "async check queue size=" + queue_size );
							}
						}finally{
							class_mon.exit();
						}
							// dispatch outside the monitor so a blocking read-queue doesn't
							// prevent other threads from queueing requests
						((DMCheckerImpl)entry[0]).enqueueCheckRequest(
							(DiskManagerCheckRequest)entry[1],
							(DiskManagerCheckRequestListener)entry[2],
							flush_pieces );
					}catch( Throwable e ){
							// log and carry on with the next entry rather than losing the scheduler
						Debug.printStackTrace( e );
					}
				}
			}
		}.start();
	}
}
// Keeps the cached tuning flags in step with the configuration. The listener is registered
// for both keys; going by the method name it is presumably also fired once on registration,
// which would give the flags their initial values.
static{
	final String	flush_key		= "diskmanager.perf.cache.flushpieces";
	final String	priority_key	= "diskmanager.perf.checking.read.priority";

	COConfigurationManager.addAndFireParameterListeners(
		new String[]{ flush_key, priority_key },
		new ParameterListener()
		{
			public void
			parameterChanged(
				String	changed_name )
			{
					// both flags are re-read regardless of which key changed
				flush_pieces			= COConfigurationManager.getBooleanParameter( flush_key );
				checking_read_priority	= COConfigurationManager.getBooleanParameter( priority_key );
			}
		});
}
// the disk manager this checker reads pieces from and reports piece state to
protected DiskManagerHelper disk_manager;
// number of hash operations currently handed to the hasher; modified under this_mon
protected int async_checks;
// released by each finishing hash operation once 'stopped' is set; stop() reserves it once per outstanding check
protected AESemaphore async_check_sem = new AESemaphore("DMChecker::asyncCheck");
// number of check reads currently queued with the disk manager; modified under this_mon
protected int async_reads;
// released by each finishing read once 'stopped' is set; stop() reserves it once per outstanding read
protected AESemaphore async_read_sem = new AESemaphore("DMChecker::asyncRead");
// set by start(); read and written under this_mon
private boolean started;
// set by stop() under this_mon; also polled without the monitor (hence volatile) by the complete-recheck thread
protected volatile boolean stopped;
// true from enqueueCompleteRecheckRequest until its worker thread finishes
private volatile boolean complete_recheck_in_progress;
// progress of the complete recheck in 1/1000ths of the piece count
private volatile int complete_recheck_progress;
// when false, check requests are reported as passed without reading any data
private boolean checking_enabled = true;
// guards started/stopped transitions and the async_reads/async_checks counters
protected AEMonitor this_mon = new AEMonitor( "DMChecker" );
/**
 * Creates a checker that works against the supplied disk manager helper.
 *
 * @param _disk_manager	helper used for piece lookup, hashes and read requests
 */
public
DMCheckerImpl(
	DiskManagerHelper	_disk_manager )
{
	this.disk_manager = _disk_manager;
}
/**
 * Marks the checker as started.
 *
 * @throws RuntimeException if the checker has already been started, or has been stopped
 */
public void
start()
{
	try{
		this_mon.enter();

			// "already started" takes precedence over "already stopped"
		final String	problem =
			started ? "DMChecker: start while started" :
			( stopped ? "DMChecker: start after stopped" : null );

		if ( problem != null ){
			throw( new RuntimeException( problem ));
		}

		started = true;

	}finally{
		this_mon.exit();
	}
}
/**
 * Stops the checker and blocks until every read and hash check that was outstanding
 * at the moment of stopping has finished. Does nothing if the checker was never
 * started or has already been stopped.
 */
public void
stop()
{
	int	check_wait;
	int	read_wait;

	try{
		this_mon.enter();

		if ( stopped || !started ){
			return;
		}

			// when we exit here we guarantee that all file usage operations have completed
			// i.e. writes and checks (checks being doubly async)

		stopped		= true;

			// snapshot the counters under the monitor: operations finishing from now on see
			// 'stopped' and release the matching semaphore once each
		read_wait	= async_reads;
		check_wait	= async_checks;

	}finally{
		this_mon.exit();
	}

	awaitOutstanding( async_read_sem, read_wait, "Waiting for check-reads to complete - " );

	awaitOutstanding( async_check_sem, check_wait, "Waiting for checks to complete - " );
}

/**
 * Reserves the semaphore once per outstanding operation, logging the number still
 * remaining at most once a second.
 *
 * @param sem			semaphore released by each finishing operation
 * @param outstanding	number of operations to wait for
 * @param log_prefix	text placed before the remaining count in the progress log message
 */
private void
awaitOutstanding(
	AESemaphore	sem,
	int			outstanding,
	String		log_prefix )
{
	long	log_time = SystemTime.getCurrentTime();

	for (int i=0;i<outstanding;i++){

		long	now = SystemTime.getCurrentTime();

		if ( now < log_time ){

				// clock went backwards - restart the logging interval

			log_time = now;

		}else if ( now - log_time > 1000 ){

			log_time = now;

			if ( Logger.isEnabled()){
				Logger.log(new LogEvent(disk_manager, LOGID, log_prefix + (outstanding-i) + " remaining" ));
			}
		}

		sem.reserve();
	}
}
/**
 * Reports the progress of a running complete recheck.
 *
 * @return the current progress value while a complete recheck is in progress, otherwise -1
 */
public int
getCompleteRecheckStatus()
{
	return( complete_recheck_in_progress ? complete_recheck_progress : -1 );
}
/**
 * Switches hash checking on or off. While it is off, check requests are reported
 * to their listener as passed without any data being read.
 *
 * @param enabled	true to perform real checks
 */
public void
setCheckingEnabled(
	boolean		enabled )
{
	this.checking_enabled = enabled;
}
/**
 * Builds a check request for one piece.
 *
 * @param pieceNumber	piece the request refers to
 * @param user_data		opaque caller data carried by the request
 * @return a new request object
 */
public DiskManagerCheckRequest
createCheckRequest(
	int		pieceNumber,
	Object	user_data )
{
	DiskManagerCheckRequest	check_request = new DiskManagerCheckRequestImpl( pieceNumber, user_data );

	return( check_request );
}
/**
 * Rechecks every relevant piece of the download on a separate thread, forwarding the
 * outcome of each individual piece check to the supplied listener. If checking is
 * disabled the listener is told immediately that the request passed and nothing else happens.
 *
 * Progress is published through getCompleteRecheckStatus() for as long as the worker runs.
 *
 * @param request	the overall request; its user data is copied onto each per-piece request
 * @param listener	receives one callback per piece check (with the per-piece request)
 */
public void
enqueueCompleteRecheckRequest(
	final DiskManagerCheckRequest			request,
	final DiskManagerCheckRequestListener	listener )
{
	if ( !checking_enabled ){

		listener.checkCompleted( request, true );

		return;
	}

	complete_recheck_progress		= 0;
	complete_recheck_in_progress	= true;

	new AEThread2("DMChecker::completeRecheck", true )
	{
		public void
		run()
		{
			DiskManagerRecheckInstance	recheck_inst = null;

			try{
					// registered inside the try so that a failure here still clears
					// complete_recheck_in_progress in the finally block

				recheck_inst = disk_manager.getRecheckScheduler().register( disk_manager, true );

					// released once per finished piece check; used at the end to wait for all of them

				final AESemaphore	sem = new AESemaphore( "DMChecker::completeRecheck" );

				int	checks_submitted	= 0;

					// created with a value of 2 and reserved before each submission, released on
					// completion - presumably this limits the recheck to two checks in flight

				final AESemaphore	run_sem = new AESemaphore( "DMChecker::completeRecheck:runsem", 2 );

				int	nbPieces = disk_manager.getNbPieces();

				for ( int i=0; i < nbPieces; i++ ){

						// long arithmetic: 1000*i overflows an int once i exceeds ~2.1 million

					complete_recheck_progress = (int)(( 1000L * i ) / nbPieces );

					DiskManagerPiece	dm_piece = disk_manager.getPiece(i);

						// only recheck the piece if it happens to be done (a complete dnd file that's
						// been set back to dnd for example) or the piece is part of a non-dnd file

					if ( dm_piece.isDone() || !dm_piece.isSkipped()){

						run_sem.reserve();

							// keep asking the recheck scheduler until it grants permission or we're stopped

						while( !stopped ){

							if ( recheck_inst.getPermission()){

								break;
							}
						}

						if ( stopped ){

							break;
						}

						final DiskManagerCheckRequest this_request = createCheckRequest( i, request.getUserData());

						enqueueCheckRequest(
							this_request,
							new DiskManagerCheckRequestListener()
							{
								public void
								checkCompleted(
									DiskManagerCheckRequest		request,
									boolean						passed )
								{
									try{
										listener.checkCompleted( request, passed );

									}catch( Throwable e ){

										Debug.printStackTrace(e);

									}finally{

										complete();
									}
								}

								public void
								checkCancelled(
									DiskManagerCheckRequest		request )
								{
									try{
										listener.checkCancelled( request );

									}catch( Throwable e ){

										Debug.printStackTrace(e);

									}finally{

										complete();
									}
								}

								public void
								checkFailed(
									DiskManagerCheckRequest		request,
									Throwable					cause )
								{
									try{
										listener.checkFailed( request, cause );

									}catch( Throwable e ){

										Debug.printStackTrace(e);

									}finally{

										complete();
									}
								}

									// whatever the outcome (and even if the caller's listener throws),
									// free a submission slot and count the check as finished

								protected void
								complete()
								{
									run_sem.release();

									sem.release();
								}
							},
							false );

						checks_submitted++;
					}
				}

					// wait for all to complete

				for (int i=0;i<checks_submitted;i++){

					sem.reserve();
				}
			}finally{

				complete_recheck_in_progress = false;

				if ( recheck_inst != null ){

					recheck_inst.unregister();
				}
			}
		}
	}.start();
}
/**
 * Submits a piece check. In the normal mode the request is processed on the caller's
 * thread using the configured flush setting; in fully-async mode it is appended to the
 * shared queue and picked up by the scheduler thread.
 *
 * @param request	the piece check to perform
 * @param listener	receives the outcome
 */
public void
enqueueCheckRequest(
	DiskManagerCheckRequest			request,
	DiskManagerCheckRequestListener	listener )
{
	if ( !fully_async ){

		enqueueCheckRequest( request, listener, flush_pieces );

		return;
	}

		// If the disk controller's read queue is full, allocating the read request blocks.
		// Fully-async mode moves that potential block off the caller's thread.

	final Object[]	queued = { this, request, listener };

	try{
		class_mon.enter();

		async_check_queue.add( queued );

		final int	pending = async_check_queue.size();

		if ( pending % 100 == 0 ){

			System.out.println( "async check queue size=" + pending );
		}
	}finally{

		class_mon.exit();
	}

		// wake the scheduler only after the entry is visible in the queue

	async_check_queue_sem.release();
}
/**
 * Tells whether a check for the given piece, submitted through this instance, is still
 * waiting in the fully-async queue. Only the queue is inspected, so the answer is always
 * false when fully-async mode is off.
 *
 * @param piece_number	piece to look for
 * @return true if a queued request of this checker targets the piece
 */
public boolean
hasOutstandingCheckRequestForPiece(
	int		piece_number )
{
	if ( !fully_async ){

		return( false );
	}

	try{
		class_mon.enter();

		for ( int idx = 0; idx < async_check_queue.size(); idx++ ){

			Object[]	queued = (Object[])async_check_queue.get( idx );

				// the queue is shared between all checkers - skip entries owned by others

			if ( queued[0] != this ){

				continue;
			}

			if (((DiskManagerCheckRequest)queued[1]).getPieceNumber() == piece_number ){

				return( true );
			}
		}

		return( false );

	}finally{

		class_mon.exit();
	}
}
/**
 * Common entry point for every piece check. Wraps the caller's listener in an
 * interceptor that marks the request as ended, updates the piece's done state,
 * forwards the callback and then logs the outcome.
 *
 * @param request		the piece check to perform
 * @param listener		caller's listener, invoked from within the interceptor
 * @param read_flush	flush flag applied to the read issued for this check
 */
protected void
enqueueCheckRequest(
	final DiskManagerCheckRequest			request,
	final DiskManagerCheckRequestListener	listener,
	boolean									read_flush )
{
	request.requestStarts();

	final DiskManagerCheckRequestListener	interceptor =
		new DiskManagerCheckRequestListener()
		{
			public void
			checkCompleted(
				DiskManagerCheckRequest		request,
				boolean						passed )
			{
				request.requestEnds( true );

				try{
					final int	checked_piece = request.getPieceNumber();

					disk_manager.getPiece( request.getPieceNumber()).setDone( passed );

					if ( !passed ){

						return;		// the finally block still notifies and logs
					}

						// tell every file that contributes to the piece that its part was verified

					DMPieceList	entries = disk_manager.getPieceList( checked_piece );

					for ( int idx = 0; idx < entries.size(); idx++ ){

						DMPieceMapEntry	entry = entries.get( idx );

						entry.getFile().dataChecked( entry.getOffset(), entry.getLength());
					}
				}finally{

					listener.checkCompleted( request, passed );

					if ( Logger.isEnabled()){

						Logger.log(
							new LogEvent(
								disk_manager,
								LOGID,
								passed ? LogEvent.LT_INFORMATION : LogEvent.LT_WARNING,
								"Piece " + request.getPieceNumber() + ( passed ? " passed hash check." : " failed hash check." )));
					}
				}
			}

			public void
			checkCancelled(
				DiskManagerCheckRequest		request )
			{
				request.requestEnds( false );

					// A cancellation deliberately leaves the piece state untouched. Either the piece
					// is being rechecked because it is already bad (so it isn't marked done), or this
					// is a recheck-on-complete where the state is good and mustn't be flipped to bad.

				listener.checkCancelled( request );

				if ( Logger.isEnabled()){

					String	text = "Piece " + request.getPieceNumber() + " hash check cancelled.";

					Logger.log( new LogEvent( disk_manager, LOGID, LogEvent.LT_WARNING, text ));
				}
			}

			public void
			checkFailed(
				DiskManagerCheckRequest		request,
				Throwable					cause )
			{
				request.requestEnds( false );

				try{
					disk_manager.getPiece( request.getPieceNumber()).setDone( false );

				}finally{

					listener.checkFailed( request, cause );

					if ( Logger.isEnabled()){

						String	text =
							"Piece " + request.getPieceNumber() + " failed hash check - " + Debug.getNestedExceptionMessage( cause );

						Logger.log( new LogEvent( disk_manager, LOGID, LogEvent.LT_WARNING, text ));
					}
				}
			}
		};

	enqueueCheckRequestSupport( request, interceptor, read_flush );
}
/**
 * Core of the check pipeline: probes the files backing the piece, reads the whole piece
 * through the disk manager, hashes the data with the ConcurrentHasher and reports the
 * outcome to the listener.
 *
 * Outcomes reported to the listener:
 *   checkCompleted( request, true )  - checking is disabled, or the computed hash matches the required one
 *   checkCompleted( request, false ) - a backing file is too short, all backing files are compact, or the hash differs
 *   checkCancelled( request )        - the checker is stopped, the file probe threw, or the hasher produced no result
 *   checkFailed( request, cause )    - the read failed, marking the piece complete in a cache file threw, or another error occurred
 *
 * async_reads / async_checks are maintained under this_mon so that stop() can wait for
 * operations that are still in flight.
 *
 * @param request		identifies the piece to check; receives the computed hash via setHash
 * @param listener		receives the outcome
 * @param read_flush	flush flag set on the read request
 */
protected void
enqueueCheckRequestSupport(
final DiskManagerCheckRequest request,
final DiskManagerCheckRequestListener listener,
boolean read_flush )
{
// checking switched off: report success without touching the disk
if ( !checking_enabled ){
listener.checkCompleted( request, true );
return;
}
final int pieceNumber = request.getPieceNumber();
try{
final byte[] required_hash = disk_manager.getPieceHash(pieceNumber);
// quick check that the files that make up this piece are at least big enough
// to warrant reading the data to check
// also, if the piece is entirely compact then we can immediately
// fail as we don't actually have any data for the piece (or can assume we don't)
// we relax this a bit to catch pieces that are part of compact files with less than
// three pieces as it is possible that these were once complete and have all their bits
// living in retained compact areas
final DMPieceList pieceList = disk_manager.getPieceList(pieceNumber);
try{
// there are other comments in the code about the existence of 0 length piece lists
// just in case these still occur for who knows what reason ensure that a 0 length list
// causes the code to carry on and do the check (i.e. it is no worse that before this
// optimisation was added...)
boolean all_compact = pieceList.size() > 0;
for (int i = 0; i < pieceList.size(); i++) {
DMPieceMapEntry piece_entry = pieceList.get(i);
DiskManagerFileInfoImpl file_info = piece_entry.getFile();
CacheFile cache_file = file_info.getCacheFile();
// file is shorter than the offset at which this piece's data starts: fail without reading
if ( cache_file.compareLength( piece_entry.getOffset()) < 0 ){
listener.checkCompleted( request, false );
return;
}
if ( all_compact ){
int st = cache_file.getStorageType();
// a non-compact file, or a file with at most two pieces, means the piece may have real data
if (( st != CacheFile.CT_COMPACT && st != CacheFile.CT_PIECE_REORDER_COMPACT ) || file_info.getNbPieces() <= 2 ){
all_compact = false;
}
}
}
if ( all_compact ){
// System.out.println( "Piece " + pieceNumber + " is all compact, failing hash check" );
listener.checkCompleted( request, false );
return;
}
}catch( Throwable e ){
// we can fail here if the disk manager has been stopped as the cache file length access may be being
// performed on a "closed" (i.e. un-owned) file
listener.checkCancelled( request );
return;
}
// read the entire piece starting at offset 0
int this_piece_length = disk_manager.getPieceLength( pieceNumber );
DiskManagerReadRequest read_request = disk_manager.createReadRequest( pieceNumber, 0, this_piece_length );
try{
this_mon.enter();
if ( stopped ){
listener.checkCancelled( request );
return;
}
// counted under the monitor so stop() sees a consistent number of reads to wait for
async_reads++;
}finally{
this_mon.exit();
}
read_request.setFlush( read_flush );
// ad-hoc requests don't use the cache for the read
read_request.setUseCache( !request.isAdHoc());
disk_manager.enqueueReadRequest(
read_request,
new DiskManagerReadRequestListener()
{
public void
readCompleted(
DiskManagerReadRequest read_request,
DirectByteBuffer buffer )
{
// the read is over: decrement async_reads (and release async_read_sem if stopped)
complete();
try{
this_mon.enter();
if ( stopped ){
// stopped between issuing the read and its completion: drop the data
buffer.returnToPool();
listener.checkCancelled( request );
return;
}
async_checks++;
}finally{
this_mon.exit();
}
try{
final DirectByteBuffer f_buffer = buffer;
ConcurrentHasher.getSingleton().addRequest(
buffer.getBuffer(DirectByteBuffer.SS_DW),
new ConcurrentHasherRequestListener()
{
public void
complete(
ConcurrentHasherRequest hash_request )
{
// 1 = hash matches, 2 = hash differs, 3 = no hash result (treated as cancelled)
int async_result = 3; // cancelled
try{
byte[] actual_hash = hash_request.getResult();
if ( actual_hash != null ){
request.setHash( actual_hash );
async_result = 1; // success
for (int i = 0; i < actual_hash.length; i++){
if ( actual_hash[i] != required_hash[i]){
async_result = 2; // failed;
break;
}
}
}
}finally{
try{
if ( async_result == 1 ){
try{
// piece verified: inform each backing cache file, handing it the piece data
for (int i = 0; i < pieceList.size(); i++) {
DMPieceMapEntry piece_entry = pieceList.get(i);
DiskManagerFileInfoImpl file_info = piece_entry.getFile();
// edge case here for skipped zero length files that have been deleted
if ( file_info.getLength() > 0 || !file_info.isSkipped()){
CacheFile cache_file = file_info.getCacheFile();
cache_file.setPieceComplete( pieceNumber, f_buffer );
}
}
}catch( Throwable e ){
f_buffer.returnToPool();
Debug.out( e );
listener.checkFailed( request, e );
return;
}
}
f_buffer.returnToPool();
if ( async_result == 1 ){
listener.checkCompleted( request, true );
}else if ( async_result == 2 ){
listener.checkCompleted( request, false );
}else{
listener.checkCancelled( request );
}
}finally{
// always undo the async_checks increment, even on the early return above or if a listener throws
try{
this_mon.enter();
async_checks--;
if ( stopped ){
async_check_sem.release();
}
}finally{
this_mon.exit();
}
}
}
}
},
request.isLowPriority());
}catch( Throwable e ){
// NOTE(review): async_checks was incremented above but is not decremented on this path;
// if the hasher never accepted the request, a later stop() appears to wait on
// async_check_sem indefinitely - confirm whether addRequest can throw in practice
Debug.printStackTrace(e);
buffer.returnToPool();
listener.checkFailed( request, e );
}
}
public void
readFailed(
DiskManagerReadRequest read_request,
Throwable cause )
{
complete();
listener.checkFailed( request, cause );
}
public int
getPriority()
{
// 0 when "diskmanager.perf.checking.read.priority" is enabled, otherwise -1
return( checking_read_priority?0:-1 );
}
public void
requestExecuted(long bytes)
{
// not used by the checker
}
// bookkeeping for a finished read (success or failure): drop the counter and, if stop()
// is in progress, release one permit for it
protected void
complete()
{
try{
this_mon.enter();
async_reads--;
if ( stopped ){
async_read_sem.release();
}
}finally{
this_mon.exit();
}
}
});
}catch( Throwable e ){
// any unexpected error marks the whole disk manager as failed
// NOTE(review): if the exception was thrown after async_reads++ (e.g. by enqueueReadRequest)
// the counter is not decremented here, which could leave stop() waiting - confirm
disk_manager.setFailed( "Piece check error - " + Debug.getNestedExceptionMessage(e));
Debug.printStackTrace( e );
listener.checkFailed( request, e );
}
}
}