Package com.opengamma.engine.calcnode

Source Code of com.opengamma.engine.calcnode.RemoteNodeServer$FunctionBlacklistQueryProvider

/**
* Copyright (C) 2009 - present by OpenGamma Inc. and the OpenGamma group of companies
*
* Please see distribution for license.
*/
package com.opengamma.engine.calcnode;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.fudgemsg.FudgeContext;
import org.fudgemsg.FudgeMsgEnvelope;
import org.fudgemsg.mapping.FudgeDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.opengamma.engine.cache.IdentifierMap;
import com.opengamma.engine.calcnode.msg.Init;
import com.opengamma.engine.calcnode.msg.Ready;
import com.opengamma.engine.calcnode.msg.RemoteCalcNodeMessage;
import com.opengamma.engine.calcnode.msg.RemoteCalcNodeMessageVisitor;
import com.opengamma.engine.calcnode.stats.FunctionCosts;
import com.opengamma.engine.function.FunctionCompilationContext;
import com.opengamma.engine.function.blacklist.DefaultFunctionBlacklistMaintainer;
import com.opengamma.engine.function.blacklist.DefaultFunctionBlacklistQuery;
import com.opengamma.engine.function.blacklist.DummyFunctionBlacklistMaintainer;
import com.opengamma.engine.function.blacklist.DummyFunctionBlacklistQuery;
import com.opengamma.engine.function.blacklist.FunctionBlacklistMaintainer;
import com.opengamma.engine.function.blacklist.FunctionBlacklistPolicy;
import com.opengamma.engine.function.blacklist.FunctionBlacklistProvider;
import com.opengamma.engine.function.blacklist.FunctionBlacklistQuery;
import com.opengamma.engine.function.blacklist.ManageableFunctionBlacklist;
import com.opengamma.engine.function.blacklist.ManageableFunctionBlacklistProvider;
import com.opengamma.engine.function.blacklist.MultipleFunctionBlacklistMaintainer;
import com.opengamma.engine.function.blacklist.MultipleFunctionBlacklistQuery;
import com.opengamma.transport.FudgeConnection;
import com.opengamma.transport.FudgeConnectionReceiver;

/**
* Server end to RemoteNodeClient to receive requests from remote calculation nodes and marshal
* them into RemoteNodeJobInvokers that a JobDispatcher can then use.
*/
public class RemoteNodeServer implements FudgeConnectionReceiver {

  private static final Logger s_logger = LoggerFactory.getLogger(RemoteNodeServer.class);
 
  /**
   * Callback interface for supplying a blacklist maintainer to each host invoker.
   */
  public interface FunctionBlacklistMaintainerProvider {

    /**
     * Returns the maintenance interface.
     *
     * @param hostId the host handshake identifier
     * @return the interface, or null for none
     */
    FunctionBlacklistMaintainer getUpdate(String hostId);
   
  }
 
  /**
   * Implementation of {@link FunctionBlacklistMaintainerProvider} that always returns a fixed blacklist maintainer.
   */
  public static class StaticFunctionBlacklistMaintainerProvider implements FunctionBlacklistMaintainerProvider {

    private final FunctionBlacklistMaintainer _maintainer;

    public StaticFunctionBlacklistMaintainerProvider(final FunctionBlacklistMaintainer maintainer) {
      _maintainer = maintainer;
    }

    @Override
    public FunctionBlacklistMaintainer getUpdate(final String hostId) {
      return _maintainer;
    }

  }

  /**
   * Implementation of {@link FunctionBlacklistMaintainerProvider} that updates a blacklist which is the host identifier prefixed with a fixed string. For example host "calc42" might use a blacklist
   * called "REMOTE_NODE_calc42" if the bean is set with the default prefix of "REMOTE_NODE_".
   */
  public static class FunctionBlacklistMaintainerProviderBean implements FunctionBlacklistMaintainerProvider {

    private ManageableFunctionBlacklistProvider _blacklistProvider;

    private String _blacklistPrefix = "REMOTE_NODE_";

    private FunctionBlacklistPolicy _policy;

    public void setBlacklistProvider(final ManageableFunctionBlacklistProvider blacklistProvider) {
      _blacklistProvider = blacklistProvider;
    }

    public ManageableFunctionBlacklistProvider getBlacklistProvider() {
      return _blacklistProvider;
    }

    public void setBlacklistPrefix(final String prefix) {
      _blacklistPrefix = prefix;
    }

    public String getBlacklistPrefix() {
      return _blacklistPrefix;
    }

    public void setBlacklistPolicy(final FunctionBlacklistPolicy policy) {
      _policy = policy;
    }

    public FunctionBlacklistPolicy getBlacklistPolicy() {
      return _policy;
    }

    @Override
    public FunctionBlacklistMaintainer getUpdate(final String hostId) {
      if (getBlacklistPolicy().isEmpty()) {
        return null;
      }
      final ManageableFunctionBlacklist blacklist = getBlacklistProvider().getBlacklist(getBlacklistPrefix() + hostId);
      if (blacklist == null) {
        return null;
      }
      return new DefaultFunctionBlacklistMaintainer(getBlacklistPolicy(), blacklist);
    }

  }

  /**
   * Implementation of {@link FunctionBlacklistMaintainerProvider} that wraps multiple maintainers up to create a {@link MultipleFunctionBlacklistMaintainer} for the host.
   */
  public static class MultipleFunctionBlacklistMaintainerProvider implements FunctionBlacklistMaintainerProvider {

    private final Collection<FunctionBlacklistMaintainerProvider> _providers;

    public MultipleFunctionBlacklistMaintainerProvider(final Collection<FunctionBlacklistMaintainerProvider> providers) {
      _providers = providers;
    }

    @Override
    public FunctionBlacklistMaintainer getUpdate(final String hostId) {
      final Collection<FunctionBlacklistMaintainer> maintainers = new ArrayList<FunctionBlacklistMaintainer>(_providers.size());
      for (FunctionBlacklistMaintainerProvider provider : _providers) {
        final FunctionBlacklistMaintainer maintainer = provider.getUpdate(hostId);
        if (maintainer != null) {
          maintainers.add(maintainer);
        }
      }
      if (maintainers.isEmpty()) {
        return null;
      } else {
        return new MultipleFunctionBlacklistMaintainer(maintainers);
      }
    }

  }

  /**
   * Callback interface for supplying a blacklist query to each host invoker.
   */
  public interface FunctionBlacklistQueryProvider {

    /**
     * Returns the query interface.
     *
     * @param hostId the host handshake identifier
     * @return the interface, or null for none
     */
    FunctionBlacklistQuery getQuery(String hostId);
   
  }

  /**
   * Implementation of {@link FunctionBlacklistQueryProvider} that always returns a fixed blacklist query.
   */
  public static class StaticFunctionBlacklistQueryProvider implements FunctionBlacklistQueryProvider {

    private final FunctionBlacklistQuery _query;

    public StaticFunctionBlacklistQueryProvider(final FunctionBlacklistQuery query) {
      _query = query;
    }

    @Override
    public FunctionBlacklistQuery getQuery(final String hostId) {
      return _query;
    }

  }

  /**
   * Implementation of {@link FunctionBlacklistQueryProvider} that queries a blacklist which is the host identifier prefixed with a fixed string. For example host "calc42" might use a blacklist called
   * "REMOTE_NODE_calc42" if the bean is set with the default prefix of "REMOTE_NODE_".
   */
  public static class FunctionBlacklistQueryProviderBean implements FunctionBlacklistQueryProvider {

    private FunctionBlacklistProvider _blacklistProvider;

    private String _blacklistPrefix = "REMOTE_NODE_";

    public void setBlacklistProvider(final FunctionBlacklistProvider blacklistProvider) {
      _blacklistProvider = blacklistProvider;
    }

    public FunctionBlacklistProvider getBlacklistProvider() {
      return _blacklistProvider;
    }

    public void setBlacklistPrefix(final String prefix) {
      _blacklistPrefix = prefix;
    }

    public String getBlacklistPrefix() {
      return _blacklistPrefix;
    }

    @Override
    public FunctionBlacklistQuery getQuery(final String hostId) {
      return new DefaultFunctionBlacklistQuery(getBlacklistProvider().getBlacklist(getBlacklistPrefix() + hostId));
    }

  }

  /**
   * Implementation of {@link FunctionBlacklistQueryProvider} that wraps multiple queries up to create a {@link MultipleFunctionBlacklistQuery} for the host.
   */
  public static class MultipleFunctionBlacklistQueryProvider implements FunctionBlacklistQueryProvider {

    private final Collection<FunctionBlacklistQueryProvider> _providers;

    public MultipleFunctionBlacklistQueryProvider(final Collection<FunctionBlacklistQueryProvider> providers) {
      _providers = providers;
    }

    @Override
    public FunctionBlacklistQuery getQuery(final String hostId) {
      final Collection<FunctionBlacklistQuery> queries = new ArrayList<FunctionBlacklistQuery>(_providers.size());
      for (FunctionBlacklistQueryProvider provider : _providers) {
        final FunctionBlacklistQuery query = provider.getQuery(hostId);
        if (query != null) {
          queries.add(query);
        }
      }
      if (queries.isEmpty()) {
        return null;
      } else {
        return new MultipleFunctionBlacklistQuery(queries);
      }
    }

  }

  private final JobInvokerRegister _jobInvokerRegister;
  private final IdentifierMap _identifierMap;
  private final ExecutorService _executorService = Executors.newCachedThreadPool();
  private final FunctionCosts _functionCosts;
  private final FunctionCompilationContext _functionCompilationContext;
  private Set<Capability> _capabilitiesToAdd;
  private FunctionBlacklistMaintainerProvider _blacklistUpdate;
  private FunctionBlacklistQueryProvider _blacklistQuery;

  public RemoteNodeServer(final JobInvokerRegister jobInvokerRegister, final IdentifierMap identifierMap,
      final FunctionCosts functionCosts, final FunctionCompilationContext functionCompilationContext) {
    _jobInvokerRegister = jobInvokerRegister;
    _identifierMap = identifierMap;
    _functionCosts = functionCosts;
    _functionCompilationContext = functionCompilationContext;
  }

  /**
   * Specify capabilities to add to those explicitly declared by the remote nodes. If the nodes declare these
   * in the initial connection they will be overridden. After the initial connection any changes the node
   * sends will take effect again.
   *
   * @param parameters Capabilities to add
   */
  public void setCapabilitiesToAdd(final Map<String, Double> parameters) {
    _capabilitiesToAdd = new HashSet<Capability>();
    for (Map.Entry<String, Double> parameter : parameters.entrySet()) {
      _capabilitiesToAdd.add(Capability.parameterInstanceOf(parameter.getKey(), parameter.getValue()));
    }
  }

  /**
   * Returns the mechanism for updating a node-specific blacklist with job failures.
   *
   * @return the update mechanism, null for none
   */
  public FunctionBlacklistMaintainerProvider getBlacklistUpdate() {
    return _blacklistUpdate;
  }

  /**
   * Sets a mechanism for updating a node-specific blacklist with job failures.
   *
   * @param provider the update mechanism, null for none
   */
  public void setBlacklistUpdate(final FunctionBlacklistMaintainerProvider provider) {
    _blacklistUpdate = provider;
  }

  protected FunctionBlacklistMaintainer getBlacklistUpdate(final String nodeId) {
    final FunctionBlacklistMaintainerProvider provider = getBlacklistUpdate();
    if (provider != null) {
      final FunctionBlacklistMaintainer maintainer = provider.getUpdate(nodeId);
      if (maintainer != null) {
        return maintainer;
      }
    }
    return new DummyFunctionBlacklistMaintainer();
  }

  /**
   * Returns the mechanism for querying a node-specific blacklist for job item suppression.
   *
   * @return the query mechanism, null for none
   */
  public FunctionBlacklistQueryProvider getBlacklistQuery() {
    return _blacklistQuery;
  }

  /**
   * Sets the mechanism for querying a node-specific blacklist for job item suppression.
   *
   * @param provider the query mechanism, null for none
   */
  public void setBlacklistQuery(final FunctionBlacklistQueryProvider provider) {
    _blacklistQuery = provider;
  }

  protected FunctionBlacklistQuery getBlacklistQuery(final String nodeId) {
    final FunctionBlacklistQueryProvider provider = getBlacklistQuery();
    if (provider != null) {
      final FunctionBlacklistQuery query = provider.getQuery(nodeId);
      if (query != null) {
        return query;
      }
    }
    return new DummyFunctionBlacklistQuery();
  }

  protected JobInvokerRegister getJobInvokerRegister() {
    return _jobInvokerRegister;
  }

  protected ExecutorService getExecutorService() {
    return _executorService;
  }

  protected IdentifierMap getIdentifierMap() {
    return _identifierMap;
  }

  protected FunctionCosts getFunctionCosts() {
    return _functionCosts;
  }

  protected FunctionCompilationContext getFunctionCompilationContext() {
    return _functionCompilationContext;
  }

  @Override
  public void connectionReceived(final FudgeContext fudgeContext, final FudgeMsgEnvelope message, final FudgeConnection connection) {
    final FudgeDeserializer deserializer = new FudgeDeserializer(fudgeContext);
    final RemoteCalcNodeMessage remoteCalcNodeMessage = deserializer.fudgeMsgToObject(RemoteCalcNodeMessage.class, message.getMessage());
    remoteCalcNodeMessage.accept(new RemoteCalcNodeMessageVisitor() {

      @Override
      protected void visitUnexpectedMessage(final RemoteCalcNodeMessage message) {
        s_logger.warn("Unexpected message {}", message);
      }

      @Override
      protected void visitReadyMessage(final Ready message) {
        s_logger.info("Remote node {} connected - {}", message.getHostId(), connection);
        final RemoteNodeJobInvoker invoker = new RemoteNodeJobInvoker(getExecutorService(), message, connection, getIdentifierMap(), getFunctionCosts(), getBlacklistQuery(message.getHostId()),
            getBlacklistUpdate(message.getHostId()));
        if (_capabilitiesToAdd != null) {
          invoker.addCapabilities(_capabilitiesToAdd);
        }
        final Init init = new Init(getFunctionCompilationContext().getFunctionInitId());
        invoker.sendMessage(init);
        getJobInvokerRegister().registerJobInvoker(invoker);
      }

    });
  }

}
TOP

Related Classes of com.opengamma.engine.calcnode.RemoteNodeServer$FunctionBlacklistQueryProvider

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.