Package io.crate.operation.collect

Source Code of io.crate.operation.collect.ShardProjectorChain

/*
* Licensed to CRATE Technology GmbH ("Crate") under one or more contributor
* license agreements.  See the NOTICE file distributed with this work for
* additional information regarding copyright ownership.  Crate licenses
* this file to you under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.  You may
* obtain a copy of the License at
*
*   http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
* License for the specific language governing permissions and limitations
* under the License.
*
* However, if you have executed another commercial license agreement
* with Crate these terms will supersede the license and you may use the
* software solely pursuant to the terms of the relevant commercial agreement.
*/

package io.crate.operation.collect;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ListenableFuture;
import io.crate.operation.projectors.CollectingProjector;
import io.crate.operation.projectors.ResultProvider;
import io.crate.operation.projectors.ProjectionToProjectorVisitor;
import io.crate.operation.projectors.Projector;
import io.crate.planner.RowGranularity;
import io.crate.planner.projection.Projection;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/**
* a chain of connected projectors data is flowing through
* with separate chains, zero or more chains to be executed on data shards
* connected to the final chain of which there can be only one,
* executed on a node.
*
* Currently only one projector can be executed on a shard
*
* <pre>
* Data -> Shard chain 0 \
*                        \
* Data -> Shard chain 1 ----> Node chain --> Result
*                        /
* Data -> Shard chain 2 /
* </pre>
*
* If there is no projector to be executed on a shard,
* all data will go directly to the node chain.
* <p>
* Usage:
*
* <ul>
*     <li> construct one from a list of projections
*     <li> get a shard projector by calling {@linkplain #newShardDownstreamProjector(io.crate.operation.projectors.ProjectionToProjectorVisitor)}
*          from a shard context. do this for every shard you have
*     <li> call {@linkplain #startProjections()}
*     <li> feed data to the shard projectors
*     <li> get your result from {@linkplain #result()}
*
*/
public class ShardProjectorChain implements ResultProvider {

    private final List<Projection> projections;
    protected final List<Projector> shardProjectors;
    protected final List<Projector> nodeProjectors;
    private Projector firstNodeProjector;
    private ResultProvider lastProjector;
    private int shardProjectionsIndex = -1;


    public ShardProjectorChain(int numShards, List<Projection> projections, ProjectionToProjectorVisitor nodeProjectorVisitor) {
        this.projections = projections;
        nodeProjectors = new ArrayList<>();

        if (projections.size() == 0) {
            firstNodeProjector = new CollectingProjector();
            lastProjector = (ResultProvider) firstNodeProjector;
            nodeProjectors.add(firstNodeProjector);
            shardProjectors = ImmutableList.of();
            return;
        }

        int idx = 0;
        for (Projection projection : projections) {
            if (projection.requiredGranularity() == RowGranularity.SHARD) {
                shardProjectionsIndex = idx;
                break; // we can quit here since currently
                       // there can be only 1 projection on the shard
            }
            idx++;
        }

        Projector previousProjector = null;
        // create the node level projectors
        for (int i = shardProjectionsIndex + 1; i < projections.size(); i++) {
            Projector projector = nodeProjectorVisitor.process(projections.get(i));
            nodeProjectors.add(projector);
            if (previousProjector != null) {
                previousProjector.downstream(projector);
            } else {
                firstNodeProjector = projector;
            }
            previousProjector = projector;
        }
        if (shardProjectionsIndex >= 0) {
            shardProjectors = new ArrayList<>((shardProjectionsIndex + 1) * numShards);
            // shardprojector will be created later
            if (nodeProjectors.isEmpty()) {
                // no node projectors
                previousProjector = firstNodeProjector = new CollectingProjector();
            }
        } else {
            shardProjectors = ImmutableList.of();
        }
        assert previousProjector != null;
        if (previousProjector instanceof ResultProvider) {
            lastProjector = (ResultProvider) previousProjector;
        } else {
            lastProjector = new CollectingProjector();
            previousProjector.downstream((Projector) lastProjector);
        }
    }


    /**
     * Creates a new shard downstream chain if needed and returns a projector to be used as downstream
     * this method also calls startProjection on newly created shard level projectors.
     *
     * @param projectorVisitor the visitor to create projections out of a projection
     * @return a new projector connected to the internal chain
     */
    public Projector newShardDownstreamProjector(ProjectionToProjectorVisitor projectorVisitor) {
        if (shardProjectionsIndex < 0) {
            return firstNodeProjector;
        }
        Projector previousProjector = firstNodeProjector;
        Projector projector = null;
        for (int i = shardProjectionsIndex; i >= 0; i--) {
            projector = projectorVisitor.process(projections.get(i));
            projector.downstream(previousProjector);
            shardProjectors.add(projector);
            previousProjector = projector;
        }
        return projector;
    }

    public ListenableFuture<Object[][]> result() {
        return lastProjector.result();
    }

    @Override
    public Iterator<Object[]> iterator() throws IllegalStateException {
        return lastProjector.iterator();
    }

    public void startProjections() {
        for (Projector projector : Lists.reverse(nodeProjectors)) {
            projector.startProjection();
        }
        if (shardProjectionsIndex >= 0) {
            for (Projector p : shardProjectors) {
                p.startProjection();
            }
        }
    }
}
TOP

Related Classes of io.crate.operation.collect.ShardProjectorChain

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code is the property of its respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.