Package org.apache.blur.manager

Source Code of org.apache.blur.manager.IndexManager

package org.apache.blur.manager;

/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements.  See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License.  You may obtain a copy of the License at
*
*     http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import static org.apache.blur.metrics.MetricsConstants.BLUR;
import static org.apache.blur.metrics.MetricsConstants.ORG_APACHE_BLUR;
import static org.apache.blur.utils.BlurConstants.FAMILY;
import static org.apache.blur.utils.BlurConstants.PRIME_DOC;
import static org.apache.blur.utils.BlurConstants.RECORD_ID;
import static org.apache.blur.utils.BlurConstants.ROW_ID;
import static org.apache.blur.utils.RowDocumentUtil.getRecord;
import static org.apache.blur.utils.RowDocumentUtil.getRow;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLongArray;

import org.apache.blur.analysis.FieldManager;
import org.apache.blur.concurrent.Executors;
import org.apache.blur.index.ExitableReader;
import org.apache.blur.index.ExitableReader.ExitingReaderException;
import org.apache.blur.log.Log;
import org.apache.blur.log.LogFactory;
import org.apache.blur.lucene.search.DeepPagingCache;
import org.apache.blur.lucene.search.FacetExecutor;
import org.apache.blur.lucene.search.FacetQuery;
import org.apache.blur.lucene.search.StopExecutionCollector.StopExecutionCollectorException;
import org.apache.blur.manager.clusterstatus.ClusterStatus;
import org.apache.blur.manager.results.BlurResultIterable;
import org.apache.blur.manager.results.BlurResultIterableSearcher;
import org.apache.blur.manager.results.MergerBlurResultIterable;
import org.apache.blur.manager.status.QueryStatus;
import org.apache.blur.manager.status.QueryStatusManager;
import org.apache.blur.manager.writer.BlurIndex;
import org.apache.blur.manager.writer.MutatableAction;
import org.apache.blur.server.IndexSearcherClosable;
import org.apache.blur.server.ShardContext;
import org.apache.blur.server.ShardServerContext;
import org.apache.blur.server.TableContext;
import org.apache.blur.thrift.BException;
import org.apache.blur.thrift.MutationHelper;
import org.apache.blur.thrift.UserConverter;
import org.apache.blur.thrift.generated.BlurException;
import org.apache.blur.thrift.generated.BlurQuery;
import org.apache.blur.thrift.generated.BlurQueryStatus;
import org.apache.blur.thrift.generated.BlurResult;
import org.apache.blur.thrift.generated.ErrorType;
import org.apache.blur.thrift.generated.Facet;
import org.apache.blur.thrift.generated.FetchResult;
import org.apache.blur.thrift.generated.FetchRowResult;
import org.apache.blur.thrift.generated.HighlightOptions;
import org.apache.blur.thrift.generated.QueryState;
import org.apache.blur.thrift.generated.Row;
import org.apache.blur.thrift.generated.RowMutation;
import org.apache.blur.thrift.generated.ScoreType;
import org.apache.blur.thrift.generated.Selector;
import org.apache.blur.trace.Trace;
import org.apache.blur.trace.Tracer;
import org.apache.blur.user.User;
import org.apache.blur.user.UserContext;
import org.apache.blur.utils.BlurConstants;
import org.apache.blur.utils.BlurExecutorCompletionService;
import org.apache.blur.utils.BlurExecutorCompletionService.Cancel;
import org.apache.blur.utils.BlurIterator;
import org.apache.blur.utils.BlurUtil;
import org.apache.blur.utils.ForkJoin;
import org.apache.blur.utils.ForkJoin.Merger;
import org.apache.blur.utils.ForkJoin.ParallelCall;
import org.apache.blur.utils.HighlightHelper;
import org.apache.blur.utils.ResetableDocumentStoredFieldVisitor;
import org.apache.lucene.document.Document;
import org.apache.lucene.index.AtomicReader;
import org.apache.lucene.index.BaseCompositeReader;
import org.apache.lucene.index.BaseCompositeReaderUtil;
import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.index.FieldInfo;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.MultiFields;
import org.apache.lucene.index.SegmentReader;
import org.apache.lucene.index.StoredFieldVisitor;
import org.apache.lucene.index.Term;
import org.apache.lucene.index.Terms;
import org.apache.lucene.index.TermsEnum;
import org.apache.lucene.queries.BooleanFilter;
import org.apache.lucene.queryparser.classic.ParseException;
import org.apache.lucene.search.BooleanClause.Occur;
import org.apache.lucene.search.BooleanQuery;
import org.apache.lucene.search.DocIdSet;
import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.search.Filter;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.SortField;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.search.highlight.InvalidTokenOffsetsException;
import org.apache.lucene.search.similarities.Similarity;
import org.apache.lucene.util.Bits;
import org.apache.lucene.util.BytesRef;

import com.yammer.metrics.Metrics;
import com.yammer.metrics.core.Meter;
import com.yammer.metrics.core.MetricName;
import com.yammer.metrics.core.Timer;
import com.yammer.metrics.core.TimerContext;

public class IndexManager {

  public static final String NOT_FOUND = "NOT_FOUND";
  private static final Log LOG = LogFactory.getLog(IndexManager.class);

  private static final Meter _readRecordsMeter;
  private static final Meter _readRowMeter;

  static {
    MetricName metricName1 = new MetricName(ORG_APACHE_BLUR, BLUR, "Read Records/s");
    MetricName metricName2 = new MetricName(ORG_APACHE_BLUR, BLUR, "Read Row/s");
    _readRecordsMeter = Metrics.newMeter(metricName1, "Records/s", TimeUnit.SECONDS);
    _readRowMeter = Metrics.newMeter(metricName2, "Row/s", TimeUnit.SECONDS);
  }

  private final Meter _queriesExternalMeter;
  private final Meter _queriesInternalMeter;

  private final IndexServer _indexServer;
  private final ClusterStatus _clusterStatus;
  private final ExecutorService _executor;
  private final ExecutorService _facetExecutor;
  private final ExecutorService _mutateExecutor;

  private final QueryStatusManager _statusManager = new QueryStatusManager();
  private final AtomicBoolean _closed = new AtomicBoolean(false);
  private final BlurPartitioner _blurPartitioner = new BlurPartitioner();
  private final BlurFilterCache _filterCache;
  private final long _defaultParallelCallTimeout = TimeUnit.MINUTES.toMillis(1);

  private final Timer _fetchTimer;
  private final int _fetchCount;
  private final int _maxHeapPerRowFetch;

  private final int _threadCount;
  private final int _mutateThreadCount;
  private final DeepPagingCache _deepPagingCache;

  public static AtomicBoolean DEBUG_RUN_SLOW = new AtomicBoolean(false);

  public IndexManager(IndexServer indexServer, ClusterStatus clusterStatus, BlurFilterCache filterCache,
      int maxHeapPerRowFetch, int fetchCount, int threadCount, int mutateThreadCount, long statusCleanupTimerDelay,
      int facetThreadCount, DeepPagingCache deepPagingCache) {
    _deepPagingCache = deepPagingCache;
    _indexServer = indexServer;
    _clusterStatus = clusterStatus;
    _filterCache = filterCache;

    MetricName metricName1 = new MetricName(ORG_APACHE_BLUR, BLUR, "External Queries/s");
    MetricName metricName2 = new MetricName(ORG_APACHE_BLUR, BLUR, "Internal Queries/s");
    MetricName metricName3 = new MetricName(ORG_APACHE_BLUR, BLUR, "Fetch Timer");

    _queriesExternalMeter = Metrics.newMeter(metricName1, "External Queries/s", TimeUnit.SECONDS);
    _queriesInternalMeter = Metrics.newMeter(metricName2, "Internal Queries/s", TimeUnit.SECONDS);
    _fetchTimer = Metrics.newTimer(metricName3, TimeUnit.MICROSECONDS, TimeUnit.SECONDS);

    if (threadCount == 0) {
      throw new RuntimeException("Thread Count cannot be 0.");
    }
    _threadCount = threadCount;
    if (mutateThreadCount == 0) {
      throw new RuntimeException("Mutate Thread Count cannot be 0.");
    }
    _mutateThreadCount = mutateThreadCount;
    _fetchCount = fetchCount;
    _maxHeapPerRowFetch = maxHeapPerRowFetch;

    _executor = Executors.newThreadPool("index-manager", _threadCount);
    _mutateExecutor = Executors.newThreadPool("index-manager-mutate", _mutateThreadCount);
    if (facetThreadCount < 1) {
      _facetExecutor = null;
    } else {
      _facetExecutor = Executors.newThreadPool(new SynchronousQueue<Runnable>(), "facet-execution", facetThreadCount);
    }

    _statusManager.setStatusCleanupTimerDelay(statusCleanupTimerDelay);
    _statusManager.init();
    LOG.info("Init Complete");

  }

  public synchronized void close() {
    if (!_closed.get()) {
      _closed.set(true);
      _statusManager.close();
      _executor.shutdownNow();
      _mutateExecutor.shutdownNow();
      if (_facetExecutor != null) {
        _facetExecutor.shutdownNow();
      }
      try {
        _indexServer.close();
      } catch (IOException e) {
        LOG.error("Unknown error while trying to close the index server", e);
      }
    }
  }

  public List<FetchResult> fetchRowBatch(final String table, List<Selector> selectors) throws BlurException {
    List<Future<FetchResult>> futures = new ArrayList<Future<FetchResult>>();
    for (Selector s : selectors) {
      final Selector selector = s;
      futures.add(_executor.submit(new Callable<FetchResult>() {
        @Override
        public FetchResult call() throws Exception {
          FetchResult fetchResult = new FetchResult();
          fetchRow(table, selector, fetchResult);
          return fetchResult;
        }
      }));
    }
    List<FetchResult> results = new ArrayList<FetchResult>();
    for (Future<FetchResult> future : futures) {
      try {
        results.add(future.get());
      } catch (InterruptedException e) {
        throw new BException("Unkown error while fetching batch table [{0}] selectors [{1}].", e, table, selectors);
      } catch (ExecutionException e) {
        throw new BException("Unkown error while fetching batch table [{0}] selectors [{1}].", e.getCause(), table,
            selectors);
      }
    }
    return results;
  }

  public void fetchRow(String table, Selector selector, FetchResult fetchResult) throws BlurException {
    validSelector(selector);
    TableContext tableContext = getTableContext(table);
    ReadInterceptor interceptor = tableContext.getReadInterceptor();
    Filter filter = interceptor.getFilter();
    BlurIndex index = null;
    String shard = null;
    Tracer trace = Trace.trace("manager fetch", Trace.param("table", table));
    IndexSearcherClosable searcher = null;
    try {
      if (selector.getLocationId() == null) {
        // Not looking up by location id so we should resetSearchers.
        ShardServerContext.resetSearchers();
        shard = MutationHelper.getShardName(table, selector.rowId, getNumberOfShards(table), _blurPartitioner);
        index = getBlurIndex(table, shard);
        searcher = index.getIndexSearcher();
        populateSelector(searcher, shard, table, selector);
      }
      String locationId = selector.getLocationId();
      if (locationId.equals(NOT_FOUND)) {
        fetchResult.setDeleted(false);
        fetchResult.setExists(false);
        return;
      }
      if (shard == null) {
        shard = getShard(locationId);
      }
      if (index == null) {
        index = getBlurIndex(table, shard);
      }
    } catch (BlurException e) {
      throw e;
    } catch (Exception e) {
      LOG.error("Unknown error while trying to get the correct index reader for selector [{0}].", e, selector);
      throw new BException(e.getMessage(), e);
    }
    TimerContext timerContext = _fetchTimer.time();
    boolean usedCache = true;
    try {
      ShardServerContext shardServerContext = ShardServerContext.getShardServerContext();
      if (shardServerContext != null) {
        searcher = shardServerContext.getIndexSearcherClosable(table, shard);
      }
      if (searcher == null) {
        // Was not pulled from cache, get a fresh one from the index.
        searcher = index.getIndexSearcher();
        usedCache = false;
      }
      FieldManager fieldManager = tableContext.getFieldManager();

      Query highlightQuery = getHighlightQuery(selector, table, fieldManager);

      fetchRow(searcher.getIndexReader(), table, shard, selector, fetchResult, highlightQuery, fieldManager,
          _maxHeapPerRowFetch, tableContext, filter);
    } catch (Exception e) {
      LOG.error("Unknown error while trying to fetch row.", e);
      throw new BException(e.getMessage(), e);
    } finally {
      trace.done();
      timerContext.stop();
      if (!usedCache && searcher != null) {
        // if the cached search was not used, close the searcher.
        // this will allow for closing of index
        try {
          searcher.close();
        } catch (IOException e) {
          LOG.error("Unknown error trying to call close on searcher [{0}]", e, searcher);
        }
      }
    }
  }

  private BlurIndex getBlurIndex(String table, String shard) throws BException, IOException {
    Map<String, BlurIndex> blurIndexes = _indexServer.getIndexes(table);
    if (blurIndexes == null) {
      LOG.error("Table [{0}] not found", table);
      // @TODO probably should make a enum for not found on this server so the
      // controller knows to try another server.
      throw new BException("Table [" + table + "] not found");
    }
    BlurIndex index = blurIndexes.get(shard);
    if (index == null) {
      LOG.error("Shard [{0}] not found in table [{1}]", shard, table);
      // @TODO probably should make a enum for not found on this server so the
      // controller knows to try another server.
      throw new BException("Shard [" + shard + "] not found in table [" + table + "]");
    }
    return index;
  }

  private Query getHighlightQuery(Selector selector, String table, FieldManager fieldManager) throws ParseException,
      BlurException {
    HighlightOptions highlightOptions = selector.getHighlightOptions();
    if (highlightOptions == null) {
      return null;
    }
    org.apache.blur.thrift.generated.Query query = highlightOptions.getQuery();
    if (query == null) {
      return null;
    }

    TableContext context = getTableContext(table);
    Filter preFilter = QueryParserUtil.parseFilter(table, query.recordFilter, false, fieldManager, _filterCache,
        context);
    Filter postFilter = QueryParserUtil.parseFilter(table, query.rowFilter, true, fieldManager, _filterCache, context);
    return QueryParserUtil.parseQuery(query.query, query.rowQuery, fieldManager, postFilter, preFilter,
        getScoreType(query.scoreType), context);
  }

  public static void populateSelector(IndexSearcherClosable searcher, String shardName, String table, Selector selector)
      throws IOException {
    Tracer trace = Trace.trace("populate selector");
    String rowId = selector.rowId;
    String recordId = selector.recordId;
    try {
      BooleanQuery query = new BooleanQuery();
      if (selector.recordOnly) {
        query.add(new TermQuery(new Term(RECORD_ID, recordId)), Occur.MUST);
        query.add(new TermQuery(new Term(ROW_ID, rowId)), Occur.MUST);
      } else {
        query.add(new TermQuery(new Term(ROW_ID, rowId)), Occur.MUST);
        query.add(new TermQuery(BlurUtil.PRIME_DOC_TERM), Occur.MUST);
      }
      TopDocs topDocs = searcher.search(query, 1);
      if (topDocs.totalHits > 1) {
        if (selector.recordOnly) {
          LOG.warn("Rowid [" + rowId + "], recordId [" + recordId
              + "] has more than one prime doc that is not deleted.");
        } else {
          LOG.warn("Rowid [" + rowId + "] has more than one prime doc that is not deleted.");
        }
      }
      if (topDocs.totalHits == 1) {
        selector.setLocationId(shardName + "/" + topDocs.scoreDocs[0].doc);
      } else {
        selector.setLocationId(NOT_FOUND);
      }
    } finally {
      trace.done();
    }
  }

  public static void validSelector(Selector selector) throws BlurException {
    String locationId = selector.locationId;
    String rowId = selector.rowId;
    String recordId = selector.recordId;
    boolean recordOnly = selector.recordOnly;

    if (locationId != null) {
      if (recordId != null && rowId != null) {
        throw new BException("Invalid selector locationId [" + locationId + "] and recordId [" + recordId
            + "] and rowId [" + rowId + "] are set, if using locationId, then rowId and recordId are not needed.");
      } else if (recordId != null) {
        throw new BException("Invalid selector locationId [" + locationId + "] and recordId [" + recordId
            + "] sre set, if using locationId recordId is not needed.");
      } else if (rowId != null) {
        throw new BException("Invalid selector locationId [" + locationId + "] and rowId [" + rowId
            + "] are set, if using locationId rowId is not needed.");
      }
    } else {
      if (rowId != null && recordId != null) {
        if (!recordOnly) {
          throw new BException("Invalid both rowid [" + rowId + "] and recordId [" + recordId
              + "] are set, and recordOnly is set to [false].  "
              + "If you want entire row, then remove recordId, if you want record only set recordOnly to [true].");
        }
      } else if (recordId != null) {
        throw new BException("Invalid recordId [" + recordId
            + "] is set but rowId is not set.  If rowId is not known then a query will be required.");
      }
    }
  }

  /**
   * Location id format is <shard>/luceneid.
   *
   * @param locationId
   * @return
   */
  private String getShard(String locationId) {
    String[] split = locationId.split("\\/");
    if (split.length != 2) {
      throw new IllegalArgumentException("Location id invalid [" + locationId + "]");
    }
    return split[0];
  }

  public BlurResultIterable query(final String table, final BlurQuery blurQuery, AtomicLongArray facetedCounts)
      throws Exception {
    boolean runSlow = DEBUG_RUN_SLOW.get();
    final AtomicBoolean running = new AtomicBoolean(true);
    User user = UserContext.getUser();
    final QueryStatus status = _statusManager.newQueryStatus(table, blurQuery, _threadCount, running,
        UserConverter.toThriftUser(user));
    _queriesExternalMeter.mark();
    try {
      Map<String, BlurIndex> blurIndexes;
      try {
        blurIndexes = _indexServer.getIndexes(table);
      } catch (IOException e) {
        LOG.error("Unknown error while trying to fetch index readers.", e);
        throw new BException(e.getMessage(), e);
      }
      String rowId = blurQuery.getRowId();
      if (rowId != null) {
        // reduce the index selection down to the only one that would contain
        // the row.
        Map<String, BlurIndex> map = new HashMap<String, BlurIndex>();
        String shard = MutationHelper.getShardName(table, rowId, getNumberOfShards(table), _blurPartitioner);
        BlurIndex index = getBlurIndex(table, shard);
        map.put(shard, index);
        blurIndexes = map;
      }
      Tracer trace = Trace.trace("query setup", Trace.param("table", table));
      ShardServerContext shardServerContext = ShardServerContext.getShardServerContext();
      ParallelCall<Entry<String, BlurIndex>, BlurResultIterable> call;
      TableContext context = getTableContext(table);
      FieldManager fieldManager = context.getFieldManager();
      org.apache.blur.thrift.generated.Query simpleQuery = blurQuery.query;
      ReadInterceptor interceptor = context.getReadInterceptor();
      Filter readFilter = interceptor.getFilter();
      if (rowId != null) {
        if (simpleQuery.recordFilter == null) {
          simpleQuery.recordFilter = "+" + BlurConstants.ROW_ID + ":" + rowId;
        } else {
          simpleQuery.recordFilter = "+" + BlurConstants.ROW_ID + ":" + rowId + " +(" + simpleQuery.recordFilter + ")";
        }
      }
      Filter recordFilterForSearch = QueryParserUtil.parseFilter(table, simpleQuery.recordFilter, false, fieldManager,
          _filterCache, context);
      Filter rowFilterForSearch = QueryParserUtil.parseFilter(table, simpleQuery.rowFilter, true, fieldManager,
          _filterCache, context);
      Filter docFilter;
      if (recordFilterForSearch == null && readFilter != null) {
        docFilter = readFilter;
      } else if (recordFilterForSearch != null && readFilter == null) {
        docFilter = recordFilterForSearch;
      } else if (recordFilterForSearch != null && readFilter != null) {
        // @TODO dangerous call because of the bitsets that booleanfilter
        // creates.
        BooleanFilter booleanFilter = new BooleanFilter();
        booleanFilter.add(recordFilterForSearch, Occur.MUST);
        booleanFilter.add(readFilter, Occur.MUST);
        docFilter = booleanFilter;
      } else {
        docFilter = null;
      }
      Query userQuery = QueryParserUtil.parseQuery(simpleQuery.query, simpleQuery.rowQuery, fieldManager,
          rowFilterForSearch, docFilter, getScoreType(simpleQuery.scoreType), context);

      Query facetedQuery;
      FacetExecutor executor = null;
      if (blurQuery.facets != null) {
        long[] facetMinimums = getFacetMinimums(blurQuery.facets);
        executor = new FacetExecutor(blurQuery.facets.size(), facetMinimums, facetedCounts, running);
        facetedQuery = new FacetQuery(userQuery, getFacetQueries(blurQuery, fieldManager, context, rowFilterForSearch,
            recordFilterForSearch), executor);
      } else {
        facetedQuery = userQuery;
      }

      Sort sort = getSort(blurQuery, fieldManager);
      call = new SimpleQueryParallelCall(running, table, status, facetedQuery, blurQuery.selector,
          _queriesInternalMeter, shardServerContext, runSlow, _fetchCount, _maxHeapPerRowFetch,
          context.getSimilarity(), context, sort, _deepPagingCache);
      trace.done();
      MergerBlurResultIterable merger = new MergerBlurResultIterable(blurQuery);
      BlurResultIterable merge = ForkJoin.execute(_executor, blurIndexes.entrySet(), call, new Cancel() {
        @Override
        public void cancel() {
          running.set(false);
        }
      }).merge(merger);

      if (executor != null) {
        executor.processFacets(_facetExecutor);
      }
      return fetchDataIfNeeded(merge, table, blurQuery.getSelector());
    } catch (StopExecutionCollectorException e) {
      BlurQueryStatus queryStatus = status.getQueryStatus();
      QueryState state = queryStatus.getState();
      if (state == QueryState.BACK_PRESSURE_INTERRUPTED) {
        throw new BlurException("Cannot execute query right now.", null, ErrorType.BACK_PRESSURE);
      } else if (state == QueryState.INTERRUPTED) {
        throw new BlurException("Cannot execute query right now.", null, ErrorType.QUERY_CANCEL);
      }
      throw e;
    } catch (ExitingReaderException e) {
      BlurQueryStatus queryStatus = status.getQueryStatus();
      QueryState state = queryStatus.getState();
      if (state == QueryState.BACK_PRESSURE_INTERRUPTED) {
        throw new BlurException("Cannot execute query right now.", null, ErrorType.BACK_PRESSURE);
      } else if (state == QueryState.INTERRUPTED) {
        throw new BlurException("Cannot execute query right now.", null, ErrorType.QUERY_CANCEL);
      }
      throw e;
    } finally {
      _statusManager.removeStatus(status);
    }
  }

  private Sort getSort(BlurQuery blurQuery, FieldManager fieldManager) throws IOException, BlurException {
    List<org.apache.blur.thrift.generated.SortField> sortFields = blurQuery.getSortFields();
    if (sortFields == null || sortFields.isEmpty()) {
      return null;
    }
    SortField[] fields = new SortField[sortFields.size()];
    int i = 0;
    for (org.apache.blur.thrift.generated.SortField sortField : sortFields) {
      if (sortField == null) {
        throw new BException("Sortfields [{0}] can not contain a null.", sortFields);
      }
      String fieldName = getFieldName(sortField);
      SortField field = fieldManager.getSortField(fieldName, sortField.reverse);
      fields[i++] = field;
    }
    return new Sort(fields);
  }

  private String getFieldName(org.apache.blur.thrift.generated.SortField sortField) throws BlurException {
    String family = sortField.getFamily();
    if (family == null) {
      family = BlurConstants.DEFAULT_FAMILY;
    }
    String column = sortField.getColumn();
    if (column == null) {
      throw new BException("Column in sortfield [{0}] can not be null.", sortField);
    }
    return family + "." + column;
  }

  private BlurResultIterable fetchDataIfNeeded(final BlurResultIterable iterable, final String table,
      final Selector selector) {
    if (selector == null) {
      return iterable;
    }
    return new BlurResultIterable() {

      @Override
      public BlurIterator<BlurResult, BlurException> iterator() throws BlurException {
        final BlurIterator<BlurResult, BlurException> iterator = iterable.iterator();
        return new BlurIterator<BlurResult, BlurException>() {

          @Override
          public BlurResult next() throws BlurException {
            BlurResult result = iterator.next();
            String locationId = result.getLocationId();
            FetchResult fetchResult = new FetchResult();
            Selector s = new Selector(selector);
            s.setLocationId(locationId);
            fetchRow(table, s, fetchResult);
            result.setFetchResult(fetchResult);
            return result;
          }

          @Override
          public boolean hasNext() throws BlurException {
            return iterator.hasNext();
          }

          @Override
          public long getPosition() throws BlurException {
            return iterator.getPosition();
          }
        };
      }

      @Override
      public void close() throws IOException {
        iterable.close();
      }

      @Override
      public void skipTo(long skipTo) {
        iterable.skipTo(skipTo);
      }

      @Override
      public long getTotalResults() {
        return iterable.getTotalResults();
      }

      @Override
      public Map<String, Long> getShardInfo() {
        return iterable.getShardInfo();
      }
    };
  }

  private long[] getFacetMinimums(List<Facet> facets) {
    long[] mins = new long[facets.size()];
    boolean smallerThanMaxLong = false;
    for (int i = 0; i < facets.size(); i++) {
      Facet facet = facets.get(i);
      if (facet != null) {
        long minimumNumberOfBlurResults = facet.getMinimumNumberOfBlurResults();
        mins[i] = minimumNumberOfBlurResults;
        if (minimumNumberOfBlurResults < Long.MAX_VALUE) {
          smallerThanMaxLong = true;
        }
      }
    }
    if (smallerThanMaxLong) {
      return mins;
    }
    return null;
  }

  public String parseQuery(String table, org.apache.blur.thrift.generated.Query simpleQuery) throws ParseException,
      BlurException {
    TableContext context = getTableContext(table);
    FieldManager fieldManager = context.getFieldManager();
    Filter preFilter = QueryParserUtil.parseFilter(table, simpleQuery.recordFilter, false, fieldManager, _filterCache,
        context);
    Filter postFilter = QueryParserUtil.parseFilter(table, simpleQuery.rowFilter, true, fieldManager, _filterCache,
        context);
    Query userQuery = QueryParserUtil.parseQuery(simpleQuery.query, simpleQuery.rowQuery, fieldManager, postFilter,
        preFilter, getScoreType(simpleQuery.scoreType), context);
    return userQuery.toString();
  }

  private TableContext getTableContext(final String table) {
    return TableContext.create(_clusterStatus.getTableDescriptor(true, _clusterStatus.getCluster(true, table), table));
  }

  private Query[] getFacetQueries(BlurQuery blurQuery, FieldManager fieldManager, TableContext context,
      Filter postFilter, Filter preFilter) throws ParseException {
    int size = blurQuery.facets.size();
    Query[] queries = new Query[size];
    for (int i = 0; i < size; i++) {
      queries[i] = QueryParserUtil.parseQuery(blurQuery.facets.get(i).queryStr, blurQuery.query.rowQuery, fieldManager,
          postFilter, preFilter, ScoreType.CONSTANT, context);
    }
    return queries;
  }

  private ScoreType getScoreType(ScoreType type) {
    if (type == null) {
      return ScoreType.SUPER;
    }
    return type;
  }

  public void cancelQuery(String table, String uuid) {
    _statusManager.cancelQuery(table, uuid);
  }

  public List<BlurQueryStatus> currentQueries(String table) {
    return _statusManager.currentQueries(table);
  }

  public BlurQueryStatus queryStatus(String table, String uuid) {
    return _statusManager.queryStatus(table, uuid);
  }

  public List<String> queryStatusIdList(String table) {
    return _statusManager.queryStatusIdList(table);
  }

  public static void fetchRow(IndexReader reader, String table, String shard, Selector selector,
      FetchResult fetchResult, Query highlightQuery, int maxHeap, TableContext tableContext, Filter filter)
      throws CorruptIndexException, IOException {
    fetchRow(reader, table, shard, selector, fetchResult, highlightQuery, null, maxHeap, tableContext, filter);
  }

  public static void fetchRow(IndexReader reader, String table, String shard, Selector selector,
      FetchResult fetchResult, Query highlightQuery, FieldManager fieldManager, int maxHeap, TableContext tableContext,
      Filter filter) throws CorruptIndexException, IOException {
    try {
      fetchResult.table = table;
      String locationId = selector.locationId;
      int lastSlash = locationId.lastIndexOf('/');
      int docId = Integer.parseInt(locationId.substring(lastSlash + 1));
      if (docId >= reader.maxDoc()) {
        throw new RuntimeException("Location id [" + locationId + "] with docId [" + docId + "] is not valid.");
      }

      boolean returnIdsOnly = false;
      if (selector.columnFamiliesToFetch != null && selector.columnsToFetch != null
          && selector.columnFamiliesToFetch.isEmpty() && selector.columnsToFetch.isEmpty()) {
        // exit early
        returnIdsOnly = true;
      }

      Tracer t1 = Trace.trace("fetchRow - live docs");
      Bits liveDocs = MultiFields.getLiveDocs(reader);
      t1.done();
      ResetableDocumentStoredFieldVisitor fieldVisitor = getFieldSelector(selector);
      if (selector.isRecordOnly()) {
        // select only the row for the given data or location id.
        if (isFiltered(docId, reader, filter)) {
          fetchResult.exists = false;
          fetchResult.deleted = false;
          return;
        } else if (liveDocs != null && !liveDocs.get(docId)) {
          fetchResult.exists = false;
          fetchResult.deleted = true;
          return;
        } else {
          fetchResult.exists = true;
          fetchResult.deleted = false;
          reader.document(docId, fieldVisitor);
          Document document = fieldVisitor.getDocument();
          if (highlightQuery != null && fieldManager != null) {
            HighlightOptions highlightOptions = selector.getHighlightOptions();
            String preTag = highlightOptions.getPreTag();
            String postTag = highlightOptions.getPostTag();
            try {
              document = HighlightHelper.highlight(docId, document, highlightQuery, fieldManager, reader, preTag,
                  postTag);
            } catch (InvalidTokenOffsetsException e) {
              LOG.error("Unknown error while tring to highlight", e);
            }
          }
          fieldVisitor.reset();
          fetchResult.recordResult = getRecord(document);
          return;
        }
      } else {
        Tracer trace = Trace.trace("fetchRow - Row read");
        try {
          if (liveDocs != null && !liveDocs.get(docId)) {
            fetchResult.exists = false;
            fetchResult.deleted = true;
            return;
          } else {
            fetchResult.exists = true;
            fetchResult.deleted = false;
            if (returnIdsOnly) {
              String rowId = selector.getRowId();
              if (rowId == null) {
                rowId = getRowId(reader, docId);
              }
              fetchResult.rowResult = new FetchRowResult();
              fetchResult.rowResult.row = new Row(rowId, null);
            } else {
              List<Document> docs;
              AtomicBoolean moreDocsToFetch = new AtomicBoolean(false);
              AtomicInteger totalRecords = new AtomicInteger();
              BlurHighlighter highlighter = new BlurHighlighter(highlightQuery, fieldManager, selector);
              Tracer docTrace = Trace.trace("fetchRow - Document read");
              docs = BlurUtil.fetchDocuments(reader, fieldVisitor, selector, maxHeap, table + "/" + shard,
                  tableContext.getDefaultPrimeDocTerm(), filter, moreDocsToFetch, totalRecords, highlighter);
              docTrace.done();
              Tracer rowTrace = Trace.trace("fetchRow - Row create");
              Row row = getRow(docs);
              if (row == null) {
                String rowId = selector.getRowId();
                if (rowId == null) {
                  rowId = getRowId(reader, docId);
                }
                row = new Row(rowId, null);
              }
              fetchResult.rowResult = new FetchRowResult(row, selector.getStartRecord(),
                  selector.getMaxRecordsToFetch(), moreDocsToFetch.get(), totalRecords.get());
              rowTrace.done();
            }
            return;
          }
        } finally {
          trace.done();
        }
      }
    } finally {
      if (fetchResult.rowResult != null) {
        if (fetchResult.rowResult.row != null && fetchResult.rowResult.row.records != null) {
          _readRecordsMeter.mark(fetchResult.rowResult.row.records.size());
        }
        _readRowMeter.mark();
      } else if (fetchResult.recordResult != null) {
        _readRecordsMeter.mark();
      }
    }
  }

  @SuppressWarnings("unchecked")
  private static boolean isFiltered(int notAdjustedDocId, IndexReader reader, Filter filter) throws IOException {
    if (filter == null) {
      return false;
    }
    if (reader instanceof BaseCompositeReader) {
      BaseCompositeReader<IndexReader> indexReader = (BaseCompositeReader<IndexReader>) reader;
      List<? extends IndexReader> sequentialSubReaders = BaseCompositeReaderUtil.getSequentialSubReaders(indexReader);
      int readerIndex = BaseCompositeReaderUtil.readerIndex(indexReader, notAdjustedDocId);
      int readerBase = BaseCompositeReaderUtil.readerBase(indexReader, readerIndex);
      int docId = notAdjustedDocId - readerBase;
      IndexReader orgReader = sequentialSubReaders.get(readerIndex);
      SegmentReader sReader = BlurUtil.getSegmentReader(orgReader);
      if (sReader != null) {
        SegmentReader segmentReader = (SegmentReader) sReader;
        DocIdSet docIdSet = filter.getDocIdSet(segmentReader.getContext(), segmentReader.getLiveDocs());
        DocIdSetIterator iterator = docIdSet.iterator();
        if (iterator == null) {
          return true;
        }
        if (iterator.advance(docId) == docId) {
          return false;
        }
        return true;
      }
      throw new RuntimeException("Reader has to be a SegmentReader [" + orgReader + "]");
    } else {
      throw new RuntimeException("Reader has to be a BaseCompositeReader [" + reader + "]");
    }
  }

  private static String getRowId(IndexReader reader, int docId) throws CorruptIndexException, IOException {
    reader.document(docId, new StoredFieldVisitor() {
      @Override
      public Status needsField(FieldInfo fieldInfo) throws IOException {
        if (ROW_ID.equals(fieldInfo.name)) {
          return StoredFieldVisitor.Status.STOP;
        }
        return StoredFieldVisitor.Status.NO;
      }
    });
    return reader.document(docId).get(ROW_ID);
  }

  private static String getColumnName(String fieldName) {
    return fieldName.substring(fieldName.lastIndexOf('.') + 1);
  }

  private static String getColumnFamily(String fieldName) {
    return fieldName.substring(0, fieldName.lastIndexOf('.'));
  }

  public static ResetableDocumentStoredFieldVisitor getFieldSelector(final Selector selector) {
    return new ResetableDocumentStoredFieldVisitor() {
      @Override
      public Status needsField(FieldInfo fieldInfo) throws IOException {
        if (ROW_ID.equals(fieldInfo.name)) {
          return StoredFieldVisitor.Status.YES;
        }
        if (RECORD_ID.equals(fieldInfo.name)) {
          return StoredFieldVisitor.Status.YES;
        }
        if (PRIME_DOC.equals(fieldInfo.name)) {
          return StoredFieldVisitor.Status.NO;
        }
        if (FAMILY.equals(fieldInfo.name)) {
          return StoredFieldVisitor.Status.YES;
        }
        if (selector.columnFamiliesToFetch == null && selector.columnsToFetch == null) {
          return StoredFieldVisitor.Status.YES;
        }
        String columnFamily = getColumnFamily(fieldInfo.name);
        if (selector.columnFamiliesToFetch != null) {
          if (selector.columnFamiliesToFetch.contains(columnFamily)) {
            return StoredFieldVisitor.Status.YES;
          }
        }
        String columnName = getColumnName(fieldInfo.name);
        if (selector.columnsToFetch != null) {
          Set<String> columns = selector.columnsToFetch.get(columnFamily);
          if (columns != null && columns.contains(columnName)) {
            return StoredFieldVisitor.Status.YES;
          }
        }
        return StoredFieldVisitor.Status.NO;
      }

    };
  }

  public IndexServer getIndexServer() {
    return _indexServer;
  }

  public long recordFrequency(final String table, final String columnFamily, final String columnName, final String value)
      throws Exception {
    Map<String, BlurIndex> blurIndexes;
    try {
      blurIndexes = _indexServer.getIndexes(table);
    } catch (IOException e) {
      LOG.error("Unknown error while trying to fetch index readers.", e);
      throw new BException(e.getMessage(), e);
    }
    return ForkJoin.execute(_executor, blurIndexes.entrySet(), new ParallelCall<Entry<String, BlurIndex>, Long>() {
      @Override
      public Long call(Entry<String, BlurIndex> input) throws Exception {
        BlurIndex index = input.getValue();
        IndexSearcherClosable searcher = index.getIndexSearcher();
        try {
          return recordFrequency(searcher.getIndexReader(), columnFamily, columnName, value);
        } finally {
          // this will allow for closing of index
          searcher.close();
        }
      }
    }).merge(new Merger<Long>() {
      @Override
      public Long merge(BlurExecutorCompletionService<Long> service) throws BlurException {
        long total = 0;
        while (service.getRemainingCount() > 0) {
          Future<Long> future = service.poll(_defaultParallelCallTimeout, TimeUnit.MILLISECONDS, true, table,
              columnFamily, columnName, value);
          total += service.getResultThrowException(future, table, columnFamily, columnName, value);
        }
        return total;
      }
    });
  }

  public List<String> terms(final String table, final String columnFamily, final String columnName,
      final String startWith, final short size) throws Exception {
    Map<String, BlurIndex> blurIndexes;
    try {
      blurIndexes = _indexServer.getIndexes(table);
    } catch (IOException e) {
      LOG.error("Unknown error while trying to fetch index readers.", e);
      throw new BException(e.getMessage(), e);
    }
    return ForkJoin.execute(_executor, blurIndexes.entrySet(),
        new ParallelCall<Entry<String, BlurIndex>, List<String>>() {
          @Override
          public List<String> call(Entry<String, BlurIndex> input) throws Exception {
            BlurIndex index = input.getValue();
            IndexSearcherClosable searcher = index.getIndexSearcher();
            try {
              return terms(searcher.getIndexReader(), columnFamily, columnName, startWith, size);
            } finally {
              // this will allow for closing of index
              searcher.close();
            }
          }
        }).merge(new Merger<List<String>>() {
      @Override
      public List<String> merge(BlurExecutorCompletionService<List<String>> service) throws BlurException {
        TreeSet<String> terms = new TreeSet<String>();
        while (service.getRemainingCount() > 0) {
          Future<List<String>> future = service.poll(_defaultParallelCallTimeout, TimeUnit.MILLISECONDS, true, table,
              columnFamily, columnName, startWith, size);
          terms.addAll(service.getResultThrowException(future, table, columnFamily, columnName, startWith, size));
        }
        return new ArrayList<String>(terms).subList(0, Math.min(size, terms.size()));
      }
    });
  }

  public static long recordFrequency(IndexReader reader, String columnFamily, String columnName, String value)
      throws IOException {
    return reader.docFreq(getTerm(columnFamily, columnName, value));
  }

  public static List<String> terms(IndexReader reader, String columnFamily, String columnName, String startWith,
      short size) throws IOException {
    if (startWith == null) {
      startWith = "";
    }
    Term term = getTerm(columnFamily, columnName, startWith);
    List<String> terms = new ArrayList<String>(size);
    AtomicReader areader = BlurUtil.getAtomicReader(reader);
    Terms termsAll = areader.terms(term.field());

    if (termsAll == null) {
      return terms;
    }

    TermsEnum termEnum = termsAll.iterator(null);

    termEnum.seekCeil(term.bytes());

    BytesRef currentTermText = termEnum.term();
    do {
      terms.add(currentTermText.utf8ToString());
      if (terms.size() >= size) {
        return terms;
      }
    } while ((currentTermText = termEnum.next()) != null);
    return terms;
  }

  private static Term getTerm(String columnFamily, String columnName, String value) {
    if (columnName == null) {
      throw new NullPointerException("ColumnName cannot be null.");
    }
    if (columnFamily == null) {
      return new Term(columnName, value);
    }
    return new Term(columnFamily + "." + columnName, value);
  }

  public void mutate(final RowMutation mutation) throws BlurException, IOException {
    long s = System.nanoTime();
    doMutates(Arrays.asList(mutation));
    long e = System.nanoTime();
    LOG.debug("doMutate took [{0} ms] to complete", (e - s) / 1000000.0);
  }

  public void mutate(final List<RowMutation> mutations) throws BlurException, IOException {
    long s = System.nanoTime();
    doMutates(mutations);
    long e = System.nanoTime();
    LOG.debug("doMutates took [{0} ms] to complete", (e - s) / 1000000.0);
  }

  private void doMutates(List<RowMutation> mutations) throws BlurException, IOException {
    mutations = MutatableAction.reduceMutates(mutations);
    Map<String, List<RowMutation>> map = getMutatesPerTable(mutations);
    for (Entry<String, List<RowMutation>> entry : map.entrySet()) {
      doMutates(entry.getKey(), entry.getValue());
    }
  }

  public void enqueue(List<RowMutation> mutations) throws BlurException, IOException {
    mutations = MutatableAction.reduceMutates(mutations);
    Map<String, List<RowMutation>> map = getMutatesPerTable(mutations);
    for (Entry<String, List<RowMutation>> entry : map.entrySet()) {
      doEnqueue(entry.getKey(), entry.getValue());
    }
  }

  private void doEnqueue(final String table, List<RowMutation> mutations) throws IOException, BlurException {
    final Map<String, BlurIndex> indexes = _indexServer.getIndexes(table);
    Map<String, List<RowMutation>> mutationsByShard = new HashMap<String, List<RowMutation>>();
    for (int i = 0; i < mutations.size(); i++) {
      RowMutation mutation = mutations.get(i);
      String shard = MutationHelper.getShardName(table, mutation.rowId, getNumberOfShards(table), _blurPartitioner);
      List<RowMutation> list = mutationsByShard.get(shard);
      if (list == null) {
        list = new ArrayList<RowMutation>();
        mutationsByShard.put(shard, list);
      }
      list.add(mutation);
    }
    for (Entry<String, List<RowMutation>> entry : mutationsByShard.entrySet()) {
      BlurIndex index = indexes.get(entry.getKey());
      index.enqueue(entry.getValue());
    }
  }

  private void doMutates(final String table, List<RowMutation> mutations) throws IOException, BlurException {
    final Map<String, BlurIndex> indexes = _indexServer.getIndexes(table);
    Map<String, List<RowMutation>> mutationsByShard = new HashMap<String, List<RowMutation>>();
    for (int i = 0; i < mutations.size(); i++) {
      RowMutation mutation = mutations.get(i);
      String shard = MutationHelper.getShardName(table, mutation.rowId, getNumberOfShards(table), _blurPartitioner);
      List<RowMutation> list = mutationsByShard.get(shard);
      if (list == null) {
        list = new ArrayList<RowMutation>();
        mutationsByShard.put(shard, list);
      }
      list.add(mutation);
    }

    List<Future<Void>> futures = new ArrayList<Future<Void>>();

    for (Entry<String, List<RowMutation>> entry : mutationsByShard.entrySet()) {
      final String shard = entry.getKey();
      final List<RowMutation> value = entry.getValue();
      futures.add(executeMutates(table, shard, indexes, value));
    }

    for (Future<Void> future : futures) {
      try {
        future.get();
      } catch (InterruptedException e) {
        throw new BException("Unknown error during mutation", e);
      } catch (ExecutionException e) {
        throw new BException("Unknown error during mutation", e.getCause());
      }
    }
  }

  private Future<Void> executeMutates(String table, String shard, Map<String, BlurIndex> indexes,
      List<RowMutation> mutations) throws BlurException, IOException {
    long s = System.nanoTime();
    try {
      final BlurIndex blurIndex = indexes.get(shard);
      if (blurIndex == null) {
        throw new BException("Shard [" + shard + "] in table [" + table + "] is not being served by this server.");
      }
      ShardContext shardContext = blurIndex.getShardContext();
      final MutatableAction mutatableAction = new MutatableAction(shardContext);
      mutatableAction.mutate(mutations);
      return _mutateExecutor.submit(new Callable<Void>() {
        @Override
        public Void call() throws Exception {
          blurIndex.process(mutatableAction);
          return null;
        }
      });
    } finally {
      long e = System.nanoTime();
      LOG.debug("executeMutates took [{0} ms] to complete", (e - s) / 1000000.0);
    }
  }

  private Map<String, List<RowMutation>> getMutatesPerTable(List<RowMutation> mutations) {
    Map<String, List<RowMutation>> map = new HashMap<String, List<RowMutation>>();
    for (RowMutation mutation : mutations) {
      String table = mutation.table;
      List<RowMutation> list = map.get(table);
      if (list == null) {
        list = new ArrayList<RowMutation>();
        map.put(table, list);
      }
      list.add(mutation);
    }
    return map;
  }

  private int getNumberOfShards(String table) {
    return getTableContext(table).getDescriptor().getShardCount();
  }

  static class SimpleQueryParallelCall implements ParallelCall<Entry<String, BlurIndex>, BlurResultIterable> {

    private final String _table;
    private final QueryStatus _status;
    private final Query _query;
    private final Selector _selector;
    private final AtomicBoolean _running;
    private final Meter _queriesInternalMeter;
    private final ShardServerContext _shardServerContext;
    private final boolean _runSlow;
    private final int _fetchCount;
    private final int _maxHeapPerRowFetch;
    private final Similarity _similarity;
    private final TableContext _context;
    private final Sort _sort;
    private final DeepPagingCache _deepPagingCache;

    public SimpleQueryParallelCall(AtomicBoolean running, String table, QueryStatus status, Query query,
        Selector selector, Meter queriesInternalMeter, ShardServerContext shardServerContext, boolean runSlow,
        int fetchCount, int maxHeapPerRowFetch, Similarity similarity, TableContext context, Sort sort,
        DeepPagingCache deepPagingCache) {
      _running = running;
      _table = table;
      _status = status;
      _query = query;
      _selector = selector;
      _queriesInternalMeter = queriesInternalMeter;
      _shardServerContext = shardServerContext;
      _runSlow = runSlow;
      _fetchCount = fetchCount;
      _maxHeapPerRowFetch = maxHeapPerRowFetch;
      _similarity = similarity;
      _context = context;
      _sort = sort;
      _deepPagingCache = deepPagingCache;
    }

    @Override
    public BlurResultIterable call(Entry<String, BlurIndex> entry) throws Exception {
      String shard = entry.getKey();
      _status.attachThread(shard);
      BlurIndex index = entry.getValue();
      IndexSearcherClosable searcher = index.getIndexSearcher();
      Tracer trace2 = null;
      try {
        IndexReader indexReader = searcher.getIndexReader();
        if (indexReader instanceof ExitableReader) {
          ExitableReader er = (ExitableReader) indexReader;
          er.setRunning(_running);
        } else {
          throw new IOException("IndexReader is not ExitableReader");
        }
        if (_shardServerContext != null) {
          _shardServerContext.setIndexSearcherClosable(_table, shard, searcher);
        }
        searcher.setSimilarity(_similarity);
        Tracer trace1 = Trace.trace("query rewrite", Trace.param("table", _table));
        Query rewrite;
        try {
          rewrite = searcher.rewrite((Query) _query.clone());
        } catch (ExitingReaderException e) {
          LOG.info("Query [{0}] has been cancelled during the rewrite phase.", _query);
          throw e;
        } finally {
          trace1.done();
        }

        // BlurResultIterableSearcher will close searcher, if shard server
        // context is null.
        trace2 = Trace.trace("query initial search");
        return new BlurResultIterableSearcher(_running, rewrite, _table, shard, searcher, _selector,
            _shardServerContext == null, _runSlow, _fetchCount, _maxHeapPerRowFetch, _context, _sort, _deepPagingCache);
      } catch (BlurException e) {
        switch (_status.getQueryStatus().getState()) {
        case INTERRUPTED:
          e.setErrorType(ErrorType.QUERY_CANCEL);
          throw e;
        case BACK_PRESSURE_INTERRUPTED:
          e.setErrorType(ErrorType.BACK_PRESSURE);
          throw e;
        default:
          e.setErrorType(ErrorType.UNKNOWN);
          throw e;
        }
      } finally {
        if (trace2 != null) {
          trace2.done();
        }
        _queriesInternalMeter.mark();
        _status.deattachThread(shard);
      }
    }
  }

  public void optimize(String table, int numberOfSegmentsPerShard) throws BException {
    Map<String, BlurIndex> blurIndexes;
    try {
      blurIndexes = _indexServer.getIndexes(table);
    } catch (IOException e) {
      LOG.error("Unknown error while trying to fetch index readers.", e);
      throw new BException(e.getMessage(), e);
    }

    Collection<BlurIndex> values = blurIndexes.values();
    for (BlurIndex index : values) {
      try {
        index.optimize(numberOfSegmentsPerShard);
      } catch (IOException e) {
        LOG.error("Unknown error while trying to optimize indexes.", e);
        throw new BException(e.getMessage(), e);
      }
    }
  }

  public void enqueue(RowMutation mutation) throws BlurException, IOException {
    enqueue(Arrays.asList(mutation));
  }

}
TOP

Related Classes of org.apache.blur.manager.IndexManager

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.