+ "specified Hadoop configuration path is wrong and the sytem is using the default Hadoop configuration values."
+ "The Stratosphere YARN client needs to store its files in a distributed file system");
}
// Create yarnClient
final YarnClient yarnClient = YarnClient.createYarnClient();
yarnClient.init(conf);
yarnClient.start();
// Query cluster for metrics
if(cmd.hasOption(QUERY.getOpt())) {
showClusterMetrics(yarnClient);
}
if(!cmd.hasOption(CONTAINER.getOpt())) {
LOG.fatal("Missing required argument "+CONTAINER.getOpt());
printUsage();
yarnClient.stop();
System.exit(1);
}
// TM Count
final int taskManagerCount = Integer.valueOf(cmd.getOptionValue(CONTAINER.getOpt()));
System.out.println("Using values:");
System.out.println("\tContainer Count = "+taskManagerCount);
System.out.println("\tJar Path = "+localJarPath.toUri().getPath());
System.out.println("\tConfiguration file = "+confPath.toUri().getPath());
System.out.println("\tJobManager memory = "+jmMemory);
System.out.println("\tTaskManager memory = "+tmMemory);
System.out.println("\tTaskManager cores = "+tmCores);
// Create application via yarnClient
YarnClientApplication app = yarnClient.createApplication();
GetNewApplicationResponse appResponse = app.getNewApplicationResponse();
Resource maxRes = appResponse.getMaximumResourceCapability();
if(tmMemory > maxRes.getMemory() || tmCores > maxRes.getVirtualCores()) {
LOG.fatal("The cluster does not have the requested resources for the TaskManagers available!\n"
+ "Maximum Memory: "+maxRes.getMemory() +", Maximum Cores: "+tmCores);
yarnClient.stop();
System.exit(1);
}
if(jmMemory > maxRes.getMemory() ) {
LOG.fatal("The cluster does not have the requested resources for the JobManager available!\n"
+ "Maximum Memory: "+maxRes.getMemory());
yarnClient.stop();
System.exit(1);
}
// respect custom JVM options in the YAML file
final String javaOpts = GlobalConfiguration.getString(ConfigConstants.STRATOSPHERE_JVM_OPTIONS, "");
// Set up the container launch context for the application master
ContainerLaunchContext amContainer = Records
.newRecord(ContainerLaunchContext.class);
String amCommand = "$JAVA_HOME/bin/java"
+ " -Xmx"+jmMemory+"M " +javaOpts;
if(hasLog4j) {
amCommand += " -Dlog.file=\""+ApplicationConstants.LOG_DIR_EXPANSION_VAR +"/jobmanager-log4j.log\" -Dlog4j.configuration=file:log4j.properties";
}
amCommand += " eu.stratosphere.yarn.ApplicationMaster" + " "
+ " 1>"
+ ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/jobmanager-stdout.log"
+ " 2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/jobmanager-stderr.log";
amContainer.setCommands(Collections.singletonList(amCommand));
System.err.println("amCommand="+amCommand);
// Set-up ApplicationSubmissionContext for the application
ApplicationSubmissionContext appContext = app.getApplicationSubmissionContext();
final ApplicationId appId = appContext.getApplicationId();
// Setup jar for ApplicationMaster
LocalResource appMasterJar = Records.newRecord(LocalResource.class);
LocalResource stratosphereConf = Records.newRecord(LocalResource.class);
Path remotePathJar = Utils.setupLocalResource(conf, fs, appId.toString(), localJarPath, appMasterJar, fs.getHomeDirectory());
Path remotePathConf = Utils.setupLocalResource(conf, fs, appId.toString(), confPath, stratosphereConf, fs.getHomeDirectory());
Map<String, LocalResource> localResources = new HashMap<String, LocalResource>(2);
localResources.put("stratosphere.jar", appMasterJar);
localResources.put("stratosphere-conf.yaml", stratosphereConf);
// setup security tokens (code from apache storm)
final Path[] paths = new Path[3 + shipFiles.size()];
StringBuffer envShipFileList = new StringBuffer();
// upload ship files
for (int i = 0; i < shipFiles.size(); i++) {
File shipFile = shipFiles.get(i);
LocalResource shipResources = Records.newRecord(LocalResource.class);
Path shipLocalPath = new Path("file://" + shipFile.getAbsolutePath());
paths[3 + i] = Utils.setupLocalResource(conf, fs, appId.toString(),
shipLocalPath, shipResources, fs.getHomeDirectory());
localResources.put(shipFile.getName(), shipResources);
envShipFileList.append(paths[3 + i]);
if(i+1 < shipFiles.size()) {
envShipFileList.append(',');
}
}
paths[0] = remotePathJar;
paths[1] = remotePathConf;
paths[2] = new Path(fs.getHomeDirectory(), ".stratosphere/" + appId.toString() + "/");
FsPermission permission = new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL);
fs.setPermission(paths[2], permission); // set permission for path.
Utils.setTokensFor(amContainer, paths, this.conf);
amContainer.setLocalResources(localResources);
fs.close();
// Setup CLASSPATH for ApplicationMaster
Map<String, String> appMasterEnv = new HashMap<String, String>();
Utils.setupEnv(conf, appMasterEnv);
// set configuration values
appMasterEnv.put(Client.ENV_TM_COUNT, String.valueOf(taskManagerCount));
appMasterEnv.put(Client.ENV_TM_CORES, String.valueOf(tmCores));
appMasterEnv.put(Client.ENV_TM_MEMORY, String.valueOf(tmMemory));
appMasterEnv.put(Client.STRATOSPHERE_JAR_PATH, remotePathJar.toString() );
appMasterEnv.put(Client.ENV_APP_ID, appId.toString());
appMasterEnv.put(Client.ENV_CLIENT_HOME_DIR, fs.getHomeDirectory().toString());
appMasterEnv.put(Client.ENV_CLIENT_SHIP_FILES, envShipFileList.toString() );
appMasterEnv.put(Client.ENV_CLIENT_USERNAME, UserGroupInformation.getCurrentUser().getShortUserName());
amContainer.setEnvironment(appMasterEnv);
// Set up resource type requirements for ApplicationMaster
Resource capability = Records.newRecord(Resource.class);
capability.setMemory(jmMemory);
capability.setVirtualCores(1);
appContext.setApplicationName("Stratosphere"); // application name
appContext.setAMContainerSpec(amContainer);
appContext.setResource(capability);
appContext.setQueue(queue);
// file that we write into the conf/ dir containing the jobManager address.
final File addrFile = new File(confDirPath + CliFrontend.JOBMANAGER_ADDRESS_FILE);
Runtime.getRuntime().addShutdownHook(new Thread() {
/**
 * Shutdown-hook body: kills the submitted YARN application, recursively
 * deletes the remote path held in paths[2], removes the local JobManager
 * address file and finally stops the YARN client.
 *
 * Every cleanup step runs in its own try block so that a failure in one
 * step (for example the kill request) does not prevent the remaining
 * steps from being attempted. Failures are logged as warnings only.
 */
@Override
public void run() {
	try {
		LOG.info("Killing the Stratosphere-YARN application.");
		yarnClient.killApplication(appId);
	} catch (Exception e) {
		LOG.warn("Exception while killing the YARN application", e);
	}
	// Remote cleanup is attempted even when the kill request above failed.
	try {
		LOG.info("Deleting files in "+paths[2]);
		FileSystem shutFS = FileSystem.get(conf);
		try {
			shutFS.delete(paths[2], true); // delete conf and jar file.
		} finally {
			// close the handle even if the delete itself threw
			shutFS.close();
		}
	} catch (Exception e) {
		LOG.warn("Exception while deleting files in "+paths[2], e);
	}
	try {
		// File.delete() reports most failures through its return value, not an
		// exception, so check it; a file that does not exist is not a failure.
		if (addrFile.exists() && !addrFile.delete()) {
			LOG.warn("Could not delete the jobmanager address file "+addrFile);
		}
	} catch (Exception e) {
		LOG.warn("Exception while deleting the jobmanager address file", e);
	}
	LOG.info("YARN Client is shutting down");
	yarnClient.stop();
}
});
LOG.info("Submitting application master " + appId);
yarnClient.submitApplication(appContext);
ApplicationReport appReport = yarnClient.getApplicationReport(appId);
YarnApplicationState appState = appReport.getYarnApplicationState();
boolean told = false;
char[] el = { '/', '|', '\\', '-'};
int i = 0;
while (appState != YarnApplicationState.FINISHED
&& appState != YarnApplicationState.KILLED
&& appState != YarnApplicationState.FAILED) {
if(!told && appState == YarnApplicationState.RUNNING) {
System.err.println("Stratosphere JobManager is now running on "+appReport.getHost()+":"+jmPort);
System.err.println("JobManager Web Interface: "+appReport.getTrackingUrl());
// write jobmanager connect information
PrintWriter out = new PrintWriter(addrFile);
out.println(appReport.getHost()+":"+jmPort);
out.close();
addrFile.setReadable(true, false); // readable for all.
told = true;
}
if(!told) {
System.err.print(el[i++]+"\r");
if(i == el.length) {
i = 0;
}
Thread.sleep(500); // wait for the application to switch to RUNNING
} else {
Thread.sleep(5000);
}
appReport = yarnClient.getApplicationReport(appId);
appState = appReport.getYarnApplicationState();
}
LOG.info("Application " + appId + " finished with"
+ " state " + appState + " at " + appReport.getFinishTime());