@Override
public Repo<Master> call(final long tid, final Master master) throws Exception {
ExecutorService executor = getThreadPool(master);
final SiteConfiguration conf = ServerConfiguration.getSiteConfiguration();
VolumeManager fs = master.getFileSystem();
List<FileStatus> files = new ArrayList<FileStatus>();
for (FileStatus entry : fs.listStatus(new Path(bulk))) {
files.add(entry);
}
log.debug("tid " + tid + " importing " + files.size() + " files");
Path writable = new Path(this.errorDir, ".iswritable");
if (!fs.createNewFile(writable)) {
// Maybe this is a re-try... clear the flag and try again
fs.delete(writable);
if (!fs.createNewFile(writable))
throw new ThriftTableOperationException(tableId, null, TableOperation.BULK_IMPORT, TableOperationExceptionType.BULK_BAD_ERROR_DIRECTORY,
"Unable to write to " + this.errorDir);
}
fs.delete(writable);
final Set<String> filesToLoad = Collections.synchronizedSet(new HashSet<String>());
for (FileStatus f : files)
filesToLoad.add(f.getPath().toString());
final int RETRIES = Math.max(1, conf.getCount(Property.MASTER_BULK_RETRIES));
for (int attempt = 0; attempt < RETRIES && filesToLoad.size() > 0; attempt++) {
List<Future<List<String>>> results = new ArrayList<Future<List<String>>>();
if (master.onlineTabletServers().size() == 0)
log.warn("There are no tablet server to process bulk import, waiting (tid = " + tid + ")");
while (master.onlineTabletServers().size() == 0) {
UtilWaitThread.sleep(500);
}
// Use the threadpool to assign files one-at-a-time to the server
final List<String> loaded = Collections.synchronizedList(new ArrayList<String>());
for (final String file : filesToLoad) {
results.add(executor.submit(new Callable<List<String>>() {
@Override
public List<String> call() {
List<String> failures = new ArrayList<String>();
ClientService.Client client = null;
String server = null;
try {
// get a connection to a random tablet server, do not prefer cached connections because
// this is running on the master and there are lots of connections to tablet servers
// serving the metadata tablets
long timeInMillis = master.getConfiguration().getConfiguration().getTimeInMillis(Property.MASTER_BULK_TIMEOUT);
Pair<String,Client> pair = ServerClient.getConnection(master.getInstance(), false, timeInMillis);
client = pair.getSecond();
server = pair.getFirst();
List<String> attempt = Collections.singletonList(file);
log.debug("Asking " + pair.getFirst() + " to bulk import " + file);
List<String> fail = client.bulkImportFiles(Tracer.traceInfo(), SystemCredentials.get().toThrift(master.getInstance()), tid, tableId, attempt,
errorDir, setTime);
if (fail.isEmpty()) {
loaded.add(file);
} else {
failures.addAll(fail);
}
} catch (Exception ex) {
log.error("rpc failed server:" + server + ", tid:" + tid + " " + ex);
} finally {
ServerClient.close(client);
}
return failures;
}
}));
}
Set<String> failures = new HashSet<String>();
for (Future<List<String>> f : results)
failures.addAll(f.get());
filesToLoad.removeAll(loaded);
if (filesToLoad.size() > 0) {
log.debug("tid " + tid + " attempt " + (attempt + 1) + " " + sampleList(filesToLoad, 10) + " failed");
UtilWaitThread.sleep(100);
}
}
FSDataOutputStream failFile = fs.create(new Path(errorDir, BulkImport.FAILURES_TXT), true);
BufferedWriter out = new BufferedWriter(new OutputStreamWriter(failFile, Constants.UTF8));
try {
for (String f : filesToLoad) {
out.write(f);
out.write("\n");