package com.facebook.presto.jdbc.internal.airlift.http.client.testing;
import com.facebook.presto.jdbc.internal.guava.base.Function;
import com.facebook.presto.jdbc.internal.guava.util.concurrent.ForwardingListenableFuture;
import com.facebook.presto.jdbc.internal.guava.util.concurrent.ListenableFuture;
import com.facebook.presto.jdbc.internal.guava.util.concurrent.ListeningExecutorService;
import com.facebook.presto.jdbc.internal.guava.util.concurrent.MoreExecutors;
import com.facebook.presto.jdbc.internal.airlift.http.client.AsyncHttpClient;
import com.facebook.presto.jdbc.internal.airlift.http.client.Request;
import com.facebook.presto.jdbc.internal.airlift.http.client.RequestStats;
import com.facebook.presto.jdbc.internal.airlift.http.client.Response;
import com.facebook.presto.jdbc.internal.airlift.http.client.ResponseHandler;
import com.facebook.presto.jdbc.internal.airlift.units.Duration;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import static com.facebook.presto.jdbc.internal.guava.base.Preconditions.checkNotNull;
import static com.facebook.presto.jdbc.internal.guava.base.Preconditions.checkState;
public class TestingHttpClient
implements AsyncHttpClient
{
private final Function<Request, Response> processor;
private final ListeningExecutorService executor;
private final RequestStats stats = new RequestStats();
private final AtomicBoolean closed = new AtomicBoolean();
public TestingHttpClient(Function<Request, Response> processor)
{
this(processor, MoreExecutors.sameThreadExecutor());
}
public TestingHttpClient(Function<Request, Response> processor, ExecutorService executor)
{
this.processor = processor;
this.executor = MoreExecutors.listeningDecorator(executor);
}
@Override
public <T, E extends Exception> HttpResponseFuture<T> executeAsync(final Request request, final ResponseHandler<T, E> responseHandler)
{
checkNotNull(request, "request is null");
checkNotNull(responseHandler, "responseHandler is null");
checkState(!closed.get(), "client is closed");
final AtomicReference<String> state = new AtomicReference<>("SENDING_REQUEST");
ListenableFuture<T> future = executor.submit(new Callable<T>()
{
@Override
public T call()
throws Exception
{
return execute(request, responseHandler, state);
}
});
return new TestingHttpResponseFuture<>(future, state);
}
@Override
public <T, E extends Exception> T execute(Request request, ResponseHandler<T, E> responseHandler)
throws E
{
checkNotNull(request, "request is null");
checkNotNull(responseHandler, "responseHandler is null");
checkState(!closed.get(), "client is closed");
return execute(request, responseHandler, new AtomicReference<>("SENDING_REQUEST"));
}
private <T, E extends Exception> T execute(Request request, ResponseHandler<T, E> responseHandler, AtomicReference<String> state)
throws E
{
state.set("PROCESSING_REQUEST");
Response response;
Duration requestProcessingTime = null;
try {
long requestStart = System.nanoTime();
response = processor.apply(request);
requestProcessingTime = Duration.nanosSince(requestStart);
}
catch (Throwable e) {
state.set("FAILED");
stats.record(request.getMethod(),
0,
0,
0,
requestProcessingTime,
null);
if (e instanceof Exception) {
return responseHandler.handleException(request, (Exception) e);
}
else {
throw e;
}
}
checkState(response != null, "response is null");
// notify handler
state.set("PROCESSING_RESPONSE");
long responseStart = System.nanoTime();
try {
return responseHandler.handle(request, response);
}
finally {
state.set("DONE");
Duration responseProcessingTime = Duration.nanosSince(responseStart);
stats.record(request.getMethod(),
response.getStatusCode(),
response.getBytesRead(),
response.getBytesRead(),
requestProcessingTime,
responseProcessingTime);
}
}
@Override
public RequestStats getStats()
{
return stats;
}
@Override
public void close()
{
closed.set(true);
}
private class TestingHttpResponseFuture<T>
extends ForwardingListenableFuture<T>
implements HttpResponseFuture<T>
{
private final AtomicReference<String> state;
private final ListenableFuture<T> future;
private TestingHttpResponseFuture(ListenableFuture<T> future, AtomicReference<String> state)
{
this.future = future;
this.state = state;
}
@Override
protected ListenableFuture<T> delegate()
{
return future;
}
@Override
public String getState()
{
return state.get();
}
}
}