/*
* Copyright 2010 Red Hat, Inc.
* Red Hat licenses this file to you under the Apache License, version
* 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package org.hornetq.core.persistence.impl.journal;
import java.io.BufferedInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import javax.xml.stream.XMLInputFactory;
import javax.xml.stream.XMLStreamConstants;
import javax.xml.stream.XMLStreamException;
import javax.xml.stream.XMLStreamReader;
import org.hornetq.api.core.Message;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.ClientMessage;
import org.hornetq.api.core.client.ClientProducer;
import org.hornetq.api.core.client.ClientRequestor;
import org.hornetq.api.core.client.ClientSession;
import org.hornetq.api.core.client.ClientSessionFactory;
import org.hornetq.api.core.client.HornetQClient;
import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.api.core.management.ManagementHelper;
import org.hornetq.core.message.impl.MessageImpl;
import org.hornetq.core.remoting.impl.netty.NettyConnectorFactory;
import org.hornetq.core.remoting.impl.netty.TransportConstants;
import org.hornetq.core.server.HornetQServerLogger;
import org.hornetq.utils.Base64;
/**
* Read XML output from <code>org.hornetq.core.persistence.impl.journal.XmlDataExporter</code>, create a core session, and
* send the messages to a running instance of HornetQ. It uses the StAX <code>javax.xml.stream.XMLStreamReader</code>
* for speed and simplicity.
*
* @author Justin Bertram
*/
public final class XmlDataImporter
{
// Constants -----------------------------------------------------
// Attributes ----------------------------------------------------
private final XMLStreamReader reader;
// this session is really only needed if the "session" variable does not auto-commit sends
final ClientSession managementSession;
final boolean localSession;
final Map<String, String> addressMap = new HashMap<String, String>();
final Map<String, Long> queueIDs = new HashMap<String, Long>();
String tempFileName = "";
private final ClientSession session;
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
/**
* This is the normal constructor for programmatic access to the
* <code>org.hornetq.core.persistence.impl.journal.XmlDataImporter</code> if the session passed
* in uses auto-commit for sends.
* <p>
* If the session needs to be transactional then use the constructor which takes 2 sessions.
* @param inputStream the stream from which to read the XML for import
* @param session used for sending messages, must use auto-commit for sends
* @throws Exception
*/
public XmlDataImporter(InputStream inputStream, ClientSession session) throws Exception
{
this(inputStream, session, null);
}
/**
* This is the constructor to use if you wish to import all messages transactionally.
* <p>
* Pass in a session which doesn't use auto-commit for sends, and one that does (for management
* operations necessary during import).
* @param inputStream the stream from which to read the XML for import
* @param session used for sending messages, doesn't need to auto-commit sends
* @param managementSession used for management queries, must use auto-commit for sends
*/
public XmlDataImporter(InputStream inputStream, ClientSession session, ClientSession managementSession) throws Exception
{
reader = XMLInputFactory.newInstance().createXMLStreamReader(inputStream);
this.session = session;
if (managementSession != null)
{
this.managementSession = managementSession;
}
else
{
this.managementSession = session;
}
localSession = false;
}
public XmlDataImporter(InputStream inputStream, String host, String port, boolean transactional) throws Exception
{
reader = XMLInputFactory.newInstance().createXMLStreamReader(inputStream);
HashMap<String, Object> connectionParams = new HashMap<String, Object>();
connectionParams.put(TransportConstants.HOST_PROP_NAME, host);
connectionParams.put(TransportConstants.PORT_PROP_NAME, port);
ServerLocator serverLocator =
HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration(
NettyConnectorFactory.class.getName(),
connectionParams));
ClientSessionFactory sf = serverLocator.createSessionFactory();
session = sf.createSession(false, !transactional, true);
managementSession = sf.createSession(false, true, true);
localSession = true;
}
public XmlDataImporter(String inputFile, String host, String port, boolean transactional) throws Exception
{
this(new FileInputStream(inputFile), host, port, transactional);
}
// Public --------------------------------------------------------
public static void main(String arg[])
{
if (arg.length < 3)
{
System.out.println("Use: java -cp hornetq-core.jar " + XmlDataImporter.class + " <inputFile> <host> <port> [<transactional>]");
System.exit(-1);
}
try
{
XmlDataImporter xmlDataImporter = new XmlDataImporter(arg[0], arg[1], arg[2], (arg.length > 3 && Boolean.parseBoolean(arg[3])));
xmlDataImporter.processXml();
}
catch (Exception e)
{
e.printStackTrace();
}
}
public void processXml() throws Exception
{
try
{
while (reader.hasNext())
{
HornetQServerLogger.LOGGER.debug("EVENT:[" + reader.getLocation().getLineNumber() + "][" + reader.getLocation().getColumnNumber() + "] ");
if (reader.getEventType() == XMLStreamConstants.START_ELEMENT)
{
if (XmlDataConstants.BINDINGS_CHILD.equals(reader.getLocalName()))
{
bindQueue();
}
if (XmlDataConstants.MESSAGES_CHILD.equals(reader.getLocalName()))
{
processMessage();
}
}
reader.next();
}
if (!session.isAutoCommitSends())
{
session.commit();
}
}
finally
{
// if the session was created in our constructor then close it (otherwise the caller will close it)
if (localSession)
{
session.close();
managementSession.close();
}
}
}
private void processMessage() throws Exception
{
Byte type = 0;
Byte priority = 0;
Long expiration = 0L;
Long timestamp = 0L;
org.hornetq.utils.UUID userId = null;
ArrayList<String> queues = new ArrayList<String>();
// get message's attributes
for (int i = 0; i < reader.getAttributeCount(); i++)
{
String attributeName = reader.getAttributeLocalName(i);
if (XmlDataConstants.MESSAGE_TYPE.equals(attributeName))
{
type = getMessageType(reader.getAttributeValue(i));
}
else if (XmlDataConstants.MESSAGE_PRIORITY.equals(attributeName))
{
priority = Byte.parseByte(reader.getAttributeValue(i));
}
else if (XmlDataConstants.MESSAGE_EXPIRATION.equals(attributeName))
{
expiration = Long.parseLong(reader.getAttributeValue(i));
}
else if (XmlDataConstants.MESSAGE_TIMESTAMP.equals(attributeName))
{
timestamp = Long.parseLong(reader.getAttributeValue(i));
}
else if (XmlDataConstants.MESSAGE_USER_ID.equals(attributeName))
{
userId = org.hornetq.utils.UUIDGenerator.getInstance().generateUUID();
}
}
Message message = session.createMessage(type, true, expiration, timestamp, priority);
message.setUserID(userId);
boolean endLoop = false;
// loop through the XML and gather up all the message's data (i.e. body, properties, queues, etc.)
while (reader.hasNext())
{
int eventType = reader.getEventType();
switch (eventType)
{
case XMLStreamConstants.START_ELEMENT:
if (XmlDataConstants.MESSAGE_BODY.equals(reader.getLocalName()))
{
processMessageBody(message);
}
else if (XmlDataConstants.PROPERTIES_CHILD.equals(reader.getLocalName()))
{
processMessageProperties(message);
}
else if (XmlDataConstants.QUEUES_CHILD.equals(reader.getLocalName()))
{
processMessageQueues(queues);
}
break;
case XMLStreamConstants.END_ELEMENT:
if (XmlDataConstants.MESSAGES_CHILD.equals(reader.getLocalName()))
{
endLoop = true;
}
break;
}
if (endLoop)
{
break;
}
reader.next();
}
sendMessage(queues, message);
}
private Byte getMessageType(String value)
{
Byte type = Message.DEFAULT_TYPE;
if (value.equals(XmlDataConstants.DEFAULT_TYPE_PRETTY))
{
type = Message.DEFAULT_TYPE;
}
else if (value.equals(XmlDataConstants.BYTES_TYPE_PRETTY))
{
type = Message.BYTES_TYPE;
}
else if (value.equals(XmlDataConstants.MAP_TYPE_PRETTY))
{
type = Message.MAP_TYPE;
}
else if (value.equals(XmlDataConstants.OBJECT_TYPE_PRETTY))
{
type = Message.OBJECT_TYPE;
}
else if (value.equals(XmlDataConstants.STREAM_TYPE_PRETTY))
{
type = Message.STREAM_TYPE;
}
else if (value.equals(XmlDataConstants.TEXT_TYPE_PRETTY))
{
type = Message.TEXT_TYPE;
}
return type;
}
private void sendMessage(ArrayList<String> queues, Message message) throws Exception
{
StringBuilder logMessage = new StringBuilder();
String destination = addressMap.get(queues.get(0));
logMessage.append("Sending ").append(message).append(" to address: ").append(destination).append("; routed to queues: ");
ByteBuffer buffer = ByteBuffer.allocate(queues.size() * 8);
for (String queue : queues)
{
long queueID = 0;
if (queueIDs.containsKey(queue))
{
queueID = queueIDs.get(queue);
}
else
{
// Get the ID of the queues involved so the message can be routed properly. This is done because we cannot
// send directly to a queue, we have to send to an address instead but not all the queues related to the
// address may need the message
ClientRequestor requestor = new ClientRequestor(managementSession, "jms.queue.hornetq.management");
ClientMessage managementMessage = managementSession.createMessage(false);
ManagementHelper.putAttribute(managementMessage, "core.queue." + queue, "ID");
managementSession.start();
HornetQServerLogger.LOGGER.debug("Requesting ID for: " + queue);
ClientMessage reply = requestor.request(managementMessage);
queueID = (Integer) ManagementHelper.getResult(reply);
requestor.close();
HornetQServerLogger.LOGGER.debug("ID for " + queue + " is: " + queueID);
queueIDs.put(queue, queueID); // store it so we don't have to look it up every time
}
logMessage.append(queue).append(", ");
buffer.putLong(queueID);
}
logMessage.delete(logMessage.length() - 2, logMessage.length()); // take off the trailing comma
HornetQServerLogger.LOGGER.debug(logMessage);
message.putBytesProperty(MessageImpl.HDR_ROUTE_TO_IDS, buffer.array());
ClientProducer producer = session.createProducer(destination);
producer.send(message);
producer.close();
if (tempFileName.length() > 0)
{
File tempFile = new File(tempFileName);
if (!tempFile.delete())
{
HornetQServerLogger.LOGGER.warn("Could not delete: " + tempFileName);
}
}
}
private void processMessageQueues(ArrayList<String> queues)
{
for (int i = 0; i < reader.getAttributeCount(); i++)
{
if (XmlDataConstants.QUEUE_NAME.equals(reader.getAttributeLocalName(i)))
{
queues.add(reader.getAttributeValue(i));
}
}
}
private void processMessageProperties(Message message)
{
String key = "";
String value = "";
String propertyType = "";
for (int i = 0; i < reader.getAttributeCount(); i++)
{
String attributeName = reader.getAttributeLocalName(i);
if (XmlDataConstants.PROPERTY_NAME.equals(attributeName))
{
key = reader.getAttributeValue(i);
}
else if (XmlDataConstants.PROPERTY_VALUE.equals(attributeName))
{
value = reader.getAttributeValue(i);
}
else if (XmlDataConstants.PROPERTY_TYPE.equals(attributeName))
{
propertyType = reader.getAttributeValue(i);
}
}
if (propertyType.equals(XmlDataConstants.PROPERTY_TYPE_SHORT))
{
message.putShortProperty(key, Short.parseShort(value));
}
else if (propertyType.equals(XmlDataConstants.PROPERTY_TYPE_BOOLEAN))
{
message.putBooleanProperty(key, Boolean.parseBoolean(value));
}
else if (propertyType.equals(XmlDataConstants.PROPERTY_TYPE_BYTE))
{
message.putByteProperty(key, Byte.parseByte(value));
}
else if (propertyType.equals(XmlDataConstants.PROPERTY_TYPE_BYTES))
{
message.putBytesProperty(key, decode(value));
}
else if (propertyType.equals(XmlDataConstants.PROPERTY_TYPE_DOUBLE))
{
message.putDoubleProperty(key, Double.parseDouble(value));
}
else if (propertyType.equals(XmlDataConstants.PROPERTY_TYPE_FLOAT))
{
message.putFloatProperty(key, Float.parseFloat(value));
}
else if (propertyType.equals(XmlDataConstants.PROPERTY_TYPE_INTEGER))
{
message.putIntProperty(key, Integer.parseInt(value));
}
else if (propertyType.equals(XmlDataConstants.PROPERTY_TYPE_LONG))
{
message.putLongProperty(key, Long.parseLong(value));
}
else if (propertyType.equals(XmlDataConstants.PROPERTY_TYPE_SIMPLE_STRING))
{
message.putStringProperty(new SimpleString(key), new SimpleString(value));
}
else if (propertyType.equals(XmlDataConstants.PROPERTY_TYPE_STRING))
{
message.putStringProperty(key, value);
}
}
private void processMessageBody(Message message) throws XMLStreamException, IOException
{
boolean isLarge = false;
for (int i = 0; i < reader.getAttributeCount(); i++)
{
String attributeName = reader.getAttributeLocalName(i);
if (XmlDataConstants.MESSAGE_IS_LARGE.equals(attributeName))
{
isLarge = Boolean.parseBoolean(reader.getAttributeValue(i));
}
}
reader.next();
if (isLarge)
{
tempFileName = UUID.randomUUID().toString() + ".tmp";
HornetQServerLogger.LOGGER.debug("Creating temp file " + tempFileName + " for large message.");
OutputStream out = new FileOutputStream(tempFileName);
try
{
while (reader.hasNext())
{
if (reader.getEventType() == XMLStreamConstants.END_ELEMENT)
{
break;
}
else
{
String characters = new String(reader.getTextCharacters(), reader.getTextStart(), reader.getTextLength());
String trimmedCharacters = characters.trim();
if (trimmedCharacters.length() > 0) // this will skip "indentation" characters
{
byte[] data = decode(trimmedCharacters);
out.write(data);
}
}
reader.next();
}
}
finally
{
out.close();
}
FileInputStream fileInputStream = new FileInputStream(tempFileName);
BufferedInputStream bufferedInput = new BufferedInputStream(fileInputStream);
((ClientMessage) message).setBodyInputStream(bufferedInput);
}
else
{
reader.next(); // step past the "indentation" characters to get to the CDATA with the message body
String characters = new String(reader.getTextCharacters(), reader.getTextStart(), reader.getTextLength());
message.getBodyBuffer().writeBytes(decode(characters.trim()));
}
}
private void bindQueue() throws Exception
{
String queueName = "";
String address = "";
String filter = "";
for (int i = 0; i < reader.getAttributeCount(); i++)
{
String attributeName = reader.getAttributeLocalName(i);
if (XmlDataConstants.BINDING_ADDRESS.equals(attributeName))
{
address = reader.getAttributeValue(i);
}
else if (XmlDataConstants.BINDING_QUEUE_NAME.equals(attributeName))
{
queueName = reader.getAttributeValue(i);
}
else if (XmlDataConstants.BINDING_FILTER_STRING.equals(attributeName))
{
filter = reader.getAttributeValue(i);
}
}
ClientSession.QueueQuery queueQuery = session.queueQuery(new SimpleString(queueName));
if (!queueQuery.isExists())
{
session.createQueue(address, queueName, filter, true);
HornetQServerLogger.LOGGER.debug("Binding queue(name=" + queueName + ", address=" + address + ", filter=" + filter + ")");
}
else
{
HornetQServerLogger.LOGGER.debug("Binding " + queueName + " already exists so won't re-bind.");
}
addressMap.put(queueName, address);
}
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
// Private -------------------------------------------------------
private static byte[] decode(String data)
{
return Base64.decode(data, Base64.DONT_BREAK_LINES | Base64.URL_SAFE);
}
// Inner classes -------------------------------------------------
}