/** Time to wait between attepts if there is no Bloomberg connection available. */
private static final long RETRY_PERIOD = 30000;
@Override
protected void runOneCycle() {
Event event;
try {
event = getSession().nextEvent(1000L);
} catch (InterruptedException e) {
Thread.interrupted();
s_logger.warn("Unable to retrieve the next event available for processing on this session", e);
return;
} catch (ConnectionUnavailableException e) {
s_logger.warn("No connection to Bloomberg available, failed to get next event", e);
try {
Thread.sleep(RETRY_PERIOD);
} catch (InterruptedException e1) {
s_logger.warn("Interrupted waiting to retry", e1);
}
return;
} catch (RuntimeException e) {
s_logger.warn("Unable to retrieve the next event available for processing on this session", e);
return;
}
if (event == null) {
//getLogger().debug("Got NULL event");
return;
}
//getLogger().debug("Got event of type {}", event.eventType());
MessageIterator msgIter = event.messageIterator();
CorrelationID realCID = null;
while (msgIter.hasNext()) {
Message msg = msgIter.next();
if (event.eventType() == Event.EventType.SESSION_STATUS) {
if (msg.messageType().toString().equals("SessionTerminated")) {
getLogger().error("Session terminated");
terminate();
return;
}
}
CorrelationID bbgCID = msg.correlationID();
Element element = msg.asElement();
getLogger().debug("got msg with cid={} msg.asElement={}", bbgCID, msg.asElement());
if (bbgCID != null) {
realCID = _correlationIDMap.get(bbgCID);
if (realCID != null) {
BlockingQueue<Element> messages = _correlationIDElementMap.get(realCID);
if (messages == null) {
messages = new LinkedBlockingQueue<>();
_correlationIDElementMap.put(realCID, messages);
}
messages.add(element);
}
}
}
// wake up waiting client thread if response is completed and there is a thread waiting on the cid
if (event.eventType() == Event.EventType.RESPONSE && realCID != null) {
//cid is removed from the map by the request thread after it has been notified
synchronized (realCID) {
realCID.notify();
}
}