Package org.hornetq.core.server.cluster.impl

Source Code of org.hornetq.core.server.cluster.impl.BroadcastGroupImpl

/*
* Copyright 2009 Red Hat, Inc.
* Red Hat licenses this file to you under the Apache License, version
* 2.0 (the "License"); you may not use this file except in compliance
* with the License.  You may obtain a copy of the License at
*    http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied.  See the License for the specific language governing
* permissions and limitations under the License.
*/

package org.hornetq.core.server.cluster.impl;

import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ScheduledFuture;

import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.api.core.HornetQBuffers;
import org.hornetq.api.core.Pair;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.management.NotificationType;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.server.cluster.BroadcastGroup;
import org.hornetq.core.server.management.Notification;
import org.hornetq.core.server.management.NotificationService;
import org.hornetq.utils.TypedProperties;
import org.hornetq.utils.UUIDGenerator;

/**
* A BroadcastGroupImpl
*
* @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
*
* Created 15 Nov 2008 09:45:32
*
*/
public class BroadcastGroupImpl implements BroadcastGroup, Runnable
{
   private static final Logger log = Logger.getLogger(BroadcastGroupImpl.class);

   private final String nodeID;

   private final String name;

   private final InetAddress localAddress;

   private final int localPort;

   private final InetAddress groupAddress;

   private final int groupPort;

   private DatagramSocket socket;

   private final List<Pair<TransportConfiguration, TransportConfiguration>> connectorPairs = new ArrayList<Pair<TransportConfiguration, TransportConfiguration>>();

   private boolean started;

   private ScheduledFuture<?> future;

   private boolean active;

   // Each broadcast group has a unique id - we use this to detect when more than one group broadcasts the same node id
   // on the network which would be an error
   private final String uniqueID;

   private NotificationService notificationService;

   /**
    * Broadcast group is bound locally to the wildcard address
    */
   public BroadcastGroupImpl(final String nodeID,
                             final String name,
                             final InetAddress localAddress,
                             final int localPort,
                             final InetAddress groupAddress,
                             final int groupPort,
                             final boolean active) throws Exception
   {
      this.nodeID = nodeID;

      this.name = name;

      this.localAddress = localAddress;

      this.localPort = localPort;

      this.groupAddress = groupAddress;

      this.groupPort = groupPort;

      this.active = active;

      uniqueID = UUIDGenerator.getInstance().generateStringUUID();
   }

   public void setNotificationService(final NotificationService notificationService)
   {
      this.notificationService = notificationService;
   }

   public synchronized void start() throws Exception
   {
      if (started)
      {
         return;
      }

      if (localPort != -1)
      {
         socket = new DatagramSocket(localPort, localAddress);
      }
      else
      {
         if (localAddress != null)
         {
            log.warn("local-bind-address specified for broadcast group but no local-bind-port specified so socket will NOT be bound " +
                     "to a local address/port");
         }
         socket = new DatagramSocket();
      }

      started = true;

      if (notificationService != null)
      {
         TypedProperties props = new TypedProperties();
         props.putSimpleStringProperty(new SimpleString("name"), new SimpleString(name));
         Notification notification = new Notification(nodeID, NotificationType.BROADCAST_GROUP_STARTED, props);
         notificationService.sendNotification(notification);
      }
   }

   public synchronized void stop()
   {
      if (!started)
      {
         return;
      }

      if (future != null)
      {
         future.cancel(false);
      }

      socket.close();

      started = false;

      if (notificationService != null)
      {
         TypedProperties props = new TypedProperties();
         props.putSimpleStringProperty(new SimpleString("name"), new SimpleString(name));
         Notification notification = new Notification(nodeID, NotificationType.BROADCAST_GROUP_STOPPED, props);
         try
         {
            notificationService.sendNotification(notification);
         }
         catch (Exception e)
         {
            BroadcastGroupImpl.log.warn("unable to send notification when broadcast group is stopped", e);
         }
      }

   }

   public synchronized boolean isStarted()
   {
      return started;
   }

   public String getName()
   {
      return name;
   }

   public synchronized void addConnectorPair(final Pair<TransportConfiguration, TransportConfiguration> connectorPair)
   {
      connectorPairs.add(connectorPair);
   }

   public synchronized void removeConnectorPair(final Pair<TransportConfiguration, TransportConfiguration> connectorPair)
   {
      connectorPairs.remove(connectorPair);
   }

   public synchronized int size()
   {
      return connectorPairs.size();
   }

   public synchronized void activate()
   {
      active = true;
   }

   public synchronized void broadcastConnectors() throws Exception
   {
      if (!active)
      {
         return;
      }

      HornetQBuffer buff = HornetQBuffers.dynamicBuffer(4096);

      buff.writeString(nodeID);

      buff.writeString(uniqueID);

      buff.writeInt(connectorPairs.size());

      for (Pair<TransportConfiguration, TransportConfiguration> connectorPair : connectorPairs)
      {
         connectorPair.a.encode(buff);

         if (connectorPair.b != null)
         {
            buff.writeBoolean(true);

            connectorPair.b.encode(buff);
         }
         else
         {
            buff.writeBoolean(false);
         }
      }

      byte[] data = buff.toByteBuffer().array();

      DatagramPacket packet = new DatagramPacket(data, data.length, groupAddress, groupPort);

      socket.send(packet);
   }

   public void run()
   {
      if (!started)
      {
         return;
      }

      try
      {
         broadcastConnectors();
      }
      catch (Exception e)
      {
         BroadcastGroupImpl.log.error("Failed to broadcast connector configs", e);
      }
   }

   public synchronized void setScheduledFuture(final ScheduledFuture<?> future)
   {
      this.future = future;
   }

}
TOP

Related Classes of org.hornetq.core.server.cluster.impl.BroadcastGroupImpl

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code is the property of its respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.