Package org.radargun.stages.cache.background

Source Code of org.radargun.stages.cache.background.LogChecker$AbstractStressorRecord

package org.radargun.stages.cache.background;

import java.io.Serializable;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReferenceArray;

import org.radargun.logging.Log;
import org.radargun.logging.LogFactory;
import org.radargun.stages.cache.generators.KeyGenerator;
import org.radargun.traits.BasicOperations;
import org.radargun.traits.CacheListeners;
import org.radargun.traits.Debugable;
import org.radargun.utils.Utils;

/**
* Log checkers control that all operations executed by stressors are persisted in the log values.
* Each node checks all writes from all stressors, but there's not a one-to-one stressor-checker
* relation. Instead, each node holds a pool of checker threads and a shared data structure
* with records about each stressor. All records are iterated through in a round-robin fashion
* by the checker threads.
*
* When the checkers are dead on particular node, this node cannot check the stressors. For some
* scenarios this is limiting - therefore, stressors may be configured to unwind the log values
* even if the old records are not checked. Then, it has to notify the checker about this action
* via ignored_* key, to prevent it from failing the test.
*
* @see AbstractLogLogic
* @see Stressor
*
* @author Radim Vansa <rvansa@redhat.com>
*/
public abstract class LogChecker extends Thread {
   protected static final Log log = LogFactory.getLog(LogChecker.class);
   protected static final boolean trace = log.isTraceEnabled();
   protected static final long UNSUCCESSFUL_CHECK_MIN_DELAY_MS = 10;
   protected static final String LAST_OPERATION_PREFIX = "stressor_";
   protected final KeyGenerator keyGenerator;
   protected final int slaveIndex;
   protected final long logCounterUpdatePeriod;
   protected final Pool pool;
   protected final BasicOperations.Cache basicCache;
   protected final Debugable.Cache debugableCache;
   protected volatile boolean terminate = false;

   public LogChecker(String name, BackgroundOpsManager manager, Pool logCheckerPool) {
      super(name);
      keyGenerator = manager.getKeyGenerator();
      slaveIndex = manager.getSlaveIndex();
      logCounterUpdatePeriod = manager.getLogLogicConfiguration().getCounterUpdatePeriod();
      pool = logCheckerPool;
      this.basicCache = manager.getBasicCache();
      this.debugableCache = manager.getDebugableCache();
   }

   public static String checkerKey(int checkerSlaveId, int slaveAndThreadId) {
      return String.format("checker_%d_%d", checkerSlaveId, slaveAndThreadId);
   }

   public static String ignoredKey(int checkerSlaveId, int slaveAndThreadId) {
      return String.format("ignored_%d_%d", checkerSlaveId, slaveAndThreadId);
   }

   public static String lastOperationKey(int slaveAndThreadId) {
      return String.format(LAST_OPERATION_PREFIX + "%d", slaveAndThreadId);
   }

   public void requestTerminate() {
      terminate = true;
   }

   @Override
   public void run() {
      int delayedKeys = 0;
      while (!terminate) {
         AbstractStressorRecord record = null;
         try {
            if (delayedKeys > pool.getTotalThreads()) {
               Thread.sleep(UNSUCCESSFUL_CHECK_MIN_DELAY_MS);
            }
            record = pool.take();
            if (System.currentTimeMillis() < record.getLastUnsuccessfulCheckTimestamp() + UNSUCCESSFUL_CHECK_MIN_DELAY_MS) {
               delayedKeys++;
               continue;
            }
            delayedKeys = 0;
            if (record.getLastUnsuccessfulCheckTimestamp() > Long.MIN_VALUE) {
               // the last check was unsuccessful -> grab lastOperation BEFORE the value to check if we've lost that
               Object last = basicCache.get(lastOperationKey(record.getThreadId()));
               if (last != null) {
                  record.setLastStressorOperation(((LastOperation) last).getOperationId());
               }
            }
            if (record.getOperationId() == 0) {
               Object last = basicCache.get(checkerKey(slaveIndex, record.getThreadId()));
               if (last != null) {
                  LastOperation lastCheck = (LastOperation) last;
                  record = newRecord(record, lastCheck.getOperationId(), lastCheck.getSeed());
               }
               Object ignored = basicCache.get(ignoredKey(slaveIndex, record.getThreadId()));
               if (ignored != null && record.getOperationId() <= (Long) ignored) {
                  log.debugf("Ignoring operations %d - %d for thread %d", record.getOperationId(), ignored, record.getThreadId());
                  while (record.getOperationId() <= (Long) ignored) {
                     record.next();
                  }
               }
               if (record.getOperationId() != 0) {
                  log.debugf("Check for thread %d continues from operation %d",
                        record.getThreadId(), record.getOperationId());
               }
            }
            if (trace) {
               log.tracef("Checking operation %d for thread %d on key %d (%s)",
                     record.getOperationId(), record.getThreadId(), record.getKeyId(), keyGenerator.generateKey(record.getKeyId()));
            }
            boolean notification = record.hasNotification(record.getOperationId());
            Object value = findValue(record);
            boolean contains = containsOperation(value, record);
            if (notification && contains) {
               if (trace) {
                  log.tracef("Found operation %d for thread %d", record.getOperationId(), record.getThreadId());
               }
               if (record.getOperationId() % logCounterUpdatePeriod == 0) {
                  basicCache.put(checkerKey(slaveIndex, record.getThreadId()),
                        new LastOperation(record.getOperationId(), Utils.getRandomSeed(record.rand)));
               }
               record.next();
               record.setLastUnsuccessfulCheckTimestamp(Long.MIN_VALUE);
               pool.reportStoredOperation();
            } else {
               if (record.getLastStressorOperation() >= record.getOperationId()) {
                  // one more check to see whether some operations should not be ignored
                  Object ignored = basicCache.get(ignoredKey(slaveIndex, record.getThreadId()));
                  if (ignored != null && record.getOperationId() <= (Long) ignored) {
                     log.debugf("Operations %d - %d for thread %d are ignored.", record.getOperationId(), ignored, record.threadId);
                     while (record.getOperationId() <= (Long) ignored) {
                        record.next();
                     }
                     continue;
                  }

                  if (!notification) {
                     log.errorf("Missing notification for operation %d for thread %d on key %d (%s), required for %d, notified for %s",
                           record.getOperationId(), record.getThreadId(), record.getKeyId(),
                           keyGenerator.generateKey(record.getKeyId()), record.requireNotify, record.notifiedOps);
                     pool.reportMissingNotification();
                  }
                  if (!contains) {
                     log.errorf("Missing operation %d for thread %d on key %d (%s) %s",
                           record.getOperationId(), record.getThreadId(), record.getKeyId(),
                           keyGenerator.generateKey(record.getKeyId()),
                           value == null ? " - entry was completely lost" : "");
                     if (trace) {
                        log.trace("Not found in " + value);
                     }
                     pool.reportMissingOperation();
                     if (debugableCache != null) {
                        debugableCache.debugInfo();
                        debugableCache.debugKey(keyGenerator.generateKey(record.getKeyId()));
                        debugableCache.debugKey(keyGenerator.generateKey(~record.getKeyId()));
                     }
                  }
                  record.next();
               } else {
                  record.setLastUnsuccessfulCheckTimestamp(System.currentTimeMillis());
               }
            }
         } catch (Exception e) {
            log.error("Cannot check value " + record.getKeyId(), e);
         } finally {
            if (record == null) {
               try {
                  Thread.sleep(100);
               } catch (InterruptedException e) {
                  Thread.interrupted();
               }
            } else {
               pool.add(record);
            }
         }
      }
   }

   protected abstract AbstractStressorRecord newRecord(AbstractStressorRecord record, long operationId, long seed);

   protected abstract Object findValue(AbstractStressorRecord record) throws Exception;

   protected abstract boolean containsOperation(Object value, AbstractStressorRecord record);

   public static abstract class Pool implements CacheListeners.UpdatedListener, CacheListeners.CreatedListener {
      private final int totalThreads;
      private final AtomicReferenceArray<AbstractStressorRecord> allRecords;
      private final ConcurrentLinkedQueue<AbstractStressorRecord> records = new ConcurrentLinkedQueue<AbstractStressorRecord>();
      private final BackgroundOpsManager manager;
      private final AtomicLong missingOperations = new AtomicLong();
      private final AtomicLong missingNotifications = new AtomicLong();
      private final BasicOperations.Cache cache;
      private volatile long lastStoredOperationTimestamp = Long.MIN_VALUE;

      public Pool(int numThreads, int numSlaves, BackgroundOpsManager manager) {
         totalThreads = numThreads * numSlaves;
         allRecords = new AtomicReferenceArray<AbstractStressorRecord>(totalThreads);
         log.trace("Pool will contain " + allRecords.length() + records);
         this.manager = manager;
         this.cache = manager.getBasicCache();
      }

      protected void registerListeners() {
         if (!manager.getLogLogicConfiguration().isCheckNotifications()) {
            return;
         }
         CacheListeners listeners = manager.getListeners();
         if (listeners == null) {
            throw new IllegalArgumentException("Service does not support cache listeners");
         }
         Collection<CacheListeners.Type> supported = listeners.getSupportedListeners();
         if (!supported.containsAll(Arrays.asList(CacheListeners.Type.CREATED, CacheListeners.Type.UPDATED))) {
            throw new IllegalArgumentException("Service does not support required listener types; supported are: " + supported);
         }
         String cacheName = manager.getGeneralConfiguration().getCacheName();
         manager.getListeners().addCreatedListener(cacheName, this);
         manager.getListeners().addUpdatedListener(cacheName, this);
      }

      public long getMissingOperations() {
         return missingOperations.get();
      }

      public long getMissingNotifications() {
         return missingNotifications.get();
      }

      public void reportMissingOperation() {
         missingOperations.incrementAndGet();
      }

      public void reportMissingNotification() {
         missingNotifications.incrementAndGet();
      }

      public int getTotalThreads() {
         return totalThreads;
      }

      public void reportStoredOperation() {
         lastStoredOperationTimestamp = System.currentTimeMillis();
      }

      public long getLastStoredOperationTimestamp() {
         return lastStoredOperationTimestamp;
      }

      public AbstractStressorRecord take() {
         return records.poll();
      }

      public void add(AbstractStressorRecord record) {
         records.add(record);
      }

      public void addNew(AbstractStressorRecord record) {
         records.add(record);
         allRecords.set(record.getThreadId(), record);
      }

      public String waitUntilChecked(long timeout) {
         for (int i = 0; i < totalThreads; ++i) {
            AbstractStressorRecord record = allRecords.get(i);
            if (record == null) continue;
            try {
               LastOperation lastOperation = (LastOperation) cache.get(lastOperationKey(record.getThreadId()));
               if (lastOperation == null) {
                  log.trace("Thread " + record.getThreadId() + " has no recorded operation.");
               } else {
                  record.setLastStressorOperation(lastOperation.getOperationId());
               }
            } catch (Exception e) {
               log.error("Failed to read last operation key for thread " + record.getThreadId(), e);
            }
         }
         for (;;) {
            boolean allChecked = true;
            for (int i = 0; i < totalThreads; ++i) {
               AbstractStressorRecord record = allRecords.get(i);
               if (record == null) continue;
               if (record.getOperationId() <= record.getLastStressorOperation()) {
                  if (log.isTraceEnabled()) {
                     log.trace(String.format("Currently checked operation for thread %d is %d (key id %08X), last written is %d",
                           record.getThreadId(), record.getOperationId(), record.getKeyId(), record.getLastStressorOperation()));
                  }
                  allChecked = false;
                  break;
               }
            }
            if (lastStoredOperationTimestamp + timeout < System.currentTimeMillis()) {
               String error = "Waiting for checkers timed out after " + (System.currentTimeMillis() - lastStoredOperationTimestamp) + " ms";
               log.error(error);
               return error;
            }
            if (allChecked) {
               StringBuilder sb = new StringBuilder("All checks OK: ");
               for (int i = 0; i < totalThreads; ++i) {
                  AbstractStressorRecord record = allRecords.get(i);
                  if (record == null) continue;
                  sb.append(record.getThreadId()).append("# ")
                        .append(record.getOperationId()).append(" (")
                        .append(record.getLastStressorOperation()).append("), ");
               }
               log.debug(sb.toString());
               return null;
            }
            try {
               Thread.sleep(1000);
            } catch (InterruptedException e) {
               log.error("Interrupted waiting for checkers.", e);
               return e.toString();
            }
         }
      }

      protected void notify(int threadId, long operationId, Object key) {
         AbstractStressorRecord record = allRecords.get(threadId);
         record.notify(operationId, key);
      }

      protected void requireNotify(int threadId, long operationId) {
         AbstractStressorRecord record = allRecords.get(threadId);
         record.requireNotify(operationId);
      }

      protected void modified(Object key, Object value) {
         if (key instanceof String && ((String) key).startsWith(LAST_OPERATION_PREFIX)) {
            int threadId = Integer.parseInt(((String) key).substring(LAST_OPERATION_PREFIX.length()));
            LastOperation last = (LastOperation) value;
            requireNotify(threadId, last.getOperationId() + 1);
         }
      }
   }

   public static class LastOperation implements Serializable {
      private long operationId;
      private long seed;

      public LastOperation(long operationId, long seed) {
         this.operationId = operationId;
         this.seed = seed;
      }

      public long getOperationId() {
         return operationId;
      }

      public long getSeed() {
         return seed;
      }

      @Override
      public String toString() {
         return String.format("LastOperation{operationId=%d, seed=%016X}", operationId, seed);
      }
   }

   protected abstract static class AbstractStressorRecord {
      protected final Random rand;
      protected final int threadId;
      protected long currentKeyId;
      protected volatile long currentOp = -1;
      private long lastStressorOperation = -1;
      private long lastUnsuccessfulCheckTimestamp = Long.MIN_VALUE;
      private Set<Long> notifiedOps = new HashSet<Long>();
      private long requireNotify = Long.MAX_VALUE;

      public AbstractStressorRecord(long seed, int threadId, long operationId) {
         log.trace("Initializing record random with " + seed);
         this.rand = Utils.setRandomSeed(new Random(0), seed);
         this.threadId = threadId;
         this.currentOp = operationId;
      }

      public AbstractStressorRecord(Random rand, int threadId) {
         log.trace("Initializing record random with " + Utils.getRandomSeed(rand));
         this.rand = rand;
         this.threadId = threadId;
      }

      public abstract void next();

      public int getThreadId() {
         return threadId;
      }

      public long getLastStressorOperation() {
         return lastStressorOperation;
      }

      public void setLastStressorOperation(long lastStressorOperation) {
         this.lastStressorOperation = lastStressorOperation;
      }

      public long getLastUnsuccessfulCheckTimestamp() {
         return lastUnsuccessfulCheckTimestamp;
      }

      public void setLastUnsuccessfulCheckTimestamp(long lastUnsuccessfulCheckTimestamp) {
         this.lastUnsuccessfulCheckTimestamp = lastUnsuccessfulCheckTimestamp;
      }

      public long getKeyId() {
         return currentKeyId;
      }

      public long getOperationId() {
         return currentOp;
      }

      public synchronized void notify(long operationId, Object key) {
         if (operationId < currentOp || !notifiedOps.add(operationId)) {
            log.warn("Duplicit notification for operation " + operationId + " on key " + key);
         }
      }

      public synchronized void discardNotification(long operationId) {
         notifiedOps.remove(operationId);
         // temporary:
         for (long op : notifiedOps) {
            if (op < operationId) log.error("Old operation " + op + " in " + notifiedOps);
         }
      }

      public synchronized void requireNotify(long operationId) {
         if (operationId < requireNotify) {
            requireNotify = operationId;
         }
      }

      public synchronized boolean hasNotification(long operationId) {
         if (operationId < requireNotify) return true;
         return notifiedOps.contains(operationId);
      }
   }
}
TOP

Related Classes of org.radargun.stages.cache.background.LogChecker$AbstractStressorRecord

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.