/**
* Copyright 2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.dfs;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.ArrayList;
import java.util.Random;
import java.util.TreeSet;
import org.apache.commons.logging.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSOutputStream;
import org.apache.hadoop.io.UTF8;
import org.apache.hadoop.util.ToolBase;
/**
* This class provides rudimentary checking of DFS volumes for errors and
* sub-optimal conditions.
* <p>The tool scans all files and directories, starting from an indicated
* root path. The following abnormal conditions are detected and handled:</p>
* <ul>
* <li>files with blocks that are completely missing from all datanodes.<br/>
* In this case the tool can perform one of the following actions:
* <ul>
* <li>none ({@link #FIXING_NONE})</li>
* <li>move corrupted files to /lost+found directory on DFS
* ({@link #FIXING_MOVE}). Remaining data blocks are saved as
* block chains, representing the longest consecutive series of valid blocks.</li>
* <li>delete corrupted files ({@link #FIXING_DELETE})</li>
* </ul>
* </li>
* <li>detect files with under-replicated or over-replicated blocks</li>
* </ul>
* Additionally, the tool collects detailed overall DFS statistics, and
* optionally can print detailed statistics on block locations and replication
* factors of each file.
*
* @author Andrzej Bialecki
*/
public class DFSck extends ToolBase {
private static final Log LOG = LogFactory.getLog(DFSck.class.getName());
/** Don't attempt any fixing . */
public static final int FIXING_NONE = 0;
/** Move corrupted files to /lost+found . */
public static final int FIXING_MOVE = 1;
/** Delete corrupted files. */
public static final int FIXING_DELETE = 2;
// Client handle to the DFS being checked; created in init().
private DFSClient dfs;
// Path of the /lost+found directory; stays null until lostFoundInit() runs.
private UTF8 lostFound = null;
// True once lostFoundInit() has been attempted (whether or not it succeeded).
private boolean lfInited = false;
// True only when /lost+found is known to exist and be a usable directory.
private boolean lfInitedOk = false;
// Reporting options; see init() for their meaning.
private boolean showFiles = false;
private boolean showBlocks = false;
private boolean showLocations = false;
// One of FIXING_NONE, FIXING_MOVE or FIXING_DELETE.
private int fixing;
/**
 * No-arg constructor used by {@link #main(String[])}; configuration and
 * options are supplied later through doMain()/run(), which calls init().
 */
DFSck() {
}
/**
 * Filesystem checker.
 * @param conf current Configuration
 * @param fixing one of the FIXING_* constants (FIXING_NONE, FIXING_MOVE or
 * FIXING_DELETE) selecting how corrupted files are handled
 * @param showFiles show each file being checked
 * @param showBlocks for each file checked show its block information
 * @param showLocations for each block in each file show block locations
 * @throws Exception if the DFS client cannot be initialized (e.g. the
 * configured filesystem is the local FS)
 */
public DFSck(Configuration conf, int fixing, boolean showFiles, boolean showBlocks, boolean showLocations) throws Exception {
setConf(conf);
init(fixing, showFiles, showBlocks, showLocations);
}
/**
 * Creates the DFS client from the current configuration and records the
 * fixing mode and reporting options.
 * @param fixing one of FIXING_NONE, FIXING_MOVE or FIXING_DELETE
 * @param showFiles show each file being checked
 * @param showBlocks for each file checked show its block information
 * @param showLocations for each block in each file show block locations
 * @throws IOException when the configured filesystem is the local FS
 */
public void init(int fixing, boolean showFiles,
boolean showBlocks, boolean showLocations) throws IOException {
  String fsName = conf.get("fs.default.name", "local");
  // This tool talks to a namenode; refuse to run against the local FS.
  if ("local".equals(fsName)) {
    throw new IOException("This tool only checks DFS, but your config uses 'local' FS.");
  }
  this.dfs = new DFSClient(DataNode.createSocketAddr(fsName), conf);
  this.fixing = fixing;
  this.showFiles = showFiles;
  this.showBlocks = showBlocks;
  this.showLocations = showLocations;
}
/**
 * Check files on DFS, starting from the indicated path.
 * @param path starting point
 * @return result of checking
 * @throws Exception
 */
public Result fsck(String path) throws Exception {
  Result res = new Result();
  res.setReplication(dfs.getDefaultReplication());
  // Check every entry under the starting path; check() recurses into dirs.
  DFSFileInfo[] entries = dfs.listPaths(new UTF8(path));
  int count = entries.length;
  for (int idx = 0; idx < count; idx++) {
    check(entries[idx], res);
  }
  return res;
}
/**
 * Checks a single file or directory, accumulating statistics into res.
 * Directories are recursed into; for regular files each block's replica
 * count is compared against the file's target replication, missing blocks
 * are recorded, and the configured fixing action (none/move/delete) is
 * applied to corrupted files.
 * @param file the entry to check
 * @param res accumulator for statistics and corruption findings
 * @throws Exception on errors from the DFS client or namenode
 */
private void check(DFSFileInfo file, Result res) throws Exception {
  if (file.isDir()) {
    if (showFiles)
      System.out.println(file.getPath() + " <dir>");
    res.totalDirs++;
    DFSFileInfo[] files = dfs.listPaths(new UTF8(file.getPath()));
    for (int i = 0; i < files.length; i++) {
      check(files[i], res);
    }
    return;
  }
  res.totalFiles++;
  res.totalSize += file.getLen();
  LocatedBlock[] blocks = dfs.namenode.open(file.getPath());
  res.totalBlocks += blocks.length;
  if (showFiles) {
    System.out.print(file.getPath() + " " + file.getLen() + ", " + blocks.length + " block(s): ");
  } else {
    // Progress indicator: one dot per file, newline every 100 files.
    System.out.print('.');
    System.out.flush();
    if (res.totalFiles % 100 == 0) System.out.println();
  }
  int missing = 0;
  long missize = 0;
  // Target replication is a per-file property; hoist it out of the loop.
  short targetFileReplication = file.getReplication();
  StringBuffer report = new StringBuffer();
  for (int i = 0; i < blocks.length; i++) {
    Block block = blocks[i].getBlock();
    long id = block.getBlockId();
    DatanodeInfo[] locs = blocks[i].getLocations();
    // BUG FIX: compute the replica count with a null guard BEFORE using it.
    // The original dereferenced locs.length in the over/under-replication
    // checks before its own (locs == null) test, so a null location array
    // would throw NullPointerException instead of being counted as missing.
    int replicas = (locs == null) ? 0 : locs.length;
    if (replicas > targetFileReplication)
      res.overReplicatedBlocks += (replicas - targetFileReplication);
    if (replicas < targetFileReplication && replicas > 0)
      res.underReplicatedBlocks += (targetFileReplication - replicas);
    report.append(i + ". " + id + " len=" + block.getNumBytes());
    if (replicas == 0) {
      report.append(" MISSING!");
      res.addMissing(block.getBlockName(), block.getNumBytes());
      missing++;
      missize += block.getNumBytes();
    } else {
      report.append(" repl=" + replicas);
      if (showLocations) {
        StringBuffer sb = new StringBuffer("[");
        for (int j = 0; j < locs.length; j++) {
          if (j > 0) sb.append(", ");
          sb.append(locs[j]);
        }
        sb.append(']');
        report.append(" " + sb.toString());
      }
    }
    report.append('\n');
  }
  if (missing > 0) {
    if (!showFiles)
      System.out.println("\nMISSING " + missing + " blocks of total size " + missize + " B");
    res.corruptFiles++;
    // Apply the configured action to the corrupted file.
    switch (fixing) {
    case FIXING_NONE: // do nothing
      System.err.println("\n - ignoring corrupted " + file.getPath());
      break;
    case FIXING_MOVE:
      System.err.println("\n - moving to /lost+found: " + file.getPath());
      lostFoundMove(file, blocks);
      break;
    case FIXING_DELETE:
      System.err.println("\n - deleting corrupted " + file.getPath());
      dfs.delete(new UTF8(file.getPath()));
    }
  }
  if (showFiles) {
    if (missing > 0) {
      System.out.println(" MISSING " + missing + " blocks of total size " + missize + " B");
    } else System.out.println(" OK");
    if (showBlocks) System.out.println(report.toString());
  }
}
/**
 * Salvages a corrupted file into /lost+found. Each run of consecutive
 * still-available blocks is copied into a numbered "chain" file under a
 * directory mirroring the original path; a missing block terminates the
 * current chain. The original file is deleted at the end even when some
 * chains could not be copied (best-effort salvage).
 * @param file the corrupted file to move
 * @param blocks the file's located blocks, as returned by the namenode
 */
private void lostFoundMove(DFSFileInfo file, LocatedBlock[] blocks) {
// Lazily set up /lost+found on first use.
if (!lfInited) {
lostFoundInit();
}
// Nothing we can do if /lost+found is unusable; errors were already printed.
if (!lfInitedOk) {
return;
}
UTF8 target = new UTF8(lostFound.toString() + file.getPath());
String errmsg = "Failed to move " + file.getPath() + " to /lost+found";
try {
if (!dfs.mkdirs(target)) {
System.err.println(errmsg);
return;
}
// create chains
int chain = 0;
FSOutputStream fos = null;
for (int i = 0; i < blocks.length; i++) {
LocatedBlock lblock = blocks[i];
DatanodeInfo[] locs = lblock.getLocations();
// A block with no locations is lost: close the current chain (if open)
// so the next available block starts a fresh one.
if (locs == null || locs.length == 0) {
if (fos != null) {
fos.flush();
fos.close();
fos = null;
}
continue;
}
// First available block after a gap: open a new chain file.
if (fos == null) {
fos = dfs.create(new UTF8(target.toString() + "/" + chain), true);
if (fos != null) chain++;
}
if (fos == null) {
System.err.println(errmsg + ": could not store chain " + chain);
// perhaps we should bail out here...
// return;
continue;
}
// copy the block. It's a pity it's not abstracted from DFSInputStream ...
try {
copyBlock(lblock, fos);
} catch (Exception e) {
e.printStackTrace();
// something went wrong copying this block...
System.err.println(" - could not copy block " + lblock.getBlock().getBlockName() + " to " + target);
fos.flush();
fos.close();
fos = null;
}
}
if (fos != null) fos.close();
// NOTE(review): the original is deleted even if some blocks above failed
// to copy — by design this is best-effort salvage, not a guaranteed move.
System.err.println("\n - moved corrupted file " + file.getPath() + " to /lost+found");
dfs.delete(new UTF8(file.getPath()));
} catch (Exception e) {
e.printStackTrace();
System.err.println(errmsg + ": " + e.getMessage());
}
}
/*
 * XXX (ab) Bulk of this method is copied verbatim from {@link DFSClient}, which is
 * bad. Both places should be refactored to provide a method to copy blocks
 * around.
 */
/**
 * Streams one block's data from a datanode into the given output stream.
 * Repeatedly picks a replica via bestNode(), connects, sends an
 * OP_READSKIP_BLOCK request at offset 0 and sanity-checks the datanode's
 * reply before copying the raw bytes. Nodes that fail are added to a
 * dead-node set; when every node has failed, the method sleeps, clears the
 * set and retries, giving up after DFSClient.MAX_BLOCK_ACQUIRE_FAILURES
 * rounds.
 * @param lblock the block to copy, with its candidate locations
 * @param fos destination stream (left open for the caller)
 * @throws Exception if the block could not be obtained or fully copied
 */
private void copyBlock(LocatedBlock lblock, FSOutputStream fos) throws Exception {
int failures = 0;
InetSocketAddress targetAddr = null;
TreeSet deadNodes = new TreeSet();
Socket s = null;
DataInputStream in = null;
DataOutputStream out = null;
// Loop until a connection is established and the header checks pass.
while (s == null) {
DatanodeInfo chosenNode;
try {
chosenNode = bestNode(lblock.getLocations(), deadNodes);
targetAddr = DataNode.createSocketAddr(chosenNode.getName());
} catch (IOException ie) {
if (failures >= DFSClient.MAX_BLOCK_ACQUIRE_FAILURES) {
throw new IOException("Could not obtain block " + lblock);
}
LOG.info("Could not obtain block from any node: " + ie);
try {
Thread.sleep(10000);
} catch (InterruptedException iex) {
}
// Forget previous failures and retry every node after the pause.
deadNodes.clear();
failures++;
continue;
}
try {
s = new Socket();
s.connect(targetAddr, FSConstants.READ_TIMEOUT);
s.setSoTimeout(FSConstants.READ_TIMEOUT);
//
// Xmit header info to datanode
//
out = new DataOutputStream(new BufferedOutputStream(s.getOutputStream()));
out.write(FSConstants.OP_READSKIP_BLOCK);
lblock.getBlock().write(out);
out.writeLong(0L);
out.flush();
//
// Get bytes in block, set streams
//
in = new DataInputStream(new BufferedInputStream(s.getInputStream()));
long curBlockSize = in.readLong();
long amtSkipped = in.readLong();
// Sanity-check what the datanode reports against the namenode's record.
if (curBlockSize != lblock.getBlock().len) {
throw new IOException("Recorded block size is " + lblock.getBlock().len + ", but datanode reports size of " + curBlockSize);
}
if (amtSkipped != 0L) {
throw new IOException("Asked for offset of " + 0L + ", but only received offset of " + amtSkipped);
}
} catch (IOException ex) {
// Put chosen node into dead list, continue
LOG.info("Failed to connect to " + targetAddr + ":" + ex);
deadNodes.add(chosenNode);
if (s != null) {
try {
s.close();
} catch (IOException iex) {
}
}
s = null;
}
}
if (in == null) {
throw new Exception("Could not open data stream for " + lblock.getBlock().getBlockName());
}
byte[] buf = new byte[1024];
int cnt = 0;
boolean success = true;
// Pump the raw block bytes into the destination stream.
try {
while ((cnt = in.read(buf)) > 0) {
fos.write(buf, 0, cnt);
}
} catch (Exception e) {
e.printStackTrace();
success = false;
} finally {
try {in.close(); } catch (Exception e1) {}
try {out.close(); } catch (Exception e1) {}
try {s.close(); } catch (Exception e1) {}
}
if (!success)
throw new Exception("Could not copy block data for " + lblock.getBlock().getBlockName());
}
/*
 * XXX (ab) See comment above for copyBlock().
 *
 * Pick the best node from which to stream the data.
 * That's the local one, if available.
 */
// Shared source of randomness for picking a replica when no local one exists.
Random r = new Random();
/**
 * Picks the datanode to read a block from: the local node when it holds a
 * live replica, otherwise a random live replica.
 * @param nodes candidate replica locations (may be null)
 * @param deadNodes nodes already known to be unreachable
 * @return a node from nodes that is not in deadNodes
 * @throws IOException if no live node contains the block
 */
private DatanodeInfo bestNode(DatanodeInfo nodes[], TreeSet deadNodes) throws IOException {
  if ((nodes == null) ||
      (nodes.length - deadNodes.size() < 1)) {
    throw new IOException("No live nodes contain current block");
  }
  DatanodeInfo chosenNode = null;
  // Prefer a replica on the local host to avoid a network transfer.
  for (int i = 0; i < nodes.length; i++) {
    if (deadNodes.contains(nodes[i])) {
      continue;
    }
    String nodename = nodes[i].getName();
    int colon = nodename.indexOf(':');
    if (colon >= 0) {
      nodename = nodename.substring(0, colon);
    }
    if (dfs.localName.equals(nodename)) {
      chosenNode = nodes[i];
      break;
    }
  }
  if (chosenNode == null) {
    // BUG FIX: use nextInt(bound) instead of Math.abs(nextInt()) % length.
    // Math.abs(Integer.MIN_VALUE) is still negative, so the original could
    // (rarely) produce a negative array index and throw
    // ArrayIndexOutOfBoundsException.
    do {
      chosenNode = nodes[r.nextInt(nodes.length)];
    } while (deadNodes.contains(chosenNode));
  }
  return chosenNode;
}
/**
 * One-time setup of the /lost+found directory: creates it if absent, or
 * verifies the existing path is a directory. Sets lfInited (attempted) and
 * lfInitedOk (usable); lostFound is assigned only when the directory is
 * actually usable.
 */
private void lostFoundInit() {
  lfInited = true;
  try {
    UTF8 lfName = new UTF8("/lost+found");
    // check that /lost+found exists
    if (!dfs.exists(lfName)) {
      lfInitedOk = dfs.mkdirs(lfName);
      // BUG FIX: only record the path when mkdirs succeeded. The original
      // assigned lostFound unconditionally, which suppressed the
      // "Cannot initialize /lost+found" diagnostic below on mkdirs failure.
      if (lfInitedOk) {
        lostFound = lfName;
      }
    } else if (!dfs.isDirectory(lfName)) {
      System.err.println("Cannot use /lost+found : a regular file with this name exists.");
      lfInitedOk = false;
    } else { // exists and isDirectory
      lostFound = lfName;
      lfInitedOk = true;
    }
  } catch (Exception e) {
    e.printStackTrace();
    lfInitedOk = false;
  }
  if (lostFound == null) {
    System.err.println("Cannot initialize /lost+found .");
    lfInitedOk = false;
  }
}
/**
 * Command-line entry point used by ToolBase: parses the start path and
 * options, runs the check and prints a summary.
 * @param args args[0] is the start path; remaining elements are options
 * @return 0 when the check completed, -1 when no arguments were given
 */
public int run(String[] args) throws Exception {
  if (args.length == 0) {
    String[] usage = {
        "Usage: DFSck <path> [-move | -delete] [-files] [-blocks [-locations]]",
        "\t<path>\tstart checking from this path",
        "\t-move\tmove corrupted files to /lost+found",
        "\t-delete\tdelete corrupted files",
        "\t-files\tprint out files being checked",
        "\t-blocks\tprint out block report",
        "\t-locations\tprint out locations for every block"};
    for (int i = 0; i < usage.length; i++) {
      System.err.println(usage[i]);
    }
    return -1;
  }
  String path = args[0];
  boolean files = false;
  boolean blocks = false;
  boolean locations = false;
  int fixing = FIXING_NONE;
  // Unrecognized options are silently ignored, matching prior behavior.
  for (int i = 1; i < args.length; i++) {
    String opt = args[i];
    if (opt.equals("-files")) {
      files = true;
    } else if (opt.equals("-blocks")) {
      blocks = true;
    } else if (opt.equals("-locations")) {
      locations = true;
    } else if (opt.equals("-move")) {
      fixing = FIXING_MOVE;
    } else if (opt.equals("-delete")) {
      fixing = FIXING_DELETE;
    }
  }
  init(fixing, files, blocks, locations);
  Result res = fsck(path);
  System.out.println();
  System.out.println(res);
  if (res.isHealthy()) {
    System.out.println("\n\nThe filesystem under path '" + args[0] + "' is HEALTHY");
  } else {
    System.out.println("\n\nThe filesystem under path '" + args[0] + "' is CORRUPT");
  }
  return 0;
}
/** Runs the checker as a standalone tool and exits with its status code. */
public static void main(String[] args) throws Exception {
  System.exit(new DFSck().doMain(new Configuration(), args));
}
/**
 * Result of checking, plus overall DFS statistics.
 * @author Andrzej Bialecki
 */
public static class Result {
  // Names of blocks that are missing from all datanodes.
  private ArrayList missingIds = new ArrayList();
  // Total size of missing data, in bytes.
  private long missingSize = 0L;
  private long corruptFiles = 0L;
  private long overReplicatedBlocks = 0L;
  private long underReplicatedBlocks = 0L;
  // Target replication factor, taken from the tool's configuration.
  private int replication = 0;
  private long totalBlocks = 0L;
  private long totalFiles = 0L;
  private long totalDirs = 0L;
  private long totalSize = 0L;
  /**
   * DFS is considered healthy if there are no missing blocks.
   * @return true when no missing blocks have been recorded
   */
  public boolean isHealthy() {
    return missingIds.size() == 0;
  }
  /** Add a missing block name, plus its size. */
  public void addMissing(String id, long size) {
    missingIds.add(id);
    missingSize += size;
  }
  /** Return a list of missing block names (as list of Strings). */
  public ArrayList getMissingIds() {
    return missingIds;
  }
  /** Return total size of missing data, in bytes. */
  public long getMissingSize() {
    return missingSize;
  }
  public void setMissingSize(long missingSize) {
    this.missingSize = missingSize;
  }
  /** Return the number of over-replicated blocks. */
  public long getOverReplicatedBlocks() {
    return overReplicatedBlocks;
  }
  public void setOverReplicatedBlocks(long overReplicatedBlocks) {
    this.overReplicatedBlocks = overReplicatedBlocks;
  }
  /** Return the actual replication factor (0.0 when there are no blocks). */
  public float getReplicationFactor() {
    if (totalBlocks != 0)
      return (float)(totalBlocks * replication + overReplicatedBlocks - underReplicatedBlocks) / (float)totalBlocks;
    else return 0.0f;
  }
  /** Return the number of under-replicated blocks. Note: missing blocks are not counted here.*/
  public long getUnderReplicatedBlocks() {
    return underReplicatedBlocks;
  }
  public void setUnderReplicatedBlocks(long underReplicatedBlocks) {
    this.underReplicatedBlocks = underReplicatedBlocks;
  }
  /** Return total number of directories encountered during this scan. */
  public long getTotalDirs() {
    return totalDirs;
  }
  public void setTotalDirs(long totalDirs) {
    this.totalDirs = totalDirs;
  }
  /** Return total number of files encountered during this scan. */
  public long getTotalFiles() {
    return totalFiles;
  }
  public void setTotalFiles(long totalFiles) {
    this.totalFiles = totalFiles;
  }
  /** Return total size of scanned data, in bytes. */
  public long getTotalSize() {
    return totalSize;
  }
  public void setTotalSize(long totalSize) {
    this.totalSize = totalSize;
  }
  /** Return the intended replication factor, against which the over/under-
   * replicated blocks are counted. Note: this value comes from the current
   * Configuration supplied for the tool, so it may be different from the
   * value in DFS Configuration.
   */
  public int getReplication() {
    return replication;
  }
  public void setReplication(int replication) {
    this.replication = replication;
  }
  /** Return the total number of blocks in the scanned area. */
  public long getTotalBlocks() {
    return totalBlocks;
  }
  public void setTotalBlocks(long totalBlocks) {
    this.totalBlocks = totalBlocks;
  }
  public String toString() {
    StringBuffer res = new StringBuffer();
    res.append("Status: " + (isHealthy() ? "HEALTHY" : "CORRUPT"));
    res.append("\n Total size:\t" + totalSize + " B");
    res.append("\n Total blocks:\t" + totalBlocks);
    if (totalBlocks > 0) res.append(" (avg. block size "
        + (totalSize / totalBlocks) + " B)");
    res.append("\n Total dirs:\t" + totalDirs);
    res.append("\n Total files:\t" + totalFiles);
    // BUG FIX: gate the corruption summary on the missing-block COUNT, not
    // the missing byte total, so the report stays consistent with
    // isHealthy() even when every missing block happens to be zero-length.
    if (missingIds.size() > 0) {
      res.append("\n ********************************");
      res.append("\n CORRUPT FILES:\t" + corruptFiles);
      res.append("\n MISSING BLOCKS:\t" + missingIds.size());
      res.append("\n MISSING SIZE:\t\t" + missingSize + " B");
      res.append("\n ********************************");
    }
    res.append("\n Over-replicated blocks:\t" + overReplicatedBlocks);
    if (totalBlocks > 0) res.append(" ("
        + ((float)(overReplicatedBlocks * 100) / (float)totalBlocks)
        + " %)");
    res.append("\n Under-replicated blocks:\t" + underReplicatedBlocks);
    if (totalBlocks > 0) res.append(" ("
        + ((float)(underReplicatedBlocks * 100) / (float)totalBlocks)
        + " %)");
    res.append("\n Target replication factor:\t" + replication);
    res.append("\n Real replication factor:\t" + getReplicationFactor());
    return res.toString();
  }
  /** Return the number of corrupted files. */
  public long getCorruptFiles() {
    return corruptFiles;
  }
  public void setCorruptFiles(long corruptFiles) {
    this.corruptFiles = corruptFiles;
  }
}
}