/**
 * Handles the files listed in the bulk import failures file, then hands off to {@link CleanUpBulkImport}.
 * <p>
 * Steps, in order:
 * <ol>
 * <li>If {@code BulkImport.FAILURES_TXT} does not exist under the error directory, there is nothing to do and the next step is returned immediately.</li>
 * <li>Each path listed in the failures file is recorded, unless a file with the same name is already present in the error directory.</li>
 * <li>The metadata table is scanned for bulk-file entries of this table whose value equals {@code tid}; failed files found there are treated as loaded.</li>
 * <li>Failed files that were not loaded are renamed into the error directory.</li>
 * <li>Failed files that were loaded are queued on a {@link DistributedWorkQueue}, and this method waits until that work is done.</li>
 * <li>The failures file is deleted.</li>
 * </ol>
 *
 * @param tid
 *          id of the transaction; compared against the values stored in the bulk-file column family
 * @param master
 *          supplies the file system and the connector used by this step
 * @return the {@link CleanUpBulkImport} step for the same table, source, bulk and error locations
 * @throws Exception
 *           if reading the failures file, scanning the metadata table, moving files or queueing work fails
 */
@Override
public Repo<Master> call(long tid, Master master) throws Exception {
  // This needs to execute after the arbiter is stopped
  VolumeManager fs = master.getFileSystem();

  if (!fs.exists(new Path(error, BulkImport.FAILURES_TXT)))
    return new CleanUpBulkImport(tableId, source, bulk, error);

  // key: "/<parent dir name>/<file name>", value: the path exactly as listed in the failures file
  HashMap<String,String> failures = new HashMap<String,String>();
  HashMap<String,String> loadedFailures = new HashMap<String,String>();

  FSDataInputStream failFile = fs.open(new Path(error, BulkImport.FAILURES_TXT));
  BufferedReader in = new BufferedReader(new InputStreamReader(failFile, Constants.UTF8));
  try {
    String line = null;
    while ((line = in.readLine()) != null) {
      // A blank line names no file; skip it instead of failing on an empty path.
      if (line.trim().isEmpty())
        continue;

      Path path = new Path(line);
      // Files already present in the error directory need no further handling.
      if (!fs.exists(new Path(error, path.getName())))
        failures.put("/" + path.getParent().getName() + "/" + path.getName(), line);
    }
  } finally {
    // Closing the reader also closes the underlying failFile stream.
    in.close();
  }

  /*
   * I thought I could move files that have no file references in the table. However it's possible a clone references a file. Therefore only move files that
   * have no loaded markers.
   */

  // determine which failed files were loaded
  Connector conn = master.getConnector();
  Scanner mscanner = new IsolatedScanner(conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY));
  mscanner.setRange(new KeyExtent(new Text(tableId), null, null).toMetadataRange());
  mscanner.fetchColumnFamily(TabletsSection.BulkFileColumnFamily.NAME);

  for (Entry<Key,Value> entry : mscanner) {
    // Only entries whose value matches this transaction id are considered.
    if (Long.parseLong(entry.getValue().toString()) == tid) {
      String loadedFile = entry.getKey().getColumnQualifier().toString();
      String absPath = failures.remove(loadedFile);
      if (absPath != null) {
        loadedFailures.put(loadedFile, absPath);
      }
    }
  }

  // move failed files that were not loaded
  for (String failure : failures.values()) {
    Path orig = new Path(failure);
    Path dest = new Path(error, orig.getName());
    // Check the result so the log does not claim a move that did not happen.
    boolean renamed = fs.rename(orig, dest);
    if (renamed) {
      log.debug("tid " + tid + " renamed " + orig + " to " + dest + ": import failed");
    } else {
      log.warn("tid " + tid + " failed to rename " + orig + " to " + dest);
    }
  }

  if (!loadedFailures.isEmpty()) {
    DistributedWorkQueue bifCopyQueue = new DistributedWorkQueue(Constants.ZROOT + "/" + HdfsZooInstance.getInstance().getInstanceID()
        + Constants.ZBULK_FAILED_COPYQ);

    HashSet<String> workIds = new HashSet<String>();

    for (String failure : loadedFailures.values()) {
      Path orig = new Path(failure);
      Path dest = new Path(error, orig.getName());

      // Nothing to queue when the destination is already present.
      if (fs.exists(dest))
        continue;

      // Work payload is "<source path>,<destination path>", keyed by the file name.
      bifCopyQueue.addWork(orig.getName(), (failure + "," + dest).getBytes(Constants.UTF8));
      workIds.add(orig.getName());
      log.debug("tid " + tid + " added to copyq: " + orig + " to " + dest + ": failed");
    }

    bifCopyQueue.waitUntilDone(workIds);
  }

  fs.deleteRecursively(new Path(error, BulkImport.FAILURES_TXT));
  return new CleanUpBulkImport(tableId, source, bulk, error);
}