Package org.apache.flink.api.common.io

Source Code of org.apache.flink.api.common.io.FileOutputFormat$AbstractConfigBuilder

/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements.  See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership.  The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License.  You may obtain a copy of the License at
*
*     http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.api.common.io;

import java.io.FileNotFoundException;
import java.io.IOException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.flink.api.common.operators.base.FileDataSinkBase;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.fs.FileSystem.WriteMode;

/**
* The abstract base class for all output formats that are file based. Contains the logic to open/close the target
* file streams.
*/
public abstract class FileOutputFormat<IT> implements OutputFormat<IT>, InitializeOnMaster, CleanupWhenUnsuccessful {
 
  private static final long serialVersionUID = 1L;

  // --------------------------------------------------------------------------------------------
 
  /**
   * Behavior for creating output directories.
   */
  public static enum OutputDirectoryMode {
   
    /** A directory is always created, regardless of number of write tasks. */
    ALWAYS, 
   
    /** A directory is only created for parallel output tasks, i.e., number of output tasks > 1.
     * If number of output tasks = 1, the output is written to a single file. */
    PARONLY
  }
 
  // --------------------------------------------------------------------------------------------

  private static WriteMode DEFAULT_WRITE_MODE;
 
  private static OutputDirectoryMode DEFAULT_OUTPUT_DIRECTORY_MODE;
 
 
  private static final void initDefaultsFromConfiguration() {
    final boolean overwrite = GlobalConfiguration.getBoolean(ConfigConstants.FILESYSTEM_DEFAULT_OVERWRITE_KEY,
      ConfigConstants.DEFAULT_FILESYSTEM_OVERWRITE);
 
    DEFAULT_WRITE_MODE = overwrite ? WriteMode.OVERWRITE : WriteMode.NO_OVERWRITE;
   
    final boolean alwaysCreateDirectory = GlobalConfiguration.getBoolean(ConfigConstants.FILESYSTEM_OUTPUT_ALWAYS_CREATE_DIRECTORY_KEY,
      ConfigConstants.DEFAULT_FILESYSTEM_ALWAYS_CREATE_DIRECTORY);
 
    DEFAULT_OUTPUT_DIRECTORY_MODE = alwaysCreateDirectory ? OutputDirectoryMode.ALWAYS : OutputDirectoryMode.PARONLY;
  }
 
  static {
    initDefaultsFromConfiguration();
  }
 
  // -------------------------------------------------------------------------------------------- 
 
  /**
   * The logger for messages of this class.
   */
  private static final Logger LOG = LoggerFactory.getLogger(FileOutputFormat.class);

  /**
   * The key under which the name of the target path is stored in the configuration.
   * It is read in {@code configure} only when no output path has been set before.
   */
  public static final String FILE_PARAMETER_KEY = "flink.output.file";

  /**
   * The path of the file to be written. May be {@code null} until it is supplied via the
   * constructor, the setter, or the configuration.
   */
  protected Path outputFilePath;

  /**
   * The write mode of the output. If still {@code null} when {@code configure} runs,
   * it is replaced by the default write mode.
   */
  private WriteMode writeMode;

  /**
   * The output directory mode. If still {@code null} when {@code configure} runs,
   * it is replaced by the default output directory mode.
   */
  private OutputDirectoryMode outputDirectoryMode;

  // --------------------------------------------------------------------------------------------

  /** The stream to which the data is written. Set in {@code open}, reset to {@code null} in {@code close}. */
  protected transient FSDataOutputStream stream;

  /** The path that is actually written to (may be a file in the directory defined by {@code outputFilePath}). */
  private transient Path actualFilePath;

  /** Flag indicating whether this format actually created a file, which should be removed on cleanup. */
  private transient boolean fileCreated;

  // --------------------------------------------------------------------------------------------
 
  public FileOutputFormat() {}
 
  public FileOutputFormat(Path outputPath) {
    this.outputFilePath = outputPath;
  }
 
  public void setOutputFilePath(Path path) {
    if (path == null) {
      throw new IllegalArgumentException("Output file path may not be null.");
    }
   
    this.outputFilePath = path;
  }
 
  public Path getOutputFilePath() {
    return this.outputFilePath;
  }

 
  public void setWriteMode(WriteMode mode) {
    if (mode == null) {
      throw new NullPointerException();
    }
   
    this.writeMode = mode;
  }
 
  public WriteMode getWriteMode() {
    return this.writeMode;
  }

 
  public void setOutputDirectoryMode(OutputDirectoryMode mode) {
    if (mode == null) {
      throw new NullPointerException();
    }
   
    this.outputDirectoryMode = mode;
  }
 
  public OutputDirectoryMode getOutputDirectoryMode() {
    return this.outputDirectoryMode;
  }
 
  // ----------------------------------------------------------------

  @Override
  public void configure(Configuration parameters) {
    // get the output file path, if it was not yet set
    if (this.outputFilePath == null) {
      // get the file parameter
      String filePath = parameters.getString(FILE_PARAMETER_KEY, null);
      if (filePath == null) {
        throw new IllegalArgumentException("The output path has been specified neither via constructor/setters" +
            ", nor via the Configuration.");
      }
     
      try {
        this.outputFilePath = new Path(filePath);
      }
      catch (RuntimeException rex) {
        throw new RuntimeException("Could not create a valid URI from the given file path name: " + rex.getMessage());
      }
    }
   
    // check if have not been set and use the defaults in that case
    if (this.writeMode == null) {
      this.writeMode = DEFAULT_WRITE_MODE;
    }
   
    if (this.outputDirectoryMode == null) {
      this.outputDirectoryMode = DEFAULT_OUTPUT_DIRECTORY_MODE;
    }
  }

 
  @Override
  public void open(int taskNumber, int numTasks) throws IOException {
    if (taskNumber < 0 || numTasks < 1) {
      throw new IllegalArgumentException("TaskNumber: " + taskNumber + ", numTasks: " + numTasks);
    }
   
    if (LOG.isDebugEnabled()) {
      LOG.debug("Opening stream for output (" + (taskNumber+1) + "/" + numTasks + "). WriteMode=" + writeMode +
          ", OutputDirectoryMode=" + outputDirectoryMode);
    }
   
    Path p = this.outputFilePath;
    if (p == null) {
      throw new IOException("The file path is null.");
    }
   
    final FileSystem fs = p.getFileSystem();

    // if this is a local file system, we need to initialize the local output directory here
    if (!fs.isDistributedFS()) {
     
      if (numTasks == 1 && outputDirectoryMode == OutputDirectoryMode.PARONLY) {
        // output should go to a single file
       
        // prepare local output path. checks for write mode and removes existing files in case of OVERWRITE mode
        if(!fs.initOutPathLocalFS(p, writeMode, false)) {
          // output preparation failed! Cancel task.
          throw new IOException("Output path '" + p.toString() + "' could not be initialized. Canceling task...");
        }
      }
      else {
        // numTasks > 1 || outDirMode == OutputDirectoryMode.ALWAYS
       
        if(!fs.initOutPathLocalFS(p, writeMode, true)) {
          // output preparation failed! Cancel task.
          throw new IOException("Output directory '" + p.toString() + "' could not be created. Canceling task...");
        }
      }
    }
     
     
    // Suffix the path with the parallel instance index, if needed
    this.actualFilePath = (numTasks > 1 || outputDirectoryMode == OutputDirectoryMode.ALWAYS) ? p.suffix("/" + (taskNumber+1)) : p;

    // create output file
    this.stream = fs.create(this.actualFilePath, writeMode == WriteMode.OVERWRITE);
   
    // at this point, the file creation must have succeeded, or an exception has been thrown
    this.fileCreated = true;
  }

  @Override
  public void close() throws IOException {
    final FSDataOutputStream s = this.stream;
    if (s != null) {
      this.stream = null;
      s.close();
    }
  }
 
  /**
   * Initialization of the distributed file system if it is used.
   *
   * @param parallelism The task parallelism.
   */
  @Override
  public void initializeGlobal(int parallelism) throws IOException {
    final Path path = getOutputFilePath();
    final FileSystem fs = path.getFileSystem();
   
    // only distributed file systems can be initialized at start-up time.
    if (fs.isDistributedFS()) {
     
      final WriteMode writeMode = getWriteMode();
      final OutputDirectoryMode outDirMode = getOutputDirectoryMode();

      if (parallelism == 1 && outDirMode == OutputDirectoryMode.PARONLY) {
        // output is not written in parallel and should be written to a single file.
        // prepare distributed output path
        if(!fs.initOutPathDistFS(path, writeMode, false)) {
          // output preparation failed! Cancel task.
          throw new IOException("Output path could not be initialized.");
        }

      } else {
        // output should be written to a directory

        // only distributed file systems can be initialized at start-up time.
        if(!fs.initOutPathDistFS(path, writeMode, true)) {
          throw new IOException("Output directory could not be created.");
        }
      }
    }
  }
 
  @Override
  public void tryCleanupOnError() {
    if (this.fileCreated) {
      this.fileCreated = false;
     
      try {
        FileSystem.get(this.actualFilePath.toUri()).delete(actualFilePath, false);
      } catch (FileNotFoundException e) {
        // ignore, may not be visible yet or may be already removed
      } catch (Throwable t) {
        LOG.error("Could not remove the incomplete file " + actualFilePath);
      }
    }
  }
 
  // ============================================================================================
 
  /**
   * Creates a configuration builder that can be used to set the input format's parameters to the config in a fluent
   * fashion.
   *
   * @return A config builder for setting parameters.
   */
  public static ConfigBuilder configureFileFormat(FileDataSinkBase<?> target) {
    return new ConfigBuilder(target.getParameters());
  }
 
  /**
   * A builder used to set parameters to the output format's configuration in a fluent way.
   */
  public static abstract class AbstractConfigBuilder<T> {
   
    /**
     * The configuration into which the parameters will be written.
     */
    protected final Configuration config;
   
    // --------------------------------------------------------------------
   
    /**
     * Creates a new builder for the given configuration.
     *
     * @param targetConfig The configuration into which the parameters will be written.
     */
    protected AbstractConfigBuilder(Configuration targetConfig) {
      this.config = targetConfig;
    }
  }
 
  /**
   * A builder used to set parameters to the input format's configuration in a fluent way.
   */
  public static class ConfigBuilder extends AbstractConfigBuilder<ConfigBuilder> {
   
    /**
     * Creates a new builder for the given configuration.
     *
     * @param targetConfig The configuration into which the parameters will be written.
     */
    protected ConfigBuilder(Configuration targetConfig) {
      super(targetConfig);
    }
  }
}
TOP

Related Classes of org.apache.flink.api.common.io.FileOutputFormat$AbstractConfigBuilder

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code is the property of its respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.