this.readyFactory = readyFactory;
return this;
}
public void connect(final PingClientHandler clientHandler) {
final Queue q;
final ScheduledExecutorService re;
final boolean shouldCloseQueue;
final ReadyFactory rf;
final InternalPingServerReadyFactory readyFactoryToClose;
if (readyFactory == null) {
readyFactoryToClose = new InternalPingServerReadyFactory();
rf = readyFactoryToClose;
} else {
rf = readyFactory;
readyFactoryToClose = null;
}
if (queue == null) {
try {
q = new Queue();
} catch (IOException e) {
clientHandler.failed(e);
return;
}
re = Executors.newSingleThreadScheduledExecutor();
shouldCloseQueue = true;
} else {
q = queue;
re = repeatExecutor;
shouldCloseQueue = false;
}
final Address a;
if (host != null) {
if (port < 0) {
a = new Address(host, address.getPort());
} else {
a = new Address(host, port);
}
} else {
a = address;
}
final Set<InstanceMapper> instanceMappers = new HashSet<>();
re.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
q.post(new Runnable() {
@Override
public void run() {
Date now = new Date();
for (InstanceMapper i : instanceMappers) {
i.repeat(now, minTimeToRepeat, timeoutFromBeginning);
}
Iterator<InstanceMapper> ii = instanceMappers.iterator();
while (ii.hasNext()) {
InstanceMapper i = ii.next();
if (i.instances.isEmpty()) {
ii.remove();
}
}
}
});
}
}, 0, (long) (repeatTime * 1000d), TimeUnit.MILLISECONDS);
q.post(new Runnable() {
@Override
public void run() {
ByteBufferAllocator allocator = new OnceByteBufferAllocator();
Ready ready = rf.create(q, allocator);
ready.connect(a, new ReadyConnection() {
private final InstanceMapper instanceMapper = new InstanceMapper();
@Override
public void handle(Address address, ByteBuffer buffer) {
ByteBuffer bb = buffer.duplicate();
int version = buffer.get() & 0xFF;
if (version != 1) {
LOGGER.warn("Invalid version: {}", version);
return;
}
int messageType = buffer.get() & 0xFF;
if (messageType != 2) {
LOGGER.warn("Invalid message type: {}", messageType);
return;
}
int ipVersion = buffer.get() & 0xFF;
int crcPos = buffer.position();
short crc = buffer.getShort();
if (CrcUtils.crc16(bb, crcPos) != crc) {
LOGGER.warn("Invalid crc");
return;
}
byte[] ip = new byte[(ipVersion == 4) ? 4 : 16];
buffer.get(ip);
int numberOfRetries = buffer.getInt();
int[] statuses = new int[numberOfRetries];
double[] times = new double[numberOfRetries];
for (int i = 0; i < numberOfRetries; i++) {
int replyTime = buffer.getInt();
if (replyTime < 0) {
// Error
statuses[i] = -replyTime;
times[i] = Double.NaN;
} else {
times[i] = replyTime / 1000d;
statuses[i] = PingClientHandler.VALID_STATUS;
}
}
instanceMapper.handle(new PingableAddress(ip), statuses, times);
}
@Override
public void failed(IOException e) {
if (shouldCloseQueue) {
re.shutdown();
q.close();
}
if (readyFactoryToClose != null) {
readyFactoryToClose.close();
}
if (instanceMappers.remove(instanceMapper)) {
clientHandler.failed(e);
}
}
@Override
public void connected(final FailableCloseableByteBufferHandler write) {
instanceMappers.add(instanceMapper);
final PingWriter w = new PingWriter(write);
clientHandler.launched(new PingClientHandler.Callback() {
@Override
public void close() {
if (instanceMappers.remove(instanceMapper)) {
instanceMapper.closedByUser();
}
write.close();
if (shouldCloseQueue) {
re.shutdown();
q.close();
}
if (readyFactoryToClose != null) {
readyFactoryToClose.close();
}
}
@Override
public void ping(PingableAddress address, int numberOfRetries, double timeBetweenRetries, double retryTimeout, PingCallback callback) {
Instance i = new Instance(callback, w, address, numberOfRetries, timeBetweenRetries, retryTimeout);
instanceMapper.map(i);
w.ping(address, numberOfRetries, timeBetweenRetries, retryTimeout);
}
});
}
@Override
public void close() {
if (shouldCloseQueue) {
re.shutdown();
q.close();
}
if (readyFactoryToClose != null) {
readyFactoryToClose.close();
}
if (instanceMappers.remove(instanceMapper)) {