Package io.crate.action.sql

Source Code of io.crate.action.sql.TransportSQLBulkAction$TransportHandler

* Licensed to CRATE Technology GmbH ("Crate") under one or more contributor
* license agreements.  See the NOTICE file distributed with this work for
* additional information regarding copyright ownership.  Crate licenses
* this file to you under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.  You may
* obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
* License for the specific language governing permissions and limitations
* under the License.
* However, if you have executed another commercial license agreement
* with Crate these terms will supersede the license and you may use the
* software solely pursuant to the terms of the relevant commercial agreement.

package io.crate.action.sql;

import io.crate.analyze.Analysis;
import io.crate.analyze.Analyzer;
import io.crate.executor.Executor;
import io.crate.executor.RowCountResult;
import io.crate.executor.TaskResult;
import io.crate.executor.transport.ResponseForwarder;
import io.crate.operation.collect.StatsTables;
import io.crate.planner.Plan;
import io.crate.planner.Planner;
import io.crate.sql.tree.Statement;
import io.crate.types.DataType;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.inject.Provider;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.BaseTransportRequestHandler;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportService;

import javax.annotation.Nullable;
import java.util.List;

public class TransportSQLBulkAction extends TransportBaseSQLAction<SQLBulkRequest, SQLBulkResponse> {

    public TransportSQLBulkAction(ClusterService clusterService,
                                  Settings settings,
                                  ThreadPool threadPool,
                                  Analyzer analyzer,
                                  Planner planner,
                                  Provider<Executor> executor,
                                  Provider<DDLAnalysisDispatcher> dispatcher,
                                  TransportService transportService,
                                  StatsTables statsTables) {
        super(clusterService, settings, SQLBulkAction.NAME, threadPool, analyzer, planner, executor, dispatcher, statsTables);
        transportService.registerHandler(SQLBulkAction.NAME, new TransportHandler());

    public Analysis getAnalysis(Statement statement, SQLBulkRequest request) {
        return analyzer.analyze(statement, SQLRequest.EMPTY_ARGS, request.bulkArgs());

    protected SQLBulkResponse emptyResponse(SQLBulkRequest request, String[] outputNames, @Nullable DataType[] types) {
        return new SQLBulkResponse(

    protected SQLBulkResponse emptyResponse(SQLBulkRequest request, Plan plan, String[] outputNames) {
        DataType[] dataTypes = plan.outputTypes().toArray(new DataType[plan.outputTypes().size()]);
        return new SQLBulkResponse(outputNames,

    protected SQLBulkResponse createResponseFromResult(SQLBulkRequest request,
                                                       String[] outputNames,
                                                       Long rowCount,
                                                       @Nullable DataType[] types) {
        // this is just a post-apocalypse-error... should be thrown in the analyzer/planner
        throw new UnsupportedOperationException("statement cannot be executed as bulk operation");

    protected SQLBulkResponse createResponseFromResult(Plan plan,
                                                       String[] outputNames,
                                                       List<TaskResult> result,
                                                       long requestCreationTime,
                                                       boolean includeTypesOnResponse) {
        DataType[] dataTypes = plan.outputTypes().toArray(new DataType[plan.outputTypes().size()]);
        SQLBulkResponse.Result[] results = new SQLBulkResponse.Result[result.size()];
        for (int i = 0, resultSize = result.size(); i < resultSize; i++) {
            TaskResult taskResult = result.get(i);
            assert taskResult instanceof RowCountResult : "Query operation not supported with bulk requests";
            results[i] = new SQLBulkResponse.Result(taskResult.errorMessage(), taskResult.rowCount());

        return new SQLBulkResponse(outputNames, results, requestCreationTime, dataTypes, includeTypesOnResponse);

    private class TransportHandler extends BaseTransportRequestHandler<SQLBulkRequest> {

        public SQLBulkRequest newInstance() {
            return new SQLBulkRequest();

        public void messageReceived(SQLBulkRequest request, final TransportChannel channel) throws Exception {
            // no need for a threaded listener
            ActionListener<SQLBulkResponse> listener = ResponseForwarder.forwardTo(channel);
            execute(request, listener);

        public String executor() {
            return ThreadPool.Names.SAME;

Related Classes of io.crate.action.sql.TransportSQLBulkAction$TransportHandler

Copyright © 2018 All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact