package com.lingbobu.flashdb.transfer.impl;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.SocketException;
import org.apache.commons.net.ftp.FTP;
import org.apache.commons.net.ftp.FTPClient;
import org.apache.commons.net.ftp.FTPFile;
import org.apache.commons.net.ftp.FTPFileFilter;
import org.apache.commons.net.ftp.FTPReply;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* FTP 访问接口封装
*/
public class FtpClientEx {
private static final Logger LOG = LoggerFactory.getLogger(FtpClientEx.class);
public FtpClientEx(String hostname, int port, String username, String password) {
FTPClient ftpClient = new FTPClient();
ftpClient.setControlEncoding("UTF-8");
try {
ftpClient.connect(hostname, port);
ftpClient.login(username, password);
int reply = ftpClient.getReplyCode();
if (! FTPReply.isPositiveCompletion(reply)) {
close();
throw new RuntimeException("FTP server refused connection.");
}
ftpClient.enterLocalPassiveMode(); //可解决部分FTP传输不稳定的问题
if (! ftpClient.setFileType(FTP.BINARY_FILE_TYPE)) {
close();
throw new RuntimeException("FTP server refused setFileType.");
}
} catch (SocketException e) {
closeQuiet(ftpClient);
throw new RuntimeException(e);
} catch (IOException e) {
closeQuiet(ftpClient);
throw new RuntimeException(e);
}
this.ftpClient = ftpClient;
}
private FTPClient ftpClient;
public void close() {
if (ftpClient != null) {
closeQuiet(ftpClient);
ftpClient = null;
}
}
private static void closeQuiet(FTPClient ftpClient) {
try {
ftpClient.disconnect();
} catch (IOException e) {
}
}
/**
* 判断FTP上的路径是目录还是文件。若文件/目录不存在则抛异常
* @param pathname
* @return
*/
public boolean isDirectory(String pathname) {
try {
FTPFile file = ftpClient.mlistFile(pathname);
if (file == null)
throw new RuntimeException("FTP pathname '"+ pathname +"' not exists.");
return file.isDirectory();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
/**
* 切换当前工作目录
* @param pathname
*/
public void changeWorkingDirectory(String pathname) {
try {
if (! ftpClient.changeWorkingDirectory(pathname))
throw new RuntimeException("Cann't ftpClient.changeWorkingDirectory('"+ pathname +"').");
} catch (IOException e) {
throw new RuntimeException(e);
}
}
/**
* 读取当前目录下的指定文件
* @param filename 文件名(不含目录路径)
* @return
*/
public InputStream openFile(String filename) {
try {
InputStream stream = ftpClient.retrieveFileStream(filename);
if (stream == null)
throw new RuntimeException("Cann't ftpClient.openFile('"+ filename +"') because "+ftpClient.getReplyCode()+" # "+ftpClient.getReplyString());
return stream;
} catch (IOException e) {
throw new RuntimeException(e);
}
}
/**
* 关闭已打开的文件流
* @param stream
*/
public void closeFileInputStream(InputStream stream, String filename) {
try {
stream.close();
if (! ftpClient.completePendingCommand())
throw new RuntimeException("Cann't completePendingCommand when closeFileInputStream(.., '"+filename+"')");
} catch (IOException e) {
throw new RuntimeException(e);
}
}
/**
* 列出当前目录下的所有文件(目录除外)
* @param fileExt
* @return
*/
public String[] listFiles(String fileExt) {
FTPFileFilter filter = null;
if (fileExt != null) {
final String suffix = "." + fileExt;
filter = new FTPFileFilter(){
@Override
public boolean accept(FTPFile file) {
if (! file.isFile()) return false;
return file.getName().endsWith(suffix);
}
};
}
FTPFile[] ftpFiles;
try {
ftpFiles = ftpClient.listFiles(null, filter);
} catch (IOException e) {
throw new RuntimeException(e);
}
if (ftpFiles == null || ftpFiles.length == 0)
return new String[0];
String[] result = new String[ftpFiles.length];
for (int i=0; i < ftpFiles.length; i++)
result[i] = ftpFiles[i].getName();
return result;
}
/**
* 列出当前目录下的所有子目录
* @param fileExt
* @return
*/
public String[] listDirectories() {
FTPFileFilter filter = new FTPFileFilter(){
@Override
public boolean accept(FTPFile file) {
return file.isDirectory();
}
};
FTPFile[] ftpFiles;
try {
ftpFiles = ftpClient.listFiles(null, filter);
} catch (IOException e) {
throw new RuntimeException(e);
}
if (ftpFiles == null || ftpFiles.length == 0)
return new String[0];
String[] result = new String[ftpFiles.length];
for (int i=0; i < ftpFiles.length; i++)
result[i] = ftpFiles[i].getName();
return result;
}
/**
* 创建目录. 若目录已存在则返回false
* @param pathname
*/
public boolean makeDirectory(String pathname) {
try {
return ftpClient.makeDirectory(pathname);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
/**
* 删除目录. 若目录不存在或非空则返回false
* @param pathname
*/
public boolean removeDirectory(String pathname) {
try {
return ftpClient.removeDirectory(pathname);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
/**
* 移动文件
* @param fromPathName
* @param toPathName
*/
public void moveFile(String fromPathName, String toPathName) {
try {
if (! ftpClient.rename(fromPathName, toPathName))
throw new RuntimeException("Cann't rename '"+ fromPathName +"' TO '"+ toPathName +"'");
} catch (IOException e) {
throw new RuntimeException(e);
}
}
/**
* 复制指定ftp目录下(直接所属)的所有文件到本地的知道目录
* @param ftpPath 远程FTP路径
* @param fileExt 文件扩展名
* @param localPath 本地路径
* @return
*/
public void copyDirectory(String ftpPath, String fileExt, String localPath) {
File localDir = new File(localPath);
if (! localDir.exists())
throw new RuntimeException("Local directory '"+ localPath +"' not exists.");
if (! localDir.isDirectory())
throw new RuntimeException("Local '"+ localPath +"' is not directory.");
try {
if (! ftpClient.changeWorkingDirectory(ftpPath))
throw new RuntimeException("Cann't ftpClient.changeWorkingDirectory('"+ ftpPath +"').");
FTPFileFilter filter = null;
if (fileExt != null) {
final String suffix = "." + fileExt;
filter = new FTPFileFilter(){
@Override
public boolean accept(FTPFile file) {
if (! file.isFile()) return false;
return file.getName().endsWith(suffix);
}
};
}
FTPFile[] ftpFiles = ftpClient.listFiles(null, filter);
if (ftpFiles == null || ftpFiles.length == 0)
return;
LOG.info("Starting copyDirectory. Total "+ ftpFiles.length +" files ...");
for (FTPFile ftpFile : ftpFiles) {
LOG.info("Copying file '"+ ftpFile.getName() +"' ...");
File outfile = new File(localDir, ftpFile.getName());
OutputStream oStream = new FileOutputStream(outfile);
try {
ftpClient.retrieveFile(ftpFile.getName(), oStream);
} finally {
oStream.close();
}
}
LOG.info("Completed copyDirectory.");
} catch (IOException e) {
throw new RuntimeException(e);
} finally {
this.close();
}
}
}