Package reactor.queue

Source Code of reactor.queue.IndexedChronicleQueuePersistor

/*
* Copyright (c) 2011-2014 Pivotal Software, Inc.
*
*  Licensed under the Apache License, Version 2.0 (the "License");
*  you may not use this file except in compliance with the License.
*  You may obtain a copy of the License at
*
*         http://www.apache.org/licenses/LICENSE-2.0
*
*  Unless required by applicable law or agreed to in writing, software
*  distributed under the License is distributed on an "AS IS" BASIS,
*  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*  See the License for the specific language governing permissions and
*  limitations under the License.
*/

package reactor.queue;

import net.openhft.chronicle.*;
import net.openhft.chronicle.tools.ChronicleTools;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.io.Buffer;
import reactor.io.encoding.Codec;
import reactor.io.encoding.JavaSerializationCodec;

import javax.annotation.Nonnull;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicLong;

/**
* A {@link QueuePersistor} implementation that uses a <a href="https://github.com/peter-lawrey/Java-Chronicle">Java
* Chronicle</a> {@literal IndexedChronicle} to persist items in the queue.
*
* @author Jon Brisbin
* @author Stephane Maldini
* @see <a href="https://github.com/peter-lawrey/Java-Chronicle">Java Chronicle</a>
*/
public class IndexedChronicleQueuePersistor<T> implements QueuePersistor<T> {

  private static final Logger LOG = LoggerFactory.getLogger(IndexedChronicleQueuePersistor.class);

  private final Object     monitor = new Object();
  private final AtomicLong lastId  = new AtomicLong();
  private final AtomicLong size    = new AtomicLong(0);

  private final ExcerptTailer       exTrailer;
  private final ExcerptTailer       indexTrailer;
  private final ExcerptAppender     exAppender;
  private final String              basePath;
  private final Codec<Buffer, T, T> codec;
  private final boolean             deleteOnExit;
  private final IndexedChronicle    data;

  /**
   * Create an {@link IndexedChronicleQueuePersistor} based on the given base path.
   *
   * @param basePath Directory in which to create the Chronicle.
   * @throws IOException
   */
  public IndexedChronicleQueuePersistor(@Nonnull String basePath) throws IOException {
    this(basePath, new JavaSerializationCodec<T>(), false, true, ChronicleConfig.DEFAULT.clone());
  }

  /**
   * Create an {@link IndexedChronicleQueuePersistor} based on the given base path, encoder and decoder. Optionally,
   * passing {@literal false} to {@code clearOnStart} skips clearing the Chronicle on start for appending.
   *
   * @param basePath     Directory in which to create the Chronicle.
   * @param codec        Codec to turn objects into {@link reactor.io.Buffer Buffers} and visa-versa.
   * @param clearOnStart Whether or not to clear the Chronicle on start.
   * @param deleteOnExit Whether or not to delete the Chronicle when the program exits.
   * @param config       ChronicleConfig to use.
   * @throws IOException
   */
  public IndexedChronicleQueuePersistor(@Nonnull String basePath,
                                        @Nonnull Codec<Buffer, T, T> codec,
                                        boolean clearOnStart,
                                        boolean deleteOnExit,
                                        @Nonnull ChronicleConfig config) throws IOException {
    this.basePath = basePath;
    this.codec = codec;
    this.deleteOnExit = deleteOnExit;

    if (clearOnStart) {
      for (String name : new String[]{basePath + ".data", basePath + ".index"}) {
        File file = new File(name);
        if (file.exists()) {
          file.delete();
        }
      }
    }

    ChronicleTools.warmup();
    data = new IndexedChronicle(basePath, config);
    lastId.set(data.findTheLastIndex());

    Excerpt ex = data.createExcerpt();
    int status = ex.readInt();
    ex.skip(4);
    while (ex.nextIndex()) {
      int len = ex.readInt();
      size.incrementAndGet();
      ex.skip(len);
    }
    indexTrailer = data.createTailer();

    exTrailer = data.createTailer();
    exAppender = data.createAppender();
  }

  /**
   * Close the underlying chronicles.
   */
  @Override
  public void close() {
    try {
      data.close();

      if (deleteOnExit) {
        ChronicleTools.deleteOnExit(basePath);
      }
    } catch (IOException e) {
      throw new IllegalStateException(e.getMessage(), e);
    }
  }

  @Override
  public long lastId() {
    return lastId.get();
  }

  @Override
  public long size() {
    return size.get();
  }

  @Override
  public boolean hasNext() {
    return indexTrailer.nextIndex();
  }

  @Override
  public Long offer(@Nonnull T t) {
    synchronized (monitor) {
      Buffer buff = codec.encoder().apply(t);

      int len = buff.remaining();
      exAppender.startExcerpt(4 + len);
      exAppender.writeInt(len);
      exAppender.write(buff.byteBuffer());
      exAppender.finish();

      size.incrementAndGet();
      lastId.set(exAppender.lastWrittenIndex());
    }

    if (LOG.isTraceEnabled()) {
      LOG.trace("Offered {} to Chronicle at index {}, size {}", t, lastId(), size());
    }

    return lastId();
  }

  @Override
  public T get(Long id) {
    if (!exTrailer.index(id)) {
      return null;
    }
    return read(exTrailer);
  }

  @Override
  public T remove() {
    synchronized (monitor) {
      T obj = read(indexTrailer);
      size.decrementAndGet();
      return obj;
    }
  }

  @Override
  public Iterator<T> iterator() {
    return new Iterator<T>() {
      public boolean hasNext() {
        return IndexedChronicleQueuePersistor.this.hasNext();
      }

      @SuppressWarnings("unchecked")
      @Override
      public T next() {
        return read(indexTrailer);
      }

      @Override
      public void remove() {
        throw new IllegalStateException("This Iterator is read-only.");
      }
    };
  }

  @SuppressWarnings("unchecked")
  private T read(ExcerptCommon ex) {
    try {
      int len = ex.readInt();
      ByteBuffer bb = ByteBuffer.allocate(len);
      ex.read(bb);
      bb.flip();
      return codec.decoder(null).apply(new Buffer(bb));
    } finally {
      ex.finish();
    }
  }

}
TOP

Related Classes of reactor.queue.IndexedChronicleQueuePersistor

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code is the property of its respective owners. Java is a trademark of Sun Microsystems, Inc. and is owned by Oracle Inc. Contact coftware#gmail.com.