package org.infinispan.client.hotrod.impl;
import org.infinispan.client.hotrod.Flag;
import org.infinispan.client.hotrod.exceptions.HotRodClientException;
import org.infinispan.client.hotrod.exceptions.InvalidResponseException;
import org.infinispan.client.hotrod.exceptions.TimeoutException;
import org.infinispan.client.hotrod.exceptions.TransportException;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
/**
* // TODO: Document this
*
* @author mmarkus
* @since 4.1
*/
public class HotrodOperationsImpl implements HotrodOperations, HotrodConstants {
private static Log log = LogFactory.getLog(HotrodOperationsImpl.class);
private final byte[] cacheNameBytes;
private static final AtomicLong MSG_ID = new AtomicLong();
private static final AtomicInteger TOPOLOGY_ID = new AtomicInteger();
private TransportFactory transportFactory;
private byte clientIntelligence = CLIENT_INTELLIGENCE_TOPOLOGY_AWARE;
public HotrodOperationsImpl(String cacheName, TransportFactory transportFactory) {
cacheNameBytes = cacheName.getBytes(); //todo add charset here
this.transportFactory = transportFactory;
}
public byte[] get(byte[] key, Flag[] flags) {
Transport transport = transportFactory.getTransport();
try {
short status = sendKeyOperation(key, transport, GET_REQUEST, flags, GET_RESPONSE);
if (status == KEY_DOES_NOT_EXIST_STATUS) {
return null;
}
if (status == NO_ERROR_STATUS) {
return transport.readArray();
}
} finally {
releaseTransport(transport);
}
throw new IllegalStateException("We should not reach here!");
}
public byte[] remove(byte[] key, Flag[] flags) {
Transport transport = transportFactory.getTransport();
try {
short status = sendKeyOperation(key, transport, REMOVE_REQUEST, flags, REMOVE_RESPONSE);
if (status == KEY_DOES_NOT_EXIST_STATUS) {
return null;
} else if (status == NO_ERROR_STATUS) {
return returnPossiblePrevValue(transport, flags);
}
} finally {
releaseTransport(transport);
}
throw new IllegalStateException("We should not reach here!");
}
public boolean containsKey(byte[] key, Flag... flags) {
Transport transport = transportFactory.getTransport();
try {
short status = sendKeyOperation(key, transport, CONTAINS_KEY_REQUEST, flags, CONTAINS_KEY_RESPONSE);
if (status == KEY_DOES_NOT_EXIST_STATUS) {
return false;
} else if (status == NO_ERROR_STATUS) {
return true;
}
} finally {
releaseTransport(transport);
}
throw new IllegalStateException("We should not reach here!");
}
public BinaryVersionedValue getWithVersion(byte[] key, Flag... flags) {
Transport transport = transportFactory.getTransport();
try {
short status = sendKeyOperation(key, transport, GET_WITH_VERSION, flags, GET_WITH_VERSION_RESPONSE);
if (status == KEY_DOES_NOT_EXIST_STATUS) {
return null;
}
if (status == NO_ERROR_STATUS) {
long version = transport.readLong();
if (log.isTraceEnabled()) {
log.trace("Received version: " + version);
}
byte[] value = transport.readArray();
return new BinaryVersionedValue(version, value);
}
} finally {
releaseTransport(transport);
}
throw new IllegalStateException("We should not reach here!");
}
public byte[] put(byte[] key, byte[] value, int lifespan, int maxIdle, Flag... flags) {
Transport transport = transportFactory.getTransport();
try {
short status = sendPutOperation(key, value, transport, PUT_REQUEST, PUT_RESPONSE, lifespan, maxIdle, flags);
if (status != NO_ERROR_STATUS) {
throw new InvalidResponseException("Unexpected response status: " + Integer.toHexString(status));
}
return returnPossiblePrevValue(transport, flags);
} finally {
releaseTransport(transport);
}
}
public byte[] putIfAbsent(byte[] key, byte[] value, int lifespan, int maxIdle, Flag... flags) {
Transport transport = transportFactory.getTransport();
try {
short status = sendPutOperation(key, value, transport, PUT_IF_ABSENT_REQUEST, PUT_IF_ABSENT_RESPONSE, lifespan, maxIdle, flags);
if (status == NO_ERROR_STATUS || status == NOT_PUT_REMOVED_REPLACED_STATUS) {
return returnPossiblePrevValue(transport, flags);
}
} finally {
releaseTransport(transport);
}
throw new IllegalStateException("We should not reach here!");
}
public byte[] replace(byte[] key, byte[] value, int lifespan, int maxIdle, Flag... flags) {
Transport transport = transportFactory.getTransport();
try {
short status = sendPutOperation(key, value, transport, REPLACE_REQUEST, REPLACE_RESPONSE, lifespan, maxIdle, flags);
if (status == NO_ERROR_STATUS) {
return returnPossiblePrevValue(transport, flags);
} else if (status == NOT_PUT_REMOVED_REPLACED_STATUS) {
return null;
}
} finally {
releaseTransport(transport);
}
throw new IllegalStateException("We should not reach here!");
}
/**
* request : [header][key length][key][lifespan][max idle][entry_version][value length][value] response: If
* ForceReturnPreviousValue has been passed, this responses will contain previous [value length][value] for that key.
* If the key does not exist or previous was null, value length would be 0. Otherwise, if no ForceReturnPreviousValue
* was sent, the response would be empty.
*/
public VersionedOperationResponse replaceIfUnmodified(byte[] key, byte[] value, int lifespan, int maxIdle, long version, Flag... flags) {
Transport transport = transportFactory.getTransport();
try {
// 1) write header
long messageId = writeHeader(transport, REPLACE_IF_UNMODIFIED_REQUEST, flags);
//2) write message body
transport.writeArray(key);
transport.writeVInt(lifespan);
transport.writeVInt(maxIdle);
transport.writeLong(version);
transport.writeArray(value);
return returnVersionedOperationResponse(transport, messageId, REPLACE_IF_UNMODIFIED_RESPONSE, flags);
} finally {
releaseTransport(transport);
}
}
/**
* Request: [header][key length][key][entry_version]
*/
public VersionedOperationResponse removeIfUnmodified(byte[] key, long version, Flag... flags) {
Transport transport = transportFactory.getTransport();
try {
// 1) write header
long messageId = writeHeader(transport, REMOVE_IF_UNMODIFIED_REQUEST, flags);
//2) write message body
transport.writeArray(key);
transport.writeLong(version);
//process response and return
return returnVersionedOperationResponse(transport, messageId, REMOVE_IF_UNMODIFIED_RESPONSE, flags);
} finally {
releaseTransport(transport);
}
}
public void clear(Flag... flags) {
Transport transport = transportFactory.getTransport();
try {
// 1) write header
long messageId = writeHeader(transport, CLEAR_REQUEST, flags);
readHeaderAndValidate(transport, messageId, CLEAR_RESPONSE);
} finally {
releaseTransport(transport);
}
}
public Map<String, String> stats() {
Transport transport = transportFactory.getTransport();
try {
// 1) write header
long messageId = writeHeader(transport, STATS_REQUEST);
readHeaderAndValidate(transport, messageId, STATS_RESPONSE);
int nrOfStats = transport.readVInt();
Map<String, String> result = new HashMap<String, String>();
for (int i = 0; i < nrOfStats; i++) {
String statName = transport.readString();
String statValue = transport.readString();
result.put(statName, statValue);
}
return result;
} finally {
releaseTransport(transport);
}
}
@Override
public boolean ping() {
Transport transport = null;
try {
transport = transportFactory.getTransport();
// 1) write header
long messageId = writeHeader(transport, PING_REQUEST);
short respStatus = readHeaderAndValidate(transport, messageId, HotrodConstants.PING_RESPONSE);
if (respStatus == NO_ERROR_STATUS) {
return true;
}
throw new IllegalStateException("Unknown response status: " + Integer.toHexString(respStatus));
} catch (TransportException te) {
log.trace("Exception while ping", te);
return false;
}
finally {
releaseTransport(transport);
}
}
//[header][key length][key][lifespan][max idle][value length][value]
private short sendPutOperation(byte[] key, byte[] value, Transport transport, short opCode, byte opRespCode, int lifespan, int maxIdle, Flag[] flags) {
// 1) write header
long messageId = writeHeader(transport, opCode, flags);
// 2) write key and value
transport.writeArray(key);
transport.writeVInt(lifespan);
transport.writeVInt(maxIdle);
transport.writeArray(value);
transport.flush();
// 3) now read header
//return status (not error status for sure)
return readHeaderAndValidate(transport, messageId, opRespCode);
}
/*
* Magic | MessageId | Version | Opcode | CacheNameLength | CacheName | Flags | Client Intelligence | Topology Id
*/
private long writeHeader(Transport transport, short operationCode, Flag... flags) {
transport.writeByte(REQUEST_MAGIC);
long messageId = MSG_ID.incrementAndGet();
transport.writeVLong(messageId);
transport.writeByte(HOTROD_VERSION);
transport.writeByte(operationCode);
transport.writeArray(cacheNameBytes);
int flagInt = 0;
if (flags != null) {
for (Flag flag : flags) {
flagInt = flag.getFlagInt() | flagInt;
}
}
transport.writeVInt(flagInt);
transport.writeByte(clientIntelligence);
transport.writeVInt(TOPOLOGY_ID.get());
if (log.isTraceEnabled()) {
log.trace("wrote header for message " + messageId + ". Operation code: " + operationCode + ". Flags: " + Integer.toHexString(flagInt));
}
return messageId;
}
/**
* Magic | Message Id | Op code | Status | Topology Change Marker
*/
private short readHeaderAndValidate(Transport transport, long messageId, short opRespCode) {
short magic = transport.readByte();
if (magic != RESPONSE_MAGIC) {
String message = "Invalid magic number. Expected " + Integer.toHexString(RESPONSE_MAGIC) + " and received " + Integer.toHexString(magic);
log.error(message);
throw new InvalidResponseException(message);
}
long receivedMessageId = transport.readVLong();
if (receivedMessageId != messageId) {
String message = "Invalid message id. Expected " + Long.toHexString(messageId) + " and received " + Long.toHexString(receivedMessageId);
log.error(message);
throw new InvalidResponseException(message);
}
if (log.isTraceEnabled()) {
log.trace("Received response for message id: " + receivedMessageId);
}
short receivedOpCode = transport.readByte();
if (receivedOpCode != opRespCode) {
if (receivedOpCode == ERROR_RESPONSE) {
checkForErrorsInResponseStatus(transport.readByte(), messageId, transport);
throw new IllegalStateException("Error expected! (i.e. exception in the prev statement)");
}
throw new InvalidResponseException("Invalid response operation. Expected " + Integer.toHexString(opRespCode) + " and received " + Integer.toHexString(receivedOpCode));
}
if (log.isTraceEnabled()) {
log.trace("Received operation code is: " + receivedOpCode);
}
short status = transport.readByte();
checkForErrorsInResponseStatus(status, messageId, transport);
short topologyChangeByte = transport.readByte();
if (topologyChangeByte == 1) {
int newTopology = transport.readVInt();
TOPOLOGY_ID.set(newTopology);
int clusterSize = transport.readVInt();
List<InetSocketAddress> hotRodServers = new ArrayList<InetSocketAddress>(clusterSize);
for (int i = 0; i < clusterSize; i++) {
String host = transport.readString();
int port = transport.readUnsignedShort();
hotRodServers.add(new InetSocketAddress(host, port));
}
if (log.isInfoEnabled()) {
log.info("Received topology change response. New cluster size = " + clusterSize +", new topology id = " + newTopology + ", new topology " + hotRodServers);
}
transportFactory.updateServers(hotRodServers);
}
return status;
}
private void checkForErrorsInResponseStatus(short status, long messageId, Transport transport) {
if (log.isTraceEnabled()) {
log.trace("Received operation status: " + status);
}
switch ((int) status) {
case INVALID_MAGIC_OR_MESSAGE_ID_STATUS:
case REQUEST_PARSING_ERROR_STATUS:
case UNKNOWN_COMMAND_STATUS:
case SERVER_ERROR_STATUS:
case UNKNOWN_VERSION_STATUS: {
String msgFromServer = transport.readString();
if (log.isWarnEnabled()) {
log.warn("Error status received from the server:" + msgFromServer + " for message id " + messageId);
}
throw new HotRodClientException(msgFromServer, messageId, status);
}
case COMMAND_TIMEOUT_STATUS: {
if (log.isTraceEnabled()) {
log.trace("timeout message received from the server");
}
throw new TimeoutException();
}
case NO_ERROR_STATUS:
case KEY_DOES_NOT_EXIST_STATUS:
case NOT_PUT_REMOVED_REPLACED_STATUS: {
//don't do anything, these are correct responses
break;
}
default: {
throw new IllegalStateException("Unknown status: " + Integer.toHexString(status));
}
}
}
private boolean hasForceReturn(Flag[] flags) {
if (flags == null) return false;
for (Flag flag : flags) {
if (flag == Flag.FORCE_RETURN_VALUE) return true;
}
return false;
}
private short sendKeyOperation(byte[] key, Transport transport, byte opCode, Flag[] flags, byte opRespCode) {
// 1) write [header][key length][key]
long messageId = writeHeader(transport, opCode, flags);
transport.writeArray(key);
transport.flush();
// 2) now read the header
return readHeaderAndValidate(transport, messageId, opRespCode);
}
private byte[] returnPossiblePrevValue(Transport transport, Flag[] flags) {
return hasForceReturn(flags) ? transport.readArray() : null;
}
private void releaseTransport(Transport transport) {
if (transport != null)
transportFactory.releaseTransport(transport);
}
private VersionedOperationResponse returnVersionedOperationResponse(Transport transport, long messageId, byte response, Flag[] flags) {
//3) ...
short respStatus = readHeaderAndValidate(transport, messageId, response);
//4 ...
VersionedOperationResponse.RspCode code;
if (respStatus == NO_ERROR_STATUS) {
code = VersionedOperationResponse.RspCode.SUCCESS;
} else if (respStatus == NOT_PUT_REMOVED_REPLACED_STATUS) {
code = VersionedOperationResponse.RspCode.MODIFIED_KEY;
} else if (respStatus == KEY_DOES_NOT_EXIST_STATUS) {
code = VersionedOperationResponse.RspCode.NO_SUCH_KEY;
} else {
throw new IllegalStateException("Unknown response status: " + Integer.toHexString(respStatus));
}
byte[] prevValue = returnPossiblePrevValue(transport, flags);
return new VersionedOperationResponse(prevValue, code);
}
}