Package io.undertow.websockets.core

Source Code of io.undertow.websockets.core.WebSocketChannel

* JBoss, Home of Professional Open Source.
* Copyright 2012 Red Hat, Inc., and individual contributors
* as indicated by the @author tags.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package io.undertow.websockets.core;

import io.undertow.channels.IdleTimeoutConduit;
import org.xnio.ChannelExceptionHandler;
import org.xnio.ChannelListener;
import org.xnio.ChannelListener.Setter;
import org.xnio.ChannelListeners;
import org.xnio.IoUtils;
import org.xnio.Option;
import org.xnio.Pool;
import org.xnio.Pooled;
import org.xnio.StreamConnection;
import org.xnio.XnioIoThread;
import org.xnio.XnioWorker;
import org.xnio.channels.ConnectedChannel;
import org.xnio.channels.StreamSinkChannel;
import org.xnio.channels.StreamSourceChannel;
import org.xnio.conduits.PushBackStreamSourceConduit;

import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.xnio.IoUtils.safeClose;

* A {@link ConnectedChannel} which can be used to send and receive WebSocket Frames.
* @author <a href="">Norman Maurer</a>
* @author Stuart Douglas
public abstract class WebSocketChannel implements ConnectedChannel {

    private final boolean client;

    private final Queue<SendChannel> senders = new ArrayDeque<SendChannel>();
    private final StreamConnection channel;
    private final StreamConnection connectedChannel;
    private final IdleTimeoutConduit idleTimeoutConduit;

    private final WebSocketVersion version;
    private final String wsUrl;
    private final ChannelListener.SimpleSetter<WebSocketChannel> closeSetter;
    private final ChannelListener.SimpleSetter<WebSocketChannel> receiveSetter;
    private final PushBackStreamSourceConduit pushBackConduit;
    private final Pool<ByteBuffer> bufferPool;

    private volatile StreamSourceFrameChannel receiver;
     * an incoming frame that has not been created yet
    private volatile PartialFrame partialFrame;

    private final AtomicBoolean broken = new AtomicBoolean(false);

    private boolean receivesSuspended = true;
    private boolean closeFrameReceived;
    private boolean closeFrameSent;
    private final Set<String> subProtocols;
    private final boolean extensionsSupported;

    // TODO: Maybe init lazy to safe memory when not used by the user ?
    private final ConcurrentMap<String, Object> attrs = new ConcurrentHashMap<String, Object>();

     * Create a new {@link WebSocketChannel}
     * 8
     * @param connectedStreamChannel The {@link org.xnio.channels.ConnectedStreamChannel} over which the WebSocket Frames should get send and received.
     *                               Be aware that it already must be "upgraded".
     * @param bufferPool             The {@link org.xnio.Pool} which will be used to acquire {@link java.nio.ByteBuffer}'s from.
     * @param version                The {@link io.undertow.websockets.core.WebSocketVersion} of the {@link io.undertow.websockets.core.WebSocketChannel}
     * @param wsUrl                  The url for which the {@link io.undertow.websockets.core.protocol.version00.WebSocket00Channel} was created.
     * @param client
    protected WebSocketChannel(final StreamConnection connectedStreamChannel, Pool<ByteBuffer> bufferPool, WebSocketVersion version, String wsUrl, Set<String> subProtocols, final boolean client, boolean extensionsSupported) {
        this.client = client;
        IdleTimeoutConduit idle = new IdleTimeoutConduit(connectedStreamChannel.getSinkChannel().getConduit(), connectedStreamChannel.getSourceChannel().getConduit());
        this.idleTimeoutConduit = idle;
        channel = connectedStreamChannel;
        this.version = version;
        this.wsUrl = wsUrl;
        this.bufferPool = bufferPool;
        this.extensionsSupported = extensionsSupported;
        this.subProtocols = Collections.unmodifiableSet(subProtocols);
        connectedChannel = connectedStreamChannel;

        closeSetter = new ChannelListener.SimpleSetter<WebSocketChannel>();
        receiveSetter = new ChannelListener.SimpleSetter<WebSocketChannel>();

        this.pushBackConduit = new PushBackStreamSourceConduit(channel.getSourceChannel().getConduit());

        channel.getSourceChannel().getReadSetter().set(new WebSocketReadListener());
        connectedStreamChannel.getSinkChannel().getWriteSetter().set(new WebSocketWriteListener());
        connectedStreamChannel.getSinkChannel().getCloseSetter().set(new WebSocketCloseListener());

    public final boolean setAttribute(String key, Object value) {
        if (value == null) {
            return attrs.remove(key) != null;
        } else {
            return attrs.putIfAbsent(key, value) == null;

    public final Object getAttribute(String key) {
        return attrs.get(key);

     * Returns {@code true} if extensions are supported by this WebSocket Channel.
    public boolean areExtensionsSupported() {
        return extensionsSupported;

     * Returns an unmodifiable {@link Set} of the selected subprotocols if any.
    public Set<String> getSubProtocols() {
        return subProtocols;

     * Get the buffer pool for this connection.
     * @return the buffer pool for this connection
    public Pool<ByteBuffer> getBufferPool() {
        return bufferPool;

     * Check if the given {@link java.nio.channels.Channel} is currently active
    private boolean isActive(StreamSinkFrameChannel channel) {
        SendChannel sender = senders.peek();
        if (sender == channel) {
            return true;
        if (sender instanceof FragmentedMessageChannelImpl) {
            return ((FragmentedMessageChannelImpl) sender).isActive(channel);
        return false;

    public SocketAddress getLocalAddress() {
        return connectedChannel.getLocalAddress();

    public <A extends SocketAddress> A getLocalAddress(Class<A> type) {
        return connectedChannel.getLocalAddress(type);

    public XnioWorker getWorker() {
        return channel.getWorker();

    public XnioIoThread getIoThread() {
        return channel.getIoThread();

    public boolean supportsOption(Option<?> option) {
        return channel.supportsOption(option);

    public <T> T getOption(Option<T> option) throws IOException {
        return channel.getOption(option);

    public <T> T setOption(Option<T> option, T value) throws IOException {
        return channel.setOption(option, value);

    public boolean isOpen() {
        return channel.isOpen();

    public boolean isCloseFrameReceived() {
        return closeFrameReceived;

    public boolean isCloseFrameSent() {
        return closeFrameSent;

    public SocketAddress getPeerAddress() {
        return connectedChannel.getPeerAddress();

    public <A extends SocketAddress> A getPeerAddress(Class<A> type) {
        return connectedChannel.getPeerAddress(type);

     * Get the request URI scheme. Normally this is one of {@code ws} or {@code wss}.
     * @return the request URI scheme
    public String getRequestScheme() {
        if (getUrl().startsWith("wss:")) {
            return "wss";
        } else {
            return "ws";

     * Return {@code true} if this is handled via WebSocket Secure.
    public boolean isSecure() {
        return "wss".equals(getRequestScheme());

     * Return the URL of the WebSocket endpoint.
     * @return url The URL of the endpoint
    public String getUrl() {
        return wsUrl;

     * Return the {@link WebSocketVersion} which is used
     * @return version The {@link WebSocketVersion} which is in use
    public WebSocketVersion getVersion() {
        return version;

     * Get the source address of the WebSocket Channel.
     * @return the source address of the WebSocket Channel
    public InetSocketAddress getSourceAddress() {
        return getPeerAddress(InetSocketAddress.class);

     * Get the destination address of the WebSocket Channel.
     * @return the destination address of the WebSocket Channel
    public InetSocketAddress getDestinationAddress() {
        return getLocalAddress(InetSocketAddress.class);

     * Async receive, returns null if no frame is ready. Otherwise returns a
     * channel that can be used to read the frame contents.
    public StreamSourceFrameChannel receive() throws IOException {
        if (receiver != null) {
            return null;
        final Pooled<ByteBuffer> pooled = getBufferPool().allocate();
        final ByteBuffer buffer = pooled.getResource();
        boolean free = true;

        try {
            if (closeFrameReceived) {
                return null;
            PartialFrame partialFrame = this.partialFrame;
            if (partialFrame == null) {
                partialFrame = this.partialFrame = receiveFrame(new StreamSourceChannelControl());

            int res;
            while (!partialFrame.isDone()) {
                try {
                    res = channel.getSourceChannel().read(buffer);
                } catch (IOException e) {
                    if (WebSocketLogger.REQUEST_LOGGER.isDebugEnabled()) {
                        WebSocketLogger.REQUEST_LOGGER.debugf(e, "Connection closed with IOException");
                    throw e;
                if (res == 0) {
                    return null;
                if (res == -1) {
                    try {

                    } catch (IOException e) {
                        if (WebSocketLogger.REQUEST_LOGGER.isDebugEnabled()) {
                            WebSocketLogger.REQUEST_LOGGER.debugf(e, "Connection closed with IOException when attempting to shut down reads");
                        // nothing we can do here.. close
                        throw e;
                    throw WebSocketMessages.MESSAGES.channelClosed();
                try {
                    partialFrame.handle(buffer, channel, pushBackConduit);
                } catch (WebSocketException e) {
                    //the data was corrupt
                    if (WebSocketLogger.REQUEST_LOGGER.isDebugEnabled()) {
                        WebSocketLogger.REQUEST_LOGGER.debugf(e, "receive failed due to Exception");
                    WebSockets.sendClose(new CloseMessage(CloseMessage.WRONG_CODE, e.getMessage()).toByteBuffer(), this, null);
                    // nothing we can do here.. close
                    throw new IOException(e);
            if (buffer.hasRemaining()) {
                // something was left in the buffer, push it back so it can be processed by the actual Source
                free = false;

            this.partialFrame = null;
            receiver = partialFrame.getChannel();
            if (receiver.getType() == WebSocketFrameType.CLOSE) {
                closeFrameReceived = true;
            return receiver;
        } finally {
            if (free) {

     * Return the {@link Setter} which will holds the {@link ChannelListener} that gets notified once a frame was
     * received.
    public Setter<WebSocketChannel> getReceiveSetter() {
        return receiveSetter;

     * Suspend the receive of new frames via {@link #receive()}
    public synchronized void suspendReceives() {
        receivesSuspended = true;
        if (receiver == null) {

    public boolean isClient() {
        return client;

     * Resume the receive of new frames via {@link #receive()}
    public synchronized void resumeReceives() {
        receivesSuspended = false;
        if (receiver == null) {

     * Forcibly closes the {@link WebSocketChannel}. Generally clients will wish to use {@link #sendClose()} for
     * a clean shutdown
    public void close() throws IOException {

     * Returns a new {@link StreamSinkFrameChannel} for sending the given {@link WebSocketFrameType} with the given payload.
     * If this method is called multiple times, subsequent {@link StreamSinkFrameChannel}'s will not be writable until all previous frames
     * were completely written.
     * @param type        The {@link WebSocketFrameType} for which a {@link StreamSinkChannel} should be created
     * @param payloadSize The size of the payload which will be included in the WebSocket Frame. This may be 0 if you want
     *                    to transmit no payload at all.
    public final StreamSinkFrameChannel send(WebSocketFrameType type, long payloadSize) throws IOException {
        if (payloadSize < 0) {
            throw WebSocketMessages.MESSAGES.negativePayloadLength();
        if (broken.get()) {
            throw WebSocketMessages.MESSAGES.streamIsBroken();

        StreamSinkFrameChannel ch = createStreamSinkChannel(channel.getSinkChannel(), type, payloadSize);
        synchronized (this) {
            if (type == WebSocketFrameType.PING || type == WebSocketFrameType.PONG || type == WebSocketFrameType.CLOSE) {
                // PING / PONG / CLOSE frames can be send while a fragmented message is send, so take special care
                SendChannel sch = senders.peek();
                if (sch instanceof FragmentedMessageChannelImpl) {
                    ((FragmentedMessageChannelImpl) sch).fragmentedSenders.add(ch);
                } else {
            } else {

            if (isActive(ch)) {
                // Channel is first in the queue so mark it as active
            return ch;

     * Return a {@link FragmentedMessageChannel} which can be used t send a TEXT WebSocket message in fragments.
     * This means the first fragment will be send as TEXT frame and the following as CONTINUATION frames.
     * <p/>
     * If this method is called multiple times, subsequent {@link FragmentedMessageChannel}'s will not be writable until all previous frames
     * were completely written.
    public final synchronized FragmentedMessageChannel sendFragmentedText() {
        FragmentedMessageChannelImpl fragmentedMessageChannel = new FragmentedMessageChannelImpl(WebSocketFrameType.TEXT);
        return fragmentedMessageChannel;

     * Return a {@link FragmentedMessageChannel} which can be used t send a BINARY WebSocket message in fragments.
     * This means the first fragment will be send as TEXT frame and the following as CONTINUATION frames.
     * <p/>
     * If this method is called multiple times, subsequent {@link FragmentedMessageChannel}'s will not be writable until all previous frames
     * were completely written.
    public final synchronized FragmentedMessageChannel sendFragmentedBinary() {
        FragmentedMessageChannelImpl fragmentedMessageChannel = new FragmentedMessageChannelImpl(WebSocketFrameType.BINARY);
        return fragmentedMessageChannel;

     * Send a Close frame without a payload
    public void sendClose() throws IOException {
        StreamSinkFrameChannel closeChannel = send(WebSocketFrameType.CLOSE, 0);
        if (!closeChannel.flush()) {
                    null, new ChannelExceptionHandler<StreamSinkFrameChannel>() {
                public void handleException(final StreamSinkFrameChannel channel, final IOException exception) {

    public Setter<? extends WebSocketChannel> getCloseSetter() {
        return closeSetter;

     * Create a new {@link StreamSourceFrameChannel}  which can be used to read the data of the received WebSocket Frame
     * @param streamSourceChannelControl@return
     *         channel                  A {@link StreamSourceFrameChannel} will be used to read a Frame from.
     *         This will return {@code null} if the right {@link StreamSourceFrameChannel} could not be detected with the given
     *         buffer and so more data is needed.
    protected abstract PartialFrame receiveFrame(StreamSourceChannelControl streamSourceChannelControl);

     * Create a new StreamSinkFrameChannel which can be used to send a WebSocket Frame of the type {@link WebSocketFrameType}.
     * @param channel     The {@link StreamSinkChannel} to wrap
     * @param type        The {@link WebSocketFrameType} of the WebSocketFrame which will be send over this {@link StreamSinkFrameChannel}
     * @param payloadSize The size of the payload to transmit. May be 0 if non payload at all should be included.
    protected abstract StreamSinkFrameChannel createStreamSinkChannel(StreamSinkChannel channel, WebSocketFrameType type, long payloadSize);

     * Mark the given {@link StreamSinkFrameChannel} as complete and so remove the obtained ones. Calling this method will also
     * take care of call {@link StreamSinkFrameChannel#activate()} on the new active {@link StreamSinkFrameChannel}.
    final synchronized void complete(StreamSinkFrameChannel channel) {
        boolean active = isActive(channel);

        if(channel.getType() == WebSocketFrameType.CLOSE) {
            closeFrameSent = true;

        if (senders.peek() == channel) {
        } else {
            FragmentedMessageChannelImpl fragmented = (FragmentedMessageChannelImpl) senders.peek();
            if (fragmented != null) {
                if (fragmented.remove(channel)) {

        if (active) {
            SendChannel ch = senders.peek();

            // check if there is some sink waiting
            if (ch != null) {
                if (ch instanceof StreamSinkFrameChannel) {
                    ((StreamSinkFrameChannel) ch).activate();
                } else if (ch instanceof FragmentedMessageChannelImpl) {
                    ((FragmentedMessageChannelImpl) ch).activate();
            } else {
                WebSocketLogger.REQUEST_LOGGER.debugf("Suspending writes on %s in complete method as there is no new sender");

     * Called when a sub channel fails to fulfil its contract, and leaves the channel in an inconsistent state.
     * <p/>
     * The underlying channel will be closed, and any sub channels that have reads/writes resumed will have their
     * listeners notified. It is expected that these listeners will then attempt to use the channel, and their standard
     * error handling logic will take over
    @SuppressWarnings({"unchecked", "rawtypes"})
    void markBroken() {
        if (broken.compareAndSet(false, true)) {

            StreamSourceFrameChannel receiver = this.receiver;
            if (receiver != null && receiver.isReadResumed()) {
                receiver.queueListener(((ChannelListener.SimpleSetter) receiver.getReadSetter()).get());
            synchronized (this) {
                for (final SendChannel channel : senders) {
                    //we just activate them all at once
                    //the underlying channel is already closed, so they cannot write anyway
                    if (channel instanceof StreamSinkFrameChannel) {
                        ((StreamSinkFrameChannel) channel).activate();
                    } else if (channel instanceof FragmentedMessageChannelImpl) {
                        ((FragmentedMessageChannelImpl) channel).activate();

     * {@link ChannelListener} which delegates the read notification to the appropriate listener
    private final class WebSocketReadListener implements ChannelListener<StreamSourceChannel> {
        @SuppressWarnings({"unchecked", "rawtypes"})
        public void handleEvent(final StreamSourceChannel channel) {
            final StreamSourceFrameChannel receiver = WebSocketChannel.this.receiver;
            if (receiver != null) {
                final ChannelListener listener = ((SimpleSetter) receiver.getReadSetter()).get();
                if (listener != null) {
                    WebSocketLogger.REQUEST_LOGGER.debugf("Invoking read listener %s on %s", listener, receiver);
                    ChannelListeners.invokeChannelListener(receiver, listener);
                } else {
                    WebSocketLogger.REQUEST_LOGGER.debugf("Suspending reads on channel %s due to no listener", receiver);
            } else if (closeFrameReceived) {
            } else {
                final ChannelListener listener = receiveSetter.get();
                if (listener != null) {
                    WebSocketLogger.REQUEST_LOGGER.debugf("Invoking receive listener", receiver);
                    ChannelListeners.invokeChannelListener(WebSocketChannel.this, listener);
                } else {

    private class WebSocketWriteListener implements ChannelListener<StreamSinkChannel> {
        public void handleEvent(final StreamSinkChannel channel) {
            SendChannel ch = null, oldCh;
            for (; ; ) {
                oldCh = ch;
                boolean writeResumed = false;
                final StreamSinkFrameChannel sink;
                synchronized (WebSocketChannel.this) {
                    ch = senders.peek();
                    if (ch != null) {
                        if (ch instanceof FragmentedMessageChannelImpl) {
                            FragmentedMessageChannelImpl fragmented = (FragmentedMessageChannelImpl) ch;
                            sink = fragmented.fragmentedSenders.peek();
                            if (sink != null) {
                                writeResumed = sink.isWriteResumed();

                        } else if (ch instanceof StreamSinkFrameChannel) {
                            sink = (StreamSinkFrameChannel) ch;
                            writeResumed = ((StreamSinkFrameChannel) ch).isWriteResumed();
                        } else {
                            sink = null;
                    } else {
                        sink = null;
                if (ch != null && ch != oldCh) {
                    if (!writeResumed) {
                    ChannelListener<? super StreamSinkFrameChannel> channelListener = (ChannelListener<? super StreamSinkFrameChannel>) sink.getWriteSetter().get();
                    WebSocketLogger.REQUEST_LOGGER.debugf("Invoking write listener %s on %s", channelListener, sink);
                    ChannelListeners.invokeChannelListener(sink, channelListener);
                } else if (ch == null) {
                    //we have to make sure that another channel has not been added in the mean time
                    synchronized (WebSocketChannel.this) {
                        SendChannel sendChannel = senders.peek();
                        if (sendChannel == null || (sendChannel instanceof FragmentedMessageChannelImpl && ((FragmentedMessageChannelImpl) sendChannel).fragmentedSenders.peek() == null)) {
                            WebSocketLogger.REQUEST_LOGGER.debugf("Suspending writes on channel %s due to no sender", WebSocketChannel.this);
                } else {

     * close listener, just goes through and activates any sub channels to make sure their listeners are invoked
    private class WebSocketCloseListener implements ChannelListener<StreamSinkChannel> {

        public void handleEvent(final StreamSinkChannel c) {
            StreamSourceFrameChannel receiver = WebSocketChannel.this.receiver;
            if (receiver != null && receiver.isOpen() && receiver.isReadResumed()) {
                ChannelListeners.invokeChannelListener(receiver, (ChannelListener<? super StreamSourceFrameChannel>) receiver.getReadSetter().get());
            synchronized (WebSocketChannel.this) {
                for (final SendChannel channel : senders) {
                    //we just activate them all at once
                    //the underlying channel is already closed, so they cannot write anyway
                    if (channel instanceof StreamSinkFrameChannel) {
                        ((StreamSinkFrameChannel) channel).activate();
                    } else if (channel instanceof FragmentedMessageChannelImpl) {
                        ((FragmentedMessageChannelImpl) channel).activate();
            ChannelListeners.invokeChannelListener(WebSocketChannel.this, closeSetter.get());

     * Interface that represenets a channel that is in the process of being created
    public interface PartialFrame {

         * @return The channel, or null if the channel is not availble yet
        StreamSourceFrameChannel getChannel();

         * Handles the data, any remaining data will be pushed back
        void handle(ByteBuffer data, StreamConnection channel, PushBackStreamSourceConduit pushBack) throws WebSocketException;

         * @return true if the channel is available
        boolean isDone();

    public class StreamSourceChannelControl {

        private StreamSourceChannelControl() {

         * Called once the frame was read for the given {@link StreamSourceFrameChannel}.
        public void readFrameDone(StreamSourceFrameChannel channel) {
            if (channel.getType() == WebSocketFrameType.CLOSE) {
                if (isCloseFrameSent()) {
            synchronized (WebSocketChannel.this) {
                if (channel == receiver) {
                    receiver = null;
                    if (receivesSuspended) {
                    } else {

    public void setIdleTimeout(long timeout) {

    public long getIdleTimeout() {
        return idleTimeoutConduit.getIdleTimeout();

    private final class FragmentedMessageChannelImpl implements FragmentedMessageChannel {
        private final WebSocketFrameType type;
        private boolean first = true;
        private boolean finalSent;

        private final Queue<StreamSinkFrameChannel> fragmentedSenders = new ArrayDeque<StreamSinkFrameChannel>();

        public FragmentedMessageChannelImpl(WebSocketFrameType type) {
            this.type = type;

        public StreamSinkFrameChannel send(long payloadSize, boolean finalFrame) throws IOException {
            WebSocketFrameType type;

            synchronized (this) {
                if (finalSent) {
                    throw WebSocketMessages.MESSAGES.fragmentedSenderCompleteAlready();
                if (payloadSize < 0) {
                    throw WebSocketMessages.MESSAGES.negativePayloadLength();
                if (broken.get()) {
                    throw WebSocketMessages.MESSAGES.streamIsBroken();

                if (finalFrame) {
                    finalSent = true;
                if (first) {
                    first = false;
                    type = this.type;
                } else {
                    type = WebSocketFrameType.CONTINUATION;

            StreamSinkFrameChannel sink = createStreamSinkChannel(channel.getSinkChannel(), type, payloadSize);

            synchronized (WebSocketChannel.this) {

                if (senders.peek() == this && isActive(sink)) {
            return sink;

        public WebSocketChannel getWebSocketChannel() {
            return WebSocketChannel.this;

        // Only called within synchronized block
        boolean isActive(StreamSinkFrameChannel channel) {
            return fragmentedSenders.peek() == channel;

        // Only called within synchronized block
        void activate() {
            synchronized (WebSocketChannel.this) {
                StreamSinkFrameChannel ch = fragmentedSenders.peek();

                if (ch != null) {

        // Only called within synchronized block
        boolean remove(StreamSinkFrameChannel channel) {
            return finalSent && fragmentedSenders.isEmpty();


Related Classes of io.undertow.websockets.core.WebSocketChannel

Copyright © 2018 All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact