Package azkaban.app

Source Code of azkaban.app.AzkabanApplication

/*
* Copyright 2010 LinkedIn, Inc
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package azkaban.app;


import java.io.File;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.List;
import java.util.TimeZone;

import org.apache.log4j.Logger;
import org.apache.velocity.app.VelocityEngine;
import org.apache.velocity.runtime.log.Log4JLogChute;
import org.apache.velocity.runtime.resource.loader.ClasspathResourceLoader;
import org.joda.time.DateTimeZone;

import azkaban.common.jobs.Job;
import azkaban.common.utils.Props;
import azkaban.common.utils.Utils;
import azkaban.flow.CachingFlowManager;
import azkaban.flow.FlowManager;
import azkaban.flow.RefreshableFlowManager;
import azkaban.jobcontrol.impl.jobs.locks.NamedPermitManager;
import azkaban.jobcontrol.impl.jobs.locks.ReadWriteLockManager;

import azkaban.jobs.JobExecutorManager;
import azkaban.jobs.builtin.JavaJob;
import azkaban.jobs.builtin.JavaProcessJob;
import azkaban.jobs.builtin.NoopJob;
import azkaban.jobs.builtin.PigProcessJob;
import azkaban.jobs.builtin.ProcessJob;
import azkaban.jobs.builtin.PythonJob;
import azkaban.jobs.builtin.RubyJob;
import azkaban.jobs.builtin.ScriptJob;
import azkaban.scheduler.LocalFileScheduleLoader;
import azkaban.scheduler.ScheduleManager;
import azkaban.serialization.DefaultExecutableFlowSerializer;
import azkaban.serialization.ExecutableFlowSerializer;
import azkaban.serialization.FlowExecutionSerializer;
import azkaban.serialization.de.DefaultExecutableFlowDeserializer;
import azkaban.serialization.de.ExecutableFlowDeserializer;
import azkaban.serialization.de.FlowExecutionDeserializer;
import com.google.common.collect.ImmutableMap;

/**
* Master application that runs everything
*
* This class will be loaded up either by running from the command line or via a
* servlet context listener.
*
* @author jkreps
*
*/
public class AzkabanApplication
{

    private static final Logger logger = Logger.getLogger(AzkabanApplication.class);
    private static final String INSTANCE_NAME = "instance.name";
    private static final String DEFAULT_TIMEZONE_ID = "default.timezone.id";
   
    private final String _instanceName;
    private final List<File> _jobDirs;
    private final File _logsDir;
    private final File _tempDir;
    private final VelocityEngine _velocityEngine;
    private final JobManager _jobManager;
    private final Mailman _mailer;
    private final ClassLoader _baseClassLoader;
    private final String _hdfsUrl;
    private final FlowManager _allFlows;
   
    private final JobExecutorManager _jobExecutorManager;
    private final ScheduleManager _schedulerManager;
   
    public AzkabanApplication(List<File> jobDirs, File logDir, File tempDir, boolean enableDevMode) throws IOException {
        this._jobDirs = Utils.nonNull(jobDirs);
        this._logsDir = Utils.nonNull(logDir);
        this._tempDir = Utils.nonNull(tempDir);

        if(!this._logsDir.exists())
            this._logsDir.mkdirs();

        if(!this._tempDir.exists())
            this._tempDir.mkdirs();

        for(File jobDir: _jobDirs) {
            if(!jobDir.exists()) {
                logger.warn("Job directory " + jobDir + " does not exist. Creating.");
                jobDir.mkdirs();
            }
        }

        if(jobDirs.size() < 1)
            throw new IllegalArgumentException("No job directory given.");

        Props defaultProps = PropsUtils.loadPropsInDirs(_jobDirs, ".properties", ".schema");

        _baseClassLoader = getBaseClassloader();

        String defaultTimezoneID = defaultProps.getString(DEFAULT_TIMEZONE_ID, null);
        if (defaultTimezoneID != null) {
          DateTimeZone.setDefault(DateTimeZone.forID(defaultTimezoneID));
          TimeZone.setDefault(TimeZone.getTimeZone(defaultTimezoneID));
        }
       
        NamedPermitManager permitManager = getNamedPermitManager(defaultProps);
        JobWrappingFactory factory = new JobWrappingFactory(
                permitManager,
                new ReadWriteLockManager(),
                _logsDir.getAbsolutePath(),
                "java",
                new ImmutableMap.Builder<String, Class<? extends Job>>()
                 .put("java", JavaJob.class)
                 .put("command", ProcessJob.class)
                 .put("javaprocess", JavaProcessJob.class)
                 .put("pig", PigProcessJob.class)
                 .put("propertyPusher", NoopJob.class)
                 .put("python", PythonJob.class)
                 .put("ruby", RubyJob.class)
                 .put("script", ScriptJob.class).build());

        _hdfsUrl = defaultProps.getString("hdfs.instance.url", null);
        _jobManager = new JobManager(factory,
                                     _logsDir.getAbsolutePath(),
                                     defaultProps,
                                     _jobDirs,
                                     _baseClassLoader);

        _mailer = new Mailman(defaultProps.getString("mail.host", "localhost"),
                              defaultProps.getString("mail.user", ""),
                              defaultProps.getString("mail.password", ""));

        String failureEmail = defaultProps.getString("job.failure.email", null);
        String successEmail = defaultProps.getString("job.success.email", null);
        int schedulerThreads = defaultProps.getInt("scheduler.threads", 50);
        _instanceName = defaultProps.getString(INSTANCE_NAME, "");
       
        final File initialJobDir = _jobDirs.get(0);
        File schedule = getScheduleFile(defaultProps, initialJobDir);
        File backup = getBackupFile(defaultProps, initialJobDir);
        File executionsStorageDir = new File(
                defaultProps.getString("azkaban.executions.storage.dir", initialJobDir.getAbsolutePath() + "/executions")
        );
        if (! executionsStorageDir.exists()) executionsStorageDir.mkdirs();
        long lastExecutionId = getLastExecutionId(executionsStorageDir);
        logger.info(String.format("Using path[%s] for storing executions.", executionsStorageDir));
        logger.info(String.format("Last known execution id was [%s]", lastExecutionId));

        final ExecutableFlowSerializer flowSerializer = new DefaultExecutableFlowSerializer();
        final ExecutableFlowDeserializer flowDeserializer = new DefaultExecutableFlowDeserializer(_jobManager, factory);

        FlowExecutionSerializer flowExecutionSerializer = new FlowExecutionSerializer(flowSerializer);
        FlowExecutionDeserializer flowExecutionDeserializer = new FlowExecutionDeserializer(flowDeserializer);

        _allFlows = new CachingFlowManager(
                new RefreshableFlowManager(
                        _jobManager,
                        flowExecutionSerializer,
                        flowExecutionDeserializer,
                        executionsStorageDir,
                        lastExecutionId
                ),
                defaultProps.getInt("azkaban.flow.cache.size", 1000)
        );
        _jobManager.setFlowManager(_allFlows);

        _jobExecutorManager = new JobExecutorManager(
                    _allFlows,
                    _jobManager,
                    _mailer,
                    failureEmail,
                    successEmail,
                    schedulerThreads
                  );
       
        this._schedulerManager = new ScheduleManager(_jobExecutorManager, new LocalFileScheduleLoader(schedule, backup));

        /* set predefined log url prefix
        */
        String server_url = defaultProps.getString("server.url", null) ;
        if (server_url != null) {
            if (server_url.endsWith("/"))
              _jobExecutorManager.setRuntimeProperty(AppCommon.DEFAULT_LOG_URL_PREFIX, server_url + "logs?file=" );
            else
              _jobExecutorManager.setRuntimeProperty(AppCommon.DEFAULT_LOG_URL_PREFIX, server_url + "/logs?file=" );
        }

        this._velocityEngine = configureVelocityEngine(enableDevMode);
    }

    private VelocityEngine configureVelocityEngine(boolean devMode) {
        VelocityEngine engine = new VelocityEngine();
        engine.setProperty("resource.loader", "classpath");
        engine.setProperty("classpath.resource.loader.class",
                           ClasspathResourceLoader.class.getName());
        engine.setProperty("classpath.resource.loader.cache", !devMode);
        engine.setProperty("classpath.resource.loader.modificationCheckInterval", 5L);
        engine.setProperty("resource.manager.logwhenfound", false);
        engine.setProperty("input.encoding", "UTF-8");
        engine.setProperty("output.encoding", "UTF-8");
        engine.setProperty("directive.foreach.counter.name", "idx");
        engine.setProperty("directive.foreach.counter.initial.value", 0);
        //engine.setProperty("runtime.references.strict", true);
        engine.setProperty("directive.set.null.allowed", true);
        engine.setProperty("resource.manager.logwhenfound", false);
        engine.setProperty("velocimacro.permissions.allow.inline", true);
        engine.setProperty("velocimacro.library.autoreload", devMode);
        engine.setProperty("velocimacro.library", "/azkaban/web/macros.vm");
        engine.setProperty("velocimacro.permissions.allow.inline.to.replace.global", true);
        engine.setProperty("velocimacro.context.localscope", true);
        engine.setProperty("velocimacro.arguments.strict", true);
        engine.setProperty("runtime.log.invalid.references", devMode);
        engine.setProperty("runtime.log.logsystem.class", Log4JLogChute.class);
        engine.setProperty("runtime.log.logsystem.log4j.logger",
                           Logger.getLogger("org.apache.velocity.Logger"));
        engine.setProperty("parser.pool.size", 3);
        return engine;
    }

    public String getLogDirectory() {
        return _logsDir.getAbsolutePath();
    }

    public String getTempDirectory() {
        return _tempDir.getAbsolutePath();
    }

    public List<File> getJobDirectories() {
        return _jobDirs;
    }

    public JobExecutorManager getJobExecutorManager() {
        return _jobExecutorManager;
    }
   
    public ScheduleManager getScheduleManager() {
        return _schedulerManager;
    }
   
    public VelocityEngine getVelocityEngine() {
        return _velocityEngine;
    }

    public JobManager getJobManager() {
        return _jobManager;
    }

    public String getHdfsUrl() {
        return this._hdfsUrl;
    }

    public boolean hasHdfsUrl() {
        return this._hdfsUrl != null;
    }

    public ClassLoader getClassLoader() {
        return _baseClassLoader;
    }

    public String getAppInstanceName() {
        return _instanceName;
    }
   
    public FlowManager getAllFlows()
    {
        return _allFlows;
    }

    private ClassLoader getBaseClassloader() throws MalformedURLException
    {
        final ClassLoader retVal;

        String hadoopHome = System.getenv("HADOOP_HOME");
        if(hadoopHome == null) {
            logger.info("HADOOP_HOME not set, using default hadoop config.");
            retVal = getClass().getClassLoader();
        } else {
            logger.info("Using hadoop config found in " + hadoopHome);
            retVal = new URLClassLoader(new URL[] { new File(hadoopHome, "conf").toURI().toURL() },
                                        getClass().getClassLoader());
        }

        return retVal;
    }

    private NamedPermitManager getNamedPermitManager(Props props) throws MalformedURLException
    {
        int workPermits = props.getInt("total.job.permits", Integer.MAX_VALUE);
        NamedPermitManager permitManager = new NamedPermitManager();
        permitManager.createNamedPermit("default", workPermits);

        return permitManager;
    }

    private File getBackupFile(Props defaultProps, File initialJobDir)
    {
        File retVal = new File(initialJobDir.getAbsoluteFile(), "jobs.schedule.backup");

        String backupFile = defaultProps.getString("schedule.backup.file", null);
        if(backupFile != null)
            retVal = new File(backupFile);
        else
            logger.info("Schedule backup file param not set. Defaulting to " + retVal.getAbsolutePath());

        return retVal;
    }

    private File getScheduleFile(Props defaultProps, File initialJobDir)
    {
        File retVal = new File(initialJobDir.getAbsoluteFile(), "jobs.schedule");

        String scheduleFile = defaultProps.getString("schedule.file", null);
        if(scheduleFile != null)
            retVal = new File(scheduleFile);
        else
            logger.info("Schedule file param not set. Defaulting to " + retVal.getAbsolutePath());

        return retVal;
    }

    private long getLastExecutionId(File executionsStorageDir)
    {
        long lastId = 0;

        for (File file : executionsStorageDir.listFiles()) {
            final String filename = file.getName();
            if (filename.endsWith(".json")) {
                try {
                    lastId = Math.max(
                            lastId,
                            Long.parseLong(filename.substring(0, filename.length() - 5))
                    );
                }
                catch (NumberFormatException e) {
                }
            }
        }

        return lastId;
    }

   
    public String getRuntimeProperty(String name) {
        return _jobExecutorManager.getRuntimeProperty(name);
    }

    public void setRuntimeProperty(String key, String value) {
      _jobExecutorManager.setRuntimeProperty(key, value);
    }
}
TOP

Related Classes of azkaban.app.AzkabanApplication

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code is the property of its respective owners. Java is a trademark of Sun Microsystems, Inc. and is owned by Oracle Inc. Contact coftware#gmail.com.