Package de.scoopgmbh.copper.monitoring.core.data

Source Code of de.scoopgmbh.copper.monitoring.core.data.MonitoringDataStorage$TargetFile

/*
* Copyright 2002-2013 SCOOP Software GmbH
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*      http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package de.scoopgmbh.copper.monitoring.core.data;
import java.io.File;
import java.io.FileFilter;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.OutputStream;
import java.io.RandomAccessFile;
import java.nio.BufferOverflowException;
import java.nio.MappedByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.FileChannel;
import java.nio.channels.FileChannel.MapMode;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.ListIterator;
import java.util.NoSuchElementException;
import java.util.concurrent.TimeUnit;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.KryoException;
import com.esotericsoftware.kryo.io.ByteBufferOutputStream;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;

import de.scoopgmbh.copper.monitoring.core.model.MonitoringData;
import de.scoopgmbh.copper.monitoring.core.model.MonitoringDataStorageContentInfo;
import de.scoopgmbh.copper.monitoring.core.model.MonitoringDataStorageInfo;

/**
 * Stores monitoring data in fixed-size chunk files inside a target directory.
 */
public class MonitoringDataStorage {

  // Size in bytes of the memory-mapped region of a chunk file opened for writing.
  static final int FILE_CHUNK_SIZE=4*1024*1024;
  // Header offset of the record limit (read and written as an int elsewhere in this class).
  static final int LIMIT_POSITION = 0;
  // Header offset of the earliest record timestamp (a long).
  static final int EARLIEST_POSITION = LIMIT_POSITION+4;
  // Header offset of the latest record timestamp (a long).
  static final int LATEST_POSITION = EARLIEST_POSITION+8;
  // Offset of the first serialized record, directly behind the header.
  static final int FIRST_RECORD_POSITION = LATEST_POSITION+8;
  // Number of header bytes mapped when only the header of a file is read.
  static final int HEADER_END = FIRST_RECORD_POSITION;

  // Directory that holds the chunk files.
  final File targetPath;
  // Chunk files are named <filenamePrefix>.<number>.
  final String filenamePrefix;
  // Chunk file currently open for appending; null after it has been closed.
        TargetFile currentTarget;
  // Chunk files that are no longer written to.
  final ArrayList<TargetFile> writtenFiles = new ArrayList<TargetFile>();
  // NOTE(review): not referenced anywhere in the visible code - confirm whether it is still needed.
  long lastTimeStamp = 0;
  // Guards currentTarget, writtenFiles, closed and totalSize in write/read/close/houseKeeping.
  final Object lock = new Object();
  // Set by close(); write() refuses to work afterwards.
  boolean closed = false;
  // Sum of the fileSize values of all tracked chunk files.
  long totalSize;
  // Upper bound for totalSize that houseKeeping tries to maintain.
  final long maxTotalSize;
  // Despite the name this is a maximum age in milliseconds: houseKeeping subtracts it from the current time.
  final long discardDataBeforeDateMillis;
 
 
  static final class TargetFile {
    private File             file;
    private RandomAccessFile memoryMappedFile;
    private MappedByteBuffer out;
    private long             earliestTimestamp = Long.MAX_VALUE;
    private long             latestTimestamp = Long.MIN_VALUE;
    private int              limit = FIRST_RECORD_POSITION;
    private int              fileSize;
    private Kryo kryo = SerializeUtil.createKryo();
    private ByteBufferOutputStream bostr;
    private Output output;
   
    /**
     * @param file
     * @throws IOException
     */
    public TargetFile(File file, boolean readOnly) throws IOException {
      this.file = file;
      if (!readOnly) {
        memoryMappedFile = new RandomAccessFile(file, "rw");
        out = memoryMappedFile.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, FILE_CHUNK_SIZE);
        fileSize = FILE_CHUNK_SIZE;
        limit = FIRST_RECORD_POSITION;
        out.putLong(LIMIT_POSITION,limit);
        out.putLong(EARLIEST_POSITION,earliestTimestamp);
        out.putLong(LATEST_POSITION,latestTimestamp);
        out.position(FIRST_RECORD_POSITION);
          bostr = new ByteBufferOutputStream(out);
          output = new Output(bostr);
      } else {
        memoryMappedFile = new RandomAccessFile(file,"r");
        fileSize = (int)file.length();
        MappedByteBuffer bb = memoryMappedFile.getChannel().map(MapMode.READ_ONLY, 0, HEADER_END);
        earliestTimestamp = bb.getLong(EARLIEST_POSITION);
        latestTimestamp = bb.getLong(LATEST_POSITION);
        limit = bb.getInt(LIMIT_POSITION);
        memoryMappedFile.close();
        memoryMappedFile = null;
      }
    }

    public void openForWriting() throws IOException {
      OpenedFile of = new OpenedFile(this, Long.MIN_VALUE, Long.MAX_VALUE, false);
      memoryMappedFile = new RandomAccessFile(file, "rw");
      out = memoryMappedFile.getChannel().map(MapMode.READ_WRITE, 0, FILE_CHUNK_SIZE);
      out.position(limit);
      out.putInt(LIMIT_POSITION,FIRST_RECORD_POSITION);
        bostr = new ByteBufferOutputStream(out);
        output = new Output(bostr);
      kryo = SerializeUtil.createKryo();
     
      MonitoringData md = null;
      Output o = new Output(nullOut);
      while ((md = of.pop()) != null) {
        kryo.writeClassAndObject(o,  md);
      }
    }

    @Override
    protected void finalize() throws Throwable {
      super.finalize();
      close();
    }
    public synchronized void close() {
      kryo = null;
      if (output != null) {
        try {
          output.close();
        } catch (BufferOverflowException be) {
          /* we can silently ignore this because we alway flush() the output and this is a remainder of the exception when a file is fully written */
        }
        output = null;
      }
      if (bostr != null) {
        try {
          bostr.close();
        } catch (IOException e) {
          /* ignore */
        }
        bostr = null;
      }
      out = null;
      if (memoryMappedFile == null)
        return;
      if (memoryMappedFile.getChannel().isOpen()) {
        try {
          memoryMappedFile.close();
        } catch (IOException ex) {
          /* ignore */
        }
      }
      memoryMappedFile = null;
    }
   
    public boolean serialize(MonitoringData monitoringData) {
      try {
        try {
          kryo.writeClassAndObject(output, monitoringData);
          output.flush();
          return true;
        } catch (KryoException ky) {
          // The BufferOverflowException might get wrapped, so we have to handle both.
          // The exception indicates that we have tried to write beyond the buffer's capacity
          if (ky.getCause() != null && ky.getCause() instanceof BufferOverflowException) {
            throw (BufferOverflowException)ky.getCause();
          }
            throw ky;
        }
      } catch (BufferOverflowException be) {
        return false;
      }
    }
  }
 
  /**
   * Calls {@link #close()} when the storage instance is finalized.
   */
  @Override
  protected void finalize() throws Throwable {
    super.finalize();
    close();
  }
 
  /**
   * Creates a storage in a new temporary folder whose name starts with "coppermonitoring", using the file
   * name prefix "data", a size limit of {@code Long.MAX_VALUE} and a maximum age of 36600 days.
   *
   * @throws IOException if the temporary folder cannot be created
   */
  public MonitoringDataStorage() throws IOException {
    this(createTempFolder("coppermonitoring"), "data", Long.MAX_VALUE, TimeUnit.DAYS, 366L*100L /* hundred years should be enough for everyone */);
  }

  private static File createTempFolder(String tempDirPrefix) throws IOException {
    File temp = File.createTempFile(tempDirPrefix, "");
    temp.delete();
    temp.mkdir();
    return temp;
  }
 
  /**
   * Creates a storage in a new temporary folder, using the file name prefix "data".
   *
   * @param tempDirPrefix prefix for the name of the temporary folder
   * @param maxTotalSize  size limit passed on as {@code maxSize}
   * @param daysToKeep    maximum age of the data in days
   * @throws IOException if the temporary folder cannot be created
   */
  public MonitoringDataStorage(String tempDirPrefix, long maxTotalSize, long daysToKeep) throws IOException {
    this(createTempFolder(tempDirPrefix), "data", maxTotalSize, TimeUnit.DAYS, daysToKeep);
  }
 
  /**
   * Creates a storage in the given directory with a size limit of {@code Long.MAX_VALUE} and a maximum age of 36600 days.
   *
   * @param targetPath     directory that holds the chunk files
   * @param filenamePrefix name prefix of the chunk files
   */
  public MonitoringDataStorage(File targetPath, String filenamePrefix) {
    this(targetPath, filenamePrefix, Long.MAX_VALUE, TimeUnit.DAYS, 366L*100L /* hundred years should be enough for everyone */);
  }
 
  public MonitoringDataStorage(File targetPath, String filenamePrefix, long maxSize, TimeUnit maxAgeUnit, long duration) {
    this.targetPath = targetPath;
    this.filenamePrefix = filenamePrefix;
    this.maxTotalSize = maxSize;
    this.discardDataBeforeDateMillis = maxAgeUnit.toMillis(duration);
    loadFiles();
    ensureCurrentFile(0);
  }
 
  private void loadFiles()  {
    FileFilter f = new FileFilter() {
     
      @Override
      public boolean accept(File pathname) {
        if (!pathname.getName().startsWith(filenamePrefix))
          return false;
        String name = pathname.getName().substring(filenamePrefix.length());
        return name.matches("\\.[0-9]+");
      }
    };
    TargetFile latestFile = null;
    long latestTimestamp = Long.MIN_VALUE;
    for (File file : targetPath.listFiles(f)) {
      try {
        TargetFile tf = new TargetFile(file, true);
        totalSize += tf.fileSize;
        if (tf.latestTimestamp > latestTimestamp) {
          latestTimestamp = tf.latestTimestamp;
          if (latestFile != null)
            writtenFiles.add(latestFile);
          latestFile = tf;
        } else {
          writtenFiles.add(tf);
        }
      } catch (IOException e) {
        /* silently discard broken files */
      }
    }
    if (latestFile != null) {
      try {
        latestFile.openForWriting();
        currentTarget = latestFile;
      } catch (IOException ioe) {
        /* should never happen */
        writtenFiles.add(latestFile);       
      }
    }
  }
 
  static OutputStream nullOut = new OutputStream() {
    @Override
    public void write(int b) throws IOException {}
    @Override
    public void write(byte[] b, int off, int len) throws IOException {};
  };

  void ensureCurrentFile(int additionalBytes) {
    if (currentTarget == null) {
      currentTarget = createTargetFile();
    }
    if (currentTarget.out.position() + additionalBytes > currentTarget.fileSize) {
      closeCurrentTarget();
      currentTarget = createTargetFile();
    }
  }

  private void closeCurrentTarget() {
    writtenFiles.add(currentTarget);
    currentTarget.close();
    currentTarget = null;
  }

  private TargetFile createTargetFile(){
    try {
      long currentTimeStamp = System.currentTimeMillis();
      File f = null;
      while (true) {
        f = new File(targetPath, filenamePrefix+"."+currentTimeStamp);
        if (!f.exists()) {
          houseKeeping(FILE_CHUNK_SIZE)
          TargetFile newTarget = new TargetFile(f,  false);
          totalSize += newTarget.fileSize;
          return newTarget;
        }
        ++currentTimeStamp;
      }
    } catch (FileNotFoundException e) {
      throw new RuntimeException(e);
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
  }

 
    private void houseKeeping(int additionalSize) {
      long discardBefore = System.currentTimeMillis()-discardDataBeforeDateMillis;
      synchronized (lock) {
        Collections.sort(writtenFiles, new Comparator<TargetFile>() {
        @Override
        public int compare(TargetFile o1, TargetFile o2) {
          if (o1.latestTimestamp < o2.latestTimestamp)
            return -1;
          if (o1.latestTimestamp > o2.latestTimestamp)
            return 1;
          return System.identityHashCode(o1)-System.identityHashCode(o2);
        }
      });
        int i = 0;
        while (totalSize + additionalSize > maxTotalSize && i < writtenFiles.size()) {
          int size = writtenFiles.get(i).fileSize;
          if (!writtenFiles.get(i).file.delete())
            break;
          writtenFiles.remove(i);
          totalSize -= size;
          ++i;
        }
        i = 0;
        while (!writtenFiles.isEmpty() && writtenFiles.get(0).latestTimestamp < discardBefore && i < writtenFiles.size()) {
          int size = writtenFiles.get(0).fileSize;
          if (!writtenFiles.get(0).file.delete())
            break;
          writtenFiles.remove(0);
          totalSize -= size;
          ++i;
        }
    }
   
  }
   
   /**
   * @param monitoringData  {@link MonitoringData#getTimeStamp() must be not null}
   */
  public void write(MonitoringData monitoringData) {
     assert monitoringData.getTimeStamp()!=null;
      long referenceMillis = monitoringData.getTimeStamp().getTime();
        synchronized (lock) {
          if (closed){
            throw new RuntimeException(new ClosedChannelException());
          }
          ensureCurrentFile(0);
        if (!currentTarget.serialize(monitoringData)) {
          closeCurrentTarget();
            write(monitoringData);
            return;
        }

            if (currentTarget.earliestTimestamp > referenceMillis) {
              currentTarget.earliestTimestamp = referenceMillis;
              currentTarget.out.putLong(EARLIEST_POSITION,referenceMillis);
            }
            if (currentTarget.latestTimestamp < referenceMillis) {
              currentTarget.latestTimestamp = referenceMillis;
              currentTarget.out.putLong(LATEST_POSITION,referenceMillis);
            }
            currentTarget.limit = currentTarget.out.position();
            currentTarget.out.putInt(LIMIT_POSITION,currentTarget.limit);
        }
    }
   
   
    public void close() throws IOException {
      synchronized (lock) {
        if (currentTarget != null)
          closeCurrentTarget();
        closed = true;
      }
    }
    static class OpenedFile {

      ArrayList<MonitoringData> monitoringDataList;
     
      public OpenedFile(TargetFile f, long fromTime, long toTime, boolean reverse) throws IOException {
        RandomAccessFile randomAccessFile = new RandomAccessFile(f.file, "r");
        randomAccessFile.seek(FIRST_RECORD_POSITION);
        byte[] data = new byte[f.limit-FIRST_RECORD_POSITION];
        int c = 0;
        while ((c += randomAccessFile.read(data,c,data.length-c)) < data.length);
        randomAccessFile.close();
        readData(new Input(data),SerializeUtil.createKryo(), fromTime, toTime, reverse);
       
      }

    private void readData(Input i, Kryo kryo, long fromTime, long toTime, final boolean reverse) {
      monitoringDataList = new ArrayList<MonitoringData>();
      try {
        while (i.available() > 0){
          MonitoringData data = (MonitoringData)kryo.readClassAndObject(i);
          if (data.getTimeStamp().getTime() <= toTime && data.getTimeStamp().getTime()  >= fromTime) {
            monitoringDataList.add(data);
          }
        }
      } catch (IOException e) {
        throw new RuntimeException(e);
      }
      Collections.sort(monitoringDataList, new Comparator<MonitoringData>(){
        @Override
        public int compare(MonitoringData o1, MonitoringData o2) {
          return o1.getTimeStamp().compareTo(o2.getTimeStamp());
        }
      });
      if (!reverse)
        Collections.reverse(monitoringDataList);
    }
   
    public MonitoringData pop() {
      if (monitoringDataList.isEmpty())
        return null;
      return monitoringDataList.remove(monitoringDataList.size()-1);
    }
   
    public void push(MonitoringData data) {
      monitoringDataList.add(data);
    }
   
     
    }

    /**
     * Reads the records of a time range in forward direction; delegates to
     * {@link #read(Date, Date, boolean)} with {@code reverse == false}.
     *
     * @param fromDate null = no lower bound
     * @param toDate null = no upper bound
     * @return an iterable over the records whose timestamp lies within the bounds (both inclusive)
     */
    public Iterable<MonitoringData> read(Date fromDate, Date toDate) {
      return read(fromDate, toDate, false);
    }
   
    /**
     * Reads the records of a time range in reverse direction; delegates to
     * {@link #read(Date, Date, boolean)} with {@code reverse == true}.
     *
     * @param fromDate null = no lower bound
     * @param toDate null = no upper bound
     * @return an iterable over the records whose timestamp lies within the bounds (both inclusive)
     */
    public Iterable<MonitoringData> readReverse(Date fromDate, Date toDate) {
      return read(fromDate, toDate, true);
    }
   
    public Iterable<MonitoringData> read(Date fromDate, Date toDate, final boolean reverse) {
      final ArrayList<TargetFile> filesToRead = new ArrayList<TargetFile>();
      final long fromTime = fromDate == null?Long.MIN_VALUE:fromDate.getTime();
      final long toTime = toDate == null?Long.MAX_VALUE:toDate.getTime();
        synchronized (lock) {
          for (TargetFile target : writtenFiles) {
          if ((fromTime <= target.latestTimestamp || toTime >= target.earliestTimestamp) && target.limit > FIRST_RECORD_POSITION)
            filesToRead.add(target);
        }
          if (currentTarget != null && (fromTime <= currentTarget.latestTimestamp || toTime >= currentTarget.earliestTimestamp) && currentTarget.limit > FIRST_RECORD_POSITION) {
            filesToRead.add(currentTarget);
          }
        }
        Comparator<TargetFile> cmp =
            reverse?
            new Comparator<TargetFile>() {
              @Override
              public int compare(TargetFile o1, TargetFile o2) {
                if (o1.latestTimestamp < o2.latestTimestamp)
                  return 1;
                if (o1.latestTimestamp > o2.latestTimestamp)
                  return -1;
                if (o1.earliestTimestamp < o2.earliestTimestamp)
                  return 1;
                if (o1.earliestTimestamp > o2.earliestTimestamp)
                  return -1;
                return System.identityHashCode(o2)-System.identityHashCode(o1);
              }
            } :
            new Comparator<TargetFile>() {
              @Override
              public int compare(TargetFile o1, TargetFile o2) {
                if (o1.earliestTimestamp < o2.earliestTimestamp)
                  return -1;
                if (o1.earliestTimestamp > o2.earliestTimestamp)
                  return 1;
                if (o1.latestTimestamp < o2.latestTimestamp)
                  return -1;
                if (o1.latestTimestamp > o2.latestTimestamp)
                  return 1;
                return System.identityHashCode(o2)-System.identityHashCode(o1);
              }
            };
               
      Collections.sort(filesToRead, cmp);
     
      return new Iterable<MonitoringData>() {
     
      @Override
      public Iterator<MonitoringData> iterator() {
        return new Iterator<MonitoringData>() {
         
          @SuppressWarnings("unchecked")
          //Has to be sorted in order of ascending earliestTimestamp. openFiles() depends on that
          ArrayList<TargetFile> files = (ArrayList<TargetFile>)filesToRead.clone();
          ArrayList<OpenedFile> openFiles = new ArrayList<OpenedFile>();
          long currentTimestamp;
          MonitoringData nextElement;
         
          boolean openFiles() {
            if (files.isEmpty()) {
              return false;
            }
            if (currentTimestamp == 0)
              currentTimestamp = reverse?files.get(0).latestTimestamp:files.get(0).earliestTimestamp;
            ListIterator<TargetFile> it = files.listIterator();
            while (it.hasNext()) {
              TargetFile tf = it.next();
              if (reverse) {
                if (tf.latestTimestamp < currentTimestamp)
                  break;
              } else {
                if (tf.earliestTimestamp > currentTimestamp)
                  break;
              }
              it.remove();
              try {
                openFiles.add(new OpenedFile(tf, fromTime, toTime, reverse));
                //we have to readjust current time stamp,
                //because the already-opened file may skip some timestamps that are present in the newly-opened file
                //else we could skip opening a file in the middle of these timestamps
                currentTimestamp = reverse?
                    Math.max(currentTimestamp, tf.latestTimestamp)
                    :Math.min(currentTimestamp, tf.earliestTimestamp);
              } catch (IOException e) {
                //if file cannot be opened, we simple ignore it.
              }
            }
            return !openFiles.isEmpty();
          }
         
          @Override
          public boolean hasNext() {
            if (nextElement != null)
              return true;
            openFiles();
            return (nextElement = popNext()) != null;
          }

          private MonitoringData popNext()  {
            MonitoringData monitoringData = null;
            OpenedFile dpSource = null;
            ListIterator<OpenedFile> it = openFiles.listIterator();
            while (it.hasNext()) {
              OpenedFile of = it.next();
              MonitoringData nextDp = of.pop();
              if (nextDp == null) {
                it.remove();
                continue;
              }
              if (monitoringData == null || ((!reverse && nextDp.getTimeStamp().getTime() < monitoringData.getTimeStamp().getTime()) || (reverse && nextDp.getTimeStamp().getTime() > monitoringData.getTimeStamp().getTime()))) {
                if (monitoringData != null) {
                  dpSource.push(monitoringData);
                }
                dpSource = of;
                monitoringData = nextDp;
              }
            }
            if (monitoringData != null) {
              currentTimestamp = monitoringData.getTimeStamp().getTime();
              if (!files.isEmpty() && ((!reverse && files.get(0).earliestTimestamp < currentTimestamp) || (reverse && files.get(0).latestTimestamp > currentTimestamp))) {
                dpSource.push(monitoringData);
                openFiles();
                return popNext();
              }             
            } else if (!files.isEmpty()) {
              currentTimestamp = reverse?files.get(0).latestTimestamp:files.get(0).earliestTimestamp;
              openFiles();
              return popNext();
            }

            return monitoringData;
          }

          @Override
          public MonitoringData next() {
            if (!hasNext())
              throw new NoSuchElementException();
            MonitoringData next = nextElement;
            nextElement = null;
            return next;
          }

          @Override
          public void remove() {
            throw new UnsupportedOperationException();
          }
         
        };
      }
    };
     
    }
   
  public Date getMinDate() {
        synchronized (lock) {
          long min=currentTarget == null?System.currentTimeMillis():currentTarget.earliestTimestamp;
          for (TargetFile target : writtenFiles) {
          min = Math.min(target.earliestTimestamp, min);
        }
        return new Date(min);
        }
  }

  public Date getMaxDate() {
        synchronized (lock) {
        long max=currentTarget == null?System.currentTimeMillis():currentTarget.latestTimestamp;
          for (TargetFile target : writtenFiles) {
          max = Math.max(target.latestTimestamp, max);
        }
        return new Date(max);
        }
  }

  public MonitoringDataStorageInfo getMonitroingDataStorageInfo() {
   
    final HashMap<String,MonitoringDataStorageContentInfo> classToInfo = new HashMap<String,MonitoringDataStorageContentInfo>();
    for (MonitoringData data: read(null,null)){
      String clazz = data.getClass().getName();
      MonitoringDataStorageContentInfo info = classToInfo.get(clazz);
      if (info==null){
        info=new MonitoringDataStorageContentInfo(clazz,1l);
      } else {
        info.setCount(info.getCount()+1);
      }
      classToInfo.put(clazz, info);
    }
    long lSize = 0;
      synchronized (lock) {
        for (TargetFile tf : writtenFiles)
          lSize += tf.limit;
        if (currentTarget != null)
          lSize += currentTarget.limit;
      }
    final double size = (lSize/1024.0d/1024.0d);
    return new MonitoringDataStorageInfo(size,targetPath.getAbsolutePath(),new ArrayList<MonitoringDataStorageContentInfo>(classToInfo.values()),getMinDate(),getMaxDate());
  }

   
}
TOP

Related Classes of de.scoopgmbh.copper.monitoring.core.data.MonitoringDataStorage$TargetFile

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.