// accessing the JobTracker.
@Override
public void resourceOffers(SchedulerDriver schedulerDriver,
List<Offer> offers) {
// Before synchronizing, we pull all needed information from the JobTracker.
final HttpHost jobTrackerAddress = new HttpHost(jobTracker.getHostname(),
jobTracker.getTrackerPort());
final Collection<TaskTrackerStatus> taskTrackers = jobTracker.taskTrackers();
final List<JobInProgress> jobsInProgress = new ArrayList<JobInProgress>();
for (JobStatus status : jobTracker.jobsToComplete()) {
jobsInProgress.add(jobTracker.getJob(status.getJobID()));
}
synchronized (this) {
// Compute the number of pending maps and reduces.
int pendingMaps = 0;
int pendingReduces = 0;
for (JobInProgress progress : jobsInProgress) {
pendingMaps += progress.pendingMaps();
pendingReduces += progress.pendingReduces();
}
// Mark active (heartbeated) TaskTrackers and compute idle slots.
int idleMapSlots = 0;
int idleReduceSlots = 0;
for (TaskTrackerStatus status : taskTrackers) {
HttpHost host = new HttpHost(status.getHost(), status.getHttpPort());
if (mesosTrackers.containsKey(host)) {
mesosTrackers.get(host).active = true;
idleMapSlots += status.getAvailableMapSlots();
idleReduceSlots += status.getAvailableReduceSlots();
}
}
// Consider the TaskTrackers that have yet to become active as being idle,
// otherwise we will launch excessive TaskTrackers.
int inactiveMapSlots = 0;
int inactiveReduceSlots = 0;
for (MesosTracker tracker : mesosTrackers.values()) {
if (!tracker.active) {
inactiveMapSlots += tracker.mapSlots;
inactiveReduceSlots += tracker.reduceSlots;
}
}
// Compute how many slots we need to allocate.
int neededMapSlots = Math.max(0, pendingMaps - idleMapSlots);
int neededReduceSlots = Math.max(0, pendingReduces - idleReduceSlots);
LOG.info(join("\n", Arrays.asList(
"JobTracker Status",
" Pending Map Tasks: " + pendingMaps,
" Pending Reduce Tasks: " + pendingReduces,
" Idle Map Slots: " + idleMapSlots,
" Idle Reduce Slots: " + idleReduceSlots,
" Inactive Map Slots: " + inactiveMapSlots
+ " (launched but no hearbeat yet)",
" Inactive Reduce Slots: " + inactiveReduceSlots
+ " (launched but no hearbeat yet)",
" Needed Map Slots: " + neededMapSlots,
" Needed Reduce Slots: " + neededReduceSlots)));
// Launch TaskTrackers to satisfy the slot requirements.
// TODO(bmahler): Consider slotting intelligently.
// Ex: If more map slots are needed, but no reduce slots are needed,
// launch a map-only TaskTracker to better satisfy the slot needs.
for (Offer offer : offers) {
if (neededMapSlots <= 0 && neededReduceSlots <= 0) {
driver.declineOffer(offer.getId());
continue;
}
double cpus = -1.0;
double mem = -1.0;
double disk = -1.0;
Set<Integer> ports = new HashSet<Integer>(2);
// Pull out the cpus, memory, disk, and 2 ports from the offer.
for (Resource resource : offer.getResourcesList()) {
if (resource.getName().equals("cpus")
&& resource.getType() == Value.Type.SCALAR) {
cpus = resource.getScalar().getValue();
} else if (resource.getName().equals("mem")
&& resource.getType() == Value.Type.SCALAR) {
mem = resource.getScalar().getValue();
} else if (resource.getName().equals("disk")
&& resource.getType() == Value.Type.SCALAR) {
disk = resource.getScalar().getValue();
} else if (resource.getName().equals("ports")
&& resource.getType() == Value.Type.RANGES) {
for (Value.Range range : resource.getRanges().getRangeList()) {
if (ports.size() < 2)
ports.add((int) range.getBegin());
if (ports.size() < 2)
ports.add((int) range.getEnd());
}
}
}
int mapSlots = conf.getInt("mapred.tasktracker.map.tasks.maximum",
MAP_SLOTS_DEFAULT);
int reduceSlots = conf.getInt("mapred.tasktracker.reduce.tasks.maximum",
REDUCE_SLOTS_DEFAULT);
double slotCpus = conf.getFloat("mapred.mesos.slot.cpus",
(float) SLOT_CPUS_DEFAULT);
double slotDisk = conf.getInt("mapred.mesos.slot.disk",
SLOT_DISK_DEFAULT);
double slotMem = conf.getInt("mapred.mesos.slot.mem",
SLOT_JVM_HEAP_DEFAULT + JVM_MEM_OVERHEAD);
double slotJVMHeap = slotMem - JVM_MEM_OVERHEAD;
// Total resource requirements for the container (TaskTracker + map/red
// tasks).
double containerCpus = (mapSlots + reduceSlots) * slotCpus
+ TASKTRACKER_CPUS;
double containerMem = (mapSlots + reduceSlots) * slotMem
+ TASKTRACKER_MEM;
double containerDisk = (mapSlots + reduceSlots) * slotDisk;
if (containerCpus > cpus || containerMem > mem || containerDisk > disk
|| ports.size() < 2) {
LOG.info(join("\n", Arrays.asList(
"Declining offer with insufficient resources for a TaskTracker: ",
" cpus: offered " + cpus + " needed " + containerCpus,
" mem : offered " + mem + " needed " + containerMem,
" disk: offered " + disk + " needed " + containerDisk,
" ports: " + (ports.size() < 2
? " less than 2 offered"
: " at least 2 (sufficient)"),
offer.getResourcesList().toString())));
driver.declineOffer(offer.getId());
continue;
}
Integer[] portArray = ports.toArray(new Integer[2]);
HttpHost httpAddress = new HttpHost(offer.getHostname(), portArray[0]);
HttpHost reportAddress = new HttpHost(offer.getHostname(), portArray[1]);
TaskID taskId = TaskID.newBuilder()
.setValue("Task_Tracker_" + launchedTrackers++).build();
LOG.info("Launching task " + taskId.getValue() + " on "
+ httpAddress.toString());
// Add this tracker to Mesos tasks.
mesosTrackers.put(httpAddress, new MesosTracker(httpAddress, taskId,
mapSlots, reduceSlots));
// Create the environment depending on whether the executor is going to be
// run locally.
// TODO(vinod): Do not pass the mapred config options as environment
// variables.
Protos.Environment.Builder envBuilder = Protos.Environment
.newBuilder()
.addVariables(
Protos.Environment.Variable
.newBuilder()
.setName("mapred.job.tracker")
.setValue(jobTrackerAddress.getHostName() + ':'
+ jobTrackerAddress.getPort()))
.addVariables(
Protos.Environment.Variable
.newBuilder()
.setName("mapred.task.tracker.http.address")
.setValue(
httpAddress.getHostName() + ':' + httpAddress.getPort()))
.addVariables(
Protos.Environment.Variable
.newBuilder()
.setName("mapred.task.tracker.report.address")
.setValue(reportAddress.getHostName() + ':'
+ reportAddress.getPort()))
.addVariables(
Protos.Environment.Variable.newBuilder()
.setName("mapred.map.child.java.opts")
.setValue("-Xmx" + slotJVMHeap + "m"))
.addVariables(
Protos.Environment.Variable.newBuilder()
.setName("mapred.reduce.child.java.opts")
.setValue("-Xmx" + slotJVMHeap + "m"))
.addVariables(
Protos.Environment.Variable.newBuilder()
.setName("HADOOP_HEAPSIZE")
.setValue("" + TASKTRACKER_JVM_HEAP));
// Set java specific environment, appropriately.
Map<String, String> env = System.getenv();
if (env.containsKey("JAVA_HOME")) {
envBuilder.addVariables(Protos.Environment.Variable.newBuilder()
.setName("JAVA_HOME")
.setValue(env.get("JAVA_HOME")));
}
if (env.containsKey("JAVA_LIBRARY_PATH")) {
envBuilder.addVariables(Protos.Environment.Variable.newBuilder()
.setName("JAVA_LIBRARY_PATH")
.setValue(env.get("JAVA_LIBRARY_PATH")));
}
// Command info differs when performing a local run.
CommandInfo commandInfo = null;
String master = conf.get("mapred.mesos.master", "local");
if (master.equals("local")) {
try {
commandInfo = CommandInfo.newBuilder()
.setEnvironment(envBuilder)
.setValue(new File("bin/mesos-executor").getCanonicalPath())
.build();
} catch (IOException e) {
LOG.fatal("Failed to find Mesos executor ", e);
System.exit(1);
}
} else {
String uri = conf.get("mapred.mesos.executor");
commandInfo = CommandInfo.newBuilder()
.setEnvironment(envBuilder)
.setValue("cd hadoop && ./bin/mesos-executor")
.addUris(CommandInfo.URI.newBuilder().setValue(uri)).build();
}
TaskInfo info = TaskInfo
.newBuilder()
.setName(taskId.getValue())
.setTaskId(taskId)
.setSlaveId(offer.getSlaveId())
.addResources(
Resource
.newBuilder()
.setName("cpus")
.setType(Value.Type.SCALAR)
.setScalar(Value.Scalar.newBuilder().setValue(
(mapSlots + reduceSlots) * slotCpus)))
.addResources(
Resource
.newBuilder()
.setName("mem")
.setType(Value.Type.SCALAR)
.setScalar(Value.Scalar.newBuilder().setValue(
(mapSlots + reduceSlots) * slotMem)))
.addResources(
Resource
.newBuilder()
.setName("disk")
.setType(Value.Type.SCALAR)
.setScalar(Value.Scalar.newBuilder().setValue(
(mapSlots + reduceSlots) * slotDisk)))
.addResources(
Resource
.newBuilder()
.setName("ports")
.setType(Value.Type.RANGES)
.setRanges(
Value.Ranges
.newBuilder()
.addRange(Value.Range.newBuilder()
.setBegin(httpAddress.getPort())
.setEnd(httpAddress.getPort()))
.addRange(Value.Range.newBuilder()
.setBegin(reportAddress.getPort())
.setEnd(reportAddress.getPort()))))
.setExecutor(
ExecutorInfo
.newBuilder()
.setExecutorId(ExecutorID.newBuilder().setValue(
"executor_" + taskId.getValue()))