Package com.persistit

Source Code of com.persistit.TimelyResource$Entry

/**
* Copyright 2012 Akiban Technologies, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*     http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.persistit;

import static com.persistit.TransactionIndex.tss2vh;
import static com.persistit.TransactionIndex.vh2step;
import static com.persistit.TransactionIndex.vh2ts;
import static com.persistit.TransactionStatus.ABORTED;
import static com.persistit.TransactionStatus.PRIMORDIAL;
import static com.persistit.TransactionStatus.TIMED_OUT;
import static com.persistit.TransactionStatus.UNCOMMITTED;

import java.util.ArrayList;
import java.util.List;

import com.persistit.Version.PrunableVersion;
import com.persistit.Version.VersionCreator;
import com.persistit.exception.PersistitException;
import com.persistit.exception.PersistitInterruptedException;
import com.persistit.exception.RollbackException;
import com.persistit.exception.TimeoutException;
import com.persistit.exception.WWRetryException;

/**
* <p>
* Transactionally manage multiple versions of a resource. For example, a
* resource which caches state created and either committed or rolled back by a
* transaction may use a TimelyResource to hold its versions. Each version
* must implement the {@link Version} interface and may optionally implement
* {@link PrunableVersion} in which case its {@link PrunableVersion#prune()}
* method is called when <code>TimelyResource</code> detects that the version is
* obsolete.
* </p>
* <p>
* The method {@link #addVersion(Version, Transaction)} attempts to add a new
* version on behalf of the supplied transaction.
* </p>
* <p>
* The method {@link #getVersion()} returns the snapshot version associated with
* the transaction, in other words, an instance of V which was either
* committed last before this transaction started, or which was created by this
* transaction. If there is no such version the method returns <code>null</code>
* .
* </p>
* <p>
* The method {@link #getVersion(com.persistit.Version.VersionCreator)} does the
* same thing, except that if there is no snapshot version the
* {@link VersionCreator#createVersion(TimelyResource)} method is called to
* construct a new version.
* </p>
*
* @author peter
*
* @param <V>
*            specific type of {@link Version} this <code>TimelyResource</code>
*            manages
*/
public class TimelyResource<V extends Version> {

    private final Persistit _persistit;
    private volatile Entry _first;

    public TimelyResource(final Persistit persistit) {
        _persistit = persistit;
        _persistit.addTimelyResource(this);
    }

    private long tss2v(final Transaction txn) {
        if (txn == null || !txn.isActive()) {
            return tss2vh(_persistit.getTimestampAllocator().updateTimestamp(), 0);
        } else {
            return tss2vh(txn.getStartTimestamp(), txn.getStep());
        }
    }

    public synchronized void delete() throws RollbackException, PersistitException {
        if (_first == null) {
            throw new IllegalStateException("There is no resource to delete");
        }
        if (!_first.isDeleted()) {
            final Transaction txn = _persistit.getTransaction();
            final long version = tss2v(txn);
            if (version == _first.getVersion()) {
                _first.setDeleted();
            } else {
                final V resource = _first.getResource();
                final Entry entry = new Entry(version, resource);
                entry.setDeleted();
                addVersion(entry, txn);
            }
        }
    }

    public void addVersion(final V resource, final Transaction txn) throws PersistitInterruptedException,
            RollbackException {
        if (resource == null) {
            throw new NullPointerException("Null resource");
        }
        addVersion(new Entry(tss2v(txn), resource), txn);
    }

    public V getVersion() throws TimeoutException, PersistitInterruptedException {
        final Entry first = _first;
        if (first != null && first.getVersion() == PRIMORDIAL) {
            return first.getResource();
        }
        final Transaction txn = _persistit.getTransaction();
        return getVersion(tss2v(txn));
    }

    public V getVersion(final VersionCreator<V> creator) throws PersistitException, RollbackException {
        final Entry first = _first;
        if (first != null && first.getVersion() == PRIMORDIAL) {
            return first.getResource();
        }
        final Transaction txn = _persistit.getTransaction();
        V version = getVersion(tss2v(txn));
        if (version == null) {
            version = creator.createVersion(this);
            addVersion(version, txn);
        }
        return version;
    }

    /**
     * @return <code>true</code> if and only if this <code>TimelyResource</code>
     *         has no <code>Version</code> instances.
     */
    public boolean isEmpty() throws TimeoutException, PersistitInterruptedException {
        Entry first = _first;
        if (first == null) {
            return true;
        }
        if (first.getVersion() == PRIMORDIAL) {
            return false;
        }
        first = getEntry(tss2v(_persistit.getTransaction()));
        if (first == null) {
            return true;
        }
        if (first.isDeleted()) {
            return true;
        }
        return false;
    }

    /**
     *
     * @return Whether this resource exists only within the context of the
     *         current transaction.
     * @throws TimeoutException
     * @throws PersistitInterruptedException
     */

    public boolean isTransactionPrivate(final boolean byStep) throws TimeoutException, PersistitInterruptedException {
        Entry entry = _first;
        if (entry != null && entry.getVersion() == PRIMORDIAL) {
            return false;
        }
        final Transaction txn = _persistit.getTransaction();
        final long versionHandle = tss2v(txn);
        entry = getEntry(versionHandle);
        if (entry == null) {
            return true;
        } else {
            if (byStep) {
                return entry.getVersion() == versionHandle;
            } else {
                return vh2ts(entry.getVersion()) == vh2ts(versionHandle);
            }
        }
    }

    /**
     * @return Count of versions currently being managed.
     */
    int getVersionCount() {
        int count = 0;
        for (Entry e = _first; e != null; e = e._previous) {
            count++;
        }
        return count;
    }

    @Override
    public String toString() {
        final StringBuilder sb = new StringBuilder("TimelyResource(");
        boolean first = true;
        for (Entry entry = _first; entry != null; entry = entry.getPrevious()) {
            if (sb.length() > 1000) {
                sb.append("...");
                break;
            }
            if (!first) {
                sb.append(',');
            } else {
                first = false;
            }
            sb.append(entry);
        }
        sb.append(')');
        return sb.toString();
    }

    synchronized void setPrimordial() {
        if (_first != null && _first.getPrevious() == null) {
            _first.setPrimordial();
        } else {
            throw new IllegalStateException("Cannot be made primordial: " + this);
        }
    }

    /**
     * Remove all obsolete <code>Version</code> instances. For each removed
     * instance selected for pruning that implements
     * <code>PrunableVersion</code>, invoke its
     * {@link PrunableVersion#prune()} method.
     * <p>
     * The chain is examined and relinked while holding this object's monitor.
     * The <code>prune()</code> and <code>vacate()</code> callbacks are invoked
     * after the monitor has been released.
     * </p>
     *
     * @throws TimeoutException
     * @throws PersistitException
     *             including a <code>PersistitInterruptedException</code> if
     *             the thread is interrupted while examining the chain
     */
    void prune() throws TimeoutException, PersistitException {
        // Entries whose resources are pruned once the monitor is released.
        final List<Entry> entriesToPrune = new ArrayList<Entry>();
        // Set only when the last remaining entry is a deleted one that is
        // being dropped and its resource is a PrunableVersion.
        PrunableVersion versionToVacate = null;

        final TransactionIndex ti = _persistit.getTransactionIndex();

        synchronized (this) {
            try {

                // Most recently visited entry that was kept; an unlinked
                // entry is spliced out by rewriting this entry's link.
                Entry newer = null;
                // First committed entry encountered in the walk.
                Entry latest = null;
                // Stays true only if no kept entry is uncommitted or has a
                // concurrent transaction as reported by the TransactionIndex.
                boolean isPrimordial = true;
                // Commit status of the previously visited non-aborted entry.
                long lastCommit = UNCOMMITTED;

                // Walk from the head (the most recently added entry) along
                // the previous-entry links.
                for (Entry entry = _first; entry != null; entry = entry.getPrevious()) {
                    boolean keepIt = false;
                    final long versionHandle = entry.getVersion();
                    final long tc = ti.commitStatus(versionHandle, UNCOMMITTED, 0);
                    if (tc >= PRIMORDIAL) {
                        if (tc == UNCOMMITTED) {
                            // Still-uncommitted entries are always kept.
                            keepIt = true;
                            isPrimordial = false;
                        } else {
                            // A committed entry is kept if it is the first
                            // committed one seen, or if the index reports a
                            // concurrent transaction for (tc, lastCommit).
                            final boolean hasConcurrent = ti.hasConcurrentTransaction(tc, lastCommit);
                            if (latest == null || hasConcurrent) {
                                keepIt = true;
                                if (latest == null) {
                                    latest = entry;
                                }
                            }
                            if (keepIt && ti.hasConcurrentTransaction(0, tc)) {
                                isPrimordial = false;
                            }
                        }
                        lastCommit = tc;
                    } else {
                        // Any status below PRIMORDIAL is expected to be ABORTED.
                        assert tc == ABORTED;
                    }
                    if (keepIt) {
                        newer = entry;
                    } else {
                        // The resource is pruned only when exactly one of
                        // "aborted" and "deleted" holds for the entry.
                        if (tc == ABORTED ^ entry.isDeleted()) {
                            entriesToPrune.add(entry);
                        }
                        // Unlink the entry from the chain.
                        if (newer == null) {
                            _first = entry.getPrevious();
                        } else {
                            newer.setPrevious(entry.getPrevious());
                        }
                    }
                }
                // When nothing prevents it, the single remaining entry is
                // either dropped (if deleted) or marked primordial.
                if (isPrimordial && _first != null) {
                    assert _first.getPrevious() == null;
                    if (_first.isDeleted()) {
                        final V version = _first.getResource();
                        if (version instanceof PrunableVersion) {
                            versionToVacate = (PrunableVersion) version;
                        }
                        entriesToPrune.add(_first);
                        _first = null;
                    } else {
                        _first.setPrimordial();
                    }
                }
            } catch (final InterruptedException ie) {
                throw new PersistitInterruptedException(ie);
            }
        }
        // NOTE(review): vacate() is invoked once per pruned entry rather than
        // once overall - confirm that repeated calls are intended.
        for (final Entry e : entriesToPrune) {
            if (versionToVacate != null) {
                versionToVacate.vacate();
            }
            e.prune();
        }
    }

    /**
     * Helper method that adds an Entry containing a Version and its timestamp
     * information. This method checks for write-write dependencies throws a
     * RollbackException if there is a conflict.
     *
     * @param entry
     * @param txn
     * @throws PersistitException
     * @throws RollbackException
     */
    private void addVersion(final Entry entry, final Transaction txn) throws PersistitInterruptedException,
            RollbackException {
        final TransactionIndex ti = _persistit.getTransactionIndex();
        while (true) {
            try {
                synchronized (this) {
                    if (_first != null) {
                        if (_first.getVersion() > entry.getVersion()) {
                            /*
                             * This thread lost a race to make the most recent
                             * version
                             */
                            throw new RollbackException();
                        }
                        if (txn.isActive()) {
                            for (Entry e = _first; e != null; e = e.getPrevious()) {
                                final long version = e.getVersion();
                                final long depends = ti.wwDependency(version, txn.getTransactionStatus(), 0);
                                if (depends == TIMED_OUT) {
                                    throw new WWRetryException(version);
                                }
                                if (depends != 0 && depends != ABORTED) {
                                    /*
                                     * version is from a concurrent transaction
                                     * that already committed or timed out
                                     * waiting to see. Either way, must abort.
                                     */
                                    throw new RollbackException();
                                }
                            }
                        }
                    }
                    entry.setPrevious(_first);
                    _first = entry;
                    break;
                }
            } catch (final WWRetryException re) {
                try {
                    final long depends = _persistit.getTransactionIndex().wwDependency(re.getVersionHandle(),
                            txn.getTransactionStatus(), SharedResource.DEFAULT_MAX_WAIT_TIME);
                    if (depends != 0 && depends != ABORTED) {
                        /*
                         * version is from concurrent txn that already committed
                         * or timed out waiting to see. Either way, must abort.
                         */
                        throw new RollbackException();
                    }
                } catch (final InterruptedException ie) {
                    throw new PersistitInterruptedException(ie);
                }
            } catch (final InterruptedException ie) {
                throw new PersistitInterruptedException(ie);
            }
        }
    }

    /**
     * Get the <code>Version</code> from the snapshot view specified by the
     * supplied version handle.
     *
     * @param version
     *            versionHandle
     * @return <code>Version</code> for given version handle
     * @throws TimeoutException
     * @throws PersistitInterruptedException
     */
    V getVersion(final long version) throws TimeoutException, PersistitInterruptedException {
        final Entry e = getEntry(version);
        return e == null ? null : e.getResource();
    }

    Entry getEntry(final long version) throws TimeoutException, PersistitInterruptedException {
        final TransactionIndex ti = _persistit.getTransactionIndex();
        try {
            /*
             * Note: not necessary to synchronize here. A concurrent transaction
             * may modify _first, but this method does not need to see that
             * version since it has not been committed. Conversely, if there is
             * some transaction that committed before this transaction's start
             * timestamp, then there is a happened-before relationship due to
             * the synchronization in transaction registration; since _first is
             * volatile, we are guaranteed to see the modification made by the
             * committed transaction.
             */
            for (Entry e = _first; e != null; e = e._previous) {
                final long commitTs = ti.commitStatus(e.getVersion(), vh2ts(version), vh2step(version));
                if (commitTs >= 0 && commitTs != UNCOMMITTED) {
                    if (e.isDeleted()) {
                        return null;
                    }
                    return e;
                }
            }
            return null;
        } catch (final InterruptedException e) {
            throw new PersistitInterruptedException(e);
        }
    }

    /**
     * <p>
     * Holder for one instance of an object, such as a {@link Tree}, whose
     * availability is governed by visibility within a transaction.
     * </p>
     * <p>
     * An Entry has a version handle just like a version in an {@link MVV}. It
     * is invisible to any transactions that started before the start timestamp,
     * in write-write conflict with any concurrent transaction, and available to
     * any transaction that starts after the version has committed. Visibility
     * within the transaction that creates the TimelyResourceEntry is determined
     * by the relative step numbers of the current transaction and the version
     * handle. If there exists a TimelyResource instance with start and commit
     * timestamps ts1, tc1 then any attempt to add another TimelyResource will
     * fail unless ts2 > tc1.
     * </p>
     *
     * @author peter
     */

    private class Entry {

        private long _version;
        private V _resource;
        private volatile boolean _deleted;
        private volatile Entry _previous;

        private Entry(final long versionHandle, final V resource) {
            _version = versionHandle;
            _resource = resource;
        }

        private V getResource() {
            return _resource;
        }

        private void setResource(final V resource) {
            _resource = resource;
        }

        private Entry getPrevious() {
            return _previous;
        }

        private void setPrevious(final Entry tr) {
            _previous = tr;
        }

        private long getVersion() {
            return _version;
        }

        private void setPrimordial() {
            _version = PRIMORDIAL;
        }

        private void setDeleted() {
            _deleted = true;
        }

        private boolean isDeleted() {
            return _deleted;
        }

        private boolean prune() throws PersistitException {
            if (_resource instanceof PrunableVersion) {
                return ((PrunableVersion) _resource).prune();
            } else {
                return true;
            }
        }

        @Override
        public String toString() {
            String tcStatus;
            try {
                final long tc = _persistit.getTransactionIndex().commitStatus(_version, Long.MAX_VALUE, 0);
                tcStatus = TransactionStatus.tcString(tc);
            } catch (final Exception e) {
                tcStatus = e.toString();
            }
            return String.format("(ts=%s tc=%s)->%s%s", TransactionStatus.versionString(_version), tcStatus, _resource,
                    _previous != null ? "*" : "");
        }
    }

}
TOP

Related Classes of com.persistit.TimelyResource$Entry

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.