Package org.voltdb.export

Source Code of org.voltdb.export.ExportManager$GenerationDrainRunnable

/* This file is part of VoltDB.
 * Copyright (C) 2008-2014 VoltDB Inc.
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License as
 * published by the Free Software Foundation, either version 3 of the
 * License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with VoltDB.  If not, see <http://www.gnu.org/licenses/>.
 */

package org.voltdb.export;

import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicReference;

import org.voltcore.logging.Level;
import org.voltcore.logging.VoltLogger;
import org.voltcore.messaging.HostMessenger;
import org.voltcore.utils.COWSortedMap;
import org.voltcore.utils.DBBPool;
import org.voltdb.CatalogContext;
import org.voltdb.VoltDB;
import org.voltdb.catalog.Cluster;
import org.voltdb.catalog.Connector;
import org.voltdb.catalog.ConnectorProperty;
import org.voltdb.catalog.Database;
import org.voltdb.utils.LogKeys;
import org.voltdb.utils.VoltFile;

import com.google_voltpatches.common.base.Preconditions;
import com.google_voltpatches.common.base.Throwables;

/**
 * Bridges the connection to an OLAP system and the buffers passed
 * between the OLAP connection and the execution engine. Each processor
 * implements the ExportDataProcessor interface. The processors are passed
 * one or more ExportDataSources. The sources currently map 1:1 to
 * export-enabled tables. Each ExportDataSource has poll() and ack()
 * methods that processors use to pull EE export data and acknowledge it
 * as processed. Data passed to processors is wrapped in ExportDataBlocks,
 * which in turn wrap a BBContainer. (An illustrative sketch of the
 * poll/ack contract appears as a comment at the end of this class.)
 *
 * Processors are loaded by reflection based on configuration in project.xml.
 */
public class ExportManager
{

    /**
     * Processors also log using this facility.
     */
    private static final VoltLogger exportLog = new VoltLogger("EXPORT");

    private final COWSortedMap<Long,ExportGeneration> m_generations =
            new COWSortedMap<Long, ExportGeneration>();
    /*
     * When a generation is drained, store the id so
     * we can tell if a buffer arrives late
     */
    private final CopyOnWriteArrayList<Long> m_generationGhosts =
            new CopyOnWriteArrayList<Long>();

    private final HostMessenger m_messenger;

    /**
     * Set of partition ids for which this export manager instance is master of
     */
    private final Set<Integer> m_masterOfPartitions = new HashSet<Integer>();

    /**
     * Thrown if the initial setup of the loader fails
     */
    public static class SetupException extends Exception {
        private static final long serialVersionUID = 1L;

        SetupException(final String msg) {
            super(msg);
        }

        SetupException(final Throwable cause) {
            super(cause);
        }
    }

    /**
     * Connects to OLAP loaders. Currently at most one loader is allowed.
     * Supporting multiple loaders mainly involves reference counting
     * the EE data blocks and bookkeeping ACKs from processors.
     */
    AtomicReference<ExportDataProcessor> m_processor = new AtomicReference<ExportDataProcessor>();

    /** Obtain the global ExportManager via its instance() method */
    private static ExportManager m_self;
    private final int m_hostId;

    private String m_loaderClass;

    private volatile Properties m_processorConfig = new Properties();

    /*
     * Issue a permit when a generation is drained so that, if truncation
     * completely empties a generation, we can wait for the on-generation-drained
     * task to finish. The permit is released in GenerationDrainRunnable and
     * acquired in truncateExportToTxnId().
     *
     * This eliminates a race with command log (CL) replay, which may perform
     * catalog updates and the like while the generation-drained work is still running.
     */
    private final Semaphore m_onGenerationDrainedForTruncation = new Semaphore(0);

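    /**
     * Installed on each ExportGeneration via setGenerationDrainRunnable() (see
     * createInitialExportProcessor, initializePersistedGenerations, and
     * updateCatalog below). Run when the generation has fully drained, so that
     * the roll to the next generation executes on the old generation's
     * processor thread.
     */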
    public class GenerationDrainRunnable implements Runnable {

        private final ExportGeneration m_generation;

        public GenerationDrainRunnable(ExportGeneration generation) {
            m_generation = generation;
        }

        @Override
        public void run() {
            /*
             * Do all the work to switch to a new generation in the thread for the processor
             * of the old generation
             */
            ExportDataProcessor proc = m_processor.get();
            if (proc == null) {
                VoltDB.crashLocalVoltDB("No export data processor found", true, null);
            }
            proc.queueWork(new Runnable() {
                @Override
                public void run() {
                    try {
                        rollToNextGeneration(m_generation);
                    } catch (Exception e) {
                        exportLog.error("Error rolling to next export generation", e);
                    } finally {
                        m_onGenerationDrainedForTruncation.release();
                    }
                }

            });
        }

    }

    private void rollToNextGeneration(ExportGeneration drainedGeneration) throws Exception {
        ExportDataProcessor newProcessor = null;
        ExportDataProcessor oldProcessor = null;
        synchronized (ExportManager.this) {
            boolean installNewProcessor = false;
            if (m_generations.containsValue(drainedGeneration)) {
                m_generations.remove(drainedGeneration.m_timestamp);
                m_generationGhosts.add(drainedGeneration.m_timestamp);
                installNewProcessor = (m_processor.get().getExportGeneration() == drainedGeneration);
                exportLog.info("Finished draining generation " + drainedGeneration.m_timestamp);
            } else {
                exportLog.warn("Finished draining a generation that is not in the known set of export generations.");
            }

            try {
                if (m_loaderClass != null && !m_generations.isEmpty() && installNewProcessor) {
                    exportLog.info("Creating connector " + m_loaderClass);
                    final Class<?> loaderClass = Class.forName(m_loaderClass);
                    //Make it so
                    ExportGeneration nextGeneration = m_generations.firstEntry().getValue();
                    newProcessor = (ExportDataProcessor) loaderClass.newInstance();
                    newProcessor.addLogger(exportLog);
                    newProcessor.setExportGeneration(nextGeneration);
                    newProcessor.setProcessorConfig(m_processorConfig);
                    newProcessor.readyForData();

                    if (!nextGeneration.isContinueingGeneration()) {
                        /*
                         * Changes in partition count can make the load balancing strategy not capture
                         * all partitions for data that was from a previously larger cluster.
                         * For those use a naive leader election strategy that is implemented
                         * by export generation.
                         */
                        nextGeneration.kickOffLeaderElection();
                    } else {
                        /*
                         * This strategy piggybacks on the regular partition
                         * mastership distribution to determine who will process
                         * export data for each partition. We stashed away all
                         * the partitions we have mastership of in
                         * m_masterOfPartitions.
                         */
                        for (Integer partitionId : m_masterOfPartitions) {
                            nextGeneration.acceptMastershipTask(partitionId);
                        }
                    }
                    oldProcessor = m_processor.getAndSet(newProcessor);
                }
            } catch (Exception e) {
                VoltDB.crashLocalVoltDB("Error creating next export processor", true, e);
            }
        }

        /*
         * The old processor should shut down if we installed a new processor.
         */
        if (oldProcessor != null) {
            oldProcessor.shutdown();
        }
        try {
            //We close and delete regardless
            drainedGeneration.closeAndDelete();
        } catch (IOException e) {
            exportLog.error("Error closing and deleting drained export generation", e);
        }
    }

    /**
     * Construct ExportManager using catalog.
     * @param myHostId
     * @param catalogContext
     * @throws ExportManager.SetupException
     */
    public static synchronized void initialize(
            int myHostId,
            CatalogContext catalogContext,
            boolean isRejoin,
            HostMessenger messenger,
            List<Integer> partitions)
            throws ExportManager.SetupException
    {
        ExportManager em = new ExportManager(myHostId, catalogContext, messenger, partitions);
        Connector connector = getConnector(catalogContext);

        m_self = em;
        if (connector != null && connector.getEnabled()) {
            em.createInitialExportProcessor(catalogContext, connector, true, partitions, isRejoin);
        }
    }

    /**
     * Indicate to associated {@link ExportGeneration}s to become
     * masters for the given partition id
     * @param partitionId
     */
    synchronized public void acceptMastership(int partitionId) {
        if (m_loaderClass == null) {
            return;
        }
        Preconditions.checkArgument(
                m_masterOfPartitions.add(partitionId),
                "can't acquire mastership twice for partition id: " + partitionId
                );
        exportLog.info("ExportManager accepting mastership for partition " + partitionId);
        /*
         * Only the first generation will have a processor, which
         * makes it safe to accept mastership.
         */
        ExportGeneration gen = m_generations.firstEntry().getValue();
        if (gen != null && gen.isContinueingGeneration()) {
            gen.acceptMastershipTask(partitionId);
        } else {
            exportLog.info("Failed to run accept mastership tasks for partition: " + partitionId);
        }
    }

    /**
     * Get the global instance of the ExportManager.
     * @return The global single instance of the ExportManager.
     */
    public static ExportManager instance() {
        return m_self;
    }

    public static void setInstanceForTest(ExportManager self) {
        m_self = self;
    }

    protected ExportManager() {
        m_hostId = 0;
        m_messenger = null;
    }

    private static Connector getConnector(CatalogContext catalogContext) {
        final Cluster cluster = catalogContext.catalog.getClusters().get("cluster");
        final Database db = cluster.getDatabases().get("database");
        final Connector conn = db.getConnectors().get("0");
        return conn;
    }

    /**
     * Read the catalog to setup manager and loader(s)
     * @param siteTracker
     */
    private ExportManager(
            int myHostId,
            CatalogContext catalogContext,
            HostMessenger messenger,
            List<Integer> partitions)
    throws ExportManager.SetupException
    {
        m_hostId = myHostId;
        m_messenger = messenger;
        final Cluster cluster = catalogContext.catalog.getClusters().get("cluster");
        final Connector conn = getConnector(catalogContext);

        if (conn == null) {
            exportLog.info("System is not using any export functionality.");
            return;
        }

        if (!conn.getEnabled()) {
            exportLog.info("Export is disabled by user configuration.");
            return;
        }

        updateProcessorConfig(conn);

        exportLog.info(String.format("Export is enabled and can overflow to %s.", cluster.getExportoverflow()));

        m_loaderClass = conn.getLoaderclass();
    }

    private synchronized void createInitialExportProcessor(
            CatalogContext catalogContext,
            final Connector conn,
            boolean startup,
            List<Integer> partitions,
            boolean isRejoin) {
        try {
            exportLog.info("Creating connector " + m_loaderClass);
            ExportDataProcessor newProcessor = null;
            final Class<?> loaderClass = Class.forName(m_loaderClass);
            newProcessor = (ExportDataProcessor)loaderClass.newInstance();
            newProcessor.addLogger(exportLog);
            newProcessor.setProcessorConfig(m_processorConfig);
            m_processor.set(newProcessor);

            File exportOverflowDirectory = new File(catalogContext.cluster.getExportoverflow());

            /*
             * If this is a catalog update providing an existing generation,
             * the persisted stuff has already been initialized
             */
            if (startup) {
                initializePersistedGenerations(
                        exportOverflowDirectory,
                        catalogContext, conn);
            }

            /*
             * If this is startup there is no existing generation created for new export data
             * So construct one here, otherwise use the one provided
             */
            if (startup) {
                if (!m_generations.containsKey(catalogContext.m_uniqueId)) {
                    final ExportGeneration currentGeneration = new ExportGeneration(
                            catalogContext.m_uniqueId,
                            exportOverflowDirectory, isRejoin);
                    currentGeneration.setGenerationDrainRunnable(new GenerationDrainRunnable(currentGeneration));
                    currentGeneration.initializeGenerationFromCatalog(conn, m_hostId, m_messenger, partitions);
                    m_generations.put(catalogContext.m_uniqueId, currentGeneration);
                } else {
                    exportLog.info("Persisted export generation same as catalog exists. Persisted generation will be used and appended to");
                    ExportGeneration currentGeneration = m_generations.get(catalogContext.m_uniqueId);
                    currentGeneration.initializeMissingPartitionsFromCatalog(conn, m_hostId, m_messenger, partitions);
                }
            }
            final ExportGeneration nextGeneration = m_generations.firstEntry().getValue();
            /*
             * For the newly constructed processor, provide it the oldest known generation
             */
            newProcessor.setExportGeneration(nextGeneration);
            newProcessor.readyForData();

            if (startup) {
                /*
                 * If the oldest known generation was disk based,
                 * and we are using server-side export, we need to kick off a
                 * leader election to choose which server is going to export
                 * each partition.
                 */
                if (!nextGeneration.isContinueingGeneration()) {
                    nextGeneration.kickOffLeaderElection();
                }
            } else {
                /*
                 * When it isn't startup, it is necessary to kick things off with the mastership
                 * settings that already exist
                 */
                if (!nextGeneration.isContinueingGeneration()) {
                    /*
                     * Changes in partition count can make the load balancing strategy not capture
                     * all partitions for data that was from a previously larger cluster.
                     * For those use a naive leader election strategy that is implemented
                     * by export generation.
                     */
                    nextGeneration.kickOffLeaderElection();
                } else {
                    /*
                     * This strategy piggybacks on the regular partition
                     * mastership distribution to determine who will process
                     * export data for each partition. We stashed away all
                     * the partitions we have mastership of in
                     * m_masterOfPartitions.
                     */
                    for (Integer partitionId : m_masterOfPartitions) {
                        nextGeneration.acceptMastershipTask(partitionId);
                    }
                }
            }
        }
        catch (final ClassNotFoundException e) {
            exportLog.l7dlog( Level.ERROR, LogKeys.export_ExportManager_NoLoaderExtensions.name(), e);
            Throwables.propagate(e);
        }
        catch (final Exception e) {
            Throwables.propagate(e);
        }
    }

    private void initializePersistedGenerations(
            File exportOverflowDirectory, CatalogContext catalogContext,
            Connector conn) throws IOException {
        TreeSet<File> generationDirectories = new TreeSet<File>();
        for (File f : exportOverflowDirectory.listFiles()) {
            if (f.isDirectory()) {
                if (!f.canRead() || !f.canWrite() || !f.canExecute()) {
                    throw new RuntimeException("Cannot read, write, or execute directory " + f);
                }
                generationDirectories.add(f);
            }
        }

        //Only give the processor to the oldest generation
        for (File generationDirectory : generationDirectories) {
            ExportGeneration generation = new ExportGeneration(generationDirectory, catalogContext.m_uniqueId);
            generation.setGenerationDrainRunnable(new GenerationDrainRunnable(generation));

            if (generation.initializeGenerationFromDisk(conn, m_messenger)) {
                m_generations.put(generation.m_timestamp, generation);
            } else {
                String[] list = generationDirectory.list();
                if (list != null && list.length == 0) {
                    try {
                        VoltFile.recursivelyDelete(generationDirectory);
                    } catch (IOException ioe) {
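                        // Best-effort cleanup of an empty generation directory; ignore failures.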
                    }
                } else {
                    exportLog.error("Invalid export generation in overflow directory " + generationDirectory
                            + " this will need to be manually cleaned up. number of files left: "
                            + (list != null ? list.length : 0));
                }
            }
        }
    }

    private void updateProcessorConfig(final Connector conn) {
        Properties newConfig = new Properties();

        if (conn.getConfig() != null) {
            Iterator<ConnectorProperty> connPropIt = conn.getConfig().iterator();
            while (connPropIt.hasNext()) {
                ConnectorProperty prop = connPropIt.next();
                newConfig.put(prop.getName(), prop.getValue().trim());
            }
        }
        m_processorConfig = newConfig;
    }

    public synchronized void updateCatalog(CatalogContext catalogContext, List<Integer> partitions)
    {
        final Connector conn = getConnector(catalogContext);
        if (conn == null || !conn.getEnabled()) {
            m_loaderClass = null;
            return;
        }

        m_loaderClass = conn.getLoaderclass();
        updateProcessorConfig(conn);

        File exportOverflowDirectory = new File(catalogContext.cluster.getExportoverflow());

        ExportGeneration newGeneration = null;
        try {
            newGeneration = new ExportGeneration(
                    catalogContext.m_uniqueId, exportOverflowDirectory, false);
            newGeneration.setGenerationDrainRunnable(new GenerationDrainRunnable(newGeneration));
            newGeneration.initializeGenerationFromCatalog(conn, m_hostId, m_messenger, partitions);
            m_generations.put(catalogContext.m_uniqueId, newGeneration);
        } catch (IOException e1) {
            VoltDB.crashLocalVoltDB("Error processing catalog update in export system", true, e1);
        }

        /*
         * If there is no existing export processor, create an initial one.
         * This occurs when export is turned on/off at runtime.
         */
        if (m_processor.get() == null) {
            createInitialExportProcessor(catalogContext, conn, false, partitions, false);
        }
    }

    public void shutdown() {
        for (ExportGeneration generation : m_generations.values()) {
            generation.close();
        }
        ExportDataProcessor proc = m_processor.getAndSet(null);
        if (proc != null) {
            proc.shutdown();
        }
        m_generations.clear();
        m_loaderClass = null;
    }

    public static long getQueuedExportBytes(int partitionId, String signature) {
        ExportManager instance = instance();
        try {
            Map<Long, ExportGeneration> generations = instance.m_generations;
            if (generations.isEmpty()) {
                assert(false);
                return -1;
            }

            long exportBytes = 0;
            for (ExportGeneration generation : generations.values()) {
                exportBytes += generation.getQueuedExportBytes(partitionId, signature);
            }

            return exportBytes;
        } catch (Exception e) {
            //Don't let anything take down the execution site thread
            exportLog.error(e);
        }
        return 0;
    }

    /*
     * This method pulls double duty as a means of pushing export buffers
     * and "syncing" export data to disk. Syncing doesn't imply fsync, it just means
     * writing the data to a file instead of keeping it all in memory.
     * End of stream indicates that no more data is coming from this source
     * for this generation.
     */
    public static void pushExportBuffer(
            long exportGeneration,
            int partitionId,
            String signature,
            long uso,
            long bufferPtr,
            ByteBuffer buffer,
            boolean sync,
            boolean endOfStream) {
        //For validating that the memory is released
        if (bufferPtr != 0) DBBPool.registerUnsafeMemory(bufferPtr);
        ExportManager instance = instance();
        try {
            ExportGeneration generation = instance.m_generations.get(exportGeneration);
            if (generation == null) {
                if (buffer != null) {
                    DBBPool.wrapBB(buffer).discard();
                }

                /*
                 * If the generation was already drained it is fine for a buffer to come late and miss it
                 */
                synchronized (instance) {
                    if (!instance.m_generationGhosts.contains(exportGeneration)) {
                        assert(false);
                        exportLog.error("Could not find an export generation " + exportGeneration +
                                ". Should be impossible. Discarding export data");
                    }
                }
                return;
            }

            generation.pushExportBuffer(partitionId, signature, uso, buffer, sync, endOfStream);
        } catch (Exception e) {
            //Don't let anything take down the execution site thread
            exportLog.error("Error pushing export buffer", e);
        }
    }

    public void truncateExportToTxnId(long snapshotTxnId, long[] perPartitionTxnIds) {
        exportLog.info("Truncating export data after txnId " + snapshotTxnId);
        for (ExportGeneration generation : m_generations.values()) {
            //If the generation was completely drained, wait for the task to finish running
            //by waiting for the permit that will be generated
            if (generation.truncateExportToTxnId(snapshotTxnId, perPartitionTxnIds)) {
                try {
                    m_onGenerationDrainedForTruncation.acquire();
                } catch (InterruptedException e) {
                    VoltDB.crashLocalVoltDB("Interrupted truncating export data", true, e);
                }
            }
        }
    }
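
    /*
     * Illustrative sketch, not part of the original source: the poll/ack
     * contract described in the class Javadoc, shown as pseudo-Java inside
     * this comment. The exact ExportDataSource method signatures are not
     * visible in this file, so the forms below are assumptions for
     * illustration only.
     *
     *   ExportDataBlock block = source.poll();   // hypothetical: pull pending EE export data
     *   if (block != null) {
     *       deliverDownstream(block);            // hypothetical delivery helper
     *       source.ack(block);                   // hypothetical: acknowledge data as processed
     *   }
     */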
}
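
A minimal lifecycle sketch (illustrative, not part of the original source): the class's public entry points shown above are initialize(), instance(), and shutdown(). The wrapper class and method below are hypothetical, and a real CatalogContext and HostMessenger must come from a running VoltDB server; only the ExportManager calls themselves appear in the listing.

import java.util.List;

import org.voltcore.messaging.HostMessenger;
import org.voltdb.CatalogContext;
import org.voltdb.export.ExportManager;

public class ExportLifecycleSketch {
    // Hypothetical driver illustrating the ExportManager lifecycle above.
    public static void runExportLifecycle(
            int hostId,
            CatalogContext catalogContext,
            HostMessenger messenger,
            List<Integer> partitions) throws ExportManager.SetupException {
        // Build the singleton; a processor is created only if the catalog
        // defines an enabled export connector (see initialize() above).
        ExportManager.initialize(hostId, catalogContext, false /* isRejoin */, messenger, partitions);

        // Elsewhere, the server reaches the singleton through instance().
        ExportManager em = ExportManager.instance();

        // ... server runs; the EE pushes buffers via ExportManager.pushExportBuffer(...) ...

        // Orderly teardown: closes every generation and shuts down the processor.
        em.shutdown();
    }
}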