Package org.cometd.oort

Source Code of org.cometd.oort.OortObserveCometTest

/*
* Copyright (c) 2008-2014 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*     http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.cometd.oort;

import java.net.URI;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import org.cometd.bayeux.Message;
import org.cometd.bayeux.client.ClientSession;
import org.cometd.bayeux.client.ClientSessionChannel;
import org.cometd.bayeux.server.BayeuxServer;
import org.cometd.bayeux.server.LocalSession;
import org.cometd.bayeux.server.ServerChannel;
import org.cometd.bayeux.server.ServerMessage;
import org.cometd.bayeux.server.ServerSession;
import org.cometd.client.BayeuxClient;
import org.cometd.client.ext.AckExtension;
import org.cometd.server.ext.AcknowledgedMessagesExtension;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.junit.Assert;
import org.junit.Test;

public class OortObserveCometTest extends OortTest
{
    /**
     * Creates the test and forwards the given server transport to the
     * {@link OortTest} base class constructor.
     *
     * @param serverTransport the server transport handed through to {@code OortTest}
     *                        (NOTE(review): presumably supplied by a parameterized
     *                        runner configured on the base class - confirm)
     */
    public OortObserveCometTest(String serverTransport)
    {
        super(serverTransport);
    }

    @Test
    public void testObserveStartedOortAndExpectToBeObserved() throws Exception
    {
        Server server1 = startServer(0);
        final Oort oort1 = startOort(server1);
        Server server2 = startServer(0);
        Oort oort2 = startOort(server2);

        CountDownLatch latch = new CountDownLatch(1);
        oort2.addCometListener(new CometJoinedListener(latch));

        OortComet oortComet12 = oort1.observeComet(oort2.getURL());
        Assert.assertTrue(oortComet12.waitFor(5000, BayeuxClient.State.CONNECTED));

        Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
        OortComet oortComet21 = oort2.findComet(oort1.getURL());
        Assert.assertNotNull(oortComet21);
        Assert.assertTrue(oortComet21.waitFor(5000, BayeuxClient.State.CONNECTED));

        Assert.assertEquals(1, oort1.getKnownComets().size());
        Assert.assertEquals(1, oort2.getKnownComets().size());
    }

    @Test
    public void testCometACometBConnectedWhenCometAConnectsToCometCThenAlsoCometBConnectsToCometC() throws Exception
    {
        Server serverA = startServer(0);
        Oort oortA = startOort(serverA);
        Server serverB = startServer(0);
        Oort oortB = startOort(serverB);

        CountDownLatch latch = new CountDownLatch(6);
        Oort.CometListener listener = new CometJoinedListener(latch);
        oortA.addCometListener(listener);
        oortB.addCometListener(listener);

        OortComet oortCometAB = oortA.observeComet(oortB.getURL());
        Assert.assertTrue(oortCometAB.waitFor(5000, BayeuxClient.State.CONNECTED));

        Server serverC = startServer(0);
        Oort oortC = startOort(serverC);

        oortC.addCometListener(listener);

        OortComet oortCometAC = oortA.observeComet(oortC.getURL());
        Assert.assertTrue(oortCometAC.waitFor(5000, BayeuxClient.State.CONNECTED));

        // Be sure all the links have established
        Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
        OortComet oortCometBA = oortB.findComet(oortA.getURL());
        Assert.assertTrue(oortCometBA.waitFor(5000, BayeuxClient.State.CONNECTED));
        OortComet oortCometCA = oortC.findComet(oortA.getURL());
        Assert.assertTrue(oortCometCA.waitFor(5000, BayeuxClient.State.CONNECTED));
        OortComet oortCometCB = oortC.findComet(oortB.getURL());
        Assert.assertTrue(oortCometCB.waitFor(5000, BayeuxClient.State.CONNECTED));
        OortComet oortCometBC = oortB.findComet(oortC.getURL());
        Assert.assertTrue(oortCometBC.waitFor(5000, BayeuxClient.State.CONNECTED));

        Assert.assertEquals(2, oortA.getKnownComets().size());
        Assert.assertEquals(2, oortB.getKnownComets().size());
        Assert.assertEquals(2, oortC.getKnownComets().size());
    }

    @Test
    public void testConnectTwoCloudsAndDisconnectOneComet() throws Exception
    {
        // Cloud #1, A and B
        Server serverA = startServer(0);
        Oort oortA = startOort(serverA);
        Server serverB = startServer(0);
        Oort oortB = startOort(serverB);
        CountDownLatch latch1 = new CountDownLatch(2);
        CometJoinedListener listener1 = new CometJoinedListener(latch1);
        oortA.addCometListener(listener1);
        oortB.addCometListener(listener1);
        OortComet oortCometAB = oortA.observeComet(oortB.getURL());
        Assert.assertTrue(oortCometAB.waitFor(5000, BayeuxClient.State.CONNECTED));
        Assert.assertTrue(latch1.await(5, TimeUnit.SECONDS));
        OortComet oortCometBA = oortB.findComet(oortA.getURL());
        Assert.assertTrue(oortCometBA.waitFor(5000, BayeuxClient.State.CONNECTED));

        // Cloud #2, C and D
        Server serverC = startServer(0);
        Oort oortC = startOort(serverC);
        Server serverD = startServer(0);
        Oort oortD = startOort(serverD);
        CountDownLatch latch2 = new CountDownLatch(2);
        CometJoinedListener listener2 = new CometJoinedListener(latch2);
        oortC.addCometListener(listener2);
        oortD.addCometListener(listener2);
        OortComet oortCometCD = oortC.observeComet(oortD.getURL());
        Assert.assertTrue(oortCometCD.waitFor(5000, BayeuxClient.State.CONNECTED));
        Assert.assertTrue(latch2.await(5, TimeUnit.SECONDS));
        OortComet oortCometDC = oortD.findComet(oortC.getURL());
        Assert.assertTrue(oortCometDC.waitFor(5000, BayeuxClient.State.CONNECTED));

        // Now connect A and C, then we must see that A connects to D and B connects to C and D
        CountDownLatch latch3 = new CountDownLatch(8);
        CometJoinedListener listener3 = new CometJoinedListener(latch3);
        oortA.addCometListener(listener3);
        oortB.addCometListener(listener3);
        oortC.addCometListener(listener3);
        oortD.addCometListener(listener3);
        OortComet oortCometAC = oortA.observeComet(oortC.getURL());
        Assert.assertTrue(oortCometAC.waitFor(5000, BayeuxClient.State.CONNECTED));
        Assert.assertTrue(latch3.await(5, TimeUnit.SECONDS));
        OortComet oortCometAD = oortA.findComet(oortD.getURL());
        Assert.assertTrue(oortCometAD.waitFor(5000, BayeuxClient.State.CONNECTED));
        OortComet oortCometBC = oortB.findComet(oortC.getURL());
        Assert.assertTrue(oortCometBC.waitFor(5000, BayeuxClient.State.CONNECTED));
        OortComet oortCometBD = oortB.findComet(oortD.getURL());
        Assert.assertTrue(oortCometBD.waitFor(5000, BayeuxClient.State.CONNECTED));
        OortComet oortCometCA = oortC.findComet(oortA.getURL());
        Assert.assertTrue(oortCometCA.waitFor(5000, BayeuxClient.State.CONNECTED));
        OortComet oortCometCB = oortC.findComet(oortB.getURL());
        Assert.assertTrue(oortCometCB.waitFor(5000, BayeuxClient.State.CONNECTED));
        OortComet oortCometDA = oortD.findComet(oortA.getURL());
        Assert.assertTrue(oortCometDA.waitFor(5000, BayeuxClient.State.CONNECTED));
        OortComet oortCometDB = oortD.findComet(oortB.getURL());
        Assert.assertTrue(oortCometDB.waitFor(5000, BayeuxClient.State.CONNECTED));

        Assert.assertEquals(3, oortA.getKnownComets().size());
        Assert.assertEquals(3, oortB.getKnownComets().size());
        Assert.assertEquals(3, oortC.getKnownComets().size());
        Assert.assertEquals(3, oortD.getKnownComets().size());

        // Remove comet C, then A and B must still be connected to D
        CountDownLatch latch4 = new CountDownLatch(3);
        CometLeftListener listener4 = new CometLeftListener(latch4);
        oortA.addCometListener(listener4);
        oortB.addCometListener(listener4);
        oortD.addCometListener(listener4);
        stopOort(oortC);
        stopServer(serverC);
        Assert.assertTrue(latch4.await(5, TimeUnit.SECONDS));
        Assert.assertTrue(oortCometAC.waitFor(5000, BayeuxClient.State.DISCONNECTED));
        Assert.assertTrue(oortCometCA.waitFor(5000, BayeuxClient.State.DISCONNECTED));
        Assert.assertTrue(oortCometBC.waitFor(5000, BayeuxClient.State.DISCONNECTED));
        Assert.assertTrue(oortCometCB.waitFor(5000, BayeuxClient.State.DISCONNECTED));
        Assert.assertTrue(oortCometDC.waitFor(5000, BayeuxClient.State.DISCONNECTED));
        Assert.assertTrue(oortCometCD.waitFor(5000, BayeuxClient.State.DISCONNECTED));

        Assert.assertEquals(2, oortA.getKnownComets().size());
        Assert.assertEquals(2, oortB.getKnownComets().size());
        Assert.assertEquals(2, oortD.getKnownComets().size());
    }

    @Test
    public void testObserveStartedOortAndDetectStop() throws Exception
    {
        Server server1 = startServer(0);
        Server server2 = startServer(0);

        Oort oort1 = startOort(server1);
        Assert.assertNotNull(oort1.getOortSession());
        Assert.assertTrue(oort1.getOortSession().isConnected());

        Oort oort2 = startOort(server2);

        Assert.assertNull(oort1.observeComet(oort1.getURL()));

        OortComet oortComet12 = oort1.observeComet(oort2.getURL());
        Assert.assertNotNull(oortComet12);
        Assert.assertTrue(oortComet12.waitFor(5000, BayeuxClient.State.CONNECTED));
        Assert.assertEquals(1, oort1.getKnownComets().size());

        OortComet oortComet21 = oort2.observeComet(oort1.getURL());
        Assert.assertNotNull(oortComet21);
        Assert.assertTrue(oortComet21.waitFor(5000, BayeuxClient.State.CONNECTED));

        stopOort(oort2);
        Assert.assertFalse(oort2.getOortSession().isConnected());
        Assert.assertEquals(0, oort2.getKnownComets().size());

        // Stopping oort2 implies oort1 must see it disappearing
        Assert.assertTrue(oortComet12.waitFor(5000, BayeuxClient.State.DISCONNECTED));
        Assert.assertEquals(0, oort1.getKnownComets().size());
    }

    @Test
    public void testObserveNonStartedOortAndDetectStart() throws Exception
    {
        Server server1 = startServer(0);
        String url = (String)server1.getAttribute(OortConfigServlet.OORT_URL_PARAM);
        final BayeuxServer bayeuxServer = (BayeuxServer)server1.getAttribute(BayeuxServer.ATTRIBUTE);
        Oort oort1 = new Oort(bayeuxServer, url);
        oort1.start();

        Server server2 = startServer(0);
        String url2 = (String)server2.getAttribute(OortConfigServlet.OORT_URL_PARAM);
        int port = new URI(url2).getPort();
        stopServer(server2);

        OortComet oortComet12 = oort1.observeComet(url2);
        Assert.assertTrue(oortComet12.waitFor(5000, BayeuxClient.State.REHANDSHAKING));

        server2 = startServer(port);
        Oort oort2 = startOort(server2);
        Assert.assertEquals(oort2.getURL(), url2);

        Assert.assertTrue(oortComet12.waitFor(5000, BayeuxClient.State.CONNECTED));

        stopOort(oort1);
    }

    @Test
    public void testObserveStartedOortAndDetectRestart() throws Exception
    {
        Server server1 = startServer(0);
        Oort oort1 = startOort(server1);

        Server server2 = startServer(0);
        Oort oort2 = startOort(server2);

        OortComet oortComet12 = oort1.observeComet(oort2.getURL());
        Assert.assertTrue(oortComet12.waitFor(5000, BayeuxClient.State.CONNECTED));
        OortComet oortComet21 = oort2.observeComet(oort1.getURL());
        Assert.assertTrue(oortComet21.waitFor(5000, BayeuxClient.State.CONNECTED));

        ServerConnector connector2 = (ServerConnector)server2.getConnectors()[0];
        int port2 = connector2.getLocalPort();
        connector2.stop();

        Assert.assertTrue(oortComet12.waitFor(5000, BayeuxClient.State.UNCONNECTED));

        connector2.setPort(port2);
        connector2.start();

        Assert.assertTrue(oortComet12.waitFor(5000, BayeuxClient.State.CONNECTED));
    }

    @Test
    public void testDeobserve() throws Exception
    {
        Server serverA = startServer(0);
        Oort oortA = startOort(serverA);
        Server serverB = startServer(0);
        Oort oortB = startOort(serverB);
        Server serverC = startServer(0);
        Oort oortC = startOort(serverC);

        CountDownLatch latch1 = new CountDownLatch(6);
        CometJoinedListener listener1 = new CometJoinedListener(latch1);
        oortA.addCometListener(listener1);
        oortB.addCometListener(listener1);
        oortC.addCometListener(listener1);

        OortComet oortCometAB = oortA.observeComet(oortB.getURL());
        Assert.assertTrue(oortCometAB.waitFor(5000, BayeuxClient.State.CONNECTED));
        OortComet oortCometAC = oortA.observeComet(oortC.getURL());
        Assert.assertTrue(oortCometAC.waitFor(5000, BayeuxClient.State.CONNECTED));
        Assert.assertTrue(latch1.await(5, TimeUnit.SECONDS));
        OortComet oortCometBA = oortB.findComet(oortA.getURL());
        Assert.assertTrue(oortCometBA.waitFor(5000, BayeuxClient.State.CONNECTED));
        OortComet oortCometBC = oortB.findComet(oortC.getURL());
        Assert.assertTrue(oortCometBC.waitFor(5000, BayeuxClient.State.CONNECTED));
        OortComet oortCometCA = oortC.findComet(oortA.getURL());
        Assert.assertTrue(oortCometCA.waitFor(5000, BayeuxClient.State.CONNECTED));
        OortComet oortCometCB = oortC.findComet(oortB.getURL());
        Assert.assertTrue(oortCometCB.waitFor(5000, BayeuxClient.State.CONNECTED));

        Assert.assertEquals(2, oortA.getKnownComets().size());
        Assert.assertTrue(oortA.getKnownComets().contains(oortB.getURL()));
        Assert.assertTrue(oortA.getKnownComets().contains(oortC.getURL()));
        Assert.assertEquals(2, oortB.getKnownComets().size());
        Assert.assertTrue(oortB.getKnownComets().contains(oortA.getURL()));
        Assert.assertTrue(oortB.getKnownComets().contains(oortC.getURL()));
        Assert.assertEquals(2, oortC.getKnownComets().size());
        Assert.assertTrue(oortC.getKnownComets().contains(oortA.getURL()));
        Assert.assertTrue(oortC.getKnownComets().contains(oortB.getURL()));

        BayeuxClient clientA = startClient(oortA, null);
        Assert.assertTrue(clientA.waitFor(5000, BayeuxClient.State.CONNECTED));
        // Be sure that disconnecting clientA we do not mess with the known comets
        stopClient(clientA);

        Assert.assertEquals(2, oortA.getKnownComets().size());
        Assert.assertEquals(2, oortB.getKnownComets().size());
        Assert.assertEquals(2, oortC.getKnownComets().size());

        // Deobserve A-B
        CountDownLatch latch2 = new CountDownLatch(2);
        CometLeftListener listener2 = new CometLeftListener(latch2);
        oortA.addCometListener(listener2);
        oortB.addCometListener(listener2);
        OortComet cometAB = oortA.deobserveComet(oortB.getURL());
        Assert.assertSame(oortCometAB, cometAB);
        Assert.assertTrue(oortCometAB.waitFor(5000, BayeuxClient.State.DISCONNECTED));
        latch2.await(5, TimeUnit.SECONDS);
        Assert.assertTrue(oortCometBA.waitFor(5000, BayeuxClient.State.DISCONNECTED));

        // A is now only connected to C
        Assert.assertEquals(1, oortA.getKnownComets().size());
        Assert.assertTrue(oortA.getKnownComets().contains(oortC.getURL()));
        // B is now only connected to C
        Assert.assertEquals(1, oortB.getKnownComets().size());
        Assert.assertTrue(oortB.getKnownComets().contains(oortC.getURL()));
        // C is still connected to A and B
        Assert.assertEquals(2, oortC.getKnownComets().size());
        Assert.assertTrue(oortC.getKnownComets().contains(oortA.getURL()));
        Assert.assertTrue(oortC.getKnownComets().contains(oortB.getURL()));
    }

    @Test
    public void testObserveSameCometWithDifferentURL() throws Exception
    {
        Server serverA = startServer(0);
        Oort oortA = startOort(serverA);
        String urlA = oortA.getURL();
        String urlAA = urlA.replace("localhost", "127.0.0.1");

        OortComet oortCometAB = oortA.observeComet(urlAA);

        Assert.assertTrue(oortCometAB.waitFor(5000, BayeuxClient.State.DISCONNECTED));
    }

    @Test
    public void testObserveDifferentCometWithDifferentURL() throws Exception
    {
        Server serverA = startServer(0);
        Oort oortA = startOort(serverA);
        Server serverB = startServer(0);
        Oort oortB = startOort(serverB);

        String urlB = oortB.getURL();
        String otherURLB = urlB.replace("localhost", "127.0.0.1");

        CountDownLatch latch = new CountDownLatch(2);
        CometJoinedListener listener = new CometJoinedListener(latch);
        oortA.addCometListener(listener);
        oortB.addCometListener(listener);

        OortComet oortCometAB = oortA.observeComet(otherURLB);
        Assert.assertTrue(oortCometAB.waitFor(5000, BayeuxClient.State.CONNECTED));
        Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
        OortComet oortCometBA = oortB.findComet(oortA.getURL());
        Assert.assertTrue(oortCometBA.waitFor(5000, BayeuxClient.State.CONNECTED));

        Assert.assertEquals(1, oortB.getKnownComets().size());

        // Only master URLs are known
        Assert.assertFalse(oortA.getKnownComets().contains(otherURLB));
        Assert.assertTrue(oortA.getKnownComets().contains(urlB));
        // But comets can be known with alias URLs
        OortComet oortCometAB1 = oortA.getComet(urlB);
        Assert.assertNotNull(oortCometAB1);
        OortComet oortCometAB2 = oortA.getComet(otherURLB);
        Assert.assertNotNull(oortCometAB2);
        Assert.assertSame(oortCometAB, oortCometAB1);
        Assert.assertSame(oortCometAB1, oortCometAB2);

        // Now try with the original URL
        OortComet oortCometAB3 = oortA.observeComet(urlB);
        Assert.assertNotNull(oortCometAB3);
        Assert.assertSame(oortCometAB, oortCometAB3);

        // Try with yet another URL
        String anotherURLB = urlB.replace("localhost", "[::1]");
        OortComet oortCometAB4 = oortA.observeComet(anotherURLB);
        Assert.assertTrue(oortCometAB4.waitFor(5000, BayeuxClient.State.DISCONNECTED));
        OortComet oortCometAB5 = oortA.getComet(anotherURLB);
        Assert.assertSame(oortCometAB, oortCometAB5);

        // Now disconnect with the original URL
        OortComet oortCometAB6 = oortA.deobserveComet(urlB);
        Assert.assertNotNull(oortCometAB6);
        Assert.assertTrue(oortCometAB6.waitFor(5000, BayeuxClient.State.DISCONNECTED));
    }

    @Test
    public void testSingleCometsJoinsTheCloud() throws Exception
    {
        Server serverA = startServer(0);
        Oort oortA = startOort(serverA);
        Server serverB = startServer(0);
        Oort oortB = startOort(serverB);
        Server serverC = startServer(0);
        Oort oortC = startOort(serverC);
        Server serverD = startServer(0);
        Oort oortD = startOort(serverD);

        CountDownLatch latch1 = new CountDownLatch(2);
        CometJoinedListener listener1 = new CometJoinedListener(latch1);
        oortA.addCometListener(listener1);
        oortB.addCometListener(listener1);

        // Make a "cloud" connecting A and B
        OortComet oortCometAB = oortA.observeComet(oortB.getURL());
        Assert.assertTrue(oortCometAB.waitFor(5000, BayeuxClient.State.CONNECTED));
        Assert.assertTrue(latch1.await(5, TimeUnit.SECONDS));
        OortComet oortCometBA = oortB.findComet(oortA.getURL());
        Assert.assertTrue(oortCometBA.waitFor(5000, BayeuxClient.State.CONNECTED));

        // Now C wants to join the cloud, and connects to B
        CountDownLatch latch2 = new CountDownLatch(4);
        CometJoinedListener listener2 = new CometJoinedListener(latch2);
        oortA.addCometListener(listener2);
        oortB.addCometListener(listener2);
        oortC.addCometListener(listener2);
        OortComet oortCometCB = oortC.observeComet(oortB.getURL());
        Assert.assertTrue(oortCometCB.waitFor(5000, BayeuxClient.State.CONNECTED));
        Assert.assertTrue(latch2.await(5, TimeUnit.SECONDS));
        OortComet oortCometBC = oortB.findComet(oortC.getURL());
        Assert.assertTrue(oortCometBC.waitFor(5000, BayeuxClient.State.CONNECTED));
        OortComet oortCometAC = oortA.findComet(oortC.getURL());
        Assert.assertTrue(oortCometAC.waitFor(5000, BayeuxClient.State.CONNECTED));
        OortComet oortCometCA = oortC.findComet(oortA.getURL());
        Assert.assertTrue(oortCometCA.waitFor(5000, BayeuxClient.State.CONNECTED));

        // Make sure C is connected to B also
        Assert.assertEquals(2, oortA.getKnownComets().size());
        Assert.assertEquals(2, oortB.getKnownComets().size());
        Assert.assertEquals(2, oortC.getKnownComets().size());

        // Now also D want to join the cloud, and connects to B
        CountDownLatch latch3 = new CountDownLatch(6);
        CometJoinedListener listener3 = new CometJoinedListener(latch3);
        oortA.addCometListener(listener3);
        oortB.addCometListener(listener3);
        oortC.addCometListener(listener3);
        oortD.addCometListener(listener3);
        OortComet oortCometDB = oortD.observeComet(oortB.getURL());
        Assert.assertTrue(oortCometDB.waitFor(5000, BayeuxClient.State.CONNECTED));
        Assert.assertTrue(latch3.await(5, TimeUnit.SECONDS));
        OortComet oortCometBD = oortB.findComet(oortD.getURL());
        Assert.assertTrue(oortCometBD.waitFor(5000, BayeuxClient.State.CONNECTED));
        OortComet oortCometAD = oortA.findComet(oortD.getURL());
        Assert.assertTrue(oortCometAD.waitFor(5000, BayeuxClient.State.CONNECTED));
        OortComet oortCometDA = oortD.findComet(oortA.getURL());
        Assert.assertTrue(oortCometDA.waitFor(5000, BayeuxClient.State.CONNECTED));
        OortComet oortCometDC = oortD.findComet(oortC.getURL());
        Assert.assertTrue(oortCometDC.waitFor(5000, BayeuxClient.State.CONNECTED));
        OortComet oortCometCD = oortC.findComet(oortD.getURL());
        Assert.assertTrue(oortCometCD.waitFor(5000, BayeuxClient.State.CONNECTED));

        // Make sure C is connected to B also
        Assert.assertEquals(3, oortA.getKnownComets().size());
        Assert.assertEquals(3, oortB.getKnownComets().size());
        Assert.assertEquals(3, oortC.getKnownComets().size());
        Assert.assertEquals(3, oortD.getKnownComets().size());
    }

    @Test
    public void testAckExtensionConfiguration() throws Exception
    {
        Server serverA = startServer(0);
        Oort oortA = startOort(serverA);
        stopOort(oortA);
        oortA.setAckExtensionEnabled(true);
        oortA.start();

        BayeuxServer bayeuxServerA = oortA.getBayeuxServer();
        int ackExtensions = 0;
        for (BayeuxServer.Extension extension : bayeuxServerA.getExtensions())
            if (extension instanceof AcknowledgedMessagesExtension)
                ++ackExtensions;
        Assert.assertEquals(1, ackExtensions);

        Server serverB = startServer(0);
        BayeuxServer bayeuxServerB = (BayeuxServer)serverB.getAttribute(BayeuxServer.ATTRIBUTE);
        bayeuxServerB.addExtension(new AcknowledgedMessagesExtension());
        Oort oortB = startOort(serverB);
        stopOort(oortB);
        oortB.setAckExtensionEnabled(true);
        oortB.start();

        ackExtensions = 0;
        for (BayeuxServer.Extension extension : bayeuxServerB.getExtensions())
            if (extension instanceof AcknowledgedMessagesExtension)
                ++ackExtensions;
        Assert.assertEquals(1, ackExtensions);

        CountDownLatch latch1 = new CountDownLatch(2);
        CometJoinedListener listener1 = new CometJoinedListener(latch1);
        oortA.addCometListener(listener1);
        oortB.addCometListener(listener1);

        OortComet oortCometAB = oortA.observeComet(oortB.getURL());
        Assert.assertTrue(oortCometAB.waitFor(5000, BayeuxClient.State.CONNECTED));
        Assert.assertTrue(latch1.await(5, TimeUnit.SECONDS));
        OortComet oortCometBA = oortB.findComet(oortA.getURL());
        Assert.assertTrue(oortCometBA.waitFor(5000, BayeuxClient.State.CONNECTED));

        ackExtensions = 0;
        for (ClientSession.Extension extension : oortCometAB.getExtensions())
            if (extension instanceof AckExtension)
                ++ackExtensions;
        Assert.assertEquals(1, ackExtensions);

        ackExtensions = 0;
        for (ClientSession.Extension extension : oortCometBA.getExtensions())
            if (extension instanceof AckExtension)
                ++ackExtensions;
        Assert.assertEquals(1, ackExtensions);
    }

    @Test
    public void testPublishDuringCometJoined() throws Exception
    {
        Server serverA = startServer(0);
        final Oort oortA = startOort(serverA);
        final BayeuxServer bayeuxServerA = oortA.getBayeuxServer();

        final LocalSession serviceA = bayeuxServerA.newLocalSession("test");
        serviceA.handshake();
        final String channelName = "/test";
        final String data = "data";
        final CountDownLatch joinedLatch = new CountDownLatch(1);
        oortA.addCometListener(new Oort.CometListener.Adapter()
        {
            public void cometJoined(Event event)
            {
                bayeuxServerA.createChannelIfAbsent(channelName).getReference().publish(serviceA, data);
                joinedLatch.countDown();
            }
        });
        oortA.observeChannel(channelName);

        Server serverB = startServer(0);
        Oort oortB = startOort(serverB);
        oortB.observeChannel(channelName);

        BayeuxServer bayeuxServerB = oortB.getBayeuxServer();
        final CountDownLatch latch = new CountDownLatch(1);
        bayeuxServerB.createChannelIfAbsent(channelName).getReference().addListener(new ServerChannel.MessageListener()
        {
            public boolean onMessage(ServerSession from, ServerChannel channel, ServerMessage.Mutable message)
            {
                if (data.equals(message.getData()))
                    latch.countDown();
                return true;
            }
        });

        // Link the nodes
        oortB.observeComet(oortA.getURL());

        Assert.assertTrue(joinedLatch.await(5, TimeUnit.SECONDS));
        Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
    }

    @Test
    public void testConfigureMaxMessageSize() throws Exception
    {
        int maxMessageSize = 1024;
        Map<String, String> options = new HashMap<>();
        options.put("ws.maxMessageSize", String.valueOf(maxMessageSize));
        Server serverA = startServer(0, options);
        Oort oortA = startOort(serverA);
        Server serverB = startServer(0, options);
        Oort oortB = startOort(serverB);

        CountDownLatch latch = new CountDownLatch(2);
        oortA.addCometListener(new CometJoinedListener(latch));
        oortB.addCometListener(new CometJoinedListener(latch));

        OortComet oortComet12 = oortA.observeComet(oortB.getURL());
        Assert.assertTrue(oortComet12.waitFor(5000, BayeuxClient.State.CONNECTED));

        OortComet oortComet21 = oortB.findComet(oortA.getURL());
        Assert.assertNotNull(oortComet21);
        Assert.assertTrue(oortComet21.waitFor(5000, BayeuxClient.State.CONNECTED));

        Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));

        String channelName = "/foo";
        oortA.observeChannel(channelName);
        oortB.observeChannel(channelName);

        BayeuxClient clientA = startClient(oortA, null);
        Assert.assertTrue(clientA.waitFor(5000, BayeuxClient.State.CONNECTED));
        BayeuxClient clientB = startClient(oortB, null);
        Assert.assertTrue(clientB.waitFor(5000, BayeuxClient.State.CONNECTED));

        final AtomicReference<CountDownLatch> messageLatch = new AtomicReference<>(new CountDownLatch(1));
        clientB.getChannel(channelName).subscribe(new ClientSessionChannel.MessageListener()
        {
            public void onMessage(ClientSessionChannel channel, Message message)
            {
                messageLatch.get().countDown();
            }
        });

        // Wait a while to be sure to be subscribed
        Thread.sleep(1000);

        char[] clob = new char[maxMessageSize / 2];
        Arrays.fill(clob, 'w');
        clientA.getChannel(channelName).publish(new String(clob));
        Assert.assertTrue(messageLatch.get().await(5, TimeUnit.SECONDS));

        // Make the message larger than allowed
        messageLatch.set(new CountDownLatch(1));
        clob = new char[maxMessageSize * 2];
        Arrays.fill(clob, 'z');
        clientA.getChannel(channelName).publish(new String(clob));
        Assert.assertFalse(messageLatch.get().await(1, TimeUnit.SECONDS));
    }
}
TOP

Related Classes of org.cometd.oort.OortObserveCometTest

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code is the property of its respective owners. Java is a trademark of Sun Microsystems, Inc. and is owned by Oracle Inc. Contact coftware#gmail.com.