Package voldemort.store.readonly.swapper

Source Code of voldemort.store.readonly.swapper.HttpStoreSwapper

package voldemort.store.readonly.swapper;

import java.io.File;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

import org.apache.http.HttpResponse;
import org.apache.http.NameValuePair;
import org.apache.http.client.HttpClient;
import org.apache.http.client.entity.UrlEncodedFormEntity;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.message.BasicNameValuePair;
import org.apache.log4j.Logger;

import voldemort.VoldemortException;
import voldemort.cluster.Cluster;
import voldemort.cluster.Node;
import voldemort.store.readonly.ReadOnlyUtils;
import voldemort.utils.VoldemortIOUtils;

import com.google.common.base.Joiner;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;

public class HttpStoreSwapper extends StoreSwapper {

    private static final Logger logger = Logger.getLogger(HttpStoreSwapper.class);

    private final HttpClient httpClient;
    private final String readOnlyMgmtPath;
    private boolean deleteFailedFetch = false;
    private boolean rollbackFailedSwap = false;

    public HttpStoreSwapper(Cluster cluster,
                            ExecutorService executor,
                            HttpClient httpClient,
                            String readOnlyMgmtPath,
                            boolean deleteFailedFetch,
                            boolean rollbackFailedSwap) {
        super(cluster, executor);
        this.httpClient = httpClient;
        this.readOnlyMgmtPath = readOnlyMgmtPath;
        this.deleteFailedFetch = deleteFailedFetch;
        this.rollbackFailedSwap = rollbackFailedSwap;
    }

    public HttpStoreSwapper(Cluster cluster,
                            ExecutorService executor,
                            HttpClient httpClient,
                            String readOnlyMgmtPath) {
        super(cluster, executor);
        this.httpClient = httpClient;
        this.readOnlyMgmtPath = readOnlyMgmtPath;
    }

    @Override
    public List<String> invokeFetch(final String storeName,
                                    final String basePath,
                                    final long pushVersion) {
        // do fetch
        Map<Integer, Future<String>> fetchDirs = new HashMap<Integer, Future<String>>();
        for(final Node node: cluster.getNodes()) {
            fetchDirs.put(node.getId(), executor.submit(new Callable<String>() {

                public String call() throws Exception {
                    String url = node.getHttpUrl() + "/" + readOnlyMgmtPath;
                    HttpPost post = new HttpPost(url);

                    List<NameValuePair> params = new ArrayList<NameValuePair>();
                    params.add(new BasicNameValuePair("operation", "fetch"));
                    String storeDir = basePath + "/node-" + node.getId();
                    params.add(new BasicNameValuePair("dir", storeDir));
                    params.add(new BasicNameValuePair("store", storeName));
                    if(pushVersion > 0)
                        params.add(new BasicNameValuePair("pushVersion", Long.toString(pushVersion)));
                    post.setEntity(new UrlEncodedFormEntity(params));

                    logger.info("Invoking fetch for node " + node.getId() + " for " + storeDir);

                    HttpResponse httpResponse = null;
                    try {
                        httpResponse = httpClient.execute(post);
                        int responseCode = httpResponse.getStatusLine().getStatusCode();
                        InputStream is = httpResponse.getEntity().getContent();
                        String response = VoldemortIOUtils.toString(is, 30000);

                        if(responseCode != HttpURLConnection.HTTP_OK)
                            throw new VoldemortException("Fetch request on node "
                                                         + node.getId()
                                                         + " ("
                                                         + url
                                                         + ") failed: "
                                                         + httpResponse.getStatusLine()
                                                                       .getReasonPhrase());
                        logger.info("Fetch succeeded on node " + node.getId());
                        return response.trim();
                    } finally {
                        VoldemortIOUtils.closeQuietly(httpResponse);
                    }
                }
            }));
        }

        // wait for all operations to complete successfully
        TreeMap<Integer, String> results = Maps.newTreeMap();
        HashMap<Integer, Exception> exceptions = Maps.newHashMap();

        for(int nodeId = 0; nodeId < cluster.getNumberOfNodes(); nodeId++) {
            Future<String> val = fetchDirs.get(nodeId);
            try {
                results.put(nodeId, val.get());
            } catch(Exception e) {
                exceptions.put(nodeId, new VoldemortException(e));
            }
        }

        if(!exceptions.isEmpty()) {

            if(deleteFailedFetch) {
                // Delete data from successful nodes
                for(int successfulNodeId: results.keySet()) {
                    HttpResponse httpResponse = null;
                    try {
                        String url = cluster.getNodeById(successfulNodeId).getHttpUrl() + "/"
                                     + readOnlyMgmtPath;
                        HttpPost post = new HttpPost(url);

                        List<NameValuePair> params = new ArrayList<NameValuePair>();
                        params.add(new BasicNameValuePair("operation", "failed-fetch"));
                        params.add(new BasicNameValuePair("dir", results.get(successfulNodeId)));
                        params.add(new BasicNameValuePair("store", storeName));
                        post.setEntity(new UrlEncodedFormEntity(params));

                        logger.info("Deleting fetched data from node " + successfulNodeId);

                        httpResponse = httpClient.execute(post);
                        int responseCode = httpResponse.getStatusLine().getStatusCode();
                        String response = httpResponse.getStatusLine().getReasonPhrase();

                        if(responseCode == 200) {
                            logger.info("Deleted successfully on node " + successfulNodeId);
                        } else {
                            throw new VoldemortException(response);
                        }
                    } catch(Exception e) {
                        logger.error("Exception thrown during delete operation on node "
                                     + successfulNodeId + " : ", e);
                    } finally {
                        VoldemortIOUtils.closeQuietly(httpResponse);
                    }
                }
            }

            // Finally log the errors for the user
            for(int failedNodeId: exceptions.keySet()) {
                logger.error("Error on node " + failedNodeId + " during push : ",
                             exceptions.get(failedNodeId));
            }

            throw new VoldemortException("Exception during pushes to nodes "
                                         + Joiner.on(",").join(exceptions.keySet()) + " failed");
        }

        return Lists.newArrayList(results.values());
    }

    @Override
    public void invokeSwap(final String storeName, final List<String> fetchFiles) {
        // do swap in parallel
        Map<Integer, String> previousDirs = new HashMap<Integer, String>();
        HashMap<Integer, Exception> exceptions = Maps.newHashMap();

        for(final Node node: cluster.getNodes()) {
            HttpResponse httpResponse = null;
            try {
                String url = node.getHttpUrl() + "/" + readOnlyMgmtPath;
                HttpPost post = new HttpPost(url);

                List<NameValuePair> params = new ArrayList<NameValuePair>();
                params.add(new BasicNameValuePair("operation", "swap"));
                String dir = fetchFiles.get(node.getId());
                logger.info("Attempting swap for node " + node.getId() + " dir = " + dir);
                params.add(new BasicNameValuePair("dir", dir));
                params.add(new BasicNameValuePair("store", storeName));
                post.setEntity(new UrlEncodedFormEntity(params));

                httpResponse = httpClient.execute(post);
                int responseCode = httpResponse.getStatusLine().getStatusCode();
                String previousDir = VoldemortIOUtils.toString(httpResponse.getEntity()
                                                                           .getContent(), 30000);

                if(responseCode != HttpURLConnection.HTTP_OK)
                    throw new VoldemortException("Swap request on node " + node.getId() + " ("
                                                 + url + ") failed: "
                                                 + httpResponse.getStatusLine().getReasonPhrase());
                logger.info("Swap succeeded on node " + node.getId());
                previousDirs.put(node.getId(), previousDir);
            } catch(Exception e) {
                exceptions.put(node.getId(), e);
                logger.error("Exception thrown during swap operation on node " + node.getId()
                             + ": ", e);
            } finally {
                VoldemortIOUtils.closeQuietly(httpResponse, node.toString());
            }
        }

        if(!exceptions.isEmpty()) {
            if(rollbackFailedSwap) {
                // Rollback data on successful nodes
                for(int successfulNodeId: previousDirs.keySet()) {
                    HttpResponse httpResponse = null;
                    try {
                        String url = cluster.getNodeById(successfulNodeId).getHttpUrl() + "/"
                                     + readOnlyMgmtPath;
                        HttpPost post = new HttpPost(url);

                        List<NameValuePair> params = new ArrayList<NameValuePair>();
                        params.add(new BasicNameValuePair("operation", "rollback"));
                        params.add(new BasicNameValuePair("store", storeName));
                        params.add(new BasicNameValuePair("pushVersion",
                                                          Long.toString(ReadOnlyUtils.getVersionId(new File(previousDirs.get(successfulNodeId))))));
                        post.setEntity(new UrlEncodedFormEntity(params));

                        logger.info("Rolling back data on successful node " + successfulNodeId);

                        httpResponse = httpClient.execute(post);
                        int responseCode = httpResponse.getStatusLine().getStatusCode();
                        String response = httpResponse.getStatusLine().getReasonPhrase();

                        if(responseCode == 200) {
                            logger.info("Rollback succeeded for node " + successfulNodeId);
                        } else {
                            throw new VoldemortException(response);
                        }
                    } catch(Exception e) {
                        logger.error("Exception thrown during rollback ( after swap ) operation on node "
                                             + successfulNodeId + " : ",
                                     e);
                    } finally {
                        VoldemortIOUtils.closeQuietly(httpResponse);
                    }
                }
            }

            // Finally log the errors for the user
            for(int failedNodeId: exceptions.keySet()) {
                logger.error("Error on node " + failedNodeId + " during swap : ",
                             exceptions.get(failedNodeId));
            }

            throw new VoldemortException("Exception during swaps on nodes "
                                         + Joiner.on(",").join(exceptions.keySet()) + " failed");
        }

    }

    @Override
    public void invokeRollback(String storeName, final long pushVersion) {
        Exception exception = null;
        for(Node node: cluster.getNodes()) {
            HttpResponse httpResponse = null;
            try {
                logger.info("Attempting rollback for node " + node.getId() + " storeName = "
                            + storeName);
                String url = node.getHttpUrl() + "/" + readOnlyMgmtPath;
                HttpPost post = new HttpPost(url);

                List<NameValuePair> params = new ArrayList<NameValuePair>();
                params.add(new BasicNameValuePair("operation", "rollback"));
                params.add(new BasicNameValuePair("store", storeName));
                params.add(new BasicNameValuePair("pushVersion", Long.toString(pushVersion)));
                post.setEntity(new UrlEncodedFormEntity(params));

                httpResponse = httpClient.execute(post);
                int responseCode = httpResponse.getStatusLine().getStatusCode();
                String response = httpResponse.getStatusLine().getReasonPhrase();
                if(responseCode == 200) {
                    logger.info("Rollback succeeded for node " + node.getId());
                } else {
                    throw new VoldemortException(response);
                }
            } catch(Exception e) {
                exception = e;
                logger.error("Exception thrown during rollback operation on node " + node.getId()
                             + ": ", e);
            } finally {
                VoldemortIOUtils.closeQuietly(httpResponse, String.valueOf(node));
            }
        }

        if(exception != null)
            throw new VoldemortException(exception);
    }

}
TOP

Related Classes of voldemort.store.readonly.swapper.HttpStoreSwapper

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.