this.messageAuthorizationPolicy = messageAuthorizationPolicy;
}
public void start() throws Exception {
TransportServer server = getServer();
broker = brokerService.getBroker();
brokerInfo.setBrokerName(broker.getBrokerName());
brokerInfo.setBrokerId(broker.getBrokerId());
brokerInfo.setPeerBrokerInfos(broker.getPeerBrokerInfos());
brokerInfo.setFaultTolerantConfiguration(broker.isFaultTolerantConfiguration());
brokerInfo.setBrokerURL(server.getConnectURI().toString());
server.setAcceptListener(new TransportAcceptListener() {
public void onAccept(final Transport transport) {
try {
// Starting the connection could block due to
// wireformat negotiation, so start it in an async thread.
Thread startThread = new Thread("ActiveMQ Transport Initiator: " + transport.getRemoteAddress()) {
public void run() {
try {
Connection connection = createConnection(transport);
connection.start();
} catch (Exception e) {
ServiceSupport.dispose(transport);
onAcceptError(e);
}
}
};
startThread.setPriority(4);
startThread.start();
} catch (Exception e) {
String remoteHost = transport.getRemoteAddress();
ServiceSupport.dispose(transport);
onAcceptError(e, remoteHost);
}
}
public void onAcceptError(Exception error) {
onAcceptError(error, null);
}
private void onAcceptError(Exception error, String remoteHost) {
LOG.error("Could not accept connection " + (remoteHost == null ? "" : "from " + remoteHost) + ": " + error.getMessage());
LOG.debug("Reason: " + error.getMessage(), error);
}
});
server.setBrokerInfo(brokerInfo);
server.start();
DiscoveryAgent da = getDiscoveryAgent();
if (da != null) {
da.registerService(getConnectUri().toString());
da.start();