package com.facebook.presto.jdbc.internal.airlift.http.client.jetty;
import com.facebook.presto.jdbc.internal.guava.base.Objects;
import com.facebook.presto.jdbc.internal.guava.base.Preconditions;
import com.facebook.presto.jdbc.internal.guava.base.Throwables;
import com.facebook.presto.jdbc.internal.guava.collect.AbstractIterator;
import com.facebook.presto.jdbc.internal.guava.collect.ImmutableList;
import com.facebook.presto.jdbc.internal.guava.collect.ImmutableListMultimap;
import com.facebook.presto.jdbc.internal.guava.collect.ListMultimap;
import com.facebook.presto.jdbc.internal.guava.io.CountingInputStream;
import com.facebook.presto.jdbc.internal.guava.net.HostAndPort;
import com.facebook.presto.jdbc.internal.guava.primitives.Ints;
import com.facebook.presto.jdbc.internal.guava.util.concurrent.AbstractFuture;
import com.facebook.presto.jdbc.internal.airlift.http.client.AsyncHttpClient;
import com.facebook.presto.jdbc.internal.airlift.http.client.BodyGenerator;
import com.facebook.presto.jdbc.internal.airlift.http.client.ResponseTooLargeException;
import com.facebook.presto.jdbc.internal.airlift.http.client.HttpClientConfig;
import com.facebook.presto.jdbc.internal.airlift.http.client.HttpRequestFilter;
import com.facebook.presto.jdbc.internal.airlift.http.client.Request;
import com.facebook.presto.jdbc.internal.airlift.http.client.RequestStats;
import com.facebook.presto.jdbc.internal.airlift.http.client.ResponseHandler;
import com.facebook.presto.jdbc.internal.airlift.http.client.StaticBodyGenerator;
import com.facebook.presto.jdbc.internal.airlift.log.Logger;
import com.facebook.presto.jdbc.internal.airlift.units.DataSize;
import com.facebook.presto.jdbc.internal.airlift.units.DataSize.Unit;
import com.facebook.presto.jdbc.internal.airlift.units.Duration;
import com.facebook.presto.jdbc.internal.jetty.client.HttpClient;
import com.facebook.presto.jdbc.internal.jetty.client.HttpRequest;
import com.facebook.presto.jdbc.internal.jetty.client.Socks4Proxy;
import com.facebook.presto.jdbc.internal.jetty.client.api.ContentProvider;
import com.facebook.presto.jdbc.internal.jetty.client.api.Response;
import com.facebook.presto.jdbc.internal.jetty.client.api.Response.Listener;
import com.facebook.presto.jdbc.internal.jetty.client.api.Result;
import com.facebook.presto.jdbc.internal.jetty.client.util.BytesContentProvider;
import com.facebook.presto.jdbc.internal.jetty.client.util.InputStreamResponseListener;
import com.facebook.presto.jdbc.internal.jetty.http.HttpFields;
import com.facebook.presto.jdbc.internal.jetty.http.HttpHeader;
import com.facebook.presto.jdbc.internal.jetty.util.HttpCookieStore;
import com.facebook.presto.jdbc.internal.jetty.util.ssl.SslContextFactory;
import org.weakref.jmx.Flatten;
import org.weakref.jmx.Managed;
import javax.annotation.concurrent.GuardedBy;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map.Entry;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import static com.facebook.presto.jdbc.internal.guava.base.Preconditions.checkNotNull;
import static java.lang.Math.min;
public class JettyHttpClient
implements AsyncHttpClient
{
// Underlying Jetty client; built by createHttpClient() and started in the constructors.
private final HttpClient httpClient;
// Maximum response size in bytes, taken from HttpClientConfig.getMaxContentLength().
private final long maxContentLength;
// Request statistics for this client; exposed through getStats().
private final RequestStats stats = new RequestStats();
// Immutable copy of the filters applied, in order, to every request before it is sent.
private final List<HttpRequestFilter> requestFilters;
// Exception whose stack trace is refilled in createHttpClient(); presumably kept to
// identify where this client was created -- it is not read anywhere else in this class.
private final Exception created = new Exception();
// Label for this client: "Anonymous", or the JettyIoPool name when a pool is supplied.
private final String name;
/**
 * Creates a client with a default {@link HttpClientConfig} and no request filters.
 */
public JettyHttpClient()
{
    // the single-argument constructor supplies the empty filter list
    this(new HttpClientConfig());
}
/**
 * Creates a client from the given configuration with no request filters.
 *
 * @param config client configuration; must not be null
 */
public JettyHttpClient(HttpClientConfig config)
{
this(config, ImmutableList.<HttpRequestFilter>of());
}
/**
 * Creates and starts a client that uses Jetty's own default executor, buffer pool
 * and scheduler.
 *
 * <p>All arguments are validated, and the filter list is copied, before the Jetty
 * client is created, so an invalid argument can never leave a started client behind.
 *
 * @param config client configuration; must not be null
 * @param requestFilters filters applied in iteration order to every request; must not be null
 * @throws NullPointerException if {@code config} or {@code requestFilters} is null
 * @throws RuntimeException if the underlying Jetty client fails to start
 */
public JettyHttpClient(HttpClientConfig config, Iterable<? extends HttpRequestFilter> requestFilters)
{
    checkNotNull(config, "config is null");
    checkNotNull(requestFilters, "requestFilters is null");

    this.name = "Anonymous";
    this.maxContentLength = config.getMaxContentLength().toBytes();
    this.requestFilters = ImmutableList.copyOf(requestFilters);

    this.httpClient = createHttpClient(config, created);
    try {
        httpClient.start();
    }
    catch (InterruptedException e) {
        // preserve the interrupt for callers further up the stack
        Thread.currentThread().interrupt();
        throw Throwables.propagate(e);
    }
    catch (Exception e) {
        throw Throwables.propagate(e);
    }
}
/**
 * Creates and starts a client that takes its executor, byte buffer pool and scheduler
 * from the given {@link JettyIoPool}.
 *
 * <p>All arguments are validated, and the filter list is copied, before the Jetty
 * client is created, so an invalid argument can never leave a started client behind.
 *
 * @param config client configuration; must not be null
 * @param jettyIoPool source of the executor, buffer pool, scheduler and client name; must not be null
 * @param requestFilters filters applied in iteration order to every request; must not be null
 * @throws NullPointerException if any argument is null
 * @throws RuntimeException if the underlying Jetty client fails to start
 */
public JettyHttpClient(HttpClientConfig config, JettyIoPool jettyIoPool, Iterable<? extends HttpRequestFilter> requestFilters)
{
    checkNotNull(config, "config is null");
    checkNotNull(jettyIoPool, "jettyIoPool is null");
    checkNotNull(requestFilters, "requestFilters is null");

    this.name = jettyIoPool.getName();
    this.maxContentLength = config.getMaxContentLength().toBytes();
    this.requestFilters = ImmutableList.copyOf(requestFilters);

    this.httpClient = createHttpClient(config, created);
    httpClient.setExecutor(jettyIoPool.getExecutor());
    httpClient.setByteBufferPool(jettyIoPool.setByteBufferPool());
    httpClient.setScheduler(jettyIoPool.setScheduler());

    try {
        httpClient.start();

        // remove the GZIP encoding from the client
        // todo change this when https://bugs.eclipse.org/bugs/show_bug.cgi?id=433690 is resolved
        httpClient.getContentDecoderFactories().clear();
    }
    catch (InterruptedException e) {
        // same handling as the pool-less constructor: keep the interrupt visible
        Thread.currentThread().interrupt();
        throw Throwables.propagate(e);
    }
    catch (Exception e) {
        throw Throwables.propagate(e);
    }
}
/**
 * Builds (but does not start) a Jetty {@link HttpClient} configured from {@code config}.
 *
 * <p>The client verifies HTTPS host names, uses the configured key store when a path
 * is given, stores no cookies, and routes through a SOCKS4 proxy when one is configured.
 * The idle timeout is the smaller of the keep-alive interval and the read timeout;
 * when neither is configured Jetty's own default is left untouched.
 *
 * @param config source of all settings
 * @param created exception whose stack trace is reset to the current call site
 * @return the configured, not yet started client
 */
private HttpClient createHttpClient(HttpClientConfig config, Exception created)
{
    created.fillInStackTrace();

    SslContextFactory ssl = new SslContextFactory();
    ssl.setEndpointIdentificationAlgorithm("HTTPS");
    if (config.getKeyStorePath() != null) {
        ssl.setKeyStorePath(config.getKeyStorePath());
        ssl.setKeyStorePassword(config.getKeyStorePassword());
    }

    HttpClient client = new HttpClient(ssl);
    client.setMaxConnectionsPerDestination(config.getMaxConnectionsPerServer());

    // disable cookies
    client.setCookieStore(new HttpCookieStore.Empty());

    // Long.MAX_VALUE stands for "not configured"
    long keepAliveMillis = (config.getKeepAliveInterval() == null)
            ? Long.MAX_VALUE
            : config.getKeepAliveInterval().toMillis();
    long readMillis = (config.getReadTimeout() == null)
            ? Long.MAX_VALUE
            : config.getReadTimeout().toMillis();
    long idleMillis = Math.min(keepAliveMillis, readMillis);
    if (idleMillis != Long.MAX_VALUE) {
        client.setIdleTimeout(idleMillis);
    }

    if (config.getConnectTimeout() != null) {
        client.setConnectTimeout(config.getConnectTimeout().toMillis());
    }

    HostAndPort proxyAddress = config.getSocksProxy();
    if (proxyAddress != null) {
        Socks4Proxy proxy = new Socks4Proxy(proxyAddress.getHostText(), proxyAddress.getPortOrDefault(1080));
        client.getProxyConfiguration().getProxies().add(proxy);
    }

    return client;
}
/**
 * Executes the request synchronously and hands the streaming response to the handler.
 *
 * <p>The request is passed through the request filters, sent, and the calling thread
 * then waits up to the client's idle timeout for the response to begin. Failures while
 * waiting (interrupt, timeout, or a failed request) are routed to
 * {@link ResponseHandler#handleException}; otherwise the handler's {@code handle}
 * method is invoked and request statistics are recorded.
 *
 * @param request the request to send; must not be null
 * @param responseHandler handler that converts the response or failure into a result; must not be null
 * @return the value produced by the handler
 * @throws E if the handler throws
 * @throws NullPointerException if {@code request} or {@code responseHandler} is null
 */
@Override
public <T, E extends Exception> T execute(Request request, ResponseHandler<T, E> responseHandler)
        throws E
{
    // fail before anything is sent, matching executeAsync
    checkNotNull(request, "request is null");
    checkNotNull(responseHandler, "responseHandler is null");

    long requestStart = System.nanoTime();

    // apply filters
    request = applyRequestFilters(request);

    // create jetty request and response listener
    HttpRequest jettyRequest = buildJettyRequest(request);
    InputStreamResponseListener listener = new InputStreamResponseListener(maxContentLength)
    {
        @Override
        public void onContent(Response response, ByteBuffer content)
        {
            // ignore empty blocks
            if (content.remaining() == 0) {
                return;
            }
            super.onContent(response, content);
        }
    };

    // fire the request
    jettyRequest.send(listener);

    // wait for response to begin
    Response response;
    try {
        response = listener.get(httpClient.getIdleTimeout(), TimeUnit.MILLISECONDS);
    }
    catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return responseHandler.handleException(request, e);
    }
    catch (TimeoutException e) {
        return responseHandler.handleException(request, e);
    }
    catch (ExecutionException e) {
        // the handler only accepts Exception, so wrap anything else (e.g. an Error)
        Throwable cause = e.getCause();
        if (cause instanceof Exception) {
            return responseHandler.handleException(request, (Exception) cause);
        }
        return responseHandler.handleException(request, new RuntimeException(cause));
    }

    // process response
    long responseStart = System.nanoTime();

    JettyResponse jettyResponse = null;
    T value;
    try {
        jettyResponse = new JettyResponse(response, listener.getInputStream());
        value = responseHandler.handle(request, jettyResponse);
    }
    finally {
        // statistics are recorded even when the handler throws
        recordRequestComplete(stats, request, requestStart, jettyResponse, responseStart);
    }
    return value;
}
/**
 * Sends the request asynchronously and returns a future for the handler's result.
 *
 * <p>The response body is buffered in memory, up to the configured maximum content
 * length, before the handler is invoked. If sending fails immediately with a
 * {@link RuntimeException}, the failure is reported through the returned future
 * rather than thrown.
 *
 * @param request the request to send; must not be null
 * @param responseHandler handler that converts the response or failure into a result; must not be null
 * @return a future completed with the handler's value or failure
 */
@Override
public <T, E extends Exception> HttpResponseFuture<T> executeAsync(Request request, ResponseHandler<T, E> responseHandler)
{
    checkNotNull(request, "request is null");
    checkNotNull(responseHandler, "responseHandler is null");

    Request filtered = applyRequestFilters(request);
    HttpRequest outgoing = buildJettyRequest(filtered);

    JettyResponseFuture<T, E> result = new JettyResponseFuture<>(filtered, outgoing, responseHandler, stats);
    int bufferLimit = Ints.saturatedCast(maxContentLength);
    BufferingResponseListener bufferingListener = new BufferingResponseListener(result, bufferLimit);

    try {
        outgoing.send(bufferingListener);
    }
    catch (RuntimeException sendFailure) {
        // normally this is a rejected execution exception because the client has been closed
        result.failed(sendFailure);
    }
    return result;
}
/**
 * Runs the request through every configured filter in order, feeding each filter
 * the output of the previous one.
 *
 * @param request the original request
 * @return the request produced by the last filter, or {@code request} itself when there are no filters
 */
private Request applyRequestFilters(Request request)
{
    Request current = request;
    Iterator<HttpRequestFilter> filters = requestFilters.iterator();
    while (filters.hasNext()) {
        current = filters.next().filterRequest(current);
    }
    return current;
}
/**
 * Translates an already filtered request into a Jetty request: URI, method, headers
 * and, when present, the body.
 *
 * @param finalRequest the request after all filters have been applied
 * @return the Jetty request, ready to be sent
 */
private HttpRequest buildJettyRequest(Request finalRequest)
{
    HttpRequest outgoing = (HttpRequest) httpClient.newRequest(finalRequest.getUri());

    // jetty client always adds the user agent header
    // todo should there be a default?
    outgoing.getHeaders().remove(HttpHeader.USER_AGENT);

    outgoing.method(finalRequest.getMethod());

    for (Entry<String, String> header : finalRequest.getHeaders().entries()) {
        outgoing.header(header.getKey(), header.getValue());
    }

    BodyGenerator body = finalRequest.getBodyGenerator();
    if (body instanceof StaticBodyGenerator) {
        // a static body is already a byte array and can be handed over directly
        byte[] bytes = ((StaticBodyGenerator) body).getBody();
        outgoing.content(new BytesContentProvider(bytes));
    }
    else if (body != null) {
        // any other generator is streamed on the client's executor
        outgoing.content(new BodyGeneratorContentProvider(body, httpClient.getExecutor()));
    }
    return outgoing;
}
/**
 * Returns the request filters applied to every request, in application order.
 * The returned list is the immutable copy made in the constructor.
 */
public List<HttpRequestFilter> getRequestFilters()
{
return requestFilters;
}
/**
 * Returns the statistics object for this client; the same instance is passed to
 * every synchronous and asynchronous request for recording.
 */
@Override
@Managed
@Flatten
public RequestStats getStats()
{
return stats;
}
/**
 * Stops the underlying Jetty client on a best-effort basis.
 *
 * <p>No failure is propagated to the caller; if stopping is interrupted, the
 * thread's interrupt status is restored.
 */
@Override
public void close()
{
    try {
        httpClient.stop();
    }
    catch (Exception ignored) {
        // shutdown failures are deliberately dropped, but an interrupt must stay visible
        if (ignored instanceof InterruptedException) {
            Thread.currentThread().interrupt();
        }
    }
}
/**
 * Adapts a Jetty {@link Response} plus its body stream to the airlift response
 * interface, counting the body bytes as they are read.
 */
private static class JettyResponse
        implements com.facebook.presto.jdbc.internal.airlift.http.client.Response
{
    private final Response response;
    // wraps the body so getBytesRead() can report how much was consumed
    private final CountingInputStream inputStream;

    public JettyResponse(Response response, InputStream inputStream)
    {
        this.response = response;
        this.inputStream = new CountingInputStream(inputStream);
    }

    /** Returns the HTTP status code of the Jetty response. */
    @Override
    public int getStatusCode()
    {
        return response.getStatus();
    }

    /** Returns the reason phrase of the Jetty response. */
    @Override
    public String getStatusMessage()
    {
        return response.getReason();
    }

    /** Returns the value Jetty reports for the named header field. */
    @Override
    public String getHeader(String name)
    {
        HttpFields fields = response.getHeaders();
        return fields.getStringField(name);
    }

    /** Returns an immutable snapshot of all header names and their values. */
    @Override
    public ListMultimap<String, String> getHeaders()
    {
        HttpFields fields = response.getHeaders();
        ImmutableListMultimap.Builder<String, String> snapshot = ImmutableListMultimap.builder();
        for (String fieldName : fields.getFieldNamesCollection()) {
            snapshot.putAll(fieldName, fields.getValuesList(fieldName));
        }
        return snapshot.build();
    }

    /** Returns the number of body bytes read so far through {@link #getInputStream()}. */
    @Override
    public long getBytesRead()
    {
        return inputStream.getCount();
    }

    /** Returns the counting body stream. */
    @Override
    public InputStream getInputStream()
    {
        return inputStream;
    }

    @Override
    public String toString()
    {
        return Objects.toStringHelper(this)
                .add("statusCode", getStatusCode())
                .add("statusMessage", getStatusMessage())
                .add("headers", getHeaders())
                .toString();
    }
}
/**
 * Future returned by executeAsync(). It is completed by BufferingResponseListener
 * (via completed()/failed()) with the value or failure produced by the response
 * handler, and tracks a coarse state that is exposed through getState().
 */
private static class JettyResponseFuture<T, E extends Exception>
extends AbstractFuture<T>
implements HttpResponseFuture<T>
{
// Coarse request state. Only WAITING_FOR_CONNECTION (initial), PROCESSING_RESPONSE,
// DONE, FAILED and CANCELED are ever assigned in this class.
public enum JettyAsyncHttpState
{
WAITING_FOR_CONNECTION,
SENDING_REQUEST,
WAITING_FOR_RESPONSE,
PROCESSING_RESPONSE,
DONE,
FAILED,
CANCELED
}
private static final Logger log = Logger.get(JettyResponseFuture.class);
// captured when the future is constructed, i.e. just before the request is sent
private final long requestStart = System.nanoTime();
private final AtomicReference<JettyAsyncHttpState> state = new AtomicReference<>(JettyAsyncHttpState.WAITING_FOR_CONNECTION);
private final Request request;
private final com.facebook.presto.jdbc.internal.jetty.client.api.Request jettyRequest;
private final ResponseHandler<T, E> responseHandler;
private final RequestStats stats;
public JettyResponseFuture(Request request, com.facebook.presto.jdbc.internal.jetty.client.api.Request jettyRequest, ResponseHandler<T, E> responseHandler, RequestStats stats)
{
this.request = request;
this.jettyRequest = jettyRequest;
this.responseHandler = responseHandler;
this.stats = stats;
}
/** Returns the name of the current state. */
@Override
public String getState()
{
return state.get().toString();
}
/**
 * Marks the state CANCELED and aborts the Jetty request with a CancellationException
 * before delegating to AbstractFuture.cancel(). The state is set unconditionally,
 * even when the future has already completed.
 */
@Override
public boolean cancel(boolean mayInterruptIfRunning)
{
state.set(JettyAsyncHttpState.CANCELED);
jettyRequest.abort(new CancellationException());
return super.cancel(mayInterruptIfRunning);
}
/**
 * Called when the response was received successfully with its fully buffered body.
 * Runs the response handler and stores its value, or its failure, in this future.
 * Does nothing if the future was already canceled.
 */
protected void completed(Response response, InputStream content)
{
if (state.get() == JettyAsyncHttpState.CANCELED) {
return;
}
T value;
try {
value = processResponse(response, content);
}
catch (Throwable e) {
// this will be an instance of E from the response handler or an Error
storeException(e);
return;
}
state.set(JettyAsyncHttpState.DONE);
set(value);
}
/**
 * Invokes the response handler on the buffered response and records request
 * statistics, whether or not the handler throws.
 */
private T processResponse(Response response, InputStream content)
throws E
{
// this time will not include the data fetching portion of the response,
// since the response is fully cached in memory at this point
long responseStart = System.nanoTime();
state.set(JettyAsyncHttpState.PROCESSING_RESPONSE);
JettyResponse jettyResponse = null;
T value;
try {
jettyResponse = new JettyResponse(response, content);
value = responseHandler.handle(request, jettyResponse);
}
finally {
recordRequestComplete(stats, request, requestStart, jettyResponse, responseStart);
}
return value;
}
/**
 * Called when the request failed. Exceptions are first offered to the response
 * handler, which may return a value instead; anything the handler throws, and any
 * non-Exception throwable, is stored as the failure of this future.
 * Does nothing if the future was already canceled.
 */
protected void failed(Throwable throwable)
{
if (state.get() == JettyAsyncHttpState.CANCELED) {
return;
}
// give handler a chance to rewrite the exception or return a value instead
if (throwable instanceof Exception) {
try {
T value = responseHandler.handleException(request, (Exception) throwable);
// handler returned a value, store it in the future
state.set(JettyAsyncHttpState.DONE);
set(value);
return;
}
catch (Throwable newThrowable) {
throwable = newThrowable;
}
}
// at this point "throwable" will either be an instance of E
// from the response handler or not an instance of Exception
storeException(throwable);
}
/**
 * Sets the final state (CANCELED for a CancellationException, FAILED otherwise) and
 * stores the throwable in the future. A null throwable is replaced by a placeholder
 * and logged, since it indicates a bug elsewhere.
 */
private void storeException(Throwable throwable)
{
// instanceof is false for null, so a null throwable takes the FAILED branch
if (throwable instanceof CancellationException) {
state.set(JettyAsyncHttpState.CANCELED);
}
else {
state.set(JettyAsyncHttpState.FAILED);
}
if (throwable == null) {
throwable = new Throwable("Throwable is null");
log.error(throwable, "Something is broken");
}
setException(throwable);
}
@Override
public String toString()
{
return Objects.toStringHelper(this)
.add("requestStart", requestStart)
.add("state", state)
.add("request", request)
.toString();
}
}
/**
 * Records timing and size statistics for a finished request.
 *
 * <p>Nothing is recorded when {@code response} is null, which happens when the
 * response wrapper could not be created.
 *
 * @param requestStats sink for the statistics
 * @param request the request that was executed
 * @param requestStart {@link System#nanoTime()} value taken when the request began
 * @param response the response wrapper, or null
 * @param responseStart {@link System#nanoTime()} value taken when response processing began
 */
private static void recordRequestComplete(RequestStats requestStats, Request request, long requestStart, JettyResponse response, long responseStart)
{
    if (response != null) {
        // time spent in the response handler, measured up to now
        Duration responseProcessingTime = Duration.nanosSince(responseStart);

        // time from the start of the request until response processing began
        long waitNanos = responseStart - requestStart;
        Duration requestProcessingTime = new Duration(waitNanos, TimeUnit.NANOSECONDS);

        // NOTE(review): getBytesRead() is passed for both size arguments; the first
        // presumably should be the number of request bytes written -- confirm against
        // RequestStats.record.
        requestStats.record(
                request.getMethod(),
                response.getStatusCode(),
                response.getBytesRead(),
                response.getBytesRead(),
                requestProcessingTime,
                responseProcessingTime);
    }
}
/**
 * Jetty {@link ContentProvider} that streams the output of a {@link BodyGenerator}.
 *
 * <p>Each call to {@link #iterator()} runs the generator on the supplied executor.
 * The generator writes into an output stream that copies every write into a bounded
 * queue of chunks; the returned iterator drains that queue. Two sentinel buffers,
 * compared by identity, mark normal completion and failure.
 */
private static class BodyGeneratorContentProvider
        implements ContentProvider
{
    // sentinels: distinct empty buffers that are recognised by reference, never by content
    private static final ByteBuffer DONE = ByteBuffer.allocate(0);
    private static final ByteBuffer EXCEPTION = ByteBuffer.allocate(0);

    private final BodyGenerator bodyGenerator;
    private final Executor executor;

    public BodyGeneratorContentProvider(BodyGenerator bodyGenerator, Executor executor)
    {
        this.bodyGenerator = bodyGenerator;
        this.executor = executor;
    }

    /** Returns -1 because the body length is not known in advance. */
    @Override
    public long getLength()
    {
        return -1;
    }

    /**
     * Starts the body generator on the executor and returns an iterator over the
     * chunks it produces. The iterator blocks until a chunk is available, ends when
     * the generator closes its stream, and rethrows the generator's failure.
     */
    @Override
    public Iterator<ByteBuffer> iterator()
    {
        final BlockingQueue<ByteBuffer> chunks = new ArrayBlockingQueue<>(16);
        final AtomicReference<Exception> exception = new AtomicReference<>();

        executor.execute(new Runnable()
        {
            @Override
            public void run()
            {
                BodyGeneratorOutputStream out = new BodyGeneratorOutputStream(chunks);
                try {
                    bodyGenerator.write(out);
                    out.close();
                }
                catch (Exception e) {
                    exception.set(e);
                    // The failure marker must always reach the consumer, otherwise it
                    // blocks forever in take(). add() would throw on a full queue, so
                    // pending chunks are discarded to make room; the request is
                    // failing anyway.
                    while (!chunks.offer(EXCEPTION)) {
                        chunks.clear();
                    }
                }
            }
        });

        return new AbstractIterator<ByteBuffer>()
        {
            @Override
            protected ByteBuffer computeNext()
            {
                ByteBuffer chunk;
                try {
                    chunk = chunks.take();
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new RuntimeException("Interrupted", e);
                }

                if (chunk == EXCEPTION) {
                    throw Throwables.propagate(exception.get());
                }
                if (chunk == DONE) {
                    return endOfData();
                }
                return chunk;
            }
        };
    }

    /**
     * Output stream handed to the body generator. Every write is copied into its own
     * buffer and queued, blocking while the queue is full; closing queues the
     * end-of-data sentinel.
     */
    private static final class BodyGeneratorOutputStream
            extends OutputStream
    {
        private final BlockingQueue<ByteBuffer> chunks;

        private BodyGeneratorOutputStream(BlockingQueue<ByteBuffer> chunks)
        {
            this.chunks = chunks;
        }

        @Override
        public void write(int b)
                throws IOException
        {
            enqueue(ByteBuffer.wrap(new byte[] {(byte) b}));
        }

        @Override
        public void write(byte[] b, int off, int len)
                throws IOException
        {
            if (off < 0 || len < 0 || len > b.length - off) {
                throw new IndexOutOfBoundsException();
            }
            // must copy array since it could be reused;
            // copyOfRange takes an exclusive END index, not a length
            byte[] copy = Arrays.copyOfRange(b, off, off + len);
            enqueue(ByteBuffer.wrap(copy));
        }

        @Override
        public void close()
                throws IOException
        {
            enqueue(DONE);
        }

        /**
         * Queues a chunk, blocking while the queue is full. An interrupt is reported
         * as {@link InterruptedIOException} with the thread's interrupt status restored.
         */
        private void enqueue(ByteBuffer chunk)
                throws IOException
        {
            try {
                chunks.put(chunk);
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new InterruptedIOException();
            }
        }
    }
}
/**
 * Response listener that accumulates the whole response body in memory, up to
 * {@code maxLength} bytes, and then completes or fails the associated future.
 *
 * <p>A response that announces or delivers more than {@code maxLength} bytes is
 * aborted with {@link ResponseTooLargeException}. All access to the buffer is
 * synchronized on this listener.
 */
private static class BufferingResponseListener
        extends Listener.Adapter
{
    private final JettyResponseFuture<?, ?> future;
    private final int maxLength;

    @GuardedBy("this")
    private byte[] buffer = new byte[(int) new DataSize(64, Unit.KILOBYTE).toBytes()];
    @GuardedBy("this")
    private int size;

    /**
     * @param future the future to complete when the response finishes; must not be null
     * @param maxLength maximum number of body bytes to accept; must be positive
     */
    public BufferingResponseListener(JettyResponseFuture<?, ?> future, int maxLength)
    {
        this.future = checkNotNull(future, "future is null");
        Preconditions.checkArgument(maxLength > 0, "maxLength must be greater than zero");
        this.maxLength = maxLength;
    }

    /**
     * Uses the Content-Length header, when present, to reject oversized responses
     * early and to size the buffer once up front.
     */
    @Override
    public synchronized void onHeaders(Response response)
    {
        long length = response.getHeaders().getLongField(HttpHeader.CONTENT_LENGTH.asString());
        if (length > maxLength) {
            response.abort(new ResponseTooLargeException());
            // do not allocate a buffer for a response that is being aborted
            return;
        }
        if (length > buffer.length) {
            // safe cast: length <= maxLength, which is an int
            buffer = Arrays.copyOf(buffer, (int) length);
        }
    }

    /**
     * Appends a block of content to the buffer, growing it as needed, and aborts the
     * response once the accumulated size would exceed {@code maxLength}.
     */
    @Override
    public synchronized void onContent(Response response, ByteBuffer content)
    {
        int length = content.remaining();
        // long arithmetic so that very large responses cannot overflow
        long requiredCapacity = (long) size + length;

        // enforce the limit on every block, not only when the buffer must grow
        if (requiredCapacity > maxLength) {
            response.abort(new ResponseTooLargeException());
            return;
        }

        if (requiredCapacity > buffer.length) {
            // grow to the next power of two above the required size, capped at maxLength
            long newCapacity = Math.min(Long.highestOneBit(requiredCapacity) << 1, (long) maxLength);
            buffer = Arrays.copyOf(buffer, (int) newCapacity);
        }

        content.get(buffer, size, length);
        size += length;
    }

    /**
     * Completes the future with the buffered body, or fails it with the failure
     * reported by Jetty.
     */
    @Override
    public synchronized void onComplete(Result result)
    {
        Throwable throwable = result.getFailure();
        if (throwable != null) {
            future.failed(throwable);
        }
        else {
            future.completed(result.getResponse(), new ByteArrayInputStream(buffer, 0, size));
        }
    }
}
}