Package com.atomikos.persistence.imp

Source Code of com.atomikos.persistence.imp.StreamObjectLog

/**
* Copyright (C) 2000-2010 Atomikos <info@atomikos.com>
*
* This code ("Atomikos TransactionsEssentials"), by itself,
* is being distributed under the
* Apache License, Version 2.0 ("License"), a copy of which may be found at
* http://www.atomikos.com/licenses/apache-license-2.0.txt .
* You may not use this file except in compliance with the License.
*
* While the License grants certain patent license rights,
* those patent license rights only extend to the use of
* Atomikos TransactionsEssentials by itself.
*
* This code (Atomikos TransactionsEssentials) contains certain interfaces
* in package (namespace) com.atomikos.icatch
* (including com.atomikos.icatch.Participant) which, if implemented, may
* infringe one or more patents held by Atomikos.
* It should be appreciated that you may NOT implement such interfaces;
* licensing to implement these interfaces must be obtained separately from Atomikos.
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/

package com.atomikos.persistence.imp;

import java.util.Enumeration;
import java.util.Hashtable;
import java.util.Stack;
import java.util.Vector;

import com.atomikos.persistence.LogException;
import com.atomikos.persistence.LogStream;
import com.atomikos.persistence.ObjectLog;
import com.atomikos.persistence.Recoverable;

/**
*
* implementation. It keeps on growing, and only does a checkpoint on restart.
*
*
*/

public class StreamObjectLog implements ObjectLog
{
    protected LogStream logstream_;
    protected Hashtable logTable_;

    protected long size_;
    private boolean initialized_ = false;
    protected boolean panic_ = false;
    // if true: flush methods generate exception
    // set by writeCheckpoint
    // for diagnostics

    private long count_;
    // how many flushes since last checkpoint?

    private long maxCount_;
    // how many is max count until next checkpoint?

    private StreamObjectLog ()
    {
        // not to be called
    }

    /**
     * Constructor. Builds a new StreamObjectLog with the given logstream and
     * the specified maximum number of entries.
     *
     * @param logstream
     *            The underlying logstream. This stream should be reserved for
     *            this instance! Upon close, the underlying stream will also be
     *            closed.
     * @param checkpointInterval
     *            How many flush() calls between two checkpoints?
     * @param console
     *            For output of feedback.
     *
     */

    public StreamObjectLog ( LogStream logstream , long checkpointInterval )
    {
        logstream_ = logstream;
        size_ = 0;
        logTable_ = new Hashtable ();
        maxCount_ = checkpointInterval;
        count_ = 0;
    }

    /**
     * Checks if count limit is reached, writes checkpoint if so.
     */

    private synchronized void writeCheckpoint () throws LogException
    {

        count_++;
        if ( count_ >= maxCount_ ) {
            logstream_.writeCheckpoint ( logTable_.elements () );
            count_ = 0;
        }

    }

    /**
     * @see ObjectLog
     */

    public synchronized void init () throws LogException
    {
        Stack errors = new Stack ();
        Vector recovered = null;

        if ( initialized_ )
            return;

        try {
            recovered = logstream_.recover ();

            if ( recovered != null ) {

                Enumeration entries = recovered.elements ();
                while ( entries.hasMoreElements () ) {
                    SystemLogImage entry = (SystemLogImage) entries
                            .nextElement ();

                    if ( entry.getId () != null ) {

                        if ( !entry.isForgettable () ) {
                            if ( !logTable_.containsKey ( entry.getId () ) )
                                size_++;

                            // this replaces previous entries of this tid
                            logTable_.put ( entry.getId (), entry );

                        } else if ( logTable_.containsKey ( entry.getId () ) ) {

                            // condition needed for duplicate deletes
                            // otherwise size will not be right.
                            // duplicate deletes are possible because a
                            // terminator entry may be written more than once

                            logTable_.remove ( entry.getId () );

                            size_--;

                        }

                    } // if
                } // while
            } // if

        } // try
        catch ( LogException le ) {
            throw le;
        } catch ( Exception e ) {

            // bad exit condition
            errors.push ( e );
            throw new LogException ( e.getMessage (), errors );

        }// catch

        finally {
            initialized_ = true;
        }

        // write checkpoint, so that instance will work with new file.
        // ONLY done if init worked OK!
        // After this, fout_ is ready for flush

        logstream_.writeCheckpoint ( logTable_.elements () );
    }

    /**
     * @see ObjectLog
     */

    public synchronized Vector recover () throws LogException
    {
        Stack errors = new Stack ();
        Vector hist = new Vector ();

        if ( !initialized_ )
            throw new LogException ( "Not initialized" );
        Enumeration enumm = logTable_.elements ();

        while ( enumm.hasMoreElements () ) {
            SystemLogImage next = (SystemLogImage) enumm.nextElement ();

            hist.addElement ( next.getObjectImage ().restore () );
        }// while

        return hist;

    }// recover

    /**
     * @see ObjectLog
     */

    public synchronized void flush ( Recoverable rec ) throws LogException
    {
        if ( rec == null )
            return;

        SystemLogImage simg = new SystemLogImage ( rec, false );
        flush ( simg , true );

    }

    protected synchronized void flush ( SystemLogImage img , boolean shouldSync )
            throws LogException
    {
        Stack errors = new Stack ();

        if ( img == null )
            return;

        // test if last checkpoint was written ok.
        if ( panic_ )
            throw new LogException ( "StreamObjectLog: PANIC" );

        try {

            try {
                logstream_.flushObject ( img , shouldSync );
                writeCheckpoint ();
                // fout_.flush();
            } catch ( LogException ioerr ) {
                ioerr.printStackTrace ();
                errors.push ( ioerr );
                // make sure that logfile remains in consistent state by
                // checkpointing
                try {
                    logstream_.writeCheckpoint ( logTable_.elements () );
                } catch ( Exception e ) {
                    errors.push ( e );
                }
                throw new LogException ( ioerr.getMessage (), errors );
            }

            // replace/add local tid status in logTable_.
            // for Checkpointing!

            if ( img.isForgettable () ) {
                if ( logTable_.containsKey ( img.getId () ) ) {
                    // to avoid that logTable_ keeps growing!
                    logTable_.remove ( img.getId () );
                    size_--;
                }

            } else {
                if ( !logTable_.containsKey ( img.getId () ) ) {
                    size_++;
                }
                logTable_.put ( img.getId (), img );

            }

        }// try
        catch ( LogException le ) {
            System.err.println ( "Error in StreamObjectLog.flush() "
                    + le.getMessage () );
            throw le;
        } catch ( Exception e ) {

            System.err.println ( "Error in StreamObjectLog.flush() "
                    + e.getMessage () );

            errors.push ( e );
            throw new LogException ( e.getMessage (), errors );
        }// catch
    }

    /**
     * @see ObjectLog
     */

    public synchronized Recoverable recover ( Object id ) throws LogException
    {
        if ( !logTable_.containsKey ( id ) )
            return null;
        SystemLogImage simg = (SystemLogImage) logTable_.get ( id );
        return simg.getObjectImage ().restore ();

    }

    /**
     * @see ObjectLog
     */

    public synchronized void delete ( Object id ) throws LogException
    {
        SystemLogImage previous = (SystemLogImage) logTable_.get ( id );
        if ( previous == null ) {
            // all actives are in table -> if not there: already deleted
            return;
        }
        Recoverable bogus = previous.getRecoverable ();
        SystemLogImage simg = new SystemLogImage ( bogus, true );
        flush ( simg , false );
    }

    /**
     * @see ObjectLog
     */

    public synchronized void close () throws LogException
    {
        Stack errors = new Stack ();
        try {
            if ( logstream_ != null ) {
                logstream_.close ();
            }
            initialized_ = false;
            // so logstream will be read on re-init ( restart of client TM )

        } catch ( LogException le ) {
            throw le;
        } catch ( Exception e ) {
          e.printStackTrace();
            errors.push ( e );
            throw new LogException ( e.getMessage (), errors );
        }
    }

}
TOP

Related Classes of com.atomikos.persistence.imp.StreamObjectLog

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.