package com.cloudhopper.mq.util;
/*
* #%L
* ch-mq
* %%
* Copyright (C) 2009 - 2013 Cloudhopper by Twitter
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
import com.cloudhopper.datastore.DataStore;
import com.cloudhopper.datastore.DataStoreIterator;
import com.cloudhopper.mq.message.PriorityMQMessage;
import com.cloudhopper.mq.queue.Queue;
import com.cloudhopper.mq.queue.QueueConfiguration;
import com.cloudhopper.mq.queue.impl.DefaultQueueManager;
import com.cloudhopper.mq.queue.impl.QueueInfo;
import com.cloudhopper.mq.queue.impl.RunInfo;
import com.cloudhopper.mq.queue.impl.RunInfoFileUtil;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.PrintWriter;
import java.util.Enumeration;
/**
* Views/dumps contents of a queue without removing items.
* @author garth
*/
public class QueueDumper extends QueueStringUtil {
public QueueDumper(QueueConfiguration config, OutputStream out) throws Exception {
super(new DefaultQueueManager(config));
this.writer = new PrintWriter(out);
}
private final PrintWriter writer;
public void cleanup() {
try {
queueManager.stop();
} catch (Exception e) {
e.printStackTrace();
}
}
public boolean hasQueue(String name) {
return queueManager.hasQueue(name);
}
public void inform() throws Exception {
Enumeration<Queue> queues = queueManager.getQueues();
while (queues.hasMoreElements()) {
Queue queue = queues.nextElement();
printHeader(queue);
}
}
public void dump() throws Exception {
Enumeration<Queue> queues = queueManager.getQueues();
while (queues.hasMoreElements()) {
Queue queue = queues.nextElement();
dump(queue);
}
}
public void dump(String name) throws Exception {
dump(queueManager.getQueue(name));
}
public void dump(Queue queue) throws Exception {
// queue
if (queue.getSize() > 0 && dataStore.hasAscendingIteratorSupport()) {
printHeader(queue);
DataStoreIterator iterator = dataStore.getAscendingIterator();
boolean jumped = iterator.jump(priorityKeyUtil.encode(queue.getId(), 0L));
if (!jumped) return;
while (true) {
try {
DataStoreIterator.Record record = iterator.getRecord();
byte[] keyBytes = record.getKey();
CompositeKey key = priorityKeyUtil.decode(keyBytes);
// long queueId = keyUtil.decodeQueueId(bytes);
// long itemId = keyUtil.decodeItemId(bytes);
if (key.getQueueId() != queue.getId()) break;
PriorityMQMessage.Key pkey = null;
try {
pkey = new PriorityMQMessage.Key(key.getItemId());
} catch (Exception e) {}
byte[] valueBytes = record.getValue();
//Object value = queue.getTranscoder().decode(valueBytes);
printKey(keyBytes, key.getQueueId(), key.getItemId(), pkey);
writer.println(byteArrayToHexString(valueBytes));
if (!iterator.next()) return;
} catch (Exception e) {
e.printStackTrace();
break;
}
}
}
}
private void printKey(byte[] key, int queueId, long itemId, PriorityMQMessage.Key pkey) {
writer.print("#");
writer.println(keyToString(key, queueId, itemId, pkey));
}
private void printHeader(Queue queue) throws IOException {
writer.print("##");
writer.println(queueInfoToString(queueManager.getQueueInfoMap().get(queue.getId())));
}
public static void main(String[] argv) throws Exception {
if (argv.length<3) {
System.err.println("QueueDumper <name> <dir> <url> <optional:queue_name> <optional:out_file>");
System.exit(1);
}
QueueConfiguration config = new QueueConfiguration();
config.setName(argv[0]);
config.setDirectory(argv[1]);
config.setDataStoreUrl(argv[2]);
config.setJmxEnabled(false);
OutputStream out = System.out;
String queueName = null;
String fileName = null;
if (argv.length > 3) queueName = argv[3];
if (argv.length > 4) out = new FileOutputStream(argv[4]);
QueueDumper dumper = new QueueDumper(config, out);
if (queueName != null) {
if (!dumper.hasQueue(queueName)) {
System.err.println(String.format("Queue %s not found in dataStore at %s", queueName, argv[2]));
System.exit(1);
} else {
dumper.dump(queueName);
}
} else {
dumper.dump();
}
System.exit(0);
}
}