Package io.crate.action.sql.query

Source Code of io.crate.action.sql.query.CrateSearchService

/*
* Licensed to CRATE Technology GmbH ("Crate") under one or more contributor
* license agreements.  See the NOTICE file distributed with this work for
* additional information regarding copyright ownership.  Crate licenses
* this file to you under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.  You may
* obtain a copy of the License at
*
*     http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
* License for the specific language governing permissions and limitations
* under the License.
*
* However, if you have executed another commercial license agreement
* with Crate these terms will supersede the license and you may use the
* software solely pursuant to the terms of the relevant commercial agreement.
*/

package io.crate.action.sql.query;

import com.google.common.collect.ImmutableMap;
import io.crate.Constants;
import io.crate.core.StringUtils;
import io.crate.executor.transport.task.elasticsearch.SortOrder;
import io.crate.lucene.LuceneQueryBuilder;
import io.crate.metadata.ColumnIdent;
import io.crate.metadata.Functions;
import io.crate.metadata.ReferenceInfo;
import io.crate.metadata.doc.DocSysColumns;
import io.crate.operation.Input;
import io.crate.operation.collect.CollectInputSymbolVisitor;
import io.crate.operation.reference.doc.lucene.CollectorContext;
import io.crate.operation.reference.doc.lucene.LuceneCollectorExpression;
import io.crate.operation.reference.doc.lucene.LuceneDocLevelReferenceResolver;
import io.crate.planner.symbol.*;
import io.crate.types.DataType;
import io.crate.types.DataTypes;
import org.apache.lucene.index.AtomicReaderContext;
import org.apache.lucene.search.*;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.cache.recycler.CacheRecycler;
import org.elasticsearch.cache.recycler.PageCacheRecycler;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.fielddata.IndexFieldData;
import org.elasticsearch.index.mapper.FieldMapper;
import org.elasticsearch.index.query.ParsedQuery;
import org.elasticsearch.index.service.IndexService;
import org.elasticsearch.index.shard.service.IndexShard;
import org.elasticsearch.indices.IndicesLifecycle;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.warmer.IndicesWarmer;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.search.InternalSearchService;
import org.elasticsearch.search.MultiValueMode;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.dfs.DfsPhase;
import org.elasticsearch.search.fetch.FetchPhase;
import org.elasticsearch.search.fetch.source.FetchSourceContext;
import org.elasticsearch.search.internal.DefaultSearchContext;
import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.search.internal.ShardSearchRequest;
import org.elasticsearch.search.query.QueryPhase;
import org.elasticsearch.search.query.QuerySearchResult;
import org.elasticsearch.search.sort.SortParseElement;
import org.elasticsearch.threadpool.ThreadPool;

import javax.annotation.Nullable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class CrateSearchService extends InternalSearchService {

    private final SortSymbolVisitor sortSymbolVisitor;
    private final Functions functions;

    @Inject
    public CrateSearchService(Settings settings,
                              ClusterService clusterService,
                              IndicesService indicesService,
                              IndicesLifecycle indicesLifecycle,
                              IndicesWarmer indicesWarmer,
                              ThreadPool threadPool,
                              ScriptService scriptService,
                              CacheRecycler cacheRecycler,
                              PageCacheRecycler pageCacheRecycler,
                              BigArrays bigArrays,
                              DfsPhase dfsPhase,
                              QueryPhase queryPhase,
                              FetchPhase fetchPhase,
                              Functions functions) {
        super(settings, clusterService, indicesService, indicesLifecycle,
                indicesWarmer,
                threadPool,
                scriptService,
                cacheRecycler, pageCacheRecycler, bigArrays, dfsPhase, queryPhase, fetchPhase);
        this.functions = functions;
        CollectInputSymbolVisitor<LuceneCollectorExpression<?>> inputSymbolVisitor =
                new CollectInputSymbolVisitor<>(functions, LuceneDocLevelReferenceResolver.INSTANCE);
        sortSymbolVisitor = new SortSymbolVisitor(inputSymbolVisitor);
    }

    public QuerySearchResult executeQueryPhase(QueryShardRequest request) {
        SearchContext context = createAndPutContext(request);
        try {
            context.indexShard().searchService().onPreQueryPhase(context);
            long time = System.nanoTime();
            contextProcessing(context);
            queryPhase.execute(context);

            assert context.searchType() != SearchType.COUNT : "searchType COUNT is not supported using QueryShardRequests";
            contextProcessedSuccessfully(context);

            context.indexShard().searchService().onQueryPhase(context, System.nanoTime() - time);
            return context.queryResult();
        } catch (Throwable e) {
            context.indexShard().searchService().onFailedQueryPhase(context);
            logger.trace("Query phase failed", e);
            freeContext(context.id());
            throw ExceptionsHelper.convertToRuntime(e);
        } finally {
            cleanContext(context);
        }
    }

    private SearchContext createAndPutContext(QueryShardRequest request) {
        SearchContext context = createContext(request, null);
        boolean success = false;
        try {
            activeContexts.put(context.id(), context);
            context.indexShard().searchService().onNewContext(context);
            success = true;
            return context;
        } finally {
            if (!success) {
                freeContext(context.id());
            }
        }
    }

    /**
     * Creates a new SearchContext. <br />
     * <p>
     * This is similar to
     * {@link org.elasticsearch.search.InternalSearchService#createContext(org.elasticsearch.search.internal.ShardSearchRequest, org.elasticsearch.index.engine.Engine.Searcher)}
     * but uses Symbols to create the lucene query / sorting.
     * </p>
     *
     * <p>
     * Note: Scrolling isn't supported.
     * </p>
     */
    private SearchContext createContext(QueryShardRequest request, @Nullable Engine.Searcher searcher) {
        IndexService indexService = indicesService.indexServiceSafe(request.index());
        IndexShard indexShard = indexService.shardSafe(request.shardId());

        SearchShardTarget searchShardTarget = new SearchShardTarget(
                clusterService.localNode().id(),
                request.index(),
                request.shardId()
        );

        Engine.Searcher engineSearcher = searcher == null ? indexShard.acquireSearcher("search") : searcher;

        ShardSearchRequest shardSearchRequest = new ShardSearchRequest();
        shardSearchRequest.types(new String[] {Constants.DEFAULT_MAPPING_TYPE });

        // TODO: use own CrateSearchContext that doesn't require ShardSearchRequest
        SearchContext context = new DefaultSearchContext(
                idGenerator.incrementAndGet(),
                shardSearchRequest,
                searchShardTarget,
                engineSearcher,
                indexService,
                indexShard,
                scriptService,
                cacheRecycler,
                pageCacheRecycler,
                bigArrays
        );
        SearchContext.setCurrent(context);

        try {
            LuceneQueryBuilder builder = new LuceneQueryBuilder(functions, context, indexService.cache());
            LuceneQueryBuilder.Context ctx = builder.convert(request.whereClause());
            context.parsedQuery(new ParsedQuery(ctx.query(), ImmutableMap.<String, Filter>of()));
            Float minScore = ctx.minScore();
            if (minScore != null) {
                context.minimumScore(minScore);
            }

            // the OUTPUTS_VISITOR sets the sourceFetchContext / version / minScore onto the SearchContext
            OutputContext outputContext = new OutputContext(context, request.partitionBy());
            OUTPUTS_VISITOR.process(request.outputs(), outputContext);

            context.sort(generateLuceneSort(
                    context, request.orderBy(), request.reverseFlags(), request.nullsFirst()));

            context.from(request.offset());
            context.size(request.limit());

            // pre process
            dfsPhase.preProcess(context);
            queryPhase.preProcess(context);
            fetchPhase.preProcess(context);

            // compute the context keep alive
            long keepAlive = defaultKeepAlive;
            context.keepAlive(keepAlive);
        } catch (Throwable e) {
            context.close();
            throw ExceptionsHelper.convertToRuntime(e);
        }
        return context;
    }

    /**
     * Shared visitor instance that createContext uses to apply the requested
     * output symbols to a search context.
     */
    private static final OutputSymbolVisitor OUTPUTS_VISITOR = new OutputSymbolVisitor();

    private static class OutputContext {
        private final SearchContext searchContext;
        private final List<ReferenceInfo> partitionBy;
        private final List<String> fields = new ArrayList<>();
        public boolean needWholeSource = false;

        private OutputContext(SearchContext searchContext, List<ReferenceInfo> partitionBy) {
            this.searchContext = searchContext;
            this.partitionBy = partitionBy;
        }
    }

    private static class OutputSymbolVisitor extends SymbolVisitor<OutputContext, Void> {

        public void process(List<Symbol> outputs, OutputContext context) {
            for (Symbol output : outputs) {
                process(output, context);
            }
            if (!context.needWholeSource) {
                if (context.fields.isEmpty()) {
                    context.searchContext.fetchSourceContext(new FetchSourceContext(false));
                } else {
                    Set<String> fields = StringUtils.commonAncestors(context.fields);
                    context.searchContext.fetchSourceContext(
                            new FetchSourceContext(fields.toArray(new String[fields.size()])));
                }
            }
        }

        @Override
        public Void visitReference(Reference symbol, OutputContext context) {
            ColumnIdent columnIdent = symbol.info().ident().columnIdent();
            if (columnIdent.isSystemColumn()) {
                if (DocSysColumns.VERSION.equals(columnIdent)) {
                    context.searchContext.version(true);
                } else {
                    context.needWholeSource = true;
                }
            } else if (!context.partitionBy.contains(symbol.info())) {
                context.fields.add(columnIdent.fqn());
            }
            return null;
        }

        @Override
        public Void visitDynamicReference(DynamicReference symbol, OutputContext context) {
            return visitReference(symbol, context);
        }

        @Override
        protected Void visitSymbol(Symbol symbol, OutputContext context) {
            throw new UnsupportedOperationException(SymbolFormatter.format(
                    "Can't use %s as an output", symbol));
        }
    }

    /**
     * Maps data types to the lucene sort field type. The function based sort
     * comparator source looks up the value type of the function here and reports
     * the result as its reduced type; data types without an entry yield {@code null}.
     */
    private static final Map<DataType, SortField.Type> luceneTypeMap = ImmutableMap.<DataType, SortField.Type>builder()
            .put(DataTypes.STRING, SortField.Type.STRING)
            .put(DataTypes.LONG, SortField.Type.LONG)
            .put(DataTypes.INTEGER, SortField.Type.INT)
            .put(DataTypes.DOUBLE, SortField.Type.DOUBLE)
            .put(DataTypes.FLOAT, SortField.Type.FLOAT)
            .build();


    @Nullable
    private Sort generateLuceneSort(SearchContext context,
                                    List<Symbol> symbols,
                                    boolean[] reverseFlags,
                                    Boolean[] nullsFirst) {
        if (symbols.isEmpty()) {
            return null;
        }
        SortField[] sortFields = new SortField[symbols.size()];
        for (int i = 0, symbolsSize = symbols.size(); i < symbolsSize; i++) {
            sortFields[i] = sortSymbolVisitor.generateSortField(
                    symbols.get(i), new SortSymbolContext(context, reverseFlags[i], nullsFirst[i]));
        }
        return new Sort(sortFields);
    }

    private static class SortSymbolContext {

        private final boolean reverseFlag;
        private final CollectorContext context;
        private final Boolean nullFirst;

        public SortSymbolContext(SearchContext searchContext, boolean reverseFlag, Boolean nullFirst) {
            this.nullFirst = nullFirst;
            this.context = new CollectorContext();
            this.context.searchContext(searchContext);
            this.reverseFlag = reverseFlag;
        }
    }

    private static class SortSymbolVisitor extends SymbolVisitor<SortSymbolContext, SortField> {

        private final CollectInputSymbolVisitor<LuceneCollectorExpression<?>> inputSymbolVisitor;

        public SortSymbolVisitor(CollectInputSymbolVisitor<LuceneCollectorExpression<?>> inputSymbolVisitor) {
            super();
            this.inputSymbolVisitor = inputSymbolVisitor;
        }

        public SortField generateSortField(Symbol symbol, SortSymbolContext sortSymbolContext) {
            return process(symbol, sortSymbolContext);
        }


        /**
         * generate a SortField from a Reference symbol.
         *
         * the implementation is similar to what {@link org.elasticsearch.search.sort.SortParseElement}
         * does.
         */
        @Override
        public SortField visitReference(Reference symbol, SortSymbolContext context) {
            // can't use the SortField(fieldName, type) constructor
            // because values are saved using docValues and therefore they're indexed in lucene as binary and not
            // with the reference valueType.
            // this is why we use a custom comparator source with the same logic as ES

            ColumnIdent columnIdent = symbol.info().ident().columnIdent();
            if (columnIdent.isColumn() && SortParseElement.SCORE_FIELD_NAME.equals(columnIdent.name())) {
                return !context.reverseFlag ? SortParseElement.SORT_SCORE_REVERSE : SortParseElement.SORT_SCORE;
            }

            MultiValueMode sortMode = context.reverseFlag ? MultiValueMode.MAX : MultiValueMode.MIN;
            SearchContext searchContext = context.context.searchContext();

            FieldMapper fieldMapper = context.context.searchContext().smartNameFieldMapper(columnIdent.fqn());
            SortOrder sortOrder = new SortOrder(context.reverseFlag, context.nullFirst);
            IndexFieldData.XFieldComparatorSource fieldComparatorSource =
                    searchContext.fieldData().getForField(fieldMapper).comparatorSource(sortOrder.missing(), sortMode);

            return new SortField(
                    fieldMapper.names().indexName(),
                    fieldComparatorSource,
                    context.reverseFlag
            );
        }

        @Override
        public SortField visitFunction(final Function function, final SortSymbolContext context) {
            CollectInputSymbolVisitor.Context inputContext = inputSymbolVisitor.process(function);
            ArrayList<Input<?>> inputs = inputContext.topLevelInputs();
            assert inputs.size() == 1;
            final Input functionInput = inputs.get(0);
            @SuppressWarnings("unchecked")
            final List<LuceneCollectorExpression> expressions = inputContext.docLevelExpressions();
            final SortField.Type type = luceneTypeMap.get(function.valueType());
            final SortOrder sortOrder = new SortOrder(context.reverseFlag, context.nullFirst);
            assert type != null : "Could not get lucene sort type for " + function.valueType();

            return new SortField(function.toString(), new IndexFieldData.XFieldComparatorSource() {
                @Override
                public FieldComparator<?> newComparator(String fieldName, int numHits, int sortPos, boolean reversed) throws IOException {
                    return new InputFieldComparator(
                            numHits,
                            context.context,
                            expressions,
                            functionInput,
                            function.valueType(),
                            missingObject(sortOrder.missing(), reversed)
                    );
                }

                @Override
                public SortField.Type reducedType() {
                    return type;
                }
            }, context.reverseFlag);
        }

        @Override
        protected SortField visitSymbol(Symbol symbol, SortSymbolContext context) {
            throw new UnsupportedOperationException(
                    SymbolFormatter.format("sorting on %s is not supported", symbol));
        }
    }

    static class InputFieldComparator extends FieldComparator {

        private final Object[] values;
        private final Input input;
        private final List<LuceneCollectorExpression> collectorExpressions;
        private final Object missingValue;
        private final DataType valueType;
        private Object bottom;
        private Object top;

        public InputFieldComparator(int numHits,
                                    CollectorContext context,
                                    List<LuceneCollectorExpression> collectorExpressions,
                                    Input input,
                                    DataType valueType,
                                    Object missingValue) {
            this.collectorExpressions = collectorExpressions;
            this.missingValue = missingValue;
            for (int i = 0, collectorExpressionsSize = collectorExpressions.size(); i < collectorExpressionsSize; i++) {
                LuceneCollectorExpression collectorExpression = collectorExpressions.get(i);
                collectorExpression.startCollect(context);
            }
            this.valueType = valueType;
            this.values = new Object[numHits];
            this.input = input;
        }

        @Override
        @SuppressWarnings("unchecked")
        public int compare(int slot1, int slot2) {
            return valueType.compareValueTo(values[slot1], values[slot2]);
        }

        @Override
        public void setBottom(int slot) {
            bottom = values[slot];
        }

        @Override
        public void setTopValue(Object value) {
            top = value;
        }

        @SuppressWarnings("unchecked")
        @Override
        public int compareBottom(int doc) throws IOException {
            for (int i = 0, collectorExpressionsSize = collectorExpressions.size(); i < collectorExpressionsSize; i++) {
                LuceneCollectorExpression collectorExpression = collectorExpressions.get(i);
                collectorExpression.setNextDocId(doc);
            }
            return valueType.compareValueTo(bottom, input.value());
        }

        @SuppressWarnings("unchecked")
        @Override
        public int compareTop(int doc) throws IOException {
            for (int i = 0, collectorExpressionsSize = collectorExpressions.size(); i < collectorExpressionsSize; i++) {
                LuceneCollectorExpression collectorExpression = collectorExpressions.get(i);
                collectorExpression.setNextDocId(doc);
            }
            return valueType.compareValueTo(top, input.value());
        }

        @Override
        public void copy(int slot, int doc) throws IOException {
            for (int i = 0, collectorExpressionsSize = collectorExpressions.size(); i < collectorExpressionsSize; i++) {
                LuceneCollectorExpression collectorExpression = collectorExpressions.get(i);
                collectorExpression.setNextDocId(doc);
            }
            Object value = input.value();
            if (value == null) {
                values[slot] = missingValue;
            } else {
                values[slot] = value;
            }
        }

        @Override
        public FieldComparator setNextReader(AtomicReaderContext context) throws IOException {
            for (int i = 0, collectorExpressionsSize = collectorExpressions.size(); i < collectorExpressionsSize; i++) {
                LuceneCollectorExpression collectorExpression = collectorExpressions.get(i);
                collectorExpression.setNextReader(context);
            }
            return this;
        }

        @Override
        public Object value(int slot) {
            return values[slot];
        }
    }
}
TOP

Related Classes of io.crate.action.sql.query.CrateSearchService

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code is the property of its respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.