Package org.mokai.impl.camel

Source Code of org.mokai.impl.camel.CamelRoutingEngine

package org.mokai.impl.camel;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import javax.management.InstanceAlreadyExistsException;

import net.gescobar.jmx.Management;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.camel.CamelContext;
import org.apache.camel.ExchangePattern;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.jms.JmsComponent;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.Validate;
import org.mokai.Connector;
import org.mokai.ConnectorService;
import org.mokai.ExecutionException;
import org.mokai.ExposableConfiguration;
import org.mokai.Message;
import org.mokai.Message.Direction;
import org.mokai.ObjectAlreadyExistsException;
import org.mokai.ObjectNotFoundException;
import org.mokai.RoutingEngine;
import org.mokai.persist.MessageCriteria;
import org.mokai.persist.MessageCriteria.OrderType;
import org.mokai.persist.MessageStore;
import org.mokai.persist.impl.DefaultMessageStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* An <a href="http://camel.apache.org">Apache Camel</a> based implementation of the {@link RoutingEngine}
*
* @author German Escobar
*/
public class CamelRoutingEngine implements RoutingEngine {

  private Logger log = LoggerFactory.getLogger(CamelRoutingEngine.class);

  private Map<String,ConnectorService> applications = new ConcurrentHashMap<String,ConnectorService>();

  private Map<String,ConnectorService> connections = new ConcurrentHashMap<String,ConnectorService>();

  private CamelContext camelContext;

  private JmsComponent jmsComponent;

  private ResourceRegistry resourceRegistry;

  private ConnectorServiceChangeListener connectorServiceChangeListener;

  private ExecutorService executor =
      new ThreadPoolExecutor(2, 25, Long.MAX_VALUE, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());

  public CamelRoutingEngine() {
    this.jmsComponent = defaultJmsComponent();
    init();
  }

  /**
   * Used by the constructor to create a "default" JmsComponent that will
   * be used in the routing engine.
   *
   * @return a Camel JmsComponent object.
   */
  private JmsComponent defaultJmsComponent() {
    // a simple activemq connection factory
    ActiveMQConnectionFactory connectionFactory =
        new ActiveMQConnectionFactory("vm://broker1?broker.persistent=false");

    // create the default JmsComponent
    JmsComponent jmsComponent = new JmsComponent();
    jmsComponent.setConnectionFactory(connectionFactory);

    return jmsComponent;
  }

  public CamelRoutingEngine(JmsComponent jmsComponent) {
    this.jmsComponent = jmsComponent;
    init();
  }

  private void init() throws ExecutionException {
    resourceRegistry = new ResourceRegistry();

    // create a default redelivery policy and add it to the resource registry
    RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
    resourceRegistry.putResource(RedeliveryPolicy.class, redeliveryPolicy);

    // create a default message store and add it to the registry
    MessageStore messageStore = new DefaultMessageStore();
    resourceRegistry.putResource(MessageStore.class, messageStore);

    camelContext = new DefaultCamelContext();
    resourceRegistry.putResource(CamelContext.class, camelContext);

    camelContext.addComponent("activemq", jmsComponent);

    final ConnectionsRouter connectionsRouter = new ConnectionsRouter();
    connectionsRouter.setRoutingEngine(this);

    final ApplicationsRouter applicationsRouter = new ApplicationsRouter();
    applicationsRouter.setRoutingEngine(this);

    try {
      camelContext.addRoutes(new RouteBuilder() {

        @Override
        public void configure() throws Exception {
          // internal router
          from(UriConstants.CONNECTIONS_ROUTER).bean(connectionsRouter);

          // sent messages - we pass a delegate in case the MessageStore changes
          PersistenceProcessor sentProcessor = new PersistenceProcessor(resourceRegistry);
          from(UriConstants.CONNECTIONS_PROCESSED_MESSAGES).process(sentProcessor);

          // failed messages - we pass a delegate in case the MessageStore changes
          PersistenceProcessor failedProcessor = new PersistenceProcessor(resourceRegistry);
          from(UriConstants.CONNECTIONS_FAILED_MESSAGES).process(failedProcessor);

          // unroutable messages - we pass a delegate in case the MessageStore changes
          PersistenceProcessor unRoutableProcessor = new PersistenceProcessor(resourceRegistry);
          from(UriConstants.CONNECTIONS_UNROUTABLE_MESSAGES).process(unRoutableProcessor);
        }

      });

      camelContext.addRoutes(new RouteBuilder() {

        @Override
        public void configure() throws Exception {
          // internal router
          from(UriConstants.APPLICATIONS_ROUTER).bean(applicationsRouter);

          // sent messages - we pass a delegate in case the MessageStore changes
          PersistenceProcessor sentProcessor = new PersistenceProcessor(resourceRegistry);
          from(UriConstants.APPLICATIONS_PROCESSED_MESSAGES).process(sentProcessor);

          // failed messages - we pass a delegate in case the MessageStore changes
          PersistenceProcessor failedProcessor = new PersistenceProcessor(resourceRegistry);
          from(UriConstants.APPLICATIONS_FAILED_MESSAGES).process(failedProcessor);

          // unroutable messages - we pass a delegate in case the MessageStore changes
          PersistenceProcessor unRoutableProcessor = new PersistenceProcessor(resourceRegistry);
          from(UriConstants.APPLICATIONS_UNROUTABLE_MESSAGES).process(unRoutableProcessor);
        }

      });

      // start the
      camelContext.start();

      try {
        Management.register( new RoutingEngineMBean(this), "org.mokai:type=RoutingEngine");
      } catch (InstanceAlreadyExistsException e) {
        log.warn("Couldn't register Routing Engine JMX MBean: " + e.getMessage(), e);
      }

      log.info("<< Mokai is ready >>");
    } catch (Exception e) {
      throw new ExecutionException(e);
    }

  }

  public final synchronized void start() throws ExecutionException {
    log.debug("starting all connectors ... ");

    // start connections in separate threads
    for (final ConnectorService cs : connections.values()) {
      executor.execute(new Runnable() {

        @Override
        public void run() {
          try {
            cs.start();
          } catch (Exception e) {
            log.error("connection '" + cs.getId() + "' couldnt be started: " + e.getMessage(), e);
          }
        }

      });
    }

    // start applications in separate threads
    for (final ConnectorService cs : applications.values()) {
      executor.execute(new Runnable() {

        @Override
        public void run() {
          try {
            cs.start();
          } catch (Exception e) {
            log.error("application '" + cs.getId() + "' couldn't be started: "  + e.getMessage(), e);
          }
        }

      });
    }

    log.info("all connectors started");
  }

  public final synchronized void stop() throws ExecutionException {
    log.debug("stopping all connectors ... ");

    // stop applications
    for (final ConnectorService cs : applications.values()) {
      try {
        cs.stop();
      } catch (Exception e) {
        log.error("application '" + cs.getId() + "' couldnt be destroyed: " + e.getMessage(), e);
      }
    }

    // stop connections
    for (final ConnectorService cs : connections.values()) {
      try {
        cs.stop();
      } catch (Exception e) {
        log.error("connection '" + cs.getId() + "' couldnt be destroyed: "  + e.getMessage(), e);
      }
    }

    executor.shutdown();
    try {
      boolean terminated = executor.awaitTermination(5, TimeUnit.SECONDS);
      if (!terminated) {
        log.info("connectors didn't stop after 5 seconds");
      }
    } catch (Exception e) {}

    log.info("all connectors stopped");
  }

  public final synchronized void shutdown() throws ExecutionException {
    stop();

    try {
      camelContext.stop();
    } catch (Exception e) {
      log.error("Exception while shutting down Mokai: " + e.getMessage(), e);
    }
  }

  @Override
  public final ConnectorService addApplication(String id, Connector connector)
      throws IllegalArgumentException, ObjectAlreadyExistsException {
    // fix id
    String fixedId = StringUtils.lowerCase(id);
    fixedId = StringUtils.deleteWhitespace(fixedId);

    // check if already exists
    if (applications.containsKey(fixedId)) {
      throw new ObjectAlreadyExistsException("Application with id '" + fixedId + "' already exists");
    }

    log.debug("adding application with id '" + fixedId + "'");

    // create the ConnectorService instance and set the change listener
    CamelApplicationService applicationService = new CamelApplicationService(fixedId, connector, resourceRegistry);
    applicationService.setChangeListener(connectorServiceChangeListener);

    // configure the connector
    LifecycleMethodsHelper.configure(connector);

    applications.put(fixedId, applicationService);

    log.info("application with id " + fixedId + " added");

    registerConnector(applicationService, "org.mokai.applications");

    return applicationService;
  }

  @Override
  public final RoutingEngine removeApplication(String id) throws IllegalArgumentException, ObjectNotFoundException {
    removeAndDestroyConnector(id, applications, "applications");

    return this;
  }

  @Override
  public final ConnectorService getApplication(String id) {
    return getConnector(id, applications);
  }

  @Override
  public final List<ConnectorService> getApplications() {
    return getConnectors(applications);
  }

  @Override
  public final ConnectorService addConnection(String id, Connector connector)
      throws IllegalArgumentException, ObjectAlreadyExistsException {
    // fix id
    String fixedId = StringUtils.lowerCase(id);
    fixedId = StringUtils.deleteWhitespace(fixedId);

    // check if already exists
    if (connections.containsKey(fixedId)) {
      throw new ObjectAlreadyExistsException("Connection with id '" + fixedId + "' already exists");
    }

    log.debug("adding connection with id '" + fixedId + "'");

    // create the ConnectorService instance and set the change listener
    CamelConnectionService connectionService = new CamelConnectionService(fixedId, connector, resourceRegistry);
    connectionService.setChangeListener(connectorServiceChangeListener);

    // configure the connector
    LifecycleMethodsHelper.configure(connector);

    connections.put(fixedId, connectionService);

    log.info("connection with id " + fixedId + " added");

    registerConnector(connectionService, "org.mokai.connections");

    return connectionService;
  }

  @Override
  public final RoutingEngine removeConnection(String id) throws IllegalArgumentException, ObjectNotFoundException {
    removeAndDestroyConnector(id, connections, "connections");
    return this;
  }

  @Override
  public final ConnectorService getConnection(String id) {
    return getConnector(id, connections);
  }

  @Override
  public final List<ConnectorService> getConnections() {
    return getConnectors(connections);
  }

  private void registerConnector(AbstractCamelConnectorService connectorService, String namespace) {
    String objectName = namespace + ":id=" + connectorService.getId();
    Connector connector = connectorService.getConnector();

    try {
      Management.register( new ConnectorServiceMBean(connectorService), objectName);
    } catch (Exception e) {
      log.warn("Couldn't register JMX MBean '" + objectName + "': " + e.getMessage(), e);
    }

    try {
      if (ExposableConfiguration.class.isInstance(connector)) {
        Management.register( ((ExposableConfiguration<?>) connector).getConfiguration(),
            namespace + ":id=" + connectorService.getId() + ",type=Config");
      }
    } catch (Exception e) {
      log.warn("Couldn't register JMX MBean '" + objectName + "': " + e.getMessage(), e);
    }
  }

  private void removeAndDestroyConnector(String id, Map<String,ConnectorService> map, String mapName) {
    Validate.notEmpty(id);

    ConnectorService cs = map.remove(id);
    if (cs == null) {
      throw new ObjectNotFoundException("Connector with id " + id + " doesnt exists in map of " + mapName);
    }

    // call the destroy method on the processor or receiver service
    cs.destroy();
  }

  private ConnectorService getConnector(String id, Map<String,ConnectorService> map) {
    Validate.notEmpty(id);

    ConnectorService cs = map.get(id);
    if (cs != null && ConnectorService.class.isInstance(cs)) {
      return cs;
    }

    return null;
  }

  private List<ConnectorService> getConnectors(Map<String,ConnectorService> map) {
    List<ConnectorService> connectorsList = new ArrayList<ConnectorService>();

    // add only the processor services
    for (ConnectorService cs : map.values()) {
      connectorsList.add(cs);
    }

    // sort the processors by priority
    Collections.sort(connectorsList, new Comparator<ConnectorService>() {

      @Override
      public int compare(ConnectorService o1, ConnectorService o2) {
        if (o1.getPriority() > o2.getPriority()) {
          return 1;
        } else if (o1.getPriority() < o2.getPriority()) {
          return -1;
        }

        return 0;
      }

    });

    return connectorsList;
  }

  public final void retryFailedMessages() {
    log.trace("retrying failed messages ... ");

    long startTime = new Date().getTime();

    MessageStore messageStore = resourceRegistry.getResource(MessageStore.class);

    ProducerTemplate producer = camelContext.createProducerTemplate();

    MessageCriteria criteria = new MessageCriteria()
        .addStatus(Message.STATUS_FAILED)
        .orderBy("creation_time")
        .orderType(OrderType.UPWARDS);

    Collection<Message> messages = messageStore.list(criteria);
    logCollectionSize(messages.size()); // log the size of the collection
    for (Message message : messages) {
      // update the message and send it
      message.setStatus(Message.STATUS_RETRYING);
      message.setModificationTime(new Date());
      messageStore.saveOrUpdate(message);

      if (message.getDirection().equals(Direction.TO_CONNECTIONS)) {
        producer.sendBody(UriConstants.CONNECTIONS_ROUTER, ExchangePattern.InOnly, message);
      } else if (message.getDirection().equals(Direction.TO_APPLICATIONS)) {
        producer.sendBody(UriConstants.APPLICATIONS_ROUTER, ExchangePattern.InOnly, message);
      } else {
        log.warn("message with direction " + message.getDirection() + " cannot be retried ... ignoring");
      }
    }

    long endTime = new Date().getTime();

    log.debug("retry failed messages took " + (endTime - startTime) + " milis.");
  }

  private void logCollectionSize(int size) {
    if (size > 0) {
      log.debug("processing " + size + " failed messages ...");
    }
  }

  public final void retryUnRoutableMessages() {

  }

  public final RedeliveryPolicy getRedeliveryPolicy() {
    return resourceRegistry.getResource(RedeliveryPolicy.class);
  }

  public final void setRedeliveryPolicy(RedeliveryPolicy redeliveryPolicy) {
    resourceRegistry.getResource(RedeliveryPolicy.class);
  }

  @Override
  public final MessageStore getMessageStore() {
    return resourceRegistry.getResource(MessageStore.class);
  }

  public final void setMessageStore(MessageStore messageStore) {
    resourceRegistry.putResource(MessageStore.class, messageStore);
  }

  public final CamelContext getCamelContext() {
    return camelContext;
  }

  public ConnectorServiceChangeListener getConnectorServiceChangeListener() {
    return connectorServiceChangeListener;
  }

  public void setConnectorServiceChangeListener(ConnectorServiceChangeListener connectorServiceChangeListener) {
    this.connectorServiceChangeListener = connectorServiceChangeListener;
  }

}
TOP

Related Classes of org.mokai.impl.camel.CamelRoutingEngine

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.