package com.cloudhopper.mq.transcoder;
/*
* #%L
* ch-mq
* %%
* Copyright (C) 2012 Cloudhopper by Twitter
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import com.cloudhopper.mq.message.MQMessage;
import com.cloudhopper.mq.message.PriorityMQMessage;
import com.cloudhopper.mq.util.Priority;
import com.cloudhopper.mq.util.Tiny;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Wrapper transcoder for {@link PriorityMQMessage}s. The message body is
 * delegated to a base transcoder; this class adds a fixed header and a
 * metadata trailer carrying priority, sequence, timestamp, transfer counters
 * and the log item identifier.
 *
 * <p>Wire format (all multi-byte values in {@link ByteBuffer}'s default
 * big-endian order):
 * <pre>
 * header   (24 bytes): magic number (long), body length (long), metadata length (long)
 * body     (body length bytes): payload produced by the base transcoder
 * metadata (metadata length bytes):
 *     priority (byte), sequence (byte), timestamp (long),
 *     transfer attempts (short), transfers (short),
 *     log item identifier (remaining bytes, UTF-8)
 * </pre>
 *
 * @author garth
 */
public class PriorityMQMessageTranscoder<T> implements Transcoder<PriorityMQMessage<T>>, TranscoderWrapped {

    private static final Logger logger = LoggerFactory.getLogger(PriorityMQMessageTranscoder.class);

    /** Marker written at the start of every message produced by this transcoder. */
    private static final long PRIORITY_MAGIC_NUMBER = 318279641053737399L;

    /** Size in bytes of the magic number alone. */
    private static final int MAGIC_LENGTH = 8;

    /** Size in bytes of the full header: magic + body length + metadata length. */
    private static final int HEADER_LENGTH = 24;

    /** Size in bytes of the metadata fields that precede the log item identifier. */
    private static final int FIXED_METADATA_LENGTH = 14;

    /**
     * Charset for the log item identifier. Fixed explicitly so that brokers
     * running with different platform default charsets agree on the bytes.
     */
    private static final Charset IDENTIFIER_CHARSET = Charset.forName("UTF-8");

    private final Transcoder<? super T> transcoder;

    /**
     * @param transcoder the base transcoder used for the message body
     */
    public PriorityMQMessageTranscoder(Transcoder<? super T> transcoder) {
        this.transcoder = transcoder;
    }

    /**
     * @return the base transcoder that handles the message body
     */
    public Transcoder<? super T> getBaseTranscoder() {
        return this.transcoder;
    }

    /**
     * Encode the given object for storage.
     *
     * @param decoded the object
     * @return the byte array representation of the object: header, then the
     *         body as encoded by the base transcoder, then the metadata
     */
    public byte[] encode(PriorityMQMessage<T> decoded) {
        byte[] meta = getMetadata(decoded);
        byte[] data = transcoder.encode(decoded.getBody());

        ByteBuffer buf = ByteBuffer.allocate(HEADER_LENGTH + data.length + meta.length);
        buf.putLong(PRIORITY_MAGIC_NUMBER);
        buf.putLong(data.length);
        buf.putLong(meta.length);
        buf.put(data);
        buf.put(meta);
        return buf.array();
    }

    /**
     * Decode the cached object into the object it represents.
     *
     * <p>If the input does not start with the magic number (including inputs
     * too short to contain one), it is handed unchanged to the base transcoder
     * and wrapped in a new message with {@code Priority.MIN_PRIORITY}.
     *
     * @param encoded the byte array representation of the object
     * @return the object
     * @throws IllegalArgumentException if the magic number is present but the
     *         header is truncated or its lengths do not fit the input
     */
    public PriorityMQMessage<T> decode(byte[] encoded) {
        ByteBuffer buf = ByteBuffer.wrap(encoded);

        if (buf.remaining() < MAGIC_LENGTH || buf.getLong() != PRIORITY_MAGIC_NUMBER) {
            //This isn't a PriorityMQMessage. Rather than bail, let's try to treat it as a
            //message of type T, transcode it as that, and then construct the PriorityMQMessage
            //from scratch. This way, we can have backwards compatibility when loading items
            //into new priority queues over remote brokers by upgrading the receiving side first,
            //then the sending side.
            logger.trace("No PRIORITY_MAGIC_NUMBER detected, attempting backwards compatible.");
            // The base transcoder is declared as <? super T>, so the cast cannot be checked.
            @SuppressWarnings("unchecked")
            T legacyBody = (T) transcoder.decode(encoded);
            return new PriorityMQMessage<T>(Priority.MIN_PRIORITY, legacyBody);
        }

        if (buf.remaining() < HEADER_LENGTH - MAGIC_LENGTH) {
            throw new IllegalArgumentException(
                "Truncated PriorityMQMessage header: " + encoded.length + " bytes, expected at least " + HEADER_LENGTH);
        }
        long dataLen = buf.getLong();
        long metaLen = buf.getLong();

        // Validate before narrowing to int: a corrupt length would otherwise be
        // silently truncated or fail with an unhelpful low-level exception.
        long available = buf.remaining();
        if (dataLen < 0 || metaLen < 0 || dataLen > available || metaLen > available - dataLen) {
            throw new IllegalArgumentException(
                "Invalid PriorityMQMessage lengths: data=" + dataLen + ", meta=" + metaLen
                + ", available=" + available);
        }

        byte[] bencoded = new byte[(int) dataLen];
        buf.get(bencoded);
        byte[] mencoded = new byte[(int) metaLen];
        buf.get(mencoded);

        PriorityMQMessage<T> message = createFromMetadata(mencoded);
        // The base transcoder is declared as <? super T>, so the cast cannot be checked.
        @SuppressWarnings("unchecked")
        T body = (T) transcoder.decode(bencoded);
        message.setBody(body);
        return message;
    }

    /**
     * Serializes the metadata trailer, in this order:
     * - Priority (byte)
     * - Sequence (byte)
     * - Timestamp (long)
     * - Transfer attempts (short)
     * - Transfers (short)
     * - Log item identifier (String, UTF-8, fills the rest of the trailer)
     *
     * A null log item identifier is written as zero bytes, so it reads back
     * as an empty string.
     */
    private byte[] getMetadata(PriorityMQMessage<T> msg) {
        String identifier = msg.getLogItemIdentifier();
        byte[] identifierBytes = (identifier == null) ? new byte[0] : identifier.getBytes(IDENTIFIER_CHARSET);

        ByteBuffer buf = ByteBuffer.allocate(FIXED_METADATA_LENGTH + identifierBytes.length);
        buf.put(msg.getPriority().byteValue());
        buf.put(new Tiny(msg.getSequence()).byteValue());
        buf.putLong(msg.getTimestamp());
        buf.putShort(msg.getTransferAttemptsCount());
        buf.putShort(msg.getTransferCount());
        buf.put(identifierBytes);
        return buf.array();
    }

    /**
     * Builds a message (without body) from a metadata trailer laid out as
     * written by {@link #getMetadata(PriorityMQMessage)}.
     *
     * @throws IllegalArgumentException if the trailer is shorter than its
     *         fixed-size fields
     */
    private PriorityMQMessage<T> createFromMetadata(byte[] meta) {
        if (meta.length < FIXED_METADATA_LENGTH) {
            throw new IllegalArgumentException(
                "Truncated PriorityMQMessage metadata: " + meta.length + " bytes, expected at least "
                + FIXED_METADATA_LENGTH);
        }
        ByteBuffer buf = ByteBuffer.wrap(meta);
        byte priority = buf.get();               //1
        byte sequence = buf.get();               //1
        long timestamp = buf.getLong();          //8
        short transferAttempts = buf.getShort(); //2
        short transfers = buf.getShort();        //2
        // Everything after the fixed fields is the identifier.
        String logItemIdentifier = new String(
            meta, FIXED_METADATA_LENGTH, meta.length - FIXED_METADATA_LENGTH, IDENTIFIER_CHARSET);
        return new PriorityMQMessage<T>(transferAttempts, transfers, logItemIdentifier, new Priority(priority), timestamp, new Tiny(sequence).intValue());
    }
}