/**
* @(#)$Id: UnicastDetector.java,v 1.2 2003/01/14 13:40:54 juhalindfors Exp $
*
* This code is PROPRIETARY AND CONFIDENTIAL to Vocalocity, Inc.
* -- DO NOT RE-DISTRIBUTE THIS SOURCE CODE WITHOUT EXPRESS PERMISSION. --
*
* This source code is Copyright (c) 2002 by Vocalocity, Inc.
* All Rights Reserved.
*
* The source code for this program is not published or
* otherwise divested of its trade secrets, irrespective
* of what has been deposited with the US Copyright Office.
*
*/
package org.jboss.mx.remote.discovery.unicast;
import org.jboss.mx.remote.discovery.AbstractDetector;
import org.jboss.mx.remote.discovery.DetectionNotification;
import org.jboss.mx.remote.connector.ConnectorFactory;
import org.jboss.mx.remote.connector.socket.SocketConnector;
import org.jboss.mx.remote.JMXUtil;
import org.jboss.mx.util.SerializationHelper;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import javax.management.MBeanServerFactory;
import java.util.*;
import java.net.InetAddress;
import java.net.Socket;
import java.net.ServerSocket;
import java.net.DatagramPacket;
import java.io.OutputStream;
import java.io.InputStream;
import java.io.BufferedOutputStream;
import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;
/**
* UnicastDetector is an implementation of the UnicastDetectorMBean MBean that exchanges
* DetectionNotifications with 1..n unicast locations (IP address and port). Detections are
* written over TCP to every configured location, and detections sent by remote UnicastDetectors
* are accepted on the <tt>publishPort</tt>, so remote detectors can list this instance as a location.
* @author <a href="mailto:jhaynie@vocalocity.net">Jeff Haynie</a>
* @version $Revision: 1.2 $
*/
public class UnicastDetector extends AbstractDetector implements UnicastDetectorMBean
{
// Detector type identifier; the constructor assigns it to the inherited 'type' field.
private static final String TYPE = "unicast";
// Port the Listener's ServerSocket binds to; startService() creates no listener when this is -1.
private int localport=DEFAULT_PUBLISH_PORT;
// Remote Location entries that publish() writes to. This is a synchronizedList, so individual
// calls are thread-safe but iteration must synchronize on the list itself (as publish() does).
private List locations=Collections.synchronizedList(new ArrayList(2));
// Queue of received detections: filled by the Listener's reader threads, drained by receiveDetection().
private LinkedQueue detections=new LinkedQueue();
/**
 * Creates a detector and marks it as the "unicast" detector type by
 * assigning {@link #TYPE} to the inherited <tt>type</tt> field.
 */
public UnicastDetector ()
{
    super();
    type = TYPE;
}
/**
 * Starts the detector.
 *
 * <p>When a publish port is configured (any value other than -1) a
 * Listener thread is created and started first, so that detections sent
 * by remote detectors can be accepted. The superclass start logic runs
 * afterwards in either case.
 *
 * @throws Exception if the Listener cannot be created (its constructor
 *         opens a ServerSocket on the publish port) or if the superclass
 *         start logic fails
 */
protected void startService() throws Exception
{
    // -1 means "do not accept incoming detections"
    final boolean acceptsDetections = (localport != -1);
    if (acceptsDetections)
    {
        listener = new Listener();
        listener.start();
    }
    super.startService();
}
/**
 * Stops the detector.
 *
 * <p>If a Listener is active it is told to stop: its <tt>running</tt>
 * flag is cleared, its server socket is closed (errors ignored) and
 * nulled, and the thread is interrupted. The superclass stop logic runs
 * afterwards in either case.
 *
 * @throws Exception if the superclass stop logic fails
 */
protected void stopService() throws Exception
{
    final Listener active = listener;
    if (active != null)
    {
        active.running = false;
        try
        {
            active.socket.close();
        }
        catch (Exception ignored)
        {
            // best effort: the socket may already be closed or null
        }
        active.socket = null;
        active.interrupt();
        listener = null;
    }
    super.stopService();
}
// The active listener thread, or null when no publish port is configured / the service is stopped.
private Listener listener;
/**
 * Background thread that accepts TCP connections on the publish port and
 * hands each accepted connection to a short-lived reader thread. The
 * reader performs a single read, deserializes the bytes into a detection
 * and puts it on the <tt>detections</tt> queue, then closes the client
 * connection.
 *
 * <p>The <tt>running</tt> and <tt>socket</tt> fields are modified by
 * <tt>stopService()</tt> from another thread, hence they are volatile.
 */
private final class Listener extends Thread
{
    // cleared by stopService() to end the accept loop
    volatile boolean running=true;
    // server socket bound to the publish port; closed and nulled by stopService()
    volatile ServerSocket socket;

    /**
     * Creates the listener and binds the server socket to the configured publish port.
     *
     * @throws Exception if the server socket cannot be opened
     */
    Listener ()
    throws Exception
    {
        setName("UnicastDetector-Listener");
        setPriority(Thread.NORM_PRIORITY-1);
        setDaemon(false);
        socket = new ServerSocket(localport);
    }

    /**
     * Accept loop: runs until <tt>running</tt> is cleared or the server
     * socket has been removed by <tt>stopService()</tt>.
     */
    public void run ()
    {
        while(running)
        {
            // work on a local copy: stopService() may null the field at any time
            final ServerSocket server=socket;
            if (server==null)
            {
                break;
            }
            try
            {
                final Socket client=server.accept();
                if (logMethod && log.isDebugEnabled())
                {
                    log.debug("<< accepting connection from: "+client);
                }
                new Thread()
                {
                    public void run ()
                    {
                        readDetection(client);
                    }
                }.start();
            }
            catch (Exception ex)
            {
                if (!running)
                {
                    // expected: stopService() closed the server socket to unblock accept()
                    break;
                }
                if (logMethod && log.isDebugEnabled())
                {
                    log.debug("Exception accepting connection on: "+server,ex);
                }
            }
            finally
            {
                // short pause between accepts (also prevents a tight loop when accept() keeps failing)
                try
                {
                    sleep(250);
                }
                catch (Exception ig) { }
            }
        }
    }

    /**
     * Reads one detection from the client connection, queues it, and always
     * closes the client connection afterwards.
     *
     * <p>NOTE(review): only a single read of at most 2096 bytes is performed,
     * so a detection that is larger or arrives in several TCP segments will
     * fail to deserialize. The bytes are deserialized from whatever peer
     * connected; confirm that the publish port is only reachable by trusted hosts.
     *
     * @param client the accepted connection to read from
     */
    private void readDetection (Socket client)
    {
        try
        {
            byte buf[]=new byte[2096];
            if (logMethod && log.isDebugEnabled())
            {
                log.debug("<< reading ... "+client);
            }
            int count = client.getInputStream().read(buf);
            if (count>0)
            {
                // just write to our queue
                detections.put(SerializationHelper.deserialize(buf));
            }
            else
            {
                if (logMethod && log.isDebugEnabled())
                {
                    log.debug("<< NO DATA READ FROM CLIENT");
                }
            }
        }
        catch (Exception ex)
        {
            if (logMethod && log.isDebugEnabled())
            {
                log.debug("Exception reading buffer from remote: "+client,ex);
            }
        }
        finally
        {
            // close the CLIENT connection; the server socket must stay open for further accepts
            try
            {
                client.close();
            }
            catch (Exception ig) { }
        }
    }
}
/**
 * A remote unicast endpoint (address and port) that detections are written to.
 *
 * <p>Each call to {@link #write(byte[])} uses its own connection: the
 * connection is opened on demand and closed again once the buffer has been
 * flushed (or the write has failed). The Listener in this class reads only
 * once per accepted connection, so a connection that was kept open would
 * deliver at most its first message; closing it also guarantees that a
 * broken connection is never reused.
 */
private final class Location
{
    // remote address and port to connect to
    InetAddress addr;
    int port;
    // current connection and its buffered output stream; both null while disconnected
    Socket socket;
    OutputStream output;

    /**
     * @param addr remote address
     * @param port remote port
     */
    Location (InetAddress addr, int port)
    {
        this.addr=addr;
        this.port=port;
    }

    /**
     * Sends the buffer to the remote endpoint, connecting first if needed,
     * and closes the connection afterwards. Failures are logged at debug
     * level and never propagated.
     *
     * @param buf the bytes to send
     */
    void write (byte buf[])
    {
        try
        {
            if (socket==null)
            {
                open();
            }
            if (socket!=null && output!=null)
            {
                output.write(buf);
                output.flush();
            }
        }
        catch (Exception ex)
        {
            if (logMethod && log.isDebugEnabled())
            {
                log.debug("Exception writing buffer",ex);
            }
        }
        finally
        {
            // one message per connection; also drops a broken connection so the next write reconnects
            close();
        }
    }

    /**
     * Connects to the remote endpoint. On failure the location is left
     * disconnected (<tt>socket</tt> and <tt>output</tt> are null) and the
     * error is logged at debug level.
     */
    void open ()
    {
        try
        {
            socket=new Socket(addr,port);
            output = new BufferedOutputStream(socket.getOutputStream());
        }
        catch (Exception ex)
        {
            if (logMethod && log.isDebugEnabled())
            {
                log.debug("Error creating listener: "+addr+":"+port,ex);
            }
            // release a socket that connected but failed to provide its stream
            close();
        }
    }

    /**
     * Flushes and closes the connection, ignoring errors. Safe to call when
     * already disconnected. The socket is closed even if the flush fails.
     */
    void close ()
    {
        try
        {
            if (output!=null)
            {
                output.flush();
            }
        }
        catch (Exception ignored)
        {
            // peer may already be gone; still close the socket below
        }
        try
        {
            if (socket!=null)
            {
                socket.close();
            }
        }
        catch (Exception ignored)
        {
            // nothing more can be done with a socket that fails to close
        }
        output=null;
        socket=null;
    }
}
/**
 * Sets the port on which this detector accepts unicast detections and
 * fires an attribute change for <tt>PublishPort</tt>.
 *
 * <p>This method only stores the value; the port is read when the listener
 * is created in <tt>startService()</tt>, where -1 means that no listener
 * is started.
 *
 * @param port the port to accept detections on, or -1 for none
 */
public void setPublishPort(int port)
{
    final Integer previous = new Integer(localport);
    final Integer current = new Integer(port);
    localport = port;
    if (log.isDebugEnabled())
    {
        log.debug("Setting publish port from: "+previous+" to: "+current);
    }
    fireAttributeChange("detector.publish.port","PublishPort",Integer.class,previous,current);
}
/**
 * Returns the port on which unicast detections are accepted.
 *
 * <p>NOTE(review): the setter is named <tt>setPublishPort</tt> while this
 * getter is <tt>getPublisherPort</tt>; confirm against UnicastDetectorMBean
 * whether the differing attribute names are intentional.
 *
 * @return the configured publish port
 */
public int getPublisherPort()
{
    return this.localport;
}
/**
 * Registers a remote location that detections will be published to.
 * No check for duplicates is made.
 *
 * @param ip address of the remote detector
 * @param port port of the remote detector
 */
public void addLocation(InetAddress ip, int port)
{
    final Location target = new Location(ip,port);
    locations.add(target);
}
/**
 * Removes every registered location matching the given address and port,
 * closing its connection first.
 *
 * <p>The iteration is performed while holding the lock of the
 * <tt>locations</tt> list: a synchronizedList only guards individual
 * calls, so iterating without the lock could fail with a
 * ConcurrentModificationException when another thread adds or removes a
 * location. <tt>publish()</tt> takes the same lock, so this call waits for
 * a publish in progress.
 *
 * @param ip address of the location to remove
 * @param port port of the location to remove
 */
public void removeLocation(InetAddress ip, int port)
{
    synchronized(locations)
    {
        Iterator i=locations.iterator();
        while(i.hasNext())
        {
            Location l=(Location)i.next();
            if (l.addr.equals(ip) && l.port==port)
            {
                l.close();
                i.remove();
            }
        }
    }
}
/**
 * Adds the locations given as a space- or comma-separated list of
 * <tt>ip:port</tt> entries. Locations are added to the ones already
 * registered; nothing is removed.
 *
 * <p>The port is taken after the last colon of each entry, so IPv6
 * addresses such as those produced by <tt>getLocations()</tt> can be read
 * back. Entries that cannot be parsed or resolved are skipped with a
 * warning. A null list is ignored.
 *
 * @param propertiesList list of <tt>ip:port</tt> entries, may be null
 */
public void setLocations(String propertiesList)
{
    if (propertiesList==null)
    {
        return;
    }
    StringTokenizer tok=new StringTokenizer(propertiesList," ,");
    while(tok.hasMoreTokens())
    {
        String token=tok.nextToken();
        // last colon, not first: the address part may itself contain colons (IPv6)
        int colon = token.lastIndexOf(':');
        if (colon<=0)
        {
            log.warn("Couldn't create Location: "+token+" (expected ip:port)");
            continue;
        }
        try
        {
            InetAddress ip = InetAddress.getByName(token.substring(0,colon));
            int port = Integer.parseInt(token.substring(colon+1));
            addLocation(ip,port);
        }
        catch (Exception ex)
        {
            log.warn("Couldn't create Location: "+token,ex);
        }
    }
}
/**
 * Returns the registered locations as a space-separated list in the
 * format <tt>ip:port[ ip:port]</tt>.
 *
 * <p>The list is built while holding the lock of the <tt>locations</tt>
 * list, because iterating a synchronizedList without its lock can fail
 * with a ConcurrentModificationException when another thread modifies it.
 *
 * @return location list string, empty when no location is registered
 */
public String getLocations()
{
    StringBuffer buf=new StringBuffer();
    synchronized(locations)
    {
        Iterator i = locations.iterator();
        while(i.hasNext())
        {
            Location l = (Location)i.next();
            buf.append(l.addr.getHostAddress()).append(':').append(l.port);
            if (i.hasNext())
            {
                buf.append(' ');
            }
        }
    }
    return buf.toString();
}
/**
 * Publishes the detection message to every registered unicast location.
 *
 * <p>The message is serialized once; the resulting bytes are then written
 * to each location in turn while holding the lock of the <tt>locations</tt>
 * list. A failure for one location is logged at debug level and does not
 * prevent the remaining locations from being written.
 *
 * @param msg the detection to publish
 * @throws Exception if the message cannot be serialized
 */
protected void publish (DetectionNotification msg)
throws Exception
{
    final byte payload[] = SerializationHelper.serialize(msg);
    synchronized(locations)
    {
        for (Iterator targets = locations.iterator(); targets.hasNext(); )
        {
            final Location target = (Location)targets.next();
            try
            {
                // connect up front so the debug line below can show the socket
                if (target.socket==null)
                {
                    target.open();
                }
                if (logMethod && log.isDebugEnabled())
                {
                    log.debug(">> writing to client... "+target.socket);
                }
                target.write(payload);
            }
            catch (Exception ex)
            {
                // ignore for now
                if (logMethod&&log.isDebugEnabled())
                {
                    log.debug("Error writing detection: "+msg,ex);
                }
            }
        }
    }
}
/**
 * Returns the next received unicast detection, blocking until one is
 * available on the <tt>detections</tt> queue.
 *
 * @return the next detection notification, or null if the wait was
 *         interrupted (the InterruptedException is swallowed)
 * @throws Exception declared for subclasses/callers; not thrown for interruption
 */
protected DetectionNotification receiveDetection ()
throws Exception
{
    DetectionNotification next = null;
    try
    {
        // block until we receive a new notification
        next = (DetectionNotification)detections.take();
    }
    catch (InterruptedException ie)
    {
        // interrupted while waiting: report "nothing received"
    }
    return next;
}
/**
 * Manual test entry point: configures log4j at DEBUG level, creates a
 * detector with publish port -1 (so no listener is started), registers it
 * together with a SocketConnector in a new MBeanServer and starts both.
 * Any failure is printed to standard error.
 *
 * @param args ignored
 */
public static void main (String args[])
{
    try
    {
        org.apache.log4j.BasicConfigurator.configure();
        org.apache.log4j.Category.getRoot().setLevel(org.apache.log4j.Level.DEBUG);

        final UnicastDetector unicast=new UnicastDetector();
        unicast.setPublishPort(-1);
        unicast.createService();
        final SocketConnector socketConnector=new SocketConnector();
        // example locations, disabled:
        //detector.addLocation(InetAddress.getLocalHost(),2105);
        //detector.addLocation(InetAddress.getLocalHost(),2102);
        final MBeanServer mbeanServer=MBeanServerFactory.createMBeanServer();

        final ObjectName connectorName=new ObjectName("jmx.remoting:type=Connector,transport=socket");
        mbeanServer.registerMBean(socketConnector,connectorName);
        final ObjectName detectorName=new ObjectName("jmx.remoting:type=Detector,transport=unicast");
        mbeanServer.registerMBean(unicast,detectorName);

        socketConnector.start();
        unicast.start();
    }
    catch (Exception failure)
    {
        failure.printStackTrace();
    }
}
}