Package org.apache.hadoop.hive.ql.exec

Source Code of org.apache.hadoop.hive.ql.exec.MoveTask

/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements.  See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership.  The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License.  You may obtain a copy of the License at
*
*     http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.hive.ql.exec;

import java.io.IOException;
import java.io.Serializable;
import java.security.AccessControlException;
import java.util.ArrayList;
import java.util.Arrays;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.metastore.MetaStoreUtils;
import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.loadFileDesc;
import org.apache.hadoop.hive.ql.plan.loadTableDesc;
import org.apache.hadoop.hive.ql.plan.moveWork;
import org.apache.hadoop.util.StringUtils;

/**
* MoveTask implementation
**/
public class MoveTask extends Task<moveWork> implements Serializable {

  private static final long serialVersionUID = 1L;

  public int execute() {

    try {
      // Do any hive related operations like moving tables and files
      // to appropriate locations
      loadFileDesc lfd = work.getLoadFileWork();
      if (lfd != null) {
        Path targetPath = new Path(lfd.getTargetDir());
        Path sourcePath = new Path(lfd.getSourceDir());
        FileSystem fs = sourcePath.getFileSystem(conf);
        if (lfd.getIsDfsDir()) {
          // Just do a rename on the URIs, they belong to the same FS
          String mesg = "Moving data to: " + lfd.getTargetDir();
          String mesg_detail = " from " +  lfd.getSourceDir();
          console.printInfo(mesg, mesg_detail);

          // delete the output directory if it already exists
          fs.delete(targetPath, true);
          // if source exists, rename. Otherwise, create a empty directory
          if (fs.exists(sourcePath)) {
            if (!fs.rename(sourcePath, targetPath))
              throw new HiveException ("Unable to rename: " + sourcePath + " to: "
                                       + targetPath);
          } else
            if (!fs.mkdirs(targetPath))
              throw new HiveException ("Unable to make directory: " + targetPath);
        } else {
          // This is a local file
          String mesg = "Copying data to local directory " + lfd.getTargetDir();
          String mesg_detail =  " from " + lfd.getSourceDir();
          console.printInfo(mesg, mesg_detail);

          // delete the existing dest directory
          LocalFileSystem dstFs = FileSystem.getLocal(conf);

          if(dstFs.delete(targetPath, true) || !dstFs.exists(targetPath)) {
            console.printInfo(mesg, mesg_detail);
            // if source exists, rename. Otherwise, create a empty directory
            if (fs.exists(sourcePath))
              fs.copyToLocalFile(sourcePath, targetPath);
            else {
              if (!dstFs.mkdirs(targetPath))
                throw new HiveException ("Unable to make local directory: " + targetPath);
            }
          } else {
            throw new AccessControlException("Unable to delete the existing destination directory: " + targetPath);
          }
        }
      }

      // Next we do this for tables and partitions
      loadTableDesc tbd = work.getLoadTableWork();
      if (tbd != null) {
        String mesg = "Loading data to table " + tbd.getTable().getTableName() +
        ((tbd.getPartitionSpec().size() > 0) ?
            " partition " + tbd.getPartitionSpec().toString() : "");
        String mesg_detail = " from " + tbd.getSourceDir();
        console.printInfo(mesg, mesg_detail);

        if (work.getCheckFileFormat()) {

          // Get all files from the src directory
          FileStatus [] dirs;
          ArrayList<FileStatus> files;
          FileSystem fs;
          try {
            fs = FileSystem.get
              (db.getTable(MetaStoreUtils.DEFAULT_DATABASE_NAME, tbd.getTable().getTableName()).getDataLocation(),conf);
            dirs = fs.globStatus(new Path(tbd.getSourceDir()));
            files = new ArrayList<FileStatus>();
            for (int i=0; (dirs != null && i<dirs.length); i++) {
              files.addAll(Arrays.asList(fs.listStatus(dirs[i].getPath())));
              // We only check one file, so exit the loop when we have at least one.
              if (files.size()>0) break;
            }
          } catch (IOException e) {
            throw new HiveException("addFiles: filesystem error in check phase", e);
          }

          // Check if the file format of the file matches that of the table.
          boolean flag = HiveFileFormatUtils.checkInputFormat(fs, conf, tbd.getTable().getInputFileFormatClass(), files);
          if(!flag)
            throw new HiveException("Wrong file format. Please check the file's format.");
        }

        if(tbd.getPartitionSpec().size() == 0) {
          db.loadTable(new Path(tbd.getSourceDir()), tbd.getTable().getTableName(), tbd.getReplace(), new Path(tbd.getTmpDir()));
        } else {
          LOG.info("Partition is: " + tbd.getPartitionSpec().toString());
          db.loadPartition(new Path(tbd.getSourceDir()), tbd.getTable().getTableName(),
              tbd.getPartitionSpec(), tbd.getReplace(), new Path(tbd.getTmpDir()));
        }
      }

      return 0;
    }
    catch (Exception e) {
      console.printError("Failed with exception " +   e.getMessage(), "\n" + StringUtils.stringifyException(e));
      return (1);
    }
  }
  /*
   * Does the move task involve moving to a local file system
   */
  public boolean isLocal() {
    loadTableDesc tbd = work.getLoadTableWork();
    if (tbd != null)
      return false;
   
    loadFileDesc lfd = work.getLoadFileWork();
    if (lfd != null) {
      if (lfd.getIsDfsDir()) {
        return false;
      }
      else
        return true;
    }
   
    return false;
  }
}
TOP

Related Classes of org.apache.hadoop.hive.ql.exec.MoveTask

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code is the property of its respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.