Package org.apache.sqoop.manager

Source Code of org.apache.sqoop.manager.DirectPostgresqlManager$PostgresqlAsyncSink

/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements.  See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership.  The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License.  You may obtain a copy of the License at
*
*     http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.sqoop.manager;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.util.ArrayList;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import com.cloudera.sqoop.SqoopOptions;
import com.cloudera.sqoop.io.SplittableBufferedWriter;
import com.cloudera.sqoop.util.AsyncSink;
import com.cloudera.sqoop.util.DirectImportUtils;
import com.cloudera.sqoop.util.ErrorableAsyncSink;
import com.cloudera.sqoop.util.ErrorableThread;
import com.cloudera.sqoop.util.Executor;
import com.cloudera.sqoop.util.ImportException;
import com.cloudera.sqoop.util.JdbcUrl;
import com.cloudera.sqoop.util.LoggingAsyncSink;
import com.cloudera.sqoop.util.PerfCounters;

/**
* Manages direct dumps from Postgresql databases via psql COPY TO STDOUT
* commands.
*/
public class DirectPostgresqlManager
    extends com.cloudera.sqoop.manager.PostgresqlManager {
  /** Logger for this manager; importTable also hands it to the stderr sink. */
  public static final Log LOG = LogFactory.getLog(
      DirectPostgresqlManager.class.getName());

  /**
   * Creates a direct-mode Postgresql manager for the given job options.
   *
   * @param opts the options forwarded to the superclass constructor.
   */
  public DirectPostgresqlManager(final SqoopOptions opts) {
    // Inform superclass that we're overriding import method via alt.
    // constructor.
    super(opts, true);
  }

  /**
   * Name of the psql executable. It is launched by name only, so it has to
   * be resolvable through the PATH of the process running the import.
   */
  private static final String PSQL_CMD = "psql";

  /** Copies data directly into HDFS, adding the user's chosen line terminator
      char to each record.
    */
  static class PostgresqlAsyncSink extends ErrorableAsyncSink {
    private final SplittableBufferedWriter writer;
    private final PerfCounters counters;
    private final SqoopOptions options;

    PostgresqlAsyncSink(final SplittableBufferedWriter w,
        final SqoopOptions opts, final PerfCounters ctrs) {
      this.writer = w;
      this.options = opts;
      this.counters = ctrs;
    }

    public void processStream(InputStream is) {
      child = new PostgresqlStreamThread(is, writer, options, counters);
      child.start();
    }

    private static class PostgresqlStreamThread extends ErrorableThread {
      public static final Log LOG = LogFactory.getLog(
          PostgresqlStreamThread.class.getName());

      private final SplittableBufferedWriter writer;
      private final InputStream stream;
      private final SqoopOptions options;
      private final PerfCounters counters;

      PostgresqlStreamThread(final InputStream is,
          final SplittableBufferedWriter w,
          final SqoopOptions opts, final PerfCounters ctrs) {
        this.stream = is;
        this.writer = w;
        this.options = opts;
        this.counters = ctrs;
      }

      public void run() {
        BufferedReader r = null;
        SplittableBufferedWriter w = this.writer;

        char recordDelim = this.options.getOutputRecordDelim();

        try {
          r = new BufferedReader(new InputStreamReader(this.stream));

          // read/write transfer loop here.
          while (true) {
            String inLine = r.readLine();
            if (null == inLine) {
              break; // EOF
            }

            w.write(inLine);
            w.write(recordDelim);
            w.allowSplit();
            counters.addBytes(1 + inLine.length());
          }
        } catch (IOException ioe) {
          LOG.error("IOException reading from psql: " + ioe.toString());
          // set the error bit so our caller can see that something went wrong.
          setError();
        } finally {
          if (null != r) {
            try {
              r.close();
            } catch (IOException ioe) {
              LOG.info("Error closing FIFO stream: " + ioe.toString());
            }
          }

          if (null != w) {
            try {
              w.close();
            } catch (IOException ioe) {
              LOG.info("Error closing HDFS stream: " + ioe.toString());
            }
          }
        }
      }
    }
  }

  /**
   * Takes a list of columns and turns them into a string like
   * "col1, col2, col3...".
   */
  private String getColumnListStr(String [] cols) {
    if (null == cols) {
      return null;
    }

    StringBuilder sb = new StringBuilder();
    boolean first = true;
    for (String col : cols) {
      if (!first) {
        sb.append(", ");
      }
      sb.append(col);
      first = false;
    }

    return sb.toString();
  }

  /**
   * @return the Postgresql-specific SQL command to copy the
   * table ("COPY .... TO STDOUT").
   */
  private String getCopyCommand(String tableName) {

    // Format of this command is:
    //
    //     COPY table(col, col....) TO STDOUT
    // or  COPY ( query ) TO STDOUT
    //   WITH DELIMITER 'fieldsep'
    //   CSV
    //   QUOTE 'quotechar'
    //   ESCAPE 'escapechar'
    //   FORCE QUOTE col, col, col....

    StringBuilder sb = new StringBuilder();
    String [] cols = getColumnNames(tableName);

    String escapedTableName = escapeTableName(tableName);

    sb.append("COPY ");
    String whereClause = this.options.getWhereClause();
    if (whereClause != null && whereClause.length() > 0) {
      // Import from a SELECT QUERY
      sb.append("(");
      sb.append("SELECT ");
      if (null != cols) {
        sb.append(getColumnListStr(cols));
      } else {
        sb.append("*");
      }

      sb.append(" FROM ");
      sb.append(escapedTableName);
      sb.append(" WHERE ");
      sb.append(whereClause);
      sb.append(")");
    } else {
      // Import just the table.
      sb.append(escapedTableName);
      if (null != cols) {
        // specify columns.
        sb.append("(");
        sb.append(getColumnListStr(cols));
        sb.append(")");
      }
    }

    // Translate delimiter characters to '\ooo' octal representation.
    sb.append(" TO STDOUT WITH DELIMITER E'\\");
    sb.append(Integer.toString((int) this.options.getOutputFieldDelim(), 8));
    sb.append("' CSV ");
    if (this.options.getOutputEnclosedBy() != '\0') {
      sb.append("QUOTE E'\\");
      sb.append(Integer.toString((int) this.options.getOutputEnclosedBy(), 8));
      sb.append("' ");
    }
    if (this.options.getOutputEscapedBy() != '\0') {
      sb.append("ESCAPE E'\\");
      sb.append(Integer.toString((int) this.options.getOutputEscapedBy(), 8));
      sb.append("' ");
    }

    // add the "FORCE QUOTE col, col, col..." clause if quotes are required.
    if (null != cols && this.options.isOutputEncloseRequired()) {
      sb.append("FORCE QUOTE ");
      sb.append(getColumnListStr(cols));
    }

    sb.append(";");

    String copyCmd = sb.toString();
    LOG.debug("Copy command is " + copyCmd);
    return copyCmd;
  }

  /** Write the COPY command to a temp file.
    * @return the filename we wrote to.
    */
  private String writeCopyCommand(String command) throws IOException {
    String tmpDir = options.getTempDir();
    File tempFile = File.createTempFile("tmp-", ".sql", new File(tmpDir));
    BufferedWriter w = new BufferedWriter(
        new OutputStreamWriter(new FileOutputStream(tempFile)));
    w.write(command);
    w.newLine();
    w.close();
    return tempFile.toString();
  }

  /** Write the user's password to a file that is chmod 0600.
      @return the filename.
    */
  private String writePasswordFile(String password) throws IOException {

    String tmpDir = options.getTempDir();
    File tempFile = File.createTempFile("pgpass", ".pgpass", new File(tmpDir));
    LOG.debug("Writing password to tempfile: " + tempFile);

    // Make sure it's only readable by the current user.
    DirectImportUtils.setFilePermissions(tempFile, "0600");

    // Actually write the password data into the file.
    BufferedWriter w = new BufferedWriter(
        new OutputStreamWriter(new FileOutputStream(tempFile)));
    w.write("*:*:*:*:" + password);
    w.close();
    return tempFile.toString();
  }

  // TODO(aaron): Refactor this method to be much shorter.
  // CHECKSTYLE:OFF
  @Override
  /**
   * Import the table into HDFS by using psql to pull the data out of the db
   * via COPY FILE TO STDOUT.
   */
  public void importTable(com.cloudera.sqoop.manager.ImportJobContext context)
    throws IOException, ImportException {

    String tableName = context.getTableName();
    SqoopOptions options = context.getOptions();

    LOG.info("Beginning psql fast path import");

    if (options.getFileLayout() != SqoopOptions.FileLayout.TextFile) {
      // TODO(aaron): Support SequenceFile-based load-in
      LOG.warn("File import layout" + options.getFileLayout()
          + " is not supported by");
      LOG.warn("Postgresql direct import; import will proceed as text files.");
    }

    String commandFilename = null;
    String passwordFilename = null;
    Process p = null;
    AsyncSink sink = null;
    AsyncSink errSink = null;
    PerfCounters counters = new PerfCounters();

    try {
      // Get the COPY TABLE command to issue, write this to a file, and pass
      // it in to psql with -f filename.  Then make sure we delete this file
      // in our finally block.
      String copyCmd = getCopyCommand(tableName);
      commandFilename = writeCopyCommand(copyCmd);

      // Arguments to pass to psql on the command line.
      ArrayList<String> args = new ArrayList<String>();

      // Environment to pass to psql.
      List<String> envp = Executor.getCurEnvpStrings();

      // We need to parse the connect string URI to determine the database
      // name and the host and port. If the host is localhost and the port is
      // not specified, we don't want to pass this to psql, because we want to
      // force the use of a UNIX domain socket, not a TCP/IP socket.
      String connectString = options.getConnectString();
      String databaseName = JdbcUrl.getDatabaseName(connectString);
      String hostname = JdbcUrl.getHostName(connectString);
      int port = JdbcUrl.getPort(connectString);

      if (null == databaseName) {
        throw new ImportException("Could not determine database name");
      }

      LOG.info("Performing import of table " + tableName + " from database "
          + databaseName);
      args.add(PSQL_CMD); // requires that this is on the path.
      args.add("--tuples-only");
      args.add("--quiet");

      String username = options.getUsername();
      if (username != null) {
        args.add("--username");
        args.add(username);
        String password = options.getPassword();
        if (null != password) {
          passwordFilename = writePasswordFile(password);
          // Need to send PGPASSFILE environment variable specifying
          // location of our postgres file.
          envp.add("PGPASSFILE=" + passwordFilename);
        }
      }

      args.add("--host");
      args.add(hostname);

      if (port != -1) {
        args.add("--port");
        args.add(Integer.toString(port));
      }

      if (null != databaseName && databaseName.length() > 0) {
        args.add(databaseName);
      }

      // The COPY command is in a script file.
      args.add("-f");
      args.add(commandFilename);

      // begin the import in an external process.
      LOG.debug("Starting psql with arguments:");
      for (String arg : args) {
        LOG.debug("  " + arg);
      }

      // This writer will be closed by AsyncSink.
      SplittableBufferedWriter w = DirectImportUtils.createHdfsSink(
          options.getConf(), options, context);

      // Actually start the psql dump.
      p = Runtime.getRuntime().exec(args.toArray(new String[0]),
          envp.toArray(new String[0]));

      // read from the stdout pipe into the HDFS writer.
      InputStream is = p.getInputStream();
      sink = new PostgresqlAsyncSink(w, options, counters);

      LOG.debug("Starting stream sink");
      counters.startClock();
      sink.processStream(is);
      errSink = new LoggingAsyncSink(LOG);
      errSink.processStream(p.getErrorStream());
    } finally {
      // block until the process is done.
      LOG.debug("Waiting for process completion");
      int result = 0;
      if (null != p) {
        while (true) {
          try {
            result = p.waitFor();
          } catch (InterruptedException ie) {
            // interrupted; loop around.
            continue;
          }

          break;
        }
      }

      // Remove any password file we wrote
      if (null != passwordFilename) {
        if (!new File(passwordFilename).delete()) {
          LOG.error("Could not remove postgresql password file "
              + passwordFilename);
          LOG.error("You should remove this file to protect your credentials.");
        }
      }

      if (null != commandFilename) {
        // We wrote the COPY comand to a tmpfile. Remove it.
        if (!new File(commandFilename).delete()) {
          LOG.info("Could not remove temp file: " + commandFilename);
        }
      }

      // block until the stream sink is done too.
      int streamResult = 0;
      if (null != sink) {
        while (true) {
          try {
            streamResult = sink.join();
          } catch (InterruptedException ie) {
            // interrupted; loop around.
            continue;
          }

          break;
        }
      }

      // Attempt to block for stderr stream sink; errors are advisory.
      if (null != errSink) {
        try {
          if (0 != errSink.join()) {
            LOG.info("Encountered exception reading stderr stream");
          }
        } catch (InterruptedException ie) {
          LOG.info("Thread interrupted waiting for stderr to complete: "
              + ie.toString());
        }
      }

      LOG.info("Transfer loop complete.");

      if (0 != result) {
        throw new IOException("psql terminated with status "
            + Integer.toString(result));
      }

      if (0 != streamResult) {
        throw new IOException("Encountered exception in stream sink");
      }

      counters.stopClock();
      LOG.info("Transferred " + counters.toString());
    }
  }

  /**
   * @return false always; this manager answers that staging for export is
   *     not supported.
   */
  @Override
  public boolean supportsStagingForExport() {
    return false;
  }
  // CHECKSTYLE:ON
}
TOP

Related Classes of org.apache.sqoop.manager.DirectPostgresqlManager$PostgresqlAsyncSink

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code is the property of its respective owners. Java is a trademark of Sun Microsystems, Inc. and is owned by ORACLE Inc. Contact coftware#gmail.com.