Package com.barchart.feed.ddf.instrument.provider

Source Code of com.barchart.feed.ddf.instrument.provider.DDF_InstrumentProvider$RemoteRunner

package com.barchart.feed.ddf.instrument.provider;

import static com.barchart.feed.ddf.util.HelperXML.XML_STOP;
import static com.barchart.feed.ddf.util.HelperXML.xmlFirstChild;

import java.io.BufferedInputStream;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.zip.GZIPInputStream;

import javax.xml.parsers.SAXParser;
import javax.xml.parsers.SAXParserFactory;

import org.openfeed.proto.inst.InstrumentDefinition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.w3c.dom.Element;
import org.xml.sax.Attributes;
import org.xml.sax.SAXException;
import org.xml.sax.helpers.DefaultHandler;

import rx.Observer;

import com.barchart.feed.api.model.meta.Instrument;
import com.barchart.feed.base.provider.Symbology;
import com.barchart.feed.ddf.util.HelperXML;
import com.barchart.feed.inst.InstrumentDefinitionResult;
import com.barchart.feed.inst.participant.InstrumentState;
import com.barchart.feed.inst.provider.InstrumentFactory;
import com.barchart.feed.inst.provider.InstrumentMap;

public final class DDF_InstrumentProvider {
 
  /** Timeout value, expressed in {@link #MILLIS}, handed to {@code executor.invokeAll} by the remote runner. */
  private static final long DEFAULT_TIMEOUT = 5000;
  /** Time unit paired with {@link #DEFAULT_TIMEOUT}. */
  private static final TimeUnit MILLIS = TimeUnit.MILLISECONDS;
  /** Accumulated symbol-list length at which {@code buildQuerey()} stops adding symbols to a batch. */
  private static final int MAX_URL_LEN = 7500;
  /** Argument to {@code Thread.sleep} between passes of the remote runner loop (milliseconds). */
  private static final long REMOTE_LOOKUP_INTERVAL = 1000;
 
  private static final Logger log = LoggerFactory
      .getLogger(DDF_InstrumentProvider.class);
 
  /**
   * Formatted symbol to instrument-state list; this is the same map instance
   * that {@code DDF_RxInstrumentProvider.symbolMap} refers to. The lookup
   * methods in this class return element 0 of the list.
   */
  private static final ConcurrentMap<String, List<InstrumentState>> symbolMap =
      DDF_RxInstrumentProvider.symbolMap;
 
  /** Symbols waiting for a remote lookup; filled by the from* methods and drained by {@code buildQuerey()}. */
  private static final ArrayBlockingQueue<String> remoteQueue =
      new ArrayBlockingQueue<String>(1000 * 1000);
 
  /** Symbols that could not be queued or whose remote lookup reported a failure. */
  private static final List<String> failedRemoteQueue =
      new CopyOnWriteArrayList<String>();
 
  /** Instrument database checked before falling back to a remote lookup; replaced by {@code bindDatabaseMap}. */
  private static volatile InstrumentMap db = InstrumentMap.NULL;
 
  static final String cqgInstLoopURL(final CharSequence lookup) {
    return "http://" + SERVER_EXTRAS + "/symbology/?symbol=" + lookup +
                 "&provider=CQG";
  }
 
  /** Private so the class cannot be instantiated; its visible members are all static. */
  private DDF_InstrumentProvider() {
   
  }
 
  /**
   * Default executor service with dameon threads
   */
  // Consider ExecutorCompletionService
  private volatile static ExecutorService executor = Executors.newCachedThreadPool(
     
      new ThreadFactory() {

    final AtomicLong counter = new AtomicLong(0);
   
    @Override
    public Thread newThread(final Runnable r) {
     
      final Thread t = new Thread(r, "Feed thread " +
          counter.getAndIncrement());
     
      t.setDaemon(true);
     
      return t;
    }
   
  });
 
  static {
    executor.submit(new RemoteRunner());
  }
 
  /**
   * Bind framework executor.
   * @param e
   */
  public synchronized static void bindExecutorService(final ExecutorService e) {
   
    log.debug("Binding new executor service");
   
    executor.shutdownNow();
    executor = e;
    executor.submit(new RemoteRunner());
  }
 
  /**
   * @param map Bind an already built db map
   */
  public synchronized static void bindDatabaseMap(final InstrumentMap map) {
   
    log.debug("Binding new database map");
   
    db = map;
  }
 
  /**
   * This takes an instrument stub from a feed message, and does several things.
   *
   * 1. Checks symbol against map.  If the stub represents an instrument already
   * in the system, then it returns the canonical reference to that instrument.
   *
   * 2. If the symbol is unknown, it will make a new InstrumentState from the info
   * in the stub and begin an async lookup of info.
   *
   * @param inst
   * @return
   */
  public static Instrument fromMessage(final Instrument inst) {
   
    if(inst == null || inst.isNull()) {
      return Instrument.NULL;
    }
   
    /* NOTE id() in ddf is just the realtime symbol, not an actual GUID */
    final String symbol = Symbology.formatSymbol(inst.id().toString());
   
    if(symbolMap.containsKey(symbol)) {
      return symbolMap.get(symbol).get(0);
    }
   
    if(db.containsKey(symbol)) {
     
      final InstrumentState instState = InstrumentStateFactory.
          newInstrument(symbol);
      instState.process(db.get(symbol));
      final List<InstrumentState> list = new ArrayList<InstrumentState>();
      list.add(instState);
      symbolMap.put(symbol, list);
      return instState;
    }
   
    /* New symbol, create stub */
    final InstrumentState instState = InstrumentStateFactory.
        newInstrumentFromStub(inst);
   
    final List<InstrumentState> list = new ArrayList<InstrumentState>();
    list.add(instState);
    symbolMap.put(symbol, list);
    log.debug("Put {} stub into map", symbol);
   
    /* Asnyc lookup */
    try {
      remoteQueue.put(symbol);
    } catch (final Exception e) {
      failedRemoteQueue.add(symbol);
    }
   
    return instState;
   
  }
 
  public static Instrument fromSymbol(String symbol) {
   
    if(symbol == null || symbol.isEmpty()) {
      return Instrument.NULL;
    }
   
    symbol = Symbology.formatSymbol(symbol);
   
    if(symbolMap.containsKey(symbol)) {
      return symbolMap.get(symbol).get(0);
    }
   
    if(db.containsKey(symbol)) {
      final InstrumentState instState = InstrumentStateFactory.
        newInstrument(symbol);
      instState.process(db.get(symbol));
      final List<InstrumentState> list = new ArrayList<InstrumentState>();
      list.add(instState);
      symbolMap.put(symbol, list);
      return instState;
    }
   
    final InstrumentState instState = InstrumentStateFactory.
        newInstrument(symbol);
   
    final List<InstrumentState> list = new ArrayList<InstrumentState>();
    list.add(instState);
    symbolMap.put(symbol, list);
   
    /* Asnyc lookup */
    try {
      remoteQueue.put(symbol);
    } catch (final Exception e) {
      failedRemoteQueue.add(symbol);
    }
   
    return instState;
   
  }
 
  public static Map<String, Instrument> fromSymbols(
      final Collection<String> symbols) {
   
    final Map<String, Instrument> map = new HashMap<String, Instrument>();
   
    for(final String symbol : symbols) {
      map.put(symbol, fromSymbol(symbol));
    }
   
    return map;
   
  }

  public static Instrument fromHistorical(String symbol) {
   
    if(symbol == null || symbol.isEmpty()) {
      return Instrument.NULL;
    }
   
    symbol = Symbology.formatHistoricalSymbol(symbol);
   
    if(symbolMap.containsKey(symbol)) {
      return symbolMap.get(symbol).get(0);
    }
   
    if(db.containsKey(symbol)) {
      final InstrumentState instState = InstrumentStateFactory.newInstrument(symbol);
      instState.process(db.get(symbol));
      final List<InstrumentState> list = new ArrayList<InstrumentState>();
      list.add(instState);
      symbolMap.put(symbol, list);
      return instState;
    }
   
    final InstrumentState instState = InstrumentStateFactory.
        newInstrument(symbol);
   
    final List<InstrumentState> list = new ArrayList<InstrumentState>();
    list.add(instState);
    symbolMap.put(symbol, list);
   
    /* Asnyc lookup */
    try {
      remoteQueue.put(symbol);
    } catch (final Exception e) {
      failedRemoteQueue.add(symbol);
    }
   
    return instState;
   
  }
 
  private static final Map<String, Instrument> cqgMap =
            new HashMap<String, Instrument>();

  public static Instrument findCQG(final String symbol) {
     
     Instrument result = cqgMap.get(symbol);
     
      if(result == null) {
             
            try {
                String barSymbol = remoteCQGLookup(symbol.toString());
               
                // ************** TODO
                //result = find(barSymbol);
               
                if(result == null || result.isNull()) {
                    log.warn("Unable to find barchart instrument for {}", symbol);
                    return Instrument.NULL;
                }
               
                cqgMap.put(symbol, result);
                   
            } catch (final Exception e) {
                log.error("Exception in CQG lookup on {} {}", symbol, e);
                return Instrument.NULL;
            }
      }
     
      return result;
     
  }
 
  static String remoteCQGLookup(String symbol) throws Exception {
       
        final String symbolURI = cqgInstLoopURL(symbol);
       
        final Element root = HelperXML.xmlDocumentDecode(symbolURI);

        final Element tag = xmlFirstChild(root, "symbol", XML_STOP);
       
        return tag.getTextContent();
       
  }
 
  private static Observer<InstrumentDefinitionResult> observer =
      new Observer<InstrumentDefinitionResult>() {

    @Override
    public void onNext(final InstrumentDefinitionResult result) {
     
      final String symbol = result.expression();
     
      /* If exception, add to failed */
      if(result.exception() != null) {
        failedRemoteQueue.add(symbol);
      }
     
      final InstrumentDefinition def = result.result();
     
      if(def == InstrumentDefinition.getDefaultInstance()) {
        log.trace("Instrument result was empty for {}", symbol);
        return// Ignore
      }
     
      final InstrumentState iState = symbolMap.get(symbol).get(0);
     
      if(iState == null || iState.isNull()) {
        final List<InstrumentState> list = new ArrayList<InstrumentState>();
        list.add(InstrumentFactory.instrumentState(result.result()));
        symbolMap.put(symbol, list);
      } else {
        iState.process(result.result());
      }
     
    }

    @Override
    public void onError(Throwable error) {
      // TODO Auto-generated method stub
     
    }

    @Override
    public void onCompleted() {
      // TODO Auto-generated method stub
     
    }
   
  };
 
  private static class InstDefResult implements InstrumentDefinitionResult {

    private final String symbol;
    private final InstrumentDefinition def;
    private final Throwable t;
   
    InstDefResult(final String symbol, final InstrumentDefinition def) {
      this.symbol = symbol;
      this.def = def;
      t = null;
    }
   
    InstDefResult(final String symbol, final Throwable t) {
      this.symbol = symbol;
      def = InstrumentDefinition.getDefaultInstance();
      this.t = t;
    }
   
    @Override
    public InstrumentDefinition result() {
      return def;
    }

    @Override
    public String expression() {
      return symbol;
    }

    @Override
    public Throwable exception() {
      return t;
    }
   
  }
 
  private static final String SERVER_EXTRAS = "extras.ddfplus.com";

  private static final String urlInstrumentLookup(final CharSequence lookup) {
    return "http://" + SERVER_EXTRAS + "/instruments/?lookup=" + lookup;
  }
 
  static Callable<InstrumentDefinition> remoteSingle(final String lookup) {
   
    return new Callable<InstrumentDefinition>() {
 
      @Override
      public InstrumentDefinition call() throws Exception {
       
        try {
         
          log.debug("Starting remote lookup for {}", lookup);
         
          final String symbolURI = urlInstrumentLookup(lookup);
          final Element root = HelperXML.xmlDocumentDecode(symbolURI);
          final Element tag = xmlFirstChild(root, "instrument", XML_STOP);
          final InstrumentDefinition instDOM = InstrumentXML.decodeXML(tag);
         
          if(instDOM == null || instDOM == InstrumentDefinition.getDefaultInstance()) {
            log.trace("Empty instrument def returned from remote lookup: {}", lookup);
            failedRemoteQueue.add(lookup);
            return InstrumentDefinition.getDefaultInstance();
          }
         
          return instDOM;
         
        } catch (final Throwable t) {
          failedRemoteQueue.add(lookup);
          log.trace("Remote instrument lookup callable exception: {}", t);
          return InstrumentDefinition.getDefaultInstance();
        }
       
      }
   
    };
   
  }
 
  static Callable<Map<String, InstrumentDefinition>> remoteBatch(final String symbols) {
   
    return new Callable<Map<String, InstrumentDefinition>>() {

      @Override
      public Map<String, InstrumentDefinition> call() throws Exception {
       
        try {
         
          final Map<String, InstrumentDefinition> defs =
              new HashMap<String, InstrumentDefinition>();
         
          log.debug("remote batch on {}", urlInstrumentLookup(symbols));
         
          final URL url = new URL(urlInstrumentLookup(symbols));
         
          final HttpURLConnection connection = (HttpURLConnection) url
              .openConnection();
         
          connection.setRequestProperty("Accept-Encoding", "gzip");
         
          connection.connect();
         
          InputStream input = connection.getInputStream();

          if (connection.getContentEncoding().equals("gzip")) {
            input = new GZIPInputStream(input);
          }
         
          final BufferedInputStream stream =
              new BufferedInputStream(input);

          final SAXParserFactory factory =
              SAXParserFactory.newInstance();
          final SAXParser parser = factory.newSAXParser();
         
          final DefaultHandler handler = new DefaultHandler() {
           
            InstrumentDefinition def;
           
            @Override
            public void startElement(final String uri,
                final String localName, final String qName,
                final Attributes ats) throws SAXException {
             
              if (qName != null && qName.equals("instrument")) {

                try {
                  def = InstrumentXML.decodeSAX(ats);
                  if (def != InstrumentDefinition.getDefaultInstance()) {
                    defs.put(def.getSymbol(), def);
                  }
                } catch (final SymbolNotFoundException se) {
                  observer.onNext(new InstDefResult(se.getMessage(), se));
                } catch (final Exception e) {
                  log.trace("Exception in parsing batch lookup {}", e);
                }
               
              }
             
            }
           
          };
         
          parser.parse(stream, handler);
         
          return defs;
         
        } catch (final Throwable t) {
          failedRemoteQueue.addAll(Arrays.asList(symbols.split(",")));
          return null;
        }
       
      }
     
    };
   
  }
 
  static class RemoteRunner implements Runnable {
   
    private List<Future<Map<String, InstrumentDefinition>>> futures =
        new ArrayList<Future<Map<String, InstrumentDefinition>>>();
   
    private final List<Callable<Map<String, InstrumentDefinition>>> callables =
        new ArrayList<Callable<Map<String, InstrumentDefinition>>>();
   
    @Override
    public void run() {
     
      try {
       
        while(!Thread.interrupted()) {
         
          Thread.sleep(REMOTE_LOOKUP_INTERVAL);
         
          while(!remoteQueue.isEmpty()) {
            callables.add(remoteBatch(buildQuerey()));
          }
         
          futures = executor.invokeAll(callables, DEFAULT_TIMEOUT, MILLIS);
         
          for(final Future<Map<String, InstrumentDefinition>> f : futures) {
           
            for(final Entry<String, InstrumentDefinition> e : f.get().entrySet()) {
             
              final InstrumentDefinition def = e.getValue();
             
              if(def == null || def == InstrumentDefinition.getDefaultInstance()) {
                observer.onNext(new InstDefResult(e.getKey(),
                    new Throwable("Could not find " + e.getKey())));
              } else {
                observer.onNext(new InstDefResult(e.getKey(), def));
              }
             
            }
           
          }
         
          futures.clear();
          callables.clear();
         
        }
       
      } catch (final Throwable t) {
        log.error("Exception in Remote Runner Thread", t);
      }
     
    }
   
  }
 
  private static String buildQuerey() throws Exception {
   
    final StringBuilder sb = new StringBuilder();
   
    int len = 0;
    int symCount = 0;
   
    while(len < MAX_URL_LEN && symCount < 400 && !remoteQueue.isEmpty()) {
     
      final String s = remoteQueue.take();
     
      log.debug("Pulled {} from remote queue", s);
     
      symCount++;
      len += s.length() + 1;
      sb.append(s).append(",");
     
    }
   
    /* Remove trailing comma */
    sb.deleteCharAt(sb.length() - 1);
   
    log.debug("Sending {} to remote lookup", sb.toString());
   
    return sb.toString();
   
  }
 
}
TOP

Related Classes of com.barchart.feed.ddf.instrument.provider.DDF_InstrumentProvider$RemoteRunner

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code is the property of its respective owners. Java is a trademark of Sun Microsystems, Inc. and is owned by Oracle Inc. Contact coftware#gmail.com.