Package io.fabric8.container.process

Source Code of io.fabric8.container.process.ProcessControllerFactoryService

/**
*  Copyright 2005-2014 Red Hat, Inc.
*
*  Red Hat licenses this file to you under the Apache License, version
*  2.0 (the "License"); you may not use this file except in compliance
*  with the License.  You may obtain a copy of the License at
*
*     http://www.apache.org/licenses/LICENSE-2.0
*
*  Unless required by applicable law or agreed to in writing, software
*  distributed under the License is distributed on an "AS IS" BASIS,
*  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
*  implied.  See the License for the specific language governing
*  permissions and limitations under the License.
*/
package io.fabric8.container.process;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import io.fabric8.api.Constants;
import io.fabric8.api.Container;
import io.fabric8.api.CreateChildContainerOptions;
import io.fabric8.api.CreateContainerBasicOptions;
import io.fabric8.api.DataStore;
import io.fabric8.api.FabricService;
import io.fabric8.api.jcip.ThreadSafe;
import io.fabric8.api.scr.AbstractComponent;
import io.fabric8.api.scr.Configurer;
import io.fabric8.api.scr.ValidatingReference;
import io.fabric8.process.manager.Installation;
import io.fabric8.process.manager.ProcessManager;
import io.fabric8.service.child.ChildContainerController;
import io.fabric8.service.child.ChildContainers;
import io.fabric8.service.child.ProcessControllerFactory;
import io.fabric8.utils.Objects;
import io.fabric8.zookeeper.utils.ZooKeeperMasterCache;
import io.fabric8.zookeeper.utils.ZooKeeperUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.ConfigurationPolicy;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Property;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.Service;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/**
*/
@ThreadSafe
@Component(name = "io.fabric8.container.process.controller", label = "Fabric8 Child Process Container Controller",
        policy = ConfigurationPolicy.OPTIONAL, immediate = true, metatype = true)
@Service(ProcessControllerFactory.class)
public final class ProcessControllerFactoryService extends AbstractComponent implements ProcessControllerFactory {
    private static final transient Logger LOG = LoggerFactory.getLogger(ProcessControllerFactoryService.class);

    private static final int DEFAULT_EXTERNAL_PORT = 9000;

    @Reference
    private Configurer configurer;

    @Reference(referenceInterface = FabricService.class)
    private final ValidatingReference<FabricService> fabricService = new ValidatingReference<FabricService>();

    @Reference(referenceInterface = CuratorFramework.class, bind = "bindCurator", unbind = "unbindCurator")
    private final ValidatingReference<CuratorFramework> curator = new ValidatingReference<CuratorFramework>();

    @Reference(referenceInterface = ProcessManager.class)
    private final ValidatingReference<ProcessManager> processManager = new ValidatingReference<ProcessManager>();

    @Property(name = "monitorPollTime", longValue = 1500,
            label = "Monitor poll period",
            description = "The number of milliseconds after which the processes will be polled to check they are started and still alive.")
    private long monitorPollTime = 1500;

    private int externalJolokiaPort;
    private int externalPortCounter;
    private int[] containerLocalIp4Address = {127, 0, 0, 0};

    private Timer keepAliveTimer;

    private final Runnable configurationChangeHandler = new Runnable() {
        @Override
        public void run() {
            onConfigurationChanged();
        }
    };

    private final ConcurrentMap<String, Set<String>> containerToZKPathMap = new ConcurrentHashMap<>();
    private ZooKeeperMasterCache zkMasterCache;


    @Activate
    void activate() {
        activateComponent();
        keepAliveTimer = new Timer("fabric8-process-container-monitor");

        TimerTask timerTask = new TimerTask() {
            @Override
            public void run() {
                checkProcessesStatus();
            }
        };
        keepAliveTimer.schedule(timerTask, monitorPollTime, monitorPollTime);

        DataStore dataStore = fabricService.get().adapt(DataStore.class);
        dataStore.trackConfiguration(configurationChangeHandler);
    }

    @Deactivate
    void deactivate() {
        if (zkMasterCache != null) {
            zkMasterCache = null;
        }
        DataStore dataStore = fabricService.get().adapt(DataStore.class);
        dataStore.untrackConfiguration(configurationChangeHandler);
        if (keepAliveTimer != null) {
            keepAliveTimer.cancel();
            keepAliveTimer = null;
        }
        deactivateComponent();
    }


    @Override
    public ChildContainerController createController(CreateChildContainerOptions options) {
        FabricService fabric = getFabricService();
        boolean isJavaOrProcessContainer = ChildContainers.isJavaOrProcessContainer(fabric, options);
        if (isJavaOrProcessContainer) {
            return createProcessManagerController();
        }
        return null;
    }

    @Override
    public ChildContainerController getControllerForContainer(Container container) {
        Installation installation = getProcessManager().getInstallation(container.getId());
        ChildContainerController answer = null;
        if (installation != null) {
            answer = createProcessManagerController();
        }
        return answer;
    }

    /**
     * Allocates a new jolokia port for the given container ID
     *
     * @param containerId
     * @return
     */
    public synchronized int createJolokiaPort(String containerId) {
        FabricService fabricService = getFabricService();
        Container currentContainer = fabricService.getCurrentContainer();
        Set<Integer> usedPortByHost = fabricService.getPortService().findUsedPortByHost(currentContainer);

        while (true) {
            if (externalJolokiaPort <= 0) {
                externalJolokiaPort = JolokiaAgentHelper.DEFAULT_JOLOKIA_PORT;
            } else {
                externalJolokiaPort++;
            }
            if (!usedPortByHost.contains(externalJolokiaPort)) {
                Container container = fabricService.getCurrentContainer();
                String pid = JolokiaAgentHelper.JOLOKIA_PORTS_PID;
                String key = containerId;
                fabricService.getPortService().registerPort(container, pid, key, externalJolokiaPort);
                return externalJolokiaPort;
            }
        }
    }

    /**
     * Allocates a new external port for the given containerId and portKey
     */
    public synchronized int createExternalPort(String containerId, String portKey, Set<Integer> usedPortByHost, CreateContainerBasicOptions options) {
        while (true) {
            if (externalPortCounter <= 0) {
                externalPortCounter = options.getMinimumPort();
                if (externalPortCounter == 0) {
                    externalPortCounter = DEFAULT_EXTERNAL_PORT;
                }
            } else {
                externalPortCounter++;
            }
            if (!usedPortByHost.contains(externalPortCounter)) {
                Container container = getFabricService().getCurrentContainer();
                String pid = Constants.PORTS_PID;
                String key = containerId + "-" + portKey;
                getFabricService().getPortService().registerPort(container, pid, key, externalPortCounter);
                return externalPortCounter;
            }
        }
    }

    /**
     * Creates a new local container address such as 127.0.0.1, 127.0.0.2, 127.0.0.3 etc so each container can have its own local address
     * for example when working with Cassandra; it allow the same ports to be used but on different addresses.
     */
    public synchronized String createContainerLocalAddress(String containerId, CreateContainerBasicOptions options) {
        for (int i = containerLocalIp4Address.length - 1; i >= 0; i--) {
            int counter = ++containerLocalIp4Address[i];
            if (counter > 255) {
                containerLocalIp4Address[i] = 0;
            } else {
                break;
            }
        }
        StringBuilder builder = new StringBuilder();
        for (int counter : containerLocalIp4Address) {
            if (builder.length() > 0) {
                builder.append(".");
            }
            builder.append("" + counter);
        }
        return builder.toString();
    }

    private void onConfigurationChanged() {
        ProcessManager manager = getProcessManager();
        FabricService fabric = getFabricService();
        if (manager != null && fabric != null) {
            ImmutableMap<String, Installation> map = manager.listInstallationMap();
            ImmutableSet<Map.Entry<String, Installation>> entries = map.entrySet();
            for (Map.Entry<String, Installation> entry : entries) {
                String id = entry.getKey();
                Installation installation = entry.getValue();
                try {
                    Container container = null;
                    try {
                        container = fabric.getContainer(id);
                    } catch (Exception e) {
                        LOG.debug("No container for id: " + id + ". " + e, e);
                    }
                    if (container != null && installation != null) {
                        ChildContainerController controllerForContainer = getControllerForContainer(container);
                        if (controllerForContainer instanceof ProcessManagerController) {
                            ProcessManagerController processManagerController = (ProcessManagerController) controllerForContainer;
                            processManagerController.updateInstallation(container, installation);
                        }
                    }
                } catch (Exception e) {
                    LOG.warn("Failed to get PID for process " + id + ". " + e, e);
                }
            }
        }

    }

    /**
     * Loads the list of ZooKeeper paths written for the given container.
     *
     * Since we write them as ephemeral nodes; we can use RAM to cache these since they are only
     * required to eagerly remove the ZK entries
     */
    public Set<String> loadZooKeeperPaths(Installation installation, Container container) {
        String id = container.getId();
        return loadZooKeeperPaths(id);
    }

    private Set<String> loadZooKeeperPaths(String id) {
        HashSet<String> newValue = new HashSet<String>();
        Set<String> oldValue = containerToZKPathMap.putIfAbsent(id, newValue);
        return oldValue != null ? oldValue : newValue;
    }

    public void saveZooKeeperPaths(Installation installation, Container container, Set<String> zkPaths) {
        String id = container.getId();
        containerToZKPathMap.put(id, zkPaths);
    }

    public void addZooKeeperPaths(Installation installation, Container container, List<String> newZkPaths) {
        Set<String> allZkPaths = loadZooKeeperPaths(installation, container);
        boolean changed = false;
        for (String newZkPath : newZkPaths) {
            if (allZkPaths.add(newZkPath)) {
                changed = true;
            }
        }
        if (changed) {
            saveZooKeeperPaths(installation, container, allZkPaths);
        }
    }

    public void deleteContainerZooKeeperPaths(Installation installation, Container container) {
        String id = container.getId();
        deleteContainerZooKeeperPaths(id);
    }

    private void deleteContainerPathsForDeadContainers(Set<String> aliveIds) {
        Set<String> containerIds = new HashSet<String>(containerToZKPathMap.keySet());
        containerIds.removeAll(aliveIds);
        for (String id : containerIds) {
            deleteContainerZooKeeperPaths(id);
        }
    }

    public void deleteContainerZooKeeperPaths(String id) {
        Set<String> zkPaths =  containerToZKPathMap.remove(id);
        if (zkPaths != null) {
            deleteZooKeeperPaths(zkPaths);
        }
    }

    private void deleteZooKeeperPaths(Set<String> zkPaths) {
        for (String path : zkPaths) {
            try {
                CuratorFramework curatorFramework = getCuratorFramework();
                if (ZooKeeperUtils.exists(curatorFramework, path) != null) {
                    LOG.info("unregistered container entry at ZK path: " + path);
                    ZooKeeperUtils.deleteSafe(curatorFramework, path);
                }
            } catch (Exception e) {
                LOG.warn("Failed to remove web app json at path " + path + ". " + e, e);
            }

        }
    }

    private ProcessManagerController createProcessManagerController() {
        return new ProcessManagerController(this, configurer, getProcessManager(), getFabricService(), getCuratorFramework());
    }

    CuratorFramework getCuratorFramework() {
        return curator.get();
    }

    private ProcessManager getProcessManager() {
        return processManager.get();
    }

    FabricService getFabricService() {
        return fabricService.get();
    }

    void bindConfigurer(Configurer configurer) {
        this.configurer = configurer;
    }

    void unbindConfigurer(Configurer configurer) {
        this.configurer = null;
    }

    void bindFabricService(FabricService fabricService) {
        this.fabricService.bind(fabricService);
    }

    void unbindFabricService(FabricService fabricService) {
        this.fabricService.unbind(fabricService);
    }

    void bindCurator(CuratorFramework curator) {
        this.curator.bind(curator);
        zkMasterCache = new ZooKeeperMasterCache(curator);
    }

    void unbindCurator(CuratorFramework curator) {
        this.curator.unbind(curator);
    }

    void bindProcessManager(ProcessManager processManager) {
        this.processManager.bind(processManager);
    }

    void unbindProcessManager(ProcessManager processManager) {
        this.processManager.unbind(processManager);
    }


    private void checkProcessesStatus() {
        ProcessManager manager = getProcessManager();
        FabricService fabric = getFabricService();
        if (manager != null && fabric != null) {
            Set<String> aliveIds = new HashSet<>();
            ImmutableMap<String, Installation> map = manager.listInstallationMap();
            ImmutableSet<Map.Entry<String, Installation>> entries = map.entrySet();
            for (Map.Entry<String, Installation> entry : entries) {
                String id = entry.getKey();
                Installation installation = entry.getValue();
                try {
                    Container container = null;
                    try {
                        container = fabric.getContainer(id);
                    } catch (Exception e) {
                        LOG.debug("No container for id: " + id + ". " + e, e);
                    }
                    if (container != null) {
                        Long pid = installation.getActivePid();
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("Polling container " + id + " for its PID");
                        }
                        if (pid == null) {
                            if (container.isAlive()) {
                                container.setAlive(false);
                            }
                        } else if (pid != null && pid != 0) {
                            if (!container.isAlive()) {
                                container.setAlive(true);
                            }
                            if (!Objects.equal(container.getProvisionResult(), Container.PROVISION_SUCCESS)) {
                                container.setProvisionResult(Container.PROVISION_SUCCESS);
                            }
                            aliveIds.add(id);

                            Map<String, String> envVars = ProcessManagerController.getInstallationProxyPorts(installation);
                            List<String> newZkPaths = JolokiaAgentHelper.jolokiaKeepAliveCheck(zkMasterCache, fabric, container, envVars);
                            if (!newZkPaths.isEmpty()) {
                                ChildContainerController controller = getControllerForContainer(container);
                                if (controller instanceof ProcessManagerController) {
                                    ProcessManagerController processManagerController = (ProcessManagerController) controller;
                                    addZooKeeperPaths(installation, container, newZkPaths);
                                }
                            }
                        }
                    }
                } catch (Exception e) {
                    LOG.warn("Failed to get PID for process " + id + ". " + e, e);
                }
            }
            deleteContainerPathsForDeadContainers(aliveIds);
        }
    }
}
TOP

Related Classes of io.fabric8.container.process.ProcessControllerFactoryService

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code is the property of its respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.