package com.cloudhopper.mq.broker;
/*
* #%L
* ch-mq
* %%
* Copyright (C) 2012 Cloudhopper by Twitter
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import com.cloudhopper.mq.broker.protocol.MaxTransferAttemptsCountException;
import com.cloudhopper.mq.broker.protocol.MaxTransferCountException;
import com.cloudhopper.mq.broker.protocol.TransferResponse;
import com.cloudhopper.mq.queue.Queue;
import com.cloudhopper.mq.queue.QueueTake;
import com.cloudhopper.mq.queue.QueueTimeoutException;
import com.cloudhopper.mq.transcoder.TranscoderWrapped;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Moves items from a local {@link Queue} to a remote queue.
 *
 * <p>Each call to {@link #run(CompletionHandler)} takes at most one item from
 * the local queue and hands it to an {@link AsyncRemoteQueueTransfer}. The
 * outcome is reported through the optional {@link CompletionHandler}. The
 * processor does nothing once {@link #kill()} has been called.
 *
 * @author joelauer, garth
 */
public class LocalToRemoteQueueProcessor implements RemoteQueueTransferListener, Runnable {
    private static final Logger logger = LoggerFactory.getLogger(LocalToRemoteQueueProcessor.class);

    private final DistributedQueueManager dqm;
    private final Queue localQueue;
    private final RemoteQueueInfo remoteQueue;
    /** Set to true by {@link #kill()}; checked at the start of every run. */
    private final AtomicBoolean killed;
    /** Transfers that were started and whose response handler has not been called yet. */
    private final AtomicInteger concurrentRequests;
    private final String name;
    private final AsyncHttpClientFactory httpFactory;

    /**
     * Creates a processor for one local/remote queue pair.
     *
     * @param dqm manager providing configuration, counters and distributed queue state
     * @param localQueue queue that items are taken from
     * @param remoteQueue destination queue; its name becomes part of this processor's name
     * @param httpFactory supplies the HTTP client used for each transfer
     */
    public LocalToRemoteQueueProcessor(DistributedQueueManager dqm, Queue localQueue,
            RemoteQueueInfo remoteQueue, AsyncHttpClientFactory httpFactory) {
        this.name = "CHMQ-LocalToRemoteQueueProcessor-" + remoteQueue.getName();
        this.dqm = dqm;
        this.localQueue = localQueue;
        this.remoteQueue = remoteQueue;
        this.killed = new AtomicBoolean(false);
        this.concurrentRequests = new AtomicInteger(0);
        this.httpFactory = httpFactory;
    }

    /** @return the local queue this processor takes items from */
    public Queue getLocalQueue() {
        return this.localQueue;
    }

    /** Logs that the processor has been started; performs no other work. */
    public void start() {
        logger.info("[{}] New LocalToRemoteQueueProcessor started for {}", localQueue.getName(), getName());
    }

    /** @return {@code true} until {@link #kill()} has been called */
    public boolean isAlive() {
        return !this.killed.get();
    }

    /** @return the processor name, derived from the remote queue name */
    public String getName() {
        return this.name;
    }

    /** @return number of transfers started by this processor that have not yet completed or failed */
    public int getConcurrentRequests() {
        return concurrentRequests.get();
    }

    /**
     * Kills the processor by setting the kill flag. Only the first call has an
     * effect; later calls just log a warning. No thread is interrupted here
     * (the info log message still mentions an interrupt for historical reasons).
     */
    public void kill() {
        logger.debug("[{}] Processor [{}]: kill() called", localQueue.getName(), this.getName());
        boolean changed = this.killed.compareAndSet(false, true);
        if (changed) {
            logger.info("[{}] Processor thread [{}]: kill flag set to true and interrupting thread", localQueue.getName(), this.getName());
        } else {
            logger.warn("[{}] Processor thread [{}]: kill flag already set to true; ignoring kill; no interrupt (duplicate kill request perhaps?)", localQueue.getName(), this.getName());
        }
    }

    /** @return {@code true} once {@link #kill()} has been called */
    public boolean isKilled() {
        return this.killed.get();
    }

    /**
     * Marks the given remote broker as not available in the distributed queue state.
     *
     * @param remoteBrokerUrl URL identifying the remote broker
     * @param errorMessage message recorded together with the state change
     */
    public void notifyRemoteBrokerIsNoLongerAvailable(String remoteBrokerUrl, String errorMessage) {
        dqm.getDistributedQueueState().updateRemoteBrokerState(remoteBrokerUrl, RemoteBrokerInfo.STATE_NOT_AVAILABLE, errorMessage, System.currentTimeMillis());
    }

    /**
     * Sets the weight of the named queue on the given remote broker to zero.
     *
     * @param remoteBrokerUrl URL identifying the remote broker
     * @param queueName name of the queue on that broker
     */
    public void notifyQueueOnRemoteBrokerIsNoLongerAvailable(String remoteBrokerUrl, String queueName) {
        dqm.getDistributedQueueState().updateRemoteQueueWeight(queueName, remoteBrokerUrl, 0);
    }

    /**
     * Runs one transfer attempt without a completion handler. Any exception is
     * logged at warn level and not propagated.
     */
    @Override
    public void run() {
        try {
            run(null);
        } catch (Exception e) {
            logger.warn("["+localQueue.getName()+"] Error running LocalToRemoteQueueProcesor: ", e);
        }
    }

    /**
     * Takes one item from the local queue and starts an asynchronous transfer
     * of it to the remote queue.
     *
     * <p>{@code completionHandler.onFailure()} is called when the remote queue
     * is not available, when no item could be taken (timeout, interrupt, error
     * or empty result) and when the transfer reports a throwable.
     * {@code completionHandler.onSuccess()} is called when the transfer
     * completes. If the processor has been killed the method returns
     * immediately and the handler is not called.
     *
     * @param completionHandler receives the outcome; may be {@code null}
     */
    public void run(final CompletionHandler completionHandler) {
        if (this.killed.get()) {
            // NOTE(review): the handler is deliberately left untouched here to
            // preserve existing behaviour; confirm no caller waits on it.
            return;
        }

        // is the remote queue still available?
        if (remoteQueue.isNotAvailable()) {
            logger.debug("[{}] Remote queue [{}] no longer available, exiting processor", localQueue.getName(), remoteQueue.getName());
            notifyFailure(completionHandler);
            return;
        }

        // the item we are going to transfer
        Object item = null;
        try {
            // Take with a timeout rather than a zero-wait take: a zero-wait take
            // may return null repeatedly while other threads are still putting.
            QueueTake take = localQueue.getTemporarySession();
            try {
                item = take.take(dqm.getConfiguration().getRemotingTakeTimeout());
            } catch (QueueTimeoutException te) {
                dqm.incrementRemotingTakeTimeouts();
                logger.debug("[{}] Timeout waiting to take from Queue. Size = {}. Item = {}", new Object[] { localQueue.getName(), localQueue.getSize(), item });
            }
        } catch (InterruptedException e) {
            logger.debug("[{}] Processor interrupted while attempting take() from local Queue", localQueue.getName());
            try {
                notifyFailure(completionHandler);
            } finally {
                // restore the interrupt status so the owner of this thread can still observe it
                Thread.currentThread().interrupt();
            }
            return;
        } catch (Exception e) {
            logger.debug("Error taking from queue {}", localQueue);
            logger.error("["+localQueue.getName()+"]Unable to cleanly take() item from local Queue", e);
            notifyFailure(completionHandler);
            return;
        }

        if (item == null) {
            // this is okay, and will occur during periods of multiple queues' heavy load
            dqm.incrementRemotingTakeNulls();
            logger.trace("[{}] Item was null from local Queue", localQueue.getName());
            notifyFailure(completionHandler);
            return;
        }

        // at this point, we "took" an item from the local queue; prepare
        // everything for the transfer request
        final TransferItem transferItem = new TransferItem(item, localQueue);
        AsyncRemoteQueueTransfer transfer = new AsyncRemoteQueueTransfer(httpFactory.getClient(),
                dqm.getConfiguration(), this,
                remoteQueue, transferItem);

        // NOTE(review): if transfer(...) throws synchronously without invoking
        // the handler, this count is never decremented — verify against
        // AsyncRemoteQueueTransfer.
        this.concurrentRequests.incrementAndGet();
        transfer.transfer(new TransferResponseHandler() {
            public void onThrowable(Throwable e) {
                boolean interrupted = false;
                try {
                    if (e instanceof MaxTransferCountException
                            || e instanceof MaxTransferAttemptsCountException) {
                        logger.error("["+localQueue.getName()+"]", e);
                    } else if (e instanceof NoMoreRemoteBrokersException) {
                        // try our best to put this item back onto the queue
                        logger.warn("[{}] No more remote brokers for item, going to put it back on the queue (remoteQueue.brokerSize={})", localQueue.getName(), remoteQueue.getSize());
                        interrupted = returnItemToLocalQueue(transferItem);
                        // hmm.. there is a synchronization problem with how this
                        // all works -- if "NoMoreRemoteBrokers" was thrown, but a monitoring
                        // event occurred that changed the distributed queue state
                    } else {
                        // previously dropped without a trace; make the failure visible
                        logger.warn("["+localQueue.getName()+"] Transfer to remote queue failed", e);
                    }
                } finally {
                    concurrentRequests.decrementAndGet();
                    try {
                        notifyFailure(completionHandler);
                    } finally {
                        if (interrupted) {
                            Thread.currentThread().interrupt();
                        }
                    }
                }
            }

            public void onComplete(TransferResponse response) {
                try {
                    // Don't need to do anything with the response
                    logger.trace("[{}] Completed response: {}", localQueue.getName(), response);
                } finally {
                    concurrentRequests.decrementAndGet();
                    if (completionHandler != null) {
                        completionHandler.onSuccess();
                    }
                }
            }
        });
    }

    /** Calls {@code onFailure()} on the handler when one was supplied. */
    private static void notifyFailure(CompletionHandler completionHandler) {
        if (completionHandler != null) {
            completionHandler.onFailure();
        }
    }

    /**
     * Best-effort attempt to put a transfer item back onto the local queue;
     * failures are logged, never thrown.
     *
     * @return {@code true} if the put was interrupted, so the caller can restore the interrupt status
     */
    private boolean returnItemToLocalQueue(TransferItem transferItem) {
        try {
            localQueue.getTemporarySession().put(transferItem.getItem());
        } catch (InterruptedException ex) {
            logger.error("[{}] Processor interrupted while attempting put() item back onto Queue", localQueue.getName());
            return true;
        } catch (Exception ex) {
            // log the put() failure itself, not the transfer error that triggered the put-back
            logger.error("["+localQueue.getName()+"] Unable to cleanly put() item back onto local Queue", ex);
        }
        return false;
    }
}