Package com.barchart.feed.ddf.instrument.provider

Source Code of com.barchart.feed.ddf.instrument.provider.DDF_RxInstrumentProvider

package com.barchart.feed.ddf.instrument.provider;

import static com.barchart.feed.ddf.instrument.provider.XmlTagExtras.LOOKUP;
import static com.barchart.feed.ddf.util.HelperXML.XML_STOP;
import static com.barchart.feed.ddf.util.HelperXML.xmlFirstChild;
import static com.barchart.feed.ddf.util.HelperXML.xmlStringDecode;

import java.io.BufferedInputStream;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicLong;
import java.util.zip.GZIPInputStream;

import javax.xml.parsers.SAXParser;
import javax.xml.parsers.SAXParserFactory;

import org.openfeed.proto.inst.InstrumentDefinition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.w3c.dom.Element;
import org.xml.sax.Attributes;
import org.xml.sax.SAXException;
import org.xml.sax.helpers.DefaultHandler;

import rx.Observable;
import rx.subjects.ReplaySubject;

import com.barchart.feed.api.consumer.MetadataService.Result;
import com.barchart.feed.api.consumer.MetadataService.SearchContext;
import com.barchart.feed.api.model.meta.Instrument;
import com.barchart.feed.api.model.meta.id.InstrumentID;
import com.barchart.feed.api.model.meta.id.VendorID;
import com.barchart.feed.base.provider.Symbology;
import com.barchart.feed.ddf.instrument.provider.DDF_InstrumentProvider.RemoteRunner;
import com.barchart.feed.ddf.util.HelperXML;
import com.barchart.feed.inst.participant.InstrumentState;
import com.barchart.feed.inst.provider.InstrumentFactory;

public class DDF_RxInstrumentProvider {

  private static final Logger log = LoggerFactory.getLogger(
      DDF_RxInstrumentProvider.class);
 
  private static final int MAX_URL_LEN = 7500;
     
  static final ConcurrentMap<String, List<InstrumentState>> symbolMap =
      new ConcurrentHashMap<String, List<InstrumentState>>();
 
  public static VendorID CQG_VENDOR_ID = new VendorID("CQG");
 
  /* ***** ***** ***** Begin Executor ***** ***** ***** */
 
  /**
   * Default executor service with dameon threads
   */
  private volatile static ExecutorService executor = Executors.newCachedThreadPool(
     
      new ThreadFactory() {

    final AtomicLong counter = new AtomicLong(0);
   
    @Override
    public Thread newThread(final Runnable r) {
     
          final Thread t = new Thread(r, "Feed thread " + counter.getAndIncrement());
     
      t.setDaemon(true);
     
      return t;
    }
   
  });
 
  /**
   * Bind framework executor.
   * @param e
   */
  public synchronized static void bindExecutorService(final ExecutorService e) {
   
    log.debug("Binding new executor service");
   
    executor.shutdownNow();
    executor = e;
    executor.submit(new RemoteRunner());
   
  }
 
  /* ***** ***** ***** Begin ID Lookup ***** ***** ***** */
 
  public static Observable<Instrument> fromID(final InstrumentID... ids) {
    // TODO
    return null;
  }
 
  /* ***** ***** ***** Begin String Search ***** ***** ***** */
 
  public static Observable<Result<Instrument>> fromString(final String... symbols) {
    return fromString(SearchContext.NULL, symbols);
  }
 
  public static Observable<Result<Instrument>> fromString(final SearchContext ctx,
      final String... symbols) {
   
    ReplaySubject<Result<Instrument>> sub = ReplaySubject.create();
    executor.submit(runnableFromString(sub, ctx, symbols));
   
    return sub;
  }
 
  public static Runnable runnableFromString(
      final ReplaySubject<Result<Instrument>> sub,
      final SearchContext ctx, final String... symbols) {
   
    return new Runnable() {

      @Override
      public void run() {
       
        final Map<String, List<InstrumentState>> res =
            new HashMap<String, List<InstrumentState>>();
       
        final List<String> toBatch = new ArrayList<String>();
       
        /* Filter out cached symbols */
        for(String symbol : symbols) {
         
          // TODO Add map from user input to formated for query and replace back into result
          symbol = Symbology.formatSymbol(symbol);
         
          if (symbolMap.containsKey(symbol)) {
            res.put(symbol, symbolMap.get(symbol));
          } else {
            toBatch.add(symbol);
          }
         
        }
       
        try {
         
          final List<String> queries = buildQueries(toBatch);
         
          for(final String query : queries) {
           
            final Map<String, List<InstrumentState>> lookup = remoteLookup(query);
           
            /* Store instruments returned from lookup */
            for(final Entry<String, List<InstrumentState>> e : lookup.entrySet()) {
              symbolMap.put(e.getKey(), e.getValue());
             
              /* Add alternate options symbol */
              final InstrumentState inst = e.getValue().get(0);
              if(inst != null) {
               
                if(inst.symbol().contains("|")) {
                  symbolMap.put(inst.vendorSymbols().get(VendorID.BARCHART), e.getValue());
                }
               
              }
             
            }
           
            res.putAll(lookup);
          }
         
          sub.onNext(result(res));
          sub.onCompleted();
        } catch (Exception e1) {
          sub.onError(e1);
        }
      }
     
    };
   
  }
 
  private static Map<String, List<InstrumentState>> remoteLookup(final String query) {
   
    try {
   
      final Map<String, List<InstrumentState>> result =
          new HashMap<String, List<InstrumentState>>();
     
      log.debug("remote batch on {}", urlInstrumentLookup(query));
     
      final URL url = new URL(urlInstrumentLookup(query));
     
      final HttpURLConnection connection = (HttpURLConnection) url.openConnection();
     
      connection.setRequestProperty("Accept-Encoding", "gzip");
     
      connection.connect();
     
      InputStream input = connection.getInputStream();
 
      if (connection.getContentEncoding().equals("gzip")) {
        input = new GZIPInputStream(input);
      }
     
      final BufferedInputStream stream = new BufferedInputStream(input);
 
      final SAXParserFactory factory = SAXParserFactory.newInstance();
      final SAXParser parser = factory.newSAXParser();
      final DefaultHandler handler = handler(result);
     
      parser.parse(stream, handler);
     
      return result;
   
    } catch (final Exception e) {
      throw new RuntimeException(e);
    }
   
  }
 
  protected static DefaultHandler handler(final Map<String, List<InstrumentState>> result) {
    return new DefaultHandler() {

      @Override
      public void startElement(final String uri,
          final String localName, final String qName,
          final Attributes ats) throws SAXException {

        if (qName != null && qName.equals("instrument")) {

          try {

            final String lookup = xmlStringDecode(ats, LOOKUP, XML_STOP);
            final InstrumentDefinition def = InstrumentXML.decodeSAX(ats);

            InstrumentState inst;

            if (def != InstrumentDefinition.getDefaultInstance()) {
              inst = InstrumentFactory.instrumentState(def);
            } else {
              inst = InstrumentState.NULL;
            }

            final List<InstrumentState> insts = new ArrayList<InstrumentState>();
            insts.add(inst);
            result.put(lookup, insts);

          } catch (final SymbolNotFoundException se) {
            throw new RuntimeException(se); // would be nice to add to map
          } catch (final Exception e) {
            throw new RuntimeException(e);
          }

        }

      }

    };
  }

  /* ***** ***** ***** CQG ***** ***** ***** */
 
  private static final ConcurrentMap<String, String> cqgSymMap =
      new ConcurrentHashMap<String, String>();
 
  public static Observable<String> fromCQGString(final String symbol) {
    try {
      return Observable.from(executor.submit(callableFromCQGString(SearchContext.NULL, symbol)));
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
  }
 
  public static Callable<String> callableFromCQGString(
      final SearchContext ctx, final String symbol) {
   
    return new Callable<String>() {

      @Override
      public String call() throws Exception {
       
        /* Filter out cached symbols */
        if(cqgSymMap.containsKey(symbol)) {
          return cqgSymMap.get(symbol);
        }
       
        final String query = cqgInstLoopURL(symbol);
       
        final Element root = HelperXML.xmlDocumentDecode(query);

            final Element tag = xmlFirstChild(root, "symbol", XML_STOP);
           
            final String result = Symbology.formatSymbol(tag.getTextContent());
           
            if(result != null) {
              cqgSymMap.put(symbol, result);
            }
           
            return result;
       
      }
    };
   
  }
 
  private static Result<Instrument> result(final Map<String, List<InstrumentState>> res) {
   
    return new Result<Instrument>() {

      @Override
      public SearchContext context() {
        return SearchContext.NULL;
      }

      @Override
      public Map<String, List<Instrument>> results() {
        Map<String, List<Instrument>> result = new HashMap<String, List<Instrument>>();
        for(final Entry<String, List<InstrumentState>> e : res.entrySet()) {
          final List<Instrument> list = new ArrayList<Instrument>();
          list.add(e.getValue().get(0));
          result.put(e.getKey(), list);
        }
        return result;
      }

      @Override
      public boolean isNull() {
        return false;
      }
    };
   
  }
 
  /* ***** ***** ***** Begin Remote Lookup ***** ***** ***** */
 
  private static final String SERVER_EXTRAS = "extras.ddfplus.com";
 
  private static final String CQG_SYMBOL = "&symbology=CQG";

  private static final String urlInstrumentLookup(final CharSequence lookup) {
    return "http://" + SERVER_EXTRAS + "/instruments/?lookup=" + lookup + CQG_SYMBOL;
  }
 
  private static final String cqgInstLoopURL(final CharSequence lookup) {
    return "http://" + SERVER_EXTRAS + "/symbology/?symbol=" + lookup +
                 "&provider=CQG";
  }
 
  static List<String> buildQueries(final List<String> symbols) throws Exception {
   
    final List<String> queries = new ArrayList<String>();
   
    while(!symbols.isEmpty()) {
   
      final StringBuilder sb = new StringBuilder();
      int len = 0;
      int symCount = 0;
     
      while(len < MAX_URL_LEN && symCount < 400 && !symbols.isEmpty()) {
     
        final String s = symbols.remove(0);
     
        log.debug("Pulled {} from remote queue", s);
     
        symCount++;
        len += s.length() + 1;
        sb.append(s).append(",");
     
      }
   
      /* Remove trailing comma */
      sb.deleteCharAt(sb.length() - 1);
     
      queries.add(sb.toString());
     
      log.debug("Sending {} to remote lookup", sb.toString());
   
    }
   
    return queries;
   
  }
 
}
TOP

Related Classes of com.barchart.feed.ddf.instrument.provider.DDF_RxInstrumentProvider

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.