Package org.apache.aurora.scheduler.log.mesos

Source Code of org.apache.aurora.scheduler.log.mesos.MesosLog$LogStream$LogEntry

/**
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*     http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.aurora.scheduler.log.mesos;

import java.lang.annotation.Retention;
import java.lang.annotation.Target;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import java.util.logging.Logger;

import javax.annotation.Nullable;
import javax.inject.Inject;
import javax.inject.Provider;
import javax.inject.Qualifier;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.collect.Iterables;
import com.google.common.collect.UnmodifiableIterator;
import com.google.common.primitives.Longs;
import com.twitter.common.application.Lifecycle;
import com.twitter.common.base.Function;
import com.twitter.common.base.MorePreconditions;
import com.twitter.common.inject.TimedInterceptor.Timed;
import com.twitter.common.quantity.Amount;
import com.twitter.common.quantity.Time;
import com.twitter.common.stats.SlidingStats;
import com.twitter.common.stats.Stats;

import org.apache.aurora.scheduler.log.mesos.LogInterface.ReaderInterface;
import org.apache.aurora.scheduler.log.mesos.LogInterface.WriterInterface;
import org.apache.mesos.Log;

import static java.lang.annotation.ElementType.METHOD;
import static java.lang.annotation.ElementType.PARAMETER;
import static java.lang.annotation.RetentionPolicy.RUNTIME;
import static java.util.Objects.requireNonNull;

/**
* A {@code Log} implementation backed by a true distributed log in mesos core.
*/
public class MesosLog implements org.apache.aurora.scheduler.log.Log {

  private static final Logger LOG = Logger.getLogger(MesosLog.class.getName());

  /**
   * Binding annotation for the opaque value of a log noop entry.
   */
  @Qualifier
  @Retention(RUNTIME)
  @Target({ PARAMETER, METHOD })
  public @interface NoopEntry { }

  /**
   * Binding annotation for log read timeouts.
   */
  @Qualifier
  @Retention(RUNTIME)
  @Target({ PARAMETER, METHOD })
  public @interface ReadTimeout { }

  /**
   * Binding annotation for log write timeouts - used for truncates and appends.
   */
  @Qualifier
  @Retention(RUNTIME)
  @Target({ PARAMETER, METHOD })
  public @interface WriteTimeout { }

  private final Provider<LogInterface> logFactory;

  private final Provider<ReaderInterface> readerFactory;
  private final Amount<Long, Time> readTimeout;

  private final Provider<WriterInterface> writerFactory;
  private final Amount<Long, Time> writeTimeout;

  private final byte[] noopEntry;

  private final Lifecycle lifecycle;

  /**
   * Creates a new mesos log.
   *
   * @param logFactory Factory to provide access to log.
   * @param readerFactory Factory to provide access to log readers.
   * @param readTimeout Log read timeout.
   * @param writerFactory Factory to provide access to log writers.
   * @param writeTimeout Log write timeout.
   * @param noopEntry A no-op log entry blob.
   * @param lifecycle Lifecycle to use for initiating application teardown.
   */
  @Inject
  public MesosLog(
      Provider<LogInterface> logFactory,
      Provider<ReaderInterface> readerFactory,
      @ReadTimeout Amount<Long, Time> readTimeout,
      Provider<WriterInterface> writerFactory,
      @WriteTimeout Amount<Long, Time> writeTimeout,
      @NoopEntry byte[] noopEntry,
      Lifecycle lifecycle) {

    this.logFactory = requireNonNull(logFactory);

    this.readerFactory = requireNonNull(readerFactory);
    this.readTimeout = requireNonNull(readTimeout);

    this.writerFactory = requireNonNull(writerFactory);
    this.writeTimeout = requireNonNull(writeTimeout);

    this.noopEntry = requireNonNull(noopEntry);

    this.lifecycle = requireNonNull(lifecycle);
  }

  @Override
  public Stream open() {
    return new LogStream(
        logFactory.get(),
        readerFactory.get(),
        readTimeout,
        writerFactory,
        writeTimeout,
        noopEntry,
        lifecycle);
  }

  @VisibleForTesting
  static class LogStream implements org.apache.aurora.scheduler.log.Log.Stream {
    @VisibleForTesting
    static final class OpStats {
      private final String opName;
      private final SlidingStats timing;
      private final AtomicLong timeouts;
      private final AtomicLong failures;

      OpStats(String opName) {
        this.opName = MorePreconditions.checkNotBlank(opName);
        timing = new SlidingStats("scheduler_log_native_" + opName, "nanos");
        timeouts = exportLongStat("scheduler_log_native_%s_timeouts", opName);
        failures = exportLongStat("scheduler_log_native_%s_failures", opName);
      }

      private static AtomicLong exportLongStat(String template, Object... args) {
        return Stats.exportLong(String.format(template, args));
      }
    }

    private static final Function<Log.Entry, LogEntry> MESOS_ENTRY_TO_ENTRY =
        new Function<Log.Entry, LogEntry>() {
          @Override
          public LogEntry apply(Log.Entry entry) {
            return new LogEntry(entry);
          }
        };

    private final OpStats readStats = new OpStats("read");
    private final OpStats appendStats = new OpStats("append");
    private final OpStats truncateStats = new OpStats("truncate");
    private final AtomicLong entriesSkipped =
        Stats.exportLong("scheduler_log_native_native_entries_skipped");

    private final LogInterface log;

    private final ReaderInterface reader;
    private final long readTimeout;
    private final TimeUnit readTimeUnit;

    private final Provider<WriterInterface> writerFactory;
    private final long writeTimeout;
    private final TimeUnit writeTimeUnit;

    private final byte[] noopEntry;

    private final Lifecycle lifecycle;

    /**
     * The underlying writer to use for mutation operations.  This field has three states:
     * <ul>
     *   <li>present: the writer is active and available for use</li>
     *   <li>absent: the writer has not yet been initialized (initialization is lazy)</li>
     *   <li>{@code null}: the writer has suffered a fatal error and no further operations may
     *       be performed.</li>
     * </ul>
     * When {@code true}, indicates that the log has suffered a fatal error and no further
     * operations may be performed.
     */
    @Nullable private Optional<WriterInterface> writer = Optional.absent();

    LogStream(
        LogInterface log,
        ReaderInterface reader,
        Amount<Long, Time> readTimeout,
        Provider<WriterInterface> writerFactory,
        Amount<Long, Time> writeTimeout,
        byte[] noopEntry,
        Lifecycle lifecycle) {

      this.log = log;

      this.reader = reader;
      this.readTimeout = readTimeout.getValue();
      this.readTimeUnit = readTimeout.getUnit().getTimeUnit();

      this.writerFactory = writerFactory;
      this.writeTimeout = writeTimeout.getValue();
      this.writeTimeUnit = writeTimeout.getUnit().getTimeUnit();

      this.noopEntry = noopEntry;

      this.lifecycle = lifecycle;
    }

    @Override
    public Iterator<Entry> readAll() throws StreamAccessException {
      // TODO(John Sirois): Currently we must be the coordinator to ensure we get the 'full read'
      // of log entries expected by the users of the org.apache.aurora.scheduler.log.Log interface.
      // Switch to another method of ensuring this when it becomes available in mesos' log
      // interface.
      try {
        append(noopEntry);
      } catch (StreamAccessException e) {
        throw new StreamAccessException("Error writing noop prior to a read", e);
      }

      final Log.Position from = reader.beginning();
      final Log.Position to = end().unwrap();

      // Reading all the entries at once may cause large garbage collections. Instead, we
      // lazily read the entries one by one as they are requested.
      // TODO(Benjamin Hindman): Eventually replace this functionality with functionality
      // from the Mesos Log.
      return new UnmodifiableIterator<Entry>() {
        private long position = Longs.fromByteArray(from.identity());
        private final long endPosition = Longs.fromByteArray(to.identity());
        private Entry entry = null;

        @Override
        public boolean hasNext() {
          if (entry != null) {
            return true;
          }

          while (position <= endPosition) {
            long start = System.nanoTime();
            try {
              Log.Position p = log.position(Longs.toByteArray(position));
              if (LOG.isLoggable(Level.FINE)) {
                LOG.fine("Reading position " + position + " from the log");
              }
              List<Log.Entry> entries = reader.read(p, p, readTimeout, readTimeUnit);

              // N.B. HACK! There is currently no way to "increment" a position. Until the Mesos
              // Log actually provides a way to "stream" the log, we approximate as much by
              // using longs via Log.Position.identity and Log.position.
              position++;

              // Reading positions in this way means it's possible that we get an "invalid" entry
              // (e.g., in the underlying log terminology this would be anything but an append)
              // which will be removed from the returned entries resulting in an empty list.
              // We skip these.
              if (entries.isEmpty()) {
                entriesSkipped.getAndIncrement();
              } else {
                entry = MESOS_ENTRY_TO_ENTRY.apply(Iterables.getOnlyElement(entries));
                return true;
              }
            } catch (TimeoutException e) {
              readStats.timeouts.getAndIncrement();
              throw new StreamAccessException("Timeout reading from log.", e);
            } catch (Log.OperationFailedException e) {
              readStats.failures.getAndIncrement();
              throw new StreamAccessException("Problem reading from log", e);
            } finally {
              readStats.timing.accumulate(System.nanoTime() - start);
            }
          }
          return false;
        }

        @Override
        public Entry next() {
          if (entry == null && !hasNext()) {
            throw new NoSuchElementException();
          }

          Entry result = requireNonNull(entry);
          entry = null;
          return result;
        }
      };
    }

    @Override
    public LogPosition append(final byte[] contents) throws StreamAccessException {
      requireNonNull(contents);

      Log.Position position = mutate(appendStats, new Mutation<Log.Position>() {
        @Override
        public Log.Position apply(WriterInterface logWriter)
            throws TimeoutException, Log.WriterFailedException {
          return logWriter.append(contents, writeTimeout, writeTimeUnit);
        }
      });
      return LogPosition.wrap(position);
    }

    @Timed("scheduler_log_native_truncate_before")
    @Override
    public void truncateBefore(org.apache.aurora.scheduler.log.Log.Position position)
        throws StreamAccessException {

      Preconditions.checkArgument(position instanceof LogPosition);

      final Log.Position before = ((LogPosition) position).unwrap();
      mutate(truncateStats, new Mutation<Void>() {
        @Override
        public Void apply(WriterInterface logWriter)
            throws TimeoutException, Log.WriterFailedException {
          logWriter.truncate(before, writeTimeout, writeTimeUnit);
          return null;
        }
      });
    }

    private interface Mutation<T> {
      T apply(WriterInterface writer) throws TimeoutException, Log.WriterFailedException;
    }

    private StreamAccessException disableLog(AtomicLong stat, String message, Throwable cause) {
      stat.incrementAndGet();
      writer = null;
      lifecycle.shutdown();

      throw new StreamAccessException(message, cause);
    }

    private synchronized <T> T mutate(OpStats stats, Mutation<T> mutation) {
      if (writer == null) {
        throw new IllegalStateException("The log has encountered an error and cannot be used.");
      }

      long start = System.nanoTime();
      if (!writer.isPresent()) {
        writer = Optional.of(writerFactory.get());
      }
      try {
        return mutation.apply(writer.get());
      } catch (TimeoutException e) {
        throw disableLog(stats.timeouts, "Timeout performing log " + stats.opName, e);
      } catch (Log.WriterFailedException e) {
        throw disableLog(stats.failures, "Problem performing log" + stats.opName, e);
      } finally {
        stats.timing.accumulate(System.nanoTime() - start);
      }
    }

    private LogPosition end() {
      return LogPosition.wrap(reader.ending());
    }

    @VisibleForTesting
    static class LogPosition implements org.apache.aurora.scheduler.log.Log.Position {
      private final Log.Position underlying;

      LogPosition(Log.Position underlying) {
        this.underlying = underlying;
      }

      static LogPosition wrap(Log.Position position) {
        return new LogPosition(position);
      }

      Log.Position unwrap() {
        return underlying;
      }

      @Override
      public int compareTo(Position o) {
        Preconditions.checkArgument(o instanceof LogPosition);
        return underlying.compareTo(((LogPosition) o).underlying);
      }
    }

    private static class LogEntry implements org.apache.aurora.scheduler.log.Log.Entry {
      private final Log.Entry underlying;

      public LogEntry(Log.Entry entry) {
        this.underlying = entry;
      }

      @Override
      public byte[] contents() {
        return underlying.data;
      }
    }
  }
}
TOP

Related Classes of org.apache.aurora.scheduler.log.mesos.MesosLog$LogStream$LogEntry

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.