Package com.sun.sgs.test.impl.service.channel

Source Code of com.sun.sgs.test.impl.service.channel.TestChannelServiceImpl$ManagedChannelListener

/*
* Copyright 2007-2010 Sun Microsystems, Inc.
*
* This file is part of Project Darkstar Server.
*
* Project Darkstar Server is free software: you can redistribute it
* and/or modify it under the terms of the GNU General Public License
* version 2 as published by the Free Software Foundation and
* distributed hereunder to you.
*
* Project Darkstar Server is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program.  If not, see <http://www.gnu.org/licenses/>.
*
* --
*/

package com.sun.sgs.test.impl.service.channel;

import com.sun.sgs.app.AppContext;
import com.sun.sgs.app.Channel;
import com.sun.sgs.app.ChannelListener;
import com.sun.sgs.app.ClientSession;
import com.sun.sgs.app.DataManager;
import com.sun.sgs.app.Delivery;
import com.sun.sgs.app.ManagedObject;
import com.sun.sgs.app.ManagedReference;
import com.sun.sgs.app.ResourceUnavailableException;
import com.sun.sgs.app.TransactionNotActiveException;
import com.sun.sgs.impl.service.channel.ChannelServiceImpl;
import com.sun.sgs.impl.service.session.ClientSessionWrapper;
import com.sun.sgs.impl.util.AbstractService.Version;
import com.sun.sgs.test.util.ConfigurableNodePolicy;
import com.sun.sgs.test.util.SgsTestNode;
import com.sun.sgs.test.util.TestAbstractKernelRunnable;
import com.sun.sgs.tools.test.FilteredNameRunner;
import com.sun.sgs.tools.test.IntegrationTest;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.Set;

import org.junit.Test;
import org.junit.runner.RunWith;

@RunWith(FilteredNameRunner.class)
public class TestChannelServiceImpl extends AbstractChannelServiceTest {
   
    /** Constructs a test instance. */
    public TestChannelServiceImpl() throws Exception  {
    }

    // -- Test constructor --
    @Test
    public void testConstructorNullProperties() throws Exception {
  try {
      new ChannelServiceImpl(null, serverNode.getSystemRegistry(),
           serverNode.getProxy());
      fail("Expected NullPointerException");
  } catch (NullPointerException e) {
      System.err.println(e);
  }
    }

    @Test
    public void testConstructorNullComponentRegistry() throws Exception {
  try {
      new ChannelServiceImpl(serviceProps, null, serverNode.getProxy());
      fail("Expected NullPointerException");
  } catch (NullPointerException e) {
      System.err.println(e);
  }
    }

    @Test
    public void testConstructorNullTransactionProxy() throws Exception {
  try {
      new ChannelServiceImpl(serviceProps, serverNode.getSystemRegistry(),
           null);
      fail("Expected NullPointerException");
  } catch (NullPointerException e) {
      System.err.println(e);
  }
    }

    @Test
    public void testConstructorNoAppName() throws Exception {
  try {
      new ChannelServiceImpl(
    new Properties(), serverNode.getSystemRegistry(),
    serverNode.getProxy());
      fail("Expected IllegalArgumentException");
  } catch (IllegalArgumentException e) {
      System.err.println(e);
  }
    }

    @Test
    public void testConstructedVersion() throws Exception {
  txnScheduler.runTask(new TestAbstractKernelRunnable() {
    public void run() {
        Version version = (Version)
      dataService.getServiceBinding(VERSION_KEY);
        if (version.getMajorVersion() != MAJOR_VERSION ||
      version.getMinorVersion() != MINOR_VERSION)
        {
      fail("Expected service version (major=" +
           MAJOR_VERSION + ", minor=" + MINOR_VERSION +
           "), got:" + version);
        }
    }}, taskOwner);
    }

    @Test
    public void testConstructorWithCurrentVersion() throws Exception {
  txnScheduler.runTask(new TestAbstractKernelRunnable() {
    public void run() {
        Version version = new Version(MAJOR_VERSION, MINOR_VERSION);
        dataService.setServiceBinding(VERSION_KEY, version);
    }}, taskOwner);

  ChannelServiceImpl newChannelService = null;
  try {
      newChannelService =
    new ChannelServiceImpl(serviceProps,
               serverNode.getSystemRegistry(),
               serverNode.getProxy());
  } finally {
      if (newChannelService != null) {
    newChannelService.shutdown();
      }
  }
    }

    @Test
    public void testConstructorWithMajorVersionMismatch() throws Exception {
  txnScheduler.runTask(new TestAbstractKernelRunnable() {
    public void run() {
        Version version =
      new Version(MAJOR_VERSION + 1, MINOR_VERSION);
        dataService.setServiceBinding(VERSION_KEY, version);
    }}, taskOwner);

  try {
      new ChannelServiceImpl(serviceProps, serverNode.getSystemRegistry(),
           serverNode.getProxy());
      fail("Expected IllegalStateException");
  } catch (IllegalStateException e) {
      System.err.println(e);
  }
    }

    @Test
    public void testConstructorWithMinorVersionMismatch() throws Exception {
  txnScheduler.runTask(new TestAbstractKernelRunnable() {
    public void run() {
        Version version =
      new Version(MAJOR_VERSION, MINOR_VERSION + 1);
        dataService.setServiceBinding(VERSION_KEY, version);
    }}, taskOwner);

  try {
      new ChannelServiceImpl(serviceProps, serverNode.getSystemRegistry(),
           serverNode.getProxy());
      fail("Expected IllegalStateException");
  } catch (IllegalStateException e) {
      System.err.println(e);
  }
    }

    // -- Test createChannel --

    @Test
    public void testCreateChannelNullName() throws Exception {
  txnScheduler.runTask(new TestAbstractKernelRunnable() {
      public void run() {
    try {
        channelService.createChannel(
      null, new DummyChannelListener(), Delivery.RELIABLE);
        fail("Expected NullPointerException");
    catch (NullPointerException e) {
        System.err.println(e);
    }
      }}, taskOwner);
    }

    @Test
    public void testCreateChannelNullListener() throws Exception {
  txnScheduler.runTask(new TestAbstractKernelRunnable() {
      public void run() {
    try {
        channelService.createChannel(
      "foo", null, Delivery.RELIABLE);
        System.err.println("null listener allowed");
    catch (NullPointerException e) {
        fail("Got NullPointerException");
    }
      }}, taskOwner);
    }

    @Test
    public void testCreateChannelNonSerializableListener() throws Exception {
  txnScheduler.runTask(new TestAbstractKernelRunnable() {
      public void run() {
    try {
        channelService.createChannel(
      "foo", new NonSerializableChannelListener(),
      Delivery.RELIABLE);
        fail("Expected IllegalArgumentException");
    catch (IllegalArgumentException e) {
        System.err.println(e);
    }
      }}, taskOwner);
    }

    @Test
    public void testCreateChannelNoTxn() throws Exception {
  try {
      channelService.createChannel("x", null, Delivery.RELIABLE);
      fail("Expected TransactionNotActiveException");
  } catch (TransactionNotActiveException e) {
      System.err.println(e);
  }
    }

    @Test
    public void testChannelToStringNoTxn() throws Exception {
  final List<Channel> channel = new ArrayList<Channel>();
  txnScheduler.runTask(new TestAbstractKernelRunnable() {
      public void run() {
    channel.add(
        channelService.createChannel(
      "test", new DummyChannelListener(), Delivery.RELIABLE));
    System.err.println(channel.get(0).toString());
      }}, taskOwner);
  System.err.println(channel.get(0).toString());
    }
   
    // -- Test Channel serialization --

    @Test
    public void testChannelWriteReadObject() throws Exception {
  txnScheduler.runTask(new TestAbstractKernelRunnable() {
      public void run() throws Exception {
    Channel savedChannel =
        channelService.createChannel("x", null, Delivery.RELIABLE);
    ByteArrayOutputStream bout = new ByteArrayOutputStream();
    ObjectOutputStream out = new ObjectOutputStream(bout);
    out.writeObject(savedChannel);
    out.flush();
    out.close();
   
    ByteArrayInputStream bin =
        new ByteArrayInputStream(bout.toByteArray());
    ObjectInputStream in = new ObjectInputStream(bin);
    Channel channel = (Channel) in.readObject();

    if (!savedChannel.equals(channel)) {
        fail("Expected channel: " + savedChannel +
       ", got " + channel);
    }
    System.err.println("Channel {write,read}Object successful");
      }
  }, taskOwner);
    }
   
    // -- Test Channel.getName --

    @Test
    public void testChannelGetNameNoTxn() throws Exception {
  Channel channel = createChannel();
  try {
      channel.getName();
      fail("Expected TransactionNotActiveException");
  } catch (TransactionNotActiveException e) {
      System.err.println(e);
  }
    }

    @Test
    public void testChannelGetNameMismatchedTxn() throws Exception {
  final Channel channel = createChannel();
  txnScheduler.runTask(new TestAbstractKernelRunnable() {
      public void run() {
    try {
        channel.getName();
        fail("Expected TransactionNotActiveException");
    } catch (TransactionNotActiveException e) {
        System.err.println(e);
    }
      }
  }, taskOwner);
    }

    @Test
    public void testChannelGetName() throws Exception {
  txnScheduler.runTask(new TestAbstractKernelRunnable() {
      public void run() {
    String name = "foo";
    Channel channel = channelService.createChannel(
        name, null, Delivery.RELIABLE);
    if (!name.equals(channel.getName())) {
        fail("Expected: " + name + ", got: " + channel.getName());
    }
      }
  }, taskOwner);
    }

    @Test
    public void testChannelGetNameClosedChannel() throws Exception {
  txnScheduler.runTask(new TestAbstractKernelRunnable() {
      public void run() {
    String name = "foo";
    Channel channel = channelService.createChannel(
        name, null, Delivery.RELIABLE);
    dataService.removeObject(channel);
    try {
        channel.getName();
        fail("Expected IllegalStateException");
    } catch (IllegalStateException e) {
        System.err.println(e);
    }
      }
  }, taskOwner);
    }

    // -- Test Channel.getDelivery --

    @Test
    public void testChannelGetDeliveryNoTxn() throws Exception {
  Channel channel = createChannel();
  try {
      channel.getDelivery();
      fail("Expected TransactionNotActiveException");
  } catch (TransactionNotActiveException e) {
      System.err.println(e);
  }
    }

    @Test
    public void testChannelGetDeliveryMismatchedTxn() throws Exception {
  // TBD: should the implementation work this way?
  final Channel channel = createChannel();
  txnScheduler.runTask(new TestAbstractKernelRunnable() {
      public void run() {
    try {
        channel.getDelivery();
        fail("Expected TransactionNotActiveException");
    } catch (TransactionNotActiveException e) {
        System.err.println(e);
    }
      }
  }, taskOwner);
    }

    @Test
    public void testChannelGetDelivery() throws Exception {
  txnScheduler.runTask(new TestAbstractKernelRunnable() {
      public void run() {
    for (Delivery delivery : Delivery.values()) {
        Channel channel = channelService.createChannel(
      delivery.toString(), null, delivery);
        if (!delivery.equals(channel.getDelivery())) {
      fail("Expected: " + delivery + ", got: " +
           channel.getDelivery());
        }
    }
    System.err.println("Delivery requirements are equal");
      }
  }, taskOwner);
    }

    @Test
    public void testChannelGetDeliveryClosedChannel() throws Exception {
  txnScheduler.runTask(new TestAbstractKernelRunnable() {
      public void run() {
    for (Delivery delivery : Delivery.values()) {
        Channel channel = channelService.createChannel(
      delivery.toString(), null, delivery);
        dataService.removeObject(channel);
        try {
      channel.getDelivery();
      fail("Expected IllegalStateException");
        } catch (IllegalStateException e) {
      System.err.println(e);
        }
    }
    System.err.println("Got delivery requirement on close channel");
      }
  }, taskOwner);
    }

    // -- Test Channel.hasSessions --

    @Test
    public void testChannelHasSessionsNoTxn() throws Exception {
  Channel channel = createChannel();
  try {
      channel.hasSessions();
      fail("Expected TransactionNotActiveException");
  } catch (TransactionNotActiveException e) {
      System.err.println(e);
  }
    }

    @Test
    public void testChannelHasSessionsMismatchedTxn() throws Exception {
  final Channel channel = createChannel();
  txnScheduler.runTask(new TestAbstractKernelRunnable() {
      public void run() {
    try {
        channel.hasSessions();
        fail("Expected TransactionNotActiveException");
    } catch (TransactionNotActiveException e) {
        System.err.println(e);
    }
      }
  }, taskOwner);
    }

    @Test
    public void testChannelHasSessionsNoSessionsJoined() throws Exception {
  txnScheduler.runTask(new TestAbstractKernelRunnable() {
      public void run() {
    String name = "foo";
    Channel channel = channelService.createChannel(
        name, null, Delivery.RELIABLE);
    if (channel.hasSessions()) {
        fail("Expected no sessions joined");
    }
    System.err.println("no sessions joined");
      }
  }, taskOwner);
    }

    @Test
    public void testChannelHasSessionsWithSessionsJoined() throws Exception {
  final String channelName = "foo";
  createChannel(channelName);
  ClientGroup group = new ClientGroup(someUsers);
  try {
      joinUsers("foo", someUsers);
      txnScheduler.runTask(new TestAbstractKernelRunnable() {
    public void run() {
        Channel channel = channelService.getChannel(channelName);
        if (! channel.hasSessions()) {
      fail("Expected sessions joined");
        }
    }
    }, taskOwner);
  } finally {
      group.disconnect(false);
  }
    }

    @Test
    public void testChannelHasSessionsClosedChannel() throws Exception {
  txnScheduler.runTask(new TestAbstractKernelRunnable() {
      public void run() {
    String name = "foo";
    Channel channel = channelService.createChannel(
        name, null, Delivery.RELIABLE);
    dataService.removeObject(channel);
    try {
        channel.hasSessions();
        fail("Expected IllegalStateException");
    } catch (IllegalStateException e) {
        System.err.println(e);
    }
      }
  }, taskOwner);
    }

    // -- Test Channel.getSessions --

    @Test
    public void testChannelGetSessionsNoTxn() throws Exception {
  Channel channel = createChannel();
  try {
      channel.getSessions();
      fail("Expected TransactionNotActiveException");
  } catch (TransactionNotActiveException e) {
      System.err.println(e);
  }
    }

    @Test
    public void testChannelGetSessionsMismatchedTxn() throws Exception {
  final Channel channel = createChannel();
  txnScheduler.runTask(new TestAbstractKernelRunnable() {
      public void run() {
    try {
        channel.getSessions();
        fail("Expected TransactionNotActiveException");
    } catch (TransactionNotActiveException e) {
        System.err.println(e);
    }
      }
  }, taskOwner);
    }

    @Test
    public void testChannelGetSessionsNoSessionsJoined() throws Exception {
  txnScheduler.runTask(new TestAbstractKernelRunnable() {
      public void run() {
    String name = "foo";
    Channel channel = channelService.createChannel(
        name, null, Delivery.RELIABLE);
    if (channel.getSessions().hasNext()) {
        fail("Expected no sessions joined");
    }
    System.err.println("no sessions joined");
      }
  }, taskOwner);
    }

    @Test
    public void testChannelGetSessionsWithSessionsJoined() throws Exception {
  final String channelName = "foo";
  createChannel(channelName);
  ClientGroup group = new ClientGroup(someUsers);
  try {
      joinUsers("foo", someUsers);
      checkUsersJoined("foo", someUsers);
      txnScheduler.runTask(new TestAbstractKernelRunnable() {
    public void run() {
        Channel channel = channelService.getChannel(channelName);
        List<String> users =
      new ArrayList<String>(Arrays.asList(someUsers));
        Iterator<ClientSession> iter = channel.getSessions();
        while (iter.hasNext()) {
      ClientSession session = iter.next();
      if (!(session instanceof ClientSessionWrapper)) {
          fail("session not ClientSessionWrapper instance: " +
         session);
      }
      String name = session.getName();
      if (! users.contains(name)) {
          fail("unexpected channel member: " + name);
      } else {
          System.err.println("getSessions includes: " + name);
          users.remove(name);
      }
        }
        if (! users.isEmpty()) {
      fail("Expected getSessions to include: " + users);
        }
    }}, taskOwner);
  } finally {
      group.disconnect(false);
  }
    }

    @Test
    public void testChannelGetSessionsMultipleNodes() throws Exception {
  addNodes(2);
  ConfigurableNodePolicy.setRoundRobinPolicy();
  testChannelGetSessionsWithSessionsJoined();
    }

    @Test
    public void testChannelGetSessionsClosedChannel() throws Exception {
  txnScheduler.runTask(new TestAbstractKernelRunnable() {
      public void run() {
    String name = "foo";
    Channel channel = channelService.createChannel(
        name, null, Delivery.RELIABLE);
    dataService.removeObject(channel);
    try {
        channel.getSessions();
        fail("Expected IllegalStateException");
    } catch (IllegalStateException e) {
        System.err.println(e);
    }
      }
  }, taskOwner);
    }

    // -- Test Channel.join --

    @Test
    public void testChannelJoinNoTxn() throws Exception {
  Channel channel = createChannel();
  DummyClient client = newClient();
  try {
      channel.join(client.getSession());
      fail("Expected TransactionNotActiveException");
  } catch (TransactionNotActiveException e) {
      System.err.println(e);
  } finally {
      if (client != null) {
    client.disconnect();
      }
  }
    }

    @Test
    public void testChannelJoinClosedChannel() throws Exception {
  final DummyClient client = newClient();
  try {
      txnScheduler.runTask(new TestAbstractKernelRunnable() {
    public void run() throws Exception {
        Channel channel =
      channelService.createChannel(
          "x", null, Delivery.RELIABLE);
        dataService.removeObject(channel);
        try {
      channel.join(client.getSession());
      fail("Expected IllegalStateException");
        } catch (IllegalStateException e) {
      System.err.println(e);
        }
    }
    }, taskOwner);
     
  } finally {
      if (client != null) {
    client.disconnect();
      }
  }
    }

    @Test
    public void testChannelJoinNullClientSession() throws Exception {
  txnScheduler.runTask(new TestAbstractKernelRunnable() {
      public void run() {
    Channel channel =
        channelService.createChannel("x", null, Delivery.RELIABLE);
    try {
        channel.join((ClientSession) null);
        fail("Expected NullPointerException");
    } catch (NullPointerException e) {
        System.err.println(e);
    }
      }
  }, taskOwner);
    }

    @Test
    @IntegrationTest
    public void testChannelJoin() throws Exception {
  String channelName = "joinTest";
  ClientGroup group = new ClientGroup(someUsers);
  Thread.sleep(1000);
  printServiceBindings("before channel create");
  int count = getObjectCount();
  createChannel(channelName);
 
  try {
      joinUsers(channelName, someUsers);
      checkUsersJoined(channelName, someUsers);
      printServiceBindings("before close");
      closeChannel(channelName);
      Thread.sleep(1000);
      printServiceBindings("after close");
      assertEquals(count, getObjectCount());
  } finally {
      group.disconnect(false);
  }
    }

    // -- Test Channel.leave --

    @Test
    public void testChannelLeaveNoTxn() throws Exception {
  Channel channel = createChannel();
  DummyClient client = newClient();
  try {
      channel.leave(client.getSession());
      fail("Expected TransactionNotActiveException");
  } catch (TransactionNotActiveException e) {
      System.err.println(e);
  } finally {
      if (client != null) {
    client.disconnect();
      }
  }
    }

    @Test
    public void testChannelLeaveMismatchedTxn() throws Exception {
  // TBD: should the implementation work this way?
  final String channelName = "test";
  final Channel channel = createChannel(channelName);
  final DummyClient client = newClient();
  try {
      txnScheduler.runTask(new TestAbstractKernelRunnable() {
    public void run() throws Exception {
        try {
      channel.leave(client.getSession());
      fail("Expected TransactionNotActiveException");
        } catch (TransactionNotActiveException e) {
      System.err.println(e);
        }
    }
    }, taskOwner);
  } finally {
      if (client != null) {
    client.disconnect();
      }
  }
    }

    @Test
    public void testChannelLeaveClosedChannel() throws Exception {
  final String channelName = "leaveClosedChannelTest";
  final String user = "daffy";
  createChannel(channelName);
  ClientGroup group = new ClientGroup(user);

  try {
      txnScheduler.runTask(new TestAbstractKernelRunnable() {
    public void run() {
        Channel channel = getChannel(channelName);
        ClientSession session =
      (ClientSession) dataService.getBinding(user);
        channel.join(session);
        dataService.removeObject(channel);
        try {
      channel.leave(session);
      fail("Expected IllegalStateException");
        } catch (IllegalStateException e) {
      System.err.println(e);
        }
    }
      }, taskOwner);
 
  } finally {
      group.disconnect(false);
  }
    }

    @Test
    public void testChannelLeaveNullClientSession() throws Exception {
 
  txnScheduler.runTask(new TestAbstractKernelRunnable() {
      public void run() {
    Channel channel =
        channelService.createChannel("x", null, Delivery.RELIABLE);
    try {
        channel.leave((ClientSession) null);
        fail("Expected NullPointerException");
    } catch (NullPointerException e) {
        System.err.println(e);
    }
      }
  }, taskOwner);
    }

    @Test
    @IntegrationTest
    public void testChannelLeaveSessionNotJoined() throws Exception {
  final String channelName = "leaveTest";
  createChannel(channelName);
  ClientGroup group = new ClientGroup(someUsers);
 
  try {
      txnScheduler.runTask(new TestAbstractKernelRunnable() {
    public void run() {
        Channel channel = getChannel(channelName);
 
        ClientSession moe =
      (ClientSession) dataService.getBinding(MOE);
        channel.join(moe);

        try {
      ClientSession larry =
          (ClientSession) dataService.getBinding(LARRY);
      channel.leave(larry);
      System.err.println("leave of non-member session returned");
     
        } catch (Exception e) {
      System.err.println(e);
      fail("test failed with exception: " + e);
        }
       
    }
       }, taskOwner);

      Thread.sleep(100);
     
      txnScheduler.runTask(new TestAbstractKernelRunnable() {
    public void run() {
        Channel channel = getChannel(channelName);
 
        ClientSession moe =
      (ClientSession) dataService.getBinding(MOE);

        ClientSession larry =
      (ClientSession) dataService.getBinding(LARRY);
       
        Set<ClientSession> sessions = getSessions(channel);
        System.err.println(
      "sessions set (should only have moe): " + sessions);
        if (sessions.size() != 1) {
      fail("Expected 1 session, got " +
           sessions.size());
        }

        if (! sessions.contains(moe)) {
      fail("Expected session: " + moe);
        }
        dataService.removeObject(channel);
    }
       }, taskOwner);
     
  } finally {
      group.disconnect(false);
  }
    }

    @Test
    @IntegrationTest
    public void testChannelLeave() throws Exception {
  final String channelName = "leaveTest";
  createChannel(channelName);
  ClientGroup group = new ClientGroup(someUsers);
 
  try {
      Thread.sleep(1000);
      int count = getObjectCount();
      joinUsers(channelName, someUsers);
      checkUsersJoined(channelName, someUsers);

      for (final String user : someUsers) {
   
    txnScheduler.runTask(new TestAbstractKernelRunnable() {
        public void run() {
      Channel channel = getChannel(channelName);
      ClientSession session = getSession(user);
      channel.leave(session);
        }}, taskOwner);

    Thread.sleep(100);
   
    txnScheduler.runTask(new TestAbstractKernelRunnable() {
        public void run() {
      Channel channel = getChannel(channelName);
      ClientSession session = getSession(user);
      if (getSessions(channel).contains(session)) {
          fail("Failed to remove session: " + session);
      }}}, taskOwner);
      }
     
      Thread.sleep(1000);
      assertEquals(count, getObjectCount());
     
      txnScheduler.runTask(new TestAbstractKernelRunnable() {
    public void run() {
        Channel channel = getChannel(channelName);

        int numJoinedSessions = getSessions(channel).size();
        if (numJoinedSessions != 0) {
      fail("Expected no sessions, got " + numJoinedSessions);
        }
        System.err.println("All sessions left");
       
        dataService.removeObject(channel);
    }}, taskOwner);

     
  } finally {
      group.disconnect(false);
  }

    }

    // -- Test Channel.leaveAll --

    @Test
    public void testChannelLeaveAllNoTxn() throws Exception {
  Channel channel = createChannel();
  try {
      channel.leaveAll();
      fail("Expected TransactionNotActiveException");
  } catch (TransactionNotActiveException e) {
      System.err.println(e);
  }
    }

    @Test
    public void testChannelLeaveAllClosedChannel() throws Exception {
  txnScheduler.runTask(new TestAbstractKernelRunnable() {
      public void run() {
    Channel channel =
        channelService.createChannel("x", null, Delivery.RELIABLE);
    dataService.removeObject(channel);
    try {
        channel.leaveAll();
        fail("Expected IllegalStateException");
    } catch (IllegalStateException e) {
        System.err.println(e);
    }
      }
  }, taskOwner);
    }

    @Test
    public void testChannelLeaveAllNoSessionsJoined() throws Exception {
  txnScheduler.runTask(new TestAbstractKernelRunnable() {
      public void run() {
    Channel channel =
        channelService.createChannel("x", null, Delivery.RELIABLE);
    channel.leaveAll();
    System.err.println(
        "leaveAll succeeded with no sessions joined");
      }
  }, taskOwner);
    }

    @Test
    public void testChannelLeaveAll() throws Exception {
  final String channelName = "leaveAllTest";
  createChannel(channelName);
  ClientGroup group = new ClientGroup(someUsers);
 
  try {
      joinUsers(channelName, someUsers);
      checkUsersJoined(channelName, someUsers);
      leaveAll(channelName);
      Thread.sleep(100);
      checkUsersJoined(channelName, noUsers);
      for (DummyClient client : group.getClients()) {
    client.assertLeftChannel(channelName);
      }
  } finally {
      group.disconnect(false);
  }
    }

    @Test
    public void testChannelLeaveAllThenJoin() throws Exception {
  final String channelName = "leaveAllTest";
  createChannel(channelName);
  ClientGroup group = new ClientGroup(someUsers);
 
  try {
      joinUsers(channelName, someUsers);
      checkUsersJoined(channelName, someUsers);
      leaveAll(channelName);
      Thread.sleep(200);
      checkUsersJoined(channelName, noUsers);
      for (DummyClient client : group.getClients()) {
    client.assertLeftChannel(channelName);
      }
      joinUsers(channelName, someUsers);
      checkUsersJoined(channelName, someUsers);
      sendMessagesToChannel(channelName, 2);
      checkChannelMessagesReceived(group, channelName, 2);
     
  } finally {
      group.disconnect(false);
  }
    }
   
    @Test
    public void testChannelLeaveAllThenSend() throws Exception {
  final String channelName = "leaveAllTest";
  createChannel(channelName);
  ClientGroup group = new ClientGroup(someUsers);
 
  try {
      joinUsers(channelName, someUsers);
      checkUsersJoined(channelName, someUsers);
      leaveAll(channelName);
      Thread.sleep(200);
      checkUsersJoined(channelName, noUsers);
      for (DummyClient client : group.getClients()) {
    client.assertLeftChannel(channelName);
      }
      sendMessagesToChannel(channelName, 2);
      checkChannelMessagesReceived(group, channelName, 0);
     
  } finally {
      group.disconnect(false);
  }
    }
   
    @Test
    @IntegrationTest
    public void testChannelLeaveAllWithCoordinatorCrash() throws Exception {
  final String channelName = "leaveAllTest";
  // Create channel on coordinator node
  SgsTestNode coordinatorNode = addNode();
  createChannel(channelName, null, coordinatorNode);
  // Clients will log into server node.
  ClientGroup group = new ClientGroup(serverNode.getAppPort(), someUsers);
 
  try {
      joinUsers(channelName, someUsers);
      checkUsersJoined(channelName, someUsers);
      holdChannelServerMethodToNode(serverNode, "close");
      leaveAll(channelName);
      waitForHeldChannelServerMethodToNode(serverNode);
      coordinatorNode.shutdown(false);
      Thread.sleep(2000);
      checkUsersJoined(channelName, noUsers);
      for (DummyClient client : group.getClients()) {
    client.assertLeftChannel(channelName);
      }
     
  } finally {
      group.disconnect(false);
  }
    }
   
    // -- Test Channel.send --

    private static byte[] testMessage = new byte[] {'x'};

    @Test
    public void testChannelSendAllNoTxn() throws Exception {
  Channel channel = createChannel();
  try {
      channel.send(null, ByteBuffer.wrap(testMessage));
      fail("Expected TransactionNotActiveException");
  } catch (TransactionNotActiveException e) {
      System.err.println(e);
  }
    }

    @Test
    public void testChannelSendAllClosedChannel() throws Exception {
  final String channelName = "test";
  createChannel(channelName);
  txnScheduler.runTask(new TestAbstractKernelRunnable() {
      public void run() {
    Channel channel = getChannel(channelName);
    dataService.removeObject(channel);
    try {
        channel.send(null, ByteBuffer.wrap(testMessage));
        fail("Expected IllegalStateException");
    } catch (IllegalStateException e) {
        System.err.println(e);
    }
      }
  }, taskOwner);
    }

    @Test
    public void testChannelSendNullMessage() throws Exception {
  final String channelName = "test";
  createChannel(channelName);
  txnScheduler.runTask(new TestAbstractKernelRunnable() {
      public void run() {
    Channel channel = getChannel(channelName);
    try {
        channel.send(null, null);
        fail("Expected NullPointerException");
    } catch (NullPointerException e) {
        System.err.println(e);
    }
      }
  }, taskOwner);
    }

    @Test
    public void testChannelSend() throws Exception {
 
  String channelName = "test";
  createChannel(channelName);
  ClientGroup group = new ClientGroup(sevenDwarfs);
  try {
      joinUsers(channelName, sevenDwarfs);
      sendMessagesToChannel(channelName, 3);
      checkChannelMessagesReceived(group, channelName, 3);
  } finally {
      group.disconnect(false);
  }
    }

    @Test
    public void testChannelSendMultipleNodes() throws Exception {
  addNodes(3);
  ConfigurableNodePolicy.setRoundRobinPolicy();
  testChannelSend();
    }

    @Test
    @IntegrationTest
    public void testChannelSendToNewMembersAfterAllNodesFail()
  throws Exception
    {
  addNodes(3);
  ConfigurableNodePolicy.setRoundRobinPolicy();
  String channelName = "test";
  createChannel(channelName);
  Thread.sleep(1000);
  int count = getObjectCount();
  printServiceBindings("after channel create");
  ClientGroup group1 = new ClientGroup(sevenDwarfs);
  joinUsers(channelName, sevenDwarfs);
  sendMessagesToChannel(channelName, 3);
  checkChannelMessagesReceived(group1, channelName, 3);
  printServiceBindings("after users joined");
  System.err.println("simulate watchdog server crash...");
  tearDown(false);
  setUp(false);
        Thread.sleep(1000);
  addNodes(3);
  ConfigurableNodePolicy.setRoundRobinPolicy();
  Thread.sleep(2000);
  int afterCount = getObjectCount();
  for (int i = 0; i < 2; i++) {
      // Make sure that previous sessions were cleaned up.
      if (count == afterCount) {
    break;
      } else {
    Thread.sleep(1000);
    afterCount = getObjectCount();
      }
      System.err.println("retry: count: " + count +
             ", afterCount: " + afterCount);
  }
  printServiceBindings("after recovery");
  ClientGroup group2 = new ClientGroup(someUsers);
  try {
      joinUsers(channelName, someUsers);
      sendMessagesToChannel(channelName, 2);
      checkChannelMessagesReceived(group2, channelName, 2);
      group1.checkMembership(channelName, false);
      assertEquals(count, afterCount);
  } finally {
      group2.disconnect(false);
  }
    }

    @Test
    @IntegrationTest
    public void testChannelSendToExistingMembersAfterNodeFailure()
  throws Exception
    {
  SgsTestNode coordinatorNode = addNode();
  SgsTestNode otherNode = addNode();
  ConfigurableNodePolicy.setRoundRobinPolicy();
 
  // create channels on specific node which will be the coordinator node
  String[] channelNames = new String[] {"channel1", "channel2"};
  for (String channelName : channelNames) {
      createChannel(channelName, null, coordinatorNode);
  }
 
  ClientGroup group = new ClientGroup(sevenDwarfs);
  try {
      for (String channelName : channelNames) {
    joinUsers(channelName, sevenDwarfs);
    sendMessagesToChannel(channelName, 2);
    checkChannelMessagesReceived(group, channelName, 2);
      }
      printServiceBindings("after users joined");
      // nuke non-coordinator node
      System.err.println("shutting down other node: " + otherNode);
      int otherNodePort = otherNode.getAppPort();
      shutdownNode(otherNode);
            Thread.sleep(1000);
      // remove disconnected sessions from client group
      System.err.println("remove disconnected sessions");
      ClientGroup disconnectedSessionsGroup =
    group.removeSessionsFromGroup(otherNodePort);
      // send messages to sessions that are left
      System.err.println("send messages to remaining members");
      for (String channelName : channelNames) {
    sendMessagesToChannel(channelName, 2);
    checkChannelMessagesReceived(group, channelName, 2);
      }
      if (!disconnectedSessionsGroup.isDisconnectedGroup()) {
    fail("expected disconnected client(s)");
      }

      for (String channelName : channelNames) {
    disconnectedSessionsGroup.checkMembership(channelName, false);
      }
     
  } finally {
      printServiceBindings("before group disconnect");
      group.disconnect(false);
  }
    }

    @Test
    @IntegrationTest
    public void testChannelSendToExistingMembersAfterCoordinatorFailure()
  throws Exception
    {
  SgsTestNode coordinatorNode = addNode();
  SgsTestNode otherNode = addNode();
  ConfigurableNodePolicy.setRoundRobinPolicy();
 
  // create channels on specific node which will be the coordinator node
  String[] channelNames = new String[] {"channel1", "channel2"};
  for (String channelName : channelNames) {
      createChannel(channelName, null, coordinatorNode);
  }

  ClientGroup group = new ClientGroup(sevenDwarfs);
  try {
      for (String channelName : channelNames) {
    joinUsers(channelName, sevenDwarfs);
    sendMessagesToChannel(channelName, 2);
    checkChannelMessagesReceived(group, channelName, 2);
      }
      printServiceBindings("after users joined");
      // nuke coordinator node
      System.err.println("shutting down coordinator: " + coordinatorNode);
      int coordinatorNodePort = coordinatorNode.getAppPort();
      shutdownNode(coordinatorNode);
            Thread.sleep(1000);
      // remove disconnected sessions from client group
      System.err.println("remove disconnected sessions");
      ClientGroup disconnectedSessionsGroup =
    group.removeSessionsFromGroup(coordinatorNodePort);
      // send messages to sessions that are left
      System.err.println("send messages to remaining members");
      for (String channelName : channelNames) {
    sendMessagesToChannel(channelName, 2);
    checkChannelMessagesReceived(group, channelName, 2);
      }
      if (!disconnectedSessionsGroup.isDisconnectedGroup()) {
    fail("expected disconnected client(s)");
      }

      for (String channelName : channelNames) {
    disconnectedSessionsGroup.checkMembership(channelName, false);
      }
     
  } finally {
      printServiceBindings("before group disconnect");
      group.disconnect(false);
  }
    }

    // -- Test client send to channel (with and without ChannelListener) --

    @Test
    @IntegrationTest
    public void testNonMemberClientSendToChannelWithNoListener ()
  throws Exception
    {
  String channelName = "foo";
  createChannel(channelName);
  ClientGroup group = new ClientGroup(someUsers);
  DummyClient nonMember = newClient();
  try {
      joinUsers(channelName, someUsers);
      DummyClient moe = group.getClient(MOE);
      moe.assertJoinedChannel(channelName);
      nonMember.sendChannelMessage(channelName, 0);
      Thread.sleep(2000);
      for (DummyClient client : group.getClients()) {
    if (client.nextChannelMessage() != null) {
        fail(client.name + " received message!");
    }
      }
  } finally {
      group.disconnect(false);
      nonMember.disconnect();
  }
    }

    @Test
    @IntegrationTest
    public void testNonMemberClientSendToChannelWithForwardingListener ()
  throws Exception
    {
  String channelName = "foo";
  createChannel(channelName, new DummyChannelListener(channelName, true));
  ClientGroup group = new ClientGroup(someUsers);
  DummyClient nonMember = newClient();
  try {
      joinUsers(channelName, someUsers);
      DummyClient moe = group.getClient(MOE);
      moe.assertJoinedChannel(channelName);
      nonMember.sendChannelMessage(channelName, 0);
      Thread.sleep(2000);
      for (DummyClient client : group.getClients()) {
    if (client.nextChannelMessage() != null) {
        fail(client.name + " received message!");
    }
      }
  } finally {
      group.disconnect(false);
      nonMember.disconnect();
  }
    }

    @Test
    @IntegrationTest
    public void testClientSendToChannelWithNoListener() throws Exception {
  String channelName = "foo";
  createChannel(channelName);
  ClientGroup group = new ClientGroup(someUsers);
  try {
      joinUsers(channelName, someUsers);
      DummyClient moe = group.getClient(MOE);
      moe.assertJoinedChannel(channelName);
      moe.sendChannelMessage(channelName, 0);
      Thread.sleep(2000);
      boolean fail = false;
      for (DummyClient client : group.getClients()) {
    if (client.nextChannelMessage() == null) {
        System.err.println(client.name + " did not receive message!");
        fail = true;
    }
      }
      if (fail) {
    fail("test failed; one or more clients did not get message");
      }
  } finally {
      group.disconnect(false);
  }
    }

    @Test
    @IntegrationTest
    public void testClientSendToChannelWithForwardingListener()
  throws Exception
    {
  String channelName = "foo";
  createChannel(channelName, new DummyChannelListener(channelName, true));
  ClientGroup group = new ClientGroup(someUsers);
  try {
      joinUsers(channelName, someUsers);
      DummyClient moe = group.getClient(MOE);
      moe.assertJoinedChannel(channelName);
      moe.sendChannelMessage(channelName, 0);
      Thread.sleep(2000);
      boolean fail = false;
      for (DummyClient client : group.getClients()) {
    if (client.nextChannelMessage() == null) {
        System.err.println(client.name + " did not receive message!");
        fail = true;
    }
      }
      if (fail) {
    fail("test failed; one or more clients did not get message");
      }
  } finally {
      group.disconnect(false);
  }
    }

    @Test
    @IntegrationTest
    public void testClientSendToChannelWithRejectingListener()
  throws Exception
    {
  String channelName = "foo";
  createChannel(channelName, new DummyChannelListener(channelName, false));
  ClientGroup group = new ClientGroup(someUsers);
  try {
      joinUsers(channelName, someUsers);
      DummyClient moe = group.getClient(MOE);
      moe.assertJoinedChannel(channelName);
      moe.sendChannelMessage(channelName, 0);
      Thread.sleep(2000);
      boolean fail = false;
      for (DummyClient client : group.getClients()) {
    if (client.nextChannelMessage() != null) {
        System.err.println(client.name + " received message!");
        fail = true;
    }
      }
      if (fail) {
    fail("test failed; one or more clients received message");
      }
  } finally {
      group.disconnect(false);
  }
    }
   
    @Test
    @IntegrationTest
    public void testClientSendToChannelWithFilteringListener()
  throws Exception
    {
  String channelName = "foo";
  createChannel(channelName, new FilteringChannelListener(channelName));
  ClientGroup group = new ClientGroup(someUsers);
  try {
      joinUsers(channelName, someUsers);
      DummyClient moe = group.getClient(MOE);
      moe.assertJoinedChannel(channelName);
      int numMessages = 10;
      for (int i = 0; i < numMessages; i++) {
    moe.sendChannelMessage(channelName, i);
      }
      Thread.sleep(4000);
      boolean fail = false;
      for (int i = 0; i < numMessages / 2; i++) {
    for (DummyClient client : group.getClients()) {
        MessageInfo info = client.nextChannelMessage();
        if (info == null) {
      System.err.println(
          client.name +
          " should have received message: " + i * 2);
      fail = true;
        } else {
      System.err.println(
         client.name + " received message: " + info.seq);
      if (info.seq % 2 != 0) {
          System.err.println("odd numbered message received!");
          fail = true;
      }
        }
    }
      }
      if (fail) {
    fail("test failed; see output");
      }
  } finally {
      group.disconnect(false);
  }
    }

    @Test
    public void testClientSendToChannelValidatingWrappedClientSession()
  throws Exception
    {
  final String channelName = "foo";
  final String user = "dummy";
  final String listenerName = "ValidatingChannelListener";
  DummyClient client = new DummyClient(user);
  client.connect(port).login();

  // Create a channel with a ValidatingChannelListener and join the
  // client to the channel.
  txnScheduler.runTask(new TestAbstractKernelRunnable() {
      public void run() {
    ChannelListener listener =
        new ValidatingChannelListener();
    dataService.setBinding(listenerName, listener);
    ClientSession session =
        (ClientSession) dataService.getBinding(user);
    Channel channel =
        channelService.createChannel(
      channelName, listener, Delivery.RELIABLE);
    channel.join(session);
      }
  }, taskOwner);

  // Wait for the client to join, and then send a channel message.
  client.assertJoinedChannel(channelName);
  client.sendChannelMessage(channelName, 0);

  // Validate that the session passed to the handleChannelMessage
  // method was a wrapped ClientSession.
  txnScheduler.runTask(new TestAbstractKernelRunnable() {
      public void run() {
    ValidatingChannelListener listener = (ValidatingChannelListener)
        dataService.getBinding(listenerName);
    ClientSession session =
        (ClientSession) dataService.getBinding(user);
    listener.validateSession(session);
    System.err.println("sessions are equal");
      }
  }, taskOwner);
    }

    @Test
    @IntegrationTest
    public void testJoinLeavePerformance() throws Exception {
  final String channelName = "perf";
  createChannel(channelName);
  String user = "dummy";
  DummyClient client = new DummyClient(user);
  client.connect(port).login();

  final String sessionKey = user;
  isPerformanceTest = true;
  int numIterations = 100;
  long startTime = System.currentTimeMillis();
  for (int i = 0; i < numIterations; i++) {
      txnScheduler.runTask(new TestAbstractKernelRunnable() {
    public void run() {
        Channel channel = channelService.getChannel(channelName);
        DataManager dataManager = AppContext.getDataManager();
        ClientSession session = (ClientSession)
      dataManager.getBinding(sessionKey);
        channel.join(session);
        channel.leave(session);
    }}, taskOwner);
  }
  long endTime = System.currentTimeMillis();
  System.err.println("join/leave, iterations: " + numIterations +
         ", elapsed time: " + (endTime - startTime) +
         " ms.");
    }

    // -- Test Channel.close --

    @Test
    public void testChannelCloseNoTxn() throws Exception {
  Channel channel = createChannel();
  try {
      dataService.removeObject(channel);
      fail("Expected TransactionNotActiveException");
  } catch (TransactionNotActiveException e) {
      System.err.println(e);
  }
    }

    @Test
    @IntegrationTest
    public void testChannelClose() throws Exception {
  final String channelName = "closeTest";
  int count = getObjectCount();
  createChannel(channelName);
  printServiceBindings("after channel create");
  txnScheduler.runTask(new TestAbstractKernelRunnable() {
      public void run() {
    Channel channel = getChannel(channelName);
    dataService.removeObject(channel);
      }
  }, taskOwner);
  Thread.sleep(100);
  txnScheduler.runTask(new TestAbstractKernelRunnable() {
      public void run() {
    Channel channel = getChannel(channelName);
    if (getChannel(channelName) != null) {
        fail("obtained closed channel");
    }
      }
  }, taskOwner);
  printServiceBindings("after channel close");
        assertEquals(count, getObjectCount());
    }

    @Test
    public void testChannelCloseDoesNotRemoveManagedChannelListener()
  throws Exception
    {
  testChannelCloseWithManagedChanneListener(false);
   
   
    @Test
    public void testChannelCloseAfterRemovingManagedChannelListener()
  throws Exception
    {
  testChannelCloseWithManagedChanneListener(true);
    }

    private void testChannelCloseWithManagedChanneListener(
  final boolean removeListener)
  throws Exception
    {
  final String channelName = "closeTest";
  final String listenerName = channelName + ".listener";
  int count = getObjectCount();
  // Create channel with managed ChannelListener.
  txnScheduler.runTask(new TestAbstractKernelRunnable() {
      public void run() {
    ManagedChannelListener listener = new ManagedChannelListener();
    Channel channel = AppContext.getChannelManager().
        createChannel(channelName, listener, Delivery.RELIABLE);
    DataManager dataManager = AppContext.getDataManager();
    dataManager.setBinding(channelName, channel);
    dataManager.setBinding(listenerName, listener);
      }
  }, taskOwner);
  printServiceBindings("after channel create");
  // Remove managed channel listener and close channel.
  txnScheduler.runTask(new TestAbstractKernelRunnable() {
      public void run() {
    DataManager dataManager = AppContext.getDataManager();
    ManagedChannelListener listener = (ManagedChannelListener)
        dataManager.getBinding(listenerName);
    Channel channel = (Channel) dataManager.getBinding(channelName);
    dataManager.removeBinding(listenerName);
    if (removeListener) {
        dataManager.removeObject(listener);
    }
    dataManager.removeObject(channel);
      }
  }, taskOwner);
  Thread.sleep(100);
  txnScheduler.runTask(new TestAbstractKernelRunnable() {
      public void run() {
    Channel channel = getChannel(channelName);
    if (getChannel(channelName) != null) {
        fail("obtained closed channel");
    }
      }
  }, taskOwner);
  printServiceBindings("after channel close");
  // The object count should be equal if the listener was removed and
  // be one more if the listener was not removed.  If the object
  // count is not what is expected, then channel data structures were
  // not properly cleaned up (possibly due to the fact that obtaining
  // the managed channel listener threw an ObjectNotFoundException).
        assertEquals(count  + (removeListener ? 0 : 1), getObjectCount());
    }
   
    @Test
    @IntegrationTest
    public void testChannelCloseWithMembers() throws Exception {
  final String channelName = "closeTest";
  String user = "user";
  DummyClient client = new DummyClient(user);
  client.connect(serverNode.getAppPort()).login();
  try {
      int count = getObjectCount();
      createChannel(channelName);
      printServiceBindings("after channel create");
      joinUsers(channelName, user);
      client.assertJoinedChannel(channelName);
      client.sendChannelMessage(channelName, 0);
      checkChannelMessagesReceived(client, channelName, 1);
 
      txnScheduler.runTask(new TestAbstractKernelRunnable() {
    public void run() {
        Channel channel = getChannel(channelName);
        dataService.removeObject(channel);
    } }, taskOwner);
      Thread.sleep(100);
      txnScheduler.runTask(new TestAbstractKernelRunnable() {
          public void run() {
        Channel channel = getChannel(channelName);
        if (getChannel(channelName) != null) {
      fail("obtained closed channel");
        }
    } }, taskOwner);
      printServiceBindings("after channel close");
      Thread.sleep(6000);
      printServiceBindings("after sleep");
      assertEquals(count, getObjectCount());
  } finally {
      client.disconnect();
  }
    }
   
    @Test
    @IntegrationTest
    public void testSessionRemovedFromChannelOnLogout() throws Exception {
  String channelName = "test";
  createChannel(channelName);
  int count = getObjectCount();
  ClientGroup group = new ClientGroup(someUsers);

  try {
      joinUsers(channelName, someUsers);
      Thread.sleep(500);
      group.checkMembership(channelName, true);
      group.disconnect(true);
      Thread.sleep(WAIT_TIME); // this is necessary, and unfortunate...
      group.checkMembership(channelName, false);
      assertEquals(count, getObjectCount());
     
  } catch (RuntimeException e) {
      System.err.println("unexpected failure");
      e.printStackTrace();
      printServiceBindings("after exception");
      fail("unexpected failure: " + e);
  } finally {
      group.disconnect(false);
  }
    }

    @Test
    @IntegrationTest
    public void testSessionsRemovedOnRecovery() throws Exception {
  String channelName = "test";
  createChannel(channelName);
  int count = getObjectCount();
  ClientGroup group = new ClientGroup(someUsers);
 
  try {
      joinUsers(channelName, someUsers);
      Thread.sleep(500);
      group.checkMembership(channelName, true);
      printServiceBindings("after users joined");

      // simulate crash
      System.err.println("simulate watchdog server crash...");
      tearDown(false);
      setUp(false);

      Thread.sleep(WAIT_TIME); // await recovery actions
      group.checkMembership(channelName, false);
      assertEquals(count, getObjectCount());
      printServiceBindings("after recovery");

  } catch (RuntimeException e) {
      System.err.println("unexpected failure");
      e.printStackTrace();
      fail("unexpected failure: " + e);
  } finally {
      printServiceBindings("before group disconnect");
      group.disconnect(false);
  }
 
    }

    @Test
    @IntegrationTest
    public void testSendFromClientSessionWithDelayedJoin() throws Exception {
  String user = "user";
  String channelName = "test";
  // Create channel with coordinator on server node.
  createChannel(channelName);
  SgsTestNode userNode = addNode();
  DummyClient client = new DummyClient(user);
  client.connect(userNode.getAppPort()).login();
  try {
      holdChannelServerMethodToNode(userNode, "join");
      joinUsers(channelName, user);
      waitForHeldChannelServerMethodToNode(userNode);
      for (int i = 0; i < 3; i++) {
    client.sendChannelMessage(channelName, i);
      }
      Thread.sleep(1000);
      releaseChannelServerMethodHeld(userNode);
      checkChannelMessagesReceived(client, channelName, 3);
  } finally {
      client.disconnect();
  }
    }

    @Test
    @IntegrationTest
    public void testSendFromClientSessionWithDelayedLeave() throws Exception {
  String user = "user";
  String channelName = "test";
  // Create channel with coordinator on server node.
  createChannel(channelName);
  SgsTestNode userNode = addNode();
  DummyClient client = new DummyClient(user);
  client.connect(userNode.getAppPort()).login();
  try {
      joinUsers(channelName, user);
      client.assertJoinedChannel(channelName);
      client.sendChannelMessage(channelName, 0);
      checkChannelMessagesReceived(client, channelName, 1);
      holdChannelServerMethodToNode(userNode, "leave");
      leaveUsers(channelName, user);
      waitForHeldChannelServerMethodToNode(userNode);
      client.sendChannelMessage(channelName, 0);
      Thread.sleep(1000);
      releaseChannelServerMethodHeld(userNode);
      client.assertLeftChannel(channelName);
      assertNull(client.nextChannelMessage());
  } finally {
      client.disconnect();
  }
    }

    @Test
    @IntegrationTest
    public void testSendFromClientSessionWithDelayedJoinLeave()
  throws Exception
    {
  String user = "user";
  String channelName = "test";
  // Create channel with coordinator on server node.
  createChannel(channelName);
  SgsTestNode userNode = addNode();
  DummyClient client = new DummyClient(user);
  client.connect(userNode.getAppPort()).login();
  try {
      holdChannelServerMethodToNode(userNode, "join");
      joinUsers(channelName, user);
      waitForHeldChannelServerMethodToNode(userNode);
      leaveUsers(channelName, user);
      client.sendChannelMessage(channelName, 0);
      releaseChannelServerMethodHeld(userNode);
      client.assertLeftChannel(channelName);
      Thread.sleep(3000);
      assertNull(client.nextChannelMessage());
  } finally {
      client.disconnect();
  }
    }
   
    @Test
    @IntegrationTest
    public void testSendFromClientSessionWithDelayedJoinLeaveJoin()
  throws Exception
    {
  String user = "user";
  String channelName = "test";
  // Create channel with coordinator on server node.
  createChannel(channelName);
  SgsTestNode userNode = addNode();
  DummyClient client = new DummyClient(user);
  client.connect(userNode.getAppPort()).login();
  try {
      holdChannelServerMethodToNode(userNode, "join");
      joinUsers(channelName, user);
      waitForHeldChannelServerMethodToNode(userNode);
      leaveUsers(channelName, user);
      joinUsers(channelName, user);
      for (int i = 0; i < 2; i++) {
    client.sendChannelMessage(channelName, i);
      }
      Thread.sleep(1000);
      releaseChannelServerMethodHeld(userNode);
      client.assertJoinedChannel(channelName);
      checkChannelMessagesReceived(client, channelName, 2);
      for (int i = 0; i < 2; i++) {
    client.sendChannelMessage(channelName, i);
      }
      checkChannelMessagesReceived(client, channelName, 2);
  } finally {
      client.disconnect();
  }
    }
    @Test
    @IntegrationTest
    public void testSendFromClientSessionWithDelayedJoinAndCoordinatorCrash()
  throws Exception
    {
  String user = "user";
  String channelName = "test";
  SgsTestNode coordinatorNode = addNode();
  // Create channel with coordinator on coordinator node.
  createChannel(channelName, null, coordinatorNode);
  SgsTestNode userNode = addNode();
  DummyClient client = new DummyClient(user);
  client.connect(userNode.getAppPort()).login();
  try {
      holdChannelServerMethodToNode(userNode, "join");
      joinUsers(channelName, user);
      waitForHeldChannelServerMethodToNode(userNode);
      for (int i = 0; i < 3; i++) {
    client.sendChannelMessage(channelName, i);
      }
      Thread.sleep(1000);
      // Shutdown coordinator node.
      coordinatorNode.shutdown(false);
      Thread.sleep(3000);
      checkChannelMessagesReceived(client, channelName, 3);
  } finally {
      client.disconnect();
  }
    }

    @Test
    @IntegrationTest
    public void
  testSendFromClientSessionWithDelayedLeaveAndCoordinatorCrash()
      throws Exception
    {
  String user = "user";
  String channelName = "test";
  SgsTestNode coordinatorNode = addNode();
  // Create channel with coordinator on server node.
  createChannel(channelName, null, coordinatorNode);
  SgsTestNode userNode = addNode();
  DummyClient client = new DummyClient(user);
  client.connect(userNode.getAppPort()).login();
  try {
      joinUsers(channelName, user);
      client.assertJoinedChannel(channelName);
      client.sendChannelMessage(channelName, 0);
      checkChannelMessagesReceived(client, channelName, 1);
      holdChannelServerMethodToNode(userNode, "leave");
      leaveUsers(channelName, user);
      waitForHeldChannelServerMethodToNode(userNode);
      client.sendChannelMessage(channelName, 0);
      Thread.sleep(1000);
      coordinatorNode.shutdown(false);
      Thread.sleep(3000);
      client.assertLeftChannel(channelName);
      assertNull(client.nextChannelMessage());
  } finally {
      client.disconnect();
  }
    }

    @Test
    @IntegrationTest
    public void
  testSendFromClientSessionWithDelayedJoinLeaveAndCoordinatorCrash()
      throws Exception
    {
  String user = "user";
  String channelName = "test";
  SgsTestNode coordinatorNode = addNode();
  // Create channel with coordinator on coordinator node.
  createChannel(channelName, null, coordinatorNode);
  SgsTestNode userNode = addNode();
  DummyClient client = new DummyClient(user);
  client.connect(userNode.getAppPort()).login();
  try {
      holdChannelServerMethodToNode(userNode, "join");
      joinUsers(channelName, user);
      waitForHeldChannelServerMethodToNode(userNode);
      leaveUsers(channelName, user);
      client.sendChannelMessage(channelName, 0);
      Thread.sleep(1000);
      coordinatorNode.shutdown(false);
      Thread.sleep(3000);
      client.assertLeftChannel(channelName);
      assertNull(client.nextChannelMessage());
  } finally {
      client.disconnect();
  }
    }
   
    @Test
    @IntegrationTest
    public void
  testSendFromClientSessionWithDelayedJoinLeaveJoinAndCoordinatorCrash()
      throws Exception
    {
  String user = "user";
  String channelName = "test";
  SgsTestNode coordinatorNode = addNode();
  // Create channel with coordinator on server node.
  createChannel(channelName, null, coordinatorNode);
  SgsTestNode userNode = addNode();
  DummyClient client = new DummyClient(user);
  client.connect(userNode.getAppPort()).login();
  try {
      holdChannelServerMethodToNode(userNode, "join");
      joinUsers(channelName, user);
      waitForHeldChannelServerMethodToNode(userNode);
      leaveUsers(channelName, user);
      joinUsers(channelName, user);
      for (int i = 0; i < 2; i++) {
    client.sendChannelMessage(channelName, i);
      }
      Thread.sleep(1000);
      coordinatorNode.shutdown(false);
      Thread.sleep(3000);
      client.assertJoinedChannel(channelName);
      checkChannelMessagesReceived(client, channelName, 2);
      for (int i = 0; i < 2; i++) {
    client.sendChannelMessage(channelName, i);
      }
      checkChannelMessagesReceived(client, channelName, 2);
  } finally {
      client.disconnect();
  }
    }

    @Test
    @IntegrationTest
    public void testSendWithCoordinatorCrash() throws Exception {
  String user = "user";
  String channelName = "test";
  SgsTestNode coordinatorNode = addNode();
  // Create channel with coordinator on coordinator node.
  createChannel(channelName, null, coordinatorNode);
  SgsTestNode userNode = addNode();
  DummyClient client = new DummyClient(user);
  client.connect(userNode.getAppPort()).login();
  try {
      joinUsers(channelName, user);
      client.assertJoinedChannel(channelName);
      holdChannelServerMethodToNode(userNode, "send");
      for (int i = 0; i < 3; i++) {
    client.sendChannelMessage(channelName, i);
      }
      waitForHeldChannelServerMethodToNode(userNode);
      // Shutdown coordinator node.
      coordinatorNode.shutdown(false);
      Thread.sleep(5000);
      checkChannelMessagesReceived(client, channelName, 3);
  } finally {
      client.disconnect();
  }
    }

    // -- other classes --

    private static class NonSerializableChannelListener
  implements ChannelListener
    {
  NonSerializableChannelListener() {}
 
  public void receivedMessage(
      Channel channel, ClientSession session, ByteBuffer message)
  {
  }
    }

    private static class DummyChannelListener
  implements ChannelListener, Serializable
    {
  private final static long serialVersionUID = 1L;

  private final String name;
  private final boolean allowMessages;
 
  DummyChannelListener() {
      this(null, true);
  }

  DummyChannelListener(String name, boolean allowMessages) {
      this.name = name;
      this.allowMessages = allowMessages;
  }
 
  public void receivedMessage(
      Channel channel, ClientSession session, ByteBuffer message)
  {
      if (name != null) {
    assertEquals(channel,
           AppContext.getChannelManager().getChannel(name));
      }
      if (allowMessages) {
    channel.send(session, message);
      }
  }
    }
   
    private static class FilteringChannelListener
  implements ChannelListener, Serializable
    {
  private final static long serialVersionUID = 1L;

  private final String name;
 
  FilteringChannelListener(String name) {
      this.name = name;
  }
 
  public void receivedMessage(
      Channel channel, ClientSession session, ByteBuffer message)
  {
      if (name != null) {
    assertEquals(channel,
           AppContext.getChannelManager().getChannel(name));
      }

      if (message.getInt() % 2 == 0) {
    message.flip();
    channel.send(session, message);
      }
  }
    }

    private static class ValidatingChannelListener
  implements ChannelListener, Serializable, ManagedObject
    {
  private final static long serialVersionUID = 1L;

  private ManagedReference<ClientSession> sessionRef = null;
 
  ValidatingChannelListener() {
  }

  public void receivedMessage(
      Channel channel, ClientSession session, ByteBuffer message)
  {
      System.err.println(
    "ValidatingChannelListener.receivedMessage: session = " +
    session);
      DataManager dm = AppContext.getDataManager();
      dm.markForUpdate(this);
      sessionRef = dm.createReference(session);
  }

  public void validateSession(ClientSession session) {
      if (this.sessionRef == null) {
    throw new ResourceUnavailableException("sessionRef is null");
      } else {
    System.err.println(
        "ValidatingChannelListener.validateSession: session = " +
        session);
    ClientSession thisSession = sessionRef.get();
    if (! (thisSession instanceof ClientSessionWrapper)) {
        fail("unwrapped session: " + thisSession);
    } else if (! thisSession.equals(session)) {
        fail("sessions not equal: thisSession: " +
       thisSession + ", session: " + session);
    }
      }
  }
    }
   
    private DummyClient newClient() {
  DummyClient client = new DummyClient("dummy");
  client.connect(port).login();
  return client;
    }
   
    private void closeChannel(final String name) throws Exception {

  txnScheduler.runTask(new TestAbstractKernelRunnable() {
      public void run() {
    Channel channel = channelService.getChannel(name);
    dataService.removeObject(channel);
      }}, taskOwner);
    }

    private static class ManagedChannelListener
  extends DummyChannelListener implements ManagedObject
    {
    }
}
TOP

Related Classes of com.sun.sgs.test.impl.service.channel.TestChannelServiceImpl$ManagedChannelListener

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code is the property of its respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.