Package io.crate.executor.transport

Source Code of io.crate.executor.transport.TransportExecutor$Visitor

/*
* Licensed to CRATE Technology GmbH ("Crate") under one or more contributor
* license agreements.  See the NOTICE file distributed with this work for
* additional information regarding copyright ownership.  Crate licenses
* this file to you under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.  You may
* obtain a copy of the License at
*
*   http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
* License for the specific language governing permissions and limitations
* under the License.
*
* However, if you have executed another commercial license agreement
* with Crate these terms will supersede the license and you may use the
* software solely pursuant to the terms of the relevant commercial agreement.
*/

package io.crate.executor.transport;

import com.google.common.util.concurrent.ListenableFuture;
import io.crate.executor.Executor;
import io.crate.executor.Job;
import io.crate.executor.Task;
import io.crate.executor.TaskResult;
import io.crate.executor.task.LocalCollectTask;
import io.crate.executor.task.LocalMergeTask;
import io.crate.executor.transport.task.CreateTableTask;
import io.crate.executor.transport.task.DistributedMergeTask;
import io.crate.executor.transport.task.DropTableTask;
import io.crate.executor.transport.task.RemoteCollectTask;
import io.crate.executor.transport.task.elasticsearch.*;
import io.crate.metadata.Functions;
import io.crate.metadata.ReferenceResolver;
import io.crate.operation.ImplementationSymbolVisitor;
import io.crate.operation.collect.HandlerSideDataCollectOperation;
import io.crate.operation.collect.StatsTables;
import io.crate.operation.projectors.ProjectionToProjectorVisitor;
import io.crate.planner.Plan;
import io.crate.planner.RowGranularity;
import io.crate.planner.node.PlanNode;
import io.crate.planner.node.PlanVisitor;
import io.crate.planner.node.ddl.*;
import io.crate.planner.node.dml.ESDeleteByQueryNode;
import io.crate.planner.node.dml.ESDeleteNode;
import io.crate.planner.node.dml.ESIndexNode;
import io.crate.planner.node.dml.ESUpdateNode;
import io.crate.planner.node.dql.*;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.search.controller.SearchPhaseController;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.List;

public class TransportExecutor implements Executor {

    private final Functions functions;
    private final ReferenceResolver referenceResolver;
    private final SearchPhaseController searchPhaseController;
    private final StatsTables statsTables;
    private final Visitor visitor;
    private final ThreadPool threadPool;

    private final Settings settings;
    private final ClusterService clusterService;
    private final TransportActionProvider transportActionProvider;

    // operation for handler side collecting
    private final HandlerSideDataCollectOperation handlerSideDataCollectOperation;
    private final ProjectionToProjectorVisitor projectorVisitor;

    @Inject
    public TransportExecutor(Settings settings,
                             TransportActionProvider transportActionProvider,
                             ThreadPool threadPool,
                             Functions functions,
                             ReferenceResolver referenceResolver,
                             HandlerSideDataCollectOperation handlerSideDataCollectOperation,
                             SearchPhaseController searchPhaseController,
                             StatsTables statsTables,
                             ClusterService clusterService) {
        this.settings = settings;
        this.transportActionProvider = transportActionProvider;
        this.handlerSideDataCollectOperation = handlerSideDataCollectOperation;
        this.threadPool = threadPool;
        this.functions = functions;
        this.referenceResolver = referenceResolver;
        this.searchPhaseController = searchPhaseController;
        this.statsTables = statsTables;
        this.clusterService = clusterService;
        this.visitor = new Visitor();
        ImplementationSymbolVisitor clusterImplementationSymbolVisitor =
                new ImplementationSymbolVisitor(referenceResolver, functions, RowGranularity.CLUSTER);
        projectorVisitor = new ProjectionToProjectorVisitor(
                clusterService, settings, transportActionProvider, clusterImplementationSymbolVisitor);
    }

    @Override
    public Job newJob(Plan node) {
        final Job job = new Job();
        for (PlanNode planNode : node) {
            planNode.accept(visitor, job);
        }
        return job;
    }

    @Override
    @SuppressWarnings("unchecked")
    public List<ListenableFuture<TaskResult>> execute(Job job) {
        assert job.tasks().size() > 0;

        Task lastTask = null;
        for (Task task : job.tasks()) {
            // chaining tasks
            if (lastTask != null) {
                task.upstreamResult(lastTask.result());
            }
            task.start();
            lastTask = task;
        }

        assert lastTask != null;
        return (List<ListenableFuture<TaskResult>>)lastTask.result();
    }

    class Visitor extends PlanVisitor<Job, Void> {

        @Override
        public Void visitCollectNode(CollectNode node, Job context) {
            node.jobId(context.id()); // add jobId to collectNode
            if (node.isRouted()) {
                context.addTask(new RemoteCollectTask(
                    node,
                    transportActionProvider.transportCollectNodeAction(),
                    handlerSideDataCollectOperation));
            } else {
                context.addTask(new LocalCollectTask(handlerSideDataCollectOperation, node));
            }
            return null;
        }

        @Override
        public Void visitMergeNode(MergeNode node, Job context) {
            node.contextId(context.id());
            if (node.executionNodes().isEmpty()) {
                context.addTask(new LocalMergeTask(
                        threadPool,
                        clusterService,
                        settings,
                        transportActionProvider,
                        new ImplementationSymbolVisitor(referenceResolver, functions, RowGranularity.CLUSTER),
                        node,
                        statsTables));
            } else {
                context.addTask(new DistributedMergeTask(
                        transportActionProvider.transportMergeNodeAction(), node));
            }

            return null;
        }

        @Override
        public Void visitESSearchNode(QueryThenFetchNode node, Job context) {
            context.addTask(new QueryThenFetchTask(
                    node,
                    clusterService,
                    transportActionProvider.transportQueryShardAction(),
                    transportActionProvider.searchServiceTransportAction(),
                    searchPhaseController,
                    threadPool));
            return null;
        }

        @Override
        public Void visitESGetNode(ESGetNode node, Job context) {
            context.addTask(new ESGetTask(
                    functions,
                    projectorVisitor,
                    transportActionProvider.transportMultiGetAction(),
                    transportActionProvider.transportGetAction(),
                    node));
            return null;
        }

        @Override
        public Void visitESDeleteByQueryNode(ESDeleteByQueryNode node, Job context) {
            context.addTask(new ESDeleteByQueryTask(node,
                    transportActionProvider.transportDeleteByQueryAction()));
            return null;
        }

        @Override
        public Void visitESDeleteNode(ESDeleteNode node, Job context) {
            context.addTask(new ESDeleteTask(
                    transportActionProvider.transportDeleteAction(),
                    node));
            return null;
        }

        @Override
        public Void visitCreateTableNode(CreateTableNode node, Job context) {
            context.addTask(new CreateTableTask(clusterService,
                transportActionProvider.transportCreateIndexAction(),
                transportActionProvider.transportDeleteIndexAction(),
                transportActionProvider.transportPutIndexTemplateAction(),
                node)
            );
            return null;
        }

        @Override
        public Void visitESCreateTemplateNode(ESCreateTemplateNode node, Job context) {
            context.addTask(new ESCreateTemplateTask(node,
                    transportActionProvider.transportPutIndexTemplateAction()));
            return null;
        }

        @Override
        public Void visitESCountNode(ESCountNode node, Job context) {
            context.addTask(new ESCountTask(node,
                    transportActionProvider.transportCountAction()));
            return null;
        }

        @Override
        public Void visitESIndexNode(ESIndexNode node, Job context) {
            if (node.sourceMaps().size() > 1) {
                context.addTask(new ESBulkIndexTask(clusterService, settings,
                        transportActionProvider.transportShardBulkAction(),
                        transportActionProvider.transportCreateIndexAction(),
                        node));
            } else {
                context.addTask(new ESIndexTask(
                        transportActionProvider.transportIndexAction(),
                        node));
            }
            return null;
        }

        @Override
        public Void visitESUpdateNode(ESUpdateNode node, Job context) {
            // update with _version currently only possible in update by query
            if (node.ids().size() == 1 && node.routingValues().size() == 1) {
                context.addTask(new ESUpdateByIdTask(
                        transportActionProvider.transportUpdateAction(),
                        node));
            } else {
                context.addTask(new ESUpdateByQueryTask(
                        transportActionProvider.transportSearchAction(),
                        node));
            }
            return null;
        }

        @Override
        public Void visitDropTableNode(DropTableNode node, Job context) {
            context.addTask(new DropTableTask(
                    transportActionProvider.transportDeleteIndexTemplateAction(),
                    transportActionProvider.transportDeleteIndexAction(),
                    node));
            return null;
        }

        @Override
        public Void visitESDeleteIndexNode(ESDeleteIndexNode node, Job context) {
            context.addTask(new ESDeleteIndexTask(
                    transportActionProvider.transportDeleteIndexAction(),
                    node));
            return null;
        }

        @Override
        public Void visitESClusterUpdateSettingsNode(ESClusterUpdateSettingsNode node, Job context) {
            context.addTask(new ESClusterUpdateSettingsTask(
                    transportActionProvider.transportClusterUpdateSettingsAction(),
                    node));
            return null;
        }

        @Override
        protected Void visitPlanNode(PlanNode node, Job context) {
            throw new UnsupportedOperationException(
                    String.format("Can't generate job/task for planNode %s", node));
        }
    }
}
TOP

Related Classes of io.crate.executor.transport.TransportExecutor$Visitor

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.