Package com.foundationdb.qp.storeadapter.indexcursor

Source Code of com.foundationdb.qp.storeadapter.indexcursor.MergeJoinSorter$SortKey

/**
* Copyright (C) 2009-2013 FoundationDB, LLC
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/
package com.foundationdb.qp.storeadapter.indexcursor;

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;

import com.fasterxml.sort.IterableSorterException;
import com.fasterxml.sort.IteratingSorter;
import com.foundationdb.qp.operator.API;
import com.foundationdb.qp.operator.API.Ordering;
import com.foundationdb.qp.operator.CursorLifecycle;
import com.foundationdb.qp.operator.QueryBindings;
import com.foundationdb.qp.operator.QueryContext;
import com.foundationdb.qp.operator.RowCursor;
import com.foundationdb.qp.operator.RowCursorImpl;
import com.foundationdb.qp.storeadapter.Sorter;
import com.foundationdb.qp.row.Row;
import com.foundationdb.qp.row.ValuesHolderRow;
import com.foundationdb.qp.rowtype.RowType;
import com.foundationdb.server.PersistitValueValueSource;
import com.foundationdb.server.PersistitValueValueTarget;
import com.foundationdb.server.collation.AkCollator;
import com.foundationdb.server.error.MergeSortIOException;
import com.foundationdb.server.error.StorageKeySizeExceededException;
import com.foundationdb.server.types.TInstance;
import com.foundationdb.server.types.common.types.TString;
import com.foundationdb.server.types.value.ValueSource;
import com.foundationdb.util.tap.InOutTap;
import com.persistit.Key;
import com.persistit.KeyState;
import com.persistit.Persistit;
import com.persistit.Value;
import com.persistit.exception.ConversionException;
import com.persistit.exception.KeyTooLongException;
import com.fasterxml.sort.DataReader;
import com.fasterxml.sort.DataReaderFactory;
import com.fasterxml.sort.DataWriter;
import com.fasterxml.sort.DataWriterFactory;
import com.fasterxml.sort.SortConfig;
import com.fasterxml.sort.TempFileProvider;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* <h1>Overview</h1>
*
* Sort rows by inserting them in sorted order into a memory buffer, then into on-disk files, and performing
* a multi-way merge sort on the resulting files.
*
* <h1>Behavior</h1>
*
* The rows of the input stream are written into a memory pool (40MB by default). When (if) the memory pool fills up,
* the pool is written to disk and emptied, and subsequent rows are again written to the memory pool. Once the final row
* has been written, the disk files are merged in an n-way merge sort (16-way by default). When all of the temp files
* have been merged into one file, the rows are read from that file in sorted order.
*
* If the initial input stream does not produce enough data to overflow the memory pool, no disk files will be produced.
*
* <h1>Performance</h1>
*
* The MergeJoinSorter generates IO dependent upon the size of the input stream.
* If the input stream generates less than the memory pool size (40MB default), there is no IO generated.
* If the input stream generates more than the memory pool size, but less than 16x the pool size, it should
* generate two reads and two writes for each row: one write to and one read from the initial temporary file,
* and one write to and one read from the final sorted temporary file. Each further 16x growth of the input set
* adds one more temporary-file write/read cycle.
*
* <h1>Memory Requirements</h1>
*
* The MergeJoinSorter allocates a single memory buffer for each instance to perform an initial sort,
* defaulting to 40MB in size.
*/

public class MergeJoinSorter implements Sorter {
    private static final Logger LOG = LoggerFactory.getLogger(MergeJoinSorter.class);

    // Query state handed to the constructor.
    private QueryContext context;
    private QueryBindings bindings;
    // Cursor supplying the unsorted rows.
    private RowCursor input;
    // Type of the rows being sorted; used for serializing and rebuilding rows.
    private RowType rowType;
    // Private copy of the caller's ordering; sorterAdapter.init() may modify it.
    private Ordering ordering;
    // Tap entered/exited around each row fetched from the input while loading.
    private InOutTap loadTap;

    private final SorterAdapter<?, ?, ?> sorterAdapter;
    // Sort-column indexes at which a new ASC/DESC segment starts, followed by
    // a final entry equal to the number of sort columns.
    private final List<Integer> orderChanges;
    // Created by loadTree(); reset to null by close().
    private IteratingSorter<SortKey> iteratingSorter;
    // Iterator over the sorted entries, obtained from iteratingSorter.sort().
    private Iterator<SortKey> sortIterator;
    // Scratch Key that is cleared and reused to build every KeyState segment.
    private Key sortKey;
    // Compares SortKeys segment by segment using the per-segment direction.
    private Comparator<SortKey> compare;
    private API.SortOption sortOption;
   
    public MergeJoinSorter (QueryContext context,
            QueryBindings bindings,
            RowCursor input,
            RowType rowType,
            API.Ordering ordering,
            API.SortOption sortOption,
            InOutTap loadTap)
    {
        this.context = context;
        this.bindings = bindings;
        this.input = input;
        this.rowType = rowType;
        this.ordering = ordering.copy();
        this.loadTap = loadTap;
        this.sortOption = sortOption;
       
        this.sortKey = context.getStore().createKey();
        this.sorterAdapter = new ValueSorterAdapter();
        // Note: init may change this.ordering
        sorterAdapter.init(rowType, this.ordering, this.sortKey, null, this.context, this.bindings, sortOption);
        // Explicitly use input ordering to avoid appended field
        this.orderChanges = new ArrayList<>();
        List<Comparator<KeyState>> comparators = new ArrayList<>();
        for(int i = 0; i < ordering.sortColumns(); ++i) {
            Comparator<KeyState> c = ordering.ascending(i) ? ASC_COMPARATOR : DESC_COMPARATOR;
            if(i == 0 || ordering.ascending(i-1) != ordering.ascending(i)) {
                orderChanges.add(i);
                comparators.add(c);
            }
        }
        this.orderChanges.add(ordering.sortColumns());
        this.compare = new KeySortCompare(comparators);
       
    }

    @Override
    public RowCursor sort() {
        try {
            loadTree();
        } catch (IOException e) {
            throw new MergeSortIOException(e);
        }
        return cursor();
    }

    @Override
    public void close() {
        if(iteratingSorter != null) {
            iteratingSorter.close();
            iteratingSorter = null;
            sortIterator = null;
        }
    }
   
    private void loadTree() throws IOException {
        iteratingSorter = new IteratingSorter<>(getSortConfig(new MergeTempFileProvider(context)),
                                                new KeyReaderFactory(),
                                                new KeyWriterFactory(),
                                                compare);
        sortIterator = iteratingSorter.sort(new KeyReadCursor(input));
    }
   
    private RowCursor cursor() {
        return new KeyFinalCursor(sortIterator, rowType, sortOption, compare);
    }

    public KeyReadCursor readCursor() {
        return new KeyReadCursor(input);
    }
   
    private SortConfig getSortConfig (MergeTempFileProvider tmpFileProvider) {
        long maxMemory = Long.parseLong(context.getServiceManager().getConfigurationService().getProperty("fdbsql.sort.memory"));
        return new SortConfig().withTempFileProvider(tmpFileProvider).withMaxMemoryUsage(maxMemory);
    }
    /*
     * Base class for reading/writing bytes -
     * KeyState[] is list of key segments broken by ASC/DESC ordering
     * rowKey is the whole, unaltered row of data.
     */
    public static class SortKey {
        public List<KeyState> sortKeys;
        public Value rowValue;
    
        public SortKey () {
            this.sortKeys = new ArrayList<>();
            this.rowValue = new Value((Persistit)null);
            rowValue.clear();
        }
       
        public SortKey (List<KeyState> sortKeys, Value value) {
            this.sortKeys = sortKeys;
            this.rowValue = value;
        }
       
        // Sorter uses size of elements to determine when the
        // presort buffer is full.
        public int getSize() {
            int size = 0;
            for (KeyState state : sortKeys) {
                size += state.getBytes().length + 4;
                size += 4;
            }
            size += rowValue.getEncodedSize() + 4;
            return size;
        }
    }

    private class KeyReaderFactory extends DataReaderFactory<SortKey> {

        @Override
        public DataReader<SortKey> constructReader(InputStream arg0)
                throws IOException {
            return new KeyReader(arg0);
        }
       
    }
   
    /*
     * Class to reak keys from the sorter (i.e. from disk)
     * back into memory for processing.
     */
    public static class KeyReader extends DataReader<SortKey> {

        private InputStream is;
        private ByteBuffer length;
        public KeyReader (InputStream is) {
            this.is = is;
            length = ByteBuffer.allocate(4);
        }
       
        @Override
        public void close() throws IOException {
            is.close();
        }

        @Override
        public int estimateSizeInBytes(SortKey arg0) {
            return arg0.getSize();
        }

        @Override
        public SortKey readNext() throws IOException {
           
            SortKey key = new SortKey();
            int states = readLength();
            if (states < 0) {
                return null;
            }
            for (int i = 0; i < states; i++) {
                KeyState state = readKeyState();
                if (state == null) {
                    return null;
                }
                key.sortKeys.add(state);
            }
           
            key.rowValue = readValue();
           
            return key;
        }

        private KeyState readKeyState() throws IOException {
            int size = readLength();
            if (size < 1) {
                return null;
            }
            byte[] bytes = new byte[size];
            int bytesRead = is.read(bytes);
           
            assert bytesRead == size: "Invalid byte count on key state read";
           
            return new KeyState(bytes);
        }
       
        private Value readValue() throws IOException {
            int size = readLength();
            if (size < 1) { return null; }
            Value value = new Value ((Persistit)null);
            value.setMaximumSize(size);
            value.ensureFit(size);
            int bytesRead = is.read(value.getEncodedBytes(), 0, size);
            assert bytesRead == size : "Invalid byte count on value read";
            value.setEncodedSize(size);
            return value;
        }
       
        private int readLength() throws IOException {
            length.clear();
            int bytesRead = is.read(length.array());
            if (bytesRead == -1) { // EOF marker
                return -1;
            }
            assert bytesRead == 4 : "Invalid byte count on length read";
            return length.getInt();
        }
    }

    /*
     * Class to read rows from the input cursor to the Sort,
     * converting them to SortKey elements for the Sorter.
     *
     * Each row becomes one SortKey: a list of KeyState segments (one per run
     * of sort columns sharing a direction) plus a Value holding every field
     * of the row.
     */
    public class KeyReadCursor extends DataReader<SortKey> {
       
        // Number of rows read so far; also appended to the last key segment
        // when the sorter adapter asks for duplicates to be preserved.
        private int rowCount = 0;
        // Number of fields in rowType.
        private int rowFields;
        // Type of each field, indexed by field position (entries may be null).
        private TInstance tFieldTypes[];
        // Collator for each TString field; null for all other fields.
        private AkCollator collators[];
        private PersistitValueValueTarget valueTarget;
        private RowCursor input;
        // Set once the input cursor has returned null.
        boolean done = false;
       
        /*
         * Caches the per-field types of the enclosing sorter's rowType and,
         * for string-typed fields, their collators.
         */
        public KeyReadCursor (RowCursor input) {
            this.rowFields = rowType.nFields();
            this.tFieldTypes = new TInstance[rowFields];
            this.collators = new AkCollator[rowFields];
            for (int i = 0; i < rowFields; i++) {
                tFieldTypes[i] = rowType.typeAt(i);
                if (tFieldTypes[i] != null && tFieldTypes[i].typeClass() instanceof TString) {
                    collators[i] = TString.getCollator(tFieldTypes[i]);
                }
            }
            valueTarget = new PersistitValueValueTarget();
            this.input = input;
        }
       
        @Override
        public void close() throws IOException {
            // Do Nothing;
        }

        @Override
        public int estimateSizeInBytes(SortKey arg0) {
            return arg0.getSize();
        }

        /*
         * Reads the next input row and converts it to a SortKey.
         * Returns null once the input is exhausted (and on every later call).
         * The read is bracketed by loadTap.in()/out(), and query cancelation
         * is checked after each row is fetched.
         */
        @Override
        public SortKey readNext() {
            SortKey sortKey = null;
            if(done) {
                return sortKey;
            }
            loadTap.in();
            try {
                Row row = input.next();
                context.checkQueryCancelation();
   
                if (row != null) {
                    ++rowCount;
                    sortKey = new SortKey (createKey(row, rowCount), createValue(row));
                } else {
                    done = true;
                }
            } finally {
                loadTap.out();
            }
            return sortKey;
        }
       
        /*
         * Builds one KeyState per ordering segment. Segment i covers sort
         * columns [orderChanges[i], orderChanges[i + 1]). All segments are
         * assembled in the enclosing sorter's shared scratch key, which is
         * cleared at the start of every attempt and snapshotted into a new
         * KeyState once the segment is complete.
         */
        private List<KeyState> createKey(Row row, int rowCount) {
            KeyState[] states = new KeyState[orderChanges.size() - 1];
            for(int i = 0; i < states.length; ++i) {
                int startOffset = orderChanges.get(i);
                int endOffset = orderChanges.get(i + 1);
                boolean isLast = i == states.length - 1;
                // Loop for key growth: if the key is too small, enlarge it and
                // rebuild the segment from scratch.
                while(true) {
                    try {
                        sortKey.clear();
                        for(int j = startOffset; j < endOffset; ++j) {
                            sorterAdapter.evaluateToKey(row, j);
                        }
                        // The row counter goes only on the final segment, and
                        // only when the adapter wants duplicates preserved.
                        if(isLast && sorterAdapter.preserveDuplicates()) {
                            sortKey.append(rowCount);
                        }
                        break;
                    } catch (KeyTooLongException | StorageKeySizeExceededException e) {
                        enlargeKey(sortKey);
                    }
                }
                states[i] = new KeyState(sortKey);
            }
            return Arrays.asList(states);
        }

        // The estimated value size is rounded up to a multiple of this.
        private static final int SIZE_GRANULARITY = 256;

        /*
         * Serializes every field of the row into a new Value.
         * A rough size estimate is used to size the Value up front; if the
         * conversion still overflows, the Value's maximum size is doubled and
         * the conversion is retried from the beginning.
         */
        private Value createValue(Row row)
        {
            // Do a rough calculation of size of the row data
            int size = 0;
            for (int i = 0; i < rowFields; i++) {
                if (tFieldTypes[i] == null) {
                    size += 1;
                } else if (tFieldTypes[i].typeClass().hasFixedSerializationSize()) {
                    size += tFieldTypes[i].typeClass().fixedSerializationSize() + 2;
                } else {
                    // Variable-size field: estimate from the actual value.
                    ValueSource src = row.value(i);
                    if (!src.isNull()) {
                        switch (TInstance.underlyingType(src.getType())) {
                        case STRING:
                            size += AkCollator.getDebugString(src, collators[i]).length() * 2 + 3;
                            break;
                        case BYTES:
                            size += src.getBytes().length;
                            break;
                        default:
                            throw new IllegalArgumentException("Unexpected UnderlyingType: " + src.getType());
                        }
                    } else {
                        size += 1;
                    }
                }
            }
            // Round up to the next multiple of SIZE_GRANULARITY.
            size = ((size  + SIZE_GRANULARITY - 1) / SIZE_GRANULARITY) * SIZE_GRANULARITY;
           
            // Create a new conversion value
            // NOTE(review): the two size arguments are presumably the initial
            // and maximum buffer sizes - confirm against the Persistit Value API.
            Value convertValue =  new Value ((Persistit)null,
                        Math.max(size, Value.INITIAL_SIZE),
                        Math.max(size, Value.DEFAULT_MAXIMUM_SIZE));
            valueTarget.attach(convertValue);           
            // Convert the row to the Value for storage in the SortKey
            while(true) {
                try {
                    // Start over on every attempt so a failed partial
                    // conversion leaves nothing behind.
                    convertValue.clear();
                    convertValue.setStreamMode(true);
                    for (int i = 0; i < rowFields; i++) {
                        ValueSource field = row.value(i);
                        if (field.isNull()) {
                            valueTarget.putNull();
                        } else {
                            tFieldTypes[i].writeCanonical(field, valueTarget);
                        }
                    }
                    break;
                } catch (ConversionException e) {
                    enlargeValue(convertValue);
                }
            }
            // reset some more un-needed internal state. But this requires
            // making a copy of the internal data, again.
            return new Value(convertValue);
        }
       
        /*
         * Doubles the Value's maximum size, capped at Value.MAXIMUM_SIZE.
         * Throws KeyTooLongException if the maximum is already at the cap.
         */
        private void enlargeValue (Value value) {
            if (value.getMaximumSize() == Value.MAXIMUM_SIZE) {
                throw new KeyTooLongException("Maximum size exceeded=" + Value.MAXIMUM_SIZE);
            }
            value.setMaximumSize(Math.min(value.getMaximumSize() *2, Value.MAXIMUM_SIZE));
        }
       
        /*
         * Doubles the Key's maximum size, capped at Key.MAX_KEY_LENGTH_UPPER_BOUND.
         * Throws KeyTooLongException if the maximum is already at the cap.
         */
        private void enlargeKey (Key key) {
            if (key.getMaximumSize() == Key.MAX_KEY_LENGTH_UPPER_BOUND) {
                throw new KeyTooLongException("Maximum size exceeded=" + Key.MAX_KEY_LENGTH_UPPER_BOUND);
            }
            key.setMaximumSize(Math.min((key.getMaximumSize() * 2), Key.MAX_KEY_LENGTH_UPPER_BOUND));
        }
       
        /* Number of rows read from the input so far. */
        public int rowCount() {
            return rowCount;
        }
    }
   
    private class KeyWriterFactory extends DataWriterFactory<SortKey> {

        @Override
        public DataWriter<SortKey> constructWriter(OutputStream arg0)
                throws IOException {
            return new KeyWriter(arg0);
        }
    }
   
    public static class KeyWriter extends DataWriter<SortKey> {
        private OutputStream os;
        private ByteBuffer length;

        public KeyWriter(OutputStream os) {
            this.os = os;
            length = ByteBuffer.allocate(4);
        }
        @Override
        public void close() throws IOException {
            os.close();
           
        }

        @Override
        public void writeEntry(SortKey arg0) throws IOException {
            writeInt(arg0.sortKeys.size());
            for (KeyState state : arg0.sortKeys) {
                writeKeyState (state);
            }
            writeKey (arg0.rowValue);
        }
       
        private void writeKeyState (KeyState state) throws IOException {
            writeInt (state.getBytes().length);
            os.write(state.getBytes());
        }
       
        private void writeKey (Value key) throws IOException {
            writeInt(key.getEncodedSize());
            os.write(key.getEncodedBytes(), 0, key.getEncodedSize());
        }
       
        private void writeInt (int size) throws IOException {
            length.clear();
            length.putInt(size);
            os.write(length.array());
        }
    }

    /*
     * Class to provide temporary file names for inserting the
     * overflow buffers to disk. Implemented to the MergeJoin sort interface
     */
    public class MergeTempFileProvider implements TempFileProvider {
       
        private final File directory;
        private final String prefix;
        private final String suffix;
        public MergeTempFileProvider (QueryContext context) {
            directory = new File (context.getServiceManager().getConfigurationService().getProperty("fdbsql.tmp_dir"));
            suffix = ".tmp";
            String tmpPrefix;
            tmpPrefix = "sort-" +  context.getSessionId() + "-";
            prefix = tmpPrefix;
        }

        @Override
        public File provide() throws IOException {
            File f = File.createTempFile(prefix, suffix, directory);
            f.deleteOnExit();
            return f;
        }
    }
   
    /*
     * Class to create a cursor which reads the final sorted output
     * from the file, returning each sorted item as a Row.
     */
    public static class KeyFinalCursor extends RowCursorImpl {

        private final Iterator<SortKey> sortIterator;
        private final RowType rowType;
        private PersistitValueValueSource valueSource;
        private API.SortOption sortOption;
        private Comparator<SortKey> compare;
        private SortKey nextKey;
       
        public KeyFinalCursor(Iterator<SortKey> sortIterator, RowType rowType, API.SortOption sortOption, Comparator<SortKey> compare) {
            this.sortIterator = sortIterator;
            this.rowType = rowType;
            this.sortOption = sortOption;
            this.compare = compare;
            valueSource = new PersistitValueValueSource();
        }
       
        @Override
        public Row next() {
            CursorLifecycle.checkIdleOrActive(this);
            Row row = null;
            try {
                SortKey key = (nextKey != null) ? nextKey : (sortIterator.hasNext() ? sortIterator.next() : null);
                if(key != null) {
                    if(sortOption == API.SortOption.SUPPRESS_DUPLICATES) {
                        nextKey = skipDuplicates(key);
                    }
                    row = createRow(key);
                }
            } catch(IterableSorterException e) {
                throw new MergeSortIOException(e.getMessage());
            }
            return row;
        }
       
        /*
         * The FasterXML.Sort isn't capable of removing duplicates, it just
         * puts them in order in the sort output. Skip the duplicates by reading
         * until the end of the stream or until a different key appears
         */
        private SortKey skipDuplicates (SortKey startKey) {
            while (true) {
                SortKey newKey = sortIterator.hasNext() ? sortIterator.next() : null;
                if (newKey == null || compare.compare(startKey, newKey) != 0) {
                    return newKey;
                }
            }
        }
       
        private Row createRow (SortKey key) {
            ValuesHolderRow rowCopy = new ValuesHolderRow(rowType);
            valueSource.attach(key.rowValue);
            for(int i = 0 ; i < rowType.nFields(); ++i) {
                valueSource.getReady(rowType.typeAt(i));
                if (valueSource.isNull()) {
                    rowCopy.valueAt(i).putNull();
                } else {
                    rowType.typeAt(i).writeCanonical(valueSource, rowCopy.valueAt(i));
                }
            }
            return rowCopy;
        }
    }
   
    /*
     * Comparison function, implemented for MergeSort to compare
     * the KeyState lists generated by the KeyReadCursor
     */
    public static class KeySortCompare implements Comparator<SortKey> {
        private final Comparator<KeyState>[] comparators;

        @SuppressWarnings("unchecked")
        private KeySortCompare (List<Comparator<KeyState>> comparators) {
            this.comparators = comparators.toArray(new Comparator[comparators.size()]);
        }

        @Override
        public int compare(SortKey o1, SortKey o2) {
            int val = 0;
            for (int i = 0; (i < comparators.length) && (val == 0); ++i) {
                val = comparators[i].compare(o1.sortKeys.get(i), o2.sortKeys.get(i));
            }
            return val;
        }
    }

    private static final Comparator<KeyState> ASC_COMPARATOR = new Comparator<KeyState>() {
        @Override
        public int compare(KeyState k1, KeyState k2) {
            return k1.compareTo(k2);
        }
    };

    private static final Comparator<KeyState> DESC_COMPARATOR = new Comparator<KeyState>() {
        @Override
        public int compare(KeyState k1, KeyState k2) {
            return k2.compareTo(k1);
        }
    };
}
TOP

Related Classes of com.foundationdb.qp.storeadapter.indexcursor.MergeJoinSorter$SortKey

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.