/*
* Copyright 2002-2011 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.springframework.amqp.rabbit.admin;
import java.io.File;
import java.io.FilenameFilter;
import java.io.UnsupportedEncodingException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.regex.Pattern;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.core.task.AsyncTaskExecutor;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.erlang.OtpAuthException;
import org.springframework.erlang.OtpException;
import org.springframework.erlang.connection.ConnectionFactory;
import org.springframework.erlang.connection.SimpleConnectionFactory;
import org.springframework.erlang.core.Application;
import org.springframework.erlang.core.ErlangTemplate;
import org.springframework.erlang.core.Node;
import org.springframework.jmx.export.annotation.ManagedOperation;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
import org.springframework.util.exec.Execute;
import org.springframework.util.exec.Os;
/**
* Rabbit broker administration. Features:
*
* <ul>
* <li>Basic AMQP admin commands are provided, like declaring queues, exchanges and bindings.</li>
* <li>Manage user accounts and virtual hosts.</li>
* <li>Start and stop the broker process.</li>
* <li>Start and stop the broker application (in a running process).</li>
* <li>Inspect and manage the queues, e.g. listing message counts etc.</li>
* </ul>
*
* Depending on your platform, to {@link #startNode() start the broker} you might need to set some environment
* properties. The most common are available via constructors or setters in this class (e.g.
* {@link #setRabbitLogBaseDirectory(String) RABBITMQ_LOG_BASE}). All others you can set via the OS (any setting that
* RabbtMQ allows in its startup script), and some work via System properties as special convenience cases (
* <code>ERLANG_HOME</code> and <code>RABBITMQ_HOME</code> ).
*
* @author Mark Pollack
* @author Dave Syer
* @author Helena Edelson
*/
public class RabbitBrokerAdmin implements RabbitBrokerOperations {
private static final String DEFAULT_VHOST = "/";
private static String DEFAULT_NODE_NAME;
private static final int DEFAULT_PORT = 5672;
private static final String DEFAULT_ENCODING = "UTF-8";
/** Logger available to subclasses */
protected final Log logger = LogFactory.getLog(getClass());
private ErlangTemplate erlangTemplate;
private String encoding = DEFAULT_ENCODING;
private long timeout = 0;
private AsyncTaskExecutor executor;
private final String nodeName;
private final String cookie;
private final int port;
private String rabbitLogBaseDirectory;
private String rabbitMnesiaBaseDirectory;
private Map<String, String> moduleAdapter = new HashMap<String, String>();
static {
try {
String hostName = InetAddress.getLocalHost().getHostName();
DEFAULT_NODE_NAME = "rabbit@" + hostName;
} catch (UnknownHostException e) {
DEFAULT_NODE_NAME = "rabbit@localhost";
}
}
public RabbitBrokerAdmin() {
this(DEFAULT_NODE_NAME);
}
/**
* Create an instance by supplying the erlang node name (e.g. "rabbit@myserver"), or simply the hostname (if the
* alive name is "rabbit").
*
* @param nodeName the node name or hostname to use
*/
public RabbitBrokerAdmin(String nodeName) {
this(nodeName, null);
}
/**
* Create an instance by supplying the erlang node name and cookie (unique string).
*
* @param nodeName the node name or hostname to use
*
* @param cookie the cookie value to use
*/
public RabbitBrokerAdmin(String nodeName, String cookie) {
this(nodeName, DEFAULT_PORT, cookie);
}
/**
* Create an instance by supplying the erlang node name and port number. Use this on a UN*X system if you want to
* run the broker as a user without root privileges, supplying values that do not clash with the default broker
* (usually "rabbit@<servername>" and 5672). If, as well as managing an existing broker, you need to start the
* broker process, you will also need to set {@link #setRabbitLogBaseDirectory(String) RABBITMQ_LOG_BASE} and
* {@link #setRabbitMnesiaBaseDirectory(String) RABBITMQ_MNESIA_BASE} to point to writable directories).
*
* @param nodeName the node name or hostname to use
* @param port the port number (overriding the default which is 5672)
*/
public RabbitBrokerAdmin(String nodeName, int port) {
this(nodeName, port, null);
}
/**
* Create an instance by supplying the erlang node name, port number and cookie (unique string). If the node name
* does not contain an <code>@</code> character, it will be prepended with an alivename <code>rabbit@</code>
* (interpreting the supplied value as just the hostname).
*
* @param nodeName the node name or hostname to use
* @param port the port number (overriding the default which is 5672)
* @param cookie the cookie value to use
*/
public RabbitBrokerAdmin(String nodeName, int port, String cookie) {
if (!nodeName.contains("@")) {
nodeName = "rabbit@" + nodeName; // it was just the host
}
String[] parts = nodeName.split("@");
Assert.state(parts.length == 2, "The node name should be in the form alivename@host, e.g. rabbit@myserver");
if (Os.isFamily("windows") && !DEFAULT_NODE_NAME.equals(nodeName)) {
nodeName = parts[0] + "@" + parts[1].toUpperCase();
}
this.port = port;
this.cookie = cookie;
this.nodeName = nodeName;
SimpleAsyncTaskExecutor executor = new SimpleAsyncTaskExecutor();
executor.setDaemon(true);
this.executor = executor;
}
/**
* An async task executor for launching background processing when starting or stopping the broker.
*
* @param executor the executor to set
*/
public void setExecutor(AsyncTaskExecutor executor) {
this.executor = executor;
}
/**
* The location of <code>RABBITMQ_LOG_BASE</code> to override the system default (which may be owned by another
* user). Only needed for launching the broker process. Can also be set as a system property.
*
* @param rabbitLogBaseDirectory the rabbit log base directory to set
*/
public void setRabbitLogBaseDirectory(String rabbitLogBaseDirectory) {
this.rabbitLogBaseDirectory = rabbitLogBaseDirectory;
}
/**
* The location of <code>RABBITMQ_MNESIA_BASE</code> to override the system default (which may be owned by another
* user). Only needed for launching the broker process. Can also be set as a system property.
*
* @param rabbitMnesiaBaseDirectory the rabbit Mnesia base directory to set
*/
public void setRabbitMnesiaBaseDirectory(String rabbitMnesiaBaseDirectory) {
this.rabbitMnesiaBaseDirectory = rabbitMnesiaBaseDirectory;
}
/**
* The encoding to use for converting host names to byte arrays (which is needed on the remote side).
* @param encoding the encoding to use (default UTF-8)
*/
public void setEncoding(String encoding) {
this.encoding = encoding;
}
/**
* Timeout (milliseconds) to wait for the broker to come up. If the provided timeout is greater than zero then we
* wait for that period for the broker to be ready. If it is not ready after that time the process is stopped.
* Defaults to 0 (no wait).
*
* @param timeout the timeout value to set in milliseconds
*/
public void setStartupTimeout(long timeout) {
this.timeout = timeout;
}
/**
* Allows users to adapt Erlang RPC <code>(module, function)</code> pairs to older, or different, versions of the
* broker than the current target. The map is from String to String in the form
* <code>input_module%input_function -> output_module%output_function</code> (using a <code>%</code> separator).
*
* @param moduleAdapter the module adapter to set
*/
public void setModuleAdapter(Map<String, String> moduleAdapter) {
this.moduleAdapter = moduleAdapter;
}
@SuppressWarnings("unchecked")
public List<QueueInfo> getQueues() {
return (List<QueueInfo>) executeAndConvertRpc("rabbit_amqqueue", "info_all", getBytes(DEFAULT_VHOST));
}
@SuppressWarnings("unchecked")
public List<QueueInfo> getQueues(String virtualHost) {
return (List<QueueInfo>) executeAndConvertRpc("rabbit_amqqueue", "info_all", getBytes(virtualHost));
}
// User management
@ManagedOperation()
public void addUser(String username, String password) {
executeAndConvertRpc("rabbit_auth_backend_internal", "add_user", getBytes(username), getBytes(password));
}
@ManagedOperation
public void deleteUser(String username) {
executeAndConvertRpc("rabbit_auth_backend_internal", "delete_user", getBytes(username));
}
@ManagedOperation
public void changeUserPassword(String username, String newPassword) {
executeAndConvertRpc("rabbit_auth_backend_internal", "change_password", getBytes(username),
getBytes(newPassword));
}
@SuppressWarnings("unchecked")
@ManagedOperation
public List<String> listUsers() {
return (List<String>) executeAndConvertRpc("rabbit_auth_backend_internal", "list_users");
}
public int addVhost(String vhostPath) {
// TODO Auto-generated method stub
return 0;
}
public int deleteVhost(String vhostPath) {
// TODO Auto-generated method stub
return 0;
}
public void setPermissions(String username, Pattern configure, Pattern read, Pattern write) {
// TODO Auto-generated method stub
}
public void setPermissions(String username, Pattern configure, Pattern read, Pattern write, String vhostPath) {
// TODO Auto-generated method stub
}
public void clearPermissions(String username) {
// TODO Auto-generated method stub
}
public void clearPermissions(String username, String vhostPath) {
// TODO Auto-generated method stub
}
public List<String> listPermissions() {
// TODO Auto-generated method stub
return null;
}
public List<String> listPermissions(String vhostPath) {
// TODO Auto-generated method stub
return null;
}
public List<String> listUserPermissions(String username) {
// TODO Auto-generated method stub
return null;
}
@ManagedOperation
public void startBrokerApplication() {
RabbitStatus status = getStatus();
if (status.isReady()) {
logger.info("Rabbit Application already running.");
return;
}
if (!status.isAlive()) {
logger.info("Rabbit Process not running.");
startNode();
return;
}
logger.info("Starting Rabbit Application.");
// This call in particular seems to be prone to hanging, so do it in the background...
final CountDownLatch latch = new CountDownLatch(1);
Future<Object> result = executor.submit(new Callable<Object>() {
public Object call() throws Exception {
try {
return executeAndConvertRpc("rabbit", "start");
} finally {
latch.countDown();
}
}
});
boolean started = false;
try {
started = latch.await(timeout, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
result.cancel(true);
return;
}
if (timeout > 0 && started) {
if (!waitForReadyState() && !result.isDone()) {
result.cancel(true);
}
}
}
@ManagedOperation
public void stopBrokerApplication() {
logger.info("Stopping Rabbit Application.");
executeAndConvertRpc("rabbit", "stop");
if (timeout > 0) {
waitForUnreadyState();
}
}
@ManagedOperation
public void startNode() {
RabbitStatus status = getStatus();
if (status.isAlive()) {
logger.info("Rabbit Process already running.");
startBrokerApplication();
return;
}
if (!status.isRunning() && status.isReady()) {
logger.info("Rabbit Process not running but status is ready. Restarting.");
stopNode();
}
logger.info("Starting RabbitMQ node by shelling out command line.");
final Execute execute = new Execute();
String rabbitStartScript = null;
String hint = "";
if (Os.isFamily("windows") || Os.isFamily("dos")) {
rabbitStartScript = "sbin/rabbitmq-server.bat";
} else if (Os.isFamily("unix") || Os.isFamily("mac")) {
rabbitStartScript = "bin/rabbitmq-server";
hint = "Depending on your platform it might help to set RABBITMQ_LOG_BASE and RABBITMQ_MNESIA_BASE System properties to an empty directory.";
}
Assert.notNull(rabbitStartScript, "unsupported OS family");
String rabbitHome = System.getProperty("RABBITMQ_HOME", System.getenv("RABBITMQ_HOME"));
if (rabbitHome == null) {
if (Os.isFamily("windows") || Os.isFamily("dos")) {
rabbitHome = findDirectoryName("c:/Program Files", "rabbitmq");
} else if (Os.isFamily("unix") || Os.isFamily("mac")) {
rabbitHome = "/usr/lib/rabbitmq";
}
}
Assert.notNull(rabbitHome, "RABBITMQ_HOME system property (or environment variable) not set.");
rabbitHome = StringUtils.cleanPath(rabbitHome);
String rabbitStartCommand = rabbitHome + "/" + rabbitStartScript;
String[] commandline = new String[] { rabbitStartCommand };
List<String> env = new ArrayList<String>();
if (rabbitLogBaseDirectory != null) {
env.add("RABBITMQ_LOG_BASE=" + rabbitLogBaseDirectory);
} else {
addEnvironment(env, "RABBITMQ_LOG_BASE");
}
if (rabbitMnesiaBaseDirectory != null) {
env.add("RABBITMQ_MNESIA_BASE=" + rabbitMnesiaBaseDirectory);
} else {
addEnvironment(env, "RABBITMQ_MNESIA_BASE");
}
addEnvironment(env, "ERLANG_HOME");
// Make the nodename explicitly the same so the erl process knows who we are
env.add("RABBITMQ_NODENAME=" + nodeName);
// Set the port number for the new process
env.add("RABBITMQ_NODE_PORT=" + port);
// Ask for a detached erl process so stdout doesn't get diverted to a black hole when the JVM dies (without this
// you can start the Rabbit broker form Java but if you forget to stop it, the erl process is hosed).
env.add("RABBITMQ_SERVER_ERL_ARGS=-detached");
execute.setCommandline(commandline);
execute.setEnvironment(env.toArray(new String[0]));
final CountDownLatch running = new CountDownLatch(1);
final AtomicBoolean finished = new AtomicBoolean(false);
final String errorHint = hint;
executor.execute(new Runnable() {
public void run() {
try {
running.countDown();
int exit = execute.execute();
finished.set(true);
logger.info("Finished broker launcher process with exit code=" + exit);
if (exit != 0) {
throw new IllegalStateException("Could not start process." + errorHint);
}
} catch (Exception e) {
logger.error("Failed to start node", e);
}
}
});
try {
logger.info("Waiting for Rabbit process to be started");
Assert.state(running.await(1000L, TimeUnit.MILLISECONDS),
"Timed out waiting for thread to start Rabbit process.");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
if (finished.get()) {
// throw new
// IllegalStateException("Expected broker process to start in background, but it has exited early.");
}
if (timeout > 0) {
waitForReadyState();
}
}
private boolean waitForReadyState() {
return waitForState(new StatusCallback() {
public boolean get(RabbitStatus status) {
return status.isReady();
}
}, "ready");
}
private boolean waitForUnreadyState() {
return waitForState(new StatusCallback() {
public boolean get(RabbitStatus status) {
return !status.isRunning();
}
}, "unready");
}
private boolean waitForStoppedState() {
return waitForState(new StatusCallback() {
public boolean get(RabbitStatus status) {
return !status.isReady() && !status.isRunning();
}
}, "stopped");
}
private boolean waitForState(final StatusCallback callable, String state) {
if (timeout <= 0) {
return true;
}
RabbitStatus status = getStatus();
if (!callable.get(status)) {
logger.info("Waiting for broker to enter state: " + state);
Future<RabbitStatus> started = executor.submit(new Callable<RabbitStatus>() {
public RabbitStatus call() throws Exception {
RabbitStatus status = getStatus();
while (!callable.get(status)) {
// Any less than 1000L and we tend to clog up the socket?
Thread.sleep(500L);
status = getStatus();
}
return status;
}
});
try {
status = started.get(timeout, TimeUnit.MILLISECONDS);
// This seems to help... really it just means we didn't get the right status data
Thread.sleep(500L);
} catch (TimeoutException e) {
started.cancel(true);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (ExecutionException e) {
logger.error("Exception checking broker status for " + state, e.getCause());
}
if (!callable.get(status)) {
logger.error("Rabbit broker not in " + state + " state after timeout. Stopping process.");
stopNode();
return false;
} else {
logger.info("Finished waiting for broker to enter state: " + state);
if (logger.isDebugEnabled()) {
logger.info("Status: " + status);
}
return true;
}
} else {
logger.info("Broker already in state: " + state);
}
return true;
}
/**
* Find a directory whose name starts with a substring in a given parent directory. If there is none return null,
* otherwise sort the results and return the best match (an exact match if there is one or the last one in a lexical
* sort).
*
* @param parent
* @param child
* @return the full name of a directory
*/
private String findDirectoryName(String parent, String child) {
String result = null;
String[] names = new File(parent).list(new FilenameFilter() {
public boolean accept(File dir, String name) {
return name.equals("rabbitmq") && new File(dir, name).isDirectory();
}
});
if (names.length == 1) {
result = new File(parent, names[0]).getAbsolutePath();
return result;
}
List<String> sorted = Arrays.asList(new File(parent).list(new FilenameFilter() {
public boolean accept(File dir, String name) {
return name.startsWith("rabbitmq") && new File(dir, name).isDirectory();
}
}));
Collections.sort(sorted, Collections.reverseOrder());
if (!sorted.isEmpty()) {
result = new File(parent, sorted.get(0)).getAbsolutePath();
}
return result;
}
private void addEnvironment(List<String> env, String key) {
String value = System.getProperty(key);
if (value != null) {
logger.debug("Adding environment variable: " + key + "=" + value);
env.add(key + "=" + value);
}
}
@ManagedOperation
public void stopNode() {
logger.info("Stopping RabbitMQ node.");
try {
executeAndConvertRpc("rabbit", "stop_and_halt");
} catch (Exception e) {
logger.error("Failed to send stop signal", e);
}
if (timeout >= 0) {
waitForStoppedState();
}
}
@ManagedOperation
public void resetNode() {
executeAndConvertRpc("rabbit_mnesia", "reset");
}
@ManagedOperation
public void forceResetNode() {
executeAndConvertRpc("rabbit_mnesia", "force_reset");
}
@ManagedOperation
public RabbitStatus getStatus() {
try {
return (RabbitStatus) executeAndConvertRpc("rabbit", "status");
} catch (OtpAuthException e) {
throw new RabbitAdminAuthException(
"Could not authorise connection to Erlang process. This can happen if the broker is running, "
+ "but as root or rabbitmq and the current user is not authorised to connect. Try starting the "
+ "broker again as a different user.", e);
} catch (OtpException e) {
logger.debug("Ignoring OtpException (assuming that the broker is simply not running)");
if (logger.isTraceEnabled()) {
logger.trace("Status not available owing to exception", e);
}
return new RabbitStatus(Collections.<Application> emptyList(), Collections.<Node> emptyList(),
Collections.<Node> emptyList());
}
}
protected void initializeDefaultErlangTemplate() {
String peerNodeName = nodeName;
logger.debug("Creating jinterface connection with peerNodeName = [" + peerNodeName + "]");
SimpleConnectionFactory otpConnectionFactory = new SimpleConnectionFactory("rabbit-spring-monitor",
peerNodeName, this.cookie);
otpConnectionFactory.afterPropertiesSet();
createErlangTemplate(otpConnectionFactory);
}
protected void createErlangTemplate(ConnectionFactory otpConnectionFactory) {
erlangTemplate = new ErlangTemplate(otpConnectionFactory);
erlangTemplate.setErlangConverter(new RabbitControlErlangConverter(moduleAdapter));
erlangTemplate.afterPropertiesSet();
}
/**
* Convenience method for lazy initialization of the {@link ErlangTemplate} and associated trimmings. All RPC calls
* should go through this method.
*
* @param <T> the type of the result
* @param module the module to address remotely
* @param function the function to call
* @param args the arguments to pass
*
* @return the result from the remote erl process converted to the correct type
*/
@SuppressWarnings("unchecked")
private <T> T executeAndConvertRpc(String module, String function, Object... args) {
if (erlangTemplate == null) {
synchronized (this) {
if (erlangTemplate == null) {
initializeDefaultErlangTemplate();
}
}
}
String key = module + "%" + function;
if (moduleAdapter.containsKey(key)) {
String adapter = moduleAdapter.get(key);
String[] values = adapter.split("%");
Assert.state(values.length == 2,
"The module adapter should be a map from 'module%function' to 'module%function'. "
+ "This one contained [" + adapter + "] which cannot be parsed to a module, function pair.");
module = values[0];
function = values[1];
}
return (T) erlangTemplate.executeAndConvertRpc(module, function, args);
}
/**
* Safely convert a string to its bytes using the encoding provided.
*
* @see #setEncoding(String)
*
* @param string the value to convert
*
* @return the bytes from the string using the encoding provided
*
* @throws IllegalStateException if the encoding is ont supported
*/
private byte[] getBytes(String string) {
try {
return string.getBytes(encoding);
} catch (UnsupportedEncodingException e) {
throw new IllegalStateException("Unsupported encoding: " + encoding);
}
}
private static interface StatusCallback {
boolean get(RabbitStatus status);
}
}