Package reactor.timer

Source Code of reactor.timer.HashWheelTimer$WaitStrategy

/*
* Copyright (c) 2011-2014 Pivotal Software, Inc.
*
*  Licensed under the Apache License, Version 2.0 (the "License");
*  you may not use this file except in compliance with the License.
*  You may obtain a copy of the License at
*
*         http://www.apache.org/licenses/LICENSE-2.0
*
*  Unless required by applicable law or agreed to in writing, software
*  distributed under the License is distributed on an "AS IS" BASIS,
*  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*  See the License for the specific language governing permissions and
*  limitations under the License.
*/

package reactor.timer;

import reactor.event.registry.Registration;
import reactor.event.selector.Selector;
import reactor.function.Consumer;
import reactor.function.Pausable;
import reactor.jarjar.com.lmax.disruptor.EventFactory;
import reactor.jarjar.com.lmax.disruptor.RingBuffer;
import reactor.support.NamedDaemonThreadFactory;
import reactor.util.Assert;

import java.util.Set;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

/**
* Hash Wheel Timer, as per the paper:
*
* Hashed and hierarchical timing wheels:
* http://www.cs.columbia.edu/~nahum/w6998/papers/ton97-timing-wheels.pdf
*
* More comprehensive slides, explaining the paper can be found here:
* http://www.cse.wustl.edu/~cdgill/courses/cs6874/TimingWheels.ppt
*
* Hash Wheel timer is an approximated timer that allows performant execution of
* larger amount of tasks with better performance compared to traditional scheduling.
*
* @author Oleksandr Petrov
*/
public class HashWheelTimer implements Timer {

  public static final  int    DEFAULT_WHEEL_SIZE = 512;
  private static final String DEFAULT_TIMER_NAME = "hash-wheel-timer";

  private final RingBuffer<Set<TimerRegistration>> wheel;
  private final int                                resolution;
  private final Thread                             loop;
  private final Executor                           executor;
  private final WaitStrategy                       waitStrategy;

  /**
   * Create a new {@code HashWheelTimer} using the given with default resolution of 100 milliseconds and
   * default wheel size.
   */
  public HashWheelTimer() {
    this(100, DEFAULT_WHEEL_SIZE, new SleepWait());
  }

  /**
   * Create a new {@code HashWheelTimer} using the given timer resolution. All times will rounded up to the closest
   * multiple of this resolution.
   *
   * @param resolution
   *     the resolution of this timer, in milliseconds
   */
  public HashWheelTimer(int resolution) {
    this(resolution, DEFAULT_WHEEL_SIZE, new SleepWait());
  }

  /**
   * Create a new {@code HashWheelTimer} using the given timer {@param res} and {@param wheelSize}. All times will
   * rounded up to the closest multiple of this resolution.
   *
   * @param res
   *     resolution of this timer in milliseconds
   * @param wheelSize
   *     size of the Ring Buffer supporting the Timer, the larger the wheel, the less the lookup time is
   *     for sparse timeouts. Sane default is 512.
   * @param waitStrategy
   *     strategy for waiting for the next tick
   */
  public HashWheelTimer(int res, int wheelSize, WaitStrategy waitStrategy) {
    this(DEFAULT_TIMER_NAME, res, wheelSize, waitStrategy, Executors.newFixedThreadPool(1));
  }

  /**
   * Create a new {@code HashWheelTimer} using the given timer {@param resolution} and {@param wheelSize}. All times will
   * rounded up to the closest multiple of this resolution.
   *
   * @param name
   *     name for daemon thread factory to be displayed
   * @param res
   *     resolution of this timer in milliseconds
   * @param wheelSize
   *     size of the Ring Buffer supporting the Timer, the larger the wheel, the less the lookup time is
   *     for sparse timeouts. Sane default is 512.
   * @param strategy
   *     strategy for waiting for the next tick
   * @param exec
   *     Executor instance to submit tasks to
   */
  public HashWheelTimer(String name, int res, int wheelSize, WaitStrategy strategy, Executor exec) {
    this.waitStrategy = strategy;

    this.wheel = RingBuffer.createSingleProducer(new EventFactory<Set<TimerRegistration>>() {
      @Override
      public Set<TimerRegistration> newInstance() {
        return new ConcurrentSkipListSet<TimerRegistration>();
      }
    }, wheelSize);

    this.resolution = res;
    this.loop = new NamedDaemonThreadFactory(name).newThread(new Runnable() {
      @Override
      public void run() {
        long deadline = System.currentTimeMillis();

        while(true) {
          Set<TimerRegistration> registrations = wheel.get(wheel.getCursor());

          for(TimerRegistration r : registrations) {
            if(r.isCancelled()) {
              registrations.remove(r);
            } else if(r.ready()) {
              executor.execute(r);
              registrations.remove(r);

              if(!r.isCancelAfterUse()) {
                reschedule(r);
              }
            } else if(r.isPaused()) {
              reschedule(r);
            } else {
              r.decrement();
            }
          }

          deadline += resolution;

          try {
            waitStrategy.waitUntil(deadline);
          } catch(InterruptedException e) {
            return;
          }

          wheel.publish(wheel.next());
        }
      }
    });

    this.executor = exec;
    this.start();
  }

  @Override
  public long getResolution() {
    return resolution;
  }

  @SuppressWarnings("unchecked")
  @Override
  public TimerRegistration<? extends Consumer<Long>> schedule(Consumer<Long> consumer,
                                                              long period,
                                                              TimeUnit timeUnit,
                                                              long delayInMilliseconds) {
    Assert.isTrue(!loop.isInterrupted(), "Cannot submit tasks to this timer as it has been cancelled.");
    return schedule(TimeUnit.MILLISECONDS.convert(period, timeUnit), delayInMilliseconds, consumer);
  }

  @Override
  @SuppressWarnings("unchecked")
  public TimerRegistration<? extends Consumer<Long>> submit(Consumer<Long> consumer,
                                                            long period,
                                                            TimeUnit timeUnit) {
    Assert.isTrue(!loop.isInterrupted(), "Cannot submit tasks to this timer as it has been cancelled.");
    long ms = TimeUnit.MILLISECONDS.convert(period, timeUnit);
    return (TimerRegistration<? extends Consumer<Long>>)schedule(ms, ms, consumer).cancelAfterUse();
  }

  @Override
  @SuppressWarnings("unchecked")
  public TimerRegistration<? extends Consumer<Long>> submit(Consumer<Long> consumer) {
    return (TimerRegistration<? extends Consumer<Long>>)submit(consumer, resolution, TimeUnit.MILLISECONDS);
  }

  @Override
  @SuppressWarnings("unchecked")
  public TimerRegistration<? extends Consumer<Long>> schedule(Consumer<Long> consumer,
                                                              long period,
                                                              TimeUnit timeUnit) {
    return (TimerRegistration<? extends Consumer<Long>>)schedule(TimeUnit.MILLISECONDS.convert(period, timeUnit), 0, consumer);
  }

  @SuppressWarnings("unchecked")
  private TimerRegistration<? extends Consumer<Long>> schedule(long recurringTimeout,
                                                               long firstDelay,
                                                               Consumer<Long> consumer) {
    Assert.isTrue(recurringTimeout >= resolution,
                  "Cannot schedule tasks for amount of time less than timer precision.");

    long offset = recurringTimeout / resolution;
    long rounds = offset / wheel.getBufferSize();

    long firstFireOffset = firstDelay / resolution;
    long firstFireRounds = firstFireOffset / wheel.getBufferSize();

    TimerRegistration r = new TimerRegistration(firstFireRounds, offset, consumer, rounds);
    wheel.get(wheel.getCursor() + firstFireOffset + 1).add(r);
    return r;
  }

  /**
   * Reschedule a {@link TimerRegistration}  for the next fire
   *
   * @param registration
   */
  private void reschedule(TimerRegistration registration) {
    registration.reset();
    wheel.get(wheel.getCursor() + registration.getOffset()).add(registration);
  }

  /**
   * Start the Timer
   */
  public void start() {
    this.loop.start();
    wheel.publish(0);
  }

  /**
   * Cancel current Timer
   */
  public void cancel() {
    this.loop.interrupt();
  }


  /**
   * Timer Registration
   *
   * @param <T>
   *     type of the Timer Registration Consumer
   */
  public static class TimerRegistration<T extends Consumer<Long>> implements Runnable,
                                                                             Comparable,
                                                                             Pausable,
                                                                             Registration {

    public static int STATUS_PAUSED    = 1;
    public static int STATUS_CANCELLED = -1;
    public static int STATUS_READY     = 0;

    private final T             delegate;
    private final long          rescheduleRounds;
    private final long          scheduleOffset;
    private final AtomicLong    rounds;
    private final AtomicInteger status;
    private final AtomicBoolean cancelAfterUse;

    /**
     * Creates a new Timer Registration with given {@param rounds}, {@param offset} and {@param delegate}.
     *
     * @param rounds
     *     amount of rounds the Registration should go through until it's elapsed
     * @param offset
     *     offset of in the Ring Buffer for rescheduling
     * @param delegate
     *     delegate that will be ran whenever the timer is elapsed
     */
    public TimerRegistration(long rounds, long offset, T delegate, long rescheduleRounds) {
      this.rescheduleRounds = rescheduleRounds;
      this.scheduleOffset = offset;
      this.delegate = delegate;
      this.rounds = new AtomicLong(rounds);
      this.status = new AtomicInteger(STATUS_READY);
      this.cancelAfterUse = new AtomicBoolean(false);
    }

    /**
     * Decrement an amount of runs Registration has to run until it's elapsed
     */
    public void decrement() {
      rounds.decrementAndGet();
    }

    /**
     * Check whether the current Registration is ready for execution
     *
     * @return whether or not the current Registration is ready for execution
     */
    public boolean ready() {
      return status.get() == STATUS_READY && rounds.get() == 0;
    }

    /**
     * Run the delegate of the current Registration
     */
    @Override
    public void run() {
      delegate.accept(TimeUtils.approxCurrentTimeMillis());
    }

    /**
     * Reset the Registration
     */
    public void reset() {
      this.status.set(STATUS_READY);
      this.rounds.set(rescheduleRounds);
    }

    /**
     * Cancel the registration
     *
     * @return current Registration
     */
    public Registration cancel() {
      this.status.set(STATUS_CANCELLED);
      return this;
    }

    /**
     * Check whether the current Registration is cancelled
     *
     * @return whether or not the current Registration is cancelled
     */
    @Override
    public boolean isCancelled() {
      return status.get() == STATUS_CANCELLED;
    }

    /**
     * Pause the current Regisration
     *
     * @return current Registration
     */
    @Override
    public Registration pause() {
      this.status.set(STATUS_PAUSED);
      return this;
    }

    /**
     * Check whether the current Registration is paused
     *
     * @return whether or not the current Registration is paused
     */
    @Override
    public boolean isPaused() {
      return this.status.get() == STATUS_PAUSED;
    }

    /**
     * Resume current Registration
     *
     * @return current Registration
     */
    @Override
    public Registration resume() {
      reset();
      return this;
    }

    /**
     * Get the offset of the Registration relative to the current Ring Buffer position
     * to make it fire timely.
     *
     * @return the offset of current Registration
     */
    private long getOffset() {
      return this.scheduleOffset;
    }

    @Override
    public Selector getSelector() {
      return null;
    }

    @Override
    public Object getObject() {
      return delegate;
    }

    /**
     * Cancel this {@link reactor.timer.HashWheelTimer.TimerRegistration} after it has been selected and used. {@link
     * reactor.event.dispatch.Dispatcher} implementations should respect this value and perform
     * the cancellation.
     *
     * @return {@literal this}
     */
    public TimerRegistration<T> cancelAfterUse() {
      cancelAfterUse.set(false);
      return this;
    }

    @Override
    public boolean isCancelAfterUse() {
      return this.cancelAfterUse.get();
    }

    @Override
    public int compareTo(Object o) {
      TimerRegistration other = (TimerRegistration)o;
      if(rounds.get() == other.rounds.get()) {
        return other == this ? 0 : -1;
      } else {
        return Long.compare(rounds.get(), other.rounds.get());
      }
    }

    @Override
    public String toString() {
      return String.format("HashWheelTimer { Rounds left: %d, Status: %d }", rounds.get(), status.get());
    }
  }

  @Override
  public String toString() {
    return String.format("HashWheelTimer { Buffer Size: %d, Resolution: %d }",
                         wheel.getBufferSize(),
                         resolution);
  }


  /**
   * Wait strategy for the timer
   */
  public static interface WaitStrategy {

    /**
     * Wait until the given deadline, {@param deadlineMilliseconds}
     *
     * @param deadlineMilliseconds
     *     deadline to wait for, in milliseconds
     */
    public void waitUntil(long deadlineMilliseconds) throws InterruptedException;
  }

  /**
   * Yielding wait strategy.
   *
   * Spins in the loop, until the deadline is reached. Releases the flow control
   * by means of Thread.yield() call. This strategy is less precise than BusySpin
   * one, but is more scheduler-friendly.
   */
  public static class YieldingWait implements WaitStrategy {

    @Override
    public void waitUntil(long deadlineMilliseconds) throws InterruptedException {
      while(deadlineMilliseconds >= System.currentTimeMillis()) {
        Thread.yield();
        if(Thread.currentThread().isInterrupted()) {
          throw new InterruptedException();
        }
      }
    }
  }

  /**
   * BusySpin wait strategy.
   *
   * Spins in the loop until the deadline is reached. In a multi-core environment,
   * will occupy an entire core. Is more precise than Sleep wait strategy, but
   * consumes more resources.
   */
  public static class BusySpinWait implements WaitStrategy {

    @Override
    public void waitUntil(long deadlineMilliseconds) throws InterruptedException {
      while(deadlineMilliseconds >= System.currentTimeMillis()) {
        if(Thread.currentThread().isInterrupted()) {
          throw new InterruptedException();
        }
      }
    }
  }

  /**
   * Sleep wait strategy.
   *
   * Will release the flow control, giving other threads a possibility of execution
   * on the same processor. Uses less resources than BusySpin wait, but is less
   * precise.
   */
  public static class SleepWait implements WaitStrategy {

    @Override
    public void waitUntil(long deadlineMilliseconds) throws InterruptedException {
      long sleepTimeMs = deadlineMilliseconds - System.currentTimeMillis();
      if(sleepTimeMs > 0) {
        Thread.sleep(sleepTimeMs);
      }
    }
  }

}
TOP

Related Classes of reactor.timer.HashWheelTimer$WaitStrategy

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.