Package com.spotify.netty.handler.codec.zmtp

Source Code of com.spotify.netty.handler.codec.zmtp.EndToEndTest$Handler

/*
* Copyright (c) 2012-2014 Spotify AB
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package com.spotify.netty.handler.codec.zmtp;

import com.google.common.collect.Queues;

import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandler;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
import org.junit.Test;

import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.concurrent.BlockingQueue;

import static com.spotify.netty.handler.codec.zmtp.ZMTPConnectionType.Addressed;
import static com.spotify.netty.handler.codec.zmtp.ZMTPSession.DEFAULT_SIZE_LIMIT;
import static com.spotify.netty.handler.codec.zmtp.ZMTPSocketType.DEALER;
import static com.spotify.netty.handler.codec.zmtp.ZMTPSocketType.ROUTER;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.hamcrest.core.Is.is;
import static org.hamcrest.core.IsNull.notNullValue;
import static org.hamcrest.core.IsNull.nullValue;
import static org.junit.Assert.assertThat;

public class EndToEndTest {

  public static final byte[] NO_IDENTITY = null;

  public static final boolean INTEROP_ON = true;
  private static final boolean INTEROP_OFF = false;

  public static final InetSocketAddress ANY_PORT = new InetSocketAddress("127.0.0.1", 0);

  private Channel bind(final SocketAddress address, final ChannelHandler codec,
                       final ChannelHandler handler) {
    final ServerBootstrap bootstrap = new ServerBootstrap(new NioServerSocketChannelFactory());
    bootstrap.setPipeline(Channels.pipeline(codec, handler));
    return bootstrap.bind(address);
  }

  private Channel connect(final SocketAddress address, final ChannelHandler codec,
                          final ChannelHandler handler) {
    final ClientBootstrap bootstrap = new ClientBootstrap(new NioClientSocketChannelFactory());
    bootstrap.setPipeline(Channels.pipeline(codec, handler));
    return bootstrap.connect(address).getChannel();
  }

  public void testRequestReply(final ChannelHandler serverCodec, final ChannelHandler clientCodec)
      throws InterruptedException {

    // Set up server & client
    Handler server = new Handler();
    Handler client = new Handler();
    Channel serverChannel = bind(ANY_PORT, serverCodec, server);
    SocketAddress address = serverChannel.getLocalAddress();
    Channel clientChannel = connect(address, clientCodec, client);
    Channel clientConnectedChannel = client.connected.poll(5, SECONDS);
    assertThat(clientConnectedChannel, is(notNullValue()));
    Channel serverConnectedChannel = server.connected.poll(5, SECONDS);
    assertThat(serverConnectedChannel, is(notNullValue()));

    // Make sure there's no left over messages/connections on the wires
    Thread.sleep(1000);
    assertThat("unexpected server message", server.messages.poll(), is(nullValue()));
    assertThat("unexpected client message", client.messages.poll(), is(nullValue()));
    assertThat("unexpected server connection", server.connected.poll(), is(nullValue()));
    assertThat("unexpected client connection", client.connected.poll(), is(nullValue()));

    // Send and receive request
    clientChannel.write(helloWorldMessage());
    ZMTPIncomingMessage receivedRequest = server.messages.poll(5, SECONDS);
    assertThat(receivedRequest, is(notNullValue()));
    assertThat(receivedRequest.getMessage(), is(helloWorldMessage()));

    // Send and receive reply
    serverConnectedChannel.write(fooBarMessage());
    ZMTPIncomingMessage receivedReply = client.messages.poll(5, SECONDS);
    assertThat(receivedReply, is(notNullValue()));
    assertThat(receivedReply.getMessage(), is(fooBarMessage()));

    // Make sure there's no left over messages/connections on the wires
    Thread.sleep(1000);
    assertThat("unexpected server message", server.messages.poll(), is(nullValue()));
    assertThat("unexpected client message", client.messages.poll(), is(nullValue()));
    assertThat("unexpected server connection", server.connected.poll(), is(nullValue()));
    assertThat("unexpected client connection", client.connected.poll(), is(nullValue()));
  }

  @Test
  public void testZMTP10_RouterDealer() throws InterruptedException {
    ZMTP10Codec server = new ZMTP10Codec(new ZMTPSession(
        Addressed, DEFAULT_SIZE_LIMIT, NO_IDENTITY, ROUTER));
    ZMTP10Codec client = new ZMTP10Codec(new ZMTPSession(
        Addressed, DEFAULT_SIZE_LIMIT, NO_IDENTITY, DEALER));
    testRequestReply(server, client);
  }

  @Test
  public void testZMTP20_RouterDealer_WithInterop() throws InterruptedException {
    ZMTP20Codec server = new ZMTP20Codec(new ZMTPSession(
        Addressed, DEFAULT_SIZE_LIMIT, NO_IDENTITY, ROUTER), INTEROP_ON);
    ZMTP20Codec client = new ZMTP20Codec(new ZMTPSession(
        Addressed, DEFAULT_SIZE_LIMIT, NO_IDENTITY, DEALER), INTEROP_ON);
    testRequestReply(server, client);
  }

  @Test
  public void test_ZMTP20Server_ZMTP10Client_RouterDealer() throws InterruptedException {
    ZMTP20Codec server = new ZMTP20Codec(new ZMTPSession(
        Addressed, DEFAULT_SIZE_LIMIT, NO_IDENTITY, ROUTER), INTEROP_ON);
    ZMTP10Codec client = new ZMTP10Codec(new ZMTPSession(
        Addressed, DEFAULT_SIZE_LIMIT, NO_IDENTITY, DEALER));
    testRequestReply(server, client);
  }

  @Test
  public void testZMTP20_RouterDealer_WithNoInterop() throws InterruptedException {
    ZMTP20Codec server = new ZMTP20Codec(new ZMTPSession(
        Addressed, DEFAULT_SIZE_LIMIT, NO_IDENTITY, ROUTER), INTEROP_OFF);
    ZMTP20Codec client = new ZMTP20Codec(new ZMTPSession(
        Addressed, DEFAULT_SIZE_LIMIT, NO_IDENTITY, DEALER), INTEROP_OFF);
    testRequestReply(server, client);
  }

  private ZMTPMessage helloWorldMessage() {
    return ZMTPMessage.fromStringsUTF8(true, "", "hello", "world");
  }

  private ZMTPMessage fooBarMessage() {
    return ZMTPMessage.fromStringsUTF8(true, "", "foo", "bar");
  }

  private static class Handler extends SimpleChannelUpstreamHandler {

    private BlockingQueue<Channel> connected = Queues.newLinkedBlockingQueue();
    private BlockingQueue<ZMTPIncomingMessage> messages = Queues.newLinkedBlockingQueue();

    @Override
    public void channelConnected(final ChannelHandlerContext ctx,
                                 final ChannelStateEvent e) throws Exception {
      super.channelConnected(ctx, e);
      connected.add(ctx.getChannel());
    }

    @Override
    public void messageReceived(final ChannelHandlerContext ctx, final MessageEvent e)
        throws Exception {
      messages.put((ZMTPIncomingMessage) e.getMessage());
    }
  }
}
TOP

Related Classes of com.spotify.netty.handler.codec.zmtp.EndToEndTest$Handler

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code is the property of its respective owners. Java is a trademark of Sun Microsystems, Inc. and is owned by Oracle Inc. Contact coftware#gmail.com.