Package de.scoopgmbh.copper.batcher.impl

Source Code of de.scoopgmbh.copper.batcher.impl.BatcherQueue$BatchInfo

/*
* Copyright 2002-2013 SCOOP Software GmbH
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*      http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package de.scoopgmbh.copper.batcher.impl;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

import de.scoopgmbh.copper.batcher.BatchCommand;
import de.scoopgmbh.copper.batcher.BatchExecutorBase;


class BatcherQueue {

  enum State {
    STARTED, STOPPING, STOPPED
  };

  static class BatchInfo {

    @SuppressWarnings({ "rawtypes" })
    static final Comparator<BatchCommand> comparator = new Comparator<BatchCommand>() {
      public int compare(BatchCommand o1, BatchCommand o2) {
        if (o1.targetTime() < o2.targetTime())
          return -1;
        if (o1.targetTime() == o2.targetTime())
          return 0;
        return 1;
      }
    };

    long minTargetTime = Long.MAX_VALUE;
    int preferredSize;
    int maximumSize;
    BatchCommandArray batch;
    Condition signaller;

    BatchInfo(BatchExecutorBase<?,?> executor) {
      this.preferredSize = executor.preferredBatchSize();
      this.maximumSize = executor.maximumBatchSize();
      if (maximumSize < preferredSize)
        throw new IllegalArgumentException(
            "Preferred batch size must not exceed maximum batch size");
      this.batch = new BatchCommandArray(executor.prioritize(), initialArraySize());
    }

    int initialArraySize() {
      return Math.min(preferredSize * 2, maximumSize);
    }

    public List<BatchCommand<?,?>> removeCommands(boolean stopped) {
      int batchSize = batch.size();
      if (batchSize <= maximumSize) {
        BatchCommandArray commands = this.batch;
        minTargetTime = Long.MAX_VALUE;
        batch = new BatchCommandArray(commands.sorted, initialArraySize());
        signaller = null;
        return commands;
      }
      BatchCommand<?,?>[] commands = new BatchCommand<?,?>[maximumSize];
      batch.removeElementsFromStart(commands);
      minTargetTime = 0;
      if (!stopped && batch.size() < preferredSize) {
        minTargetTime = Long.MAX_VALUE;
        if (batch.size() > 0)
          minTargetTime = batch.get(0).targetTime();
      }
      signaller = null;
      return Arrays.asList(commands);
    }

    long waitDelay() {
      long ret = minTargetTime - System.currentTimeMillis();
      return (ret < 0) ? 0 : ret;
    }

    /**
     * @return the new target time, if changed, else -1
     */
    Long add(BatchCommand<?, ?> cmd) {
      batch.add(cmd);
      if (this.preferredSize == batch.size() && minTargetTime > 0)
        return (minTargetTime = 0);
      if (cmd.targetTime() < minTargetTime)
        return (minTargetTime = cmd.targetTime());
      return null;
    }

  }

  Map<BatchExecutorBase<?,?>, BatchInfo> batchMap;
  LinkedList<Condition> freeConditions;
  LinkedList<Condition> unusedConditions;
  ArrayList<BatchInfo> batches;
  ReentrantLock lock;
  int numThreads;
  State state;

  public BatcherQueue() {
    this.numThreads = 0;
    lock = new ReentrantLock(false);
    batches = new ArrayList<BatchInfo>();
    batchMap = new HashMap<BatchExecutorBase<?,?>, BatchInfo>();
    freeConditions = new LinkedList<Condition>();
    unusedConditions = new LinkedList<Condition>();
    state = State.STARTED;
  }

  public void submitBatchCommand(BatchCommand<?, ?> cmd) {
    lock.lock();
    try {
      BatchInfo batchInfo = batchMap.get(cmd.executor());
      if (batchInfo == null) {
        batchInfo = new BatchInfo(cmd.executor());
        batchMap.put(cmd.executor(), batchInfo);
        batches.add(batchInfo);
      } else {
        if (batchInfo.batch.size() > batchInfo.maximumSize) {
          //for we do not use fair locks, try to let consumers fetch their bulk first before flooding
          //the queue with commands
          lock.unlock();
          Thread.yield();
          lock.lock();
        }
      }
      Long targetTime = batchInfo.add(cmd);
      enqueueBatch(batchInfo, targetTime);
    } finally {
      lock.unlock();
    }
  }

  void enqueueBatch(BatchInfo batchInfo, Long targetTime) {
    if (targetTime != null && targetTime.longValue() == Long.MAX_VALUE)
      targetTime = null;
    int queuePosition = -1;
    if (targetTime != null) {
      queuePosition = reinsert(batchInfo, targetTime);
    }
    if (batchInfo.signaller == null) {
      if (!freeConditions.isEmpty()) {
        batchInfo.signaller = freeConditions.remove();
        batchInfo.signaller.signal();
      } else if (targetTime != null) {
        if (queuePosition < numThreads) {
          BatchInfo removedBatch = batches.get(numThreads);
          batchInfo.signaller = removedBatch.signaller;
          removedBatch.signaller = null;
          batchInfo.signaller.signal();
        }
      }
    } else {
      if (targetTime != null) {
        batchInfo.signaller.signal();
      }
    }
  }

  int reinsert(BatchInfo batchInfo, long targetTime) {
    int queuePosition = findQueuePosition(batchInfo);
    for (; queuePosition > 0; --queuePosition) {
      BatchInfo currentBatch = batches.get(queuePosition - 1);
      if (currentBatch.minTargetTime <= targetTime)
        break;
      batches.set(queuePosition - 1, batchInfo);
      batches.set(queuePosition, currentBatch);
    }
    return queuePosition;
  }

  int findQueuePosition(BatchInfo batchInfo) {
    int i = batches.size();
    while (batches.get(--i) != batchInfo)
      ;
    return i;
  }

  int findQueuePosition(Condition condition) {
    int i = batches.size() - 1;
    for (; i > -1 && batches.get(i).signaller != condition; --i)
      ;
    return i;
  }

  void stop() throws InterruptedException {
    while (true) {
      lock.lock();
      try {
        state = State.STOPPING;
        boolean stopped = true;
        for (BatchInfo batch : batches) {
          if (batch.batch.size() > 0)
            stopped = false;
        }
        if (stopped) {
          state = State.STOPPED;
          signalAll();
          return;
        }
        signalAll();
      } finally {
        lock.unlock();
      }
      Thread.sleep(100);
    }
  }

  private void signalAll() {
    for (BatchInfo batch : batches) {
      batch.minTargetTime = 0;
      if (batch.signaller != null) {
        batch.signaller.signal();
      }
    }
    for (Condition cond : freeConditions) {
      cond.signal();
    }
  }
 

  public List<BatchCommand<?,?>> poll() throws InterruptedException {
    lock.lockInterruptibly();
    try {
      outerLoop: while (true) {
        ++numThreads;
        Condition myCondition = null;
        if (!unusedConditions.isEmpty())
          myCondition = unusedConditions.pop();
        else {
          myCondition = lock.newCondition();
        }
        if (batches.size() >= numThreads) {
          batches.get(numThreads - 1).signaller = myCondition;
        } else {
          freeConditions.add(myCondition);
        }
        waitLoop: while (true) {
          int queuePosition = findQueuePosition(myCondition);
          if (queuePosition == -1) {
            if (state == State.STOPPED) {
              unusedConditions.add(myCondition);
              return null;
            }
            myCondition.await();
            continue waitLoop;
          }
          BatchInfo myBatch = batches.get(queuePosition);
          long waitDelay = myBatch.waitDelay();
          if (state != State.STOPPED && waitDelay > 0) {
            myCondition.await(waitDelay, TimeUnit.MILLISECONDS);
          } else {
            --numThreads;
            List<BatchCommand<?,?>> commands = myBatch
                .removeCommands(state == State.STOPPING);
            batches.remove(queuePosition);
            batches.add(myBatch);
            enqueueBatch(myBatch, myBatch.minTargetTime);
            unusedConditions.add(myCondition);
            if (commands.size() > 0) {
              return commands;
            }
            if (state == State.STOPPED)
              return null;
            continue outerLoop;
          }
        }
      }
    } finally {
      lock.unlock();
    }
  }

}
TOP

Related Classes of de.scoopgmbh.copper.batcher.impl.BatcherQueue$BatchInfo

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.