/**
*
* Copyright 2004 Protique Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
**/
package org.codehaus.activemq.message;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.codehaus.activemq.message.util.WireByteArrayInputStream;
import org.codehaus.activemq.message.util.WireByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.ObjectStreamException;
import java.io.Serializable;
/**
* Default implementation used for Java-Java protocols. When talking to non-Java nodes we may use a different wire
* format.
*
* @version $Revision: 1.14 $
*/
public class DefaultWireFormat extends WireFormat implements Serializable {
/**
* Current wire format version for this implementation
*/
public static final int WIRE_FORMAT_VERSION = 1;
private static final Log log = LogFactory.getLog(DefaultWireFormat.class);
private transient final PacketReader messageReader = new ActiveMQMessageReader();
private transient final PacketReader textMessageReader = new ActiveMQTextMessageReader();
private transient final PacketReader objectMessageReader = new ActiveMQObjectMessageReader();
private transient final PacketReader bytesMessageReader = new ActiveMQBytesMessageReader();
private transient final PacketReader streamMessageReader = new ActiveMQStreamMessageReader();
private transient final PacketReader mapMessageReader = new ActiveMQMapMessageReader();
private transient final PacketReader messageAckReader = new MessageAckReader();
private transient final PacketReader receiptReader = new ReceiptReader();
private transient final PacketReader consumerInfoReader = new ConsumerInfoReader();
private transient final PacketReader producerInfoReader = new ProducerInfoReader();
private transient final PacketReader transactionInfoReader = new TransactionInfoReader();
private transient final PacketReader xaTransactionInfoReader = new XATransactionInfoReader();
private transient final PacketReader brokerInfoReader = new BrokerInfoReader();
private transient final PacketReader connectionInfoReader = new ConnectionInfoReader();
private transient final PacketReader sessionInfoReader = new SessionInfoReader();
private transient final PacketReader durableUnsubscribeReader = new DurableUnsubscribeReader();
private transient final PacketReader reponseReceiptReader = new ResponseReceiptReader();
private transient final PacketReader intReponseReceiptReader = new IntResponseReceiptReader();
private transient final PacketReader capacityInfoReader = new CapacityInfoReader();
private transient final PacketReader capacityInfoRequestReader = new CapacityInfoRequestReader();
private transient final PacketReader wireFormatInfoReader = new WireFormatInfoReader();
private transient final PacketWriter messageWriter = new ActiveMQMessageWriter();
private transient final PacketWriter textMessageWriter = new ActiveMQTextMessageWriter();
private transient final PacketWriter objectMessageWriter = new ActiveMQObjectMessageWriter();
private transient final PacketWriter bytesMessageWriter = new ActiveMQBytesMessageWriter();
private transient final PacketWriter streamMessageWriter = new ActiveMQStreamMessageWriter();
private transient final PacketWriter mapMessageWriter = new ActiveMQMapMessageWriter();
private transient final PacketWriter messageAckWriter = new MessageAckWriter();
private transient final PacketWriter receiptWriter = new ReceiptWriter();
private transient final PacketWriter consumerInfoWriter = new ConsumerInfoWriter();
private transient final PacketWriter producerInfoWriter = new ProducerInfoWriter();
private transient final PacketWriter transactionInfoWriter = new TransactionInfoWriter();
private transient final PacketWriter xaTransactionInfoWriter = new XATransactionInfoWriter();
private transient final PacketWriter brokerInfoWriter = new BrokerInfoWriter();
private transient final PacketWriter connectionInfoWriter = new ConnectionInfoWriter();
private transient final PacketWriter sessionInfoWriter = new SessionInfoWriter();
private transient final PacketWriter durableUnsubscribeWriter = new DurableUnsubscribeWriter();
private transient final PacketWriter reponseReceiptWriter = new ResponseReceiptWriter();
private transient final PacketWriter intReponseReceiptWriter = new IntResponseReceiptWriter();
private transient final PacketWriter capacityInfoWriter = new CapacityInfoWriter();
private transient final PacketWriter capacityInfoRequestWriter = new CapacityInfoRequestWriter();
private transient final PacketWriter wireFormatInfoWriter = new WireFormatInfoWriter();
private transient WireByteArrayOutputStream internalBytesOut;
private transient DataOutputStream internalDataOut;
private transient WireByteArrayInputStream internalBytesIn;
private transient DataInputStream internalDataIn;
/**
* Default Constructor
*/
public DefaultWireFormat() {
internalBytesOut = new WireByteArrayOutputStream();
internalDataOut = new DataOutputStream(internalBytesOut);
internalBytesIn = new WireByteArrayInputStream();
internalDataIn = new DataInputStream(internalBytesIn);
}
/**
* @return new WireFormat
*/
public WireFormat copy() {
return new DefaultWireFormat();
}
/**
* @param in
* @return
* @throws IOException
*/
public Packet readPacket(DataInput in) throws IOException {
int type = in.readByte();
return readPacket(type, in);
}
/**
* @param firstByte
* @param dataIn
* @return
* @throws IOException
*
*/
public Packet readPacket(int firstByte, DataInput dataIn) throws IOException {
switch (firstByte) {
case Packet.ACTIVEMQ_MESSAGE :
return readPacket(dataIn, messageReader);
case Packet.ACTIVEMQ_TEXT_MESSAGE :
return readPacket(dataIn, textMessageReader);
case Packet.ACTIVEMQ_OBJECT_MESSAGE :
return readPacket(dataIn, objectMessageReader);
case Packet.ACTIVEMQ_BYTES_MESSAGE :
return readPacket(dataIn, bytesMessageReader);
case Packet.ACTIVEMQ_STREAM_MESSAGE :
return readPacket(dataIn, streamMessageReader);
case Packet.ACTIVEMQ_MAP_MESSAGE :
return readPacket(dataIn, mapMessageReader);
case Packet.ACTIVEMQ_MSG_ACK :
return readPacket(dataIn, messageAckReader);
case Packet.RECEIPT_INFO :
return readPacket(dataIn, receiptReader);
case Packet.CONSUMER_INFO :
return readPacket(dataIn, consumerInfoReader);
case Packet.PRODUCER_INFO :
return readPacket(dataIn, producerInfoReader);
case Packet.TRANSACTION_INFO :
return readPacket(dataIn, transactionInfoReader);
case Packet.XA_TRANSACTION_INFO :
return readPacket(dataIn, xaTransactionInfoReader);
case Packet.ACTIVEMQ_BROKER_INFO :
return readPacket(dataIn, brokerInfoReader);
case Packet.ACTIVEMQ_CONNECTION_INFO :
return readPacket(dataIn, connectionInfoReader);
case Packet.SESSION_INFO :
return readPacket(dataIn, sessionInfoReader);
case Packet.DURABLE_UNSUBSCRIBE :
return readPacket(dataIn, durableUnsubscribeReader);
case Packet.RESPONSE_RECEIPT_INFO :
return readPacket(dataIn, reponseReceiptReader);
case Packet.INT_RESPONSE_RECEIPT_INFO :
return readPacket(dataIn, intReponseReceiptReader);
case Packet.CAPACITY_INFO :
return readPacket(dataIn, capacityInfoReader);
case Packet.CAPACITY_INFO_REQUEST :
return readPacket(dataIn, capacityInfoRequestReader);
case Packet.WIRE_FORMAT_INFO :
return readPacket(dataIn, wireFormatInfoReader);
default :
log.error("Could not find PacketReader for packet type: "
+ AbstractPacket.getPacketTypeAsString(firstByte));
return null;
}
}
/**
* Write a Packet to a DataOutput
*
* @param packet
* @param dataOut
* @throws IOException
*/
public void writePacket(Packet packet, DataOutput dataOut) throws IOException {
PacketWriter writer = getWriter(packet);
if (writer != null) {
writePacket(packet, dataOut, writer);
}
}
/**
* A helper method which converts a packet into a byte array Overrides the WireFormat to make use of the internal
* BytesOutputStream
*
* @param packet
* @return a byte array representing the packet using some wire protocol
* @throws IOException
*/
public byte[] toBytes(Packet packet) throws IOException {
byte[] data = null;
PacketWriter writer = getWriter(packet);
if (writer != null) {
internalBytesOut.reset();
internalDataOut.writeByte(packet.getPacketType());
internalDataOut.writeInt(-1);//the length
writer.writePacket(packet, internalDataOut);
internalDataOut.flush();
data = internalBytesOut.toByteArray();
// lets subtract the header offset from the length
int length = data.length - 5;
packet.setMemoryUsage(length);
//write in the length to the data
data[1] = (byte) ((length >>> 24) & 0xFF);
data[2] = (byte) ((length >>> 16) & 0xFF);
data[3] = (byte) ((length >>> 8) & 0xFF);
data[4] = (byte) ((length >>> 0) & 0xFF);
}
return data;
}
/**
* Can this wireformat process packets of this version
* @param version the version number to test
* @return true if can accept the version
*/
public boolean canProcessWireFormatVersion(int version){
return version == WIRE_FORMAT_VERSION;
}
/**
* @return the current version of this wire format
*/
public int getCurrentWireFormatVersion(){
return WIRE_FORMAT_VERSION;
}
protected synchronized final void writePacket(Packet packet, DataOutput dataOut, PacketWriter writer)
throws IOException {
dataOut.writeByte(packet.getPacketType());
internalBytesOut.reset();
writer.writePacket(packet, internalDataOut);
internalDataOut.flush();
//reuse the byte buffer in the ByteArrayOutputStream
byte[] data = internalBytesOut.getData();
int count = internalBytesOut.size();
dataOut.writeInt(count);
//byte[] data = internalBytesOut.toByteArray();
//int count = data.length;
//dataOut.writeInt(count);
packet.setMemoryUsage(count);
dataOut.write(data, 0, count);
}
protected synchronized final Packet readPacket(DataInput dataIn, PacketReader reader) throws IOException {
Packet packet = reader.createPacket();
int length = dataIn.readInt();
packet.setMemoryUsage(length);
// read all the remaining data in one chunk ignoring the header
// TODO sometimes the length should exclude the header?
byte[] data = new byte[length];
dataIn.readFully(data);
//then splat into the internal datainput
internalBytesIn.restart(data);
reader.buildPacket(packet, internalDataIn);
return packet;
}
private Object readResolve() throws ObjectStreamException {
return new DefaultWireFormat();
}
private PacketWriter getWriter(Packet packet) throws IOException {
PacketWriter answer = null;
switch (packet.getPacketType()) {
case Packet.ACTIVEMQ_MESSAGE :
answer = messageWriter;
break;
case Packet.ACTIVEMQ_TEXT_MESSAGE :
answer = textMessageWriter;
break;
case Packet.ACTIVEMQ_OBJECT_MESSAGE :
answer = objectMessageWriter;
break;
case Packet.ACTIVEMQ_BYTES_MESSAGE :
answer = bytesMessageWriter;
break;
case Packet.ACTIVEMQ_STREAM_MESSAGE :
answer = streamMessageWriter;
break;
case Packet.ACTIVEMQ_MAP_MESSAGE :
answer = mapMessageWriter;
break;
case Packet.ACTIVEMQ_MSG_ACK :
answer = messageAckWriter;
break;
case Packet.RECEIPT_INFO :
answer = receiptWriter;
break;
case Packet.CONSUMER_INFO :
answer = consumerInfoWriter;
break;
case Packet.PRODUCER_INFO :
answer = producerInfoWriter;
break;
case Packet.TRANSACTION_INFO :
answer = transactionInfoWriter;
break;
case Packet.XA_TRANSACTION_INFO :
answer = xaTransactionInfoWriter;
break;
case Packet.ACTIVEMQ_BROKER_INFO :
answer = brokerInfoWriter;
break;
case Packet.ACTIVEMQ_CONNECTION_INFO :
answer = connectionInfoWriter;
break;
case Packet.SESSION_INFO :
answer = sessionInfoWriter;
break;
case Packet.DURABLE_UNSUBSCRIBE :
answer = durableUnsubscribeWriter;
break;
case Packet.RESPONSE_RECEIPT_INFO :
answer = reponseReceiptWriter;
break;
case Packet.INT_RESPONSE_RECEIPT_INFO :
answer = intReponseReceiptWriter;
break;
case Packet.CAPACITY_INFO :
answer = capacityInfoWriter;
break;
case Packet.CAPACITY_INFO_REQUEST :
answer = capacityInfoRequestWriter;
break;
case Packet.WIRE_FORMAT_INFO :
answer = wireFormatInfoWriter;
break;
default :
log.error("no PacketWriter for packet: " + packet);
}
return answer;
}
}