package com.thinkaurelius.titan.diskstorage.cassandra;
import com.thinkaurelius.titan.CassandraStorageSetup;
import com.thinkaurelius.titan.core.TitanException;
import com.thinkaurelius.titan.diskstorage.StorageException;
import com.thinkaurelius.titan.diskstorage.TemporaryStorageException;
import com.thinkaurelius.titan.diskstorage.cassandra.utils.CassandraDaemonWrapper;
import com.thinkaurelius.titan.diskstorage.cassandra.thrift.thriftpool.CTConnection;
import com.thinkaurelius.titan.diskstorage.cassandra.thrift.thriftpool.CTConnectionFactory;
import com.thinkaurelius.titan.graphdb.configuration.GraphDatabaseConfiguration;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.thrift.transport.TTransportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.*;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.Map;
import java.util.concurrent.*;
public class CassandraProcessStarter {
private Process cassandraProcess;
private Future<?> cassandraOutputLoggerFuture;
private CassandraOutputReader outputReader;
private Thread cassandraKiller;
private boolean delete = true;
private final ExecutorService cassandraOutputLogger =
Executors.newSingleThreadExecutor();
private boolean logCassandraOutput = true;
private final String address;
private final String cassandraConfigDir;
private final String cassandraDataDir;
private final String cassandraInclude;
private static final String cassandraCommand = "cassandra";
private static final int port =
AbstractCassandraStoreManager.PORT_DEFAULT;
private static final Logger log =
LoggerFactory.getLogger(CassandraProcessStarter.class);
private static final long CASSANDRA_STARTUP_TIMEOUT = 10000L;
private CassandraProcessStarter(String address) {
this.address = address;
cassandraDataDir = StringUtils.join(new String[]{"target",
"cassandra", "workdir", address}, File.separator);
cassandraConfigDir = StringUtils.join(new String[]{"target",
"cassandra", "conf", address}, File.separator);
cassandraInclude = cassandraConfigDir + File.separator
+ "cassandra.in.sh";
// We rely on maven to provide config files ahead of test execution
assert (new File(cassandraConfigDir).isDirectory());
assert (new File(cassandraInclude).isFile());
}
public CassandraProcessStarter() {
this("localhost-rp");
}
public CassandraProcessStarter setDelete(boolean delete) {
this.delete = delete;
return this;
}
public CassandraProcessStarter setLogging(boolean logging) {
logCassandraOutput = logging;
return this;
}
public boolean getLogging() {
return logCassandraOutput;
}
public void startCassandra() {
try {
final File cdd = new File(cassandraDataDir);
if (delete) {
if (cdd.isDirectory()) {
log.debug("Deleting dir {}...", cassandraDataDir);
FileUtils.deleteQuietly(new File(cassandraDataDir));
log.debug("Deleted dir {}", cassandraDataDir);
} else if (cdd.isFile()) {
log.debug("Deleting file {}...", cassandraDataDir);
cdd.delete();
log.debug("Deleted file {}", cassandraDataDir);
} else {
log.debug("Cassandra data directory {} does not exist; " +
"letting Cassandra startup script create it",
cassandraDataDir);
}
}
ProcessBuilder pb = new ProcessBuilder(cassandraCommand, "-f");
Map<String, String> env = pb.environment();
env.put("CASSANDRA_CONF", cassandraConfigDir);
env.put("CASSANDRA_INCLUDE", cassandraInclude);
pb.redirectErrorStream(true);
// Tail Cassandra
log.debug("Starting Cassandra process {}...",
StringUtils.join(pb.command(), ' '));
cassandraProcess = pb.start();
log.debug("Started Cassandra process {}.",
StringUtils.join(pb.command(), ' '));
// Register a Cassandra-killer shutdown hook
cassandraKiller = new CassandraKiller(cassandraProcess, address + ":" + port);
Runtime.getRuntime().addShutdownHook(cassandraKiller);
// Create Runnable to process Cassandra's stderr and stdout
log.debug("Starting Cassandra output handler task...");
outputReader = new CassandraOutputReader(cassandraProcess,
address, port, logCassandraOutput);
cassandraOutputLoggerFuture =
cassandraOutputLogger.submit(outputReader);
log.debug("Started Cassandra output handler task.");
// Block in a loop until connection to the Thrift port succeeds
long sleep = 0;
long connectAttemptStartTime = System.currentTimeMillis();
final long sleepGrowthIncrement = 100;
log.debug(
"Attempting to connect to Cassandra's Thrift service on {}:{}...",
address, port);
while (!Thread.currentThread().isInterrupted()) {
Socket s = new Socket();
s.setSoTimeout(50);
try {
s.connect(new InetSocketAddress(address, port));
long delay = System.currentTimeMillis() -
connectAttemptStartTime;
log.debug("Thrift connection to {}:{} succeeded " +
"(about {} ms after process start)",
new Object[]{address, port, delay});
break;
} catch (IOException e) {
sleep += sleepGrowthIncrement;
log.debug("Thrift connection failed; retrying in {} ms",
sleep);
Thread.sleep(sleep);
} finally {
if (s.isConnected()) {
s.close();
log.debug("Closed Thrift connection to {}:{}",
address, port);
}
}
}
/*
* Check that the Cassandra process logged that it
* successfully bound its Thrift port.
*
* This detects
*/
log.debug("Waiting for Cassandra process to log successful Thrift-port bind...");
if (!outputReader.awaitThrift(CASSANDRA_STARTUP_TIMEOUT, TimeUnit.MILLISECONDS)) {
String msg = "Cassandra process failed to bind Thrift-port within timeout.";
log.error(msg);
throw new TemporaryStorageException(msg);
}
log.debug("Cassandra process logged successful Thrift-port bind.");
} catch (Exception e) {
e.printStackTrace();
throw new TitanException(e);
}
}
public void stopCassandra() {
try {
if (null != cassandraOutputLoggerFuture) {
cassandraOutputLoggerFuture.cancel(true);
try {
cassandraOutputLoggerFuture.get();
} catch (CancellationException e) {
}
cassandraOutputLoggerFuture = null;
}
if (null != cassandraProcess) {
cassandraProcess.destroy();
cassandraProcess.waitFor();
cassandraProcess = null;
}
if (null != cassandraKiller) {
Runtime.getRuntime().removeShutdownHook(cassandraKiller);
cassandraKiller = null;
}
} catch (Exception e) {
e.printStackTrace();
throw new TitanException(e);
}
}
public void waitForClusterSize(int minSize) throws InterruptedException, StorageException {
CTConnectionFactory f = new CTConnectionFactory(new String[]{ address }, port,
null, null, // username, password
GraphDatabaseConfiguration.CONNECTION_TIMEOUT_DEFAULT,
AbstractCassandraStoreManager.THRIFT_DEFAULT_FRAME_SIZE);
CTConnection conn = null;
try {
conn = f.makeRawConnection();
CTConnectionFactory.waitForClusterSize(conn.getClient(), minSize);
} catch (TTransportException e) {
throw new TemporaryStorageException(e);
} finally {
if (null != conn)
if (conn.getTransport().isOpen())
conn.getTransport().close();
}
}
public static synchronized void startCleanEmbedded(String cassandraYamlPath) {
if (CassandraDaemonWrapper.isStarted()) {
log.debug("Already started embedded cassandra; subsequent attempts to start do nothing");
return;
}
try {
FileUtils.deleteDirectory(new File(CassandraStorageSetup.DATA_PATH));
} catch (IOException e) {
throw new RuntimeException(e);
}
CassandraDaemonWrapper.start(cassandraYamlPath);
}
}
class CassandraOutputReader implements Runnable {
private final Process p;
private final boolean logAllCassandraOutput;
private final String address;
private final int port;
private final Logger log;
private static final String THRIFT_BOUND_LOGLINE =
"Listening for thrift clients...";
private final CountDownLatch thriftBound = new CountDownLatch(1);
CassandraOutputReader(Process p, String address, int port, boolean logAllCassandraOutput) {
this.p = p;
this.address = address;
this.port = port;
this.logAllCassandraOutput = logAllCassandraOutput;
log = LoggerFactory.getLogger("Cassandra:" + address.replace('.', '-') + ":" + port);
}
public boolean awaitThrift(long timeout, TimeUnit units) throws InterruptedException {
return thriftBound.await(timeout, units);
}
@Override
public void run() {
InputStream is = p.getInputStream();
String prefix = null;
prefix = "[" + address + ":" + port + "]";
try {
BufferedReader br = new BufferedReader(
new InputStreamReader(is));
while (!Thread.currentThread().isInterrupted()) {
String l = br.readLine();
if (null == l)
break;
if (l.endsWith(THRIFT_BOUND_LOGLINE))
thriftBound.countDown();
if (logAllCassandraOutput)
log.debug("{} {}", prefix, l);
}
log.debug("Shutdown by interrupt");
} catch (IOException e) {
log.debug("IOException: {}", e.getMessage());
} catch (Exception e) {
log.debug("Terminated by Exception", e);
}
}
}
class CassandraKiller extends Thread {
private final Process cassandra;
private final String cassandraDesc;
CassandraKiller(Process cassandra, String cassandraDesc) {
this.cassandra = cassandra;
this.cassandraDesc = cassandraDesc;
}
@Override
public void run() {
System.err.println("Terminating Cassandra " + cassandraDesc + "...");
cassandra.destroy();
}
}