Package org.jivesoftware.openfire

Source Code of org.jivesoftware.openfire.Channel

/**
* $RCSfile$
* $Revision: 617 $
* $Date: 2004-12-03 05:59:50 -0300 (Fri, 03 Dec 2004) $
*
* Copyright (C) 2005-2008 Jive Software. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*     http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.jivesoftware.openfire;

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.jivesoftware.openfire.session.Session;
import org.jivesoftware.util.LocaleUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.xmpp.packet.Packet;

/**
* A channel provides a mechanism to queue work units for processing. Each work unit is
* encapsulated as a ChannelMessage, and processing of each message is performed by a
* ChannelHandler.<p>
*
* As a request is handled by the system, it will travel through a sequence of channels.
* This architecture has a number of advantages:
* <ul>
*      <li> Each request doesn't need to correspond to a thread. Instead, a thread pool
*          in each channel processes requests from a queue.
*      <li> Due to the queue at each channel, the system is much better able to respond
*          to load spikes.
* </ul><p>
*
* Channels are modeled after SEDA stages. For much much more in-depth architecture information,
* refer to the <a href="http://www.cs.berkeley.edu/~mdw/proj/sandstorm/">SEDA website</a>.
*
* @author Matt Tucker
*/
public class Channel<T extends Packet> {

  private static final Logger Log = LoggerFactory.getLogger(Channel.class);

    private String name;
    private ChannelHandler channelHandler;

    ThreadPoolExecutor executor;

    /**
     * Creates a new channel. The channel should be registered after it's created.
     *
     * @param name the name of the channel.
     * @param channelHandler the handler for this channel.
     */
    public Channel(String name, ChannelHandler<T> channelHandler) {
        this.name = name;
        this.channelHandler = channelHandler;

        executor = new ThreadPoolExecutor(1, 8, 15, TimeUnit.SECONDS, new LinkedBlockingQueue());
    }

    /**
     * Returns the name of the channel.
     *
     * @return the name of the channel.
     */
    public String getName() {
        return name;
    }

    /**
     * Enqueus a message to be handled by this channel. After the ChannelHandler is done
     * processing the message, it will be sent to the next channel. Messages with a higher
     * priority will be handled first.
     *
     * @param packet an XMPP packet to add to the channel for processing.
     */
    public void add(final T packet) {
        Runnable r = new Runnable() {
            public void run() {
                try {
                    channelHandler.process(packet);
                }
                catch (Exception e) {
                    Log.error(LocaleUtils.getLocalizedString("admin.error"), e);
                  
                        try {
                            Session session = SessionManager.getInstance().getSession(packet.getFrom());
                            session.close();
                        }
                        catch (Exception e1) {
                           Log.error(e1.getMessage(), e1);
                        }
                }
            }
        };
        executor.execute(r);
    }

    /**
     * Returns true if the channel is currently running. The channel can be started and
     * stopped by calling the start() and stop() methods.
     *
     * @return true if the channel is running.
     */
    public boolean isRunning() {
        return !executor.isShutdown();
    }

    /**
     * Starts the channel, which means that worker threads will start processing messages
     * from the queue. If the server isn't running, messages can still be enqueued.
     */
    public void start() {

    }

    /**
     * Stops the channel, which means that worker threads will stop processing messages from
     * the queue. If the server isn't running, messages can still be enqueued.
     */
    public synchronized void stop() {
        executor.shutdown();
    }

    /**
     * Returns the number of currently active worker threads in the channel. This value
     * will always fall in between the min a max thread count.
     *
     * @return the current number of worker threads.
     */
    public int getThreadCount() {
        return executor.getPoolSize();
    }

    /**
     * Returns the min number of threads the channel will use for processing messages.
     * The channel will automatically de-allocate worker threads as the queue load shrinks,
     * down to the defined minimum. This lets the channel consume fewer resources when load
     * is low.
     *
     * @return the min number of threads that can be used by the channel.
     */
    public int getMinThreadCount() {
        return executor.getCorePoolSize();
    }

    /**
     * Sets the min number of threads the channel will use for processing messages.
     * The channel will automatically de-allocate worker threads as the queue load shrinks,
     * down to the defined minimum. This lets the channel consume fewer resources when load
     * is low.
     *
     * @param minThreadCount the min number of threads that can be used by the channel.
     */
    public void setMinThreadCount(int minThreadCount) {
        executor.setCorePoolSize(minThreadCount);
    }

    /**
     * Returns the max number of threads the channel will use for processing messages. The
     * channel will automatically allocate new worker threads as the queue load grows, up to the
     * defined maximum. This lets the channel meet higher concurrency needs, but prevents too
     * many threads from being allocated, which decreases overall system performance.
     *
     * @return the max number of threads that can be used by the channel.
     */
    public int getMaxThreadCount() {
        return executor.getMaximumPoolSize();
    }

    /**
     * Sets the max number of threads the channel will use for processing messages. The channel
     * will automatically allocate new worker threads as the queue size grows, up to the defined
     * maximum. This lets the channel meet higher concurrency needs, but prevents too many threads
     * from being allocated, which decreases overall system performance.
     *
     * @param maxThreadCount the max number of threads that can be used by the channel.
     */
    public void setMaxThreadCount(int maxThreadCount) {
        executor.setMaximumPoolSize(maxThreadCount);
    }

    /**
     * Returns the current number of ChannelMessage objects waiting to be processed by
     * the channel.
     *
     * @return the current number of elements in the processing queue.
     */
    public int getQueueSize() {
        return executor.getQueue().size();
    }
}
TOP

Related Classes of org.jivesoftware.openfire.Channel

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code is the property of its respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.