
Source Code of

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */


import java.lang.ref.WeakReference;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.WeakHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

import javax.jcr.RepositoryException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A caching data store that consists of {@link LocalCache} and {@link Backend}.
 * {@link Backend} is the single source of truth. All methods first try to fetch
 * information from {@link LocalCache}. If a record is not available in
 * {@link LocalCache}, then it is fetched from {@link Backend} and saved to
 * {@link LocalCache} for further access. This class is designed to work without
 * {@link LocalCache}, in which case all information is fetched from {@link Backend}.
 * To disable {@link LocalCache} set {@link #setCacheSize(long)} to 0.
 * <p>
 * Configuration:
 * <pre>
 * &lt;DataStore class="">
 *     &lt;param name="{@link #setPath(String) path}" value="/data/datastore"/>
 *     &lt;param name="{@link #setConfig(String) config}" value="${rep.home}/"/>
 *     &lt;param name="{@link #setCacheSize(long) cacheSize}" value="68719476736"/>
 *     &lt;param name="{@link #setSecret(String) secret}" value="123456"/>
 *     &lt;param name="{@link #setCachePurgeTrigFactor(double) cachePurgeTrigFactor}" value="0.95d"/>
 *     &lt;param name="{@link #setCachePurgeResizeFactor(double) cachePurgeResizeFactor}" value="0.85d"/>
 *     &lt;param name="{@link #setMinRecordLength(int) minRecordLength}" value="1024"/>
 *     &lt;param name="{@link #setContinueOnAsyncUploadFailure(boolean) continueOnAsyncUploadFailure}" value="false"/>
 *     &lt;param name="{@link #setConcurrentUploadsThreads(int) concurrentUploadsThreads}" value="10"/>
 *     &lt;param name="{@link #setAsyncUploadLimit(int) asyncUploadLimit}" value="100"/>
 * &lt;/DataStore>
 * </pre>
 */
public abstract class CachingDataStore extends AbstractDataStore implements
        MultiDataStoreAware, AsyncUploadCallback {

    /** Logger instance. */
    private static final Logger LOG = LoggerFactory.getLogger(CachingDataStore.class);

    /** The digest algorithm used to uniquely identify records. */
    private static final String DIGEST = "SHA-1";

    private static final String DS_STORE = ".DS_Store";

    /**
     * Name of the directory used for temporary files. Must be at least 3
     * characters.
     */
    private static final String TMP = "tmp";

    /**
     * All data identifiers that are currently in use are in this set until they
     * are garbage collected.
     */
    protected Map<DataIdentifier, WeakReference<DataIdentifier>> inUse = Collections.synchronizedMap(new WeakHashMap<DataIdentifier, WeakReference<DataIdentifier>>());

    protected Backend backend;

    /** The minimum size of an object that should be stored in this data store. */
    private int minRecordLength = 16 * 1024;

    private String path;

    private File directory;

    private File tmpDir;

    private String secret;

    /** The optional backend configuration. */
    private String config;

    /**
     * The minimum modified date. If a file is accessed (read or write) with a
     * modified date older than this value, the modified date is updated to the
     * current time.
     */
    private long minModifiedDate;

    /**
     * Cache purge trigger factor. The cache enters auto-purge mode when its
     * current size is greater than cachePurgeTrigFactor * cacheSize.
     */
    private double cachePurgeTrigFactor = 0.95d;

    /**
     * Cache resize factor. After auto-purge mode, the cache's current size
     * would be just greater than cachePurgeResizeFactor * cacheSize.
     */
    private double cachePurgeResizeFactor = 0.85d;

    /** The number of bytes in the cache. The default value is 64 GB. */
    private long cacheSize = 64L * 1024 * 1024 * 1024;

    /** The local file system cache. */
    private LocalCache cache;

    /** Cache holding pending uploads. */
    private AsyncUploadCache asyncWriteCache;

    /**
     * Creates the {@link Backend} this data store delegates to. Called from
     * {@link #init(String)}, which then initializes the returned backend with
     * this store, the cache path and the configured
     * {@link #setConfig(String) config}.
     *
     * @return the backend to use
     */
    protected abstract Backend createBackend();

    /**
     * Returns the name of the marker file that {@link #init(String)} looks for
     * in the repository home directory. When {@code null} is returned,
     * {@link #init(String)} skips the marker file handling.
     *
     * @return the marker file name, or {@code null}
     */
    protected abstract String getMarkerFile();

    /**
     * In {@link #init(String)}, all incomplete asynchronous uploads recorded in
     * {@link AsyncUploadCache} are resumed and uploaded concurrently in multiple
     * threads. A {@link RepositoryException} is thrown if a file is not found in
     * the local cache for such an asynchronous upload. As far as the code is
     * concerned, that is only possible when somebody has removed files from the
     * local cache manually. If there is an exception and the user wants to
     * proceed despite the inconsistencies, set the parameter
     * continueOnAsyncUploadFailure to true in repository.xml. This ignores the
     * {@link RepositoryException}, logs all missing files and proceeds after
     * resetting {@link AsyncUploadCache}.
     */
    private boolean continueOnAsyncUploadFailure;

    /**
     * The {@link #init(String)} method checks for {@link #getMarkerFile()} and,
     * if it doesn't exist, migrates all files from the filesystem to
     * {@link Backend}. This parameter governs the number of threads which will
     * upload files concurrently to {@link Backend}.
     */
    private int concurrentUploadsThreads = 10;

    /**
     * This parameter limits the number of asynchronous upload slots to
     * {@link Backend}. Once this limit is reached, further uploads to
     * {@link Backend} are synchronous until one of the asynchronous uploads
     * completes and makes an asynchronous upload slot available. To disable
     * asynchronous uploads, set the {@link #asyncUploadLimit} parameter to 0 in
     * repository.xml. By default it is 100.
     */
    private int asyncUploadLimit = 100;

    /**
     * Initializes the data store. If the path is not set, &lt;repository
     * home&gt;/repository/datastore is used. This directory is automatically
     * created if it does not yet exist. During first initialization, it uploads
     * all files from the local datastore to the backend, and the local datastore
     * acts as a local cache.
     */
    public void init(String homeDir) throws RepositoryException {
        // Sets up directories, the pending-upload cache, the backend and
        // finally the local cache; any failure is rethrown wrapped in a
        // RepositoryException.
        try {
            // Resolve the cache and temporary directories: both live under
            // homeDir when no path is configured, otherwise under that path.
            if (path == null) {
                path = homeDir + "/repository/datastore";
                tmpDir = new File(homeDir, "/repository/s3tmp");
            } else {
                // cache is moved from 'path' to 'path'/repository/datastore
                tmpDir = new File(path, "/repository/s3tmp");
                path = path + "/repository/datastore";
  "path=[" + path + ",] tmpPath= [" + tmpDir.getPath() + "]");
            directory = new File(path);
            // mkdirs returns false when the directory already existed.
            if (!mkdirs(tmpDir)) {
      "tmp = " + tmpDir.getPath() + " cleaned");

            // Pending asynchronous uploads are tracked in AsyncUploadCache.
            asyncWriteCache = new AsyncUploadCache();
            asyncWriteCache.init(homeDir, path, asyncUploadLimit);

            backend = createBackend();
            backend.init(this, path, config);
            String markerFileName = getMarkerFile();
            if (markerFileName != null) {
                // create marker file in homeDir to avoid deletion in cache
                // cleanup.
                File markerFile = new File(homeDir, markerFileName);
                if (!markerFile.exists()) {
          "load files from local cache");
                    try {
                    } catch (IOException e) {
                        throw new DataStoreException(
                            "Could not create marker file "
                                + markerFile.getAbsolutePath(), e);
                } else {
          "marker file = " + markerFile.getAbsolutePath()
                        + " exists");
            // upload any leftover async uploads to backend during last shutdown
            Set<String> fileList = asyncWriteCache.getAll();
            if (fileList != null && !fileList.isEmpty()) {
                // NOTE(review): no visible statement adds to errorFiles, so the
                // error checks further down can never fire as written -
                // confirm whether a statement is missing in the loop below.
                List<String> errorFiles = new ArrayList<String>();
      "Uploading [" + fileList + "]  and size ["
                    + fileList.size() + "] from AsyncUploadCache.");
                long totalSize = 0;
                List<File> files = new ArrayList<File>(fileList.size());
                for (String fileName : fileList) {
                    File f = new File(path, fileName);
                    if (!f.exists()) {
                        LOG.error("Cannot upload pending file ["
                            + f.getAbsolutePath() + "]. File doesn't exist.");
                    } else {
                        totalSize += f.length();
                        // NOTE(review): existence and length are checked on the
                        // file resolved against 'path', but the file queued for
                        // upload is resolved against 'homeDir' - looks
                        // inconsistent; confirm which base directory is meant.
                        files.add(new File(homeDir, fileName));
                new FilesUploader(files, totalSize, concurrentUploadsThreads,
                if (!continueOnAsyncUploadFailure && errorFiles.size() > 0) {
                    LOG.error("Pending uploads of files [" + errorFiles
                        + "] failed. Files do not exist in Local cache.");
                    LOG.error("To continue set [continueOnAsyncUploadFailure] to true in Datastore configuration in repository.xml."
                        + " There would be inconsistent data in repository due the missing files. ");
                    throw new RepositoryException(
                        "Cannot upload async uploads from local cache. Files not found.");
                } else {
                    if (errorFiles.size() > 0) {
                        LOG.error("Pending uploads of files ["
                            + errorFiles
                            + "] failed. Files do not exist"
                            + " in Local cache. Continuing as [continueOnAsyncUploadFailure] is set to true.");
          "Reseting AsyncWrite Cache list.");
            // The local cache is created last and shares the pending-upload cache.
            cache = new LocalCache(path, tmpDir.getAbsolutePath(), cacheSize,
                cachePurgeTrigFactor, cachePurgeResizeFactor, asyncWriteCache);
        } catch (Exception e) {
            throw new RepositoryException(e);

     * Creates a new data record in {@link Backend}. The stream is first
     * consumed and the contents are saved in a temporary file and the SHA-1
     * message digest of the stream is calculated. If a record with the same
     * SHA-1 digest (and length) is found then it is returned. Otherwise new
     * record is created in {@link Backend} and the temporary file is moved in
     * place to {@link LocalCache}.
     * @param input
     *            binary stream
     * @return {@link CachingDataRecord}
     * @throws DataStoreException
     *             if the record could not be created.
    public DataRecord addRecord(InputStream input) throws DataStoreException {
        File temporary = null;
        long startTime = System.currentTimeMillis();
        long length = 0;
        try {
            temporary = newTemporaryFile();
            DataIdentifier tempId = new DataIdentifier(temporary.getName());
            // Copy the stream to the temporary file and calculate the
            // stream length and the message digest of the stream
            MessageDigest digest = MessageDigest.getInstance(DIGEST);
            OutputStream output = new DigestOutputStream(new FileOutputStream(
                temporary), digest);
            try {
                length = IOUtils.copyLarge(input, output);
            } finally {
            long currTime = System.currentTimeMillis();
            DataIdentifier identifier = new DataIdentifier(
            if (LOG.isDebugEnabled()) {
                LOG.debug("getting SHA1 hash  [" + identifier + "] length ["
                    + length + "],   in [" + (currTime - startTime) + "] ms.");
            String fileName = getFileName(identifier);
            AsyncUploadCacheResult result = null;
            synchronized (this) {
                // check if async upload is already in progress
                if (!asyncWriteCache.hasEntry(fileName, true)) {
                    result =, temporary, true);
            if (LOG.isDebugEnabled()) {
                LOG.debug("storing  [" + identifier + "] in localCache took ["
                    + (System.currentTimeMillis() - currTime) + "] ms.");
            if (result != null) {
                if (result.canAsyncUpload()) {
                    backend.writeAsync(identifier, result.getFile(), this);
                } else {
                    backend.write(identifier, result.getFile());
            // this will also make sure that
            // tempId is not garbage collected until here
            if (LOG.isDebugEnabled()) {
                LOG.debug("write [" + identifier + "] length [" + length
                    + "],   in [" + (System.currentTimeMillis() - startTime)
                    + "] ms.");
            return new CachingDataRecord(this, identifier);
        } catch (NoSuchAlgorithmException e) {
            throw new DataStoreException(DIGEST + " not available", e);
        } catch (IOException e) {
            throw new DataStoreException("Could not add record", e);
        } finally {
            if (temporary != null) {
                // try to delete - but it's not a big deal if we can't

    public DataRecord getRecord(DataIdentifier identifier)
            throws DataStoreException {
        String fileName = getFileName(identifier);
        boolean touch = minModifiedDate > 0 ? true : false;
        synchronized (this) {
            try {
                if (asyncWriteCache.hasEntry(fileName, touch)) {
                    return new CachingDataRecord(this, identifier);
                } else if (cache.getFileIfStored(fileName) != null) {
                    if (touch) {
                        backend.exists(identifier, touch);
                    return new CachingDataRecord(this, identifier);
                } else if (backend.exists(identifier, touch)) {
                    return new CachingDataRecord(this, identifier);

            } catch (IOException ioe) {
                throw new DataStoreException("error in getting record ["
                    + identifier + "]", ioe);
        throw new DataStoreException("Record not found: " + identifier);

    /**
     * Get a data record for the given identifier, or null if the data record
     * doesn't exist in {@link Backend}.
     *
     * @param identifier
     *            identifier of record.
     * @return the {@link CachingDataRecord} or null.
     */
    public DataRecord getRecordIfStored(DataIdentifier identifier)
            throws DataStoreException {
        String fileName = getFileName(identifier);
        boolean touch = minModifiedDate > 0 ? true : false;
        synchronized (this) {
            try {
                if (asyncWriteCache.hasEntry(fileName, touch)
                    || backend.exists(identifier, touch)) {
                    return new CachingDataRecord(this, identifier);
            } catch (IOException ioe) {
                throw new DataStoreException(ioe);
        return null;

    public void updateModifiedDateOnAccess(long before) {"minModifiedDate set to: " + before);
        minModifiedDate = before;

     * Retrieves all identifiers from {@link Backend}.
    public Iterator<DataIdentifier> getAllIdentifiers()
            throws DataStoreException {
        Set<DataIdentifier> ids = new HashSet<DataIdentifier>();
        for (String fileName : asyncWriteCache.getAll()) {
        Iterator<DataIdentifier> itr = backend.getAllIdentifiers();
        while (itr.hasNext()) {
        return ids.iterator();

     * This method deletes record from {@link Backend} and then from
     * {@link LocalCache}
    public void deleteRecord(DataIdentifier identifier)
            throws DataStoreException {
        String fileName = getFileName(identifier);
        synchronized (this) {
            try {
                // order is important here
            } catch (IOException ioe) {
                throw new DataStoreException(ioe);

    public synchronized int deleteAllOlderThan(long min)
            throws DataStoreException {
        Set<DataIdentifier> diSet = backend.deleteAllOlderThan(min);
        // remove entries from local cache
        for (DataIdentifier identifier : diSet) {
        try {
            for (String fileName : asyncWriteCache.deleteOlderThan(min)) {
        } catch (IOException e) {
            throw new DataStoreException(e);
        }"deleteAllOlderThan  exit. Deleted [" + diSet
            + "] records. Number of records deleted [" + diSet.size() + "]");
        return diSet.size();

     * Get stream of record from {@link LocalCache}. If record is not available
     * in {@link LocalCache}, this method fetches record from {@link Backend}
     * and stores it to {@link LocalCache}. Stream is then returned from cached
     * record.
    InputStream getStream(DataIdentifier identifier) throws DataStoreException {
        InputStream in = null;
        try {
            String fileName = getFileName(identifier);
            InputStream cached = cache.getIfStored(fileName);
            if (cached != null) {
                return cached;
            in =;
            return, in);
        } catch (IOException e) {
            throw new DataStoreException("IO Exception: " + identifier, e);
        } finally {

     * Return lastModified of record from {@link Backend} assuming
     * {@link Backend} as a single source of truth.
    public long getLastModified(DataIdentifier identifier)
            throws DataStoreException {
        if (LOG.isDebugEnabled()) {
            LOG.debug("accessed lastModified of identifier:" + identifier);
        String fileName = getFileName(identifier);
        long lastModified = asyncWriteCache.getLastModified(fileName);
        if (lastModified != 0) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("getlastModified of identifier [" + identifier
                    + "] from AsyncUploadCache = " + lastModified);
            return lastModified;

        } else {
            return backend.getLastModified(identifier);

     * Return the length of record from {@link LocalCache} if available,
     * otherwise retrieve it from {@link Backend}.
    public long getLength(DataIdentifier identifier) throws DataStoreException {
        String fileName = getFileName(identifier);
        Long length = cache.getFileLength(fileName);
        if (length != null) {
            return length.longValue();
        } else {
            InputStream in = null;
            InputStream cachedStream = null;
            try {
                in =;
                cachedStream =, in);
            } catch (IOException e) {
                throw new DataStoreException("IO Exception: " + identifier, e);
            } finally {
            length = cache.getFileLength(fileName);
            if (length != null) {
                return length.longValue();
        return backend.getLength(identifier);

    protected byte[] getOrCreateReferenceKey() throws DataStoreException {
        try {
            return secret.getBytes("UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new DataStoreException(e);

    public Set<String> getPendingUploads() {
        return asyncWriteCache.getAll();

    public void call(DataIdentifier identifier, File file,
            AsyncUploadCallback.RESULT resultCode) {
        String fileName = getFileName(identifier);
        if (AsyncUploadCallback.RESULT.SUCCESS.equals(resultCode)) {
            try {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Upload completed. [" + identifier + "].");
                AsyncUploadCacheResult result = asyncWriteCache.remove(fileName);
                if (result.doRequiresDelete()) {
                    // added record already marked for delete
            } catch (IOException ie) {
                LOG.warn("Cannot remove pending file upload. Dataidentifer [ "
                    + identifier + "], file [" + file.getAbsolutePath() + "]",
            } catch (DataStoreException dse) {
                LOG.warn("Cannot remove pending file upload. Dataidentifer [ "
                    + identifier + "], file [" + file.getAbsolutePath() + "]",
        } else if (AsyncUploadCallback.RESULT.FAILED.equals(resultCode)) {
            LOG.error("Async Upload failed. Dataidentifer [ " + identifier
                + "], file [" + file.getAbsolutePath() + "]");
        } else if (AsyncUploadCallback.RESULT.ABORTED.equals(resultCode)) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Async Upload Aborted. Dataidentifer [ " + identifier
                    + "], file [" + file.getAbsolutePath() + "]");
            try {
      "Async Upload Aborted. Dataidentifer [ " + identifier
                    + "], file [" + file.getAbsolutePath() + "] removed.");
            } catch (IOException ie) {
                LOG.warn("Cannot remove pending file upload. Dataidentifer [ "
                    + identifier + "], file [" + file.getAbsolutePath() + "]",

     * Returns a unique temporary file to be used for creating a new data
     * record.
    private File newTemporaryFile() throws IOException {
        return File.createTempFile(TMP, null, tmpDir);

     * Load files from {@link LocalCache} to {@link Backend}.
    private void uploadFilesFromCache() throws RepositoryException {
        ArrayList<File> files = new ArrayList<File>();
        listRecursive(files, directory);
        long totalSize = 0;
        for (File f : files) {
            totalSize += f.length();
        if (concurrentUploadsThreads > 1) {
            new FilesUploader(files, totalSize, concurrentUploadsThreads, false).upload();
        } else {
            uploadFilesInSingleThread(files, totalSize);

    private void uploadFilesInSingleThread(List<File> files, long totalSize)
            throws RepositoryException {
        long startTime = System.currentTimeMillis();"Upload:  {" + files.size() + "} files in single thread.");
        long currentCount = 0;
        long currentSize = 0;
        long time = System.currentTimeMillis();
        for (File f : files) {
            long now = System.currentTimeMillis();
            if (now > time + 5000) {
      "Uploaded:  {" + currentCount + "}/{" + files.size()
                    + "} files, {" + currentSize + "}/{" + totalSize
                    + "} size data");
                time = now;
            String name = f.getName();
            if (LOG.isDebugEnabled()) {
                LOG.debug("upload file = " + name);
            if (!name.startsWith(TMP) && !name.endsWith(DS_STORE)
                && f.length() > 0) {
                uploadFileToBackEnd(f, false);
            currentSize += f.length();
        long endTime = System.currentTimeMillis();"Uploaded:  {" + currentCount + "}/{" + files.size()
            + "} files, {" + currentSize + "}/{" + totalSize
            + "} size data, time taken {" + ((endTime - startTime) / 1000)
            + "} sec");

     * Traverse recursively and populate list with files.
    private static void listRecursive(List<File> list, File file) {
        File[] files = file.listFiles();
        if (files != null) {
            for (File f : files) {
                if (f.isDirectory()) {
                    listRecursive(list, f);
                } else {

     * Upload file from {@link LocalCache} to {@link Backend}.
     * @param f
     *            file to uploaded.
     * @throws DataStoreException
    private void uploadFileToBackEnd(File f, boolean updateAsyncUploadCache)
            throws DataStoreException {
        try {
            DataIdentifier identifier = new DataIdentifier(f.getName());
            backend.write(identifier, f);
            if (updateAsyncUploadCache) {
                String fileName = getFileName(identifier);
            if (LOG.isDebugEnabled()) {
                LOG.debug(f.getName() + "uploaded.");
        } catch (IOException ioe) {
            throw new DataStoreException(ioe);

     * Derive file name from identifier.
    private static String getFileName(DataIdentifier identifier) {
        String name = identifier.toString();
        return getFileName(name);

    private static String getFileName(String name) {
        return name.substring(0, 2) + "/" + name.substring(2, 4) + "/"
            + name.substring(4, 6) + "/" + name;

    private static DataIdentifier getIdentifier(String fileName) {
        return new DataIdentifier(
            fileName.substring(fileName.lastIndexOf("/") + 1));

    private void usesIdentifier(DataIdentifier identifier) {
        inUse.put(identifier, new WeakReference<DataIdentifier>(identifier));

    private static boolean mkdirs(File dir) throws IOException {
        if (dir.exists()) {
            if (dir.isFile()) {
                throw new IOException("Can not create a directory "
                    + "because a file exists with the same name: "
                    + dir.getAbsolutePath());
            return false;
        boolean created = dir.mkdirs();
        if (!created) {
            throw new IOException("Could not create directory: "
                + dir.getAbsolutePath());
        return created;

    public void clearInUse() {

    public boolean isInUse(DataIdentifier identifier) {
        return inUse.containsKey(identifier);

    /**
     * Closes this data store.
     * <p>
     * NOTE(review): no statements are visible in this method, so nothing is
     * released here as written. Confirm whether the local cache, the backend
     * and the pending-upload cache are expected to be closed by this method.
     *
     * @throws DataStoreException
     *             declared by the signature; nothing visible here throws it
     */
    public void close() throws DataStoreException {

     * Setter for configuration based secret
     * @param secret
     *            the secret used to sign reference binaries
    public void setSecret(String secret) {
        this.secret = secret;

     * Set the minimum object length.
     * @param minRecordLength
     *            the length
    public void setMinRecordLength(int minRecordLength) {
        this.minRecordLength = minRecordLength;

     * Return mininum object length.
    public int getMinRecordLength() {
        return minRecordLength;

     * Return path of configuration properties.
     * @return path of configuration properties.
    public String getConfig() {
        return config;

     * Set the configuration properties path.
     * @param config
     *            path of configuration properties.
    public void setConfig(String config) {
        this.config = config;

     * @return size of {@link LocalCache}.
    public long getCacheSize() {
        return cacheSize;

     * Set size of {@link LocalCache}.
     * @param cacheSize
     *            size of {@link LocalCache}.
    public void setCacheSize(long cacheSize) {
        this.cacheSize = cacheSize;

     * @return path of {@link LocalCache}.
    public String getPath() {
        return path;

     * Set path of {@link LocalCache}.
     * @param path
     *            of {@link LocalCache}.
    public void setPath(String path) {
        this.path = path;

     * @return Purge trigger factor of {@link LocalCache}.
    public double getCachePurgeTrigFactor() {
        return cachePurgeTrigFactor;

     * Set purge trigger factor of {@link LocalCache}.
     * @param cachePurgeTrigFactor
     *            purge trigger factor.
    public void setCachePurgeTrigFactor(double cachePurgeTrigFactor) {
        this.cachePurgeTrigFactor = cachePurgeTrigFactor;

     * @return Purge resize factor of {@link LocalCache}.
    public double getCachePurgeResizeFactor() {
        return cachePurgeResizeFactor;

     * Set purge resize factor of {@link LocalCache}.
     * @param cachePurgeResizeFactor
     *            purge resize factor.
    public void setCachePurgeResizeFactor(double cachePurgeResizeFactor) {
        this.cachePurgeResizeFactor = cachePurgeResizeFactor;

    public int getConcurrentUploadsThreads() {
        return concurrentUploadsThreads;

    public void setConcurrentUploadsThreads(int concurrentUploadsThreads) {
        this.concurrentUploadsThreads = concurrentUploadsThreads;

    public int getAsyncUploadLimit() {
        return asyncUploadLimit;

    public void setAsyncUploadLimit(int asyncUploadLimit) {
        this.asyncUploadLimit = asyncUploadLimit;

    public boolean isContinueOnAsyncUploadFailure() {
        return continueOnAsyncUploadFailure;

    public void setContinueOnAsyncUploadFailure(
            boolean continueOnAsyncUploadFailure) {
        this.continueOnAsyncUploadFailure = continueOnAsyncUploadFailure;

    public Backend getBackend() {
        return backend;

     * This class initiates files upload in multiple threads to backend.
    private class FilesUploader {
        final List<File> files;

        final long totalSize;

        volatile AtomicInteger currentCount = new AtomicInteger();

        volatile AtomicLong currentSize = new AtomicLong();

        volatile AtomicBoolean exceptionRaised = new AtomicBoolean();

        DataStoreException exception;

        final int threads;

        final boolean updateAsyncCache;

        FilesUploader(List<File> files, long totalSize, int threads,
                boolean updateAsyncCache) {
            this.files = files;
            this.threads = threads;
            this.totalSize = totalSize;
            this.updateAsyncCache = updateAsyncCache;

        void addCurrentCount(int delta) {

        void addCurrentSize(long delta) {

        synchronized void setException(DataStoreException exception) {
            this.exception = exception;

        boolean isExceptionRaised() {
            return exceptionRaised.get();

        void logProgress() {
  "Uploaded:  {" + currentCount.get() + "}/{" + files.size()
                + "} files, {" + currentSize.get() + "}/{" + totalSize
                + "} size data");

        void upload() throws DataStoreException {
            long startTime = System.currentTimeMillis();
  " Uploading " + files.size() + " using " + threads
                + " threads.");
            ExecutorService executor = Executors.newFixedThreadPool(threads,
                new NamedThreadFactory("backend-file-upload-worker"));
            int partitionSize = files.size() / (threads);
            int startIndex = 0;
            int endIndex = partitionSize;
            for (int i = 1; i <= threads; i++) {
                List<File> partitionFileList = Collections.unmodifiableList(files.subList(
                    startIndex, endIndex));
                FileUploaderThread fut = new FileUploaderThread(
                    partitionFileList, startIndex, endIndex, this,

                startIndex = endIndex;
                if (i == (threads - 1)) {
                    endIndex = files.size();
                } else {
                    endIndex = startIndex + partitionSize;
            // This will make the executor accept no new threads
            // and finish all existing threads in the queue

            try {
                // Wait until all threads are finish
                while (!isExceptionRaised()
                    && !executor.awaitTermination(15, TimeUnit.SECONDS)) {
            } catch (InterruptedException ie) {

            long endTime = System.currentTimeMillis();
  "Uploaded:  {" + currentCount.get() + "}/{" + files.size()
                + "} files, {" + currentSize.get() + "}/{" + totalSize
                + "} size data, time taken {" + ((endTime - startTime) / 1000)
                + "} sec");
            if (isExceptionRaised()) {
                executor.shutdownNow(); // Cancel currently executing tasks
                throw exception;


     * This class implements {@link Runnable} interface and uploads list of
     * files from startIndex to endIndex to {@link Backend}
    private class FileUploaderThread implements Runnable {
        final List<File> files;

        final FilesUploader filesUploader;

        final int startIndex;

        final int endIndex;

        final boolean updateAsyncCache;

        FileUploaderThread(List<File> files, int startIndex, int endIndex,
                FilesUploader controller, boolean updateAsyncCache) {
            this.files = files;
            this.filesUploader = controller;
            this.startIndex = startIndex;
            this.endIndex = endIndex;
            this.updateAsyncCache = updateAsyncCache;

        public void run() {
            long time = System.currentTimeMillis();
            if (LOG.isDebugEnabled()) {
                LOG.debug("Thread [ " + Thread.currentThread().getName()
                    + "]: Uploading files from startIndex[" + startIndex
                    + "] and endIndex [" + (endIndex - 1)
                    + "], both inclusive.");
            int uploadCount = 0;
            long uploadSize = 0;
            try {
                for (File f : files) {

                    if (filesUploader.isExceptionRaised()) {
                    String name = f.getName();
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("upload file = " + name);
                    if (!name.startsWith(TMP) && !name.endsWith(DS_STORE)
                        && f.length() > 0) {
                        uploadFileToBackEnd(f, updateAsyncCache);
                    uploadSize += f.length();
                    // update upload status at every 15 seconds.
                    long now = System.currentTimeMillis();
                    if (now > time + 15000) {
                        uploadCount = 0;
                        uploadSize = 0;
                        time = now;
                // update final state.
            } catch (DataStoreException e) {
                if (!filesUploader.isExceptionRaised()) {



Related Classes of

Copyright © 2018 All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact