Package cascading.tuple

Source Code of cascading.tuple.TupleEntrySchemeIterator

/*
* Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved.
*
* Project and contact information: http://www.cascading.org/
*
* This file is part of the Cascading project.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*     http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package cascading.tuple;

import java.io.Closeable;
import java.io.IOException;
import java.util.Collections;
import java.util.Set;

import cascading.flow.FlowProcess;
import cascading.scheme.ConcreteCall;
import cascading.scheme.Scheme;
import cascading.util.CloseableIterator;
import cascading.util.SingleCloseableInputIterator;
import cascading.util.Util;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Class TupleEntrySchemeIterator is a helper class for wrapping a {@link Scheme} instance, calling
* {@link Scheme#source(cascading.flow.FlowProcess, cascading.scheme.SourceCall)} on every call to
* {@link #next()}. The behavior can be controlled via properties defined in {@link TupleEntrySchemeIteratorProps}.
* <p/>
* Use this class inside a custom {@link cascading.tap.Tap} when overriding the
* {@link cascading.tap.Tap#openForRead(cascading.flow.FlowProcess)} method.
*/
public class TupleEntrySchemeIterator<Config, Input> extends TupleEntryIterator
  {
  /** Field LOG */
  private static final Logger LOG = LoggerFactory.getLogger( TupleEntrySchemeIterator.class );

  private final FlowProcess<? extends Config> flowProcess;
  private final Scheme scheme;
  private final CloseableIterator<Input> inputIterator;
  private final Set<Class<? extends Exception>> permittedExceptions;
  private ConcreteCall sourceCall;

  private String identifier;
  private boolean isComplete = false;
  private boolean hasWaiting = false;
  private TupleException currentException;

  public TupleEntrySchemeIterator( FlowProcess<? extends Config> flowProcess, Scheme scheme, Input input )
    {
    this( flowProcess, scheme, input, null );
    }

  public TupleEntrySchemeIterator( FlowProcess<? extends Config> flowProcess, Scheme scheme, Input input, String identifier )
    {
    this( flowProcess, scheme, (CloseableIterator<Input>) new SingleCloseableInputIterator( (Closeable) input ), identifier );
    }

  public TupleEntrySchemeIterator( FlowProcess<? extends Config> flowProcess, Scheme scheme, CloseableIterator<Input> inputIterator )
    {
    this( flowProcess, scheme, inputIterator, null );
    }

  public TupleEntrySchemeIterator( FlowProcess<? extends Config> flowProcess, Scheme scheme, CloseableIterator<Input> inputIterator, String identifier )
    {
    super( scheme.getSourceFields() );
    this.flowProcess = flowProcess;
    this.scheme = scheme;
    this.inputIterator = inputIterator;
    this.identifier = identifier;

    Object permittedExceptions = flowProcess.getProperty( TupleEntrySchemeIteratorProps.PERMITTED_EXCEPTIONS );

    if( permittedExceptions != null )
      this.permittedExceptions = Util.asClasses( permittedExceptions.toString(), "unable to load permitted exception class" );
    else
      this.permittedExceptions = Collections.emptySet();

    if( this.identifier == null || this.identifier.isEmpty() )
      this.identifier = "'unknown'";

    if( !inputIterator.hasNext() )
      {
      isComplete = true;
      return;
      }

    sourceCall = new ConcreteCall();

    sourceCall.setIncomingEntry( getTupleEntry() );
    sourceCall.setInput( wrapInput( inputIterator.next() ) );

    try
      {
      this.scheme.sourcePrepare( flowProcess, sourceCall );
      }
    catch( IOException exception )
      {
      throw new TupleException( "unable to prepare source for input identifier: " + this.identifier, exception );
      }
    }

  protected FlowProcess<? extends Config> getFlowProcess()
    {
    return flowProcess;
    }

  protected Input wrapInput( Input input )
    {
    return input;
    }

  @Override
  public boolean hasNext()
    {
    if( isComplete )
      return false;

    if( hasWaiting )
      return true;

    try
      {
      getNext();
      }
    catch( Exception exception )
      {
      if( identifier == null || identifier.isEmpty() )
        identifier = "'unknown'";

      if( permittedExceptions.contains( exception.getClass() ) )
        {
        LOG.warn( "Caught permitted exception while reading {}", identifier, exception );
        return false;
        }

      currentException = new TupleException( "unable to read from input identifier: " + identifier, exception );

      return true;
      }

    if( !hasWaiting )
      isComplete = true;

    return !isComplete;
    }

  private TupleEntry getNext() throws IOException
    {
    Tuples.asModifiable( sourceCall.getIncomingEntry().getTuple() );
    hasWaiting = scheme.source( flowProcess, sourceCall );

    if( !hasWaiting && inputIterator.hasNext() )
      {
      sourceCall.setInput( wrapInput( inputIterator.next() ) );

      return getNext();
      }

    return getTupleEntry();
    }

  @Override
  public TupleEntry next()
    {
    try
      {
      if( currentException != null )
        throw currentException;
      }
    finally
      {
      currentException = null; // data may be trapped
      }

    if( isComplete )
      throw new IllegalStateException( "no next element" );

    try
      {
      if( hasWaiting )
        return getTupleEntry();

      return getNext();
      }
    catch( Exception exception )
      {
      throw new TupleException( "unable to source from input identifier: " + identifier, exception );
      }
    finally
      {
      hasWaiting = false;
      }
    }

  @Override
  public void remove()
    {
    throw new UnsupportedOperationException( "may not remove elements from this iterator" );
    }

  @Override
  public void close() throws IOException
    {
    try
      {
      if( sourceCall != null )
        scheme.sourceCleanup( flowProcess, sourceCall );
      }
    finally
      {
      inputIterator.close();
      }
    }
  }
TOP

Related Classes of cascading.tuple.TupleEntrySchemeIterator

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code is the property of its respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.