Package org.apache.servicemix.jbi.framework

Source Code of org.apache.servicemix.jbi.framework.EndpointRegistry$InterfaceConnection

/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements.  See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License.  You may obtain a copy of the License at
*
*      http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.servicemix.jbi.framework;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

import javax.jbi.JBIException;
import javax.jbi.component.ComponentContext;
import javax.jbi.servicedesc.ServiceEndpoint;
import javax.management.JMException;
import javax.management.ObjectName;
import javax.xml.namespace.QName;

import org.apache.servicemix.jbi.event.EndpointEvent;
import org.apache.servicemix.jbi.event.EndpointListener;
import org.apache.servicemix.jbi.framework.support.EndpointProcessor;
import org.apache.servicemix.jbi.servicedesc.AbstractServiceEndpoint;
import org.apache.servicemix.jbi.servicedesc.ExternalEndpoint;
import org.apache.servicemix.jbi.servicedesc.InternalEndpoint;
import org.apache.servicemix.jbi.servicedesc.LinkedEndpoint;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Registry for Components
*
* @version $Revision: 1148931 $
*/
public class EndpointRegistry {
   
    private static final transient Logger LOGGER = LoggerFactory.getLogger(EndpointRegistry.class);
   
    private Registry registry;
   
    private Map<AbstractServiceEndpoint, Endpoint> endpointMBeans;
   
    private Map<String, ServiceEndpoint> internalEndpoints;
   
    private Map<String, ServiceEndpoint> externalEndpoints;
   
    private Map<String, ServiceEndpoint> linkedEndpoints;
   
    private Map<QName, InterfaceConnection> interfaceConnections;
   
    private List<EndpointProcessor> endpointProcessors;
   
    private Executor executor = Executors.newSingleThreadExecutor();
   
    /**
     * Constructor
     *
     * @param cr
     */
    public EndpointRegistry(Registry registry) {
        this.registry = registry;
        this.endpointMBeans = new ConcurrentHashMap<AbstractServiceEndpoint, Endpoint>();
        this.internalEndpoints = new ConcurrentHashMap<String, ServiceEndpoint>();
        this.externalEndpoints = new ConcurrentHashMap<String, ServiceEndpoint>();
        this.linkedEndpoints = new ConcurrentHashMap<String, ServiceEndpoint>();
        this.interfaceConnections = new ConcurrentHashMap<QName, InterfaceConnection>();
        this.endpointProcessors = getEndpointProcessors();
        this.executor.execute(new Runnable() {
            public void run() {
                LOGGER.debug("Initializing endpoint event dispatch thread");
            }
        });
    }
   
    private List<EndpointProcessor> getEndpointProcessors() {
        List<EndpointProcessor> l = new ArrayList<EndpointProcessor>();
        String[] classes = {"org.apache.servicemix.jbi.framework.support.SUDescriptorProcessor",
                            "org.apache.servicemix.jbi.framework.support.WSDL1Processor",
                            "org.apache.servicemix.jbi.framework.support.WSDL2Processor" };
        for (int i = 0; i < classes.length; i++) {
            try {
                EndpointProcessor p = (EndpointProcessor) Class.forName(classes[i]).newInstance();
                p.init(registry);
                l.add(p);
            } catch (Throwable e) {
                LOGGER.warn("Disabled endpoint processor '{}", classes[i], e);
            }
        }
        return l;
    }
   
    public ServiceEndpoint[] getEndpointsForComponent(ComponentNameSpace cns) {
        Collection<ServiceEndpoint> endpoints = new ArrayList<ServiceEndpoint>();
        for (Iterator<ServiceEndpoint> iter = getInternalEndpoints().iterator(); iter.hasNext();) {
            InternalEndpoint endpoint = (InternalEndpoint) iter.next();
            if (cns.equals(endpoint.getComponentNameSpace())) {
                endpoints.add(endpoint);
            }
        }
        return asEndpointArray(endpoints);
    }
   
    public ServiceEndpoint[] getAllEndpointsForComponent(ComponentNameSpace cns) {
        Collection<ServiceEndpoint> endpoints = new ArrayList<ServiceEndpoint>();
        for (Iterator<ServiceEndpoint> iter = getInternalEndpoints().iterator(); iter.hasNext();) {
            InternalEndpoint endpoint = (InternalEndpoint) iter.next();
            if (cns.equals(endpoint.getComponentNameSpace())) {
                endpoints.add(endpoint);
            }
        }
        for (Iterator<ServiceEndpoint> iter = getExternalEndpoints().iterator(); iter.hasNext();) {
            ExternalEndpoint endpoint = (ExternalEndpoint) iter.next();
            if (cns.equals(endpoint.getComponentNameSpace())) {
                endpoints.add(endpoint);
            }
        }
        return asEndpointArray(endpoints);
    }
   
    /**
     * Returns a collection of Endpoint objects
     */
    public Collection<Endpoint> getEndpointMBeans() {
        return endpointMBeans.values();
    }

    /**
     * Get all endpoints for a given service
     *
     * @param serviceName
     * @return array of endpoints
     */
    public ServiceEndpoint[] getEndpointsForService(QName serviceName) {
        Collection<ServiceEndpoint> collection = getEndpointsByService(serviceName, getInternalEndpoints());
        return asEndpointArray(collection);
    }

    /**
     * This will return the endpoints for all services and endpoints that implement the named interface (portType in
     * WSDL 1.1). This method does NOT include external endpoints.
     *
     * @param interfaceName qualified name of interface/portType that is implemented by the endpoint; if
     * <code>null</code> then all activated endpoints in the JBI environment must be returned.
     * @return an array of available endpoints for the specified interface name; must be non-null; may be empty.
     */
    public ServiceEndpoint[] getEndpointsForInterface(QName interfaceName) {
        if (interfaceName == null) {
            return asEndpointArray(internalEndpoints.values());
        }
        InterfaceConnection conn = interfaceConnections.get(interfaceName);
        if (conn != null) {
            String key = getKey(conn.service, conn.endpoint);
            ServiceEndpoint ep = internalEndpoints.get(key);
            if (ep == null) {
                LOGGER.warn("Connection for interface " + interfaceName + " could not find target for service "
                                + conn.service + " and endpoint " + conn.endpoint);
                return new ServiceEndpoint[0];
            } else {
                return new ServiceEndpoint[] {ep };
            }
        }
        Collection<ServiceEndpoint> result = getEndpointsByInterface(interfaceName, getInternalEndpoints());
        return asEndpointArray(result);
    }

    /**
     * Activate an endpoint
     *
     * @param provider
     * @param serviceName
     * @param endpointName
     * @return the endpoint
     * @throws JBIException
     */
    public InternalEndpoint registerInternalEndpoint(ComponentContextImpl provider,
                                                     QName serviceName,
                                                     String endpointName) throws JBIException {
        // Create endpoint
        String key = getKey(serviceName, endpointName);
        InternalEndpoint registered = (InternalEndpoint) internalEndpoints.get(key);
        // Check if the endpoint has already been activated by another component
        if (registered != null && registered.isLocal()) {
            throw new JBIException("An internal endpoint for service " + serviceName
                                        + " and endpoint " + endpointName + " is already registered");
        }       
        // Create a new endpoint
        InternalEndpoint serviceEndpoint = new InternalEndpoint(provider.getComponentNameSpace(), endpointName, serviceName);
        // Get interface from activationSpec
        if (provider.getActivationSpec().getInterfaceName() != null) {
            serviceEndpoint.addInterface(provider.getActivationSpec().getInterfaceName());
        }
        // Get interfaces
        for (Iterator<EndpointProcessor> it = endpointProcessors.iterator(); it.hasNext();) {
            EndpointProcessor p = it.next();
            p.process(serviceEndpoint);
        }
        // Set remote namespaces
        if (registered != null) {
            InternalEndpoint[] remote = registered.getRemoteEndpoints();
            for (int i = 0; i < remote.length; i++) {
                serviceEndpoint.addRemoteEndpoint(remote[i]);
            }
        }
        // Register endpoint
        internalEndpoints.put(key, serviceEndpoint);
        registerEndpoint(serviceEndpoint);
        fireEvent(serviceEndpoint, EndpointEvent.INTERNAL_ENDPOINT_REGISTERED);
        return serviceEndpoint;
    }

    /**
     * Called by component context when endpoints are being deactivated.
     *
     * @param provider
     * @param serviceEndpoint
     */
    public void unregisterInternalEndpoint(ComponentContext provider, InternalEndpoint serviceEndpoint) {
        if (serviceEndpoint.isClustered()) {
            fireEvent(serviceEndpoint, EndpointEvent.INTERNAL_ENDPOINT_UNREGISTERED);
            // set endpoint to be no more local
            serviceEndpoint.setComponentName(null);
        } else {
            String key = getKey(serviceEndpoint);
            internalEndpoints.remove(key);
            unregisterEndpoint(serviceEndpoint);
            fireEvent(serviceEndpoint, EndpointEvent.INTERNAL_ENDPOINT_UNREGISTERED);
        }
    }
   
    /**
     * Registers a remote endpoint
     *
     * @param remote
     */
    public void registerRemoteEndpoint(InternalEndpoint remote) {
        InternalEndpoint endpoint = (InternalEndpoint) internalEndpoints.get(getKey(remote));
        // Create endpoint if not already existing
        if (endpoint == null) {
            endpoint = new InternalEndpoint(null, remote.getEndpointName(), remote.getServiceName());
            internalEndpoints.put(getKey(endpoint), endpoint);
        }
        // Add remote endpoint
        endpoint.addRemoteEndpoint(remote);
        fireEvent(remote, EndpointEvent.REMOTE_ENDPOINT_REGISTERED);
    }
   
    /**
     * Unregisters a remote endpoint
     *
     * @param remote
     */
    public void unregisterRemoteEndpoint(InternalEndpoint remote) {
        String key = getKey(remote);
        InternalEndpoint endpoint = (InternalEndpoint) internalEndpoints.get(key);
        if (endpoint != null) {
            endpoint.removeRemoteEndpoint(remote);
            if (!endpoint.isClustered() && !endpoint.isLocal()) {
                internalEndpoints.remove(key);
                unregisterEndpoint(endpoint);
            }
            fireEvent(remote, EndpointEvent.REMOTE_ENDPOINT_UNREGISTERED);
        }
    }

    /**
     * Get the named ServiceEndpoint, if activated
     *
     * @param service
     * @param name
     * @return the activated ServiceEndpoint or null
     */
    public ServiceEndpoint getEndpoint(QName service, String name) {
        String key = getKey(service, name);
        ServiceEndpoint ep = linkedEndpoints.get(key);
        if (ep == null) {
            ep = internalEndpoints.get(key);
        }
        return ep;
    }
   
    public ServiceEndpoint getInternalEndpoint(QName service, String name) {
        return internalEndpoints.get(getKey(service, name));
    }

    /**
     * Registers the given external endpoint with the NMR. This indicates to the NMR that the given endpoint is used as
     * a proxy for external service consumers to access an internal service of the same service name (but a different
     * endpoint name).
     *
     * @param cns
     * @param externalEndpoint the external endpoint to be registered, must be non-null.
     * @throws JBIException
     */
    public void registerExternalEndpoint(ComponentNameSpace cns, ServiceEndpoint externalEndpoint) throws JBIException {
        ExternalEndpoint serviceEndpoint = new ExternalEndpoint(cns, externalEndpoint);
        if (externalEndpoints.get(getKey(serviceEndpoint)) != null) {
            throw new JBIException("An external endpoint for service " + externalEndpoint.getServiceName()
                                    + " and endpoint " + externalEndpoint.getEndpointName() + " is already registered");
        }
        registerEndpoint(serviceEndpoint);
        externalEndpoints.put(getKey(serviceEndpoint), serviceEndpoint);
        fireEvent(serviceEndpoint, EndpointEvent.EXTERNAL_ENDPOINT_REGISTERED);
    }

    /**
     * Deregisters the given external endpoint with the NMR. This indicates to the NMR that the given external endpoint
     * can no longer be used as a proxy for external service consumers to access an internal service of the same service
     * name.
     *
     * @param cns
     * @param externalEndpoint the external endpoint to be deregistered; must be non-null.
     */
    public void unregisterExternalEndpoint(ComponentNameSpace cns, ServiceEndpoint externalEndpoint) {
        ExternalEndpoint ep = (ExternalEndpoint) externalEndpoints.remove(getKey(externalEndpoint));
        unregisterEndpoint(ep);
        fireEvent(ep, EndpointEvent.EXTERNAL_ENDPOINT_UNREGISTERED);
    }

    /**
     * This methods returns only registered external endpoints
     *
     * @param interfaceName qualified name of interface implemented by the endpoints; must be non-null.
     * @return an array of available external endpoints for the specified interface name; must be non-null; may be
     * empty.
     */
    public ServiceEndpoint[] getExternalEndpointsForInterface(QName interfaceName) {
        Collection<ServiceEndpoint> endpoints = getEndpointsByInterface(interfaceName, getExternalEndpoints());
        return asEndpointArray(endpoints);
    }

    /**
     * Get external endpoints for the service
     *
     * @param serviceName qualified name of service that contains the endpoints; must be non-null.
     * @return an array of available external endpoints for the specified service name; must be non-null; may be empty.
     */
    public ServiceEndpoint[] getExternalEndpointsForService(QName serviceName) {
        Collection<ServiceEndpoint> endpoints = getEndpointsByService(serviceName, getExternalEndpoints());
        return asEndpointArray(endpoints);
    }

    /**
     * Helper method to convert the given collection into an array of endpoints
     *
     * @param collection
     * @return array of endpoints
     */
    protected ServiceEndpoint[] asEndpointArray(Collection<ServiceEndpoint> collection) {
        if (collection == null) {
            return new ServiceEndpoint[0];
        }
        ServiceEndpoint[] answer = new ServiceEndpoint[collection.size()];
        answer = collection.toArray(answer);
        return answer;
    }

    /**
     * return a collection of endpoints
     *
     * @param serviceName
     * @param endpoints
     * @return collection of endpoints
     */
    protected Collection<ServiceEndpoint> getEndpointsByService(QName serviceName, Collection<ServiceEndpoint> endpoints) {
        Collection<ServiceEndpoint> answer = new ArrayList<ServiceEndpoint>();
        for (Iterator<ServiceEndpoint> i = endpoints.iterator(); i.hasNext();) {
            ServiceEndpoint endpoint = i.next();
            if (endpoint.getServiceName().equals(serviceName)) {
                answer.add(endpoint);
            }
        }
        return answer;
    }
   
    /**
     * Filters the given endpoints and returns those implementing the
     * given interface name.  If interfaceName is null, then no filter
     * is applied.
     *
     */
    protected Collection<ServiceEndpoint> getEndpointsByInterface(QName interfaceName, Collection<ServiceEndpoint> endpoints) {
        if (interfaceName == null) {
            return endpoints;
        }
        Set<ServiceEndpoint> answer = new HashSet<ServiceEndpoint>();
        for (Iterator<ServiceEndpoint> i = endpoints.iterator(); i.hasNext();) {
            ServiceEndpoint endpoint = i.next();
            QName[] interfaces = endpoint.getInterfaces();
            if (interfaces != null) {
                for (int k = 0; k < interfaces.length; k++) {
                    QName qn = interfaces[k];
                    if (qn != null && qn.equals(interfaceName)) {
                        answer.add(endpoint);
                        break;
                    }
                }
            }
        }
        return answer;
    }

    /**
     * @return all default endpoints
     */
    protected Collection<ServiceEndpoint> getInternalEndpoints() {
        return internalEndpoints.values();
    }

    /**
     * @return all external endpoints
     */
    protected Collection<ServiceEndpoint> getExternalEndpoints() {
        return externalEndpoints.values();
    }

    /**
     * Registers an endpoint connection.
     *
     * @param fromSvc
     * @param fromEp
     * @param toSvc
     * @param toEp
     * @param link
     * @throws JBIException
     */
    public void registerEndpointConnection(QName fromSvc, String fromEp,
                                           QName toSvc, String toEp, String link) throws JBIException {
        LinkedEndpoint ep = new LinkedEndpoint(fromSvc, fromEp, toSvc, toEp, link);
        if (linkedEndpoints.get(getKey(ep)) != null) {
            throw new JBIException("An endpoint connection for service " + ep.getServiceName() + " and name "
                                        + ep.getEndpointName() + " is already registered");
        }
        linkedEndpoints.put(getKey(ep), ep);
        registerEndpoint(ep);
        fireEvent(ep, EndpointEvent.LINKED_ENDPOINT_REGISTERED);
    }

    /**
     * Unregister an endpoint connection.
     *
     * @param fromSvc
     * @param fromEp
     */
    public void unregisterEndpointConnection(QName fromSvc, String fromEp) {
        LinkedEndpoint ep = (LinkedEndpoint) linkedEndpoints.remove(getKey(fromSvc, fromEp));
        unregisterEndpoint(ep);
        fireEvent(ep, EndpointEvent.LINKED_ENDPOINT_UNREGISTERED);
    }
   
    /**
     * Registers an interface connection.
     *
     * @param fromItf
     * @param toSvc
     * @param toEp
     * @throws JBIException
     */
    public void registerInterfaceConnection(QName fromItf, QName toSvc, String toEp) throws JBIException {
        if (interfaceConnections.get(fromItf) != null) {
            throw new JBIException("An interface connection for " + fromItf + " is already registered");
        }
        interfaceConnections.put(fromItf, new InterfaceConnection(toSvc, toEp));
    }

    /**
     * Unregisters an interface connection.
     *
     * @param fromItf
     */
    public void unregisterInterfaceConnection(QName fromItf) {
        interfaceConnections.remove(fromItf);
       
    }
   
    private void registerEndpoint(AbstractServiceEndpoint serviceEndpoint) {
        try {
            Endpoint endpoint = new Endpoint(serviceEndpoint, registry);
            ObjectName objectName = registry.getContainer().getManagementContext().createObjectName(endpoint);
            registry.getContainer().getManagementContext().registerMBean(objectName, endpoint, EndpointMBean.class);
            endpointMBeans.put(serviceEndpoint, endpoint);
        } catch (JMException e) {
            LOGGER.error("Could not register MBean for endpoint", e);
        }
    }
   
    private void unregisterEndpoint(AbstractServiceEndpoint se) {
        Endpoint ep = endpointMBeans.remove(se);
        try {
            registry.getContainer().getManagementContext().unregisterMBean(ep);
        } catch (JBIException e) {
            LOGGER.error("Could not unregister MBean for endpoint", e);
        }
    }

    private String getKey(ServiceEndpoint ep) {
        return getKey(ep.getServiceName(), ep.getEndpointName());
    }
   
    private String getKey(QName svcName, String epName) {
        return svcName + epName;
    }

    private static class InterfaceConnection {
        QName service;
        String endpoint;
        InterfaceConnection(QName service, String endpoint) {
            this.service = service;
            this.endpoint = endpoint;
        }
    }

    protected synchronized void fireEvent(final ServiceEndpoint ep, final int type) {
        executor.execute(new Runnable() {
            public void run() {
                EndpointEvent event = new EndpointEvent(ep, type);
                EndpointListener[] listeners = (EndpointListener[]) registry.getContainer().getListeners(EndpointListener.class);
                for (int i = 0; i < listeners.length; i++) {
                    switch (type) {
                    case EndpointEvent.INTERNAL_ENDPOINT_REGISTERED:
                        listeners[i].internalEndpointRegistered(event);
                        break;
                    case EndpointEvent.INTERNAL_ENDPOINT_UNREGISTERED:
                        listeners[i].internalEndpointUnregistered(event);
                        break;
                    case EndpointEvent.EXTERNAL_ENDPOINT_REGISTERED:
                        listeners[i].externalEndpointRegistered(event);
                        break;
                    case EndpointEvent.EXTERNAL_ENDPOINT_UNREGISTERED:
                        listeners[i].externalEndpointUnregistered(event);
                        break;
                    case EndpointEvent.LINKED_ENDPOINT_REGISTERED:
                        listeners[i].linkedEndpointRegistered(event);
                        break;
                    case EndpointEvent.LINKED_ENDPOINT_UNREGISTERED:
                        listeners[i].linkedEndpointUnregistered(event);
                        break;
                    case EndpointEvent.REMOTE_ENDPOINT_REGISTERED:
                        listeners[i].remoteEndpointRegistered(event);
                        break;
                    case EndpointEvent.REMOTE_ENDPOINT_UNREGISTERED:
                        listeners[i].remoteEndpointUnregistered(event);
                        break;
                    default:
                        break;
                    }
                }
            }
        });
    }

}
TOP

Related Classes of org.apache.servicemix.jbi.framework.EndpointRegistry$InterfaceConnection

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.