Package org.broadinstitute.gatk.engine.traversals

Source Code of org.broadinstitute.gatk.engine.traversals.TraverseActiveRegions

/*
* Copyright (c) 2012 The Broad Institute
*
* Permission is hereby granted, free of charge, to any person
* obtaining a copy of this software and associated documentation
* files (the "Software"), to deal in the Software without
* restriction, including without limitation the rights to use,
* copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the
* Software is furnished to do so, subject to the following
* conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
* OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
* HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR
* THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/

package org.broadinstitute.gatk.engine.traversals;

import com.google.java.contract.Ensures;
import com.google.java.contract.Requires;
import org.apache.log4j.Logger;
import org.broadinstitute.gatk.engine.GenomeAnalysisEngine;
import org.broadinstitute.gatk.engine.contexts.AlignmentContext;
import org.broadinstitute.gatk.engine.contexts.ReferenceContext;
import org.broadinstitute.gatk.engine.datasources.providers.*;
import org.broadinstitute.gatk.engine.refdata.RefMetaDataTracker;
import org.broadinstitute.gatk.engine.walkers.ActiveRegionTraversalParameters;
import org.broadinstitute.gatk.engine.walkers.ActiveRegionWalker;
import org.broadinstitute.gatk.engine.walkers.Walker;
import org.broadinstitute.gatk.utils.GenomeLoc;
import org.broadinstitute.gatk.utils.SampleUtils;
import org.broadinstitute.gatk.utils.Utils;
import org.broadinstitute.gatk.utils.activeregion.ActiveRegion;
import org.broadinstitute.gatk.utils.activeregion.ActivityProfile;
import org.broadinstitute.gatk.utils.activeregion.ActivityProfileState;
import org.broadinstitute.gatk.utils.activeregion.BandPassActivityProfile;
import org.broadinstitute.gatk.utils.nanoScheduler.NSMapFunction;
import org.broadinstitute.gatk.utils.nanoScheduler.NSProgressFunction;
import org.broadinstitute.gatk.utils.nanoScheduler.NSReduceFunction;
import org.broadinstitute.gatk.utils.nanoScheduler.NanoScheduler;
import org.broadinstitute.gatk.utils.progressmeter.ProgressMeter;
import org.broadinstitute.gatk.utils.sam.GATKSAMRecord;

import java.io.PrintStream;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;

/**
* Implement active region traversal
*
* User: depristo
* Date: 1/9/13
* Time: 4:45 PM
*
* Live region:
*
*   The ART tracks a thing called the live region.  The live region is a position on a specific contig
*   of the alignment start of the last read we processed during this traversal.  Because the
*   read stream is sorted, future reads must occurs in the the live region.  Therefore the the dead region
*   (everything to the left of the live boundary) cannot have any more read data.  The live / dead
*   regions are used to decide when we can safely call map on active regions, as only active regions
*   contained completely within the dead region (including extensions) have a complete set of read data
*   in the collected read list.  All of the data related to the live region is captured by the local
*   variable spanOfLastReadSeen
*
*/
public final class TraverseActiveRegions<M, T> extends TraversalEngine<M,T,ActiveRegionWalker<M,T>,LocusShardDataProvider> {
    private final static boolean DEBUG = false;
    protected final static Logger logger = Logger.getLogger(TraversalEngine.class);
    protected final static boolean LOG_READ_CARRYING = false;

    // set by the traversal
    private boolean walkerHasPresetRegions = false;
    private int activeRegionExtension = -1;
    private int maxRegionSize = -1;
    private int minRegionSize = -1;

    private final LinkedList<ActiveRegion> workQueue = new LinkedList<>();

    private TAROrderedReadCache myReads = null;

    private GenomeLoc lastRegionProcessed = null;
    private GenomeLoc spanOfLastReadSeen = null;
    private ActivityProfile activityProfile = null;
    int maxReadsInMemory = 0;
    ActiveRegionWalker<M, T> walker;

    final NanoScheduler<MapData, M, T> nanoScheduler;

    /**
     * Data to use in the ActiveRegionWalker.map function produced by the NanoScheduler input iterator
     */
    private static class MapData {
        public ActiveRegion activeRegion;
        public RefMetaDataTracker tracker;

        private MapData(ActiveRegion activeRegion, RefMetaDataTracker tracker) {
            this.activeRegion = activeRegion;
            this.tracker = tracker;
        }
    }

    /**
     * Create a single threaded active region traverser
     */
    public TraverseActiveRegions() {
        this(1);
    }

    /**
     * Create an active region traverser that uses nThreads for getting its work done
     * @param nThreads number of threads
     */
    public TraverseActiveRegions(final int nThreads) {
        nanoScheduler = new NanoScheduler<>(nThreads);
        nanoScheduler.setProgressFunction(new NSProgressFunction<MapData>() {
            @Override
            public void progress(MapData lastActiveRegion) {
                if ( lastActiveRegion != null )
                    // note, need to use getStopLocation so we don't give an interval to ProgressMeterDaemon
                    printProgress(lastActiveRegion.activeRegion.getLocation().getStopLocation());
            }
        });
    }

    /**
     * Have the debugging output streams been initialized already?
     *
     * We have to do lazy initialization because when the initialize() function is called
     * the streams aren't yet initialized in the GATK walker.
     */
    private boolean streamsInitialized = false;

    @Override
    public void initialize(GenomeAnalysisEngine engine, Walker walker, ProgressMeter progressMeter) {
        super.initialize(engine, walker, progressMeter);

        this.walker = (ActiveRegionWalker<M,T>)walker;
        if ( this.walker.wantsExtendedReads() && ! this.walker.wantsNonPrimaryReads() ) {
            throw new IllegalArgumentException("Active region walker " + this.walker + " requested extended events but not " +
                    "non-primary reads, an inconsistent state.  Please modify the walker");
        }

        ActiveRegionTraversalParameters annotation = walker.getClass().getAnnotation(ActiveRegionTraversalParameters.class);
        this.activeRegionExtension = this.walker.activeRegionExtension == null ? annotation.extension() : this.walker.activeRegionExtension;
        this.maxRegionSize = this.walker.activeRegionMaxSize == null ? annotation.maxRegion() : this.walker.activeRegionMaxSize;
        this.minRegionSize = annotation.minRegion();
        final double bandPassSigma = this.walker.bandPassSigma == null ? annotation.bandPassSigma() : this.walker.bandPassSigma;
        walkerHasPresetRegions = this.walker.hasPresetActiveRegions();

        activityProfile = new BandPassActivityProfile(engine.getGenomeLocParser(), engine.getIntervals(), this.walker.maxProbPropagationDistance, this.walker.activeProbThreshold,
                BandPassActivityProfile.MAX_FILTER_SIZE, bandPassSigma);

        final int maxReadsAcrossSamples = annotation.maxReadsToHoldInMemoryPerSample() * SampleUtils.getSAMFileSamples(engine).size();
        final int maxReadsToHoldInMemory = Math.min(maxReadsAcrossSamples, annotation.maxReadsToHoldTotal());
        myReads = new TAROrderedReadCache(maxReadsToHoldInMemory);
    }

    // -------------------------------------------------------------------------------------
    //
    // Utility functions
    //
    // -------------------------------------------------------------------------------------

    /**
     * Load in the preset regions for contig into workQueue
     *
     * Should be called before starting to process work on contig
     *
     * Can only be called when walkerHasPresetRegions is true or an IllegalStateException will be thrown
     *
     * @param contig the contig we are about to process
     */
    protected void loadPresetRegionsForContigToWorkQueue(final String contig) {
        if ( ! walkerHasPresetRegions ) throw new IllegalStateException("only appropriate to call when walker has preset regions");

        final GenomeLoc contigSpan = engine.getGenomeLocParser().createOverEntireContig(contig);
        for ( final GenomeLoc loc : this.walker.getPresetActiveRegions().getOverlapping(contigSpan) ) {
            workQueue.add(new ActiveRegion(loc, null, true, engine.getGenomeLocParser(), getActiveRegionExtension()));
        }
    }

    protected int getActiveRegionExtension() {
        return activeRegionExtension;
    }

    protected int getMaxRegionSize() {
        return maxRegionSize;
    }

    protected int getMinRegionSize() {
        return minRegionSize;
    }

    @Override
    public String getTraversalUnits() {
        return "active regions";
    }

    @Override
    public String toString() {
        return "TraverseActiveRegions";
    }

    /**
     * Is the loc outside of the intervals being requested for processing by the GATK?
     * @param loc
     * @return
     */
    protected boolean outsideEngineIntervals(final GenomeLoc loc) {
        return engine.getIntervals() != null && ! engine.getIntervals().overlaps(loc);
    }

    // -------------------------------------------------------------------------------------
    //
    // Actual traverse function
    //
    // -------------------------------------------------------------------------------------

    /**
     * Did read appear in the last shard?
     *
     * When we transition across shard boundaries we see duplicate reads because
     * each shard contains the reads that *overlap* the shard.  So if we just finished
     * shard 1-1000 and are now in 1001-2000 we'll see duplicate reads from 1001
     * that overlapped 1-1000.  This function tests read to determine if we would have
     * seen it before by asking if read.getAlignmentStart() is less than the
     * stop position of the last seen read at the start of the traversal.  The reason
     * we need to use the location of the last read at the start of the traversal
     * is that we update the lastRead during the traversal, and we only want to filter
     * out reads whose start is before the last read of the previous shard, not the
     * current shard.
     *
     * @param locOfLastReadAtTraversalStart the location of the last read seen at the start of the traversal
     * @param read the read we want to test if it's already been seen in the last shard
     * @return true if read would have appeared in the last shard, false otherwise
     */
    @Requires({"read != null"})
    private boolean appearedInLastShard(final GenomeLoc locOfLastReadAtTraversalStart, final GATKSAMRecord read) {
        if ( locOfLastReadAtTraversalStart == null )
            // we're in the first shard, so obviously the answer is no
            return false;
        else {
            // otherwise check to see if the alignment occurred in the previous shard
            return read.getAlignmentStart() <= locOfLastReadAtTraversalStart.getStart()
                    // we're on the same contig
                    && read.getReferenceIndex() == locOfLastReadAtTraversalStart.getContigIndex();
        }

    }

    @Override
    public T traverse( final ActiveRegionWalker<M,T> walker,
                       final LocusShardDataProvider dataProvider,
                       T sum) {
        if ( LOG_READ_CARRYING || logger.isDebugEnabled() )
            logger.info(String.format("TraverseActiveRegions.traverse: Shard is %s", dataProvider));

        nanoScheduler.setDebug(false);
        final Iterator<MapData> activeRegionIterator = new ActiveRegionIterator(dataProvider);
        final TraverseActiveRegionMap myMap = new TraverseActiveRegionMap();
        final TraverseActiveRegionReduce myReduce = new TraverseActiveRegionReduce();
        final T result = nanoScheduler.execute(activeRegionIterator, myMap, sum, myReduce);

        return result;
    }

    private class ActiveRegionIterator implements Iterator<MapData> {
        private final LocusShardDataProvider dataProvider;
        private LinkedList<MapData> readyActiveRegions = new LinkedList<>();
        private boolean done = false;
        private final LocusView locusView;
        private final LocusReferenceView referenceView;
        private final GenomeLoc locOfLastReadAtTraversalStart;
        private final IntervalReferenceOrderedView referenceOrderedDataView;
        private final GenomeLoc currentWindow;
        private final boolean processRemainingActiveRegions;

        public ActiveRegionIterator( final LocusShardDataProvider dataProvider ) {
            this.dataProvider = dataProvider;
            locusView = new AllLocusView(dataProvider);
            referenceView = new LocusReferenceView( walker, dataProvider );

            // The data shard may carry a number of locations to process (due to being indexed together).
            // This value is just the interval we are processing within the entire provider
            currentWindow = dataProvider.getLocus();
            final int currentWindowPos = dataProvider.getShard().getGenomeLocs().indexOf(currentWindow);
            if ( currentWindowPos == -1 ) throw new IllegalStateException("Data provider " + dataProvider + " didn't have our current window in it " + currentWindow);
            processRemainingActiveRegions = currentWindowPos == dataProvider.getShard().getGenomeLocs().size() - 1;

            // the rodSpan covers all of the bases in the activity profile, including all of the bases
            // through the current window interval.  This is because we may issue a query to get data for an
            // active region spanning before the current interval as far back as the start of the current profile,
            // if we have pending work to do that finalizes in this interval.
            final GenomeLoc rodSpan = activityProfile.getSpan() == null ? currentWindow : activityProfile.getSpan().endpointSpan(currentWindow);
            if ( ! dataProvider.getShard().getLocation().containsP(rodSpan) ) throw new IllegalStateException("Rod span " + rodSpan + " isn't contained within the data shard " + dataProvider.getShard().getLocation() + ", meaning we wouldn't get all of the data we need");
            referenceOrderedDataView = new IntervalReferenceOrderedView( dataProvider, rodSpan );

            // We keep processing while the next reference location is within the interval
            locOfLastReadAtTraversalStart = spanOfLastSeenRead();

            // load in the workQueue the present regions that span the current contig, if it's different from the last one
            if ( walkerHasPresetRegions && ( lastRegionProcessed == null || ! currentWindow.onSameContig(lastRegionProcessed)) ) {
                loadPresetRegionsForContigToWorkQueue(currentWindow.getContig());
            }

            // remember the last region we processed for sanity checking later
            lastRegionProcessed = currentWindow;
        }

        @Override public void remove() { throw new UnsupportedOperationException("Cannot remove from ActiveRegionIterator"); }

        @Override
        public MapData next() {
            return readyActiveRegions.pop();
        }
        @Override
        public boolean hasNext() {
            if ( engine.exceedsRuntimeLimit() ) // too much time has been dedicated to doing work, just stop
                 return false;
            if ( ! readyActiveRegions.isEmpty() )
                return true;
            if ( done )
                return false;
            else {

                while( locusView.hasNext() ) {
                    final AlignmentContext locus = locusView.next();
                    final GenomeLoc location = locus.getLocation();

                    rememberLastLocusLocation(location);

                    // get all of the new reads that appear in the current pileup, and them to our list of reads
                    // provided we haven't seen them before
                    final Collection<GATKSAMRecord> reads = locusView.getLIBS().transferReadsFromAllPreviousPileups();
                    for( final GATKSAMRecord read : reads ) {
                        // note that ActiveRegionShards span entire contigs, so this check is in some
                        // sense no longer necessary, as any read that appeared in the last shard would now
                        // by definition be on a different contig.  However, the logic here doesn't hurt anything
                        // and makes us robust should we decided to provide shards that don't fully span
                        // contigs at some point in the future
                        if ( ! appearedInLastShard(locOfLastReadAtTraversalStart, read) ) {
                            rememberLastReadLocation(read);
                            myReads.add(read);
                        }
                    }

                    // skip this location -- it's not part of our engine intervals
                    if ( outsideEngineIntervals(location) )
                        continue;

                    // we've move across some interval boundary, restart profile
                    final boolean flushProfile = ! activityProfile.isEmpty()
                            && ( activityProfile.getContigIndex() != location.getContigIndex()
                            || location.getStart() != activityProfile.getStop() + 1);
                    final List<MapData> newActiveRegions = prepActiveRegionsForProcessing(walker, flushProfile, false, referenceOrderedDataView);

                    dataProvider.getShard().getReadMetrics().incrementNumIterations();

                    // create reference context. Note that if we have a pileup of "extended events", the context will
                    // hold the (longest) stretch of deleted reference bases (if deletions are present in the pileup).
                    final ReferenceContext refContext = referenceView.getReferenceContext(location);

                    // Iterate forward to get all reference ordered data covering this location
                    final RefMetaDataTracker tracker = referenceOrderedDataView.getReferenceOrderedDataAtLocus(locus.getLocation());

                    // Call the walkers isActive function for this locus and add them to the list to be integrated later
                    addIsActiveResult(walker, tracker, refContext, locus);

                    maxReadsInMemory = Math.max(myReads.size(), maxReadsInMemory);
                    printProgress(location);

                    if ( ! newActiveRegions.isEmpty() ) {
                        readyActiveRegions.addAll(newActiveRegions);
                        if ( DEBUG )
                            for ( final MapData region : newActiveRegions )
                                logger.info("Adding region to queue for processing " + region.activeRegion);
                        return true;
                    }
                }

                if ( processRemainingActiveRegions ) {
                    // we've run out of stuff to process, and since shards now span entire contig boundaries
                    // we should finalized our regions.  This allows us to continue to use our referenceOrderedDataView
                    // which would otherwise be shutdown.  Only followed when the microschedule says that we're
                    // inside of the last window in the current shard
                    readyActiveRegions.addAll(prepActiveRegionsForProcessing(walker, true, true, referenceOrderedDataView));
                }

                return ! readyActiveRegions.isEmpty();
            }
        }
    }

    // -------------------------------------------------------------------------------------
    //
    // Functions to manage and interact with the live / dead zone
    //
    // -------------------------------------------------------------------------------------

    /**
     * Update the live region to reflect that the last read we've seen in the traversal is read
     *
     * Requires that sequential calls always be provided reads in coordinate sorted order
     *
     * @param read the last read we've seen during the traversal
     */
    @Requires({"read != null"})
    protected void rememberLastReadLocation(final GATKSAMRecord read) {
        final GenomeLoc currentLocation = engine.getGenomeLocParser().createGenomeLoc(read);
        if ( spanOfLastReadSeen == null )
            spanOfLastReadSeen = currentLocation;
        else {
            if ( currentLocation.isBefore(spanOfLastReadSeen) )
                throw new IllegalStateException("Updating last read seen in the traversal with read " + read + " with span " + currentLocation + " but this occurs before the previously seen read " + spanOfLastReadSeen);
            spanOfLastReadSeen = currentLocation;
        }
    }

    /**
     * Update the live region to reflect that we've reached locus
     *
     * This function is complementary to #rememberLastReadLocation, but if we don't have any reads for a long
     * time (e.g., there's no coverage) we will keep active regions around far longer than necessary.
     *
     * Only updates the span if it's beyond the last seen
     *
     * @param currentLocation the current location we've processed on the genome
     */
    protected void rememberLastLocusLocation(final GenomeLoc currentLocation) {
        if ( spanOfLastReadSeen == null )
            spanOfLastReadSeen = currentLocation;
        else {
            if ( currentLocation.isPast(spanOfLastReadSeen) )
                spanOfLastReadSeen = currentLocation;
        }
    }


    /**
     * Get a GenomeLoc indicating the start (heading to the right) of the live ART region.
     * @return the left-most position of the live region on the genome
     */
    protected GenomeLoc spanOfLastSeenRead() {
        return spanOfLastReadSeen;
    }

    /**
     * Is the active region completely within the traversal's dead zone?
     *
     * @param region the region we want to test
     * @return true if the extended location of region is completely within the current dead zone, false otherwise
     */
    protected boolean regionCompletelyWithinDeadZone(final ActiveRegion region) {
        if ( spanOfLastSeenRead() == null )
            return false;

        final int contigCmp = region.getExtendedLoc().compareContigs(spanOfLastSeenRead());
        if ( contigCmp > 0 )
            throw new IllegalStateException("Active region " + region + " on a contig after last seen read " + spanOfLastSeenRead());
        else {
            return contigCmp < 0 || region.getExtendedLoc().getStop() < spanOfLastSeenRead().getStart();
        }
    }

    /**
     * Is the read dead?  That is, can it no longer be in any future active region, and therefore can be discarded?
     *
     * read: start |--------> stop ------ stop + extension
     * region:                      start |-----------------| end
     *
     * Since the regions are coming in order, read could potentially be contained in a future interval if
     * stop + activeRegionExtension >= end.  If, on the other hand, stop + extension is < the end
     * of this region, then we can discard it, since any future region could only include reads
     * up to end + 1 - extension.
     *
     * Note that this function doesn't care about the dead zone.  We're assuming that by
     * actually calling this function with an active region that region is already in the dead zone,
     * so checking that the read is in the dead zone doesn't make sense.
     *
     * @param read the read we're testing
     * @param activeRegion the current active region
     * @return true if the read is dead, false other
     */
    @Requires({"read != null", "activeRegion != null"})
    private boolean readCannotOccurInAnyMoreActiveRegions(final GATKSAMRecord read, final ActiveRegion activeRegion) {
        return read.getReferenceIndex() < activeRegion.getLocation().getContigIndex() ||
                ( read.getReferenceIndex() == activeRegion.getLocation().getContigIndex()
                        && read.getAlignmentEnd() + getActiveRegionExtension() < activeRegion.getLocation().getStop() );
    }

    // -------------------------------------------------------------------------------------
    //
    // Functions to write out activity profiles and active regions
    //
    // -------------------------------------------------------------------------------------

    /**
     * Initialize the debugging output streams (activity profile and active regions), if not done so already
     */
    @Ensures("streamsInitialized == true")
    private void initializeOutputStreamsIfNecessary() {
        if ( ! streamsInitialized ) {
            streamsInitialized = true;
            if ( walker.activityProfileOutStream != null ) {
                printIGVFormatHeader(walker.activityProfileOutStream, "line", "ActivityProfile");
            }

            if ( walker.activeRegionOutStream != null ) {
                printIGVFormatHeader(walker.activeRegionOutStream, "line", "ActiveRegions");
            }
        }
    }

    /**
     * Helper function to write out a IGV formatted line to out, at loc, with values
     *
     * http://www.broadinstitute.org/software/igv/IGV
     *
     * @param out a non-null PrintStream where we'll write our line
     * @param graphType the type of graph to show in IGV for this track
     * @param columns the column names for this IGV track
     */
    @Requires({
            "out != null",
            "graphType != null",
            "columns.length > 0"
    })
    private void printIGVFormatHeader(final PrintStream out, final String graphType, final String ... columns ) {
        out.printf("#track graphType=%s%n", graphType);
        out.printf("Chromosome\tStart\tEnd\tFeature\t%s%n", Utils.join("\t", columns));

    }

    /**
     * Helper function to write out a IGV formatted line to out, at loc, with values
     *
     * http://www.broadinstitute.org/software/igv/IGV
     *
     * @param out a non-null PrintStream where we'll write our line
     * @param loc the location of values
     * @param featureName string name of this feature (see IGV format)
     * @param values the floating point values to associate with loc and feature name in out
     */
    @Requires({
            "out != null",
            "loc != null",
            "values.length > 0"
    })
    private void printIGVFormatRow(final PrintStream out, final GenomeLoc loc, final String featureName, final double ... values) {
        // note that start and stop are 0 based, but the stop is exclusive so we don't subtract 1
        out.printf("%s\t%d\t%d\t%s", loc.getContig(), loc.getStart() - 1, loc.getStop(), featureName);
        for ( final double value : values )
            out.print(String.format("\t%.5f", value));
        out.println();
    }

    /**
     * Write out activity profile information, if requested by the walker
     *
     * @param states the states in the current activity profile
     */
    @Requires("states != null")
    private void writeActivityProfile(final List<ActivityProfileState> states) {
        if ( walker.activityProfileOutStream != null ) {
            initializeOutputStreamsIfNecessary();
            for ( final ActivityProfileState state : states ) {
                printIGVFormatRow(walker.activityProfileOutStream, state.getLoc(), "state", Math.min(state.isActiveProb, 1.0));
            }
        }
    }

    /**
     * Write out each active region to the walker activeRegionOutStream
     *
     * @param region the region we're currently operating on
     */
    @Requires("region != null")
    private void writeActiveRegion(final ActiveRegion region) {
        if( walker.activeRegionOutStream != null ) {
            initializeOutputStreamsIfNecessary();
            printIGVFormatRow(walker.activeRegionOutStream, region.getLocation().getStartLocation(),
                    "end-marker", 0.0);
            printIGVFormatRow(walker.activeRegionOutStream, region.getLocation(),
                    "size=" + region.getLocation().size(), region.isActive() ? 1.0 : -1.0);
        }
    }


    // -------------------------------------------------------------------------------------
    //
    // Functions to process active regions that are ready for map / reduce calls
    //
    // -------------------------------------------------------------------------------------

    /**
     * Invoke the walker isActive function, and incorporate its result into the activity profile
     *
     * @param walker the walker we're running
     * @param tracker the ref meta data tracker to pass on to the isActive function of walker
     * @param refContext the refContext to pass on to the isActive function of walker
     * @param locus the AlignmentContext to pass on to the isActive function of walker
     */
    private void addIsActiveResult(final ActiveRegionWalker<M, T> walker,
                                   final RefMetaDataTracker tracker, final ReferenceContext refContext,
                                   final AlignmentContext locus) {
        // must be called, even if we won't use the result, to satisfy walker contract
        final ActivityProfileState state = walker.isActive( tracker, refContext, locus );
        if ( walker.forceActive) state.isActiveProb = 1.0;
        if ( ! walkerHasPresetRegions ) {
            activityProfile.add(state);
        }
    }

    /**
     * Take the individual isActive calls and integrate them into contiguous active regions and
     * add these blocks of work to the work queue
     * band-pass filter the list of isActive probabilities and turn into active regions
     */
    private List<MapData> prepActiveRegionsForProcessing(final ActiveRegionWalker<M, T> walker,
                                                              final boolean flushActivityProfile,
                                                              final boolean forceAllRegionsToBeActive,
                                                              final IntervalReferenceOrderedView referenceOrderedDataView) {
        if ( ! walkerHasPresetRegions ) {
            // We don't have preset regions, so we get our regions from the activity profile
            final Collection<ActiveRegion> activeRegions = activityProfile.popReadyActiveRegions(getActiveRegionExtension(), getMinRegionSize(), getMaxRegionSize(), flushActivityProfile);
            workQueue.addAll(activeRegions);
            if ( ! activeRegions.isEmpty() && logger.isDebugEnabled() ) logger.debug("Integrated " + activityProfile.size() + " isActive calls into " + activeRegions.size() + " regions." );
        }

        // Since we've traversed sufficiently past this point (or this contig!) in the workQueue we can unload those regions and process them
        final LinkedList<MapData> readyRegions = new LinkedList<>();
        while( workQueue.peek() != null ) {
            final ActiveRegion activeRegion = workQueue.peek();
            if ( forceAllRegionsToBeActive || regionCompletelyWithinDeadZone(activeRegion) ) {
                writeActivityProfile(activeRegion.getSupportingStates());
                writeActiveRegion(activeRegion);
                readyRegions.add(prepActiveRegionForProcessing(workQueue.remove(), walker, referenceOrderedDataView));
            } else {
                break;
            }
        }

        return readyRegions;

    }

    private MapData prepActiveRegionForProcessing(final ActiveRegion activeRegion,
                                                  final ActiveRegionWalker<M, T> walker,
                                                  final IntervalReferenceOrderedView referenceOrderedDataView) {
        final List<GATKSAMRecord> stillLive = new LinkedList<>();
        for ( final GATKSAMRecord read : myReads.popCurrentReads() ) {
            boolean killed = false;
            final GenomeLoc readLoc = this.engine.getGenomeLocParser().createGenomeLoc( read );

            if( activeRegion.getLocation().overlapsP( readLoc ) ) {
                activeRegion.add(read);

                if ( ! walker.wantsNonPrimaryReads() ) {
                    killed = true;
                }
            } else if( walker.wantsExtendedReads() && activeRegion.getExtendedLoc().overlapsP( readLoc )) {
                activeRegion.add( read );
            }

            // if the read hasn't already been killed, check if it cannot occur in any more active regions, and maybe kill it
            if ( ! killed && readCannotOccurInAnyMoreActiveRegions(read, activeRegion) ) {
                killed = true;
            }

            // keep track of all of the still live active regions
            if ( ! killed ) stillLive.add(read);
        }
        myReads.addAll(stillLive);

        if ( logger.isDebugEnabled() ) {
            logger.debug(">> Map call with " + activeRegion.getReads().size() + " " + (activeRegion.isActive() ? "active" : "inactive") + " reads @ " + activeRegion.getLocation() + " with full extent: " + activeRegion.getReadSpanLoc());
        }

        if ( LOG_READ_CARRYING )
            logger.info(String.format("Processing region %20s span=%3d active?=%5b with %4d reads.  Overall max reads carried is %s",
                    activeRegion.getLocation(), activeRegion.getLocation().size(), activeRegion.isActive(), activeRegion.size(), maxReadsInMemory));

        // prepare the RefMetaDataTracker information
        final GenomeLoc loc = activeRegion.getLocation();
        // get all of the RODs that cover the active region (without extension)
        final RefMetaDataTracker tracker = referenceOrderedDataView.getReferenceOrderedDataForInterval(loc);
        // trim away all of the features that occurred before this location, as we will not need them in the future
        referenceOrderedDataView.trimCurrentFeaturesToLoc(loc);

        return new MapData(activeRegion, tracker);
    }

    private class TraverseActiveRegionMap implements NSMapFunction<MapData, M> {
        @Override
        public M apply(final MapData mapData) {
            if ( DEBUG ) logger.info("Executing walker.map for " + mapData.activeRegion + " in thread " + Thread.currentThread().getName());
            return walker.map(mapData.activeRegion, mapData.tracker);
        }
    }

    private class TraverseActiveRegionReduce implements NSReduceFunction<M, T> {
        @Override
        public T apply(M one, T sum) {
            return walker.reduce(one, sum);
        }
    }
}
TOP

Related Classes of org.broadinstitute.gatk.engine.traversals.TraverseActiveRegions

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.