Package org.apache.flink.runtime.operators.hash

Source Code of org.apache.flink.runtime.operators.hash.CompactingHashTable

/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements.  See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership.  The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License.  You may obtain a copy of the License at
*
*     http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.operators.hash;

import java.io.EOFException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypePairComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.memorymanager.ListMemorySegmentSource;
import org.apache.flink.runtime.util.IntArrayList;
import org.apache.flink.runtime.util.LongArrayList;
import org.apache.flink.runtime.util.MathUtils;
import org.apache.flink.util.MutableObjectIterator;

/**
* An implementation of an in-memory Hash Table for variable-length records.
* <p>
* The design of this class follows, in many parts, the design presented in
* "Hash joins and hash teams in Microsoft SQL Server", by Goetz Graefe et al.
* <p>
* <hr>
* The layout of the buckets inside a memory segment is as follows:
*
* <pre>
* +----------------------------- Bucket x ----------------------------
* |Partition (1 byte) | reserved (3 bytes) | element count (4 bytes) |
* | next-bucket-in-chain-pointer (8 bytes) |
* |
* |hashCode 1 (4 bytes) | hashCode 2 (4 bytes) | hashCode 3 (4 bytes) |
* | ... hashCode n-1 (4 bytes) | hashCode n (4 bytes)
* |
* |pointer 1 (8 bytes) | pointer 2 (8 bytes) | pointer 3 (8 bytes) |
* | ... pointer n-1 (8 bytes) | pointer n (8 bytes)
* |
* +---------------------------- Bucket x + 1--------------------------
* |Partition (1 byte) | reserved (3 bytes) | element count (4 bytes) |
* | next-bucket-in-chain-pointer (8 bytes) |
* |
* |hashCode 1 (4 bytes) | hashCode 2 (4 bytes) | hashCode 3 (4 bytes) |
* | ... hashCode n-1 (4 bytes) | hashCode n (4 bytes)
* |
* |pointer 1 (8 bytes) | pointer 2 (8 bytes) | pointer 3 (8 bytes) |
* | ... pointer n-1 (8 bytes) | pointer n (8 bytes)
* +-------------------------------------------------------------------
* | ...
* |
* </pre>
* @param <T> Record type stored in hash table
*/
public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{

  /** Logger used by this class. */
  private static final Logger LOG = LoggerFactory.getLogger(CompactingHashTable.class);
 
  // ------------------------------------------------------------------------
  //                         Internal Constants
  // ------------------------------------------------------------------------
 
  /**
   * The minimum number of memory segments the table accepts. The constructor rejects
   * smaller segment lists with an IllegalArgumentException.
   */
  private static final int MIN_NUM_MEMORY_SEGMENTS = 33;
 
  /**
   * The maximum number of partitions
   */
  private static final int MAX_NUM_PARTITIONS = 32;
 
  /**
   * The default record width that is used when no width is given. The record width is
   * used to determine the ratio of the number of memory segments intended for partition
   * buffers and the number of memory segments in the hash-table structure.
   */
  private static final int DEFAULT_RECORD_LEN = 24; //FIXME maybe find a better default
 
  /**
   * The length of the hash code stored in the bucket.
   */
  private static final int HASH_CODE_LEN = 4;
 
  /**
   * The length of a pointer from a hash bucket to the record in the buffers.
   */
  private static final int POINTER_LEN = 8;
 
  /**
   * The number of bytes that the entry in the hash structure occupies, in bytes.
   * It corresponds to a 4 byte hash value and an 8 byte pointer.
   */
  private static final int RECORD_TABLE_BYTES = HASH_CODE_LEN + POINTER_LEN;
 
  /**
   * The total storage overhead per record, in bytes. This corresponds to the space in the
   * actual hash table buckets, consisting of a 4 byte hash value and an 8 byte
   * pointer, plus the overhead for the stored length field.
   */
  private static final int RECORD_OVERHEAD_BYTES = RECORD_TABLE_BYTES + 2;
 
  // -------------------------- Bucket Size and Structure -------------------------------------
 
  /**
   * The base-2 logarithm of the bucket size. Shifting a bucket's index within its segment
   * left by this many bits yields the bucket's byte offset in the segment.
   */
  private static final int NUM_INTRA_BUCKET_BITS = 7;
 
  /**
   * The size of a single bucket in bytes (2^7 = 128).
   */
  private static final int HASH_BUCKET_SIZE = 0x1 << NUM_INTRA_BUCKET_BITS;
 
  /**
   * The length of the bucket header in bytes: partition (1), reserved (3),
   * element count (4) and the forward pointer to the next bucket in the chain (8).
   */
  private static final int BUCKET_HEADER_LENGTH = 16;
 
  /**
   * The number of (hash code, pointer) entries that fit into one bucket after the header:
   * (128 - 16) / 12 = 9.
   */
  private static final int NUM_ENTRIES_PER_BUCKET = (HASH_BUCKET_SIZE - BUCKET_HEADER_LENGTH) / RECORD_TABLE_BYTES;
 
  /**
   * Offset, relative to the start of a bucket, at which the pointer array begins. All hash
   * codes of a bucket are stored first, directly after the header, followed by all pointers.
   */
  private static final int BUCKET_POINTER_START_OFFSET = BUCKET_HEADER_LENGTH + (NUM_ENTRIES_PER_BUCKET * HASH_CODE_LEN);
 
  // ------------------------------ Bucket Header Fields ------------------------------
 
  /**
   * Offset of the field in the bucket header indicating the bucket's partition.
   * The field is read and written as a single byte.
   */
  private static final int HEADER_PARTITION_OFFSET = 0;
 
  /**
   * Offset of the field in the bucket header holding the number of entries currently
   * stored in the bucket (a 4 byte integer).
   */
  private static final int HEADER_COUNT_OFFSET = 4;
 
  /**
   * Offset of the field in the bucket header that holds the forward pointer to its
   * first overflow bucket. The pointer is an 8 byte value: the upper 32 bits are the index
   * of the overflow segment in the partition, the lower 32 bits are the byte offset of the
   * overflow bucket inside that segment.
   */
  private static final int HEADER_FORWARD_OFFSET = 8;
 
  /**
   * Constant for the forward pointer, indicating that the pointer is not set
   * (all 64 bits are one).
   */
  private static final long BUCKET_FORWARD_POINTER_NOT_SET = ~0x0L;
 
  // ------------------------------------------------------------------------
  //                              Members
  // ------------------------------------------------------------------------

  /**
   * The free memory segments currently available to the hash table. Buffers for the bucket
   * table and for overflow buckets are taken from the end of this list, and the bucket table
   * is returned to it when the table is closed.
   */
  private final ArrayList<MemorySegment> availableMemory;
 
  /**
   * The size of the segments used by the hash table buckets. All segments must be of equal size to ease offset computations.
   */
  private final int segmentSize;
 
  /**
   * The number of hash table buckets in a single memory segment - 1.
   * Because memory segments can be comparatively large, we fit multiple buckets into one memory segment.
   * This variable is a mask that is 1 in the lower bits that define the number of a bucket
   * in a segment.
   */
  private final int bucketsPerSegmentMask;
 
  /**
   * The number of bits that describe the position of a bucket in a memory segment. Computed as log2(bucketsPerSegment).
   */
  private final int bucketsPerSegmentBits;
 
  /**
   * An estimate for the average record length. The serializer's fixed length is used when it
   * reports a positive one, otherwise the value handed to the constructor.
   */
  private final int avgRecordLen;
 
  // ------------------------------------------------------------------------
 
  /**
   * The partitions of the hash table.
   */
  private final ArrayList<InMemoryPartition<T>> partitions;
 
  /**
   * The array of memory segments that contain the buckets which form the actual hash-table
   * of hash-codes and pointers to the elements.
   */
  private MemorySegment[] buckets;
 
  /**
   * Temporary storage for partition compaction (always attempts to allocate as many segments as the largest partition).
   */
  private InMemoryPartition<T> compactionMemory;
 
  /**
   * The number of buckets in the current table. The bucket array is not necessarily fully
   * used, when not all buckets that would fit into the last segment are actually used.
   */
  private int numBuckets;
 
  /**
   * Flag necessary so a resize is never triggered during a resize since the code paths are interleaved.
   */
  private boolean isResizing = false;
 
  /**
   * Whether the table is currently closed. The constructor sets it to true; open() and
   * close() flip it atomically.
   */
  private AtomicBoolean closed = new AtomicBoolean();
 
  /**
   * Cleared by abort() and checked by buildTable() before each record is fetched.
   * NOTE(review): the field is not volatile; if abort() is called from a different thread
   * than the one running buildTable(), visibility of the update is not guaranteed - confirm.
   */
  private boolean running = true;
    
  /**
   * The base-2 logarithm of the segment size. Set when the partitions are created and used
   * to extract the page index from a record pointer.
   */
  private int pageSizeInBits;

  // ------------------------------------------------------------------------
  //                         Construction and Teardown
  // ------------------------------------------------------------------------
 
  public CompactingHashTable(TypeSerializer<T> buildSideSerializer, TypeComparator<T> buildSideComparator, List<MemorySegment> memorySegments)
  {
    this(buildSideSerializer, buildSideComparator, memorySegments, DEFAULT_RECORD_LEN);
  }
 
  public CompactingHashTable(TypeSerializer<T> buildSideSerializer, TypeComparator<T> buildSideComparator, List<MemorySegment> memorySegments, int avgRecordLen)
  {
    super(buildSideSerializer, buildSideComparator);
    // some sanity checks first
    if (memorySegments == null) {
      throw new NullPointerException();
    }
    if (memorySegments.size() < MIN_NUM_MEMORY_SEGMENTS) {
      throw new IllegalArgumentException("Too few memory segments provided. Hash Table needs at least " +
        MIN_NUM_MEMORY_SEGMENTS + " memory segments.");
    }
   
    this.availableMemory = (memorySegments instanceof ArrayList) ?
        (ArrayList<MemorySegment>) memorySegments :
        new ArrayList<MemorySegment>(memorySegments);

   
    this.avgRecordLen = buildSideSerializer.getLength() > 0 ? buildSideSerializer.getLength() : avgRecordLen;
   
    // check the size of the first buffer and record it. all further buffers must have the same size.
    // the size must also be a power of 2
    this.segmentSize = memorySegments.get(0).size();
    if ( (this.segmentSize & this.segmentSize - 1) != 0) {
      throw new IllegalArgumentException("Hash Table requires buffers whose size is a power of 2.");
    }
    int bucketsPerSegment = this.segmentSize >> NUM_INTRA_BUCKET_BITS;
    if (bucketsPerSegment == 0) {
      throw new IllegalArgumentException("Hash Table requires buffers of at least " + HASH_BUCKET_SIZE + " bytes.");
    }
    this.bucketsPerSegmentMask = bucketsPerSegment - 1;
    this.bucketsPerSegmentBits = MathUtils.log2strict(bucketsPerSegment);
   
    this.partitions = new ArrayList<InMemoryPartition<T>>();
   
    // because we allow to open and close multiple times, the state is initially closed
    this.closed.set(true);
    // so far no partition has any MemorySegments
  }
 
 
  // ------------------------------------------------------------------------
  //                              Life-Cycle
  // ------------------------------------------------------------------------
 
  /**
   * Build the hash table
   */
  public void open() {
    // sanity checks
    if (!this.closed.compareAndSet(true, false)) {
      throw new IllegalStateException("Hash Table cannot be opened, because it is currently not closed.");
    }
   
    // create the partitions
    final int partitionFanOut = getPartitioningFanOutNoEstimates(this.availableMemory.size());
    createPartitions(partitionFanOut);
   
    // set up the table structure. the write behind buffers are taken away, as are one buffer per partition
    final int numBuckets = getInitialTableSize(this.availableMemory.size(), this.segmentSize,
      partitionFanOut, this.avgRecordLen);
   
    initTable(numBuckets, (byte) partitionFanOut);
  }

 
  /**
   * Closes the hash table. This effectively releases all internal structures and closes all
   * open files and removes them. The call to this method is valid both as a cleanup after the
   * complete inputs were properly processed, and as an cancellation call, which cleans up
   * all resources that are currently held by the hash join. If another process still access the hash
   * table after close has been called no operations will be performed.
   */
  public void close() {
    // make sure that we close only once
    if (!this.closed.compareAndSet(false, true)) {
      return;
    }
   
    LOG.debug("Closing hash table and releasing resources.");
   
    // release the table structure
    releaseTable();
   
    // clear the memory in the partitions
    clearPartitions();
  }
 
  public void abort() {
    this.running = false;
   
    LOG.debug("Cancelling hash table operations.");
  }
 
  public List<MemorySegment> getFreeMemory() {
    if (!this.closed.get()) {
      throw new IllegalStateException("Cannot return memory while join is open.");
    }
   
    return this.availableMemory;
  }
 
 
  public void buildTable(final MutableObjectIterator<T> input) throws IOException {
    T record = this.buildSideSerializer.createInstance();
   
    // go over the complete input and insert every element into the hash table
    while (this.running && ((record = input.next(record)) != null)) {
      insert(record);
    }
  }
 
  public final void insert(T record) throws IOException {
    if(this.closed.get()) {
      return;
    }
    final int hashCode = hash(this.buildSideComparator.hash(record));
    final int posHashCode = hashCode % this.numBuckets;
   
    // get the bucket for the given hash code
    final int bucketArrayPos = posHashCode >>> this.bucketsPerSegmentBits;
    final int bucketInSegmentPos = (posHashCode & this.bucketsPerSegmentMask) << NUM_INTRA_BUCKET_BITS;
    final MemorySegment bucket = this.buckets[bucketArrayPos];
   
    // get the basic characteristics of the bucket
    final int partitionNumber = bucket.get(bucketInSegmentPos + HEADER_PARTITION_OFFSET);
    InMemoryPartition<T> partition = this.partitions.get(partitionNumber);
   
   
    long pointer;
    try {
      pointer = partition.appendRecord(record);
      if((pointer >> this.pageSizeInBits) > this.compactionMemory.getBlockCount()) {
        this.compactionMemory.allocateSegments((int)(pointer >> this.pageSizeInBits));
      }
    } catch (EOFException e) {
      try {
        compactPartition(partitionNumber);
        // retry append
        partition = this.partitions.get(partitionNumber); // compaction invalidates reference
        pointer = partition.appendRecord(record);
      } catch (EOFException ex) {
        throw new RuntimeException("Memory ran out. Compaction failed. " +
                      getMemoryConsumptionString() +
                      " Message: " + ex.getMessage());
      } catch (IndexOutOfBoundsException ex) {
        throw new RuntimeException("Memory ran out. Compaction failed. " +
                      getMemoryConsumptionString() +
                      " Message: " + ex.getMessage());
      }
    } catch (IndexOutOfBoundsException e1) {
      try {
        compactPartition(partitionNumber);
        // retry append
        partition = this.partitions.get(partitionNumber); // compaction invalidates reference
        pointer = partition.appendRecord(record);
      } catch (EOFException ex) {
        throw new RuntimeException("Memory ran out. Compaction failed. " +
                      getMemoryConsumptionString() +
                      " Message: " + ex.getMessage());
      } catch (IndexOutOfBoundsException ex) {
        throw new RuntimeException("Memory ran out. Compaction failed. " +
                      getMemoryConsumptionString() +
                      " Message: " + ex.getMessage());
      }
    }
    insertBucketEntryFromStart(partition, bucket, bucketInSegmentPos, hashCode, pointer);
  }
 
 
  @Override
  public <PT> HashTableProber<PT> getProber(TypeComparator<PT> probeSideComparator, TypePairComparator<PT, T> pairComparator) {
    return new HashTableProber<PT>(probeSideComparator, pairComparator);
  }
 
  /**
   *
   * @return Iterator over hash table
   * @see EntryIterator
   */
  public MutableObjectIterator<T> getEntryIterator() {
    return new EntryIterator(this);
  }
 
  /**
   * Replaces record in hash table if record already present or append record if not.
   * May trigger expensive compaction.
   *
   * @param record record to insert or replace
   * @param tempHolder instance of T that will be overwritten
   * @throws IOException
   */
  public void insertOrReplaceRecord(T record, T tempHolder) throws IOException {
    if(this.closed.get()) {
      return;
    }
   
    final int searchHashCode = hash(this.buildSideComparator.hash(record));
    final int posHashCode = searchHashCode % this.numBuckets;
   
    // get the bucket for the given hash code
    MemorySegment originalBucket = this.buckets[posHashCode >> this.bucketsPerSegmentBits];
    int originalBucketOffset = (posHashCode & this.bucketsPerSegmentMask) << NUM_INTRA_BUCKET_BITS;
    MemorySegment bucket = originalBucket;
    int bucketInSegmentOffset = originalBucketOffset;
   
    // get the basic characteristics of the bucket
    final int partitionNumber = bucket.get(bucketInSegmentOffset + HEADER_PARTITION_OFFSET);
    InMemoryPartition<T> partition = this.partitions.get(partitionNumber);
    final MemorySegment[] overflowSegments = partition.overflowSegments;
   
    this.buildSideComparator.setReference(record);
   
    int countInSegment = bucket.getInt(bucketInSegmentOffset + HEADER_COUNT_OFFSET);
    int numInSegment = 0;
    int posInSegment = bucketInSegmentOffset + BUCKET_HEADER_LENGTH;
   
    long currentForwardPointer = BUCKET_FORWARD_POINTER_NOT_SET;

    // loop over all segments that are involved in the bucket (original bucket plus overflow buckets)
    while (true) {
     
      while (numInSegment < countInSegment) {
       
        final int thisCode = bucket.getInt(posInSegment);
        posInSegment += HASH_CODE_LEN;
         
        // check if the hash code matches
        if (thisCode == searchHashCode) {
          // get the pointer to the pair
          final int pointerOffset = bucketInSegmentOffset + BUCKET_POINTER_START_OFFSET + (numInSegment * POINTER_LEN);
          final long pointer = bucket.getLong(pointerOffset);
          numInSegment++;
         
          // deserialize the key to check whether it is really equal, or whether we had only a hash collision
          try {
            tempHolder = partition.readRecordAt(pointer, tempHolder);
            if (this.buildSideComparator.equalToReference(tempHolder)) {
              long newPointer = partition.appendRecord(record);
              bucket.putLong(pointerOffset, newPointer);
              partition.setCompaction(false);
              if((newPointer >> this.pageSizeInBits) > this.compactionMemory.getBlockCount()) {
                this.compactionMemory.allocateSegments((int)(newPointer >> this.pageSizeInBits));
              }
              return;
            }
          } catch (EOFException e) {
            // system is out of memory so we attempt to reclaim memory with a copy compact run
            long newPointer;
            try {
              compactPartition(partition.getPartitionNumber());
              // retry append
              partition = this.partitions.get(partitionNumber); // compaction invalidates reference
              newPointer = partition.appendRecord(record);
            } catch (EOFException ex) {
              throw new RuntimeException("Memory ran out. Compaction failed. " +
                            getMemoryConsumptionString() +
                            " Message: " + ex.getMessage());
            } catch (IndexOutOfBoundsException ex) {
              throw new RuntimeException("Memory ran out. Compaction failed. " +
                            getMemoryConsumptionString() +
                            " Message: " + ex.getMessage());
            }
            bucket.putLong(pointerOffset, newPointer);
            return;
          } catch (IndexOutOfBoundsException e) {
            // system is out of memory so we attempt to reclaim memory with a copy compact run
            long newPointer;
            try {
              compactPartition(partition.getPartitionNumber());
              // retry append
              partition = this.partitions.get(partitionNumber); // compaction invalidates reference
              newPointer = partition.appendRecord(record);
            } catch (EOFException ex) {
              throw new RuntimeException("Memory ran out. Compaction failed. " +
                            getMemoryConsumptionString() +
                            " Message: " + ex.getMessage());
            } catch (IndexOutOfBoundsException ex) {
              throw new RuntimeException("Memory ran out. Compaction failed. " +
                            getMemoryConsumptionString() +
                            " Message: " + ex.getMessage());
            }
            bucket.putLong(pointerOffset, newPointer);
            return;
          } catch (IOException e) {
            throw new RuntimeException("Error deserializing record from the hashtable: " + e.getMessage(), e);
          }
        }
        else {
          numInSegment++;
        }
      }
     
      // this segment is done. check if there is another chained bucket
      long newForwardPointer = bucket.getLong(bucketInSegmentOffset + HEADER_FORWARD_OFFSET);
      if (newForwardPointer == BUCKET_FORWARD_POINTER_NOT_SET) {
        // nothing found. append and insert
        long pointer = partition.appendRecord(record);
        //insertBucketEntryFromStart(partition, originalBucket, originalBucketOffset, searchHashCode, pointer);
        insertBucketEntryFromSearch(partition, originalBucket, bucket, originalBucketOffset, bucketInSegmentOffset, countInSegment, currentForwardPointer, searchHashCode, pointer);
        if((pointer >> this.pageSizeInBits) > this.compactionMemory.getBlockCount()) {
          this.compactionMemory.allocateSegments((int)(pointer >> this.pageSizeInBits));
        }
        return;
      }
     
      final int overflowSegNum = (int) (newForwardPointer >>> 32);
      bucket = overflowSegments[overflowSegNum];
      bucketInSegmentOffset = (int) (newForwardPointer & 0xffffffff);
      countInSegment = bucket.getInt(bucketInSegmentOffset + HEADER_COUNT_OFFSET);
      posInSegment = bucketInSegmentOffset + BUCKET_HEADER_LENGTH;
      numInSegment = 0;
      currentForwardPointer = newForwardPointer;
    }
  }

  private void insertBucketEntryFromStart(InMemoryPartition<T> p, MemorySegment bucket,
      int bucketInSegmentPos, int hashCode, long pointer)
  throws IOException
  {
    boolean checkForResize = false;
    // find the position to put the hash code and pointer
    final int count = bucket.getInt(bucketInSegmentPos + HEADER_COUNT_OFFSET);
    if (count < NUM_ENTRIES_PER_BUCKET) {
      // we are good in our current bucket, put the values
      bucket.putInt(bucketInSegmentPos + BUCKET_HEADER_LENGTH + (count * HASH_CODE_LEN), hashCode)// hash code
      bucket.putLong(bucketInSegmentPos + BUCKET_POINTER_START_OFFSET + (count * POINTER_LEN), pointer); // pointer
      bucket.putInt(bucketInSegmentPos + HEADER_COUNT_OFFSET, count + 1); // update count
    } else {
      // we need to go to the overflow buckets
      final long originalForwardPointer = bucket.getLong(bucketInSegmentPos + HEADER_FORWARD_OFFSET);
      final long forwardForNewBucket;
     
      if (originalForwardPointer != BUCKET_FORWARD_POINTER_NOT_SET) {
       
        // forward pointer set
        final int overflowSegNum = (int) (originalForwardPointer >>> 32);
        final int segOffset = (int) (originalForwardPointer & 0xffffffff);
        final MemorySegment seg = p.overflowSegments[overflowSegNum];
       
        final int obCount = seg.getInt(segOffset + HEADER_COUNT_OFFSET);
       
        // check if there is space in this overflow bucket
        if (obCount < NUM_ENTRIES_PER_BUCKET) {
          // space in this bucket and we are done
          seg.putInt(segOffset + BUCKET_HEADER_LENGTH + (obCount * HASH_CODE_LEN), hashCode)// hash code
          seg.putLong(segOffset + BUCKET_POINTER_START_OFFSET + (obCount * POINTER_LEN), pointer); // pointer
          seg.putInt(segOffset + HEADER_COUNT_OFFSET, obCount + 1); // update count
          return;
        } else {
          // no space here, we need a new bucket. this current overflow bucket will be the
          // target of the new overflow bucket
          forwardForNewBucket = originalForwardPointer;
        }
      } else {
        // no overflow bucket yet, so we need a first one
        forwardForNewBucket = BUCKET_FORWARD_POINTER_NOT_SET;
      }
     
      // we need a new overflow bucket
      MemorySegment overflowSeg;
      final int overflowBucketNum;
      final int overflowBucketOffset;
     
      // first, see if there is space for an overflow bucket remaining in the last overflow segment
      if (p.nextOverflowBucket == 0) {
        // no space left in last bucket, or no bucket yet, so create an overflow segment
        overflowSeg = getNextBuffer();
        overflowBucketOffset = 0;
        overflowBucketNum = p.numOverflowSegments;
       
        // add the new overflow segment
        if (p.overflowSegments.length <= p.numOverflowSegments) {
          MemorySegment[] newSegsArray = new MemorySegment[p.overflowSegments.length * 2];
          System.arraycopy(p.overflowSegments, 0, newSegsArray, 0, p.overflowSegments.length);
          p.overflowSegments = newSegsArray;
        }
        p.overflowSegments[p.numOverflowSegments] = overflowSeg;
        p.numOverflowSegments++;
        checkForResize = true;
      } else {
        // there is space in the last overflow bucket
        overflowBucketNum = p.numOverflowSegments - 1;
        overflowSeg = p.overflowSegments[overflowBucketNum];
        overflowBucketOffset = p.nextOverflowBucket << NUM_INTRA_BUCKET_BITS;
      }
     
      // next overflow bucket is one ahead. if the segment is full, the next will be at the beginning
      // of a new segment
      p.nextOverflowBucket = (p.nextOverflowBucket == this.bucketsPerSegmentMask ? 0 : p.nextOverflowBucket + 1);
     
      // insert the new overflow bucket in the chain of buckets
      // 1) set the old forward pointer
      // 2) let the bucket in the main table point to this one
      overflowSeg.putLong(overflowBucketOffset + HEADER_FORWARD_OFFSET, forwardForNewBucket);
      final long pointerToNewBucket = (((long) overflowBucketNum) << 32) | ((long) overflowBucketOffset);
      bucket.putLong(bucketInSegmentPos + HEADER_FORWARD_OFFSET, pointerToNewBucket);
     
      // finally, insert the values into the overflow buckets
      overflowSeg.putInt(overflowBucketOffset + BUCKET_HEADER_LENGTH, hashCode)// hash code
      overflowSeg.putLong(overflowBucketOffset + BUCKET_POINTER_START_OFFSET, pointer); // pointer
     
      // set the count to one
      overflowSeg.putInt(overflowBucketOffset + HEADER_COUNT_OFFSET, 1);
      if(checkForResize && !this.isResizing) {
        // check if we should resize buckets
        if(this.buckets.length <= getOverflowSegmentCount()) {
          resizeHashTable();
        }
      }
    }
  }
 
  private void insertBucketEntryFromSearch(InMemoryPartition<T> partition, MemorySegment originalBucket, MemorySegment currentBucket, int originalBucketOffset, int currentBucketOffset, int countInCurrentBucket, long currentForwardPointer, int hashCode, long pointer) throws IOException {
    boolean checkForResize = false;
    if (countInCurrentBucket < NUM_ENTRIES_PER_BUCKET) {
      // we are good in our current bucket, put the values
      currentBucket.putInt(currentBucketOffset + BUCKET_HEADER_LENGTH + (countInCurrentBucket * HASH_CODE_LEN), hashCode)// hash code
      currentBucket.putLong(currentBucketOffset + BUCKET_POINTER_START_OFFSET + (countInCurrentBucket * POINTER_LEN), pointer); // pointer
      currentBucket.putInt(currentBucketOffset + HEADER_COUNT_OFFSET, countInCurrentBucket + 1); // update count
    } else {
      // we need a new overflow bucket
      MemorySegment overflowSeg;
      final int overflowBucketNum;
      final int overflowBucketOffset;
     
      // first, see if there is space for an overflow bucket remaining in the last overflow segment
      if (partition.nextOverflowBucket == 0) {
        // no space left in last bucket, or no bucket yet, so create an overflow segment
        overflowSeg = getNextBuffer();
        overflowBucketOffset = 0;
        overflowBucketNum = partition.numOverflowSegments;
       
        // add the new overflow segment
        if (partition.overflowSegments.length <= partition.numOverflowSegments) {
          MemorySegment[] newSegsArray = new MemorySegment[partition.overflowSegments.length * 2];
          System.arraycopy(partition.overflowSegments, 0, newSegsArray, 0, partition.overflowSegments.length);
          partition.overflowSegments = newSegsArray;
        }
        partition.overflowSegments[partition.numOverflowSegments] = overflowSeg;
        partition.numOverflowSegments++;
        checkForResize = true;
      } else {
        // there is space in the last overflow segment
        overflowBucketNum = partition.numOverflowSegments - 1;
        overflowSeg = partition.overflowSegments[overflowBucketNum];
        overflowBucketOffset = partition.nextOverflowBucket << NUM_INTRA_BUCKET_BITS;
      }
     
      // next overflow bucket is one ahead. if the segment is full, the next will be at the beginning
      // of a new segment
      partition.nextOverflowBucket = (partition.nextOverflowBucket == this.bucketsPerSegmentMask ? 0 : partition.nextOverflowBucket + 1);
     
      // insert the new overflow bucket in the chain of buckets
      // 1) set the old forward pointer
      // 2) let the bucket in the main table point to this one
      overflowSeg.putLong(overflowBucketOffset + HEADER_FORWARD_OFFSET, currentForwardPointer);
      final long pointerToNewBucket = (((long) overflowBucketNum) << 32) | ((long) overflowBucketOffset);
      originalBucket.putLong(originalBucketOffset + HEADER_FORWARD_OFFSET, pointerToNewBucket);
     
      // finally, insert the values into the overflow buckets
      overflowSeg.putInt(overflowBucketOffset + BUCKET_HEADER_LENGTH, hashCode)// hash code
      overflowSeg.putLong(overflowBucketOffset + BUCKET_POINTER_START_OFFSET, pointer); // pointer
     
      // set the count to one
      overflowSeg.putInt(overflowBucketOffset + HEADER_COUNT_OFFSET, 1);
      if(checkForResize && !this.isResizing) {
        // check if we should resize buckets
        if(this.buckets.length <= getOverflowSegmentCount()) {
          resizeHashTable();
        }
      }
    }
  }
 
  // --------------------------------------------------------------------------------------------
  //                          Setup and Tear Down of Structures
  // --------------------------------------------------------------------------------------------

  private void createPartitions(int numPartitions) {
    this.partitions.clear();
   
    ListMemorySegmentSource memSource = new ListMemorySegmentSource(this.availableMemory);
    this.pageSizeInBits = MathUtils.log2strict(this.segmentSize);
   
    for (int i = 0; i < numPartitions; i++) {
      this.partitions.add(new InMemoryPartition<T>(this.buildSideSerializer, i, memSource, this.segmentSize, pageSizeInBits));
    }
    this.compactionMemory = new InMemoryPartition<T>(this.buildSideSerializer, -1, memSource, this.segmentSize, pageSizeInBits);
  }
 
  private void clearPartitions() {
    for (int i = 0; i < this.partitions.size(); i++) {
      InMemoryPartition<T> p = this.partitions.get(i);
      p.clearAllMemory(this.availableMemory);
    }
    this.partitions.clear();
    this.compactionMemory.clearAllMemory(availableMemory);
  }
 
  private void initTable(int numBuckets, byte numPartitions) {
    final int bucketsPerSegment = this.bucketsPerSegmentMask + 1;
    final int numSegs = (numBuckets >>> this.bucketsPerSegmentBits) + ( (numBuckets & this.bucketsPerSegmentMask) == 0 ? 0 : 1);
    final MemorySegment[] table = new MemorySegment[numSegs];
   
    // go over all segments that are part of the table
    for (int i = 0, bucket = 0; i < numSegs && bucket < numBuckets; i++) {
      final MemorySegment seg = getNextBuffer();
     
      // go over all buckets in the segment
      for (int k = 0; k < bucketsPerSegment && bucket < numBuckets; k++, bucket++) {
        final int bucketOffset = k * HASH_BUCKET_SIZE; 
       
        // compute the partition that the bucket corresponds to
        final byte partition = assignPartition(bucket, numPartitions);
       
        // initialize the header fields
        seg.put(bucketOffset + HEADER_PARTITION_OFFSET, partition);
        seg.putInt(bucketOffset + HEADER_COUNT_OFFSET, 0);
        seg.putLong(bucketOffset + HEADER_FORWARD_OFFSET, BUCKET_FORWARD_POINTER_NOT_SET);
      }
     
      table[i] = seg;
    }
    this.buckets = table;
    this.numBuckets = numBuckets;
  }
 
  private void releaseTable() {
    // set the counters back
    this.numBuckets = 0;
    if (this.buckets != null) {
      for (int i = 0; i < this.buckets.length; i++) {
        this.availableMemory.add(this.buckets[i]);
      }
      this.buckets = null;
    }
  }
 
  private final MemorySegment getNextBuffer() {
    // check if the list directly offers memory
    int s = this.availableMemory.size();
    if (s > 0) {
      return this.availableMemory.remove(s-1);
    } else {
      throw new RuntimeException("Memory ran out. " + getMemoryConsumptionString());
    }
  }

  // --------------------------------------------------------------------------------------------
  //                             Utility Computational Functions
  // --------------------------------------------------------------------------------------------
 
  /**
   * Gets the number of partitions to be used for an initial hash-table, when no estimates are
   * available.
   * <p>
   * The current logic makes sure that there are always between 10 and 32 partitions, and close
   * to 0.1 of the number of buffers.
   *
   * @param numBuffers The number of buffers available.
   * @return The number of partitions to use.
   */
  private static final int getPartitioningFanOutNoEstimates(int numBuffers) {
    return Math.max(10, Math.min(numBuffers / 10, MAX_NUM_PARTITIONS));
  }
 
  /**
   * @return String containing a summary of the memory consumption for error messages
   */
  private String getMemoryConsumptionString() {
    String result = new String("numPartitions: " + this.partitions.size() +
        " minPartition: " + getMinPartition() +
        " maxPartition: " + getMaxPartition() +
        " number of overflow segments: " + getOverflowSegmentCount() +
        " bucketSize: " + this.buckets.length +
        " Overall memory: " + getSize() +
        " Partition memory: " + getPartitionSize());
    return result;
  }
 
  /**
   * Size of all memory segments owned by this hash table
   *
   * @return size in bytes
   */
  private long getSize() {
    long numSegments = 0;
    numSegments += this.availableMemory.size();
    numSegments += this.buckets.length;
    for(InMemoryPartition<T> p : this.partitions) {
      numSegments += p.getBlockCount();
      numSegments += p.numOverflowSegments;
    }
    numSegments += this.compactionMemory.getBlockCount();
    return numSegments*this.segmentSize;
  }
 
  /**
   * Size of all memory segments owned by the partitions of this hash table excluding the compaction partition
   *
   * @return size in bytes
   */
  private long getPartitionSize() {
    long numSegments = 0;
    for(InMemoryPartition<T> p : this.partitions) {
      numSegments += p.getBlockCount();
    }
    return numSegments*this.segmentSize;
  }
 
  /**
   * @return number of memory segments in the largest partition
   */
  private int getMaxPartition() {
    int maxPartition = 0;
    for(InMemoryPartition<T> p1 : this.partitions) {
      if(p1.getBlockCount() > maxPartition) {
        maxPartition = p1.getBlockCount();
      }
    }
    return maxPartition;
  }
 
  /**
   * @return number of memory segments in the smallest partition
   */
  private int getMinPartition() {
    int minPartition = Integer.MAX_VALUE;
    for(InMemoryPartition<T> p1 : this.partitions) {
      if(p1.getBlockCount() < minPartition) {
        minPartition = p1.getBlockCount();
      }
    }
    return minPartition;
  }
 
  /**
   * @return number of memory segments used in overflow buckets
   */
  private int getOverflowSegmentCount() {
    int result = 0;
    for(InMemoryPartition<T> p : this.partitions) {
      result += p.numOverflowSegments;
    }
    return result;
  }
 
  /**
   * tries to find a good value for the number of buckets
   * will ensure that the number of buckets is a multiple of numPartitions
   *
   * @return number of buckets
   */
  private static final int getInitialTableSize(int numBuffers, int bufferSize, int numPartitions, int recordLenBytes) {
    final long totalSize = ((long) bufferSize) * numBuffers;
    final long numRecordsStorable = totalSize / (recordLenBytes + RECORD_OVERHEAD_BYTES);
    final long bucketBytes = numRecordsStorable * RECORD_OVERHEAD_BYTES;
    long numBuckets = bucketBytes / (2 * HASH_BUCKET_SIZE) + 1;
    while(numBuckets % numPartitions != 0) {
      numBuckets++;
    }
    return numBuckets > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) numBuckets;
  }
 
  /**
   * Assigns a partition to a bucket.
   *
   * @param bucket bucket index
   * @param numPartitions number of partitions
   * @return The hash code for the integer.
   */
  private static final byte assignPartition(int bucket, byte numPartitions) {
    return (byte) (bucket % numPartitions);
  }
 
  /**
   * Attempts to double the number of buckets and redistributes all existing bucket entries
   * between each old bucket and its new sibling bucket (old bucket index plus the old number
   * of buckets).
   * <p>
   * If fewer free segments are available than the larger table needs, the partitions are
   * compacted one after another until enough memory is available. If that does not free enough
   * segments, or if the table is closed, nothing is changed and {@code false} is returned.
   * <p>
   * While the table is being rebuilt, {@code isResizing} is set to {@code true}.
   * NOTE(review): the flag is not reset if an exception is thrown during redistribution.
   *
   * @return true on success, false if there was not enough memory or the table is closed
   * @throws IOException if compaction fails or an inconsistency in the bucket layout is detected
   */
  private boolean resizeHashTable() throws IOException {
    final int newNumBuckets = 2*this.numBuckets;
    final int bucketsPerSegment = this.bucketsPerSegmentMask + 1;
    // number of segments needed for the doubled table (rounded up)
    final int newNumSegments = (newNumBuckets + (bucketsPerSegment-1)) / bucketsPerSegment;
    final int additionalSegments = newNumSegments-this.buckets.length;
    final int numPartitions = this.partitions.size();
    // not enough free memory: compact partitions until enough segments are available
    if(this.availableMemory.size() < additionalSegments) {
      for(int i = 0; i < numPartitions; i++) {
        compactPartition(i);
        if(this.availableMemory.size() >= additionalSegments) {
          break;
        }
      }
    }
    if(this.availableMemory.size() < additionalSegments || this.closed.get()) {
      return false;
    } else {
      this.isResizing = true;
      // allocate new buckets
      // byte offset behind the last old bucket within its segment; non-zero means that the
      // last old segment still has room for some of the new buckets
      final int startOffset = (this.numBuckets * HASH_BUCKET_SIZE) % this.segmentSize;
      // placeholder array (all null); the actual segments are fetched in the loop below
      MemorySegment[] newBuckets = new MemorySegment[additionalSegments];
      final int oldNumBuckets = this.numBuckets;
      final int oldNumSegments = this.buckets.length;
      MemorySegment[] mergedBuckets = new MemorySegment[newNumSegments];
      System.arraycopy(this.buckets, 0, mergedBuckets, 0, this.buckets.length);
      System.arraycopy(newBuckets, 0, mergedBuckets, this.buckets.length, newBuckets.length);
      this.buckets = mergedBuckets;
      this.numBuckets = newNumBuckets;
      // initialize all new buckets
      boolean oldSegment = (startOffset != 0);
      final int startSegment = oldSegment ? (oldNumSegments-1) : oldNumSegments;
      for (int i = startSegment, bucket = oldNumBuckets; i < newNumSegments && bucket < this.numBuckets; i++) {
        MemorySegment seg;
        int bucketOffset = 0;
        if(oldSegment) { // the first couple of new buckets may be located on an old segment
          seg = this.buckets[i];
          // start behind the last old bucket of this segment
          for (int k = (oldNumBuckets % bucketsPerSegment) ; k < bucketsPerSegment && bucket < this.numBuckets; k++, bucket++) {
            bucketOffset = k * HASH_BUCKET_SIZE; 
            // initialize the header fields
            seg.put(bucketOffset + HEADER_PARTITION_OFFSET, assignPartition(bucket, (byte)numPartitions));
            seg.putInt(bucketOffset + HEADER_COUNT_OFFSET, 0);
            seg.putLong(bucketOffset + HEADER_FORWARD_OFFSET, BUCKET_FORWARD_POINTER_NOT_SET);
          }
        } else {
          seg = getNextBuffer();
          // go over all buckets in the segment
          for (int k = 0; k < bucketsPerSegment && bucket < this.numBuckets; k++, bucket++) {
            bucketOffset = k * HASH_BUCKET_SIZE; 
            // initialize the header fields
            seg.put(bucketOffset + HEADER_PARTITION_OFFSET, assignPartition(bucket, (byte)numPartitions));
            seg.putInt(bucketOffset + HEADER_COUNT_OFFSET, 0);
            seg.putLong(bucketOffset + HEADER_FORWARD_OFFSET, BUCKET_FORWARD_POINTER_NOT_SET);
          }
        }       
        this.buckets[i] = seg;
        oldSegment = false; // we write on at most one old segment
      }
      int hashOffset = 0;
      int hash = 0;
      int pointerOffset = 0;
      long pointer = 0;
      // entries (hash code / record pointer pairs) of the bucket chain currently being split
      IntArrayList hashList = new IntArrayList(NUM_ENTRIES_PER_BUCKET);
      LongArrayList pointerList = new LongArrayList(NUM_ENTRIES_PER_BUCKET);
      // entries that did not fit into the plain buckets during the first pass; they are
      // inserted once the partition's overflow buckets have been reset
      IntArrayList overflowHashes = new IntArrayList(64);
      LongArrayList overflowPointers = new LongArrayList(64);
      // go over all buckets and split them between old and new buckets
      for(int i = 0; i < numPartitions; i++) {
        InMemoryPartition<T> partition = this.partitions.get(i);
        final MemorySegment[] overflowSegments = partition.overflowSegments;
        int posHashCode = 0;
        // the old buckets of partition i are i, i + numPartitions, i + 2 * numPartitions, ...
        for (int j = 0, bucket = i; j < this.buckets.length && bucket < oldNumBuckets; j++) {
          MemorySegment segment = this.buckets[j];
          // go over all buckets in the segment belonging to the partition
          for (int k = bucket % bucketsPerSegment; k < bucketsPerSegment && bucket < oldNumBuckets; k += numPartitions, bucket += numPartitions) {
            int bucketOffset = k * HASH_BUCKET_SIZE;
            // sanity check: the bucket header must name the partition that is being processed
            if((int)segment.get(bucketOffset + HEADER_PARTITION_OFFSET) != i) {
              throw new IOException("Accessed wrong bucket! wanted: " + i + " got: " + segment.get(bucketOffset + HEADER_PARTITION_OFFSET));
            }
            // loop over all segments that are involved in the bucket (original bucket plus overflow buckets)
            int countInSegment = segment.getInt(bucketOffset + HEADER_COUNT_OFFSET);
            int numInSegment = 0;
            pointerOffset = bucketOffset + BUCKET_POINTER_START_OFFSET;
            hashOffset = bucketOffset + BUCKET_HEADER_LENGTH;
            while (true) {
              while (numInSegment < countInSegment) {
                hash = segment.getInt(hashOffset);
                // with the doubled bucket count, every entry must map to the old bucket or its sibling
                if((hash % this.numBuckets) != bucket && (hash % this.numBuckets) != (bucket+oldNumBuckets)) {
                  throw new IOException("wanted: " + bucket + " or " + (bucket + oldNumBuckets) + " got: " + hash%this.numBuckets);
                }
                pointer = segment.getLong(pointerOffset);
                hashList.add(hash);
                pointerList.add(pointer);
                pointerOffset += POINTER_LEN;
                hashOffset += HASH_CODE_LEN;
                numInSegment++;
              }
              // this segment is done. check if there is another chained bucket
              final long forwardPointer = segment.getLong(bucketOffset + HEADER_FORWARD_OFFSET);
              if (forwardPointer == BUCKET_FORWARD_POINTER_NOT_SET) {
                break;
              }
              // upper 32 bits: index of the overflow segment, lower 32 bits: offset in that segment
              final int overflowSegNum = (int) (forwardPointer >>> 32);
              segment = overflowSegments[overflowSegNum];
              // NOTE: 0xffffffff is an int literal (-1), so the mask has no effect on the long;
              // the lower 32 bits are selected by the cast to int
              bucketOffset = (int)(forwardPointer & 0xffffffff);
              countInSegment = segment.getInt(bucketOffset + HEADER_COUNT_OFFSET);
              pointerOffset = bucketOffset + BUCKET_POINTER_START_OFFSET;
              hashOffset = bucketOffset + BUCKET_HEADER_LENGTH;
              numInSegment = 0;
            }
            // go back from the overflow chain to the original bucket
            segment = this.buckets[j];
            bucketOffset = k * HASH_BUCKET_SIZE;
            // reset bucket for re-insertion
            segment.putInt(bucketOffset + HEADER_COUNT_OFFSET, 0);
            segment.putLong(bucketOffset + HEADER_FORWARD_OFFSET, BUCKET_FORWARD_POINTER_NOT_SET);
            // refill table
            if(hashList.size() != pointerList.size()) {
              throw new IOException("Pointer and hash counts do not match. hashes: " + hashList.size() + " pointer: " + pointerList.size());
            }
            // segment that holds the sibling bucket (bucket + oldNumBuckets)
            int newSegmentIndex = (bucket + oldNumBuckets) / bucketsPerSegment;
            MemorySegment newSegment = this.buckets[newSegmentIndex];
            // we need to avoid overflows in the first run
            int oldBucketCount = 0;
            int newBucketCount = 0;
            while(!hashList.isEmpty()) {
              hash = hashList.removeInt(hashList.size()-1);
              pointer = pointerList.removeLong(pointerList.size()-1);
              posHashCode = hash % this.numBuckets;
              if(posHashCode == bucket && oldBucketCount < NUM_ENTRIES_PER_BUCKET) {
                // entry stays in the old bucket
                bucketOffset = (bucket % bucketsPerSegment) * HASH_BUCKET_SIZE;
                insertBucketEntryFromStart(partition, segment, bucketOffset, hash, pointer);
                oldBucketCount++;
              } else if(posHashCode == (bucket + oldNumBuckets) && newBucketCount < NUM_ENTRIES_PER_BUCKET) {
                // entry moves to the new sibling bucket
                bucketOffset = ((bucket + oldNumBuckets) % bucketsPerSegment) * HASH_BUCKET_SIZE;
                insertBucketEntryFromStart(partition, newSegment, bucketOffset, hash, pointer);
                newBucketCount++;
              } else if(posHashCode == (bucket + oldNumBuckets) || posHashCode == bucket) {
                // target bucket is full: defer until the overflow buckets have been reset
                overflowHashes.add(hash);
                overflowPointers.add(pointer);
              } else {
                throw new IOException("Accessed wrong bucket. Target: " + bucket + " or " + (bucket + oldNumBuckets) + " Hit: " + posHashCode);
              }
            }
            hashList.clear();
            pointerList.clear();
          }
        }
        // reset partition's overflow buckets and reclaim their memory
        this.availableMemory.addAll(partition.resetOverflowBuckets());
        // clear overflow lists
        int bucketArrayPos = 0;
        int bucketInSegmentPos = 0;
        MemorySegment bucket = null;
        while(!overflowHashes.isEmpty()) {
          hash = overflowHashes.removeInt(overflowHashes.size()-1);
          pointer = overflowPointers.removeLong(overflowPointers.size()-1);
          posHashCode = hash % this.numBuckets;
          // locate the target bucket: segment index and byte offset within the segment
          bucketArrayPos = posHashCode >>> this.bucketsPerSegmentBits;
          bucketInSegmentPos = (posHashCode & this.bucketsPerSegmentMask) << NUM_INTRA_BUCKET_BITS;
          bucket = this.buckets[bucketArrayPos];
          insertBucketEntryFromStart(partition, bucket, bucketInSegmentPos, hash, pointer);
        }
        overflowHashes.clear();
        overflowPointers.clear();
      }
      this.isResizing = false;
      return true;
    }
  }
 
  /**
   * Compacts (garbage collects) partition with copy-compact strategy using compaction partition.
   * <p>
   * Every record that is still referenced from a bucket (or overflow bucket) of the partition is
   * read from the partition and appended to the compaction partition; the pointer in the bucket is
   * rewritten to the new location. Afterwards the two partitions swap roles: the compaction
   * partition takes the place of the compacted partition (taking over its overflow bucket
   * state), and the old partition becomes the new compaction partition.
   * <p>
   * Nothing happens if the table is closed, if {@code partitionNumber} is not smaller than the
   * number of partitions, or if the partition reports that it is already compacted.
   *
   * @param partitionNumber partition to compact
   * @throws IOException if a bucket of another partition is hit, or if reading or appending a record fails
   */
  private void compactPartition(final int partitionNumber) throws IOException {
    // do nothing if table was closed, parameter is invalid or no garbage exists
    if(this.closed.get() || partitionNumber >= this.partitions.size() || this.partitions.get(partitionNumber).isCompacted()) {
      return;
    }
    // release all segments owned by compaction partition
    this.compactionMemory.clearAllMemory(availableMemory);
    this.compactionMemory.allocateSegments(1);
    this.compactionMemory.pushDownPages();
    T tempHolder = this.buildSideSerializer.createInstance();
    // capture the partition count before the partition is temporarily removed from the list
    final int numPartitions = this.partitions.size();
    // NOTE(review): the partition is missing from the list until it is re-added below;
    // an exception in between leaves the list one element short
    InMemoryPartition<T> partition = this.partitions.remove(partitionNumber);
    MemorySegment[] overflowSegments = partition.overflowSegments;
    long pointer = 0L;
    int pointerOffset = 0;
    int bucketOffset = 0;
    final int bucketsPerSegment = this.bucketsPerSegmentMask + 1;
    // the buckets of the partition are partitionNumber, partitionNumber + numPartitions, ...
    for (int i = 0, bucket = partitionNumber; i < this.buckets.length && bucket < this.numBuckets; i++) {
      MemorySegment segment = this.buckets[i];
      // go over all buckets in the segment belonging to the partition
      for (int k = bucket % bucketsPerSegment; k < bucketsPerSegment && bucket < this.numBuckets; k += numPartitions, bucket += numPartitions) {
        bucketOffset = k * HASH_BUCKET_SIZE;
        // sanity check: the bucket header must name the partition that is being compacted
        if((int)segment.get(bucketOffset + HEADER_PARTITION_OFFSET) != partitionNumber) {
          throw new IOException("Accessed wrong bucket! wanted: " + partitionNumber + " got: " + segment.get(bucketOffset + HEADER_PARTITION_OFFSET));
        }
        // loop over all segments that are involved in the bucket (original bucket plus overflow buckets)
        int countInSegment = segment.getInt(bucketOffset + HEADER_COUNT_OFFSET);
        int numInSegment = 0;
        pointerOffset = bucketOffset + BUCKET_POINTER_START_OFFSET;
        while (true) {
          while (numInSegment < countInSegment) {
            // copy the record into the compaction partition and redirect the bucket entry to the copy
            pointer = segment.getLong(pointerOffset);
            tempHolder = partition.readRecordAt(pointer, tempHolder);
            pointer = this.compactionMemory.appendRecord(tempHolder);
            segment.putLong(pointerOffset, pointer);
            pointerOffset += POINTER_LEN;
            numInSegment++;
          }
          // this segment is done. check if there is another chained bucket
          final long forwardPointer = segment.getLong(bucketOffset + HEADER_FORWARD_OFFSET);
          if (forwardPointer == BUCKET_FORWARD_POINTER_NOT_SET) {
            break;
          }
          // upper 32 bits: index of the overflow segment, lower 32 bits: offset in that segment
          final int overflowSegNum = (int) (forwardPointer >>> 32);
          segment = overflowSegments[overflowSegNum];
          // NOTE: 0xffffffff is an int literal (-1), so the mask has no effect on the long;
          // the lower 32 bits are selected by the cast to int
          bucketOffset = (int)(forwardPointer & 0xffffffff);
          countInSegment = segment.getInt(bucketOffset + HEADER_COUNT_OFFSET);
          pointerOffset = bucketOffset + BUCKET_POINTER_START_OFFSET;
          numInSegment = 0;
        }
        // go back from the overflow chain to the table segment for the next bucket
        segment = this.buckets[i];
      }
    }
    // swap partition with compaction partition
    this.compactionMemory.setPartitionNumber(partitionNumber);
    this.partitions.add(partitionNumber, compactionMemory);
    // the overflow buckets were not copied, so the new partition takes over the overflow state
    this.partitions.get(partitionNumber).overflowSegments = partition.overflowSegments;
    this.partitions.get(partitionNumber).numOverflowSegments = partition.numOverflowSegments;
    this.partitions.get(partitionNumber).nextOverflowBucket = partition.nextOverflowBucket;
    this.partitions.get(partitionNumber).setCompaction(true);
    //this.partitions.get(partitionNumber).pushDownPages();
    // the old partition becomes the new compaction partition (number -1, no overflow state)
    this.compactionMemory = partition;
    this.compactionMemory.resetRecordCounter();
    this.compactionMemory.setPartitionNumber(-1);
    this.compactionMemory.overflowSegments = null;
    this.compactionMemory.numOverflowSegments = 0;
    this.compactionMemory.nextOverflowBucket = 0;
    // try to allocate maximum segment count
    this.compactionMemory.clearAllMemory(this.availableMemory);
    int maxSegmentNumber = this.getMaxPartition();
    this.compactionMemory.allocateSegments(maxSegmentNumber);
    this.compactionMemory.resetRWViews();
    this.compactionMemory.pushDownPages();
  }
 
  /**
   * Compacts partition but may not reclaim all garbage
   *
   * @param partitionNumber partition number
   * @throws IOException
   */
  @SuppressWarnings("unused")
  private void fastCompactPartition(int partitionNumber) throws IOException {
    // stop if no garbage exists
    if(this.partitions.get(partitionNumber).isCompacted()) {
      return;
    }
    //TODO IMPLEMENT ME
    return;
  }
 
  /**
   * This function hashes an integer value. It is adapted from Bob Jenkins' website
   * <a href="http://www.burtleburtle.net/bob/hash/integer.html">http://www.burtleburtle.net/bob/hash/integer.html</a>.
   * The hash function has the <i>full avalanche</i> property, meaning that every bit of the value to be hashed
   * affects every bit of the hash value.
   *
   * @param code The integer to be hashed.
   * @return The hash code for the integer.
   */
  private static final int hash(int code) {
    code = (code + 0x7ed55d16) + (code << 12);
    code = (code ^ 0xc761c23c) ^ (code >>> 19);
    code = (code + 0x165667b1) + (code << 5);
    code = (code + 0xd3a2646c) ^ (code << 9);
    code = (code + 0xfd7046c5) + (code << 3);
    code = (code ^ 0xb55a4f09) ^ (code >>> 16);
    return code >= 0 ? code : -(code + 1);
  }
 
  /**
   * Iterator that traverses the whole hash table once
   *
   * If entries are inserted during iteration they may be overlooked by the iterator
   */
  public class EntryIterator implements MutableObjectIterator<T> {
   
    private CompactingHashTable<T> table;
   
    private ArrayList<T> cache; // holds full bucket including its overflow buckets
       
    private int currentBucketIndex = 0;
    private int currentSegmentIndex = 0;
    private int currentBucketOffset = 0;
    private int bucketsPerSegment;
   
    private boolean done;
   
    private EntryIterator(CompactingHashTable<T> compactingHashTable) {
      this.table = compactingHashTable;
      this.cache = new ArrayList<T>(64);
      this.done = false;
      this.bucketsPerSegment = table.bucketsPerSegmentMask + 1;
    }

    @Override
    public T next(T reuse) throws IOException {
      if(done || this.table.closed.get()) {
        return null;
      } else if(!cache.isEmpty()) {
        reuse = cache.remove(cache.size()-1);
        return reuse;
      } else {
        while(!done && cache.isEmpty()) {
          done = !fillCache();
        }
        if(!done) {
          reuse = cache.remove(cache.size()-1);
          return reuse;
        } else {
          return null;
        }
      }
    }

    /**
     * utility function that inserts all entries from a bucket and its overflow buckets into the cache
     *
     * @return true if last bucket was not reached yet
     * @throws IOException
     */
    private boolean fillCache() throws IOException {
      if(currentBucketIndex >= table.numBuckets) {
        return false;
      }
      MemorySegment bucket = table.buckets[currentSegmentIndex];
      // get the basic characteristics of the bucket
      final int partitionNumber = bucket.get(currentBucketOffset + HEADER_PARTITION_OFFSET);
      final InMemoryPartition<T> partition = table.partitions.get(partitionNumber);
      final MemorySegment[] overflowSegments = partition.overflowSegments;
     
      int countInSegment = bucket.getInt(currentBucketOffset + HEADER_COUNT_OFFSET);
      int numInSegment = 0;
      int posInSegment = currentBucketOffset + BUCKET_POINTER_START_OFFSET;
      int bucketOffset = currentBucketOffset;

      // loop over all segments that are involved in the bucket (original bucket plus overflow buckets)
      while (true) {
        while (numInSegment < countInSegment) {
          long pointer = bucket.getLong(posInSegment);
          posInSegment += POINTER_LEN;
          numInSegment++;
          T target = table.buildSideSerializer.createInstance();
          try {
            target = partition.readRecordAt(pointer, target);
            cache.add(target);
          } catch (IOException e) {
              throw new RuntimeException("Error deserializing record from the Hash Table: " + e.getMessage(), e);
          }
        }
        // this segment is done. check if there is another chained bucket
        final long forwardPointer = bucket.getLong(bucketOffset + HEADER_FORWARD_OFFSET);
        if (forwardPointer == BUCKET_FORWARD_POINTER_NOT_SET) {
          break;
        }
        final int overflowSegNum = (int) (forwardPointer >>> 32);
        bucket = overflowSegments[overflowSegNum];
        bucketOffset = (int)(forwardPointer & 0xffffffff);
        countInSegment = bucket.getInt(bucketOffset + HEADER_COUNT_OFFSET);
        posInSegment = bucketOffset + BUCKET_POINTER_START_OFFSET;
        numInSegment = 0;
      }
      currentBucketIndex++;
      if(currentBucketIndex % bucketsPerSegment == 0) {
        currentSegmentIndex++;
        currentBucketOffset = 0;
      } else {
        currentBucketOffset += HASH_BUCKET_SIZE;
      }
      return true;
    }
   
  }
 
  public final class HashTableProber<PT> extends AbstractHashTableProber<PT, T>{
   
    private InMemoryPartition<T> partition;
   
    private MemorySegment bucket;
   
    private int pointerOffsetInBucket;
   
   
    private HashTableProber(TypeComparator<PT> probeTypeComparator, TypePairComparator<PT, T> pairComparator)
    {
      super(probeTypeComparator, pairComparator);
    }
   
    public T getMatchFor(PT probeSideRecord, T targetForMatch) {
      if(closed.get()) {
        return null;
      }
      final int searchHashCode = hash(this.probeTypeComparator.hash(probeSideRecord));
     
      final int posHashCode = searchHashCode % numBuckets;
     
      // get the bucket for the given hash code
      MemorySegment bucket = buckets[posHashCode >> bucketsPerSegmentBits];
      int bucketInSegmentOffset = (posHashCode & bucketsPerSegmentMask) << NUM_INTRA_BUCKET_BITS;
     
      // get the basic characteristics of the bucket
      final int partitionNumber = bucket.get(bucketInSegmentOffset + HEADER_PARTITION_OFFSET);
      final InMemoryPartition<T> p = partitions.get(partitionNumber);
      final MemorySegment[] overflowSegments = p.overflowSegments;
     
      this.pairComparator.setReference(probeSideRecord);
     
      int countInSegment = bucket.getInt(bucketInSegmentOffset + HEADER_COUNT_OFFSET);
      int numInSegment = 0;
      int posInSegment = bucketInSegmentOffset + BUCKET_HEADER_LENGTH;

      // loop over all segments that are involved in the bucket (original bucket plus overflow buckets)
      while (true) {
       
        while (numInSegment < countInSegment) {
         
          final int thisCode = bucket.getInt(posInSegment);
          posInSegment += HASH_CODE_LEN;
           
          // check if the hash code matches
          if (thisCode == searchHashCode) {
            // get the pointer to the pair
            final int pointerOffset = bucketInSegmentOffset + BUCKET_POINTER_START_OFFSET + (numInSegment * POINTER_LEN);
            final long pointer = bucket.getLong(pointerOffset);
            numInSegment++;
           
            // deserialize the key to check whether it is really equal, or whether we had only a hash collision
            try {
              targetForMatch = p.readRecordAt(pointer, targetForMatch);
             
              if (this.pairComparator.equalToReference(targetForMatch)) {
                this.partition = p;
                this.bucket = bucket;
                this.pointerOffsetInBucket = pointerOffset;
                return targetForMatch;
              }
            }
            catch (IOException e) {
              throw new RuntimeException("Error deserializing record from the hashtable: " + e.getMessage(), e);
            }
          }
          else {
            numInSegment++;
          }
        }
       
        // this segment is done. check if there is another chained bucket
        final long forwardPointer = bucket.getLong(bucketInSegmentOffset + HEADER_FORWARD_OFFSET);
        if (forwardPointer == BUCKET_FORWARD_POINTER_NOT_SET) {
          return null;
        }
       
        final int overflowSegNum = (int) (forwardPointer >>> 32);
        bucket = overflowSegments[overflowSegNum];
        bucketInSegmentOffset = (int) (forwardPointer & 0xffffffff);
        countInSegment = bucket.getInt(bucketInSegmentOffset + HEADER_COUNT_OFFSET);
        posInSegment = bucketInSegmentOffset + BUCKET_HEADER_LENGTH;
        numInSegment = 0;
      }
    }
   
    public void updateMatch(T record) throws IOException {
      if(closed.get()) {
        return;
      }
      long newPointer;
      try {
        newPointer = this.partition.appendRecord(record);
      } catch (EOFException e) {
        // system is out of memory so we attempt to reclaim memory with a copy compact run
        try {
          int partitionNumber = this.partition.getPartitionNumber();
          compactPartition(partitionNumber);
          // retry append
          this.partition = partitions.get(partitionNumber);
          newPointer = this.partition.appendRecord(record);
        } catch (EOFException ex) {
          throw new RuntimeException("Memory ran out. Compaction failed. " +
                        getMemoryConsumptionString() +
                        " Message: " + ex.getMessage());
        } catch (IndexOutOfBoundsException ex) {
          throw new RuntimeException("Memory ran out. Compaction failed. " +
                        getMemoryConsumptionString() +
                        " Message: " + ex.getMessage());
        }
      } catch (IndexOutOfBoundsException e) {
        // system is out of memory so we attempt to reclaim memory with a copy compact run
        try {
          int partitionNumber = this.partition.getPartitionNumber();
          compactPartition(partitionNumber);
          // retry append
          this.partition = partitions.get(partitionNumber);
          newPointer = this.partition.appendRecord(record);
        } catch (EOFException ex) {
          throw new RuntimeException("Memory ran out. Compaction failed. " +
                        getMemoryConsumptionString() +
                        " Message: " + ex.getMessage());
        } catch (IndexOutOfBoundsException ex) {
          throw new RuntimeException("Memory ran out. Compaction failed. " +
                        getMemoryConsumptionString() +
                        " Message: " + ex.getMessage());
        }
      }
      this.bucket.putLong(this.pointerOffsetInBucket, newPointer);
      this.partition.setCompaction(false);
    }
  }
}
TOP

Related Classes of org.apache.flink.runtime.operators.hash.CompactingHashTable

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code is the property of its respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.