Package com.xeiam.xchange.coinfloor.streaming

Source Code of com.xeiam.xchange.coinfloor.streaming.CoinfloorStreamingExchangeService

package com.xeiam.xchange.coinfloor.streaming;

import java.net.URI;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.xeiam.xchange.ExchangeException;
import com.xeiam.xchange.ExchangeSpecification;
import com.xeiam.xchange.coinfloor.dto.streaming.CoinfloorExchangeEvent;
import com.xeiam.xchange.coinfloor.dto.streaming.CoinfloorStreamingConfiguration;
import com.xeiam.xchange.coinfloor.streaming.RequestFactory.CoinfloorRequest;
import com.xeiam.xchange.dto.Order;
import com.xeiam.xchange.dto.account.AccountInfo;
import com.xeiam.xchange.dto.marketdata.OrderBook;
import com.xeiam.xchange.dto.marketdata.Ticker;
import com.xeiam.xchange.dto.marketdata.Trades;
import com.xeiam.xchange.dto.trade.MarketOrder;
import com.xeiam.xchange.service.streaming.BaseWebSocketExchangeService;
import com.xeiam.xchange.service.streaming.ExchangeEvent;
import com.xeiam.xchange.service.streaming.ExchangeEventType;
import com.xeiam.xchange.service.streaming.StreamingExchangeService;

/**
* @author obsessiveOrange
*/
public class CoinfloorStreamingExchangeService extends BaseWebSocketExchangeService implements StreamingExchangeService {

  private final Logger logger = LoggerFactory.getLogger(CoinfloorStreamingExchangeService.class);

  private final CoinfloorStreamingConfiguration configuration;
  private final CoinfloorEventListener exchangeEventListener;
  private final BlockingQueue<ExchangeEvent> systemEventQueue = new LinkedBlockingQueue<ExchangeEvent>();
  private final BlockingQueue<CoinfloorExchangeEvent> updateEventQueue = new LinkedBlockingQueue<CoinfloorExchangeEvent>();

  ObjectMapper jsonObjectMapper;

  /**
   * @param exchangeSpecification
   * @param exchangeStreamingConfiguration
   */
  public CoinfloorStreamingExchangeService(ExchangeSpecification exchangeSpecification, CoinfloorStreamingConfiguration exchangeStreamingConfiguration) {

    super(exchangeSpecification, exchangeStreamingConfiguration);

    this.configuration = exchangeStreamingConfiguration;
    this.exchangeEventListener = new CoinfloorEventListener(consumerEventQueue, systemEventQueue);

    this.jsonObjectMapper = new ObjectMapper();

  }

  @Override
  public void connect() {

    String apiBase;
    if (configuration.isEncryptedChannel()) {
      apiBase = String.format("%s:%s", exchangeSpecification.getSslUriStreaming(), exchangeSpecification.getPort());
    }
    else {
      apiBase = String.format("%s:%s", exchangeSpecification.getPlainTextUriStreaming(), exchangeSpecification.getPort());
    }

    URI uri = URI.create(apiBase);

    Map<String, String> headers = new HashMap<String, String>(1);
    headers.put("Origin", String.format("%s:%s", exchangeSpecification.getHost(), exchangeSpecification.getPort()));

    logger.debug("Streaming URI='{}'", uri);

    // Use the default internal connect
    internalConnect(uri, exchangeEventListener, headers);

    try {
      if (getNextSystemEvent().getEventType() != ExchangeEventType.WELCOME) {
        throw new ExchangeException("Could not connect.");
      }
    } catch (InterruptedException e) {
      throw new ExchangeException("Could not connect.");
    }

    if (configuration.getauthenticateOnConnect()) {
      authenticate();
    }
  }

  public void authenticate() {

    if (exchangeSpecification.getUserName() == null || exchangeSpecification.getUserName() == null || exchangeSpecification.getUserName() == null) {
      throw new ExchangeException("Username (UserID), Cookie, and Password cannot be null");
    }
    try {
      Long.valueOf(exchangeSpecification.getUserName());
    } catch (NumberFormatException e) {
      throw new ExchangeException("Username (UserID) must be the string representation of a integer or long value.");
    }

    RequestFactory.CoinfloorAuthenticationRequest authVars =
        new RequestFactory.CoinfloorAuthenticationRequest(Long.valueOf(exchangeSpecification.getUserName()), (String) exchangeSpecification.getExchangeSpecificParametersItem("cookie"),
            exchangeSpecification.getPassword(), exchangeEventListener.getServerNonce());

    doNewRequest(authVars, ExchangeEventType.AUTHENTICATION);

  }

  @SuppressWarnings("incomplete-switch")
  private CoinfloorExchangeEvent doNewRequest(final CoinfloorRequest requestObject, ExchangeEventType expectedEventType) {

    try {
      try {
        logger.trace("Sent message: " + jsonObjectMapper.writeValueAsString(requestObject));
        send(jsonObjectMapper.writeValueAsString(requestObject));
      } catch (JsonProcessingException e) {
        throw new ExchangeException("Cannot convert Object to String", e);
      }

      CoinfloorExchangeEvent nextEvent = checkNextSystemEvent();
      while (!nextEvent.getEventType().equals(expectedEventType) || nextEvent.getTag() != requestObject.getTag()) {
        switch (nextEvent.getEventType()) {
        case USER_WALLET_UPDATE:
        case ORDER_ADDED:
        case TRADE:
        case ORDER_CANCELED:
        case TICKER:
        case AUTHENTICATION:
        case WELCOME:
          updateEventQueue.put(getNextSystemEvent());
          break;

        }
        synchronized (this) {
          nextEvent = checkNextSystemEvent();
        }
      }
      return getNextSystemEvent();
    } catch (Exception e) {
      throw new ExchangeException("Error processing request", e);
    }
  }

  /**
   * Get user's balances
   * Upon receipt of response, a CoinfloorExchangeEvent with payload Map<String, Object>, consisting of:
   * > A raw object of type CoinfloorBalances (key "raw")
   * > A generic object of type AccountInfo (key "generic")
   */
  public CoinfloorExchangeEvent getBalances() {

    return doNewRequest(new RequestFactory.GetBalancesRequest(), ExchangeEventType.USER_WALLET);
  }

  /**
   * Get user's open orders
   * Upon receipt of response, a CoinfloorExchangeEvent with payload Map<String, Object>, consisting of:
   * > A raw object of type CoinfloorOpenOrders (key "raw")
   * > A generic object of type OpenOrders (key "generic")
   */
  public CoinfloorExchangeEvent getOrders() {

    return doNewRequest(new RequestFactory.GetOrdersRequest(), ExchangeEventType.USER_ORDERS_LIST);
  }

  /**
   * Place an order
   * Upon receipt of response, a CoinfloorExchangeEvent with payload Map<String, Object>, consisting of:
   * > A raw object of type CoinfloorPlaceOrder (key "raw")
   * > A generic object of type String, representing the orderID (key "generic")
   */
  public CoinfloorExchangeEvent placeOrder(Order order) {

    return doNewRequest(new RequestFactory.PlaceOrderRequest(order), ExchangeEventType.USER_ORDER);
  }

  /**
   * Cancel an order
   * Upon receipt of response, a CoinfloorExchangeEvent with payload Map<String, Object>, consisting of:
   * > A raw object of type CoinfloorCancelOrder (key "raw")
   * > A generic object of type LimitOrder, representing the cancelled order (key "generic")
   */
  public CoinfloorExchangeEvent cancelOrder(int orderID) {

    return doNewRequest(new RequestFactory.CancelOrderRequest(orderID), ExchangeEventType.USER_ORDER_CANCELED);
  }

  /**
   * Get past 30-day trade volume
   * Upon receipt of response, a CoinfloorExchangeEvent with payload Map<String, Object>, consisting of:
   * > A raw object of type CoinfloorTradeVolume (key "raw")
   * > A generic object of type BigDecimal, representing the past-30 day volume (key "generic")
   */
  public CoinfloorExchangeEvent getTradeVolume(String currency) {

    return doNewRequest(new RequestFactory.GetTradeVolumeRequest(currency), ExchangeEventType.USER_TRADE_VOLUME);
  }

  /**
   * Estimate the results of a market order
   * Upon receipt of response, a CoinfloorExchangeEvent with payload Map<String, Object>, consisting of:
   * > A raw object of type CoinfloorEstimateMarketOrder (key "raw")
   * Note that this method has no (useful) generic return. The "generic" key corresponds to the same item as the "raw" key
   */
  public CoinfloorExchangeEvent estimateMarketOrder(MarketOrder order) {

    return doNewRequest(new RequestFactory.EstimateMarketOrderRequest(order), ExchangeEventType.USER_MARKET_ORDER_EST);
  }

  /**
   * Watch the orderbook
   * Upon receipt of response, a CoinfloorExchangeEvent with payload Map<String, Object>, consisting of:
   * > A raw object of type CoinfloorOrderbookReturn (key "raw")
   * > A generic object of type Depth (key "generic")
   */
  public CoinfloorExchangeEvent watchOrders(String tradableIdentifier, String tradingCurrency) {

    return doNewRequest(new RequestFactory.WatchOrdersRequest(tradableIdentifier, tradingCurrency), ExchangeEventType.SUBSCRIBE_ORDERS);
  }

  /**
   * Stop watching the orderbook
   * Upon receipt of response, a CoinfloorExchangeEvent with payload Map<String, Object>, consisting of:
   * > A raw object of type CoinfloorOrderbookReturn (key "raw")
   * > A generic object of type Depth (key "generic")
   */
  public CoinfloorExchangeEvent unwatchOrders(String tradableIdentifier, String tradingCurrency) {

    return doNewRequest(new RequestFactory.UnwatchOrdersRequest(tradableIdentifier, tradingCurrency), ExchangeEventType.SUBSCRIBE_ORDERS);
  }

  /**
   * Watch the ticker feed
   * Upon receipt of response, a CoinfloorExchangeEvent with payload Map<String, Object>, consisting of:
   * > A raw object of type CoinfloorTicker (key "raw")
   * > A generic object of type Ticker (key "generic")
   */
  public CoinfloorExchangeEvent watchTicker(String tradableIdentifier, String tradingCurrency) {

    return doNewRequest(new RequestFactory.WatchTickerRequest(tradableIdentifier, tradingCurrency), ExchangeEventType.SUBSCRIBE_TICKER);
  }

  /**
   * Stop watching the ticker feed
   * Upon receipt of response, a CoinfloorExchangeEvent with payload Map<String, Object>, consisting of:
   * > A raw object of type CoinfloorTicker (key "raw")
   * > A generic object of type Ticker (key "generic")
   */
  public CoinfloorExchangeEvent unwatchTicker(String tradableIdentifier, String tradingCurrency) {

    return doNewRequest(new RequestFactory.UnwatchTickerRequest(tradableIdentifier, tradingCurrency), ExchangeEventType.SUBSCRIBE_TICKER);
  }

  /**
   * Retrieves cached AccountInfo.
   * WARNING: EXPERIMENTAL METHOD
   *
   * @return the AccountInfo, as updated by last BalancesChanged event
   * @throws ExchangeException if getBalances() method has not been called, or data not recieved yet.
   */
  public AccountInfo getCachedAccountInfo() {

    return exchangeEventListener.getAdapterInstance().getCachedAccountInfo();
  }

  /**
   * Retrieves cached OrderBook.
   * WARNING: EXPERIMENTAL METHOD
   *
   * @return the OrderBook, as updated by last OrderOpened, OrdersMatched or OrderClosed event
   * @throws ExchangeException if watchOrders() method has not been called, or data not recieved yet.
   */
  public OrderBook getCachedOrderBook() {

    return exchangeEventListener.getAdapterInstance().getCachedOrderBook();
  }

  /**
   * Retrieves cached Trades.
   * WARNING: EXPERIMENTAL METHOD
   *
   * @return the Trades, as updated by last OrdersMatched event
   * @throws ExchangeException if watchOrders() method has not been called, or no trades have occurred yet.
   */
  public Trades getCachedTrades() {

    return exchangeEventListener.getAdapterInstance().getCachedTrades();
  }

  /**
   * Retrieves cached Ticker.
   * WARNING: EXPERIMENTAL METHOD
   *
   * @return the Ticker, as updated by last TickerChanged event
   * @throws ExchangeException if watchTicker() method has not been called, or no ticker data has been recieved.
   */
  public Ticker getCachedTicker() {

    return exchangeEventListener.getAdapterInstance().getCachedTicker();
  }

  @Override
  public CoinfloorExchangeEvent getNextEvent() throws InterruptedException {

    return (CoinfloorExchangeEvent) super.getNextEvent();
  }

  public CoinfloorExchangeEvent getNextSystemEvent() throws InterruptedException {

    CoinfloorExchangeEvent event = (CoinfloorExchangeEvent) systemEventQueue.take();
    return event;
  }

  public CoinfloorExchangeEvent checkNextSystemEvent() throws InterruptedException {

    while (true) {
      synchronized (systemEventQueue) {
        if (systemEventQueue.isEmpty()) {
          systemEventQueue.wait();
        }
      }
      CoinfloorExchangeEvent event = (CoinfloorExchangeEvent) systemEventQueue.peek();
      if (event == null) {
        continue;
      }
      return event;
    }
  }
}
TOP

Related Classes of com.xeiam.xchange.coinfloor.streaming.CoinfloorStreamingExchangeService

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.