Package org.jboss.as.domain.controller

Source Code of org.jboss.as.domain.controller.DomainDeploymentHandler$DomainUpdate

/*
* JBoss, Home of Professional Open Source.
* Copyright 2010, Red Hat, Inc., and individual contributors
* as indicated by the @author tags. See the copyright.txt file in the
* distribution for a full listing of individual contributors.
*
* This is free software; you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as
* published by the Free Software Foundation; either version 2.1 of
* the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this software; if not, write to the Free
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
*/

package org.jboss.as.domain.controller;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import org.jboss.as.domain.client.api.ServerIdentity;
import org.jboss.as.domain.client.api.deployment.DeploymentAction;
import org.jboss.as.domain.client.api.deployment.DeploymentPlan;
import org.jboss.as.domain.client.api.deployment.DeploymentSetPlan;
import org.jboss.as.domain.client.api.deployment.IncompleteDeploymentReplaceException;
import org.jboss.as.domain.client.api.deployment.InvalidDeploymentPlanException;
import org.jboss.as.domain.client.api.deployment.ServerGroupDeploymentPlan;
import org.jboss.as.domain.client.impl.DomainClientProtocol;
import org.jboss.as.domain.client.impl.DomainUpdateApplierResponse;
import org.jboss.as.domain.client.impl.deployment.DeploymentActionImpl;
import org.jboss.as.model.AbstractDomainModelUpdate;
import org.jboss.as.model.AbstractModelUpdate;
import org.jboss.as.model.AbstractServerModelUpdate;
import org.jboss.as.model.DeploymentUnitElement;
import org.jboss.as.model.DomainDeploymentAdd;
import org.jboss.as.model.DomainDeploymentFullReplaceUpdate;
import org.jboss.as.model.DomainDeploymentRedeployUpdate;
import org.jboss.as.model.DomainDeploymentRemove;
import org.jboss.as.model.DomainModel;
import org.jboss.as.model.DomainServerGroupUpdate;
import org.jboss.as.model.ServerGroupDeploymentAdd;
import org.jboss.as.model.ServerGroupDeploymentRemove;
import org.jboss.as.model.ServerGroupDeploymentReplaceUpdate;
import org.jboss.as.model.ServerGroupDeploymentStartStopUpdate;
import org.jboss.as.model.ServerGroupElement;
import org.jboss.as.model.UpdateResultHandlerResponse;
import org.jboss.logging.Logger;

/**
* Handles the DomainController's execution of a {@link DeploymentPlan}.
*
* @author Brian Stansberry
*/
public class DomainDeploymentHandler {

    static Logger logger = Logger.getLogger("org.jboss.as.domain.deployment");

    private final DomainController domainController;
    private final ExecutorService executorService;

    public DomainDeploymentHandler(final DomainController domainController, final ExecutorService executorService) {
        this.domainController = domainController;
        this.executorService = executorService;
    }

    public void executeDeploymentPlan(final DeploymentPlan plan, final BlockingQueue<List<StreamedResponse>> responseQueue) {

        // Run the plan in a separate thread so caller can process responses
        Runnable r = new Runnable() {
            @Override
            public void run() {
                boolean failed = false;
                try {
                    runDeploymentPlan(plan, responseQueue);
                    logger.infof("Completed deployment plan %s", plan.getId());
                }
                catch (InterruptedException e) {
                    failed = true;
                    Thread.currentThread().interrupt();
                    logger.errorf(e, "Interrupted while executing deployment plan %s", plan.getId());
                }
                catch (Exception e) {
                    logger.errorf(e, "Caught exception executing deployment plan %s", plan.getId());
                    failed = true;
                }
                catch (Error e) {
                    logger.errorf(e, "Caught error executing deployment plan %s", plan.getId());
                    throw e;
                }

                if (failed) {
                    try {
                        pushSingleResponse(responseQueue, new StreamedResponse((byte) DomainClientProtocol.RETURN_DEPLOYMENT_PLAN_COMPLETE, null, true));
                    } catch (InterruptedException e1) {
                        Thread.currentThread().interrupt();
                    }
                }
            }
        };

        getDeploymentExecutor().submit(r);
    }

    /**
     * The actual deployment plan execution logic
     * @throws InterruptedException
     */
    private void runDeploymentPlan(final DeploymentPlan plan, final BlockingQueue<List<StreamedResponse>> responseQueue) throws InterruptedException {

        pushSingleResponse(responseQueue, new StreamedResponse((byte) DomainClientProtocol.RETURN_DEPLOYMENT_PLAN_ID, plan.getId()));

        List<DeploymentSetPlan> setPlans = plan.getDeploymentSetPlans();
        List<DeploymentSetUpdates> updateSets = new ArrayList<DeploymentSetUpdates>(setPlans.size());
        for (DeploymentSetPlan setPlan : setPlans) {
            try {
                updateSets.add(createDeploymentSetUpdates(setPlan, getDomainModel(), setPlan.getDeploymentActions()));
            } catch (InvalidDeploymentPlanException e) {
                logger.errorf(e, "Deployment plan %s is invalid", plan.getId());
                pushSingleResponse(responseQueue, new StreamedResponse((byte) DomainClientProtocol.RETURN_DEPLOYMENT_PLAN_INVALID, e, true));
                return;
            }
        }

        List<DeploymentSetUpdates> rollbackSets = new ArrayList<DeploymentSetUpdates>(setPlans.size());
        boolean ok = true;
        boolean rollbackFailed = false;
        for (DeploymentSetUpdates updateSet : updateSets) {
            if (ok) {
                try {
                    ok = executeDeploymentSet(updateSet, responseQueue);
                    if (ok) {
                        rollbackSets.add(0, updateSet); // roll back in reverse order
                    }
                    else {
                        logger.debugf("Deployment set %s did not succeed", updateSet.setPlan.getId());
                    }
                }
                catch (RollbackFailedException e) {
                    // Deployment set execution failed and it also failed to roll itself back
                    ok = false;
                    rollbackFailed = true;
                    logger.errorf("Rollback of deployment set %s did not succeed", updateSet.setPlan.getId());
                }
            } else {
                // A previous set failed; just inform client this set is cancelled
                cancelDeploymentSet(updateSet, false, responseQueue);
            }
        }

        if (plan.isGlobalRollback()) {
            if (!ok && !rollbackFailed) {
                // Rollback the sets that succeeded before the one that failed
                // The one that failed will have rolled itself back.
                for (Iterator<DeploymentSetUpdates> iter = rollbackSets.iterator(); iter.hasNext();) {
                    DeploymentSetUpdates updateSet = iter.next();
                    try {
                        rollbackDeploymentSet(updateSet, responseQueue);
                        iter.remove();
                    }
                    catch (RollbackFailedException e) {
                        rollbackFailed = true;
                        logger.errorf("Rollback of deployment set %s did not succeed", updateSet.setPlan.getId());
                        // Don't try further rollbacks
                        break;
                    }
                }

            }

            if (rollbackFailed) {
                // Any remaining members in rollbackSets are there because rollback
                // of another set failed. So send notifications
                for (DeploymentSetUpdates updateSet : rollbackSets) {
                    cancelDeploymentSet(updateSet, true, responseQueue);
                }
            }
        }

        pushSingleResponse(responseQueue, new StreamedResponse((byte) DomainClientProtocol.RETURN_DEPLOYMENT_PLAN_COMPLETE, null, true));
    }

    private DomainModel getDomainModel() {
        return domainController.getDomainModel();
    }

    private boolean executeDeploymentSet(final DeploymentSetUpdates updateSet,
            final BlockingQueue<List<StreamedResponse>> responseQueue) throws InterruptedException, RollbackFailedException {

        logger.debugf("Executing deployment set %s", updateSet.setPlan.getId());

        // Execute domain model update on domain controller and server managers
        List<DomainUpdateApplierResponse> rsps = domainController.applyUpdatesToModel(updateSet.getDomainUpdates());

        // Inform client of results
        pushSingleResponse(responseQueue, new StreamedResponse((byte) DomainClientProtocol.RETURN_DEPLOYMENT_SET_ID, updateSet.setPlan.getId()));
        DeploymentAction lastResponseAction = null;
        for (int i = 0; i < rsps.size(); i++) {
            DomainUpdateApplierResponse duar = rsps.get(i);
            // There can be multiple domain updates for a given action, but we
            // only send one response. Use this update result for the response if
            // 1) it failed or 2) it's the last update associated with the action
            if (duar.getDomainFailure() != null || duar.getHostFailures().size() > 0 || updateSet.isLastDomainUpdateForAction(i)) {
                DeploymentAction action = updateSet.getDeploymentActionForDomainUpdate(i);
                if (action != lastResponseAction) {
                    List<StreamedResponse> rspList = new ArrayList<StreamedResponse>(2);
                    rspList.add(new StreamedResponse((byte) DomainClientProtocol.RETURN_DEPLOYMENT_ACTION_ID, action.getId()));
                    rspList.add(new StreamedResponse((byte) DomainClientProtocol.RETURN_DEPLOYMENT_ACTION_MODEL_RESULT, duar));
                    responseQueue.put(rspList);
                    lastResponseAction = action;
                }
            }
        }

        // See if the above was successful before moving on to servers
        DomainUpdateApplierResponse last = rsps.get(rsps.size() - 1);
        if (last.getDomainFailure() != null || last.getHostFailures().size() > 0) {
            // DomainModel update failed; don't apply to servers. The DomainController will
            // have already rolled back the domain model update
            return false;
        }

        // Apply to servers
        Runnable r = getServerUpdateTask(updateSet, rsps, false, responseQueue);
        r.run();

        boolean ok = true;
        for (ServerUpdatePolicy policy : updateSet.updatePolicies.values()) {
            if (policy.isFailed()) {
                logger.infof("Deployment set failed on %s", policy.getServerGroupName());
                ok = false;
                break;
            }
        }

        if (!ok) {
            rollbackDeploymentSet(updateSet, responseQueue);
        }
        return ok;
    }

    private ExecutorService getDeploymentExecutor() {
        return executorService;
    }

    private void cancelDeploymentSet(final DeploymentSetUpdates updateSet,
                                     final boolean forRollback,
                                     final BlockingQueue<List<StreamedResponse>> responseQueue) throws InterruptedException {

        byte type = forRollback ? (byte) DomainClientProtocol.RETURN_DEPLOYMENT_SET_ROLLBACK
                                : (byte) DomainClientProtocol.RETURN_DEPLOYMENT_SET_ID;
        pushSingleResponse(responseQueue, new StreamedResponse(type, updateSet.setPlan.getId()));

        List<StreamedResponse> rspList = new ArrayList<StreamedResponse>();
        DomainUpdateApplierResponse duar = new DomainUpdateApplierResponse(true);
        for (ActionUpdates au : updateSet.actionUpdates) {
            rspList.add(new StreamedResponse((byte) DomainClientProtocol.RETURN_DEPLOYMENT_ACTION_ID, au.action.getId()));
            rspList.add(new StreamedResponse((byte) DomainClientProtocol.RETURN_DEPLOYMENT_ACTION_MODEL_RESULT, duar));
        }

        responseQueue.put(rspList);
    }

    private void rollbackDeploymentSet(final DeploymentSetUpdates updateSet,
            final BlockingQueue<List<StreamedResponse>> responseQueue) throws InterruptedException, RollbackFailedException {

        logger.debugf("Rolling back deployment set %s", updateSet.setPlan.getId());

        // Execute domain model update on domain controller and server managers
        List<DomainUpdateApplierResponse> rsps = domainController.applyUpdatesToModel(updateSet.getDomainRollbacks());

        // Inform client of results
        pushSingleResponse(responseQueue, new StreamedResponse((byte) DomainClientProtocol.RETURN_DEPLOYMENT_SET_ROLLBACK, updateSet.setPlan.getId()));
        DeploymentAction lastResponseAction = null;
        for (int i = 0; i < rsps.size(); i++) {
            DomainUpdateApplierResponse duar = rsps.get(i);
            // There can be multiple domain updates for a given action, but we
            // only send one response. Use this update result for the response if
            // 1) it failed or 2) it's the last update associated with the action
            if (duar.getDomainFailure() != null || duar.getHostFailures().size() > 0 || updateSet.isLastDomainRollbackForAction(i)) {
                DeploymentAction action = updateSet.getDeploymentActionForDomainUpdate(i);
                if (action != lastResponseAction) {
                    List<StreamedResponse> rspList = new ArrayList<StreamedResponse>(2);
                    rspList.add(new StreamedResponse((byte) DomainClientProtocol.RETURN_DEPLOYMENT_ACTION_ID, action.getId()));
                    rspList.add(new StreamedResponse((byte) DomainClientProtocol.RETURN_DEPLOYMENT_ACTION_MODEL_RESULT, duar));
                    responseQueue.put(rspList);
                    lastResponseAction = action;
                }
            }
        }

        DomainUpdateApplierResponse last = rsps.get(rsps.size() - 1);
        if (last.getDomainFailure() != null || last.getHostFailures().size() > 0) {
            throw new RollbackFailedException();
        }

        // Apply to servers
        Runnable r = getServerUpdateTask(updateSet, rsps, true, responseQueue);
        r.run();

        for (ServerUpdatePolicy policy : updateSet.rollbackPolicies.values()) {
            if (policy.isFailed()) {
                throw new RollbackFailedException();
            }
        }
    }

    private Runnable getServerUpdateTask(final DeploymentSetUpdates updateSet,
            final List<DomainUpdateApplierResponse> rsps,
            final boolean forRollbacks,
            final BlockingQueue<List<StreamedResponse>> responseQueue) {

        logger.debugf("Creating server tasks for %s domain responses", rsps.size());
        // Organize all the impacted servers by ServerGroup, sorted within a group by ServerManager
        Map<String, SortedSet<ServerIdentity>> serversByGroup = new HashMap<String, SortedSet<ServerIdentity>>();
        for (DomainUpdateApplierResponse duar : rsps) {
            for (ServerIdentity serverId : duar.getServers()) {
                String serverGroupName = serverId.getServerGroupName();
                SortedSet<ServerIdentity> set = serversByGroup.get(serverGroupName);
                if (set == null) {
                    logger.debugf("Found affected servers in server group %s", serverGroupName);
                    set = new TreeSet<ServerIdentity>(ServerIdentityComparator.INSTANCE);
                    serversByGroup.put(serverGroupName, set);
                }
                set.add(serverId);
            }
        }

        // What kind of tasks we create depends on whether shutdown is configured
        boolean shutdown = updateSet.setPlan.isShutdown();
        long gracefulTimeout = updateSet.setPlan.getGracefulShutdownTimeout();

        List<Runnable> masterList = new ArrayList<Runnable>();
        ConcurrentGroupServerUpdatePolicy predecessor = null;
        for (Set<ServerGroupDeploymentPlan> groupPlans : updateSet.setPlan.getServerGroupDeploymentPlans()) {

            List<Runnable> concurrentGroupsList = new ArrayList<Runnable>(groupPlans.size());
            ConcurrentGroupServerUpdatePolicy parent = new ConcurrentGroupServerUpdatePolicy(predecessor, groupPlans);
            predecessor = parent;

            for (ServerGroupDeploymentPlan groupPlan : groupPlans) {
                String serverGroupName = groupPlan.getServerGroupName();
                SortedSet<ServerIdentity> servers = serversByGroup.get(serverGroupName);
                if (servers == null) {
                    // Just KISS and use a placeholder
                    servers = new TreeSet<ServerIdentity>();
                }

                ServerUpdatePolicy policy = null;
                if (forRollbacks) {
                    policy = new ServerUpdatePolicy(parent, serverGroupName, servers);
                    updateSet.rollbackPolicies.put(serverGroupName, policy);
                }
                else {
                    policy = new ServerUpdatePolicy(parent, serverGroupName, servers, groupPlan);
                    updateSet.updatePolicies.put(serverGroupName, policy);
                }

                List<Runnable> groupTasks = new ArrayList<Runnable>(servers.size());
                if (shutdown) {
                    for (ServerIdentity server : servers) {
                        groupTasks.add (new ServerRestartTask(server, updateSet, forRollbacks, policy, responseQueue, gracefulTimeout));
                    }
                }
                else if (forRollbacks) {
                    // need rollback tasks for this server
                    for (ServerIdentity server : servers) {
                        List<AbstractServerModelUpdate<?>> serverRollbacks = null;

                        // Find out what happened first time
                        List<UpdateResultHandlerResponse<?>> origResults  = updateSet.serverResults.get(server);
                        if (origResults == null) {
                            // Must be a new server that appeared. Assume it got
                            // the invalid model on start; roll 'em all back
                            serverRollbacks = updateSet.getServerRollbacks();
                        }
                        else {
                            serverRollbacks = new ArrayList<AbstractServerModelUpdate<?>>();
                            boolean rollingBack = false;
                            for (int i = origResults.size() - 1; i >= 0; i--) {
                                UpdateResultHandlerResponse<?> origResult = origResults.get(i);
                                if (!rollingBack) {
                                    // See if we need to roll this one back. Once we hit one in the list,
                                    // the rest need to be rolled back as well
                                    rollingBack = !origResult.isCancelled() && !origResult.isRolledBack();
                                }

                                if (rollingBack) {
                                    serverRollbacks.add(updateSet.getRollbackUpdateForServerUpdate(i));
                                }
                            }
                        }
                        if (serverRollbacks.size() > 0) {
                            groupTasks.add (new RunningServerUpdateTask(server, updateSet, serverRollbacks, policy, responseQueue));
                        }
                    }
                }
                else {
                    // Standard case
                    for (ServerIdentity server : servers) {
                        groupTasks.add (new RunningServerUpdateTask(server, updateSet, policy, responseQueue, groupPlan.isRollback()));
                    }
                }

                if (groupPlan.isRollingToServers()) {
                    concurrentGroupsList.add(new RollingUpdateTask(groupTasks));
                }
                else {
                    concurrentGroupsList.add(new ConcurrentUpdateTask(groupTasks, getDeploymentExecutor()));
                }
            }
            masterList.add(new ConcurrentUpdateTask(concurrentGroupsList, getDeploymentExecutor()));
        }

        return new RollingUpdateTask(masterList);
    }

    private static void pushSingleResponse(BlockingQueue<List<StreamedResponse>> queue, StreamedResponse response) throws InterruptedException {
        queue.put(Collections.singletonList(response));
    }

    /** Performs the translation from DeploymentAction to domain and server model updates */
    private static DeploymentSetUpdates createDeploymentSetUpdates(DeploymentSetPlan plan, DomainModel model, List<DeploymentAction> actions) throws InvalidDeploymentPlanException {

        logger.debugf("Creating DeploymentSetUpdates for deployment set %s", plan.getId());

        if (actions.size() == 0) {
            throw new InvalidDeploymentPlanException(String.format("%s %s contains no deployment actions", DeploymentSetPlan.class.getSimpleName(), plan.getId()));
        }

        List<ActionUpdates> actionUpdates = new ArrayList<ActionUpdates>();

        for (DeploymentAction action : actions) {
            ActionUpdates au = new ActionUpdates(action);
            actionUpdates.add(au);

            try {
                DeploymentActionImpl dai = (DeploymentActionImpl) action;
                switch (action.getType()) {
                    case ADD: {
                        String deploymentName = dai.getDeploymentUnitUniqueName();
                        logger.tracef("Add of deployment %s", deploymentName);
                        String runtimeName = dai.getNewContentFileName();
                        byte[] hash = dai.getNewContentHash();
                        if (runtimeName == null) {
                            // This is a request to add already existing content to this set's server groups
                            DeploymentUnitElement de = model.getDeployment(deploymentName);
                            if (de == null) {
                                throw new InvalidDeploymentPlanException("Unknown deployment unit " + deploymentName);
                            }
                            runtimeName = de.getRuntimeName();
                            hash = de.getSha1Hash();
                        }
                        else if (model.getDeployment(deploymentName) == null) {
                            // Deployment is new to the domain; add it
                            DomainDeploymentAdd dda = new DomainDeploymentAdd(deploymentName, runtimeName, hash, false);
                            au.domainUpdates.add(new DomainUpdate(dda, model, true));
                        }
                        // Now add to serve groups
                        ServerGroupDeploymentAdd sgda = new ServerGroupDeploymentAdd(deploymentName, runtimeName, hash, false);
                        addServerGroupUpdates(plan, au, sgda, model);
                        break;
                    }
                    case DEPLOY: {
                        logger.tracef("Deploy of deployment %s", dai.getDeploymentUnitUniqueName());
                        ServerGroupDeploymentStartStopUpdate sgdssu = new ServerGroupDeploymentStartStopUpdate(dai.getDeploymentUnitUniqueName(), true);
                        addServerGroupUpdates(plan, au, sgdssu, model);
                        break;
                    }
                    case FULL_REPLACE: {
                        logger.tracef("Full replace of deployment %s", dai.getDeploymentUnitUniqueName());
                        // Validate all relevant server groups are touched
                        String deploymentName = dai.getDeploymentUnitUniqueName();
                        Set<String> names = new LinkedHashSet<String>(model.getServerGroupNames());
                        for (Set<ServerGroupDeploymentPlan> ssgp : plan.getServerGroupDeploymentPlans()) {
                            for (ServerGroupDeploymentPlan sgdp : ssgp) {
                                names.remove(sgdp.getServerGroupName());
                            }
                        }
                        for (Iterator<String> it = names.iterator(); it.hasNext();) {
                            String name = it.next();
                            ServerGroupElement sge = model.getServerGroup(name);
                            if (sge.getDeployment(dai.getDeploymentUnitUniqueName()) == null) {
                                it.remove();
                            }
                        }
                        if (names.size() > 0) {
                            throw new IncompleteDeploymentReplaceException(deploymentName, names.toArray(new String[names.size()]));
                        }
                        DeploymentUnitElement deployment = model.getDeployment(dai.getDeploymentUnitUniqueName());
                        boolean start = deployment != null && deployment.isStart();
                        DomainDeploymentFullReplaceUpdate ddfru = new DomainDeploymentFullReplaceUpdate(deploymentName, dai.getNewContentFileName(), dai.getNewContentHash(), start);
                        au.domainUpdates.add(new DomainUpdate(ddfru, model, true));
                        break;
                    }
                    case REDEPLOY: {
                        logger.tracef("Redeploy of deployment %s", dai.getDeploymentUnitUniqueName());
                        DomainDeploymentRedeployUpdate ddru = new DomainDeploymentRedeployUpdate(dai.getDeploymentUnitUniqueName());
                        au.domainUpdates.add(new DomainUpdate(ddru, model, true));
                        break;
                    }
                    case REMOVE: {
                        logger.tracef("Remove of deployment %s", dai.getDeploymentUnitUniqueName());
                        ServerGroupDeploymentRemove sgdr = new ServerGroupDeploymentRemove(dai.getDeploymentUnitUniqueName());
                        addServerGroupUpdates(plan, au, sgdr, model);
                        // If no server group is using this, remove from domain
                        Set<String> names = model.getServerGroupNames();
                        for (Set<ServerGroupDeploymentPlan> ssgp : plan.getServerGroupDeploymentPlans()) {
                            for (ServerGroupDeploymentPlan sgdp : ssgp) {
                                names.remove(sgdp.getServerGroupName());
                            }
                        }
                        boolean left = false;
                        for (String name : names) {
                            ServerGroupElement sge = model.getServerGroup(name);
                            if (sge.getDeployment(dai.getDeploymentUnitUniqueName()) != null) {
                                left = true;
                                break;
                            }
                        }
                        if (!left) {
                            DomainDeploymentRemove ddr = new DomainDeploymentRemove(dai.getDeploymentUnitUniqueName());
                            au.domainUpdates.add(new DomainUpdate(ddr, model, true));
                        }
                        break;
                    }
                    case REPLACE: {
                        logger.tracef("Replace of deployment %s", dai.getDeploymentUnitUniqueName());
                        ServerGroupDeploymentReplaceUpdate sgdru = new ServerGroupDeploymentReplaceUpdate(dai.getDeploymentUnitUniqueName(), dai.getNewContentFileName(), dai.getNewContentHash(), dai.getReplacedDeploymentUnitUniqueName());
                        addServerGroupUpdates(plan, au, sgdru, model);
                        break;
                    }
                    case UNDEPLOY: {
                        logger.tracef("Undeploy of deployment %s", dai.getDeploymentUnitUniqueName());
                        ServerGroupDeploymentStartStopUpdate sgdssu = new ServerGroupDeploymentStartStopUpdate(dai.getDeploymentUnitUniqueName(), false);
                        addServerGroupUpdates(plan, au, sgdssu, model);
                        break;
                    }
                    default:
                        throw new IllegalStateException(String.format("Unknown %s %s", DeploymentAction.class.getSimpleName(), action.getType()));
                }
            }
            catch (InvalidDeploymentPlanException e) {
                throw e;
            }
            catch (Exception e) {
                throw new InvalidDeploymentPlanException(String.format("Deployment action %s of type %s primarily affecting deployment %s is invalid", action.getId(), action.getType(), action.getDeploymentUnitUniqueName()), e);
            }
        }

        logger.debugf("Created %s action updates", actionUpdates.size());

        return new DeploymentSetUpdates(actionUpdates, plan);
    }

    private static void addServerGroupUpdates(final DeploymentSetPlan plan, final ActionUpdates au, final AbstractModelUpdate<ServerGroupElement, ?> serverGroupUpdate, DomainModel model) {
        boolean setSMU = true;
        for (Set<ServerGroupDeploymentPlan> ssgp : plan.getServerGroupDeploymentPlans()) {
            for (ServerGroupDeploymentPlan sgdp : ssgp) {
                AbstractDomainModelUpdate<?> dmu = DomainServerGroupUpdate.create(sgdp.getServerGroupName(), serverGroupUpdate);
                DomainUpdate du = new DomainUpdate(dmu, model, setSMU);
                au.domainUpdates.add(du);
                setSMU = setSMU && du.serverUpdate == null;
            }
        }
    }

    /** Holder for information about a deployment set */
    private static class DeploymentSetUpdates {
        private final List<ActionUpdates> actionUpdates;
        private final DeploymentSetPlan setPlan;
        private final Map<ServerIdentity, List<UpdateResultHandlerResponse<?>>> serverResults = new ConcurrentHashMap<ServerIdentity, List<UpdateResultHandlerResponse<?>>>();
        private final Map<String, ServerUpdatePolicy> updatePolicies = new HashMap<String, ServerUpdatePolicy>();;
        private final Map<String, ServerUpdatePolicy> rollbackPolicies = new HashMap<String, ServerUpdatePolicy>();;

        private DeploymentSetUpdates(final List<ActionUpdates> actionUpdates, final DeploymentSetPlan setPlan) {
            this.actionUpdates = actionUpdates;
            this.setPlan = setPlan;
        }

        private List<AbstractDomainModelUpdate<?>> getDomainUpdates() {
            List<AbstractDomainModelUpdate<?>>result = new ArrayList<AbstractDomainModelUpdate<?>>();
            for (ActionUpdates au : actionUpdates) {
                for (DomainUpdate du : au.domainUpdates) {
                    result.add(du.update);
                }
            }
            return result;
        }

        private List<AbstractServerModelUpdate<?>> getServerUpdates() {
            List<AbstractServerModelUpdate<?>>result = new ArrayList<AbstractServerModelUpdate<?>>();
            for (ActionUpdates au : actionUpdates) {
                for (DomainUpdate du : au.domainUpdates) {
                    if (du.serverUpdate != null) {
                        result.add(du.serverUpdate.update);
                    }
                }
            }
            return result;
        }

        private List<AbstractDomainModelUpdate<?>> getDomainRollbacks() {
            List<AbstractDomainModelUpdate<?>>result = new ArrayList<AbstractDomainModelUpdate<?>>();
            for (ActionUpdates au : actionUpdates) {
                for (DomainUpdate du : au.domainUpdates) {
                    if (du.compensatingUpdate != null)
                    result.add(du.compensatingUpdate);
                }
            }
            return result;
        }

        private List<AbstractServerModelUpdate<?>> getServerRollbacks() {
            List<AbstractServerModelUpdate<?>>result = new ArrayList<AbstractServerModelUpdate<?>>();
            for (ActionUpdates au : actionUpdates) {
                for (DomainUpdate du : au.domainUpdates) {
                    if (du.serverUpdate != null) {
                        if (du.serverUpdate.compensatingUpdate != null) {
                            result.add(du.serverUpdate.compensatingUpdate);
                        }
                    }
                }
            }
            return result;
        }

        private boolean isLastDomainUpdateForAction(int index) {
            int count = 0;
            for (ActionUpdates au : actionUpdates) {
                count += au.domainUpdates.size();
                if (index < count)
                    return (index == count - 1);
            }
            throw new IndexOutOfBoundsException(index + " is larger than the index of the last domain update (" + (getDomainUpdates().size() - 1) + ")");
        }

        private boolean isLastDomainRollbackForAction(int index) {
            int count = 0;
            for (int i = actionUpdates.size() -1; i >= 0; i--) {
                ActionUpdates au = actionUpdates.get(i);
                int rbCount = 0;
                for (DomainUpdate du : au.domainUpdates) {
                    if (du.compensatingUpdate != null)
                        rbCount++;
                }
                count += rbCount;
                if (index < count)
                    return (index == count - 1);
            }
            throw new IndexOutOfBoundsException(index + " is larger than the index of the last domain rollback update (" + (getDomainRollbacks().size() - 1) + ")");
        }

        private DeploymentAction getDeploymentActionForDomainUpdate(int index) {
            int count = 0;
            for (ActionUpdates au : actionUpdates) {
                count += au.domainUpdates.size();
                if (index < count) {
                    return au.action;
                }
            }
            throw new IndexOutOfBoundsException(index + " is larger than the index of the last domain update (" + (getDomainUpdates().size() - 1) + ")");
        }

        private boolean isLastServerUpdateForAction(int index) {
            int count = 0;
            for (ActionUpdates au : actionUpdates) {
                int suCount = 0;
                for (DomainUpdate du : au.domainUpdates) {
                    if (du.serverUpdate != null)
                        suCount++;
                }
                count += suCount;
                if (index < count)
                    return (index == count - 1);
            }
            throw new IndexOutOfBoundsException(index + " is larger than the index of the last server update (" + (getServerUpdates().size() - 1) + ")");
        }

        private boolean isLastServerRollbackUpdateForAction(int index) {
            int count = 0;
            for (int i = actionUpdates.size() -1; i >= 0; i--) {
                ActionUpdates au = actionUpdates.get(i);
                int suCount = 0;
                for (DomainUpdate du : au.domainUpdates) {
                    if (du.serverUpdate != null && du.serverUpdate.compensatingUpdate != null)
                        suCount++;
                }
                count += suCount;
                if (index < count)
                    return (index == count - 1);
            }
            throw new IndexOutOfBoundsException(index + " is larger than the index of the last server rollback update (" + (getServerRollbacks().size() - 1) + ")");
        }

        private DeploymentAction getDeploymentActionForServerUpdate(int index) {
            int count = 0;
            for (ActionUpdates au : actionUpdates) {
                int suCount = 0;
                for (DomainUpdate du : au.domainUpdates) {
                    if (du.serverUpdate != null)
                        suCount++;
                }
                count += suCount;
                if (index < count) {
                    return au.action;
                }
            }
            throw new IndexOutOfBoundsException(index + " is larger than the index of the last server update (" + (getServerUpdates().size() - 1) + ")");
        }

        private DeploymentAction getDeploymentActionForServerRollbackUpdate(int index) {
            int count = 0;
            for (ActionUpdates au : actionUpdates) {
                int suCount = 0;
                for (DomainUpdate du : au.domainUpdates) {
                    if (du.serverUpdate != null && du.serverUpdate.compensatingUpdate != null)
                        suCount++;
                }
                count += suCount;
                if (index < count) {
                    return au.action;
                }
            }
            throw new IndexOutOfBoundsException(index + " is larger than the index of the last server update (" + (getServerUpdates().size() - 1) + ")");
        }

        private AbstractServerModelUpdate<?> getRollbackUpdateForServerUpdate(int index) {

            int count = 0;
            for (ActionUpdates au : actionUpdates) {
                for (DomainUpdate du : au.domainUpdates) {
                    if (du.serverUpdate != null) {
                        if (index == count++) {
                            return du.serverUpdate.compensatingUpdate;
                        }
                    }
                }
            }
            throw new IndexOutOfBoundsException(index + " is larger than the index of the last domain update (" + (getServerUpdates().size() - 1) + ")");
        }
    }

    /** Simple data class to associate an action with the relevant update objects */
    private static class ActionUpdates {
        private final DeploymentAction action;
        private final List<DomainUpdate> domainUpdates = new ArrayList<DomainUpdate>();

        private ActionUpdates(DeploymentAction action) {
            this.action = action;
        }
    }

    private abstract class AbstractServerUpdateTask implements Runnable {
        protected final ServerUpdatePolicy updatePolicy;
        protected final ServerIdentity serverId;
        protected final DeploymentSetUpdates updates;
        protected final BlockingQueue<List<StreamedResponse>> responseQueue;
        protected final boolean forRollback;

        AbstractServerUpdateTask(final ServerIdentity serverId, final DeploymentSetUpdates updates,
                final boolean forRollback, final ServerUpdatePolicy updatePolicy,
                final BlockingQueue<List<StreamedResponse>> responseQueue) {
            this.serverId = serverId;
            this.updatePolicy = updatePolicy;
            this.responseQueue = responseQueue;
            this.updates = updates;
            this.forRollback = forRollback;
        }

        @Override
        public void run() {
            if (updatePolicy.canUpdateServer(serverId)) {
                processUpdates();
            }
            else {
                sendCancelledResponses();
            }
        }

        protected abstract void processUpdates();

        private void sendCancelledResponses() {
            UpdateResultHandlerResponse<Void> urhr = UpdateResultHandlerResponse.createCancellationResponse();
            for (ActionUpdates au : updates.actionUpdates) {
                sendServerUpdateResult(au.action.getId(), urhr);
                if (Thread.interrupted()) {
                    break;
                }
            }
        }

        protected void sendServerUpdateResult(UUID actionId, UpdateResultHandlerResponse<?> urhr) {
            List<StreamedResponse> responses = new ArrayList<StreamedResponse>();

            byte type = forRollback ? (byte) DomainClientProtocol.RETURN_SERVER_DEPLOYMENT_ROLLBACK
                                    : (byte) DomainClientProtocol.RETURN_SERVER_DEPLOYMENT;
            responses.add(new StreamedResponse(type, actionId));

            responses.add(new StreamedResponse((byte) DomainClientProtocol.RETURN_HOST_NAME, serverId.getHostName()));
            responses.add(new StreamedResponse((byte) DomainClientProtocol.RETURN_SERVER_GROUP_NAME, serverId.getServerGroupName()));
            responses.add(new StreamedResponse((byte) DomainClientProtocol.RETURN_SERVER_NAME, serverId.getServerName()));
            responses.add(new StreamedResponse((byte) DomainClientProtocol.RETURN_SERVER_DEPLOYMENT_RESULT, urhr));
            try {
                responseQueue.put(responses);
            } catch (InterruptedException e) {
                logger.errorf("%s interrupted sending server update responses for %s %s", toString(), DeploymentSetPlan.class.getSimpleName(), updates.setPlan.getId());
                Thread.currentThread().interrupt();
            }
        }

        @Override
        public String toString() {
            StringBuilder sb = new StringBuilder(getClass().getSimpleName());
            sb.append("{server=");
            sb.append(serverId.getServerName());
            sb.append("}");
            return sb.toString();
        }
    }

    /** Applies updates a running server */
    private class RunningServerUpdateTask extends AbstractServerUpdateTask {

        private final boolean allowOverallRollback;
        private final List<AbstractServerModelUpdate<?>> serverUpdates;

        /**
         * Constructor for the normal case
         */
        private RunningServerUpdateTask(final ServerIdentity serverId,
                final DeploymentSetUpdates updates,
                final ServerUpdatePolicy updatePolicy,
                final BlockingQueue<List<StreamedResponse>> responseQueue,
                final boolean allowOverallRollback) {
            super(serverId, updates, false, updatePolicy, responseQueue);
            this.allowOverallRollback = allowOverallRollback;
            this.serverUpdates = updates.getServerUpdates();
        }

        /**
         * Constructor for the rollback case
         */
        private RunningServerUpdateTask(final ServerIdentity serverId,
                final DeploymentSetUpdates updates,
                final List<AbstractServerModelUpdate<?>> serverUpdates,
                final ServerUpdatePolicy updatePolicy,
                final BlockingQueue<List<StreamedResponse>> responseQueue) {
            super(serverId, updates, true, updatePolicy, responseQueue);
            this.allowOverallRollback = false;
            this.serverUpdates = serverUpdates;
        }

        @Override
        protected void processUpdates() {
            try {
                logger.debugf("Applying %s updates to  %s", updates.getServerUpdates().size(), serverId);
                List<UpdateResultHandlerResponse<?>> rsps =
                    domainController.applyUpdatesToServer(serverId, serverUpdates, allowOverallRollback);
                if (!forRollback) {
                    updates.serverResults.put(serverId, rsps);
                }
                updatePolicy.recordServerResult(serverId, rsps);

                // Push responses to client
                DeploymentAction lastResponseAction = null;
                for (int i = 0; i < rsps.size(); i++) {
                    UpdateResultHandlerResponse<?> urhr = rsps.get(i);
                    // There can be multiple server updates for a given action, but we
                    // only send one response. Use this update result for the response if
                    // 1) it failed or 2) it's the last update associated with the action
                    boolean sendResponse = urhr.getFailureResult() != null;
                    if (!sendResponse) {
                        sendResponse = forRollback ? updates.isLastServerRollbackUpdateForAction(i)
                                                   : updates.isLastServerUpdateForAction(i);
                    }
                    if (sendResponse) {
                        DeploymentAction action = forRollback ? updates.getDeploymentActionForServerRollbackUpdate(i)
                                                              : updates.getDeploymentActionForServerUpdate(i);
                        if (action != lastResponseAction) {
                            sendServerUpdateResult(action.getId(), urhr);
                            lastResponseAction = action;
                        }
                    }
                }
            }
            catch (Exception e) {
                logger.errorf(e, "Caught exception applying updates to %s", serverId);
            }
        }
    }

    /** Restarts a server */
    private class ServerRestartTask extends AbstractServerUpdateTask {

        private final long gracefulTimeout;

        private ServerRestartTask(final ServerIdentity serverId, final DeploymentSetUpdates updates,
                final boolean forRollback, final ServerUpdatePolicy updatePolicy,
                final BlockingQueue<List<StreamedResponse>> responseQueue, final long gracefulTimeout) {
            super(serverId, updates, forRollback, updatePolicy, responseQueue);
            this.gracefulTimeout = gracefulTimeout;
        }

        @Override
        protected void processUpdates() {

            UpdateResultHandlerResponse<?> urhr = domainController.restartServer(serverId, gracefulTimeout);
            List<UpdateResultHandlerResponse<?>> rsps = Collections.<UpdateResultHandlerResponse<?>>singletonList(urhr);
            updates.serverResults.put(serverId, rsps);
            updatePolicy.recordServerResult(serverId, rsps);

            for (ActionUpdates au : updates.actionUpdates) {
                sendServerUpdateResult(au.action.getId(), urhr);
                if (Thread.interrupted()) {
                    break;
                }
            }
        }
    }

    /** Used to order ServerIdentity instances based on host name */
    private static class ServerIdentityComparator implements Comparator<ServerIdentity> {

        private static final ServerIdentityComparator INSTANCE = new ServerIdentityComparator();

        @Override
        public int compare(ServerIdentity o1, ServerIdentity o2) {
            int val = o1.getHostName().compareTo(o2.getHostName());
            if (val == 0) {
                val = o1.getServerName().compareTo(o2.getServerName());
            }
            return val;
        }
    }

    /** Exception indicating the rollback of a deployment set failed */
    private static class RollbackFailedException extends Exception {
        private static final long serialVersionUID = 3524620474555254562L;
    }

    /** Associates a domain update with its compensating update and its server update */
    private static class DomainUpdate {
        private final AbstractDomainModelUpdate<?> update;
        private final AbstractDomainModelUpdate<?> compensatingUpdate;
        private final ServerUpdate serverUpdate;

        private DomainUpdate(final AbstractDomainModelUpdate<?> update,
                             final DomainModel model,
                             final boolean createServerUpdate) {
            this.update = update;
            this.compensatingUpdate = update.getCompensatingUpdate(model);
            if (createServerUpdate) {
                AbstractServerModelUpdate<?> smu = update.getServerModelUpdate();
                if (smu != null) {
                    AbstractServerModelUpdate<?> csmu = compensatingUpdate == null ? null : compensatingUpdate.getServerModelUpdate();
                    serverUpdate = new ServerUpdate(smu, csmu);
                }
                else serverUpdate = null;
            }
            else serverUpdate = null;
        }
    }

    /** Associates a server update with its compensating update */
    private static class ServerUpdate {
        private final AbstractServerModelUpdate<?> update;
        private final AbstractServerModelUpdate<?> compensatingUpdate;

        private ServerUpdate(final AbstractServerModelUpdate<?> update,
                final AbstractServerModelUpdate<?> compensatingUpdate) {
            this.update = update;
            this.compensatingUpdate = compensatingUpdate;
        }
    }
}
TOP

Related Classes of org.jboss.as.domain.controller.DomainDeploymentHandler$DomainUpdate

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.
.js','ga'); ga('create', 'UA-20639858-1', 'auto'); ga('send', 'pageview');