Package io.netty.channel

Source Code of io.netty.channel.ChannelDeregistrationTest

/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
*   http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.ServerSocketChannel;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.util.NetUtil;
import io.netty.util.concurrent.DefaultEventExecutorGroup;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.EventExecutorGroup;
import io.netty.util.concurrent.GlobalEventExecutor;
import io.netty.util.concurrent.PausableEventExecutor;
import io.netty.util.concurrent.Promise;
import org.junit.Test;

import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketAddress;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.junit.Assert.*;

/**
* These tests should work with any {@link SingleThreadEventLoop} implementation. We chose the
* {@link io.netty.channel.nio.NioEventLoop} because it's the most commonly used {@link EventLoop}.
*/
public class ChannelDeregistrationTest {

    private static final Runnable NOEXEC = new Runnable() {
        @Override
        public void run() {
            fail();
        }
    };

    private static final Runnable NOOP = new Runnable() {
        @Override
        public void run() {
        }
    };

    /**
     * Test deregistration and re-registration of a {@link Channel} within a {@link ChannelHandler}.
     *
     * The first {@link ChannelHandler} in the {@link ChannelPipeline} deregisters the {@link Channel} in the
     * {@linkplain ChannelHandler#channelRead(ChannelHandlerContext, Object)} method, while
     * the subsequent {@link ChannelHandler}s make sure that the {@link Channel} really is deregistered. The last
     * {@link ChannelHandler} registers the {@link Channel} with a new {@link EventLoop} and triggers a
     * {@linkplain ChannelHandler#write(ChannelHandlerContext, Object, ChannelPromise)} event, that is then
     * used by all {@link ChannelHandler}s to ensure that the {@link Channel} was correctly registered with the
     * new {@link EventLoop}.
     *
     * Most of the {@link ChannelHandler}s in the pipeline are assigned custom {@link EventExecutorGroup}s.
     * It's important to make sure that they are preserved during and after
     * {@linkplain io.netty.channel.Channel#deregister()}.
     */
    @Test(timeout = 5000)
    public void testDeregisterFromDifferentEventExecutorGroup() throws Exception  {
        final AtomicBoolean handlerExecuted1 = new AtomicBoolean();
        final AtomicBoolean handlerExecuted2 = new AtomicBoolean();
        final AtomicBoolean handlerExecuted3 = new AtomicBoolean();
        final AtomicBoolean handlerExecuted4 = new AtomicBoolean();
        final AtomicBoolean handlerExecuted5 = new AtomicBoolean();

        final EventLoopGroup group1 = new NioEventLoopGroup(1);
        final EventLoopGroup group2 = new NioEventLoopGroup(1);
        final EventLoopGroup group3 = new NioEventLoopGroup(1);

        ServerBootstrap serverBootstrap = new ServerBootstrap();
        Channel serverChannel = serverBootstrap.group(group1)
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel ch) throws Exception {
                        // Deregister the Channel from the EventLoop group1.
                        ch.pipeline().addLast(new DeregisterHandler(handlerExecuted1, group1.next(), group2.next()));
                        // Ensure that the Channel is deregistered from EventLoop group1. Also make
                        // sure that despite deregistration the ChannelHandler is executed by the
                        // specified EventLoop.
                        ch.pipeline().addLast(group2, new ExpectDeregisteredHandler(handlerExecuted2, group2.next()));
                        ch.pipeline().addLast(group3, new ExpectDeregisteredHandler(handlerExecuted3, group3.next()));
                        ch.pipeline().addLast(group2, new ExpectDeregisteredHandler(handlerExecuted4, group2.next()));
                        // Register the Channel with EventLoop group2.
                        ch.pipeline().addLast(group3,
                                new ReregisterHandler(handlerExecuted5, group3.next(), group2.next()));
                    }
                }).bind(0).sync().channel();
        SocketAddress address = serverChannel.localAddress();
        Socket s = new Socket(NetUtil.LOCALHOST, ((InetSocketAddress) address).getPort());
        // write garbage just so to get channelRead(...) invoked.
        s.getOutputStream().write(1);

        while (!(handlerExecuted1.get() &&
                handlerExecuted2.get() &&
                handlerExecuted3.get() &&
                handlerExecuted4.get() &&
                handlerExecuted5.get())) {
            Thread.sleep(10);
        }

        s.close();
        serverChannel.close();
        group1.shutdownGracefully();
        group2.shutdownGracefully();
        group3.shutdownGracefully();
    }

    /**
     * Make sure the {@link EventLoop} and {@link ChannelHandlerInvoker} accessible from within a
     * {@link ChannelHandler} are wrapped by a {@link PausableEventExecutor}.
     */
    @Test(timeout = 5000)
    public void testWrappedEventLoop() throws Exception {
        final AtomicBoolean channelActiveCalled1 = new AtomicBoolean();
        final AtomicBoolean channelActiveCalled2 = new AtomicBoolean();
        final EventLoopGroup group1 = new NioEventLoopGroup();
        final EventLoopGroup group2 = new NioEventLoopGroup();

        ServerBootstrap serverBootstrap = new ServerBootstrap();
        Channel serverChannel = serverBootstrap.group(group1)
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new TestChannelHandler3(channelActiveCalled1));
                        ch.pipeline().addLast(group2, new TestChannelHandler4(channelActiveCalled2));
                    }
                }).bind(0).sync().channel();
        SocketAddress address = serverChannel.localAddress();
        Socket client = new Socket(NetUtil.LOCALHOST, ((InetSocketAddress) address).getPort());

        while (!(channelActiveCalled1.get() && channelActiveCalled2.get())) {
            Thread.sleep(10);
        }

        client.close();
        serverChannel.close();
        group1.shutdownGracefully();
        group2.shutdownGracefully();
    }

    /**
     * Test for https://github.com/netty/netty/issues/803
     */
    @Test
    public void testReregister() throws Exception {
        final EventLoopGroup group1 = new NioEventLoopGroup();
        final EventLoopGroup group2 = new NioEventLoopGroup();
        final EventExecutorGroup group3 = new DefaultEventExecutorGroup(2);

        ServerBootstrap bootstrap = new ServerBootstrap();
        ChannelFuture future = bootstrap.channel(NioServerSocketChannel.class).group(group1)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel ch) throws Exception {
                    }
                }).handler(new ChannelInitializer<ServerSocketChannel>() {
                    @Override
                    public void initChannel(ServerSocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new TestChannelHandler1());
                        ch.pipeline().addLast(group3, new TestChannelHandler2());
                    }
                })
                .bind(0).awaitUninterruptibly();

        EventExecutor executor1 = future.channel().pipeline().context(TestChannelHandler1.class).executor();
        EventExecutor unwrapped1 = executor1.unwrap();
        EventExecutor executor3 = future.channel().pipeline().context(TestChannelHandler2.class).executor();

        future.channel().deregister().sync();

        Channel channel = group2.register(future.channel()).sync().channel();
        EventExecutor executor2 = channel.pipeline().context(TestChannelHandler1.class).executor();

        // same wrapped executor
        assertSame(executor1, executor2);
        // different executor under the wrapper
        assertNotSame(unwrapped1, executor2.unwrap());
        // executor3 must remain unchanged
        assertSame(executor3.unwrap(), future.channel().pipeline()
                .context(TestChannelHandler2.class)
                .executor()
                .unwrap());
    }

    /**
     * See https://github.com/netty/netty/issues/2814
     */
    @Test(timeout = 5000)
    public void testPromise() {
        ChannelHandler handler = new TestChannelHandler1();
        AbstractChannel ch = new EmbeddedChannel(handler);
        DefaultChannelPipeline p = new DefaultChannelPipeline(ch);
        DefaultChannelHandlerInvoker invoker = new DefaultChannelHandlerInvoker(GlobalEventExecutor.INSTANCE);

        ChannelHandlerContext ctx = new DefaultChannelHandlerContext(p, invoker, "Test", handler);
        // Make sure no ClassCastException is thrown
        Promise<Integer> promise = ctx.executor().newPromise();
        promise.setSuccess(0);
        assertTrue(promise.isSuccess());

        ctx = new DefaultChannelHandlerContext(p, null, "Test", handler);
        // Make sure no ClassCastException is thrown
        promise = ctx.executor().newPromise();
        promise.setSuccess(0);
        assertTrue(promise.isSuccess());
    }

    private static final class TestChannelHandler1 extends ChannelHandlerAdapter { }

    private static final class TestChannelHandler2 extends ChannelHandlerAdapter {
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { }
    }

    private static final class TestChannelHandler3 extends ChannelHandlerAdapter {
        AtomicBoolean channelActiveCalled;

        TestChannelHandler3(AtomicBoolean channelActiveCalled) {
            this.channelActiveCalled = channelActiveCalled;
        }

        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            channelActiveCalled.set(true);

            assertTrue(ctx.executor() instanceof PausableEventExecutor);
            assertTrue(ctx.channel().eventLoop() instanceof PausableEventExecutor);
            assertTrue(ctx.invoker().executor() instanceof PausableEventExecutor);
            assertTrue(ctx.channel().eventLoop().asInvoker().executor() instanceof PausableEventExecutor);
            assertSame(ctx.executor().unwrap(), ctx.channel().eventLoop().unwrap());

            super.channelActive(ctx);
        }
    }

    private static final class TestChannelHandler4 extends ChannelHandlerAdapter {
        AtomicBoolean channelActiveCalled;

        TestChannelHandler4(AtomicBoolean channelActiveCalled) {
            this.channelActiveCalled = channelActiveCalled;
        }

        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            channelActiveCalled.set(true);
            assertTrue(ctx.executor() instanceof PausableEventExecutor);
            assertTrue(ctx.channel().eventLoop() instanceof PausableEventExecutor);
            assertTrue(ctx.invoker().executor() instanceof PausableEventExecutor);
            assertTrue(ctx.channel().eventLoop().asInvoker().executor() instanceof PausableEventExecutor);

            // This is executed by its own invoker, which has to be wrapped by
            // a separate PausableEventExecutor.
            assertNotSame(ctx.executor(), ctx.channel().eventLoop());
            assertNotSame(ctx.executor().unwrap(), ctx.channel().eventLoop().unwrap());

            super.channelActive(ctx);
        }
    }

    private static final class DeregisterHandler extends ChannelHandlerAdapter {

        final AtomicBoolean handlerExecuted;
        final EventLoop expectedEventLoop1;
        final EventLoop expectedEventLoop2;

        DeregisterHandler(AtomicBoolean handlerExecuted, EventLoop expectedEventLoop1, EventLoop expectedEventLoop2) {
            this.handlerExecuted = handlerExecuted;
            this.expectedEventLoop1 = expectedEventLoop1;
            this.expectedEventLoop2 = expectedEventLoop2;
        }

        @Override
        public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception {
            assertSame(expectedEventLoop1, ctx.executor().unwrap());
            assertTrue(ctx.channel().isRegistered());
            ctx.channel().deregister().addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    assertFalse(ctx.channel().isRegistered());

                    boolean success = false;
                    try {
                        ctx.channel().eventLoop().execute(NOEXEC);
                        success = true;
                    } catch (Throwable t) {
                        assertTrue(t instanceof RejectedExecutionException);
                    }
                    assertFalse(success);

                    handlerExecuted.set(true);
                    ctx.fireChannelRead(msg);
                }
            });
        }

        @Override
        public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
            assertSame(expectedEventLoop2, ctx.executor().unwrap());

            assertTrue(ctx.channel().isRegistered());
            ctx.executor().execute(NOOP);
            ctx.channel().eventLoop().execute(NOOP);

            promise.setSuccess();
        }
    }

    private static final class ExpectDeregisteredHandler extends ChannelHandlerAdapter {

        final AtomicBoolean handlerExecuted;
        final EventLoop expectedEventLoop;

        ExpectDeregisteredHandler(AtomicBoolean handlerExecuted, EventLoop expectedEventLoop) {
            this.handlerExecuted = handlerExecuted;
            this.expectedEventLoop = expectedEventLoop;
        }

        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            assertSame(expectedEventLoop, ctx.executor().unwrap());
            assertFalse(ctx.channel().isRegistered());

            boolean success = false;
            try {
                ctx.channel().eventLoop().execute(NOEXEC);
                success = true;
            } catch (Throwable t) {
                assertTrue(t instanceof RejectedExecutionException);
            }
            assertFalse(success);

            handlerExecuted.set(true);
            super.channelRead(ctx, msg);
        }

        @Override
        public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
            assertSame(expectedEventLoop, ctx.executor().unwrap());

            assertTrue(ctx.channel().isRegistered());
            ctx.executor().execute(NOOP);
            ctx.channel().eventLoop().execute(NOOP);

            super.write(ctx, msg, promise);
        }
    }

    private static final class ReregisterHandler extends ChannelHandlerAdapter {
        final AtomicBoolean handlerExecuted;
        final EventLoop expectedEventLoop;
        final EventLoop newEventLoop;

        ReregisterHandler(AtomicBoolean handlerExecuted, EventLoop expectedEventLoop, EventLoop newEventLoop) {
            this.handlerExecuted = handlerExecuted;
            this.expectedEventLoop = expectedEventLoop;
            this.newEventLoop = newEventLoop;
        }

        @Override
        public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
            assertSame(expectedEventLoop, ctx.executor().unwrap());

            assertFalse(ctx.channel().isRegistered());
            newEventLoop.register(ctx.channel()).addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    assertTrue(ctx.channel().isRegistered());

                    ctx.executor().execute(NOOP);
                    ctx.channel().eventLoop().execute(NOOP);

                    ctx.write(Unpooled.buffer(), ctx.channel().newPromise()).addListener(new ChannelFutureListener() {
                        @Override
                        public void operationComplete(ChannelFuture future) throws Exception {
                            handlerExecuted.set(true);
                        }
                    });
                }
            });

            super.channelRead(ctx, msg);
        }

        @Override
        public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
            assertSame(expectedEventLoop, ctx.executor().unwrap());

            assertTrue(ctx.channel().isRegistered());
            ctx.executor().execute(NOOP);
            ctx.channel().eventLoop().execute(NOOP);

            super.write(ctx, msg, promise);
        }
    }
}
TOP

Related Classes of io.netty.channel.ChannelDeregistrationTest

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.