package org.apache.oozie.action.hadoop;
import java.io.File;
import java.io.FileInputStream;
import java.io.InputStream;
import java.io.OutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobID;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.oozie.WorkflowActionBean;
import org.apache.oozie.WorkflowJobBean;
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.client.WorkflowAction;
import org.apache.oozie.service.WorkflowAppService;
import org.apache.oozie.util.IOUtils;
import org.apache.oozie.util.XConfiguration;
/**
 * Submission test for {@link DistcpActionExecutor}: builds a {@code <distcp>} action,
 * submits its launcher job and checks that the launcher job finishes successfully.
 */
public class TestDistCpActionExecutor extends ActionExecutorTestCase{

    /**
     * Registers {@link DistcpActionExecutor} as the executor class used by the action service
     * for this test case, on top of the system properties set by the parent class.
     */
    @Override
    protected void setSystemProps() {
        super.setSystemProps();
        setSystemProperty("oozie.service.ActionService.executor.classes", DistcpActionExecutor.class.getName());
    }

    /**
     * Submits a minimal distcp action (three {@code <arg>} elements) and waits up to 60 seconds
     * for the launcher job to complete, then asserts that it was successful.
     *
     * @throws Exception if building the context, submitting or polling the job fails
     */
    public void testSimpestdistCpSubmitOK() throws Exception {
        String actionXml = "<distcp>" +
                "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" +
                "<name-node>" + getNameNodeUri() + "</name-node>" +
                "<arg>-Dmyqueue</arg>" +
                "<arg>input</arg>"+
                "<arg>output</arg>" +
                "</distcp>";
        Context context = createContext(actionXml);
        final RunningJob runningJob = submitAction(context);
        waitFor(60 * 1000, new Predicate() {
            public boolean evaluate() throws Exception {
                return runningJob.isComplete();
            }
        });
        // Distinguish "still running after the wait" from "finished but failed".
        assertTrue("launcher job did not complete within 60 seconds", runningJob.isComplete());
        assertTrue("launcher job completed but was not successful", runningJob.isSuccessful());
    }

    /**
     * Builds the execution context for a distcp action.
     *
     * <p>Creates a jar containing {@link LauncherMainTester} in the test case directory, copies it
     * to {@code lib/test.jar} under the application path, creates an empty {@code lib/test.so}
     * next to it, and lists both in the proto configuration's application lib path list. The first
     * action of the base workflow is then given the executor's type and the supplied XML.
     *
     * @param actionXml the {@code <distcp>} action definition to store on the workflow action
     * @return a context wrapping the created workflow and its first action
     * @throws Exception if the jar cannot be created or copied, or the workflow cannot be built
     */
    protected Context createContext(String actionXml) throws Exception {
        DistcpActionExecutor ae = new DistcpActionExecutor();

        Path appJarPath = new Path("lib/test.jar");
        File jarFile = IOUtils.createJar(new File(getTestCaseDir()), "test.jar", LauncherMainTester.class);
        // Close both streams on every path, including when create() or the copy throws.
        // Closing an already-closed stream is a no-op under the java.io.Closeable contract,
        // so this is safe whether or not copyStream closes the streams itself.
        InputStream is = new FileInputStream(jarFile);
        try {
            OutputStream os = getFileSystem().create(new Path(getAppPath(), appJarPath));
            try {
                IOUtils.copyStream(is, os);
            }
            finally {
                os.close();
            }
        }
        finally {
            is.close();
        }

        Path appSoPath = new Path("lib/test.so");
        getFileSystem().create(new Path(getAppPath(), appSoPath)).close();

        XConfiguration protoConf = new XConfiguration();
        protoConf.set(WorkflowAppService.HADOOP_USER, getTestUser());
        protoConf.set(WorkflowAppService.HADOOP_UGI, getTestUser() + "," + getTestGroup());
        protoConf.set(OozieClient.GROUP_NAME, getTestGroup());
        protoConf.setStrings(WorkflowAppService.APP_LIB_PATH_LIST, appJarPath.toString(), appSoPath.toString());
        injectKerberosInfo(protoConf);

        WorkflowJobBean wf = createBaseWorkflow(protoConf, "action");
        WorkflowActionBean action = (WorkflowActionBean) wf.getActions().get(0);
        action.setType(ae.getType());
        action.setConf(actionXml);

        return new Context(wf, action);
    }

    /**
     * Prepares the action directory, submits the launcher for the context's action and looks up
     * the resulting Hadoop job.
     *
     * <p>Asserts that the action received an external id, a tracker URI and a console URL, and
     * that the job with that external id can be found through a {@link JobClient} pointed at the
     * action's tracker URI.
     *
     * @param context the context whose action is to be submitted
     * @return the running launcher job identified by the action's external id
     * @throws Exception if preparation, submission or the job lookup fails
     */
    protected RunningJob submitAction(Context context) throws Exception {
        DistcpActionExecutor ae = new DistcpActionExecutor();

        WorkflowAction action = context.getAction();

        ae.prepareActionDir(getFileSystem(), context);
        ae.submitLauncher(context, action);

        String jobId = action.getExternalId();
        String jobTracker = action.getTrackerUri();
        String consoleUrl = action.getConsoleUrl();
        assertNotNull("external id was not set on the action", jobId);
        assertNotNull("tracker URI was not set on the action", jobTracker);
        assertNotNull("console URL was not set on the action", consoleUrl);

        JobConf jobConf = new JobConf();
        jobConf.set("mapred.job.tracker", jobTracker);
        injectKerberosInfo(jobConf);
        JobClient jobClient = new JobClient(jobConf);
        RunningJob runningJob = jobClient.getJob(JobID.forName(jobId));
        assertNotNull("no job found for external id " + jobId, runningJob);
        return runningJob;
    }
}