Package org.infinispan.distribution

Source Code of org.infinispan.distribution.TransactionLoggerImpl

/*
* JBoss, Home of Professional Open Source
* Copyright 2009 Red Hat Inc. and/or its affiliates and other
* contributors as indicated by the @author tags. All rights reserved.
* See the copyright.txt in the distribution for a full listing of
* individual contributors.
*
* This is free software; you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as
* published by the Free Software Foundation; either version 2.1 of
* the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this software; if not, write to the Free
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
*/
package org.infinispan.distribution;

import org.infinispan.commands.CommandsFactory;
import org.infinispan.commands.tx.CommitCommand;
import org.infinispan.commands.tx.PrepareCommand;
import org.infinispan.commands.tx.RollbackCommand;
import org.infinispan.commands.write.WriteCommand;
import org.infinispan.context.Flag;
import org.infinispan.context.InvocationContext;
import org.infinispan.context.impl.TxInvocationContext;
import org.infinispan.transaction.xa.GlobalTransaction;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;

import java.util.Collection;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import static java.util.Arrays.asList;

/**
* A transaction logger to log ongoing transactions in an efficient and thread-safe manner while a rehash is going on.
* <p/>
* Transaction logs can then be replayed after the state transferred during a rehash has been written.
*
* @author Manik Surtani
* @author Dan Berindei <dberinde@redhat.com>
* @since 4.0
*/
public class TransactionLoggerImpl implements TransactionLogger {
   private static final Log log = LogFactory.getLog(TransactionLoggerImpl.class);
   private static final boolean trace = log.isTraceEnabled();

   // A low number.  If we have less than this number of transactions, lock anyway.
   private static final int DRAIN_LOCK_THRESHOLD = 25;
   // If we see the queue growing this many times, lock.
   private static final int GROWTH_COUNT_THRESHOLD = 3;
   private int previousSize, growthCount;

   private volatile boolean loggingEnabled;
   // This lock is used to block new transactions during rehash
   // Write commands must acquire the read lock for the duration of the command
   // We acquire the write lock to block new transactions
   // That means we wait for pending write commands to finish, and we might have to wait a lot if
   // a command is deadlocked
   // TODO Find a way to interrupt all transactions waiting for answers from remote nodes, instead
   // of waiting for all of them to finish
   private ReentrantReadWriteLock txLock = new ReentrantReadWriteLock();

   final BlockingQueue<WriteCommand> commandQueue = new LinkedBlockingQueue<WriteCommand>();
   final Map<GlobalTransaction, PrepareCommand> uncommittedPrepares = new ConcurrentHashMap<GlobalTransaction, PrepareCommand>();

   private final CommandsFactory cf;

   public TransactionLoggerImpl(CommandsFactory cf) {
      this.cf = cf;
   }

   @Override
   public void enable() {
      loggingEnabled = true;
   }

   @Override
   public List<WriteCommand> drain() {
      List<WriteCommand> list = new LinkedList<WriteCommand>();
      commandQueue.drainTo(list);
      return list;
   }

   @Override
   public List<WriteCommand> drainAndLock() throws InterruptedException {
      blockNewTransactions();
      return drain();
   }

   @Override
   public void unlockAndDisable() {
      loggingEnabled = false;
      uncommittedPrepares.clear();
      unblockNewTransactions();
   }

   @Override
   public void afterCommand(InvocationContext ctx, WriteCommand command) throws InterruptedException {
      // for transactions the real work starts with the prepare command, so don't log anything here
      if (ctx.isInTxScope())
         return;

      if (!ctx.hasFlag(Flag.SKIP_LOCKING))
         txLock.readLock().unlock();

      if (loggingEnabled && command.isSuccessful()) {
         commandQueue.put(command);
      }
   }

   @Override
   public void afterCommand(TxInvocationContext ctx, PrepareCommand command) throws InterruptedException {
      if (!ctx.hasFlag(Flag.SKIP_LOCKING))
         txLock.readLock().unlock();

      if (loggingEnabled) {
         if (command.isOnePhaseCommit())
            logModificationsInTransaction(command);
         else
            uncommittedPrepares.put(command.getGlobalTransaction(), command);
      }
   }

   @Override
   public void afterCommand(TxInvocationContext ctx, CommitCommand command) throws InterruptedException {
      if (!ctx.hasFlag(Flag.SKIP_LOCKING))
         txLock.readLock().unlock();

      if (loggingEnabled) {
         PrepareCommand pc = uncommittedPrepares.remove(command.getGlobalTransaction());
         if (pc == null)
            logModifications(ctx.getModifications());
         else
            logModificationsInTransaction(pc);
      }
   }

   @Override
   public void afterCommand(TxInvocationContext ctx, RollbackCommand command) {
      if (!ctx.hasFlag(Flag.SKIP_LOCKING))
         txLock.readLock().unlock();

      if (loggingEnabled) {
         uncommittedPrepares.remove(command.getGlobalTransaction());
      }
   }

   private void logModificationsInTransaction(PrepareCommand command) throws InterruptedException {
      logModifications(asList(command.getModifications()));
   }

   private void logModifications(Collection<WriteCommand> mods) throws InterruptedException {
      for (WriteCommand wc : mods) {
         commandQueue.put(wc);
      }
   }

   @Override
   public void beforeCommand(InvocationContext ctx, WriteCommand command) throws InterruptedException {
      // for transactions the real work starts with the prepare command, so don't block here
      if (ctx.isInTxScope())
         return;

      if (!ctx.hasFlag(Flag.SKIP_LOCKING))
         txLock.readLock().lock();
   }

   @Override
   public void beforeCommand(TxInvocationContext ctx, PrepareCommand command) throws InterruptedException {
      if (!ctx.hasFlag(Flag.SKIP_LOCKING))
         txLock.readLock().lock();
   }

   @Override
   public void beforeCommand(TxInvocationContext ctx, CommitCommand command) throws InterruptedException {
      if (!ctx.hasFlag(Flag.SKIP_LOCKING))
         txLock.readLock().lock();

      // if the prepare command wasn't logged, do it here instead
      if (loggingEnabled) {
         GlobalTransaction gtx;
         if (!uncommittedPrepares.containsKey(gtx = ctx.getGlobalTransaction()))
            uncommittedPrepares.put(gtx, cf.buildPrepareCommand(gtx, ctx.getModifications(), false));
      }
   }

   @Override
   public void beforeCommand(TxInvocationContext ctx, RollbackCommand command) throws InterruptedException {
      if (!ctx.hasFlag(Flag.SKIP_LOCKING))
         txLock.readLock().lock();
   }

   private int size() {
      return loggingEnabled ? commandQueue.size() : 0;
   }

   @Override
   public boolean isEnabled() {
      return loggingEnabled;
   }

   @Override
   public boolean shouldDrainWithoutLock() {
      if (loggingEnabled) {
         int sz = size();
         boolean shouldLock = (previousSize > 0 && growthCount > GROWTH_COUNT_THRESHOLD) || sz < DRAIN_LOCK_THRESHOLD;
         if (!shouldLock) {
            if (sz > previousSize && previousSize > 0) growthCount++;
            previousSize = sz;
            return true;
         } else {
            return false;
         }
      } else return false;
   }

   @Override
   public Collection<PrepareCommand> getPendingPrepares() {
      Collection<PrepareCommand> commands = new HashSet<PrepareCommand>(uncommittedPrepares.values());
      uncommittedPrepares.clear();
      return commands;
   }

   @Override
   public void blockNewTransactions() throws InterruptedException {
      if (trace) log.debug("Blocking new transactions");
      // we just want to ensure that all the modifications that passed through the tx gate have ended
      txLock.writeLock().lockInterruptibly();
   }

   @Override
   public void unblockNewTransactions() {
      if (trace) log.debug("Unblocking new transactions");
      txLock.writeLock().unlock();
   }


   @Override
   public String toString() {
      return "TransactionLoggerImpl{" +
            "commandQueue=" + commandQueue +
            ", uncommittedPrepares=" + uncommittedPrepares +
            '}';
   }
}
TOP

Related Classes of org.infinispan.distribution.TransactionLoggerImpl

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.