Package com.backtype.hadoop.pail

Source Code of com.backtype.hadoop.pail.AbstractPail

package com.backtype.hadoop.pail;

import com.backtype.hadoop.formats.RecordInputStream;
import com.backtype.hadoop.formats.RecordOutputStream;
import com.backtype.support.Utils;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;


public abstract class AbstractPail {
    public static final String EXTENSION = ".pailfile";
    public static final String META_EXTENSION = ".metafile";
    public static final String META_TEMP_EXTENSION = ".metafiletmp";
    private static final String TEMP_EXTENSION = ".pailfiletmp";

    private class PailOutputStream implements RecordOutputStream {

        private Path tempFile;
        private Path finalFile;
        private RecordOutputStream delegate;

        public PailOutputStream(String userfilename, boolean overwrite) throws IOException {
            tempFile = new Path(_instance_root, userfilename + TEMP_EXTENSION);
            finalFile = new Path(_instance_root, userfilename + EXTENSION);
            if(finalFile.getName().equals(EXTENSION)) throw new IllegalArgumentException("Cannot create empty user file name");

            mkdirs(tempFile.getParent());
            if(exists(tempFile)) {
                delete(tempFile, false);
            }
            delegate = createOutputStream(tempFile);

            if(overwrite && exists(finalFile)) {
                delete(finalFile, false);
            }

            if(exists(finalFile)) {
                delegate.close();
                delete(tempFile, false);
                throw new IOException("File already exists " + finalFile.toString());
            }
        }

        public void writeRaw(byte[] record) throws IOException {
            writeRaw(record, 0, record.length);
        }

        public void close() throws IOException {
            delegate.close();
            if(!rename(tempFile, finalFile)) {
                throw new IOException("Unable to atomically create pailfile with rename " + tempFile.toString());
            }
        }

        public void writeRaw(byte[] record, int start, int length) throws IOException {
            delegate.writeRaw(record, start, length);
        }
    }

    private String _instance_root;

    public AbstractPail(String path) throws IOException {
        _instance_root = path;
    }

    public boolean exists(String userfilename) throws IOException {
        return exists(toStoredPath(userfilename));
    }

    public RecordOutputStream openWrite(String userfilename) throws IOException {
        return openWrite(userfilename, false);
    }

    public RecordOutputStream openWrite(String userfilename, boolean overwrite) throws IOException {
        return new PailOutputStream(userfilename, overwrite);
    }

    public RecordInputStream openRead(String userfilename) throws IOException {
        return createInputStream(toStoredPath(userfilename));
    }

    public void deleteMetadata(String metafilename) throws IOException {
        Path metaPath = toStoredMetadataPath(metafilename);
        delete(metaPath, false);
    }

    public void mkAttr(String attr) throws IOException {
        mkdirs(new Path(_instance_root + "/" + attr));
    }

    public void writeMetadata(String metafilename, String metadata) throws IOException {
        Path metaPath = toStoredMetadataPath(metafilename);
        Path metaTmpPath = toStoredMetadataTmpPath(metafilename);
        mkdirs(metaTmpPath.getParent());
        delete(metaPath, false);
        delete(metaTmpPath, false);
        RecordOutputStream os = createOutputStream(metaTmpPath);
        os.writeRaw(("M" + metadata).getBytes("UTF-8")); //ensure that it's not an empty record
        os.close();
        rename(metaTmpPath, metaPath);
    }

    public String getMetadata(String metafilename) throws IOException {
        Path metaPath = toStoredMetadataPath(metafilename);
        if(exists(metaPath)) {
            RecordInputStream is = createInputStream(metaPath);
            String metaStr = new String(is.readRawRecord(), "UTF-8");
            is.close();
            return metaStr.substring(1);
        } else {
            return null;
        }
    }

    protected abstract RecordInputStream createInputStream(Path path) throws IOException;
    protected abstract RecordOutputStream createOutputStream(Path path) throws IOException;
    protected abstract boolean delete(Path path, boolean recursive) throws IOException;
    protected abstract boolean exists(Path path) throws IOException;
    protected abstract boolean rename(Path source, Path dest) throws IOException;
    protected abstract boolean mkdirs(Path path) throws IOException;
    protected abstract FileStatus[] listStatus(Path path) throws IOException;

    public List<String> getUserFileNames() throws IOException {
        List<String> ret = new ArrayList<String>();
        getFilesHelper(new Path(_instance_root), "", EXTENSION, true, ret);
        return ret;
    }

    public Path toStoredPath(String userfilename) {
        return new Path(_instance_root, userfilename+EXTENSION);
    }

    public Path toStoredMetadataPath(String metadatafilename) {
        return new Path(_instance_root, metadatafilename+META_EXTENSION);
    }

    public Path toStoredMetadataTmpPath(String metadatafilename) {
        return new Path(_instance_root, metadatafilename+META_TEMP_EXTENSION);
    }


    public void delete(String userfilename) throws IOException {
        delete(toStoredPath(userfilename), false);
    }

    public List<Path> getStoredFiles() throws IOException {
        List<String> userfiles = getUserFileNames();
        List<Path> ret = new ArrayList<Path>();
        for(String u: userfiles) {
            ret.add(toStoredPath(u));
        }
        return ret;
    }


    public List<String> getMetadataFileNames() throws IOException {
        List<String> ret = new ArrayList<String>();
        getFilesHelper(new Path(_instance_root), "", META_EXTENSION, true, ret);
        return ret;
    }

    public List<Path> getStoredMetadataFiles() throws IOException {
        List<String> userfiles = getMetadataFileNames();
        List<Path> ret = new ArrayList<Path>();
        for(String u: userfiles) {
            ret.add(toStoredMetadataPath(u));
        }
        return ret;
    }

    public List<Path> getStoredFilesAndMetadata() throws IOException {
        List<String> relFiles = new ArrayList<String>();
        List<String> extensions = new ArrayList<String>();
        extensions.add(META_EXTENSION);
        extensions.add(EXTENSION);
        getFilesHelper(new Path(_instance_root), "", extensions, false, relFiles);
        List<Path> ret = new ArrayList<Path>();
        for(String rel: relFiles) {
            ret.add(new Path(_instance_root, rel));
        }
        return ret;
    }


    public List<Path> getStoredUnfinishedFiles() throws IOException {
        List<String> userfiles = new ArrayList<String>();
        getFilesHelper(new Path(_instance_root), "", TEMP_EXTENSION, true, userfiles);
        List<Path> ret = new ArrayList<Path>();
        for(String u: userfiles) {
            ret.add(new Path(_instance_root, u+TEMP_EXTENSION));
        }
        return ret;
    }

    protected List<String> readDir(String subdir, boolean dir) throws IOException {
        Path absDir;
        if(subdir.length()==0) {
            absDir = new Path(_instance_root);
        } else {
            absDir = new Path(_instance_root, subdir);
        }

        List<String> ret = new ArrayList<String>();
        FileStatus[] contents = listStatus(absDir);
        for(FileStatus fs: contents) {
            String name = fs.getPath().getName();
            if((fs.isDir() && dir || !fs.isDir() && !dir) && !name.contains("_")) {
                ret.add(name);
            }
        }
        return ret;
    }

    public List<String> getAttrsAtDir(String subdir) throws IOException {
        return readDir(subdir, true);
    }

    public List<String> getMetadataFileNames(String subdir) throws IOException {
        List<String> files = readDir(subdir, false);
        List<String> ret = new ArrayList<String>();
        for(String f: files) {
            if(f.endsWith(META_EXTENSION)) {
                ret.add(Utils.stripExtension(f, META_EXTENSION));
            }
        }
        return ret;
    }

    public String getInstanceRoot() {
        return _instance_root;
    }

    private String relify(String root, String name) {
        if(root.length()==0) return name;
        else return new Path(root, name).toString();
    }

    private void getFilesHelper(Path abs, String rel, String extension, boolean stripExtension, List<String> files) throws IOException {
        List<String> extensions = new ArrayList<String>();
        extensions.add(extension);
        getFilesHelper(abs, rel, extensions, stripExtension, files);
    }

    private void getFilesHelper(Path abs, String rel, List<String> extensions, boolean stripExtension, List<String> files) throws IOException {
        FileStatus[] contents = listStatus(abs);
        for(FileStatus stat: contents) {
            Path p = stat.getPath();
            if(stat.isDir()) {
                getFilesHelper(p, relify(rel, stat.getPath().getName()), extensions, stripExtension, files);
            } else {
                String filename = relify(rel, stat.getPath().getName());
                for(String extension: extensions) {
                    if(filename.endsWith(extension) && stat.getLen()>0) {
                        String toAdd;
                        if(stripExtension) {
                            toAdd = Utils.stripExtension(filename, extension);
                        } else {
                            toAdd = filename;
                        }
                        files.add(toAdd);
                        break;
                    }
                }
            }
        }
    }
}
TOP

Related Classes of com.backtype.hadoop.pail.AbstractPail

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.