Package com.sun.jini.outrigger.snaplogstore

Source Code of com.sun.jini.outrigger.snaplogstore.LogOutputFile

/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements.  See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
*      http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.sun.jini.outrigger.snaplogstore;

import com.sun.jini.outrigger.LogOps;
import com.sun.jini.outrigger.OutriggerServerImpl;
import com.sun.jini.outrigger.StorableObject;
import com.sun.jini.outrigger.StorableResource;
import net.jini.id.Uuid;

import java.io.File;
import java.io.FileDescriptor;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.RandomAccessFile;
import java.util.ArrayList;
import java.util.Observable;
import java.util.logging.Level;
import java.util.logging.Logger;

import net.jini.space.InternalSpaceException;

/**
* A class to write a log file, to be read later by
* <code>LogInputFile</code>.  Each operation on the file is forced to
* disk, so when the operation logging function returns, the data is
* committed to the log in a recoverable way.
* <p>
* <code>LogOutputFile</code> cannot extend <code>Observable</code>
* because it must extend <code>LogFile</code> (clearly
* <code>Observable</code> should have been an interface).  It acts as
* an <code>Observable</code> by having a method that returns its
* "observable part", which is an object that reports observable
* events.  Right now the only observable event is the switching to a
* new physical file when the current one becomes full.
*
* @author Sun Microsystems, Inc.
* @see LogInputFile
* @see java.util.Observable
*/
class LogOutputFile extends LogFile implements LogOps {
    private RandomAccessFile  logFile = null;// the current output log file
    private FileDescriptor  logFD;     // the current log file descriptor
    private ObjectOutputStream  out;     // objects written
    private int      suffix;     // the current suffix number
    private int      opCnt;     // number of ops on current file
    private int      maxOps;     // max ops to allow in file
    private Observable    observable;// handle Observer/Observable

    private long logBytes = 0;
    private final byte[] intBuf = new byte[4];
    private final byte[] zeroBuf = new byte[4];

    private long deferedUpdateLength = 0;
    private long deferedPosition = 0;

    private static final long intBytes = 4;

    /** Logger for logging persistent store related information */
    private static final Logger logger =
  Logger.getLogger(OutriggerServerImpl.storeLoggerName);

    /**
     * Create a <code>LogOutputFile</code> object that will stream
     * output to a series of files described by <code>basePath</code>,
     * as interpreted by the relevant <code>LogFile</code>
     * constructor.  When the file becomes full (the maximum number of
     * operations is reached), the file is closed and a new file with
     * the next highest suffix is created.  The
     * <code>Observable</code> notification for this event passes a
     * <code>File</code> argument for the filled file as the argument
     * to <code>Observer</code>.
     *
     * @see #observable()
     */
    //@see com.sun.jini.mercury.LogStream#LogStream(String)
    LogOutputFile(String basePath, int maxOps) throws IOException {
  super(basePath);
  ArrayList inDir = new ArrayList();
  suffix = existingLogs(inDir);
  this.maxOps = maxOps;
  nextPath();
    }

    /**
     * Return an <code>Observable</code> object that represents this object
     * in the Observer/Observable pattern.
     *
     * @see java.util.Observer
     */
    Observable observable() {
  if (observable == null) {       // defer allocation until needed
      observable = new Observable() {  // we only use this if changed
    public void notifyObservers() {
        setChanged();
        super.notifyObservers();
    }
    public void notifyObservers(Object arg) {
        setChanged();
        super.notifyObservers(arg);
    }
      };
  }
  return observable;
    }

    /**
     * Switch this over to the next path in the list
     */
    private void nextPath() throws IOException {
  boolean completed = false;

  if (logFile != null) {

      // If there was a deferred header, write it out now
      //
      if (deferedUpdateLength != 0) {
    logFD.sync();    // force the bytes to disk
    logFile.seek(deferedPosition);
    writeInt((int)deferedUpdateLength);
      }
      try {
    close();             // close the stream and the file
      } catch (IOException ignore) { } // assume this is okay
      completed = true;
  }

  suffix++;      // go to next suffix
  logFile = new RandomAccessFile(baseDir.getPath() + File.separator +
               baseFile + suffix, "rw");
  logFD = logFile.getFD();
  out = new ObjectOutputStream(new LogOutputStream(logFile));

  writeInt(LOG_VERSION);

  logBytes = logFile.getFilePointer();
  logFile.setLength(logBytes);

  // always start out with zero length header for the next update
  logFile.write(zeroBuf);

  // force length header to disk
  logFD.sync();

  deferedUpdateLength = 0;
  opCnt = 0;

  /*
   * Tell consumer about the completed log.  This is done after the
   * new one is created so that the old path can be known not
   * to be the newest (because something newer is there).
   */
  if (observable != null && completed)
      observable.notifyObservers();
    }

    /**
     * Close the log, but don't remove it.
     */
    synchronized void close() throws IOException {
  if (logFile != null) {
      try {
    out.close();
    logFile.close();
      } finally {
    logFile = null;
      }
  }
    }

    /**
     * Override destroy so we can try to close logFile before calling
     * super tries to delete all the files.
     */
    void destroy() {
  try {
      close();
  } catch (Throwable t) {
      // Don't let failure keep us from deleting the files we can     
  }
  super.destroy();
    }

    /**
     * Log a server boot.
     */
    public synchronized void bootOp(long time, long sessionId) {
  try {
      out.writeByte(BOOT_OP);
      out.writeLong(time);
      out.writeLong(sessionId);
      flush();
  } catch (IOException e) {
      failed(e);
  }
    }

    /**
     * Log a change in join state
     */
    public synchronized void joinStateOp(StorableObject state) {
  try {
      out.writeByte(JOINSTATE_OP);
      out.writeObject(new BaseObject(state));
      flush();
  } catch (IOException e) {
      failed(e);
  }
    }

    /**
     * Log a <code>write</code> operation.
     */
    public synchronized void writeOp(StorableResource entry, Long txnId) {
  try {
      out.writeByte(WRITE_OP);
      out.writeObject(new Resource(entry));
      out.writeObject(txnId);

      // A write operation under a transaction does not need to be
      // flushed until it is prepared.
      //
      flush(txnId == null);
  } catch (IOException e) {
      failed(e);
  }
    }

    // Inherit java doc from supertype
    public synchronized void writeOp(StorableResource entries[], Long txnId) {
  try {
      out.writeByte(BATCH_WRITE_OP);
      out.writeObject(txnId);

      // In the middle of records we need to use the stream's
      // writeInt, not our private one     
      out.writeInt(entries.length);
      for (int i=0; i<entries.length; i++) {
    out.writeObject(new Resource(entries[i]));
      }

      // A write operation under a transaction does not need to be
      // flushed until it is prepared.
      //
      flush(txnId == null, entries.length);
  } catch (IOException e) {
      failed(e);
  }
    }

    /**
     * Log a <code>take</code> operation.
     */
    public synchronized void takeOp(Uuid cookie, Long txnId) {
  try {
      out.writeByte(TAKE_OP);
      cookie.write(out);
      out.writeObject(txnId);

      // A take operation under a transaction does not need to be
      // flushed until it is prepared.
      //
      flush(txnId == null);
  } catch (IOException e) {
      failed(e);
  }
    }

    // Inherit java doc from supertype
    public synchronized void takeOp(Uuid cookies[], Long txnId) {
  try {
      out.writeByte(BATCH_TAKE_OP);
      out.writeObject(txnId);

      // In the middle of records we need to use the stream's
      // writeInt, not our private one     
      out.writeInt(cookies.length);
      for (int i=0; i<cookies.length; i++) {
    cookies[i].write(out);
      }

      // A take operation under a transaction does not need to be
      // flushed until it is prepared.
      //
      flush(txnId == null, cookies.length);
  } catch (IOException e) {
      failed(e);
  }
    }

    /**
     * Log a <code>notify</code> operation.
     */
    public synchronized void registerOp(StorableResource registration,
          String type, StorableObject[] templates)
    {
  try {
      out.writeByte(REGISTER_OP);
      out.writeObject(new Registration(registration, type, templates));
      flush();
  } catch (IOException e) {
      failed(e);
  }
    }

    /**
     * Log a <code>renew</code> operation.
     */
    public synchronized void renewOp(Uuid cookie, long expiration) {
  try {
      out.writeByte(RENEW_OP);
      cookie.write(out);
      out.writeLong(expiration);
      flush();
  } catch (IOException e) {
      failed(e);
  }
    }

    /**
     * Log a <code>cancel</code> operation.
     */
    public synchronized void cancelOp(Uuid cookie, boolean expired) {
  try {
      out.writeByte(CANCEL_OP);
      cookie.write(out);

      // cancels due to expiration don't need to be flushed
      // right away
      flush(!expired);
  } catch (IOException e) {
      failed(e);
  }
    }

    /**
     * Log a transaction <code>prepare</code> operation.
     */
    public synchronized void prepareOp(Long txnId,
               StorableObject transaction) {
  try {
      out.writeByte(PREPARE_OP);
      out.writeObject(txnId);
      out.writeObject(new BaseObject(transaction));
      flush();
  } catch (IOException e) {
      failed(e);
  }
    }

    /**
     * Log a transaction <code>commit</code> operation.
     */
    public synchronized void commitOp(Long txnId) {
  try {
      out.writeByte(COMMIT_OP);
      out.writeObject(txnId);
      flush();
  } catch (IOException e) {
      failed(e);
  }
    }

    /**
     * Log a transaction <code>abort</code> operation.
     */
    public synchronized void abortOp(Long txnId) {
  try {
      out.writeByte(ABORT_OP);
      out.writeObject(txnId);
      flush();
  } catch (IOException e) {
      failed(e);
  }
    }

    public synchronized void uuidOp(Uuid uuid) {
  try {
      out.writeByte(UUID_OP);
      uuid.write(out);
      flush();
  } catch (IOException e) {
      failed(e);
  }
    }

    /**
     * Flush the current output after an operation.  If the number of
     * operations is exceeded, shift over to the next path. 
     */
    private void flush() throws IOException {
  flush(true);
    }
   

    /**
     * Conditionally flush the current output. If the number of
     * operations is exceeded, shift over to the next path even if
     * <code>forceToDisk</code> is <code>false</code>.
     */
    private synchronized void flush(boolean forceToDisk)
  throws IOException
    {
  flush(forceToDisk, 1);
    }

    /**
     * Conditionally flush the current output. If the number of
     * operations is exceeded, shift over to the next path even if
     * <code>forceToDisk</code> is <code>false</code>.
     */
    private synchronized void flush(boolean forceToDisk,
            int effectiveOpCount)
  throws IOException
    {
  assert effectiveOpCount > 0;

  out.flush();

  if (forceToDisk) {

      // must force contents to disk before writing real length header
      logFD.sync();
  }

  long entryEnd = logFile.getFilePointer();
  long updateLen = entryEnd - logBytes - intBytes;

        // If we are not forcing to disk, we want to defer the write of the
        // first header. This will leave a zero just after the last sync'ed
        // record and will assure that LogInputFile will not read a partially
        // written record.
        //
        if (!forceToDisk) {

      // If this is the first flush(false) we save the header information
      // and location for later. Otherwise we write out the header
      // normally.
      //
      if (deferedUpdateLength == 0) {
    deferedUpdateLength = updateLen;  // save the header length
    deferedPosition = logBytes;       // and position for later
      } else {
    // write real length header
    logFile.seek(logBytes);
    writeInt((int)updateLen);
      }
  } else {

      // If there was a deferred header, write that out now and
      // then write the current header.
      //
      if (deferedUpdateLength != 0) {
    logFile.seek(deferedPosition);
    writeInt((int)deferedUpdateLength);
    deferedUpdateLength = 0;
      }
      // write real length header
      logFile.seek(logBytes);
      writeInt((int)updateLen);
  }

  // pad out update record so length header does not span disk blocks
  entryEnd = (entryEnd + 3) & ~3L;

  // write zero length header for next update
  logFile.seek(entryEnd);
  logFile.write(zeroBuf);
  logBytes = entryEnd;

  if (forceToDisk)
      logFD.sync();
 
  opCnt += effectiveOpCount;
  if (opCnt >= maxOps)
      nextPath();
  else
      out.reset();    // not critical to flush this
    }

    /**
     * Write an int value in single write operation. Note we only use
     * this method when writing log file and recored headers.  We
     * can't use it inside records because the data inside records is
     * written/read using <code>ObjectIn/OutputStream</code> and this
     * method writes directly to the <code>RandomAccessFile</code>.
     *  
     * @param val int value
     * @throws IOException if any other I/O error occurs
     */
    private void writeInt(int val) throws IOException {
  intBuf[0] = (byte) (val >> 24);
  intBuf[1] = (byte) (val >> 16);
  intBuf[2] = (byte) (val >> 8);
  intBuf[3] = (byte) val;
  logFile.write(intBuf);
    }

    private void failed(Exception e) throws InternalSpaceException {
  logger.log(Level.SEVERE,
       "Unexpected I/O error while persisting Space data",
       e);
  System.exit(-5);
    }
}
TOP

Related Classes of com.sun.jini.outrigger.snaplogstore.LogOutputFile

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.