Package org.springframework.osgi.service.importer.support.internal.aop

Source Code of org.springframework.osgi.service.importer.support.internal.aop.ServiceDynamicInterceptor

/*
* Copyright 2006-2008 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*      http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.osgi.service.importer.support.internal.aop;

import java.util.Collections;
import java.util.Iterator;
import java.util.List;

import org.osgi.framework.BundleContext;
import org.osgi.framework.Constants;
import org.osgi.framework.Filter;
import org.osgi.framework.ServiceEvent;
import org.osgi.framework.ServiceListener;
import org.osgi.framework.ServiceReference;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationEventPublisherAware;
import org.springframework.osgi.service.ServiceUnavailableException;
import org.springframework.osgi.service.importer.DefaultOsgiServiceDependency;
import org.springframework.osgi.service.importer.OsgiServiceDependency;
import org.springframework.osgi.service.importer.OsgiServiceLifecycleListener;
import org.springframework.osgi.service.importer.ServiceProxyDestroyedException;
import org.springframework.osgi.service.importer.event.OsgiServiceDependencyWaitEndedEvent;
import org.springframework.osgi.service.importer.event.OsgiServiceDependencyWaitTimedOutEvent;
import org.springframework.osgi.service.importer.event.OsgiServiceDependencyWaitStartingEvent;
import org.springframework.osgi.service.importer.support.internal.dependency.ImporterStateListener;
import org.springframework.osgi.service.importer.support.internal.support.DefaultRetryCallback;
import org.springframework.osgi.service.importer.support.internal.support.RetryCallback;
import org.springframework.osgi.service.importer.support.internal.support.RetryTemplate;
import org.springframework.osgi.service.importer.support.internal.support.ServiceWrapper;
import org.springframework.osgi.service.importer.support.internal.util.OsgiServiceBindingUtils;
import org.springframework.osgi.util.OsgiListenerUtils;
import org.springframework.osgi.util.OsgiServiceReferenceUtils;
import org.springframework.util.Assert;
import org.springframework.util.ObjectUtils;

/**
* Interceptor adding dynamic behaviour for unary service (..1 cardinality). It
* will look for a service using the given filter, retrying if the service is
* down or unavailable. Will dynamically rebound a new service, if one is
* available with a higher service ranking. <p/> <p/> In case no service is
* available, it will throw an exception.
*
* <p/> <strong>Note</strong>: this is a stateful interceptor and should not be
* shared.
*
* @author Costin Leau
*/
public class ServiceDynamicInterceptor extends ServiceInvoker implements InitializingBean,
    ApplicationEventPublisherAware {

  /**
   * Override the default implementation to plug in event notification.
   *
   * @author Costin Leau
   *
   */
  private class EventSenderRetryTemplate extends RetryTemplate {

    public EventSenderRetryTemplate(int retryNumbers, long waitTime) {
      super(retryNumbers, waitTime, lock);
    }

    public EventSenderRetryTemplate() {
      super(lock);
    }

    public Object execute(RetryCallback callback) {
      //send event
      publishEvent(new OsgiServiceDependencyWaitStartingEvent(eventSource, dependency, this.getWaitTime()
          * this.getRetryNumbers()));

      Object result = null;

      long start = System.currentTimeMillis();
      long stop;

      try {
        result = super.execute(callback);
        stop = System.currentTimeMillis() - start;
      }
      catch (RuntimeException exception) {
        stop = System.currentTimeMillis() - start;
        publishEvent(new OsgiServiceDependencyWaitTimedOutEvent(eventSource, dependency, stop));
        throw exception;
      }

      // send finalization event
      if (callback.isComplete(result)) {
        publishEvent(new OsgiServiceDependencyWaitEndedEvent(eventSource, dependency, stop));
      }
      else {
        publishEvent(new OsgiServiceDependencyWaitTimedOutEvent(eventSource, dependency, stop));
      }

      return result;
    }
  }

  /**
   * Listener tracking the OSGi services which form the dynamic reference.
   */
  // NOTE: while the listener here seems to share the same functionality as
  // the one in ServiceCollection in reality there are a big number of
  // differences in them - for example this one supports rebind
  // while the collection does not.
  //
  // the only common part is the TCCL handling before calling the listeners.
  private class Listener implements ServiceListener {

    public void serviceChanged(ServiceEvent event) {
      ClassLoader tccl = Thread.currentThread().getContextClassLoader();
      try {
        Thread.currentThread().setContextClassLoader(classLoader);
        ServiceReference ref = event.getServiceReference();

        // service id
        long serviceId = ((Long) ref.getProperty(Constants.SERVICE_ID)).longValue();
        // service ranking
        Integer rank = (Integer) ref.getProperty(Constants.SERVICE_RANKING);
        int ranking = (rank == null ? 0 : rank.intValue());

        boolean debug = log.isDebugEnabled();

        switch (event.getType()) {

          case (ServiceEvent.REGISTERED):
            // same as ServiceEvent.REGISTERED
          case (ServiceEvent.MODIFIED): {
            // flag indicating if the service is bound or rebound
            boolean servicePresent = false;

            synchronized (lock) {
              servicePresent = (wrapper != null && wrapper.isServiceAlive());
            }

            if (updateWrapperIfNecessary(ref, serviceId, ranking)) {
              // inform listeners
              OsgiServiceBindingUtils.callListenersBind(bundleContext, proxy, ref, listeners);

              if (!servicePresent) {
                notifySatisfiedStateListeners();
              }
            }

            break;
          }
          case (ServiceEvent.UNREGISTERING): {

            boolean serviceRemoved = false;
            /**
             * used if the service goes down and there is no
             * replacement
             */
            /**
             * since the listeners will require a valid proxy, the
             * invalidation has to happen *after* calling the
             * listeners
             */
            ServiceWrapper oldWrapper = wrapper;

            synchronized (lock) {
              // remove service
              if (wrapper != null) {
                if (serviceId == wrapper.getServiceId()) {
                  serviceRemoved = true;
                  wrapper = null;

                }
              }
            }

            ServiceReference newReference = null;

            boolean isDestroyed = false;

            synchronized (lock) {
              isDestroyed = destroyed;
            }

            // discover a new reference only if we are still running
            if (!isDestroyed) {
              newReference = OsgiServiceReferenceUtils.getServiceReference(bundleContext,
                (filter == null ? null : filter.toString()));

              // we have a rebind (a new service was bound)
              // so another candidate has to be searched from the existing candidates
              // - as they are alive already, we have to send an event for them ourselves
              // MODIFIED will be used for clarity
              if (newReference != null) {
                // update the listeners (through a MODIFIED event
                serviceChanged(new ServiceEvent(ServiceEvent.MODIFIED, newReference));
              }
            }

            // if no new reference was found and the service was indeed removed (it was bound to the interceptor)
            // then do an unbind
            if (newReference == null && serviceRemoved) {

              // reuse the old service for the time being
              synchronized (lock) {
                wrapper = oldWrapper;
              }

              // inform listeners
              OsgiServiceBindingUtils.callListenersUnbind(bundleContext, proxy, ref, listeners);

              // clean up wrapper
              synchronized (lock) {
                wrapper = null;
              }

              if (debug) {
                String message = "Service reference [" + ref + "] was unregistered";
                if (serviceRemoved) {
                  message += " and unbound from the service proxy";
                }
                else {
                  message += " but did not affect the service proxy";
                }
                log.debug(message);
              }

              // update internal state listeners (unsatisfied event)
              notifyUnsatisfiedStateListeners();
            }

            break;
          }
          default:
            throw new IllegalArgumentException("unsupported event type");
        }
      }
      catch (Throwable e) {
        // The framework will swallow these exceptions without logging,
        // so log them here
        log.fatal("Exception during service event handling", e);
      }
      finally {
        Thread.currentThread().setContextClassLoader(tccl);
      }
    }

    private void notifySatisfiedStateListeners() {
      synchronized (stateListeners) {
        for (Iterator iterator = stateListeners.iterator(); iterator.hasNext();) {
          ImporterStateListener stateListener = (ImporterStateListener) iterator.next();
          stateListener.importerSatisfied(eventSource, dependency);
        }
      }
    }

    private void notifyUnsatisfiedStateListeners() {
      synchronized (stateListeners) {
        for (Iterator iterator = stateListeners.iterator(); iterator.hasNext();) {
          ImporterStateListener stateListener = (ImporterStateListener) iterator.next();
          stateListener.importerUnsatisfied(eventSource, dependency);
        }
      }
    }

    private boolean updateWrapperIfNecessary(ServiceReference ref, long serviceId, int serviceRanking) {
      boolean updated = false;
      try {
        synchronized (lock) {
          if (wrapper != null && wrapper.isServiceAlive()) {
            // if have a higher rank service
            if (serviceRanking > wrapper.getServiceRanking()) {
              updated = true;
              updateReferenceHolders(ref);
            }
            // if equality, use the service id
            if (serviceRanking == wrapper.getServiceRanking()) {
              if (serviceId < wrapper.getServiceId()) {
                updated = true;
                updateReferenceHolders(ref);
              }
            }
          }
          // we don't have any valid services bounded yet so just bind
          // the new
          // one
          else {
            updated = true;
            updateReferenceHolders(ref);
          }
          lock.notifyAll();
          return updated;
        }
      }
      finally {
        if (log.isDebugEnabled()) {
          String message = "Service reference [" + ref + "]";
          if (updated)
            message += " bound to proxy";
          else
            message += " not bound to proxy";
          log.debug(message);
        }
      }
    }

    /**
     * Update internal holders for the backing ServiceReference.
     *
     * @param ref
     */
    private void updateReferenceHolders(ServiceReference ref) {
      // no need for a lock since this method is called from a synchronized block
      wrapper = new ServiceWrapper(ref, bundleContext);
      referenceDelegate.swapDelegates(ref);
    }
  }


  private static final int hashCode = ServiceDynamicInterceptor.class.hashCode() * 13;

  private final BundleContext bundleContext;

  private final Filter filter;

  /** TCCL to set when calling listeners */
  private final ClassLoader classLoader;

  private final SwappingServiceReferenceProxy referenceDelegate;

  /** event listener */
  private final ServiceListener listener;

  /** mandatory flag */
  private boolean serviceRequiredAtStartup = true;

  /** flag indicating whether the destruction has started or not */
  private boolean isDuringDestruction = false;

  /** flag indicating whether the proxy is already destroyed or not */
  private boolean destroyed = false;

  /** private lock */
  /**
   * used for reading/setting property and sending notifications between the
   * event listener and any threads waiting for an OSGi service to appear
   */
  private final Object lock = new Object();

  /** utility service wrapper */
  private ServiceWrapper wrapper;

  /** retry template */
  private final RetryTemplate retryTemplate = new EventSenderRetryTemplate();

  /** dependable service importer */
  private Object eventSource;

  /** event source (importer) name */
  private String sourceName;

  /** listener that need to be informed of bind/rebind/unbind */
  private OsgiServiceLifecycleListener[] listeners = new OsgiServiceLifecycleListener[0];

  /** reference to the created proxy passed to the listeners */
  private Object proxy;

  /** event publisher */
  private ApplicationEventPublisher applicationEventPublisher;

  /** dependency object */
  private OsgiServiceDependency dependency;

  /** internal state listeners */
  private List stateListeners = Collections.EMPTY_LIST;


  public ServiceDynamicInterceptor(BundleContext context, Filter filter, ClassLoader classLoader) {
    this.bundleContext = context;
    this.filter = filter;
    this.classLoader = classLoader;

    referenceDelegate = new SwappingServiceReferenceProxy();
    listener = new Listener();
  }

  public Object getTarget() {
    Object target = lookupService();

    // nothing found
    if (target == null) {
      throw new ServiceUnavailableException(filter);
    }
    return target;
  }

  /**
   * Look the service by waiting the service to appear. Note this method
   * should use the same lock as the listener handling the service reference.
   */
  private Object lookupService() {
    synchronized (lock) {
      return (Object) retryTemplate.execute(new DefaultRetryCallback() {

        public Object doWithRetry() {
          // before checking for a service, check whether the proxy is still valid
          if (destroyed && !isDuringDestruction)
            throw new ServiceProxyDestroyedException();

          return (wrapper != null) ? wrapper.getService() : null;
        }
      });
    }
  }

  private void publishEvent(ApplicationEvent event) {
    if (applicationEventPublisher != null) {
      if (log.isTraceEnabled())
        log.trace("Publishing event through publisher " + applicationEventPublisher);
      try {
        applicationEventPublisher.publishEvent(event);
      }
      catch (IllegalStateException ise) {
        log.debug(
          "Event "
              + event
              + " not published as the publisher is not initialized - usually this is caused by eager initialization of the importers by post processing",
          ise);
      }

    }
    else if (log.isTraceEnabled())
      log.trace("No application event publisher set; no events will be published");
  }

  public void afterPropertiesSet() {
    Assert.notNull(proxy);
    Assert.notNull(eventSource);

    boolean debug = log.isDebugEnabled();

    dependency = new DefaultOsgiServiceDependency(sourceName, filter, serviceRequiredAtStartup);

    if (debug)
      log.debug("Adding OSGi mandatoryListeners for services matching [" + filter + "]");
    OsgiListenerUtils.addSingleServiceListener(bundleContext, listener, filter);

    if (serviceRequiredAtStartup) {
      if (debug)
        log.debug("1..x cardinality - looking for service [" + filter + "] at startup...");
      Object target = getTarget();
      if (debug)
        log.debug("Service retrieved " + target);
    }
  }

  public void destroy() {
    OsgiListenerUtils.removeServiceListener(bundleContext, listener);
    synchronized (lock) {
      // set this flag first to make sure no rebind is done
      destroyed = true;
      isDuringDestruction = true;
      if (wrapper != null) {
        ServiceReference ref = wrapper.getReference();
        if (ref != null) {
          // send unregistration event to the listener
          listener.serviceChanged(new ServiceEvent(ServiceEvent.UNREGISTERING, ref));
        }
      }
      /** destruction process has ended */
      isDuringDestruction = false;
      // notify also any proxies that still wait on the service
      lock.notifyAll();
    }
  }

  /**
   * {@inheritDoc}
   *
   * This particular interceptor returns a delegated service reference so that
   * callers can keep the reference even if the underlying target service
   * reference changes in time.
   */
  public ServiceReference getServiceReference() {
    return referenceDelegate;
  }

  public void setRetryParams(int numberRetries, long timeout) {
    retryTemplate.reset(numberRetries, timeout);
  }

  public RetryTemplate getRetryTemplate() {
    return retryTemplate;
  }

  public OsgiServiceLifecycleListener[] getListeners() {
    return listeners;
  }

  public void setListeners(OsgiServiceLifecycleListener[] listeners) {
    this.listeners = listeners;
  }

  public void setServiceImporter(Object importer) {
    this.eventSource = importer;
  }

  public void setServiceImporterName(String name) {
    this.sourceName = name;
  }

  public void setRequiredAtStartup(boolean requiredAtStartup) {
    this.serviceRequiredAtStartup = requiredAtStartup;
  }

  public void setProxy(Object proxy) {
    this.proxy = proxy;
  }

  public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
    this.applicationEventPublisher = applicationEventPublisher;
  }

  /** Internal state listeners */
  public void setStateListeners(List stateListeners) {
    this.stateListeners = stateListeners;
  }

  public boolean equals(Object other) {
    if (this == other)
      return true;
    if (other instanceof ServiceDynamicInterceptor) {
      ServiceDynamicInterceptor oth = (ServiceDynamicInterceptor) other;
      return (serviceRequiredAtStartup == oth.serviceRequiredAtStartup
          && ObjectUtils.nullSafeEquals(wrapper, oth.wrapper)
          && ObjectUtils.nullSafeEquals(filter, oth.filter) && ObjectUtils.nullSafeEquals(retryTemplate,
        oth.retryTemplate));
    }
    else
      return false;
  }

  public int hashCode() {
    return hashCode;
  }
}
TOP

Related Classes of org.springframework.osgi.service.importer.support.internal.aop.ServiceDynamicInterceptor

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.