Package com.cloudhopper.mq.broker

Source Code of com.cloudhopper.mq.broker.LocalToRemoteQueueProcessor

package com.cloudhopper.mq.broker;

/*
* #%L
* ch-mq
* %%
* Copyright (C) 2012 Cloudhopper by Twitter
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*      http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import com.cloudhopper.mq.broker.protocol.MaxTransferAttemptsCountException;
import com.cloudhopper.mq.broker.protocol.MaxTransferCountException;
import com.cloudhopper.mq.broker.protocol.TransferResponse;
import com.cloudhopper.mq.queue.Queue;
import com.cloudhopper.mq.queue.QueueTake;
import com.cloudhopper.mq.queue.QueueTimeoutException;
import com.cloudhopper.mq.transcoder.TranscoderWrapped;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
*
* @author joelauer, garth
*/
public class LocalToRemoteQueueProcessor implements RemoteQueueTransferListener, Runnable {

    private static final Logger logger = LoggerFactory.getLogger(LocalToRemoteQueueProcessor.class);
    private final DistributedQueueManager dqm;
    private final Queue localQueue;
    private final RemoteQueueInfo remoteQueue;
    private final AtomicBoolean killed;
    private final AtomicInteger concurrentRequests;
    private final String name;
    private final AsyncHttpClientFactory httpFactory;

    public LocalToRemoteQueueProcessor(DistributedQueueManager dqm, Queue localQueue,
               RemoteQueueInfo remoteQueue, AsyncHttpClientFactory httpFactory) {
        super();
  this.name = "CHMQ-LocalToRemoteQueueProcessor-" + remoteQueue.getName();
        this.dqm = dqm;
        this.localQueue = localQueue;
        this.remoteQueue = remoteQueue;
        this.killed = new AtomicBoolean(false);
  this.concurrentRequests = new AtomicInteger(0);
  this.httpFactory = httpFactory;
    }

    public Queue getLocalQueue() {
  return this.localQueue;
    }

    public void start() {
  logger.info("[{}] New LocalToRemoteQueueProcessor started for {}", localQueue.getName(), getName());
    }

    public boolean isAlive() {
  return !this.killed.get();
    }

    public String getName() {
  return this.name;
    }

    public int getConcurrentRequests() {
  return concurrentRequests.get();
    }

    /**
     * Kills the processor.  Will only execute an "interrupt" if its current
     * state is not set to killed.
     */
    public void kill() {
        logger.debug("[{}] Processor [{}]: kill() called", localQueue.getName(), this.getName());
        boolean changed = this.killed.compareAndSet(false, true);
        if (changed) {
            logger.info("[{}] Processor thread [{}]: kill flag set to true and interrupting thread", localQueue.getName(), this.getName());
            // this.interrupt();
        } else {
            logger.warn("[{}] Processor thread [{}]: kill flag already set to true; ignoring kill; no interrupt (duplicate kill request perhaps?)", localQueue.getName(), this.getName());
        }
    }

    public boolean isKilled() {
        return this.killed.get();
    }

    public void notifyRemoteBrokerIsNoLongerAvailable(String remoteBrokerUrl, String errorMessage) {
        dqm.getDistributedQueueState().updateRemoteBrokerState(remoteBrokerUrl, RemoteBrokerInfo.STATE_NOT_AVAILABLE, errorMessage, System.currentTimeMillis());
    }

    public void notifyQueueOnRemoteBrokerIsNoLongerAvailable(String remoteBrokerUrl, String queueName) {
        dqm.getDistributedQueueState().updateRemoteQueueWeight(queueName, remoteBrokerUrl, 0);
    }

    @Override
    @SuppressWarnings("unchecked")
    public void run() {
  try {
      run(null);
  } catch (Exception e) {
      logger.warn("["+localQueue.getName()+"] Error running LocalToRemoteQueueProcesor: ", e);
  }
    }
   
    public void run(final CompletionHandler completionHandler) {
  if (!this.killed.get()) {
      // is the remote queue still available?
      if (remoteQueue.isNotAvailable()) {
    logger.debug("[{}] Remote queue [{}] no longer available, exiting processor", localQueue.getName(), remoteQueue.getName());
    if (completionHandler != null) completionHandler.onFailure();
    return;
      }
     
      // the item we are going to transfer
      Object item = null;
     
      try {
    // take the item on the local queue
   
    // Option 1: Don't wait on the take. This may produce multiple null results, as it won't wait on the
    //           queue for other operations (e.g. puts by other threads) to complete before returning null.
    //item = localQueue.getTemporarySession().take(0l);
   
    // Option 2: Exits on timeout. (because of put contention or takes from another thread).
    //           Reschedules if queueSize is still non-zero.
    QueueTake take = localQueue.getTemporarySession();
    try {
        item = take.take(dqm.getConfiguration().getRemotingTakeTimeout());
    } catch (QueueTimeoutException te) {
        dqm.incrementRemotingTakeTimeouts();
        logger.debug("[{}] Timeout waiting to take from Queue. Size = {}. Item = {}", new Object[] { localQueue.getName(), localQueue.getSize(), item });
    }

      } catch (InterruptedException e) {
    logger.debug("[{}] Processor interrupted while attempting take() from local Queue", localQueue.getName());
    if (completionHandler != null) completionHandler.onFailure();
    return;
      } catch (Exception e) {
    logger.debug("Error taking from queue {}", localQueue);
    logger.error("["+localQueue.getName()+"]Unable to cleanly take() item from local Queue", e);
    if (completionHandler != null) completionHandler.onFailure();
    return;
      }
     
      // make sure item wasn't null
      if (item == null) {
    // this is okay, and will occur during periods of multiple queues' heavy load
    dqm.incrementRemotingTakeNulls();
    logger.trace("[{}] Item was null from local Queue", localQueue.getName());
    if (completionHandler != null) completionHandler.onFailure();
    return;
      }
     
      // transcode item.... (should be impossible to fail)
      //byte[] data = localQueue.getTranscoder().encode(item);
     
      // at this point, we "took" an item from the local queue, we now
      // will attempt to transfer this item to any queues that are available
      // prepare everything for the transfer request
      // final TransferItem transferItem = new TransferItem(item, data, localQueue.getElementType());
      final TransferItem transferItem = new TransferItem(item, localQueue);
      AsyncRemoteQueueTransfer transfer = new AsyncRemoteQueueTransfer(httpFactory.getClient(),
                       dqm.getConfiguration(), this,
                       remoteQueue, transferItem);
      this.concurrentRequests.incrementAndGet();
      transfer.transfer(new TransferResponseHandler() {
       
        public void onThrowable(Throwable e) {
      try {
          if (e instanceof MaxTransferCountException) {
        logger.error("["+localQueue.getName()+"]", e);
          } else if (e instanceof MaxTransferAttemptsCountException) {
        logger.error("["+localQueue.getName()+"]", e);
          } else if (e instanceof NoMoreRemoteBrokersException) {
        // try our best to put this item back onto the queue
        logger.warn("[{}] No more remote brokers for item, going to put it back on the queue (remoteQueue.brokerSize={})", localQueue.getName(), remoteQueue.getSize());
        try {
            localQueue.getTemporarySession().put(transferItem.getItem());
        } catch (InterruptedException ex) {
            // hmmm.. we already set the "killed" flag to true earlier which
            // should actually have blocked any external interrupt of this thread
            logger.error("[{}] Processor interrupted while attempting put() item back onto Queue", localQueue.getName());
        } catch (Exception ex) {
            logger.error("["+localQueue.getName()+"] Unable to cleanly put() item back onto local Queue", e);
        }
        // hmm.. there is a synchronization problem with how this
        // all works -- if "NoMoreRemoteBrokers" was thrown, but a monitoring
        // event occurred that changed the distributed queue state
          }
      } finally {
          concurrentRequests.decrementAndGet();
          if (completionHandler != null) completionHandler.onFailure();
      }
        }
       
        public void onComplete(TransferResponse response) {
      try {
          // Don't need to do anything with the response
          logger.trace("[{}] Completed response: {}", localQueue.getName(), response);
      } finally {
          concurrentRequests.decrementAndGet();
          if (completionHandler != null) completionHandler.onSuccess();
      }
        }
    });     
        }
    }

}
TOP

Related Classes of com.cloudhopper.mq.broker.LocalToRemoteQueueProcessor

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.