package com.cloudhopper.mq.broker.server;
/*
* #%L
* ch-mq
* %%
* Copyright (C) 2012 Cloudhopper by Twitter
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
import com.cloudhopper.commons.util.StringUtil;
import com.cloudhopper.commons.util.URL;
import com.cloudhopper.mq.broker.DistributedQueueManager;
import com.cloudhopper.mq.broker.protocol.ProtocolConstants;
import com.cloudhopper.mq.queue.Queue;
import com.cloudhopper.mq.queue.QueueManager;
import com.cloudhopper.mq.transcoder.Transcoder;
import java.io.InputStream;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.Enumeration;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Simple servlet providing an interface to a QueueManager.
 *
 * <p>Every request must carry a query string with a {@code cmd} parameter and
 * may carry an optional {@code group} parameter, which (when present) must
 * match the group name configured on the DistributedQueueManager. Supported
 * commands:
 * <ul>
 * <li>{@code queues} - writes an HTML table describing every queue.</li>
 * <li>{@code monitor} - writes the result code, protocol version and area id,
 * followed by one line per non-local queue with its consumer count; the
 * optional {@code since} parameter limits the list to queues whose consumer
 * count changed at or after that timestamp.</li>
 * <li>{@code transfer} - decodes the request body with the target queue's
 * transcoder and hands the item to the configured put handler.</li>
 * </ul>
 * Malformed requests are answered with HTTP 400 and a short text message.
 */
public class BrokerServlet extends HttpServlet {
    private static final Logger logger = LoggerFactory.getLogger(BrokerServlet.class);

    // reference to queue manager
    private final QueueManager queueManager;
    // reference to distributed queue manager
    private final DistributedQueueManager dqm;
    // the "put" handler for queue items
    private BrokerServerPutHandler putHandler;

    /**
     * Creates a servlet that uses a DefaultBrokerServerPutHandler until
     * {@link #setPutHandler(BrokerServerPutHandler)} replaces it.
     *
     * @param queueManager the manager whose queues are listed and written to
     * @param dqm the distributed queue manager supplying group name and area id
     */
    public BrokerServlet(QueueManager queueManager, DistributedQueueManager dqm) {
        this.queueManager = queueManager;
        this.dqm = dqm;
        this.putHandler = new DefaultBrokerServerPutHandler();
    }

    /**
     * Replaces the handler that performs the actual "put" of a transferred item.
     *
     * @param putHandler the handler to delegate to for transfer requests
     */
    public void setPutHandler(BrokerServerPutHandler putHandler) {
        this.putHandler = putHandler;
    }

    /**
     * Validates the common request parameters and dispatches on {@code cmd}.
     *
     * @param request the incoming request
     * @param response the response to write to
     * @throws IOException if reading the request or writing the response fails
     * @throws ServletException wrapping any other exception raised while
     *         processing the request
     */
    @Override
    public void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
        try {
            PrintWriter ps = response.getWriter();

            if (request.getQueryString() == null) {
                badRequest(response, ps, "A query property must exist");
                return;
            }

            // the "group" query property is optional; when present it must match
            String group = request.getParameter("group");
            if (!StringUtil.isEmpty(group)) {
                String configuredGroup = dqm.getConfiguration().getGroupName();
                if (!group.equalsIgnoreCase(configuredGroup)) {
                    // the group value comes straight from the request, so escape it
                    badRequest(response, ps, "The group '" + escapeHtml(group)
                            + "' in your request does not match the group '" + configuredGroup
                            + "' configured for this server");
                    return;
                }
            }

            // the "cmd" query property MUST exist for every request
            String cmd = request.getParameter("cmd");
            if (StringUtil.isEmpty(cmd)) {
                badRequest(response, ps, "The cmd query parameter must exist");
                return;
            }

            // process commands...
            if (cmd.equals("queues")) {
                writeQueueTable(ps);
            } else if (cmd.equals("monitor")) {
                handleMonitor(request, response, ps);
            } else if (cmd.equals("transfer")) {
                handleTransfer(request, response, ps);
            } else {
                // the cmd value comes straight from the request, so escape it
                badRequest(response, ps, "Unable to process cmd '" + escapeHtml(cmd) + "'");
            }
            // default = SC_OK, and is bad form to set status after writing output;
            // some implementations may clear the buffer when setting status
        } catch (IOException e) {
            throw e;
        } catch (Exception e) {
            throw new ServletException(e);
        }
    }

    /** Sets HTTP 400 on the response and writes the given message as the body. */
    private static void badRequest(HttpServletResponse response, PrintWriter ps, String message) {
        // status is set before any output is written
        response.setStatus(HttpServletResponse.SC_BAD_REQUEST);
        ps.print(message);
    }

    /**
     * Handles {@code cmd=queues}: writes an HTML table with one row per queue.
     * Declared to throw Exception because the queue APIs are defined elsewhere
     * and may declare checked exceptions; service() translates whatever escapes.
     */
    @SuppressWarnings("unchecked")
    private void writeQueueTable(PrintWriter ps) throws Exception {
        ps.println("<table cellspacing=0 cellpadding=1 border=1>");
        ps.println("<tr><td>Queue Name</td><td>Size</td><td>Producers</td><td>Consumers</td><td>Last Producer Date</td><td>Last Consumer Date</td></tr>");
        // print out a list of every queue
        Enumeration<Queue> queues = queueManager.getQueues();
        while (queues.hasMoreElements()) {
            Queue queue = queues.nextElement();
            ps.print("<tr><td>");
            // escaped because the name is embedded in HTML markup
            ps.print(escapeHtml(queue.getName()));
            ps.print("</td><td>");
            ps.print(queue.getSize());
            ps.print("</td><td>");
            ps.print(queue.getProducerCount());
            ps.print("</td><td>");
            ps.print(queue.getConsumerCount());
            ps.print("</td><td>");
            ps.print(queue.getLastProducerCountChangeTime());
            ps.print("</td><td>");
            ps.print(queue.getLastConsumerCountChangeTime());
            ps.print("</td></tr>");
        }
        ps.println("</table>");
    }

    /**
     * Handles {@code cmd=monitor}: writes result code, version and area id,
     * then one "name,consumerCount" line per queue that is not local-only.
     * Declared to throw Exception for the same reason as writeQueueTable.
     */
    @SuppressWarnings("unchecked")
    private void handleMonitor(HttpServletRequest request, HttpServletResponse response, PrintWriter ps) throws Exception {
        // is the optional "since" parameter included -- this is asking
        // for updates only after this timestamp....
        long since = -1;
        String sinceString = request.getParameter("since");
        if (!StringUtil.isEmpty(sinceString)) {
            try {
                since = Long.parseLong(sinceString);
            } catch (NumberFormatException e) {
                badRequest(response, ps, "Unable to convert since query parameter to a long value");
                return;
            }
        }

        // print out result code, version, areaId
        ps.print(ProtocolConstants.KEY_RESULT_CODE + ":" + ProtocolConstants.RC_OK + ProtocolConstants.LINE_DELIMITER);
        ps.print(ProtocolConstants.KEY_VERSION + ":" + ProtocolConstants.CURRENT_VERSION + ProtocolConstants.LINE_DELIMITER);
        ps.print(ProtocolConstants.KEY_AREA_ID + ":" + dqm.getConfiguration().getAreaId() + ProtocolConstants.LINE_DELIMITER);

        // print out a list of every queue
        Enumeration<Queue> queues = queueManager.getQueues();
        while (queues.hasMoreElements()) {
            Queue queue = queues.nextElement();
            // when a positive "since" was given, skip queues whose consumer
            // count last changed before that timestamp
            if (since > 0 && queue.getLastConsumerCountChangeTime() < since) {
                continue;
            }
            // "local only" queues are never reported
            if (queue.isLocalOnly()) {
                continue;
            }
            ps.print(ProtocolConstants.KEY_QUEUE);
            ps.print(":");
            ps.print(queue.getName());
            ps.print(",");
            ps.print(queue.getConsumerCount());
            ps.print(ProtocolConstants.LINE_DELIMITER);
        }
    }

    /**
     * Handles {@code cmd=transfer}: reads the request body, decodes it with the
     * target queue's transcoder, delegates the put to the put handler and then
     * writes the result code and message lines.
     * Declared to throw Exception for the same reason as writeQueueTable.
     */
    private void handleTransfer(HttpServletRequest request, HttpServletResponse response, PrintWriter ps) throws Exception {
        String queueName = request.getParameter("queue");
        if (StringUtil.isEmpty(queueName)) {
            badRequest(response, ps, "Parameter queue is required for a transfer request");
            return;
        }

        byte[] entityContent = requestToByteArray(request);
        if (entityContent == null || entityContent.length <= 0) {
            badRequest(response, ps, "Probably not a POST request since body does not contain an entity");
            return;
        }

        logger.trace("[{}] Received transfer request to queue with an item having a byteLength [{}]", queueName, entityContent.length);

        // at this point, we'll be returning an actual response back
        int result = 0;
        String message = "OK";

        if (!queueManager.hasQueue(queueName)) {
            result = ProtocolConstants.RC_NO_QUEUE;
            message = "The queue " + queueName + " was not found";
        } else {
            // get the queue -- we know it exists
            Queue queue = queueManager.getQueue(queueName);
            if (queue.getConsumerCount() <= 0) {
                // no consumers left to take the item
                result = ProtocolConstants.RC_NO_CONSUMER;
                message = "No local consumers for queue " + queueName;
            } else {
                Transcoder tc = queue.getTranscoder();
                try {
                    // transcode the item
                    Object item = tc.decode(entityContent);
                    logger.trace("[{}] Parsed item: {}", queueName, item);
                    try {
                        // delegate the actual "putting" of the item
                        this.putHandler.put(queueManager, queue, item);
                    } catch (BrokerProtocolException e) {
                        logger.error("", e);
                        result = e.getErrorCode();
                        message = e.getMessage();
                    }
                } catch (Throwable t) {
                    // NOTE(review): this also catches any non-protocol failure thrown
                    // by the put handler and reports it as a transcoding failure
                    logger.error("", t);
                    result = ProtocolConstants.RC_TRANSCODING_FAILED;
                    message = "Unable to transcode byte array into an item";
                }
            }
        }

        if (result == ProtocolConstants.RC_OK) {
            logger.trace("[{}] Item transfer successful", queueName);
        } else {
            logger.error("[{}] Item transfer failed to Queue with result [{}] message [{}]", new Object[] { queueName, result, message });
        }

        // print out result code, then message
        ps.print(ProtocolConstants.KEY_RESULT_CODE + ":" + result + ProtocolConstants.LINE_DELIMITER);
        ps.print(ProtocolConstants.KEY_MESSAGE + ":" + message + ProtocolConstants.LINE_DELIMITER);
    }

    /**
     * Reads the complete request body into a byte array.
     *
     * @param request the request whose body is read
     * @return the body bytes, or {@code null} when the declared content length
     *         is zero or unknown or the request has no input stream
     * @throws IOException if reading fails or the stream ends before the
     *         declared content length has been read
     */
    private static byte[] requestToByteArray(HttpServletRequest request) throws IOException {
        int len = request.getContentLength();
        if (len <= 0) {
            // no usable body; the caller answers this with a 400 response
            return null;
        }
        InputStream is = request.getInputStream();
        if (is == null) {
            return null;
        }
        try {
            byte[] body = new byte[len];
            int offset = 0;
            // a single read() may deliver fewer bytes than requested, so keep
            // reading until the buffer is full or the stream ends
            while (offset < len) {
                int count = is.read(body, offset, len - offset);
                if (count < 0) {
                    throw new IOException("Request body ended after " + offset + " of " + len + " expected bytes");
                }
                offset += count;
            }
            return body;
        } finally {
            is.close();
        }
    }

    /**
     * Replaces the characters {@code & < > " '} with HTML entities so the text
     * cannot be interpreted as markup when echoed into a response.
     *
     * @param text the text to escape, may be {@code null}
     * @return the escaped text, or {@code null} if {@code text} was {@code null}
     */
    private static String escapeHtml(String text) {
        if (text == null) {
            return null;
        }
        StringBuilder escaped = new StringBuilder(text.length() + 16);
        for (int i = 0; i < text.length(); i++) {
            char c = text.charAt(i);
            switch (c) {
                case '&':
                    escaped.append("&amp;");
                    break;
                case '<':
                    escaped.append("&lt;");
                    break;
                case '>':
                    escaped.append("&gt;");
                    break;
                case '"':
                    escaped.append("&quot;");
                    break;
                case '\'':
                    escaped.append("&#39;");
                    break;
                default:
                    escaped.append(c);
            }
        }
        return escaped.toString();
    }
}