Package cascading.flow.stream.element

Source Code of cascading.flow.stream.element.OperatorStage

/*
* Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved.
*
* Project and contact information: http://www.cascading.org/
*
* This file is part of the Cascading project.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*     http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package cascading.flow.stream.element;

import cascading.flow.FlowElement;
import cascading.flow.FlowProcess;
import cascading.flow.planner.Scope;
import cascading.flow.stream.duct.Duct;
import cascading.operation.ConcreteCall;
import cascading.pipe.Operator;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;
import cascading.tuple.TupleEntryCollector;
import cascading.tuple.util.TupleBuilder;
import cascading.tuple.util.TupleViews;

import static cascading.tuple.util.TupleViews.*;

/**
*
*/
public abstract class OperatorStage<Incoming> extends ElementStage<Incoming, TupleEntry>
  {
  /**
   * In 2.2 the collector is now nulled before the
   * {@link cascading.operation.Operation#cleanup(cascading.flow.FlowProcess, cascading.operation.OperationCall)}
   * is called. This property retains the collector to remain compatible with 2.1.
   */
  public static final String RETAIN_COLLECTOR = "cascading.compatibility.retain.collector";

  protected ConcreteCall operationCall;
  protected TupleEntry incomingEntry;
  protected Fields argumentsSelector;
  protected TupleEntry argumentsEntry;
  protected Fields remainderFields;
  protected Fields outgoingSelector;
  protected TupleEntry outgoingEntry;

  protected TupleBuilder argumentsBuilder;
  protected TupleBuilder outgoingBuilder;

  private final boolean retainCollector;

  protected TupleEntryCollector outputCollector;

  public OperatorStage( FlowProcess flowProcess, FlowElement flowElement )
    {
    super( flowProcess, flowElement );

    this.retainCollector = Boolean.parseBoolean( flowProcess.getStringProperty( RETAIN_COLLECTOR ) );
    }

  public abstract Operator getOperator();

  protected abstract Fields getOutgoingSelector();

  protected Fields getOperationDeclaredFields()
    {
    return outgoingScopes.get( 0 ).getOperationDeclaredFields();
    }

  protected abstract Fields getIncomingPassThroughFields();

  protected abstract Fields getIncomingArgumentsFields();

  protected TupleBuilder createArgumentsBuilder( final Fields incomingFields, final Fields argumentsSelector )
    {
    if( incomingFields.isUnknown() )
      return new TupleBuilder()
      {
      @Override
      public Tuple makeResult( Tuple input, Tuple output )
        {
        return input.get( incomingFields, argumentsSelector );
        }
      };

    if( argumentsSelector.isAll() )
      return new TupleBuilder()
      {
      @Override
      public Tuple makeResult( Tuple input, Tuple output )
        {
        return input;
        }
      };

    if( argumentsSelector.isNone() )
      return new TupleBuilder()
      {
      @Override
      public Tuple makeResult( Tuple input, Tuple output )
        {
        return Tuple.NULL;
        }
      };

    final Fields inputDeclarationFields = Fields.asDeclaration( incomingFields );

    return new TupleBuilder()
    {
    Tuple result = createNarrow( inputDeclarationFields.getPos( argumentsSelector ) );

    @Override
    public Tuple makeResult( Tuple input, Tuple output )
      {
      return TupleViews.reset( result, input );
      }
    };
    }

  protected TupleBuilder createOutgoingBuilder( final Operator operator, final Fields incomingFields, final Fields argumentSelector, final Fields remainderFields, final Fields declaredFields, final Fields outgoingSelector )
    {
    final Fields inputDeclarationFields = Fields.asDeclaration( incomingFields );

    if( operator.getOutputSelector().isResults() )
      return new TupleBuilder()
      {
      @Override
      public Tuple makeResult( Tuple input, Tuple output )
        {
        return output;
        }
      };

    if( operator.getOutputSelector().isAll() && !( incomingFields.isUnknown() || declaredFields.isUnknown() ) )
      return new TupleBuilder()
      {
      Tuple result = createComposite( inputDeclarationFields, declaredFields );

      @Override
      public Tuple makeResult( Tuple input, Tuple output )
        {
        return TupleViews.reset( result, input, output );
        }
      };

    if( operator.getOutputSelector().isReplace() )
      {
      if( incomingFields.isUnknown() )
        return new TupleBuilder()
        {
        Fields resultFields = operator.getFieldDeclaration().isArguments() ? argumentSelector : declaredFields;

        @Override
        public Tuple makeResult( Tuple input, Tuple output )
          {
          Tuple result = new Tuple( input );

          result.set( Fields.UNKNOWN, resultFields, output );

          return result;
          }
        };

      return new TupleBuilder()
      {
      Fields resultFields = operator.getFieldDeclaration().isArguments() ? argumentSelector : declaredFields;
      Tuple result = createOverride( inputDeclarationFields, resultFields );

      @Override
      public Tuple makeResult( Tuple input, Tuple output )
        {
        return TupleViews.reset( result, input, output );
        }
      };
      }

    if( operator.getOutputSelector().isSwap() )
      {
      if( remainderFields.size() == 0 ) // the same as Fields.RESULTS
        return new TupleBuilder()
        {
        @Override
        public Tuple makeResult( Tuple input, Tuple output )
          {
          return output;
          }
        };
      else if( declaredFields.isUnknown() )
        return new TupleBuilder()
        {
        @Override
        public Tuple makeResult( Tuple input, Tuple output )
          {
          return input.get( incomingFields, remainderFields ).append( output );
          }
        };
      else
        return new TupleBuilder()
        {
        Tuple view = createNarrow( inputDeclarationFields.getPos( remainderFields ) );
        Tuple result = createComposite( Fields.asDeclaration( remainderFields ), declaredFields );

        @Override
        public Tuple makeResult( Tuple input, Tuple output )
          {
          TupleViews.reset( view, input );

          return TupleViews.reset( result, view, output );
          }
        };
      }

    if( incomingFields.isUnknown() || declaredFields.isUnknown() )
      return new TupleBuilder()
      {
      Fields selector = outgoingSelector.isUnknown() ? Fields.ALL : outgoingSelector;
      TupleEntry incoming = new TupleEntry( incomingFields, true );
      TupleEntry declared = new TupleEntry( declaredFields, true );

      @Override
      public Tuple makeResult( Tuple input, Tuple output )
        {
        incoming.setTuple( input );
        declared.setTuple( output );

        return TupleEntry.select( selector, incoming, declared );
        }
      };

    return new TupleBuilder()
    {
    Fields inputFields = operator.getFieldDeclaration().isArguments() ? Fields.mask( inputDeclarationFields, declaredFields ) : inputDeclarationFields;
    Tuple appended = createComposite( inputFields, declaredFields );
    Fields allFields = Fields.resolve( Fields.ALL, inputFields, declaredFields );
    Tuple result = createNarrow( allFields.getPos( outgoingSelector ), appended );


    @Override
    public Tuple makeResult( Tuple input, Tuple output )
      {
      TupleViews.reset( appended, input, output );

      return result;
      }
    };
    }

  @Override
  public void initialize()
    {
    Scope outgoingScope = outgoingScopes.get( 0 );

    operationCall = new ConcreteCall( outgoingScope.getArgumentsDeclarator(), outgoingScope.getOperationDeclaredFields() );

    argumentsSelector = outgoingScope.getArgumentsSelector();
    remainderFields = outgoingScope.getRemainderPassThroughFields();
    outgoingSelector = getOutgoingSelector();

    argumentsEntry = new TupleEntry( outgoingScope.getArgumentsDeclarator(), true );

    outgoingEntry = new TupleEntry( getOutgoingFields(), true );

    operationCall.setArguments( argumentsEntry );

    argumentsBuilder = createArgumentsBuilder( getIncomingArgumentsFields(), argumentsSelector );
    outgoingBuilder = createOutgoingBuilder( getOperator(), getIncomingPassThroughFields(), argumentsSelector, remainderFields, getOperationDeclaredFields(), outgoingSelector );
    }

  @Override
  public void prepare()
    {
    super.prepare(); // if fails, skip the this prepare

    ( (Operator) getFlowElement() ).getOperation().prepare( flowProcess, operationCall );
    }

  @Override
  public void complete( Duct previous )
    {
    try
      {
      ( (Operator) getFlowElement() ).getOperation().flush( flowProcess, operationCall );
      }
    finally
      {
      super.complete( previous );
      }
    }

  @Override
  public void cleanup()
    {
    if( !retainCollector ) // see comments for RETAIN_COLLECTOR
      operationCall.setOutputCollector( null );

    try
      {
      ( (Operator) getFlowElement() ).getOperation().cleanup( flowProcess, operationCall );
      }
    finally
      {
      super.cleanup(); // guarantee this happens
      }
    }
  }
TOP

Related Classes of cascading.flow.stream.element.OperatorStage

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.