/*
* Copyright © 2014 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package co.cask.cdap.common.twill;
import co.cask.cdap.common.conf.CConfiguration;
import co.cask.cdap.common.guice.ConfigModule;
import co.cask.cdap.common.guice.LocationRuntimeModule;
import co.cask.cdap.common.guice.TwillModule;
import co.cask.cdap.common.guice.ZKClientModule;
import co.cask.cdap.common.runtime.DaemonMain;
import com.google.common.base.Charsets;
import com.google.common.io.Files;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.Service;
import com.google.inject.Guice;
import com.google.inject.Injector;
import org.apache.hadoop.conf.Configuration;
import org.apache.twill.api.ElectionHandler;
import org.apache.twill.api.TwillApplication;
import org.apache.twill.api.TwillController;
import org.apache.twill.api.TwillPreparer;
import org.apache.twill.api.TwillRunner;
import org.apache.twill.api.TwillRunnerService;
import org.apache.twill.api.logging.PrinterLogHandler;
import org.apache.twill.common.ServiceListenerAdapter;
import org.apache.twill.internal.zookeeper.LeaderElection;
import org.apache.twill.zookeeper.ZKClientService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.Writer;
import java.util.Iterator;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
/**
 * Generic wrapper class to run twill applications.
 *
 * <p>On {@link #start()} the process joins a ZooKeeper leader election under
 * {@code /election/<serviceName>}. When it becomes leader it starts a {@link TwillRunnerService} and either
 * adopts an already running instance of the application or launches a new one. An instance launched by this
 * class is relaunched with exponential back-off when it fails or terminates, until {@link #stop()} is called.
 */
public abstract class TwillRunnerMain extends DaemonMain {
  private static final Logger LOG = LoggerFactory.getLogger(TwillRunnerMain.class);

  /** Back-off used for the first restart; doubled for every subsequent restart. */
  private static final long INITIAL_BACKOFF_TIME_MS = 500;
  /** Upper bound for the restart back-off. */
  private static final long MAX_BACKOFF_TIME_MS = TimeUnit.MINUTES.toMillis(10);
  /** If more than this much time passed since the last restart, the restart counter is reset. */
  private static final long SUCCESSFUL_RUN_DURATION_MS = TimeUnit.MINUTES.toMillis(20);

  /** Number of polls performed by {@link #lookupService()} before giving up. */
  private static final int LOOKUP_ATTEMPTS = 100;
  /** Pause between two polls in {@link #lookupService()}. */
  private static final long LOOKUP_INTERVAL_MS = 20;

  protected final CConfiguration cConf;
  protected final Configuration hConf;

  private final AtomicBoolean isLeader = new AtomicBoolean(false);

  private Injector baseInjector;
  private ZKClientService zkClientService;
  private LeaderElection leaderElection;
  private volatile TwillRunnerService twillRunnerService;
  private volatile TwillController twillController;

  private String serviceName;
  private TwillApplication twillApplication;

  private long lastRunTimeMs = System.currentTimeMillis();
  private int currentRun = 0;
  // Written by stop() and read from controller listener callbacks, hence volatile.
  private volatile boolean stopFlag = false;

  /**
   * @param cConf the CDAP configuration made available to subclasses and to the Guice injector
   * @param hConf the Hadoop configuration made available to subclasses and to the Guice injector
   */
  protected TwillRunnerMain(CConfiguration cConf, Configuration hConf) {
    this.cConf = cConf;
    this.hConf = hConf;
  }

  /**
   * Creates the application this runner manages. Called once from {@link #init(String[])}.
   *
   * @return the application to run; must not be {@code null}
   */
  protected abstract TwillApplication createTwillApplication();

  /**
   * Called each time this process becomes leader, right after the {@link TwillRunnerService} has been started
   * and before the application is looked up or launched.
   *
   * @param twillRunner the runner that was just started
   */
  protected abstract void scheduleSecureStoreUpdate(TwillRunner twillRunner);

  /**
   * Hook that lets subclasses customize the preparer before the application is started.
   * The default implementation returns the given preparer unchanged.
   *
   * @param preparer the preparer created for the application, with a stdout log handler already added
   * @return the preparer that will be started
   */
  protected TwillPreparer prepare(TwillPreparer preparer) {
    return preparer;
  }

  /**
   * Creates the application, derives the service name from its specification and builds the base injector
   * and the ZooKeeper client. The client is not started here.
   *
   * @param args ignored
   * @throws IllegalArgumentException if {@link #createTwillApplication()} returns {@code null}
   */
  @Override
  public void init(String[] args) {
    twillApplication = createTwillApplication();
    if (twillApplication == null) {
      throw new IllegalArgumentException("TwillApplication cannot be null");
    }
    serviceName = twillApplication.configure().getName();

    baseInjector = Guice.createInjector(
      new ConfigModule(cConf, hConf),
      new ZKClientModule(),
      new LocationRuntimeModule().getDistributedModules()
    );

    // Initialize ZK client
    zkClientService = baseInjector.getInstance(ZKClientService.class);
  }

  /**
   * Starts the ZooKeeper client and joins the leader election for this service.
   */
  @Override
  public void start() {
    zkClientService.startAndWait();

    leaderElection = new LeaderElection(zkClientService, "/election/" + serviceName, new ElectionHandler() {
      @Override
      public void leader() {
        LOG.info("Became leader.");
        // A fresh runner service is created on every leadership acquisition.
        Injector injector = baseInjector.createChildInjector(new TwillModule());
        twillRunnerService = injector.getInstance(TwillRunnerService.class);
        twillRunnerService.startAndWait();
        scheduleSecureStoreUpdate(twillRunnerService);
        run();
        isLeader.set(true);
      }

      @Override
      public void follower() {
        LOG.info("Became follower.");
        TwillRunnerService runnerService = twillRunnerService;
        if (runnerService != null && runnerService.isRunning()) {
          runnerService.stopAndWait();
        }
        isLeader.set(false);
      }
    });
    leaderElection.start();
  }

  /**
   * Prevents further restarts, stops the application if this process is the leader and the controller is
   * running, then leaves the election and stops the ZooKeeper client.
   *
   * <p>Safe to call even if {@link #start()} did not complete.
   */
  @Override
  public void stop() {
    LOG.info("Stopping {}", serviceName);
    stopFlag = true;

    TwillController controller = twillController;
    if (isLeader.get() && controller != null && controller.isRunning()) {
      controller.stopAndWait();
    }

    // leaderElection is only assigned in start(); guard so a failed/partial start does not cause an NPE here.
    if (leaderElection != null) {
      leaderElection.stopAndWait();
    }
    if (zkClientService != null) {
      zkClientService.stopAndWait();
    }
  }

  /**
   * Stops the {@link TwillRunnerService} if one was created and is still running.
   */
  @Override
  public void destroy() {
    LOG.info("Destroying {}", serviceName);
    TwillRunnerService runnerService = twillRunnerService;
    if (runnerService != null && runnerService.isRunning()) {
      runnerService.stopAndWait();
    }
  }

  /**
   * Adopts a running instance of the application if one is found (stopping any additional instances),
   * otherwise launches a new instance and registers a listener that relaunches it with back-off.
   */
  private void run() {
    // If service is already running, return handle to that instance
    Iterator<TwillController> running = lookupService().iterator();

    if (!running.hasNext()) {
      LOG.info("Starting {} application", serviceName);
      twillController = getPreparer().start();
      // sameThreadExecutor: the callback runs on whichever thread performs the state transition.
      twillController.addListener(createRestartListener(), MoreExecutors.sameThreadExecutor());
      return;
    }

    LOG.info("{} application is already running", serviceName);
    // NOTE(review): no restart listener is attached to an adopted instance, so its failure or termination
    // does not trigger backOffRun(). Confirm whether that is intended.
    twillController = running.next();

    if (running.hasNext()) {
      LOG.warn("Found more than one instance of {} running. Stopping the others...", serviceName);
      while (running.hasNext()) {
        TwillController extra = running.next();
        LOG.warn("Stopping one extra instance of {}", serviceName);
        extra.stopAndWait();
      }
      LOG.warn("Done stopping extra instances of {}", serviceName);
    }
  }

  /**
   * Creates the listener that relaunches the application when its controller fails or terminates.
   */
  private ServiceListenerAdapter createRestartListener() {
    return new ServiceListenerAdapter() {
      @Override
      public void failed(Service.State from, Throwable failure) {
        LOG.error("{} failed with exception... restarting with back-off.", serviceName, failure);
        backOffRun();
      }

      @Override
      public void terminated(Service.State from) {
        LOG.warn("{} got terminated... restarting with back-off", serviceName);
        backOffRun();
      }
    };
  }

  /**
   * Sleeps for the current back-off and then calls {@link #run()} again, unless {@link #stop()} was called.
   * The restart counter is reset when the previous restart happened more than
   * {@link #SUCCESSFUL_RUN_DURATION_MS} ago.
   */
  private void backOffRun() {
    if (stopFlag) {
      LOG.warn("Not starting a new run when stopFlag is true");
      return;
    }

    if (System.currentTimeMillis() - lastRunTimeMs > SUCCESSFUL_RUN_DURATION_MS) {
      currentRun = 0;
    }

    try {
      long sleepMs = computeBackOffMs(currentRun);
      LOG.info("Current restart run = {}. Backing off for {} ms...", currentRun, sleepMs);
      TimeUnit.MILLISECONDS.sleep(sleepMs);
    } catch (InterruptedException e) {
      LOG.warn("Got interrupted exception: ", e);
      Thread.currentThread().interrupt();
    }

    run();
    ++currentRun;
    lastRunTimeMs = System.currentTimeMillis();
  }

  /**
   * Returns {@code min(INITIAL_BACKOFF_TIME_MS * 2^run, MAX_BACKOFF_TIME_MS)} without overflowing.
   *
   * <p>Doubling stops as soon as the cap is reached, so the intermediate value never exceeds twice the cap.
   * Computing {@code 500 * (long) Math.pow(2, run)} directly wraps around for large {@code run}, which can
   * yield a zero or negative sleep time and thereby disable the back-off.
   *
   * @param run number of restarts performed so far
   * @return the time to sleep before the next restart, in milliseconds
   */
  private static long computeBackOffMs(int run) {
    long backOffMs = INITIAL_BACKOFF_TIME_MS;
    for (int i = 0; i < run && backOffMs < MAX_BACKOFF_TIME_MS; i++) {
      backOffMs *= 2;
    }
    return Math.min(backOffMs, MAX_BACKOFF_TIME_MS);
  }

  /**
   * Writes {@link #hConf} as XML to a new temporary file that is deleted when the JVM exits.
   *
   * @return the file containing the serialized Hadoop configuration
   * @throws IOException if the file cannot be created or written
   */
  protected File getSavedHConf() throws IOException {
    File hConfFile = File.createTempFile("hConf", ".xml");
    // Register for deletion before writing so the file is cleaned up even if the write fails.
    hConfFile.deleteOnExit();
    return saveHConf(hConf, hConfFile);
  }

  /**
   * Writes {@link #cConf} as XML to a new temporary file that is deleted when the JVM exits.
   *
   * @return the file containing the serialized CDAP configuration
   * @throws IOException if the file cannot be created or written
   */
  protected File getSavedCConf() throws IOException {
    File cConfFile = File.createTempFile("cConf", ".xml");
    // Register for deletion before writing so the file is cleaned up even if the write fails.
    cConfFile.deleteOnExit();
    return saveCConf(cConf, cConfFile);
  }

  /**
   * Creates a preparer for the application that logs to stdout and passes it through
   * {@link #prepare(TwillPreparer)}.
   */
  private TwillPreparer getPreparer() {
    TwillPreparer preparer = twillRunnerService.prepare(twillApplication)
      .addLogHandler(new PrinterLogHandler(new PrintWriter(System.out)));
    return prepare(preparer);
  }

  /**
   * Writes the given Hadoop configuration as UTF-8 XML to the given file and returns that file.
   */
  private static File saveHConf(Configuration conf, File file) throws IOException {
    Writer writer = Files.newWriter(file, Charsets.UTF_8);
    try {
      conf.writeXml(writer);
    } finally {
      writer.close();
    }
    return file;
  }

  /**
   * Writes the given CDAP configuration as UTF-8 XML to the given file and returns that file.
   */
  private static File saveCConf(CConfiguration conf, File file) throws IOException {
    Writer writer = Files.newWriter(file, Charsets.UTF_8);
    try {
      conf.writeXml(writer);
    } finally {
      writer.close();
    }
    return file;
  }

  /**
   * Looks up running instances of the service, polling for up to
   * {@code LOOKUP_ATTEMPTS * LOOKUP_INTERVAL_MS} milliseconds until at least one shows up.
   *
   * @return the lookup result; it may still be empty if nothing was found in time or the thread was
   *         interrupted (in which case the interrupt flag is restored)
   */
  private Iterable<TwillController> lookupService() {
    Iterable<TwillController> controllers = twillRunnerService.lookup(serviceName);

    try {
      for (int attempt = 0; attempt < LOOKUP_ATTEMPTS; ++attempt) {
        if (controllers.iterator().hasNext()) {
          return controllers;
        }
        TimeUnit.MILLISECONDS.sleep(LOOKUP_INTERVAL_MS);
      }
    } catch (InterruptedException e) {
      LOG.warn("Got interrupted exception: ", e);
      Thread.currentThread().interrupt();
    }

    return controllers;
  }
}