Package com.dp.nebula.wormhole.plugins.reader.sftpreader

Source Code of com.dp.nebula.wormhole.plugins.reader.sftpreader.SftpDirSplitter

package com.dp.nebula.wormhole.plugins.reader.sftpreader;

import java.net.URI;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Vector;

import org.apache.commons.lang.StringUtils;
import org.apache.log4j.Logger;

import com.dp.nebula.wormhole.common.AbstractSplitter;
import com.dp.nebula.wormhole.common.DefaultParam;
import com.dp.nebula.wormhole.common.interfaces.IParam;
import com.dp.nebula.wormhole.plugins.common.PCInfo;
import com.jcraft.jsch.Channel;
import com.jcraft.jsch.ChannelSftp;
import com.jcraft.jsch.JSch;
import com.jcraft.jsch.Session;
import com.jcraft.jsch.SftpATTRS;
import com.jcraft.jsch.SftpException;
import com.jcraft.jsch.ChannelSftp.LsEntry;

public class SftpDirSplitter extends AbstractSplitter {
  private static final Logger LOGGER = Logger
      .getLogger(SftpDirSplitter.class);
  private static final int UNDEFINED_PORT = -1;

  private String dir = "";
  private URI uri = null;
  private String scheme = "";
  private String host = "";
  private int port = 58422;
  private String path = "";
  private String username = "";
  private String password = "";
  private List<IParam> paramsList = null;

  private JSch jsch = null;
  private Session session = null;
  private Channel channel = null;
  private ChannelSftp c = null;

  @Override
  public void init(IParam jobParams) {
    super.init(jobParams);
    jsch = new JSch();
  }

  @SuppressWarnings("unchecked")
  @Override
  public List<IParam> split() {
    paramsList = new ArrayList<IParam>();
    dir = param.getValue(ParamKey.dir, null);
    if (dir == null) {
      LOGGER.error("Can't find the param [" + ParamKey.dir
          + "] in sftp-spliter-param.");
      return paramsList;
    }

    String[] dirs = StringUtils.split(dir, ',');
    for (int i = 0; i < dirs.length; i++) {
      String oneDir = dirs[i];
     
      // if (oneDir.endsWith("*")) {
      // oneDir = oneDir.substring(0, oneDir.lastIndexOf('*'));
      // }

      if (oneDir.endsWith("/")) {
        oneDir = oneDir.substring(0, oneDir.lastIndexOf('/'));
      }

      uri = URI.create(oneDir);
      scheme = uri.getScheme();
      host = uri.getHost();
      port = uri.getPort();
      path = uri.getPath();
      username = uri.getUserInfo();
      password = param.getValue(ParamKey.password, this.password);

      if (!scheme.equalsIgnoreCase("sftp") || StringUtils.isBlank(host)
          || UNDEFINED_PORT == port || StringUtils.isBlank(path)
          || StringUtils.isBlank(username)
          || StringUtils.isBlank(password)) {
        throw new IllegalArgumentException(
            "paramkey dir is not set properly, the correct sftp path format like: "
                + "sftp://[<user>@]<host>[:<port>]/<path>/<file>");
      }

      PCInfo pi = new PCInfo();
      pi.setIp(host);
      pi.setPort(port);
      pi.setUser(username);
      pi.setPwd(password);
      pi.setPath(path);

      try {
        session = jsch.getSession(username, host, port);
        session.setUserInfo(pi);
        session.connect();

        channel = session.openChannel("sftp");
        channel.connect();
        c = (ChannelSftp) channel;

        if (c != null) {
          Boolean containsWildcardCharacter = false;
          Boolean isDirectory = false;
          try {
            SftpATTRS ss = c.lstat(path);
            isDirectory = ss.isDir();
          } catch (SftpException e) {
            containsWildcardCharacter = true;
          }
         
          Vector<LsEntry> files = (Vector<LsEntry>) c.ls(path);
          for (LsEntry lsEntry : files) {
            SftpATTRS sftpAttrs = lsEntry.getAttrs();
           
            if (sftpAttrs == null) {
              throw new IllegalArgumentException(
                  "paramkey dir not found on the remote server: "
                      + lsEntry.getFilename());
            } else if (sftpAttrs.isDir()) {
              LOGGER.info(lsEntry.getFilename()
                  + " is a directory, permission string is "
                  + sftpAttrs.getPermissionsString());
            } else {
              LOGGER.info(lsEntry.getFilename()
                  + " is a file, permission string is "
                  + sftpAttrs.getPermissionsString());
            }
           
            if (lsEntry.getFilename().startsWith(".")
                || lsEntry.getFilename()
                    .startsWith("_")
                || lsEntry.getFilename().endsWith(
                    ".index"))
              continue;

            if (lsEntry.getAttrs().isDir()) {
              continue;
            }

            IParam oParams = param.clone();
           
            String dir = oneDir;
            String absolutePath = null;
            if (containsWildcardCharacter){
              dir = oneDir.substring(0, oneDir.lastIndexOf('/'));
              absolutePath = dir + "/"
              + lsEntry.getFilename();
            }else {
              if (!isDirectory){
                absolutePath = dir;
              }else{
                absolutePath = dir + "/" + lsEntry.getFilename();
              }
            }
           
            LOGGER.info(ParamKey.dir + " split filename:"
                + absolutePath + "\tfile length:"
                + lsEntry.getAttrs().getSize());

            oParams.putValue(ParamKey.dir, absolutePath);
            paramsList.add(oParams);
           
          }
        }
        c.disconnect();
        session.disconnect();

      } catch (Exception e) {
        c.disconnect();
        session.disconnect();
        LOGGER.error("something wrong with jsch:" + e.getCause());
      }
    }

    LOGGER.info("the number of splitted files: " + paramsList.size());
    return paramsList;
  }

  public static void main(String[] args) {
    SftpDirSplitter s = new SftpDirSplitter();
    HashMap<String, String> paramMap = new HashMap<String, String>();
    paramMap.put(ParamKey.dir,
        "sftp://hadoop@192.168.7.80:58422/home/hadoop/wormhole-sftp-test/test*");
    paramMap.put(ParamKey.password, "hadoopdev");
    IParam params = new DefaultParam(paramMap);
    s.init(params);
    s.split();
  }
}
TOP

Related Classes of com.dp.nebula.wormhole.plugins.reader.sftpreader.SftpDirSplitter

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code is the property of its respective owners. Java is a trademark of Sun Microsystems, Inc. and owned by ORACLE Inc. Contact coftware#gmail.com.