Package org.jboss.mx.remote.discovery.unicast

Source Code of org.jboss.mx.remote.discovery.unicast.UnicastDetector

/**
* @(#)$Id: UnicastDetector.java,v 1.2 2003/01/14 13:40:54 juhalindfors Exp $
*
* This code is PROPRIETARY AND CONFIDENTIAL to Vocalocity, Inc.
* -- DO NOT RE-DISTRIBUTE THIS SOURCE CODE WITHOUT EXPRESS PERMISSION. --
*
* This source code is Copyright (c) 2002 by Vocalocity, Inc.
* All Rights Reserved.
*
* The source code for this program is not published or
* otherwise divested of its trade secrets, irrespective
* of what has been deposited with the US Copyright Office.
*
*/
package org.jboss.mx.remote.discovery.unicast;

import org.jboss.mx.remote.discovery.AbstractDetector;
import org.jboss.mx.remote.discovery.DetectionNotification;
import org.jboss.mx.remote.connector.ConnectorFactory;
import org.jboss.mx.remote.connector.socket.SocketConnector;
import org.jboss.mx.remote.JMXUtil;
import org.jboss.mx.util.SerializationHelper;

import javax.management.MBeanServer;
import javax.management.ObjectName;
import javax.management.MBeanServerFactory;
import java.util.*;
import java.net.InetAddress;
import java.net.Socket;
import java.net.ServerSocket;
import java.net.DatagramPacket;
import java.io.OutputStream;
import java.io.InputStream;
import java.io.BufferedOutputStream;

import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;

/**
* UnicastDetector is an implementation of the UnicastDetectorMBean MBean that exchanges
* DetectionNotifications with 1..n unicast locations (IP address and port).  Detections are
* sent to every configured location, and detections from remote UnicastDetectors are accepted
* on the <tt>publishPort</tt>, so that remote UnicastDetectors can use this same Detector
* instance as one of their locations.
* @author <a href="mailto:jhaynie@vocalocity.net">Jeff Haynie</a>
* @version $Revision: 1.2 $
*/
public class UnicastDetector extends AbstractDetector implements UnicastDetectorMBean
{
    private static final String TYPE = "unicast";
    private int localport=DEFAULT_PUBLISH_PORT;
    private List locations=Collections.synchronizedList(new ArrayList(2));
    private LinkedQueue detections=new LinkedQueue();

    public UnicastDetector ()
    {
        this.type = TYPE;
    }
    /**
     * Sub-classes should override this method to provide
     * custum 'start' logic.
     *
     * <p>This method is empty, and is provided for convenience
     *    when concrete service classes do not need to perform
     *    anything specific for this state change.
     */
    protected void startService() throws Exception
    {
        if (this.localport!=-1)
        {
            listener=new Listener();
            listener.start();
        }
        super.startService();
    }


    /**
     * Sub-classes should override this method to provide
     * custum 'stop' logic.
     *
     * <p>This method is empty, and is provided for convenience
     *    when concrete service classes do not need to perform
     *    anything specific for this state change.
     */
    protected void stopService() throws Exception
    {
        if (listener!=null)
        {
            listener.running=false;
            try
            {
                listener.socket.close();
            }
            catch (Exception ig) { }
            listener.socket=null;
            listener.interrupt();
            listener=null;
        }
        super.stopService();
    }

    private Listener listener;

    private final class Listener extends Thread
    {
        boolean running=true;
        ServerSocket socket;

        Listener ()
            throws Exception
        {
            setName("UnicastDetector-Listener");
            setPriority(Thread.NORM_PRIORITY-1);
            setDaemon(false);
            socket = new ServerSocket(localport);
        }
        public void run ()
        {
            while(running)
            {
                try
                {
                    final Socket client=socket.accept();
                    if (logMethod && log.isDebugEnabled())
                    {
                        log.debug("<< accepting connection from: "+client);
                    }
                    new Thread()
                    {
                         public void run ()
                         {
                             try
                             {
                                 byte buf[]=new byte[2096];
                                 if (logMethod && log.isDebugEnabled())
                                 {
                                     log.debug("<< reading ... "+client);
                                 }
                                 int count = client.getInputStream().read(buf);
                                 if (count>0)
                                 {
                                     // just write to our queue
                                     detections.put(SerializationHelper.deserialize(buf));
                                 }
                                 else
                                 {
                                     if (logMethod && log.isDebugEnabled())
                                     {
                                         log.debug("<< NO DATA READ FROM CLIENT");
                                     }
                                 }
                             }
                             catch (Exception ex)
                             {
                                 if (logMethod && log.isDebugEnabled())
                                 {
                                     log.debug("Exception reading buffer from remote: "+socket,ex);
                                 }
                             }
                             finally
                             {
                                 if (socket!=null)
                                 {
                                     try
                                     {
                                        socket.close();
                                     }
                                     catch (Exception ig) { }
                                     socket=null;
                                 }
                             }
                         }
                    }.start();
                }
                catch (Exception ex)
                {
                    if (ex instanceof InterruptedException)
                    {
                        break;
                    }
                }
                finally
                {
                    try
                    {
                        sleep(250);
                    }
                    catch (Exception ig) { }
                }
            }
        }
    }
    private final class Location
    {
        InetAddress addr;
        int port;
        Socket socket;
        OutputStream output;

        Location (InetAddress addr, int port)
        {
          this.addr=addr;
          this.port=port;
        }
        void write (byte buf[])
        {
            try
            {
                if (socket==null)
                {
                    open();
                }
                if (output!=null)
                {
                    output.write(buf);
                    output.flush();
                }
            }
            catch (Exception ex)
            {
                if (logMethod && log.isDebugEnabled())
                {
                    log.debug("Exception writing buffer",ex);
                }
            }
        }
        void open ()
        {
            try
            {
                socket=new Socket(addr,port);
                output = new BufferedOutputStream(socket.getOutputStream());
            }
            catch (Exception ex)
            {
                if (logMethod && log.isDebugEnabled())
                {
                    log.debug("Error creating listener: "+addr+":"+port,ex);
                }
                socket=null;
            }
        }
        void close ()
        {
            try
            {
                output.flush();
                socket.close();
            }
            catch (Exception ex) {}
            finally
            {
                socket=null;
            }
        }
    }
    /**
     * set the port tihs detector will accept unicast detections via
     *
     * @param port
     */
    public void setPublishPort(int port)
    {
        Integer old = new Integer(this.localport);
        this.localport=port;
        if (log.isDebugEnabled())
        {
            log.debug("Setting publish port from: "+old+" to: "+port);
        }
        fireAttributeChange("detector.publish.port","PublishPort",Integer.class,old,new Integer(port));
    }

    /**
     * get the port that unicast detections are accepted
     *
     * @return port number
     */
    public int getPublisherPort()
    {
        return localport;
    }

    /**
     * add a location
     *
     * @param ip
     * @param port
     */
    public void addLocation(InetAddress ip, int port)
    {
        locations.add(new Location(ip,port));
    }

    /**
     * remove a location
     *
     * @param ip
     * @param port
     */
    public void removeLocation(InetAddress ip, int port)
    {
        Iterator i=locations.iterator();
        while(i.hasNext())
        {
            Location l=(Location)i.next();
            if (l.addr.equals(ip) && l.port==port)
            {
                l.close();
                i.remove();
                l=null;
            }
        }
    }

    /**
     * set a space-separated locations in the format: <tt>ip:port</tt>
     *
     * @param propertiesList
     */
    public void setLocations(String propertiesList)
    {
        StringTokenizer tok=new StringTokenizer(propertiesList," ,");
        while(tok.hasMoreTokens())
        {
            String token=tok.nextToken();
            int colon = token.indexOf(":");
            if (colon>0)
            {
                try
                {
                    InetAddress ip = InetAddress.getByName(token.substring(0,colon));
                    int port = Integer.parseInt(token.substring(colon+1));
                    addLocation(ip,port);
                }
                catch (Exception ex)
                {
                    log.warn("Couldn't create Location: "+token,ex);
                }
            }
        }
    }

    /**
     * get a space-separate locations list in the format: <tt>ip:port[,ip:port]</tt>
     *
     * @return location list string
     */
    public String getLocations()
    {
        StringBuffer buf=new StringBuffer();
        Iterator i = locations.iterator();
        while(i.hasNext())
        {
            Location l = (Location)i.next();
            buf.append(l.addr.getHostAddress()+":"+l.port);
            if (i.hasNext())
            {
                buf.append(" ");
            }
        }
        return buf.toString();
    }


    /**
     * multicast publish the detection msg
     *
     * @param msg
     * @throws Exception
     */
    protected void publish (DetectionNotification msg)
        throws Exception
    {
        byte buf[] = SerializationHelper.serialize(msg);
        synchronized(locations)
        {
            Iterator i = locations.iterator();
            while(i.hasNext())
            {
                Location l=(Location)i.next();
                try
                {
                    if (l.socket==null)
                    {
                        l.open();
                    }
                    if (logMethod && log.isDebugEnabled())
                    {
                        log.debug(">> writing to client... "+l.socket);
                    }
                    l.write(buf);
                }
                catch (Exception ex)
                {
                    // ignore for now
                    if (logMethod&&log.isDebugEnabled())
                    {
                        log.debug("Error writing detection: "+msg,ex);
                    }
                }
            }
        }
    }

    /**
     * receive multicast detection
     *
     * @return detection notification
     * @throws Exception
     */
    protected DetectionNotification receiveDetection ()
        throws Exception
    {
        try
        {
            // block until we receive a new notification
            return (DetectionNotification)detections.take();
        }
        catch (InterruptedException ie)
        {
            return null;
        }
    }

    public static void main (String args[])
    {
        try
        {
            org.apache.log4j.BasicConfigurator.configure();
            org.apache.log4j.Category.getRoot().setLevel(org.apache.log4j.Level.DEBUG);
            UnicastDetector detector=new UnicastDetector();
            detector.setPublishPort(-1);
            detector.createService();
            SocketConnector connector=new SocketConnector();
            //detector.addLocation(InetAddress.getLocalHost(),2105);
            //detector.addLocation(InetAddress.getLocalHost(),2102);
            MBeanServer server=MBeanServerFactory.createMBeanServer();
            server.registerMBean(connector,new ObjectName("jmx.remoting:type=Connector,transport=socket"));
            server.registerMBean(detector,new ObjectName("jmx.remoting:type=Detector,transport=unicast"));
            connector.start();
            detector.start();
        }
        catch (Exception ex)
        {
            ex.printStackTrace();
        }
    }
}
TOP

Related Classes of org.jboss.mx.remote.discovery.unicast.UnicastDetector

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.