Package com.linkedin.r2.transport.http.client

Source Code of com.linkedin.r2.transport.http.client.TestHttpNettyClient$NoCreations

/*
   Copyright (c) 2012 LinkedIn Corp.

   Licensed under the Apache License, Version 2.0 (the "License");
   you may not use this file except in compliance with the License.
   You may obtain a copy of the License at

       http://www.apache.org/licenses/LICENSE-2.0

   Unless required by applicable law or agreed to in writing, software
   distributed under the License is distributed on an "AS IS" BASIS,
   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   See the License for the specific language governing permissions and
   limitations under the License.
*/

/**
* $Id: $
*/

package com.linkedin.r2.transport.http.client;

import java.io.IOException;
import java.net.SocketAddress;
import java.net.URI;
import java.nio.channels.UnresolvedAddressException;
import java.security.NoSuchAlgorithmException;
import java.util.Collections;
import java.util.HashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.CountDownLatch;

import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLParameters;

import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
import org.jboss.netty.handler.codec.frame.TooLongFrameException;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

import com.linkedin.common.callback.Callback;
import com.linkedin.common.callback.FutureCallback;
import com.linkedin.common.util.None;
import com.linkedin.r2.RemoteInvocationException;
import com.linkedin.r2.message.RequestContext;
import com.linkedin.r2.message.rest.RestRequest;
import com.linkedin.r2.message.rest.RestRequestBuilder;
import com.linkedin.r2.message.rest.RestResponse;
import com.linkedin.r2.transport.common.bridge.client.TransportCallbackAdapter;
import com.linkedin.r2.transport.common.bridge.client.TransportClient;
import com.linkedin.r2.transport.common.bridge.common.TransportCallback;

/**
* @author Steven Ihde
* @version $Revision: $
*/

public class TestHttpNettyClient
{
  private NioClientSocketChannelFactory _factory;
  private ScheduledExecutorService _scheduler;

  private static final int TEST_MAX_RESPONSE_SIZE = 500000;

  private static final int RESPONSE_OK = 1;
  private static final int TOO_LARGE = 2;

  @BeforeClass
  public void setup()
  {
    _factory = new NioClientSocketChannelFactory(
            Executors.newCachedThreadPool(),
            Executors.newCachedThreadPool());
    _scheduler = Executors.newSingleThreadScheduledExecutor();
  }

  @AfterClass
  public void tearDown()
  {
    _factory.releaseExternalResources();
    _scheduler.shutdown();
  }

  @Test
  public void testNoChannelTimeout() throws InterruptedException
  {
    HttpNettyClient client = new HttpNettyClient(new NoCreations(), _scheduler, 500, 500, 1024*1024*2);

    RestRequest r = new RestRequestBuilder(URI.create("http://localhost/")).build();
    FutureCallback<RestResponse> cb = new FutureCallback<RestResponse>();
    TransportCallback<RestResponse> callback = new TransportCallbackAdapter<RestResponse>(cb);
    client.restRequest(r, new RequestContext(), new HashMap<String,String>(), callback);
    try
    {
      // This timeout needs to be significantly larger than the getTimeout of the netty client;
      // we're testing that the client will generate its own timeout
      cb.get(30, TimeUnit.SECONDS);
      Assert.fail("Get was supposed to time out");
    }
    catch (TimeoutException e)
    {
      // TimeoutException means the timeout for Future.get() elapsed and nothing happened.
      // Instead, we are expecting our callback to be invoked before the future timeout
      // with a timeout generated by the HttpNettyClient.
      Assert.fail("Unexpected TimeoutException, should have been ExecutionException", e);
    }
    catch (ExecutionException e)
    {
      verifyCauseChain(e, RemoteInvocationException.class, TimeoutException.class);
    }
  }

  @Test
  public void testNoResponseTimeout() throws InterruptedException, IOException
  {
    TestServer testServer = new TestServer();

    HttpNettyClient client = new HttpNettyClient(_factory, _scheduler, 1, 500, 10000, 500, 1024*1024*2);

    RestRequest r = new RestRequestBuilder(testServer.getNoResponseURI()).build();
    FutureCallback<RestResponse> cb = new FutureCallback<RestResponse>();
    TransportCallback<RestResponse> callback = new TransportCallbackAdapter<RestResponse>(cb);
    client.restRequest(r, new RequestContext(), new HashMap<String, String>(), callback);

    try
    {
      // This timeout needs to be significantly larger than the getTimeout of the netty client;
      // we're testing that the client will generate its own timeout
      cb.get(30, TimeUnit.SECONDS);
      Assert.fail("Get was supposed to time out");
    }
    catch (TimeoutException e)
    {
      // TimeoutException means the timeout for Future.get() elapsed and nothing happened.
      // Instead, we are expecting our callback to be invoked before the future timeout
      // with a timeout generated by the HttpNettyClient.
      Assert.fail("Unexpected TimeoutException, should have been ExecutionException", e);
    }
    catch (ExecutionException e)
    {
      verifyCauseChain(e, RemoteInvocationException.class, TimeoutException.class);
    }
    testServer.shutdown();
  }

  @Test
  public void testBadAddress() throws InterruptedException, IOException, TimeoutException
  {
    HttpNettyClient client = new HttpNettyClient(_factory, _scheduler, 1, 30000, 10000, 500,1024*1024*2);

    RestRequest r = new RestRequestBuilder(URI.create("http://this.host.does.not.exist.linkedin.com")).build();
    FutureCallback<RestResponse> cb = new FutureCallback<RestResponse>();
    TransportCallback<RestResponse> callback = new TransportCallbackAdapter<RestResponse>(cb);
    client.restRequest(r, new RequestContext(), new HashMap<String,String>(), callback);
    try
    {
      cb.get(30, TimeUnit.SECONDS);
      Assert.fail("Get was supposed to fail");
    }
    catch (ExecutionException e)
    {
      verifyCauseChain(e, RemoteInvocationException.class, UnresolvedAddressException.class);
    }
  }

  @Test
  public void testMaxResponseSize() throws InterruptedException, IOException, TimeoutException
  {
    testResponseSize(TEST_MAX_RESPONSE_SIZE-1, RESPONSE_OK);

    testResponseSize(TEST_MAX_RESPONSE_SIZE, RESPONSE_OK);

    testResponseSize(TEST_MAX_RESPONSE_SIZE+1, TOO_LARGE);
  }


  public void testResponseSize(int responseSize, int expectedResult) throws InterruptedException, IOException, TimeoutException
  {
    TestServer testServer = new TestServer();

    HttpNettyClient client = new HttpNettyClient(_factory, _scheduler, 1, 50000, 10000, 500, TEST_MAX_RESPONSE_SIZE);

    RestRequest r = new RestRequestBuilder(testServer.getResponseOfSizeURI(responseSize)).build();
    FutureCallback<RestResponse> cb = new FutureCallback<RestResponse>();
    TransportCallback<RestResponse> callback = new TransportCallbackAdapter<RestResponse>(cb);
    client.restRequest(r, new RequestContext(), new HashMap<String, String>(), callback);

    try
    {
      cb.get(30, TimeUnit.SECONDS);
      if(expectedResult == TOO_LARGE)
      {
        Assert.fail("Max response size exceeded, expected exception. ");
      }
    }
    catch (ExecutionException e)
    {
      if(expectedResult == RESPONSE_OK)
      {
        Assert.fail("Unexpected ExecutionException, response was <= max response size.");
      }
      verifyCauseChain(e, RemoteInvocationException.class, TooLongFrameException.class);

    }
    testServer.shutdown();
  }

  @Test
  public void testShutdown() throws ExecutionException, TimeoutException, InterruptedException
  {
    TransportClient client = new HttpClientFactory().getClient(
            Collections.<String, String>emptyMap());

    FutureCallback<None> shutdownCallback = new FutureCallback<None>();
    client.shutdown(shutdownCallback);
    shutdownCallback.get(30, TimeUnit.SECONDS);

    // Now verify a new request will also fail
    RestRequest r = new RestRequestBuilder(URI.create("http://no.such.host.linkedin.com")).build();
    FutureCallback<RestResponse> callback = new FutureCallback<RestResponse>();
    client.restRequest(r, new RequestContext(), new HashMap<String,String>(), new TransportCallbackAdapter<RestResponse>(callback));
    try
    {
      callback.get(30, TimeUnit.SECONDS);
    }
    catch (ExecutionException e)
    {
      // Expected
    }

  }
  @Test
  private void testShutdownStuckInPool() throws InterruptedException, ExecutionException, TimeoutException

  {
    // Test that shutdown works when the outstanding request is stuck in the pool waiting for a channel
    HttpNettyClient client = new HttpNettyClient(new NoCreations(), _scheduler, 60000, 1,1024*1024*2);

    RestRequest r = new RestRequestBuilder(URI.create("http://some.host/")).build();
    FutureCallback<RestResponse> futureCallback = new FutureCallback<RestResponse>();
    client.restRequest(r, new RequestContext(), new HashMap<String, String>(), new TransportCallbackAdapter<RestResponse>(futureCallback));

    FutureCallback<None> shutdownCallback = new FutureCallback<None>();
    client.shutdown(shutdownCallback);

    shutdownCallback.get(30, TimeUnit.SECONDS);

    try
    {
      futureCallback.get(30, TimeUnit.SECONDS);
      Assert.fail("get should have thrown exception");
    }
    catch (ExecutionException e)
    {
      verifyCauseChain(e, RemoteInvocationException.class, TimeoutException.class);
    }
  }

  @Test
  public void testShutdownRequestOutstanding()
          throws IOException, ExecutionException, TimeoutException, InterruptedException
  {
    // Test that it works when the shutdown kills the outstanding request...
    testShutdownRequestOutstanding(500, 60000,
                                   RemoteInvocationException.class,
                                   TimeoutException.class);
  }

  @Test
  public void testShutdownRequestOutstanding2()
          throws IOException, ExecutionException, TimeoutException, InterruptedException
  {
    // Test that it works when the request timeout kills the outstanding request...
    testShutdownRequestOutstanding(60000, 500,
                                   RemoteInvocationException.class,
                                   // sometimes the test fails with ChannelClosedException
                                   // TimeoutException.class
                                   Exception.class
                                   );

  }

  private void testShutdownRequestOutstanding(int shutdownTimeout, int requestTimeout,
                                              Class<?>... causeChain)
          throws InterruptedException, IOException, ExecutionException, TimeoutException
  {
    TestServer testServer = new TestServer();

    HttpNettyClient client = new HttpNettyClient(_factory, _scheduler, 1, requestTimeout, 10000, shutdownTimeout, 1024*1024*2);

    RestRequest r = new RestRequestBuilder(testServer.getNoResponseURI()).build();
    FutureCallback<RestResponse> cb = new FutureCallback<RestResponse>();
    TransportCallback<RestResponse> callback = new TransportCallbackAdapter<RestResponse>(cb);
    client.restRequest(r, new RequestContext(), new HashMap<String,String>(), callback);

    FutureCallback<None> shutdownCallback = new FutureCallback<None>();
    client.shutdown(shutdownCallback);
    shutdownCallback.get(30, TimeUnit.SECONDS);

    try
    {
      // This timeout needs to be significantly larger than the getTimeout of the netty client;
      // we're testing that the client will generate its own timeout
      cb.get(30, TimeUnit.SECONDS);
      Assert.fail("Get was supposed to time out");
    }
    catch (TimeoutException e)
    {
      // TimeoutException means the timeout for Future.get() elapsed and nothing happened.
      // Instead, we are expecting our callback to be invoked before the future timeout
      // with a timeout generated by the HttpNettyClient.
      Assert.fail("Get timed out, should have thrown ExecutionException", e);
    }
    catch (ExecutionException e)
    {
      verifyCauseChain(e, causeChain);
    }
    testServer.shutdown();
  }

  private static void verifyCauseChain(Throwable throwable, Class<?>... causes)
  {
    Throwable t = throwable;
    for (Class<?> c : causes)
    {
      Throwable cause = t.getCause();
      if (cause == null)
      {
        Assert.fail("Cause chain ended too early", throwable);
      }
      if (!c.isAssignableFrom(cause.getClass()))
      {
        Assert.fail("Expected cause " + c.getName() + " not " + cause.getClass().getName(), throwable);
      }
      t = cause;
    }

  }

  // Test that cannot pass pass SSLParameters without SSLContext.
  // This in fact tests HttpClientPipelineFactory constructor through HttpNettyClient
  // constructor.
  @Test
  public void testClientPipelineFactory1() throws NoSuchAlgorithmException
  {
    try
    {
      new HttpNettyClient(_factory, _scheduler, 1, 1, 1, 1, 1, null, new SSLParameters(), Integer.MAX_VALUE, _scheduler, Integer.MAX_VALUE);
    }
    catch (IllegalArgumentException e)
    {
      // Check exception message to make sure it's the expected one.
      Assert.assertEquals(e.getMessage(),
                          "SSLParameters passed with no SSLContext");
    }
  }

  // Test that cannot set cipher suites in SSLParameters that don't have any match in
  // SSLContext.
  // This in fact tests HttpClientPipelineFactory constructor through HttpNettyClient
  // constructor.
  @Test
  public void testClientPipelineFactory2Fail() throws NoSuchAlgorithmException
  {
    String[] requestedCipherSuites = { "Unsupported" };
    SSLParameters sslParameters = new SSLParameters();
    sslParameters.setCipherSuites(requestedCipherSuites);
    try
    {
      new HttpNettyClient(_factory,
                          _scheduler,
                          1,
                          1,
                          1,
                          1,
                          1,
                          SSLContext.getDefault(),
                          sslParameters,
                          Integer.MAX_VALUE,
                          _scheduler,
                          Integer.MAX_VALUE);
    }
    catch (IllegalArgumentException e)
    {
      // Check exception message to make sure it's the expected one.
      Assert.assertEquals(e.getMessage(),
                          "None of the requested cipher suites: [Unsupported] are found in SSLContext");
    }
  }

  // Test that can set cipher suites in SSLParameters that have at least one match in
  // SSLContext.
  // This in fact tests HttpClientPipelineFactory constructor through HttpNettyClient
  // constructor.
  @Test
  public void testClientPipelineFactory2Pass() throws NoSuchAlgorithmException
  {
    String[] requestedCipherSuites = { "Unsupported", "SSL_RSA_WITH_RC4_128_SHA" };
    SSLParameters sslParameters = new SSLParameters();
    sslParameters.setCipherSuites(requestedCipherSuites);
    new HttpNettyClient(_factory,
                        _scheduler,
                        1,
                        1,
                        1,
                        1,
                        1,
                        SSLContext.getDefault(),
                        sslParameters,
                        Integer.MAX_VALUE,
                        _scheduler,
                        Integer.MAX_VALUE);
  }

  // Test that cannot set protocols in SSLParameters that don't have any match in
  // SSLContext.
  // This in fact tests HttpClientPipelineFactory constructor through HttpNettyClient
  // constructor.
  @Test
  public void testClientPipelineFactory3Fail() throws NoSuchAlgorithmException
  {
    String[] requestedProtocols = { "Unsupported" };
    SSLParameters sslParameters = new SSLParameters();
    sslParameters.setProtocols(requestedProtocols);
    try
    {
      new HttpNettyClient(_factory,
                          _scheduler,
                          1,
                          1,
                          1,
                          1,
                          1,
                          SSLContext.getDefault(),
                          sslParameters,
                          Integer.MAX_VALUE,
                          _scheduler,
                          Integer.MAX_VALUE);
    }
    catch (IllegalArgumentException e)
    {
      // Check exception message to make sure it's the expected one.
      Assert.assertEquals(e.getMessage(),
                          "None of the requested protocols: [Unsupported] are found in SSLContext");
    }
  }

  // Test that can set protocols in SSLParameters that have at least one match in
  // SSLContext.
  // This in fact tests HttpClientPipelineFactory constructor through HttpNettyClient
  // constructor.
  @Test
  public void testClientPipelineFactory3Pass() throws NoSuchAlgorithmException
  {
    String[] requestedProtocols = { "Unsupported", "TLSv1" };
    SSLParameters sslParameters = new SSLParameters();
    sslParameters.setProtocols(requestedProtocols);
    new HttpNettyClient(_factory,
                        _scheduler,
                        1,
                        1,
                        1,
                        1,
                        1,
                        SSLContext.getDefault(),
                        sslParameters,
                        Integer.MAX_VALUE,
                        _scheduler,
                        Integer.MAX_VALUE);
  }

  @Test
  public void testPoolStatsProviderManager() throws InterruptedException, ExecutionException, TimeoutException
  {
    final CountDownLatch setLatch = new CountDownLatch(1);
    final CountDownLatch removeLatch = new CountDownLatch(1);
    AbstractJmxManager manager = new AbstractJmxManager()
    {
      @Override
      public void onProviderCreate(PoolStatsProvider provider)
      {
        setLatch.countDown();
      }

      @Override
      public void onProviderShutdown(PoolStatsProvider provider)
      {
        removeLatch.countDown();
      }
    };

    HttpNettyClient client = new HttpNettyClient(_factory,
                                                  _scheduler,
                                                  1,
                                                  1,
                                                  1,
                                                  1000,
                                                  1,
                                                  null,
                                                  null,
                                                  Integer.MAX_VALUE,
                                                  _scheduler,
                                                  Integer.MAX_VALUE,
                                                  HttpClientFactory.DEFAULT_CLIENT_NAME,
                                                  manager);
    // test setPoolStatsProvider
    try
    {
      setLatch.await(30, TimeUnit.SECONDS);
    }
    catch (InterruptedException e)
    {
      Assert.fail("PoolStatsAware setPoolStatsProvider didn't get called when creating channel pool.");
    }
    // test removePoolStatsProvider
    FutureCallback<None> shutdownCallback = new FutureCallback<None>();
    client.shutdown(shutdownCallback);
    try
    {
      removeLatch.await(30, TimeUnit.SECONDS);
    }
    catch (InterruptedException e)
    {
      Assert.fail("PoolStatsAware removePoolStatsProvider didn't get called when shutting down channel pool.");
    }
    shutdownCallback.get(30, TimeUnit.SECONDS);
  }

  /*
  //@Test
  public void testFullPoolTimeout()
  {
    AsyncPool<Object> pool = new AsyncPoolImpl<Object>("object pool",
                                                       new TestAsyncPool.SynchronousLifecycle(),
                                                       1,
                                                       100,
                                                       _executor
                                                       );
    pool.start();
    FutureCallback<Object> cb1 = new FutureCallback<Object>();
    pool.get(cb1);
    try
    {
      Object o = cb1.getRaw();
      Assert.assertNotNull(o);
    }
    catch (Exception e)
    {
      Assert.fail("getResponse failed unexpectedly", e);
    }

    FutureCallback<Object> cb2 = new FutureCallback<Object>();
    // This get should fail since pool size is one and previous object not returned
    pool.get(cb2);

    try
    {
      Object o = cb2.getRaw();
      Assert.fail("Get was supposed to time out");
    }
    catch (TimeoutException e)
    {
      // correct
    }
    catch (Exception e)
    {
      Assert.fail("Failed with unexpected exception instead of TimeoutException", e);
    }

  }
*/
  private static class NoCreations implements ChannelPoolFactory
  {
    @Override
    public AsyncPool<Channel> getPool(SocketAddress address)
    {
      return new AsyncPoolImpl<Channel>("fake pool", new AsyncPool.Lifecycle<Channel>()
      {
        @Override
        public void create(Callback<Channel> channelCallback)
        {

        }

        @Override
        public boolean validateGet(Channel obj)
        {
          return false;
        }

        @Override
        public boolean validatePut(Channel obj)
        {
          return false;
        }

        @Override
        public void destroy(Channel obj, boolean error, Callback<Channel> channelCallback)
        {

        }

        @Override
        public PoolStats.LifecycleStats getStats()
        {
          return null;
        }
      }, 0, 0, null);
    }

  }

}
TOP

Related Classes of com.linkedin.r2.transport.http.client.TestHttpNettyClient$NoCreations

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.